Reactive streams ? AMQP 1.0 is a really good “reactive” protocol !


During the just passed Christmas holidays, I decided to spend the spare time for digging into the reactive programming paradigm, the “Reactive streams” manifesto and the related ReactiveX implementation (more specifically on the RxJava one).

This blog post doesn’t mean to be a discussion about what reactive streams are or what reactive programming is just because you can find a lot of really useful resources on these arguments on the Internet but, because I’m a messaging and IoT guy, during this article I’ll try to describe some (really trivial) thoughts I had “discovering” the reactive streams API  and comparing them to the AMQP 1.0 protocol.

On December 30th, I tweeted …


As you can see, I defined AMQP 1.0 as a “reactive” protocol because I really found all the semantics and the related definitions from the reactive streams API in the AMQP 1.0 specification.

What I’m going to describe is a mapping at 20,000 feet without digging into all the possible problems we can encounter doing that, just because it seemed rather trivial to me; I’d like to open a discussion on it or giving inputs to the other people for thinking about that.

It could be useful when it comes to use a reactive programming model in a microservices based system where a “good” messaging protocol for supporting such a model is needed.

The Reactive Streams API

We know that AMQP 1.0 is really a peer-to-peer protocol so we can establish a communication between two clients directly or using an intermediary (one or more) such as a broker (for allowing store-and-forward) or a router (providing direct-messaging as well). In all these use cases, it’s always about having a “sender” and a “receiver” which can be just mapped to a “publisher” and a “subscriber” in reactive streams API terms (if you think about ReactiveX, then you know them as “observable” and “observer”).

The reactive streams API are defined with four main interfaces with some methods which can be mapped in terms of specific AMQP 1.0 “performatives”.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();

