MQTT (WebSocket und Retained-Flag)

Ich habe bisher den MQTT Broker von Symcon genutzt, doch nun muss ich auf einen Broker umsteigen der den Zugang zusätzlich über WebSocket anbietet. Da das Symcon nicht kann ist die Wahl auf einen Mosquitto Broker gefallen.

Doch mit dem Umstellen auf mosquitto funktioniert das retained-Flag aus Symcon heraus nicht mehr.
Publishe ich eine Nachricht mit Symcon und in der Konfiguration ist das Retain häckchen gesetzt wird zwar die Nachricht geschrieben doch nicht mit dem Retained Flag versehen.
Nutze ich hingegen MQTT-Explorer funktioniert das Retain setzten ohne Probleme.

Ist das ein Anwendungsfehler? Da ich das jedoch vorher mit dem hauseigenen Broker so genutzt habe und es funktioniert hat denke ich weniger daran?

Jemand ne Idee?

Gruß tasmanie

Du hast alle Instanzen jetzt quasi auf einen MQTT Client umgestellt der dann am Mosquitto hängt?

paresy

Hi,
ich habe eine neue MQTT Client Instanz hinzugefügt, dann habe ich von den vorhanden MQTT Geräten einfach das Gateway auf die neue Instanz geändert. Das hat auch alles so funktioniert bis auf das Retain. Als ich jedoch in den Konfigurator geschaut habe, habe ich gesehen das die ganzen Topics keine IDs hatten dadurch dann auch schnell gesehen das die MQTT Geräte ja alle Server Geräte waren. Dann habe ich im Konfigurator die erkannten Topics erstellt und in den ganzen Abläufen die IDs getauscht. Doch leider habe ich immer noch das Problem das Retain nicht geht.

Ich habe auch testweise ein komplett neues Topic mal angelegt mit dem selben Ergebnis.

Hab jetzt nochmal auf dem Mosquitto das Debug Log aktiviert und da ist ganz klar zu erkennen das bei Nachrichten von Symcon „r0“ übergeben wird.

1735988492: Received PUBLISH from 63136f88cf6fd0936bae (d0, q0, r0, m0, 'schlafzimmer/displayitem/6/value/set', ... (21 bytes))

Mit anderen MQTT Clients in diesem falle MQTT-Explorer, hingegen wird das Flag „r1“ ordentlich übergeben.

1735989081: Received PUBLISH from mqtt-explorer-c14d7a07 (d0, q0, r1, m0, 'schlafzimmer/displayitem/6/value/set', ... (3 bytes))

Daher gehe ich mal davon aus das Symcon das Flag nicht richtig setzt. Nutze ich aus Symcon hingegen das bluerhinos/phpMQTT Skript und übergebe dort das Flag, funktioniert es auch.

Das ist ja spannend - ich hänge an dem gleichen Thema.
Habe meinen MQTT Server von IPS auf einen eigenständigen Mosquitto umgezogen. In Symcon dann auch alles sauber umgestellt. Das empfangen und senden klappt auch, nur eben das Problem mit dem Retain Flag.

Es schaut so aus, als ob die MQTT Client Instanz in Symcon kein Retain unterstützt.

Dann schaue ich mir mal das verlinkte Script an.

@tasmanie : wie hast Du das Script eingebunden? Das wirft bei mir immer Fehlermeldungen raus.

Gruß,
Torsten

Oh das find ich schon mal gut das es schonmal nicht an mir liegt.
Symcon bietet dir aber die Möglichkeit das Retain Flag zu setzen, also sollte es auch unterstützt werden. Ich gehe hier eher von einem Bug aus.

Wobei schön wäre es wenn IPS einfach den Zugriff über WebSocket aufs MQTT anbieten würde.

Welche Fehlermeldung wird den ausgegeben?
Um das Skript von bluerhinos zu nutzen habe ich einfach dieses Skript eingebunden:

phpMQTT.php:

<?php

namespace Bluerhinos;

/*
 	phpMQTT
	An improved PHP class to connect, publish, and subscribe to an MQTT broker.
*/

class phpMQTT {

    private $socket;
    private int $msgid = 1;
    public int $keepalive = 10;
    public ?int $timesinceping;
    public array $topics = [];
    public bool $debug = false;
    private string $clientid;

    /**
     * Constructor
     * @param string $address MQTT Broker address
     * @param int $port MQTT Broker port
     * @param string $clientid Unique client ID
     */
    public function __construct(string $address, int $port, string $clientid) {
        $this->socket = fsockopen($address, $port, $errno, $errstr, 60);
        if (!$this->socket) {
            throw new \Exception("Could not connect to MQTT broker: $errstr ($errno)");
        }
        stream_set_timeout($this->socket, 5);
        $this->clientid = $clientid;
    }

