This package provides an asynchronous MQTT client built on the React socket library. All client methods return a promise which is fulfilled if the operation succeeded or rejected if the operation failed. Incoming messages of subscribed topics are delivered via the "message" event.
Via composer:
$ composer require binsoul/net-mqtt-client-react

Connect to a public broker and run forever.
<?php
declare(strict_types=1);
use BinSoul\Net\Mqtt\Client\React\ReactMqttClient;
use BinSoul\Net\Mqtt\Connection;
use BinSoul\Net\Mqtt\DefaultMessage;
use BinSoul\Net\Mqtt\DefaultSubscription;
use BinSoul\Net\Mqtt\Message;
use BinSoul\Net\Mqtt\Subscription;
use React\Socket\DnsConnector;
use React\Socket\TcpConnector;
include 'vendor/autoload.php';
// Build the MQTT client on top of a DNS-resolving TCP connector.
$loop = \React\EventLoop\Loop::get();
$resolverFactory = new \React\Dns\Resolver\Factory();

// Plain TCP connector wrapped by a connector that resolves host names first,
// using a cached resolver created for the name server 8.8.8.8.
$tcpConnector = new TcpConnector($loop);
$resolver = $resolverFactory->createCached('8.8.8.8', $loop);
$connector = new DnsConnector($tcpConnector, $resolver);

$client = new ReactMqttClient($connector, $loop);
// Register listeners for the connection lifecycle events.

// "open": the network connection has been established.
$client->on('open', function () use ($client) {
    printf("Open: %s:%d\n", $client->getHost(), $client->getPort());
});

// "close": the network connection has been closed; stop the event loop so
// the script terminates.
$client->on('close', function () use ($client, $loop) {
    printf("Close: %s:%d\n", $client->getHost(), $client->getPort());
    $loop->stop();
});

// "connect": the broker accepted the connection.
$client->on('connect', function (Connection $connection) {
    printf("Connect: client=%s\n", $connection->getClientID());
});

// "disconnect": the client disconnected from the broker.
$client->on('disconnect', function (Connection $connection) {
    printf("Disconnect: client=%s\n", $connection->getClientID());
});
// "message": print every incoming message of a subscribed topic on one line,
// flagging duplicates and retained messages.
$client->on('message', function (Message $message) {
    echo 'Message',
        $message->isDuplicate() ? ' (duplicate)' : '',
        $message->isRetained() ? ' (retained)' : '',
        // Long payloads are cut to a width of 50 (including the "..." marker).
        ': ' . $message->getTopic() . ' => ' . mb_strimwidth($message->getPayload(), 0, 50, '...'),
        "\n";
});
// "warning": report the problem and keep running.
$client->on('warning', function (Exception $e) {
    printf("Warning: %s\n", $e->getMessage());
});

// "error": report the problem and stop the event loop so the script ends.
$client->on('error', function (Exception $e) use ($loop) {
    printf("Error: %s\n", $e->getMessage());
    $loop->stop();
});
// Connect to the broker. Once the connection promise is fulfilled, subscribe
// and start publishing; every operation returns its own promise, so each one
// gets its own success and failure handler.
$client->connect('broker.hivemq.com')->then(
    function () use ($client) {
        // Subscribe to all topics. The multi-level wildcard on its own ('#')
        // matches every topic level; '/#' would only match topics whose name
        // starts with a '/', and would miss the 'sensors/...' topics
        // published below.
        $client->subscribe(new DefaultSubscription('#'))
            ->then(
                function (Subscription $subscription) {
                    echo sprintf("Subscribe: %s\n", $subscription->getFilter());
                }
            )
            ->catch(
                function (Exception $e) {
                    echo sprintf("Error: %s\n", $e->getMessage());
                }
            );

        // Publish humidity once
        $client->publish(new DefaultMessage('sensors/humidity', '55%'))
            ->then(
                function (Message $message) {
                    echo sprintf("Publish: %s => %s\n", $message->getTopic(), $message->getPayload());
                }
            )
            ->catch(
                function (Exception $e) {
                    echo sprintf("Error: %s\n", $e->getMessage());
                }
            );

        // Publish a random temperature every 10 seconds. The generator
        // supplies the value for each publication; the progress callback
        // prints each message it is handed.
        $generator = function (): int {
            return random_int(-20, 30);
        };

        $onProgress = function (Message $message) {
            echo sprintf("Publish: %s => %s\n", $message->getTopic(), $message->getPayload());
        };

        $client->publishPeriodically(10, new DefaultMessage('sensors/temperature'), $generator, $onProgress)
            ->catch(
                function (Exception $e) {
                    echo sprintf("Error: %s\n", $e->getMessage());
                }
            );
    }
);
$loop->run();

$ composer test

The MIT License (MIT). Please see License File for more information.