public interface Subscription {
    public void request(long n);
    public void cancel();

The above interfaces describes how a “subscriber” can subscribe in order to receive stream of events published by a “publisher” and how this one “pushes” events to the subscriber. The API also defines how it’s possible to for the subscriber to avoid being overwhelmed by events if it’s slower than the publisher, using a “request for events” mechanism on the subscription. Finally, the subscriber can be notified when the stream is completed (if it’s not infinite) or if an error occurred.

During this post I won’t consider the Processor interface which enables an entity to be both a “publisher” and “subscriber” and it’s mainly used for implementing “operators” in the stream processing chain.

Attaching as … subscribing

The Publisher interface provides a subscribe method which is called by the subscriber when it wants to start receiving all the published events on the related stream (in a “push” fashion).

If we assign a name to the stream which could be an “address” in AMQP terms, then such an operation could be an “attach” performative sent by the subscriber which acts as a receiver on the established link. In the opposite direction, the publisher can reply with an “attach” performative (on the same address) acting as a sender and this operation could be mapped as the onSubscribe method call on the Subscriber interface.


FIG.1 – attach as Publisher.subscribe, Subscriber.onSubscribe

Credits based flow control and transfer … for back-pressure and pushing events

One of the main reactive streams concepts which convinced me that AMQP is really a “reactive” protocol was the back-pressure. It provides a way for handling scenarios where a subscriber is slower than the publisher avoiding to be overwhelmed by a lot of events it can’t handle (losing them); the subscriber can notify to the publisher the maximum number of events it can receive. In my mind it’s something that AMQP provides out-of-box with the credits based flow control (something that it’s not available with the MQTT 3.1.1 protocol for example).

In terms of reactive streams API, such a feature is provided by the Subscription interface with the request method; calling this method, the subscriber says the maximum number of events to the publisher. In AMQP terms, it means that the receiver sends a “flow” performative specifying the credits number as the maximum number of messages it can handle in a specific moment in time.

At this point, the publisher can start to push events to the subscriber and it’s available through the onNext method call on the Subscriber interface. Even in this case, in AMQP terms, the sender starts to send one or more “transfer” performatives to the receiver with the message payload (representing the event).


FIG.2 – flow as Subscription.request(n) and transfer as Subscriber.onNext

Detaching … for cancelling, completed streams or errors

In order to complete this 20,000 feet mapping, there are few other methods provided by the reactive streams API I haven’t covered yet.

First of all, the subscriber can decide to not receiving events anymore calling the cancel method on the Subscription interface which in AMQP terms could be a simple “detach” performative sent by the receiver during the “normal” messages (events) exchanges.


FIG.3 – detach from receiver as Subscription.cancel

Finally, it’s important to remember that the reactive streams API takes into account finite streams of events and errors as well.

Regarding finite streams of events, the Subscriber interface exposes the onComplete method which is called when the publisher hasn’t no more events to push anymore so the streams is completed. In AMQP, it could mean a “detach” performative sent by the sender without any error conditions.


FIG.4 – detach from sender as Subscriber.onComplete

At same time, the reactive streams API defines a way to handle errors without catching exceptions but handling them as a special events. The Susbcriber interface provides the onError method which is called when an error happens and the subscriber is notified about that (in any case such an error is represented by a Throwable specific implementation). In AMQP, it could mean a “detach” performative sent by the sender (as it happens for a completed stream) but, this time, with an error condition providing specific error information.


FIG.5 – detach from sender as Subscriber.onError


Maybe you could have a different opinion (and I’d like to hear about that) but, at a first glance, it seemed to me that AMQP 1.0 is really THE protocol suited for implementing the reactive streams API and the related reactive programming paradigm when it comes to microservices in a distributed system and how to design their communication in a reactive way. It provides the main communication patters (request/reply but mainly publish/subscribe for this specific use case), it provides flow-control for the back pressure as well. It’s a really “push” oriented protocol compared to the “pull” HTTP nature for example. MQTT could be another protocol used in a reactive fashion but it lacks of flow-control (at least in the current 3.1.1 specification).


IoT weekend 2017 : my session about messaging and IoT … video and slides !


Today I had a really interesting experience thanks to Jorge Maia who, few weeks ago, invited me to have a session for the online Brazilian event IoT Weekend.  Of course, I accepted his invitation in order to speak about “Open sourcing the messaging and IoT” focusing on IoT protocols, patterns and related platforms like EnMasse and Eclipse Hono.

The event is still going on while I’m writing this blog post (for all this weekend) but my session ended less than one hour ago so I’d like to share the material for all the attendees and for the other people who lose the session and could be interested to watch it !

So you can find the video on YouTube here and the slide deck on SlideShare here.



We can have more … EnMasse !

This morning, my working day started in a different way with an interesting news from AWS re:Invent 2017, the annual Amazon conference …

The news was about Amazon MQ, a new managed message broker service based on ActiveMQ 5.x with all the goodies that it provides in terms of supported protocols like MQTT, JMS, STOMP, … and … yes … AMQP 1.0 !

It seems that this news made Clemens Vaster (from Microsoft) happy as well 🙂


Finally, even Amazon added support for a “real” messaging protocol which is enterprise ready and from my point of view … even IoT ready 🙂

Taking a look to the blog post about this new service, another project came to my mind … guess what ? EnMasse !

We can have more : EnMasse !

What the AmazonMQ provides is the possibility to create a new broker instance and then accessing to the console, creating queues, topics and so on. It’s great but … we can have more !

For this reason I decided to write, for the first time, something about EnMasse even if I had a lot of sessions in different conferences in the past, speaking about it.

EnMasse is an open source “messaging as a service” platform which simplifies the deployment of a messaging infrastructure both “on premise” and in the Cloud. It provides scalability and elasticity in order to address all the problems we can have when the number of connected clients increases (and decreases) even reaching big numbers like in an IoT scenario.

It supports all the well-known messaging patterns (request/reply, publish/subscribe and competing consumers) and up today two main protocols, AMQP 1.0 and MQTT (but adding the HTTP support is on the road-map).

It provides multi-tenancy having different tenants sharing the same infrastructure but being isolated each other. Finally, it provides security in terms of using TLS protocol for establishing connections (with clients and between internal components) other than authentication using Keycloak as the identity management system.

Store and forward or … direct ?

One of the main features it provides is the support for two different messaging mechanisms, “store and forward” and “direct messaging”.

The “store and forward” mechanism is exactly what the messaging brokers provide today. The broker takes the ownership of the message sent by a producer before forwarding this message to a consumer which is asking for it (connecting to a queue or a topic on the broker itself). It means that “storing” the message is the first step executed by the broker and “forwarding” is the next one which can happen later, only when a consumer will be online for getting the message : it allows asynchronous communication between clients and time decoupling. There is always a double contract between produce-broker and broker-consumer, so that the producer knows that the messages reached the broker but not the consumer (a new messages exchange on the opposite direction is needed for having something like an “acknowledgement” from the consumer).

The “direct messaging” mechanism is not something new because it means having a sort of “direct” communication between clients, so that the producer is able to send the message only when the consumer is online with a single contract between the parties : when the producer receives the “acknowledgement”, it means that the consumer has got the message. Of course, EnMasse provides this mechanism in a reliable way : it uses an AMQP 1.0 routers network (connected in a mesh) so that clients aren’t really connected in a direct way but through this mesh. Every router, unlike a broker, doesn’t take ownership of the message but just forwards it to the next hop in the network in order to reach the destination. When a router crashes, the network is automatically re-configured in order to determine a new path for reaching the consumer; it means that high availability is provided in terms of “path redundancy”. Furthermore, thanks to the AMQP 1.0 protocol, a producer doesn’t receive “credits” from a router to send messages if the consumer isn’t online or can’t process more messages.

EnMasse provides these messaging mechanisms using two open source projects : Apache Qpid Dispatch Router, for the router network, and ActiveMQ Artemis (so ActiveMQ 6.x and not 5.x like in the AmazonMQ) for the brokers side.


I want to know only about “addresses” !

Comparing to the new AmazonMQ service, from a developers point of view, the interesting part is the abstraction layer that EnMasse adds to the underlying messaging infrastructure. You can create a new “address” using the console and specifying a type which can be :

