
Stream-based Connection object for MQTT, extracted from MQTT.js
mqtt connection server

mqtt-connection

Barebone Connection object for MQTT. Works over any kind of binary Streams, TCP, TLS, WebSocket, ...

It uses mqtt-packet for generating and parsing MQTT packets. See it for the full documentations on the packet types.

This library is tested with node v4, v6 and v7. The last version to support older versions of node was [email protected].


npm install mqtt-connection --save


As a client:

var net = require('net')
var mqttCon = require('mqtt-connection')
var stream = net.createConnection(1883, 'localhost')
var conn = mqttCon(stream)

// conn is your MQTT connection!

As a server:

var net = require('net')
var mqttCon = require('mqtt-connection')
var server = new net.Server()

server.on('connection', function (stream) {
  var client = mqttCon(stream)

  // client connected
  client.on('connect', function (packet) {
    // acknowledge the connect packet
    client.connack({ returnCode: 0 });

  // client published
  client.on('publish', function (packet) {
    // send a puback with messageId (for QoS > 0)
    client.puback({ messageId: packet.messageId })

  // client pinged
  client.on('pingreq', function () {
    // send a pingresp

  // client subscribed
  client.on('subscribe', function (packet) {
    // send a suback with messageId and granted QoS level
    client.suback({ granted: [packet.qos], messageId: packet.messageId })

  // timeout idle streams after 5 minutes
  stream.setTimeout(1000 * 60 * 5)

  // connection error handling
  client.on('close', function () { client.destroy() })
  client.on('error', function () { client.destroy() })
  client.on('disconnect', function () { client.destroy() })

  // stream timeout
  stream.on('timeout', function () { client.destroy(); })

// listen on port 1883

As a websocket server:

var websocket = require('websocket-stream')
var WebSocketServer = require('ws').Server
var Connection = require('mqtt-connection')
var server = http.createServer()

var wss = new WebSocketServer({server: server})

if (handler) {
  server.on('client', handler)

wss.on('connection', function (ws) {
  var stream = websocket(ws)
  var connection = new Connection(stream)


function handle (conn) {
  // handle the MQTT connection like
  // the net example


new mqtt.Connection([options])

Creates a new MQTT Connection.


  • notData: do not listen to the 'data' event, so that it can respect backpressure. Pipe the Connection to another stream to consume the packets. If this option is passed true the object will emit no packet-related events.

Connection#connect(options, [callback])

Send a MQTT connect packet.

options supports the following properties:

  • protocolId: Protocol ID, usually MQIsdp. string
  • protocolVersion: Protocol version, usually 3. number
  • keepalive: keepalive period in seconds. number
  • clientId: client ID. string
  • will: the client's will message options. object that supports the following properties:
    • topic: the will topic. string
    • payload: the will payload. string
    • qos: will qos level. number
    • retain: will retain flag. boolean
    • properties: properties of will by MQTT 5.0:
      • willDelayInterval: representing the Will Delay Interval in seconds number,
      • payloadFormatIndicator: Will Message is UTF-8 Encoded Character Data or not boolean,
      • messageExpiryInterval: value is the lifetime of the Will Message in seconds and is sent as the Publication Expiry Interval when the Server publishes the Will Message number,
      • contentType: describing the content of the Will Message string,
      • responseTopic: String which is used as the Topic Name for a response message string,
      • correlationData: The Correlation Data is used by the sender of the Request Message to identify which request the Response Message is for when it is received binary,
      • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object
  • properties: properties MQTT 5.0. object that supports the following properties:
    • sessionExpiryInterval: representing the Session Expiry Interval in seconds number,
    • receiveMaximum: representing the Receive Maximum value number,
    • maximumPacketSize: representing the Maximum Packet Size the Client is willing to accept number,
    • topicAliasMaximum: representing the Topic Alias Maximum value indicates the highest value that the Client will accept as a Topic Alias sent by the Server number,
    • requestResponseInformation: The Client uses this value to request the Server to return Response Information in the CONNACK boolean,
    • requestProblemInformation: The Client uses this value to indicate whether the Reason String or User Properties are sent in the case of failures boolean,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object,
    • authenticationMethod: the name of the authentication method used for extended authentication string,
    • authenticationData: Binary Data containing authentication data binary
  • clean: the 'clean start' flag. boolean
  • username: username for protocol v3.1. string
  • password: password for protocol v3.1. string

Connection#connack(options, [callback])

Send a MQTT connack packet.

options supports the following properties:

  • returnCode: the return code of the connack, success is for MQTT < 5.0
  • reasonCode: suback Reason Code number MQTT 5.0
  • properties: properties MQTT 5.0. object that supports the following properties:
    • sessionExpiryInterval: representing the Session Expiry Interval in seconds number,
    • receiveMaximum: representing the Receive Maximum value number,
    • maximumQoS: maximum qos supported by server number,
    • retainAvailable: declares whether the Server supports retained messages boolean,
    • maximumPacketSize: Maximum Packet Size the Server is willing to accept number,
    • assignedClientIdentifier: Assigned Client Identifier string,
    • topicAliasMaximum: representing the Topic Alias Maximum value indicates the highest value that the Client will accept as a Topic Alias sent by the Server number,
    • reasonString: representing the reason associated with this response string,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object,
    • wildcardSubscriptionAvailable: this byte declares whether the Server supports Wildcard Subscriptions boolean
    • subscriptionIdentifiersAvailable: declares whether the Server supports Subscription Identifiers boolean,
    • sharedSubscriptionAvailable: declares whether the Server supports Shared Subscriptions boolean,
    • serverKeepAlive: Keep Alive time assigned by the Server number,
    • responseInformation: String which is used as the basis for creating a Response Topic string,
    • serverReference: String which can be used by the Client to identify another Server to use string,
    • authenticationMethod: the name of the authentication method used for extended authentication string,
    • authenticationData: Binary Data containing authentication data binary

Connection#publish(options, [callback])

Send a MQTT publish packet.

options supports the following properties:

  • topic: the topic to publish to. string
  • payload: the payload to publish, defaults to an empty buffer. string or buffer
  • qos: the quality of service level to publish on. number
  • messageId: the message ID of the packet, required if qos > 0. number
  • retain: retain flag. boolean
  • properties: object
    • payloadFormatIndicator: Payload is UTF-8 Encoded Character Data or not boolean,
    • messageExpiryInterval: the lifetime of the Application Message in seconds number,
    • topicAlias: value that is used to identify the Topic instead of using the Topic Name number,
    • responseTopic: String which is used as the Topic Name for a response message string,
    • correlationData: used by the sender of the Request Message to identify which request the Response Message is for when it is received binary,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object,
    • subscriptionIdentifier: representing the identifier of the subscription number,
    • contentType: String describing the content of the Application Message string

Connection#puback #pubrec #pubcomp #unsuback(options, [callback])

Send a MQTT [puback, pubrec, pubcomp, unsuback] packet.

options supports the following properties:

  • messageId: the ID of the packet
  • reasonCode: Reason Code by packet number
  • properties: object
    • reasonString: representing the reason associated with this response string,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object

Connection#pubrel(options, [callback])

Send a MQTT pubrel packet.

options supports the following properties:

  • dup: duplicate message flag
  • reasonCode: pubrel Reason Code number
  • messageId: the ID of the packet
  • properties: object
    • reasonString: representing the reason associated with this response string,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object

Connection#subscribe(options, [callback])

Send a MQTT subscribe packet.

options supports the following properties:

  • dup: duplicate message flag
  • messageId: the ID of the packet
  • properties: object
    • subscriptionIdentifier: representing the identifier of the subscription number,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object
  • subscriptions: a list of subscriptions of the form [{topic: a, qos: 0}, {topic: b, qos: 1}] [{topic: a, qos: 0, nl: false, rap: true, rh: 15 }, {topic: b, qos: 1, nl: false, rap: false, rh: 100 }] MQTT 5.0 Example

Connection#suback(options, [callback])

Send a MQTT suback packet.

options supports the following properties:

  • granted: a vector of granted QoS levels, of the form [0, 1, 2]
  • messageId: the ID of the packet
  • reasonCode: suback Reason Code number
  • properties: object
    • reasonString: representing the reason associated with this response string,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object

Connection#unsubscribe(options, [callback])

Send a MQTT unsubscribe packet.

options supports the following properties:

  • messageId: the ID of the packet
  • reasonCode: unsubscribe Reason Code MQTT 5.0 number
  • dup: duplicate message flag
  • properties: object
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object
  • unsubscriptions: a list of topics to unsubscribe from, of the form ["topic1", "topic2"]

Connection#pingreq #pingresp #disconnect(options, [callback])

Send a MQTT [pingreq, pingresp] packet.

Connection#disconnect(options, [callback])

Send a MQTT disconnect packet.

options supports the following properties only MQTT 5.0:

  • reasonCode: Disconnect Reason Code number
  • properties: object
    • sessionExpiryInterval: representing the Session Expiry Interval in seconds number,
    • reasonString: representing the reason for the disconnect string,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object,
    • serverReference: String which can be used by the Client to identify another Server to use string

Connection#auth(options, [callback])

Send a MQTT auth packet. Only MQTT 5.0

options supports the following properties only MQTT 5.0:

  • reasonCode: Auth Reason Code number
  • properties: object
    • authenticationMethod: the name of the authentication method used for extended authentication string,
    • authenticationData: Binary Data containing authentication data binary,
    • reasonString: representing the reason for the disconnect string,
    • userProperties: The User Property is allowed to appear multiple times to represent multiple name, value pairs object

Event: 'connect'

function(packet) {}

Emitted when a MQTT connect packet is received by the client.

packet is an object that may have the following properties:

  • version: the protocol version string
  • versionNum: the protocol version number
  • keepalive: the client's keepalive period
  • clientId: the client's ID
  • will: an object with the following keys:
    • topic: the client's will topic
    • payload: the will message
    • retain: will retain flag
    • qos: will qos level
    • properties: properties of will
  • properties: properties of packet
  • clean: clean start flag
  • username: v3.1 username
  • password: v3.1 password

Event: 'connack'

function(packet) {}

Emitted when a MQTT connack packet is received by the client.

packet is an object that may have the following properties:

  • returnCode: the return code of the connack packet
  • properties: properties of packet

Event: 'publish'

function(packet) {}

Emitted when a MQTT publish packet is received by the client.

packet is an object that may have the following properties:

  • topic: the topic the message is published on
  • payload: the payload of the message
  • messageId: the ID of the packet
  • properties: properties of packet
  • qos: the QoS level to publish at

Events: <'puback', 'pubrec', 'pubrel', 'pubcomp', 'unsuback'>

function(packet) {}

Emitted when a MQTT [puback, pubrec, pubrel, pubcomp, unsuback] packet is received by the client.

packet is an object that may contain the property:

  • messageId: the ID of the packet
  • properties: properties of packet

Event: 'subscribe'

function(packet) {}

Emitted when a MQTT subscribe packet is received.

packet is an object that may contain the properties:

  • messageId: the ID of the packet
  • properties: properties of packet
  • subscriptions: an array of objects representing the subscribed topics, containing the following keys
    • topic: the topic subscribed to
    • qos: the qos level of the subscription

Event: 'suback'

function(packet) {}

Emitted when a MQTT suback packet is received.

packet is an object that may contain the properties:

  • messageId: the ID of the packet
  • properties: properties of packet
  • granted: a vector of granted QoS levels

Event: 'unsubscribe'

function(packet) {}

Emitted when a MQTT unsubscribe packet is received.

packet is an object that may contain the properties:

  • messageId: the ID of the packet
  • properties: properties of packet
  • unsubscriptions: a list of topics the client is unsubscribing from, of the form [topic1, topic2, ...]

Events: <'pingreq', 'pingresp'>


Emitted when a MQTT [pingreq, pingresp, disconnect] packet is received.

packet only includes static header information and can be ignored.

Event: 'disconnect'

function(packet) {}

Emitted when a MQTT disconnect packet is received.

packet only includes static header information and can be ignored for MQTT < 5.0.

packet is an object that may contain the properties for MQTT 5.0:

  • reasonCode: disconnect Reason Code
  • properties: properties of packet

Event: 'auth'

function(packet) {}

Emitted when a MQTT auth packet is received.

packet is an object that may contain the properties:

  • reasonCode: Auth Reason Code
  • properties: properties of packet


Returns a Transform stream that calls generate(). The stream is configured into object mode.


Returns a Transform stream that embeds a Parser and calls Parser.parse() for each new Buffer. The stream is configured into object mode. It accepts the same options of parser(opts).


mqtt-connection is an OPEN Open Source Project. This means that:

Individuals making significant and valuable contributions are given commit-access to the project to contribute as they see fit. This project is more like an open wiki than a standard guarded open source project.

See the file for more details.


mqtt-connection is only possible due to the excellent work of the following contributors:

Matteo CollinaGitHub/mcollinaTwitter/@matteocollina
Adam RuddGitHub/adamvrTwitter/@adam_vr
Siarhei BuntsevichGitHub/scarry1992



  MIT
  • Whatever
  • Unknown
  • released 2/22/2021