    /**
     * Connect to the MQTT broker
     * @param bool $clean Clean session flag
     * @param int|null $will Will message
     * @param string|null $username Username for authentication
     * @param string|null $password Password for authentication
     * @return bool True on success, false on failure
     */
    public function connect(bool $clean = true, ?int $will = null, ?string $username = null, ?string $password = null): bool {
        $connectFlags = 0;
        if ($clean) {
            $connectFlags |= 0x02;
        }
        if ($username !== null) {
            $connectFlags |= 0x80;
        }
        if ($password !== null) {
            $connectFlags |= 0x40;
        }

        $payload = $this->strwritestring("MQTT") . "\x04" . chr($connectFlags) . chr($this->keepalive >> 8) . chr($this->keepalive & 0xFF);
        $payload .= $this->strwritestring($this->clientid);

        if ($username !== null) {
            $payload .= $this->strwritestring($username);
        }
        if ($password !== null) {
            $payload .= $this->strwritestring($password);
        }

        $this->send("\x10", $payload, strlen($payload));

        $response = $this->read(4);
        return $response && ord($response[0]) === 0x20 && ord($response[1]) === 0x02 && ord($response[3]) === 0x00;
    }

    /**
     * Close the socket connection
     */
    public function close(): void {
        $this->send("\xE0", "", 0);
        fclose($this->socket);
    }

    /**
    * Publish a message
    * @param string $topic The topic to publish to
    * @param string $content The message content
    * @param int $qos Quality of Service level
    * @param bool $retain Retain flag
    * @return bool True if the message was successfully sent, false otherwise
    */
    public function publish(string $topic, string $content, int $qos = 0, bool $retain = false): bool {
        $header = 0x30 | ($qos << 1) | ($retain ? 0x01 : 0x00);
        $payload = $this->strwritestring($topic) . $content;
        $result = fwrite($this->socket, chr($header) . $this->encodeLength(strlen($payload)) . $payload);

        if ($result === false || $result === 0) {
            if ($this->debug) {
                error_log("Failed to send MQTT message to topic: $topic");
            }
            return false;
        }

        return true;
    }


    /**
     * Subscribe to a topic
     * @param string $topic The topic to subscribe to
     * @param int $qos Quality of Service level
     */
    public function subscribe(string $topic, int $qos = 0): void {
        $this->msgid++;
        $payload = chr($this->msgid >> 8) . chr($this->msgid & 0xFF);
        $payload .= $this->strwritestring($topic) . chr($qos);
        $this->send("\x82", $payload, strlen($payload));
    }

    /**
     * Helper function to send data over the socket
     * @param string $header Message header
     * @param string $msg Message payload
     * @param int $len Length of the message
     */
    private function send(string $header, string $msg, int $len): void {
        fwrite($this->socket, $header . $this->encodeLength($len) . $msg);
    }

    /**
     * Helper function to read data from the socket
     * @param int $length Number of bytes to read
     * @return string Data read from the socket
     */
    private function read(int $length): string {
        return fread($this->socket, $length);
    }

    /**
     * Encode message length
     * @param int $len Message length
     * @return string Encoded length
     */
    private function encodeLength(int $len): string {
        $string = "";
        do {
            $digit = $len % 128;
            $len = (int)($len / 128);
            if ($len > 0) {
                $digit |= 0x80;
            }
            $string .= chr($digit);
        } while ($len > 0);
        return $string;
    }

    /**
     * Write a string with MQTT format
     * @param string $str Input string
     * @return string MQTT formatted string
     */
    private function strwritestring(string $str): string {
        return chr(strlen($str) >> 8) . chr(strlen($str) % 256) . $str;
    }
}

?>

Und nutzen kannst du es dann mit diesem Skript:

<?php

require("phpMQTT.php");

IPS_GetName($_IPS["SELF"]);

$server = 'MQTT_SERVER_IP';     // change if necessary
$port = 1883;                 // change if necessary
$username = 'USERNAME';       // set your username
$password = 'PASSWORT';          // set your password
$client_id = 'phpMQTT-publisher'; // make sure this is unique for connecting to server - you could use uniqid()

$mqtt = new Bluerhinos\phpMQTT($server, $port, $client_id);

if (!$mqtt->connect(true, NULL, $username, $password)) {
    IPS_LogMessage("MQTT Publish", "Connection failed");
    exit(1);
} else {
    IPS_LogMessage("MQTT Publish", "Connected successfully");
}


//Payload Inhalt setzten
$payload = "Payload Inhalt";

// Logge das erstellte Payload
IPS_LogMessage("MQTT Payload", $payload);

// Versuche, die Nachricht zu veröffentlichen
$topic = 'testtopic/1234';
IPS_LogMessage("MQTT Publish", "Publishing to topic: " . $topic);

$result = $mqtt->publish($topic, $payload, 0, false);



// Verbindung schließen
$mqtt->close();
IPS_LogMessage("MQTT Publish", "Connection closed");


?>

Habe es heute morgen nochmal neu von github geladen. Nun klappt es - vermutlich irgendwo etwas vergessen.

Als Workaround ist das machbar, wäre halt nur deutlich schöner wenn IP Symcon das auch könnte und im MQTT client das retain flag nicht per default auf false setzt (zumindest ist das meine Vermutung).

Grüße,
Torsten

Hi,
hätte da nochmal eine Frage. Habe es nun so wie bei Dir implementiert und es läuft auch.
Nun ist diese Lösung aber ja so, dass bei jedem publish eine Verbindung zum Broker aufgebaut, gepublished und dann die Verbindung geschlossen wird.
Wenn ich nun im Sekundentakt Daten publishe, wird somit auch im Sekundentakt die MQTT Verbindung aufgebaut und geschlossen - das müllt das Log des Brokers zu und erzeugt unnötigen Traffic / Connects / Disconnects.

Hast Du dafür eine andere/bessere Lösung gefunden?

Gruß,
Torsten