  • queue : backed by a broker, for “store and forward” and for providing competing consumer pattern, asynchronous communication and so on.
  • topic : backed by a broker, for “store and forward” as well but for providing publish/subscribe pattern.
  • anycast : it’s something like a queue but in terms of “direct messaging”. A producer can send messages to such an address only when one or more consumers are listening on it and the routers network will deliver them in a competing consumer fashion.
  • multicast : it’s something like a topic but in terms of “direct messaging”, having a producer publishing messages to more consumers listening on the same address so that all of them receive the same message.


The developer doesn’t have to worry about creating the broker, configuring the routers and so on; using the console and a few simple steps in the wizard, he will have a usable “address” for exchanging messages between clients.


Good for microservices …

The interesting part of having the supported “direct messaging” mechanism is even, but not only, about the “micro-services” world. EnMasse can be used as a messaging infrastructure for handing request/reply and publish/subscribe between micro-services using an enterprise protocol like AMQP 1.0.

You can read more about building an AMQP 1.0 based API and a micro-services infrastructure in this article written by on of my colleague, Jakub Scholz.

Who orchestrate ? OpenShift and Kubernetes

Another aspect which makes EnMasse more appealing than other solutions is that it’s totally containerized and runs on the main containers orchestration platforms like Kubernetes and the enterprise OpenShift (using the OpenShift Origin project as well). It means that your messaging based (or IoT) solution can be deployed “on promise” and then easily moved to the Cloud without any changes to your applications (maybe just the addresses for establishing the connections from the clients).



Of course, this blog post didn’t mean to be an exhaustive guide on what EnMasse is but just a brief introduction that I wanted to write for a long time. The Amazon news gave me this opportunity for showing you that you can really have more than just creating a broker in the Cloud and taking care of it 🙂


Eclipse Hono : Virtual IoT meetup

Virtual IoT - Hono

Yesterday, thanks to the Eclipse Foundation I had the chance to talk about Eclipse Hono as speaker for this virtual IoT meetup as part of a meetup series where the focus is on the Eclipse IoT projects. I was with Kai Hudalla (Chief Software Architect at BoschSI) who is co-lead and main contributor on Hono.

It was my first virtual meetup and a really exciting experience for me with almost 90 “virtual” attendees and a lot of interesting questions showing the interest that developers had about this “new” project.

If you didn’t have a chance to watch the session or you want to re-watch it, you can find the recording on YouTube; the slides deck is available here as well.

A lot of fun with … AMQP, Spark, Kafka, EnMasse, MQTT, Vert.x & IoT

When I say to someone that I work for Red Hat they say me “Ah ! Are you working on Linux ?” … No, no, no and … no ! I’m not a Linux guy, I’m not a fan boy but I’m just a daily user 🙂

All people know that Red Hat is THE company which provides the best enterprise Linux distribution well known as Red Hat Enterprise Linux (RHEL) but Red Hat is not only Linux today. Its portfolio is huge : the cloud and containers business with the OpenShift effort, the microservices offer with Vert.x, Wildfly Swarm, Spring Boot, the IoT world with the involvement in the main Eclipse Foundation projects.

The objective of this blog is just showing briefly the projects I worked (or I’m working) on since last year when I was hired on March 1st. They are not “my” projects, they are projects I’m involved because the entire team is working on them … collaboration, you know 🙂

You could be surprised about that but … there is no Linux ! I’m on the messaging & IoT team, so you will see only projects about this stuff 🙂

AMQP – Apache Spark connector

This “little” component is strictly related to the “big” project which takes the powerful of Apache Spark for analytics (batch, real-time, machine learning, …) running on OpenShift.

Because the messaging team works mainly on projects like ActiveMQ Artemis and the Qpid Dispatch Router, where the main protocol is AMQP 1.0, the idea was developing a connector for Spark Streaming in order to ingest data through this protocol so from queues/topics on a broker or through the router in a direct messaging fashion.

You can find the component here and even an IoT demo here which shows how it’s possible to ingest data through AMQP 1.0 using the EnMasse project (see below) and then executing a real time streaming analytics with Spark Streaming, all running on Kubernetes and OpenShift.

AMQP – Apache Kafka bridge

Apache Kafka is one of the best technologies used today for ingesting data (i.e. IoT related scenarios) with an high throughput. Even in this case, the idea was providing a way for having AMQP 1.0 clients and JMS clients pushing messages to Apache Kafka topics without knowing the related custom protocol.

In this way, if you have such clients because you are already using a broker technology but then you need some specific Kafka features (i.e. re-reading streams), you can just switch the messaging system (from the broker to Kafka) and using the bridge you don’t need to update or modify clients. I showed how this is possible at the Red Hat summit as well and the related demo is available here.

MQTT on EnMasse

EnMasse is an open source messaging platform, with focus on scalability and performance. It can run on your own infrastructure (on premise) or in the cloud, and simplifies the deployment of messaging infrastructure.

It’s based on other open source projects like ActiveMQ Artemis and Qpid Dispatch Router supporting the AMQP 1.0 protocol natively.

In order to provide support for the MQTT protocol, we designed how to take “MQTT over AMQP” so having MQTT features on the AMQP protocol. From the design we moved to develop two main components :

