Consumer Client Trait
The ConsumerClientTrait is a helper to implement a consumer client. It implements the ConsumerClientInterface.
tip
This trait simplifies the creation of consumer clients by providing a standard consume() method. You only need to define the connector, pipe, and message processing logic.
<?php
use ByJG\MessageQueueClient\ConsumerClientInterface;
use ByJG\MessageQueueClient\ConsumerClientTrait;
use ByJG\MessageQueueClient\Connector\ConnectorInterface;
use ByJG\MessageQueueClient\Connector\Pipe;
use ByJG\MessageQueueClient\Message;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
class MyConsumerClient implements ConsumerClientInterface
{
use ConsumerClientTrait;
protected ConnectorInterface $connector;
protected Pipe $pipe;
public function __construct(ConnectorInterface $connector, Pipe $pipe)
{
$this->connector = $connector;
$this->pipe = $pipe;
}
// From the Interface
public function getPipe(): Pipe
{
return $this->pipe;
}
public function getConnector(): ConnectorInterface
{
return $this->connector;
}
public function getLogger(): LoggerInterface
{
return new NullLogger();
}
public function getLogOutputStart(Message $message): string
{
return "Start";
}
public function getLogOutputException(\Throwable $exception, Message $message): string
{
return "Exception: " . $exception->getMessage();
}
public function getLogOutputSuccess(Message $message): string
{
return "Success";
}
public function processMessage(Message $message): void
{
echo "Process the message";
echo $message->getBody();
}
}
Required Methods
important
You must implement all methods from the ConsumerClientInterface:
getPipe(): Return the Pipe to consume fromgetConnector(): Return the ConnectorInterface instancegetLogger(): Return a PSR-3 LoggerInterface instancegetLogOutputStart(): Return log message when processing startsgetLogOutputException(): Return log message when an exception occursgetLogOutputSuccess(): Return log message when processing succeedsprocessMessage(): Implement your message processing logic
Publish
<?php
use ByJG\MessageQueueClient\Connector\ConnectorFactory;
use ByJG\MessageQueueClient\Connector\Pipe;
use ByJG\MessageQueueClient\Envelope;
use ByJG\MessageQueueClient\Message;
use ByJG\MessageQueueClient\MockConnector;
use ByJG\Util\Uri;
$consumerClient = new MyConsumerClient($connector, $pipe);
// Create a message
$message = new Message("Hello World");
// Publish the message into the queue
$consumerClient->getConnector()->publish(
new Envelope($consumerClient->getPipe(), $message)
);
Consume
<?php
use ByJG\MessageQueueClient\Connector\ConnectorFactory;
use ByJG\MessageQueueClient\Connector\Pipe;
use ByJG\MessageQueueClient\MockConnector;
use ByJG\Util\Uri;
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(MockConnector::class);
// Create a connector
$connector = ConnectorFactory::create(new Uri("mock://local"));
// Create a queue
$pipe = new Pipe("test");
// Create a consumer client
$consumerClient = new MyConsumerClient($connector, $pipe);
// Connect to the queue and wait to consume the message
$consumerClient->consume();