EMQ 2.x

MQTT for Transport and Sessions

EMQ is an open-source MQTT broker implementation by Feng Lee. It's clean, concise and robust. We've developed an EMQ-compatible N2O plugin whose connection schema differs from the Cowboy/ranch integration of N2O.

Web frameworks usually expose HTTP or WebSocket endpoints. But what if we need to connect applications on devices over TCP/UDP transports? One of the protocols addressing this use case is MQTT, designed by Andy Stanford-Clark (IBM) and Arlen Nipper in 1999 for connecting oil pipeline telemetry systems over satellite. The protocol is designed to work over unreliable networks and provides three QoS delivery levels: at most once (QoS 0), at least once (QoS 1), and exactly once (QoS 2).
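
The three delivery levels differ in how many control packets are exchanged per message. The sketch below lists the MQTT 3.1.1 packet sequence for each level; the QOS table and the describeQos helper are illustrative names, not part of N2O, EMQ or any MQTT library.

```javascript
// MQTT 3.1.1 QoS levels and the control packets exchanged for one delivery.
const QOS = {
  0: { name: "at most once",  packets: ["PUBLISH"] },
  1: { name: "at least once", packets: ["PUBLISH", "PUBACK"] },
  2: { name: "exactly once",  packets: ["PUBLISH", "PUBREC", "PUBREL", "PUBCOMP"] }
};

// describeQos is a hypothetical helper used only for this illustration.
function describeQos(level) {
  const q = QOS[level];
  if (!q) throw new RangeError("QoS must be 0, 1 or 2, got " + level);
  return "QoS " + level + " (" + q.name + "): " + q.packets.join(" -> ");
}

console.log(describeQos(0));
console.log(describeQos(1));
console.log(describeQos(2));
```

Higher levels cost more round trips, which is why a client picks the level per subscription or per published message rather than once for the whole connection.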

Starting from version 4.5, N2O drops everything that duplicates MQTT features. The MQTT version, in turn, is a fully N2O-compatible embeddable protocol relay with a limited feature set, and it takes only 25KB of Erlang.

  • n2o — N2O Protocol Server
    version 4.5 MQTT — 10KB
  • n2o_ftp — N2O File Protocol
    : FTP — 4KB
  • n2o_nitro — N2O Nitro Protocol
  • n2o_proto — N2O Protocols Loop
    : NITRO, FTP — 1KB
  • n2o_secret — N2O Security
    : HMAC AES/CBC-128 — 1KB

N2O MQTT provides a really smooth experience for developers: clean git history, small codebase, NITRO compatibility, man/html docs.

Listing 1. N2O 4.5 MQTT
$ time git clone git://github.com/synrc/mqtt
real 0m0.794s
$ cloc .
370
$ cat rebar.config
{deps, [{nitro, "git://github.com/synrc/nitro"},
        {n2o, "git://github.com/synrc/mqtt"}]}.
$ man ./man/n2o.1

Differences from WebSocket

N2O_start and n2o.js are no longer used in the MQTT version of N2O. Instead, MQTT_start and mqtt.js take over session control. We traded the HEART protocol and session facilities for built-in MQTT features. N2O authentication and authorization mechanisms are also dropped, since MQTT provides AUTH features too.


All N2O/MQTT messages go directly to TCP and/or WebSocket endpoints. However, some endpoints are not TCP sockets, or not sockets at all, such as a gen_server endpoint or an HTTP REST endpoint, and there is room for other endpoint types.

Here is the list of endpoint types supported by EMQ and accessible to N2O apps: WebSockets, MQTT, MQTT-SN, TCP, UDP, CoAP. N2O as a Web Framework or Web Application Server is normally used through WebSockets, but IoT and MQTT applications could be served over UDP or SCTP, with message delivery consistency provided at the application level. By using MQTT as a transport we extend the supported set of endpoint protocols.

N2O can also run under the cowboy and mochiweb Erlang web servers. The rest library handles REST endpoints.

N2O and MQTT Formatters

N2O supports TEXT, JSON, XML, BERT and MessagePack formatters, and you can add your own implementations as well. Formatters are the last stage before the socket interface library (usually ranch or esockd). For example, one may format an IO message with BERT encoding: n2o:format({io,Eval,Data},bert).
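
To make the BERT case concrete, here is a minimal sketch of how a tuple such as {io,Eval,Data} is laid out on the wire in the Erlang external term format. The names atom, bin, tuple and enc mirror the calls that appear in the mq.js listing, but the bodies below are written for illustration only and are not the N2O client library; the sketch assumes ASCII-only atoms and binaries and handles no other term types.

```javascript
// Term constructors (illustrative; same names as the calls used in mq.js).
function atom(name) { return { t: "atom", v: name }; }
function bin(text)  { return { t: "bin", v: text }; }
function tuple()    { return { t: "tuple", v: Array.prototype.slice.call(arguments) }; }

// Assumption of this sketch: only ASCII text is supported.
function ascii(text) {
  return Array.from(text, function (ch) {
    const code = ch.charCodeAt(0);
    if (code > 127) throw new RangeError("this sketch handles ASCII only");
    return code;
  });
}

// Big-endian unsigned integer occupying `width` bytes.
function uint(n, width) {
  const bytes = [];
  for (let i = width - 1; i >= 0; i--) bytes.push((n >>> (8 * i)) & 0xff);
  return bytes;
}

function encodeTerm(term) {
  switch (term.t) {
    case "atom": {   // ATOM_EXT: tag 100, 2-byte length, name
      const b = ascii(term.v);
      return [100].concat(uint(b.length, 2), b);
    }
    case "bin": {    // BINARY_EXT: tag 109, 4-byte length, data
      const b = ascii(term.v);
      return [109].concat(uint(b.length, 4), b);
    }
    case "tuple": {  // SMALL_TUPLE_EXT: tag 104, 1-byte arity, elements
      if (term.v.length > 255) throw new RangeError("small tuples hold at most 255 elements");
      return [104, term.v.length].concat(...term.v.map(encodeTerm));
    }
    default:
      throw new TypeError("unsupported term type: " + term.t);
  }
}

// Every encoded term starts with the version byte 131.
function enc(term) { return new Uint8Array([131].concat(encodeTerm(term))); }

console.log(Array.from(enc(tuple(atom("io"), bin("eval"), bin("data")))));
```

The resulting byte array is what a BERT formatter hands to the socket layer; a full implementation would also cover integers, lists, maps and large tuples.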

MQTT formats messages only in the MQTT format. You can wrap a binary B in an MQTT packet with emqttd_message:make(Name, 0, Name, B).
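
For readers curious what wrapping a binary in an MQTT packet means at the byte level, the following sketch builds an MQTT 3.1.1 PUBLISH packet by hand. It illustrates the wire layout only and is not how emqttd constructs its internal message record; encodeRemainingLength and encodePublish are hypothetical helper names, and the DUP and RETAIN flags are left at zero.

```javascript
// Variable-length "remaining length" field: 7 bits per byte, high bit = more bytes follow.
function encodeRemainingLength(n) {
  if (n < 0 || n > 268435455) throw new RangeError("remaining length out of range");
  const bytes = [];
  do {
    let digit = n % 128;
    n = Math.floor(n / 128);
    if (n > 0) digit |= 0x80;
    bytes.push(digit);
  } while (n > 0);
  return bytes;
}

// Build a PUBLISH packet: fixed header, topic name, optional packet id, payload.
function encodePublish(topic, payload, qos, packetId) {
  if (qos !== 0 && qos !== 1 && qos !== 2) throw new RangeError("QoS must be 0, 1 or 2");
  const topicBytes = Array.from(new TextEncoder().encode(topic));
  if (topicBytes.length > 65535) throw new RangeError("topic too long");
  const variable = [topicBytes.length >> 8, topicBytes.length & 0xff].concat(topicBytes);
  // The packet identifier is present only for QoS 1 and QoS 2.
  if (qos > 0) variable.push((packetId >> 8) & 0xff, packetId & 0xff);
  const body = variable.concat(Array.from(payload));
  const fixedHeader = [0x30 | (qos << 1)];   // packet type 3 (PUBLISH) in the high nibble
  return new Uint8Array(fixedHeader.concat(encodeRemainingLength(body.length), body));
}

// QoS 0 publish of three payload bytes to topic "a/b":
// 0x30 0x08 0x00 0x03 'a' '/' 'b' 0x01 0x02 0x03
console.log(Array.from(encodePublish("a/b", [1, 2, 3], 0)));
```

The second argument of emqttd_message:make in the text above is the QoS, which corresponds to the two QoS bits placed in the fixed header here.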

Sample Application

Listing 2. Sample Web Application written in NITRO DSL
main() -> [].

event(init) ->
    nitro:update(loginButton,
        #button{id=loginButton, body="Login",
                postback=login, source=[user,pass]});
event(login) ->
    nitro:redirect("index.htm?room=" ++ nitro:to_list(n2o:q(pass))).

Picture 3. N2O Review Application

MQTT JavaScript Client

Listing 5. mq.js
var subscribeOptions = {
    qos: 2,  // QoS
    invocationContext: { foo: true },  // Passed to success / failure callback
    onSuccess: function (x) { console.log("N2O Subscribed"); },
    onFailure: function (m) { console.log("N2O SubError: " + m.errorMessage); },
    timeout: 2
};

var options = {
    timeout: 2,
    userName: module,
    password: token(),
    cleanSession: false,
    onFailure: function (m) { console.log("N2O ConnError: " + m.errorMessage); },
    onSuccess: function () {
        console.log("N2O Connected ");
        ws.send(enc(tuple(atom('init'),bin(token()))));
    }
};

function rnd() { return Math.floor((Math.random() * nodes)+1); }
function pageModule() { return module || 'api'; }
function token() { return localStorage.getItem("token") || ''; };
function topic(prefix) {
    return prefix + "/1/" + rnd() + "/" + pageModule() + "/anon/" + client() + "/" + token();
}
function gen_client() {
    return Math.random().toString(36).substring(2) + (new Date()).getTime().toString(36);
}
function client() {
    var c = localStorage.getItem("client"), a;
    if (null == c) { c = 'emqttd_' + gen_client(); }
    localStorage.setItem("client", c);
    return c;
}

mqtt = new Paho.MQTT.Client(host, 8083, client());
mqtt.onConnectionLost = function (o) { console.log("connection lost: " + o.errorMessage); };
mqtt.onMessageArrived = function (m) {
    var BERT = m.payloadBytes.buffer.slice(m.payloadBytes.byteOffset,
        m.payloadBytes.byteOffset + m.payloadBytes.length);
    try {
        erlang = dec(BERT);
        for (var i = 0; i < $bert.protos.length; i++) {
            p = $bert.protos[i];
            if (p.on(erlang, p.do).status == "ok") return;
        }
    } catch (e) { console.log(e); }
};
mqtt.connect(options);
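
The onMessageArrived handler in Listing 5 slices m.payloadBytes.buffer rather than passing the buffer on directly. The source does not say why; a plausible reason is that a typed array can be a view onto a larger ArrayBuffer, so its .buffer property may contain bytes outside the payload. The sketch below reproduces that situation with made-up bytes; exactBuffer is a hypothetical helper name.

```javascript
// A 5-byte frame: two leading bytes that are not payload, then a 3-byte payload.
const frame = new Uint8Array([0xAA, 0xBB, 131, 106, 7]);

// A view over the payload only, sharing the frame's underlying ArrayBuffer.
const payloadBytes = new Uint8Array(frame.buffer, 2, 3);

// Same slicing as in the listing: copy exactly the bytes covered by the view.
function exactBuffer(view) {
  return view.buffer.slice(view.byteOffset, view.byteOffset + view.length);
}

console.log(payloadBytes.buffer.byteLength);        // 5: the whole frame, not just the payload
console.log(exactBuffer(payloadBytes).byteLength);  // 3: only the payload
console.log(Array.from(new Uint8Array(exactBuffer(payloadBytes))));  // [ 131, 106, 7 ]
```

Passing the unsliced buffer to a decoder would make it start reading at the wrong offset whenever the view does not begin at byte 0.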

Enable EMQ Plugin

The N2O over MQTT bridge is a drop-in plugin for the EMQ broker. After starting the server, enable the N2O over MQTT bridge plugin on the EMQ plugin page and then open the sample application.

Picture 4. N2O 4.5 MQTT as EMQ Plugin

The nice thing about EMQ is that it enables session introspection for N2O connections. The EMQ Dashboard is written in vue.js and is a bit complex for an admin application. A possible hackathon topic is rewriting the EMQ admin with hyperapp and n2o. The IBM Paho library is also too old and heavy to match N2O's size. Some parts should be replaced with the more modern and compact pinger and connection respawning from n2o.js. This awaits further research and improvement.

Picture 5. Inspecting N2O sessions with EMQ

EMQ — Lightweight Broker as Service Container for N2O Apps

EMQ is a modern broker with robust iOS and Android MQTT client libraries; have a look at the https://github.com/emqtt GitHub organization.

Picture 6. MQTT 3.1.1 vs MQTT 5.0

We made a compatible EMQ plugin and the N2O Review sample application, which ships with EMQ built in and has minimal dependencies. Our fork is based on the latest EMQ master and is intended to be built with our MAD build tool.

Listing 6. Setup MQTT flavoured N2O.
$ brew install erlang
$ curl -fsSL https://raw.github.com/synrc/mad/master/mad > mad \
  && chmod +x mad \
  && sudo cp mad /usr/local/bin
$ git clone git://github.com/voxoz/mq && cd mq
$ time mad dep com
Writing /apps/review/ebin/review.app
OK
real 1m45.357s
user 0m17.166s
sys 0m5.065s
$ mad rep
Configuration: [{n2o,
                 [{port,8000},
                  {app,review},
                  {pickler,n2o_secret},
                  {formatter,bert},
                  {log_modules,config},
                  {log_level,config}]},
                {emq_dashboard,
                 [{listeners_dash,
                   [{http,18083,[{acceptors,4},{max_clients,512}]}]}]},
                {emqttd,
                 [{listeners,
                   [{http,8083,[{acceptors,4},{max_clients,512}]},
                    {tcp,1883,[{acceptors,4},{max_clients,512}]}]},
                  {sysmon,
                   [{long_gc,false},
                    {long_schedule,240},
                    {large_heap,8000000},
                    {busy_port,false},
                    {busy_dist_port,true}]},
                  {session,
                   [{upgrade_qos,off},
                    {max_inflight,32},
                    {retry_interval,20},
                    {max_awaiting_rel,100},
                    {await_rel_timeout,20},
                    {enable_stats,off}]},
                  {queue,[]},
                  {allow_anonymous,true},
                  {protocol,
                   [{max_clientid_len,1024},{max_packet_size,64000}]},
                  {acl_file,"etc/acl.conf"},
                  {plugins_etc_dir,"etc/plugins/"},
                  {plugins_loaded_file,"etc/loaded_plugins"},
                  {pubsub,
                   [{pool_size,8},{by_clientid,true},{async,true}]}]},
                {kvs,
                 [{dba,store_mnesia},
                  {schema,[kvs_user,kvs_acl,kvs_feed,kvs_subscription]}]}]
Applications: [kernel,stdlib,gproc,lager_syslog,pbkdf2,asn1,fs,ranch,mnesia,
               compiler,inets,crypto,syntax_tools,xmerl,gen_logger,esockd,
               cowlib,goldrush,public_key,lager,ssl,cowboy,mochiweb,emqttd,
               erlydtl,kvs,mad,emqttc,nitro,rest,sh,syslog,review]
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Eshell V8.3 (abort with ^G)
starting emqttd on node 'nonode@nohost'
Nonexistent: []
Plugins: [{mqtt_plugin,emq_auth_username,"2.1.1",
           "Authentication with Username/Password",false},
          {mqtt_plugin,emq_dashboard,"2.1.1","EMQ Web Dashboard",false},
          {mqtt_plugin,emq_modules,"2.1.1","EMQ Modules",false},
          {mqtt_plugin,n2o,"4.5-mqtt","N2O Server",false}]
Names: [emq_dashboard,n2o]
dashboard:http listen on with 4 acceptors.
Async Start Attempt {handler,"timer",n2o,system,n2o,[],[]}
Proc Init: init
mqtt:ws listen on with 4 acceptors.
mqtt:tcp listen on with 4 acceptors.
emqttd 2.1.1 is running now
>

Related Documents