  • the MQTT gateway which handles connections with remote MQTT clients translating all messages from MQTT to AMQP and vice versa;
  • the MQTT LWT (Last and Will Testament) service which provides a way for notifying all clients connected to EnMasse that another client is suddenly died sending them its “will message”. The great thing about this service, is that it works with pure AMQP 1.0 clients so bringing the LWT feature on AMQP as well : for this reason the team is thinking to change its name just in AMQP LWT service.

EnMasse is great for IoT scenarios in order to handle a huge number of connections and ingesting a lot of data using AMQP and MQTT as protocols. I used it in all my IoT demos for showing how it’s possible to integrate it with streaming and analytics frameworks. It’s also the main choice as messaging infrastructure in the cloud for the Eclipse Hono project.

Vert.x and the IoT components

Vert.x is a great toolkit for developing reactive applications running on a JVM.

The reactive applications manifesto fits really well for IoT scenarios where responsiveness, resiliency, elasticity and the communication driven by messages are the pillars of all the IoT solutions.

Starting to work on the MQTT gateway for EnMasse using Vert.x for that, I decided to develop an MQTT server that was just able to handle communication with remote clients providing an API for interacting with them : this component was used for bridging MQTT to AMQP (in EnMasse) but can be used for any scenario where a sort of protocol translation or integration is needed (i.e. MQTT to Vert.x Event Bus, to Kafka, …). Pay attention, it’s not a full broker !

The other component was the Apache Kafka client, mainly developed by Julien Viet (lead on Vert.x) and then passed to me as maintainer for improving it and adding new features from the first release.

Finally, thanks to the Google Summer of Code, during the last 2 months I have been mentoring a student who is working on developing a Vert.x native MQTT client.

As you can see the Vert.x toolkit is really growing from an IoT perspective other then providing a lot of components useful for developing pure microservices based solutions.

Eclipse Hono

Eclipse Hono is a project under the big Eclipse IoT umbrealla in the Eclipse Foundation. It provides a service interfaces for connecting large numbers of IoT devices to a back end and interacting with them in a uniform way regardless of the device communication protocol.

It supports scalable and secure ingestion of large volumes of sensor data by means of its Telemetry API. The Command & Control API allows for sending commands (request messages) to devices and receive a reply to such a command from a device asynchronously in a reliable way.

This project is mainly developed by Red Hat and Bosch and I gave my support on designing all the API other then implementing the MQTT adapter even in this case using the Vert.x MQTT server component.

Because Eclipse Hono works on top of a messaging infrastructure for allowing messages exchange, the main choice was using ActiveMQ Artemis and the Qpid Dispatch Router even running them using Kubernetes and OpenShift with EnMasse.

Apache Kafka

Finally, I was involved to develop a POC named “barnabas” (a messenger character from a Frank Kafka novel :-)) in order to take Apache Kafka running on OpenShift.

Considering the stetaful nature of a project like Kafka, I started when Kubernetes didn’t offer the StatefulSets feature doing something similar by myself. Today, the available deploy is based on StatefulSets and it’s a work in progress on which I’ll continue to work for pushing the POC to the next level.

Apache Kafka is a really great project which has its own use cases in the messaging world; today it’s more powerful thanks to the new Streams API which allows to execute a real time streaming analytics using topics from your cluster and running simple applications. My next step is to move my EnMasse + Spark demo to an EnMasse + Kafka (and streaming) deployment. I’m also giving my support on the Apache Kafka code.


The variety and heterogeneity of all the above projects is giving me a lot of fun in my day by day work even collaborating with different people with different knowledge. I like learning new stuff and the great thing is that … things to learn are endless ! 🙂


No winner in the (Industrial) IoT protocols war !

Yesterday, I read this article about declaring MQTT as the winner of the IIoT (Industrial IoT) protocols war and I have a completely different opinion on that so … I totally disagree with the author !

Don’t get me wrong, it’s not because I don’t like MQTT (who knows me, knows that I have done a lot of work around MQTT as well) but just because …

“There is NO winner in the (Industrial) IoT protocols war”

The IoT world is so rich of different use cases, scenarios, features needs and so on that most of the time, the better solution is an “hybrid” one which uses different protocols; even if you focus in the specific IIoT space, that’s true.

IoT has different communication patterns which come from the messaging land and every protocol provides support for one or more of them in different ways; sometimes we have builtin support, sometimes we need to do more work at application level.

MQTT for telemetry ? But …

MQTT fits really well for telemetry because it’s mainly based on publish/subscribe but at same time it has no flow control : what’s happen when the broker is overwhelmed by tons of messages at high rate and it can’t dispatch such messages to the subscribers at the same pace ? It’s even true that most of the time, MQTT devices are tiny sensors which send data with a slow rate (i.e. every second) because they are battery powered and use mobile connection so that they send a message, then go to sleep for few seconds and then wake up for sending the new message. In this case, you don’t have high rate but if you have thousands (millions ?) of these devices, the broker is overwhelmed as well : there is a burst of messages which come and it has to handle all of them.

AMQP doesn’t declare any specific supported pattern and it fits well for all. Regarding telemetry (so publish/subscribe), it provides flow control (even at different levels) so that the receiver node can stop the sender having more time for processing messages received up to now.

Why more complexity for Command & Control ?

Moving to command and control, so speaking about a request/reply pattern, all the MQTT limitations come. In this case, you have to build something on top of the protocol infrastructure defining specific topics for the requests and the related replies and having each client both subscriber (for receiving command) and publisher (for sending reply). There is no correlation between request and reply, it’s all defined at topic level (and/or using payload information).

With AMQP, even this pattern is supported natively. The requester has the possibility to specify a “replyTo” address inside the message, saying to the responder that it expects to receive the reply on such address; even the correlation is supported at protocol level thanks to message and correlation identifiers.

The real feature which makes this difference between AMQP and MQTT is that the former has message metadata (header, annotations and so on) while the latter has just payload (raw bytes) so all the features that it lacks for providing a different pattern from publish/subscribe need to be defined in terms of topics architecture and/or payload structure … so the complexity is moved at application level.

If you want to read more about these differences (even with HTTP protocol) maybe you can find my article “Strengths and weaknesses of IoT communication patterns” on DZone IoT as a useful reference (it’s part of the latest DZone IoT Guide).

Let’s say things as they are

The mentioned article says some wrong things as well.

“AMQP offers robust features like queuing” … to be precise there is no mention about queue in the AMQP specification but container, node, link and so on. This is because AMQP doesn’t specify the network architecture in terms of brokers : pay attention here, I’m speaking about AMQP 1.0 … the only OASIS and ISO/IEC standard (against the AMQP 0.9, used in RabbitMQ). AMQP can be used for RPC without “store and forward” mechanism (provided by brokers) but just with “direct” messaging; AMQP is a peer to peer protocol !

About MQTT … “An example of this optimization is its use of 1 byte keep alive packets.” … no true ! It’s 2 bytes ! … I know I’m a little bit pedantic here 🙂

Finally, it’s not true that only MQTT can work without high-availability and with a low-bandwidth. It’s true even for AMQP, considering the QoS (Quality of Service) levels it supports as well.

Speaking about messages size and computational needs on the devices side.

With MQTT each message carries the topic information, not true with AMQP where the address is specified one time on attaching the link.

When security and encryption come, the SSL/TLS overload minimizes all this differences so that even a 2 bytes packet for keep alive becomes an even bigger message. In this case, it all depends on computational resources you have on your tiny device and the difference between protocols doesn’t matter.


So my conclusion is clear. I have just started with that at the top of this article : there is no winner  in the (Industrial) IoT protocols wars. There are different use cases, scenarios, features needs, limitations … they all drive to the right choice that sometimes means having multiple winners in one solution !

The good news about MQTT is that in the latest v5 specification they are addressing a lot of limitations of the current 3.1.1 version, adding some AMQP-like features 🙂

So stay tuned … the war is endless !




Today meetup … “Open sourcing the IoT : running EnMasse on Kubernetes”

Yes … I’m at the airport waiting for my flight coming back home and I like to write something about the reason of my trip … as usual.

IMG_20170605_132419 DBjIUg1W0AEL7u7

Today, I had a meetup in Milan hosted in the Microsoft Office and organized by my friend Felice Pescatore who leads the AgileIoT project; of course my session was about messaging and IoT … so no news on that. The title ? “Open sourcing the IoT : running EnMasse on Kubernetes”.

Other friends were there with their sessions like Felice himself, Valter Minute speaking about how moving from an IoT prototype to a product and Clemente Giorio and Matteo Valoriani with very interesting sessions about Holo Lens real scenarios.

I started with an introduction about messaging and how it is related to the IoT then moving to the EnMasse project, an open source “messaging as a service” platform that is well suited for being the messaging infrastructure of an IoT solution (for example, it’s applicable inside the Eclipse Hono project).

I showed main EnMasse features and the new ones which will come in the next weeks and how EnMasse provides a messaging and IoT solution from an “on-premise” deployment to the “cloud” in a Kubernetes or OpenShift cluster. For this reason I said “open sourcing the IoT”, because all the components in such solution are open source !

IMG_20170605_132407 IMG_20170605_132359

For showing that, I had a demo with a Kubernetes cluster running on Azure Container Service deploying EnMasse and Apache Spark on that. This demo was made of an AMQP publisher sending simulated temperature values to a “temperature” address deployed in EnMasse (as a queue) and a Spark Streaming job reading such values in order to process them in real time and getting the max value in the latest 5 seconds writing the result to the “max” address (another queue); finally an AMQP receiver was running in order to read and show such values from “max”.

If you want to know more about that you can find the following resources :