Messaging

Today meetup … “Open sourcing the IoT : running EnMasse on Kubernetes”

Yes … I’m at the airport waiting for my flight coming back home and I like to write something about the reason of my trip … as usual.

IMG_20170605_132419 DBjIUg1W0AEL7u7

Today, I had a meetup in Milan hosted in the Microsoft Office and organized by my friend Felice Pescatore who leads the AgileIoT project; of course my session was about messaging and IoT … so no news on that. The title ? “Open sourcing the IoT : running EnMasse on Kubernetes”.

Other friends were there with their sessions like Felice himself, Valter Minute speaking about how moving from an IoT prototype to a product and Clemente Giorio and Matteo Valoriani with very interesting sessions about Holo Lens real scenarios.

I started with an introduction about messaging and how it is related to the IoT then moving to the EnMasse project, an open source “messaging as a service” platform that is well suited for being the messaging infrastructure of an IoT solution (for example, it’s applicable inside the Eclipse Hono project).

I showed main EnMasse features and the new ones which will come in the next weeks and how EnMasse provides a messaging and IoT solution from an “on-premise” deployment to the “cloud” in a Kubernetes or OpenShift cluster. For this reason I said “open sourcing the IoT”, because all the components in such solution are open source !

IMG_20170605_132407 IMG_20170605_132359

For showing that, I had a demo with a Kubernetes cluster running on Azure Container Service deploying EnMasse and Apache Spark on that. This demo was made of an AMQP publisher sending simulated temperature values to a “temperature” address deployed in EnMasse (as a queue) and a Spark Streaming job reading such values in order to process them in real time and getting the max value in the latest 5 seconds writing the result to the “max” address (another queue); finally an AMQP receiver was running in order to read and show such values from “max”.

If you want to know more about that you can find the following resources :

Yesterday DevDay meetup : “messaging” in Naples !

devday_00

Yesterday evening I had the session titled “Messaging as a Service : building a scalable messaging service” during a meetup here in Naples speaking about the EnMasse project. The event was organized by the DevDay community which is active in my region in order to get in touch with developers who work with different technologies. I was very pleased to tell my experience (as a contributor) on developing a messaging service running “on premise” or in the cloud.

devday_01

devday_02

Following you can find the resources for this session :

  • the video published in the DevDay official YouTube channel
  • the slides and the demo code

Last but not least, I’d like to thank Davice Cerbo (from DevDay) who invited me to join the co-working space as guest during the day and setting up this meetup in the best way. Davide … keep up this great work for next events ! 😉

Let’s talk about EnMasse : the open source “Messaging as a Service”

After the Red Hat Summit speaking about JBoss AMQ and Apache Kafka using the EnMasse project, the coming weeks will be rich of sessions about this “Messaging as a Service” platform.

First of all, I’ll have a meetup on May 22nd in Naples organized by the DevDay community. It will be all around messaging (and I’m not going to speak about Whatsapp, Hangout, … :-)) and how we are developing a “Messaging as a Service” solution running on Kubernetes and OpenShift : it’s name is EnMasse.

Selection_058

The other session will be on June 5th in Milan during an IoT meetup organized by the AgileIoT community in the Microsoft House. There, I’ll always speak about EnMasse and how it “democratizes” the IoT giving you a full open source solution for that : in this case I’ll show how this “Messaging as a Service” platform can run in the Azure cloud as well.

Selection_059

So … if you want to know more about EnMasse just pick one of this events … or both ! 🙂

 

A new “Kafka” novel : the OpenShift & Kubernetes deployment

This blog post doesn’t want to be an exhaustive tutorial to describe the way to go for having Apache Kafka deployed in an OpenShift or Kubernetes cluster but just the story of my journey for having a “working” deployment and using it as a starting point to improve over time with a daily basis work in progress. This journey started using Apache Kafka 0.8.0, went through 0.9.0, finally reaching the nowadays 0.10.1.0 version.

From “stateless” to “stateful”

One of the main reasons to use a platform like OpenShift/Kubernetes (let me to use OS/K8S from now) is the scalability feature we can have for our deployed applications. With “stateless” applications there are not so much problems to use such a platform for a Cloud deployment; every time an application instance crashes or needs to be restarted (and/or relocated to a different node), just spin up a new instance without any relationship with the previous one and your deployment will continue to work properly as before. There is no need for the new instance to have information or state related to the previous one.

It’s also true that, out there, we have a lot of different applications which need to persist state information if something goes wrong in the Cloud and they need to be restarted. Such applications are “stateful” by nature and their “story” is important so that just spinning up a new instance isn’t enough.

The main challenges we have with OS/K8S platform are :

  • pods are scaled out and scaled in through Replica Sets (or using Deployment object)
  • pods will be assigned an arbitrary name at runtime
  • pods may be restarted and relocated (on a different node) at any point in time
  • pods may never be referenced directly by the name or IP address
  • a service selects a set of pods that match specific criterion and exposes them through a well-defined endpoint

All the above considerations aren’t a problem for “stateless” applications but they are for “stateful” ones.

The difference between them is also know as “Pets vs Cattle” meme, where “stateless” applications are just a herd of cattle and when one of them die, you can just replace it with a new one having same characteristics but not exactly the same (of course !); the “stateful” applications are like pets, you have to take care of them and you can’t just replace a pet if it’s die 😦

Just as reference you can read about the history of “Pets vs Cattle” in this article.

Apache Kafka is one of these type of applications … it’s a pet … which needs to be handle with care. Today, we know that OS/K8S offers Stateful Sets (previously known as Pet Sets … for clear reasons!) that can be used in this scenario but I started this journey when they didn’t exist (or not released yet), so I’d like to share with you my story, the main problems I encountered and how I solved them (you’ll see that I have “emulated” something that Stateful Sets offer today out of box).

Let’s start with a simple architecture

Let’s start in a very simple way using a Replica Set (only one replica) for Zookeeper server and the related service and a Replica Set (with three replicas) for Kafka servers and the related service.

reference_architecture_1st_ver

The Kafka Replica Set has three replicas for “quorum” and leader election (even for topic replication). The Kafka service is needed to expose Kafka servers access even to clients. Each Kafka server may need :

  • unique ID (for Zookeeper)
  • advertised host/port (for clients)
  • logs directory (for storing topic partitions)
  • Zookeeper info (for connection)

The first approach is to use the broker id dynamic generation so that when a Kafka server starts and needs to connect to Zookeeper, a new broker id is generated and assigned to it. The advertised host/port are just container IP and the fixed 9092 port while the logs directory is predefined (by configuration file). Finally, the Zookeeper connection info are provided through the related Zookeeper service using the related environment variables that OS/K8S creates for us (ZOOKEEPER_SERVICE_HOST and ZOOKEEPER_SERVICE_PORT).

Let’s consider the following use case with a topic (1 partition and 3 replicas). The initial situation is having Kafka servers with broker id 1001, 1002, 1003 and the topic with current state :

  • leader : 1001
  • replicas : 1001, 1002, 1003
  • ISR : 1001, 1002, 1003

It means that clients need to connect to 1001 for sending/receiving messages for the topic and that 1002 and 1003 are followers for having this topic replicated handling failures.

Now, imagine that the Kafka server 1003 crashes and a new instance is just started. The topic description becomes :

  • leader : 1001
  • replicas : 1001, 1002, 1003 <– it’s still here !
  • ISR  : 1001, 1002 <– that’s right, 1003 is not “in-sync”

Zookeeper still sees the broker 1003 as a host for one of the topic replicas but not “in-sync” with the others. Meantime, the new started Kafka server has a new auto generated id 1004. A manual script execution (through the kafka-preferred-replica-election.sh) is needed in order to :

  • adding 1004 to the replicas
  • removing 1003 from replicas
  • new leader election for replicas

use_case_autogenerated_id.png

So what does it mean ?

First of all, the new Kafka server instance needs to have the same id of the previous one and, of course, the same data so the partition replica of the topic. For this purpose, a persistent volume can be the solution used, through a claim, by the Replica Set for storing the logs directory for all the Kafka servers (i.e. /kafka-logs-<broker-id>). It’s important to know that, by Kafka design, a logs directory has a “lock” file locked by the server owner.

For searching for the “next” broker id to use, avoiding the auto-generation and getting the same data (logs directory) as the previous one, a script (in my case a Python one) can be used on container startup before launching the related Kafka server.

In particular, the script :

  • searches for a free “lock” file in order to reuse the broker id for the new Kafka server instance …
  • … otherwise a new broker id is used and a new logs directory is created

Using this approach, we obtain the following result for the previous use case :

  • the new started Kafka server instance acquires the broker id 1003 (as the previous one)
  • it’s just automatically part of the replicas and ISR

use_case_locked_id

But … what on Zookeeper side ?

In this deployment, the Zookeeper Replica Set has only one replica and the service is needed to allow connections from the Kafka servers. What happens if the Zookeeper crashes (application or node fails) ? The OS/K8S platform just restarts a new instance (not necessary on the same node) but what I see is that the currently running Kafka servers can’t connect to the new Zookeeper instance even if it holds the same IP address (through the service usage). The Zookeeper server closes the connections after an initial handshake, probably related to some Kafka servers information that Zookeeper stores locally. Restarting a new instance, this information are lost !

Even in this case, using a persistent volume for the Zookeeper Replica Set is a solution. It’s used for storing the data directory that will be the same for each instance restarted; the new instance just finds Kafka servers information in the volume and grants connections to them.

reference_architecture_1st_ver_zookeeper

When the Stateful Sets were born !

At some point (from the 1.5 Kubernetes release), the OS/K8S platform started to offer the Pet Sets then renamed in Stateful Sets like a sort of Replica Sets but for “stateful” application but … what they offer ?

First of all, each “pet” has a stable hostname that is always resolved by DNS. Each “pet” is being assigned a name with an ordinal index number (i.e. kafka-0, kafka-1, …) and finally a stable storage is linked to that hostname/ordinal index number.

It means that every time a “pet” crashes and it’s restarted, the new one will be the same : same hostname, same name with ordinal index number and same attached storage. The previous running situation is fully recovered and the new instance is exactly the same as the previous one. You could see them as something that I tried to emulate with my scripts on container startup.

So today, my current Kafka servers deployment has :

  • a Stateful set with three replicas for Kafka servers
  • an “headless” service (so without an assigned cluster IP) that is needed for having Stateful set working (so for DNS hostname resolution)
  • a “regular” service for providing access to the Kafka servers from clients
  • one persistent volume for each Kafka server with a claim template defined in the Stateful set declaration

reference_architecture_statefulsets

Other then to use a better implementation 🙂 … the current solution doesn’t use a single persistent volume for all the Kafka servers (having a logs directory for each of them) but it’s preferred to use a persistent storage dedicated to only one “pet”.

It’s great to read about it but … I want to try … I want to play !

You’re right, I told you my journey that isn’t finished yet but you would like to try … to play with some stuff for having Apache Kafka deployed on OS/K8S.

I called this project Barnabas like one of the main characters of the author Franz Kafka who was a … messenger in “The Castel” novel :-). It’s part of the bigger EnMasse project which provides a scalable messaging as a service (MaaS) infrastructure running on OS/K8S.

The repo provides different deployment types : from the “handmade” solution (based on bash and Python scripts) to the current Stateful Sets solution that I’ll improve in the coming weeks.

The great thing about that (in the context of the overall EnMasse project) is that today I’m able to use standard protocols like AMQP and MQTT to communicate with an Apache Kafka cluster (using an AMQP bridge and an MQTT gateway) for all the use cases where using Kafka makes sense against traditional messaging brokers … that from their side have to tell about a lot of stories and different scenarios 😉

Do you want to know more about that ? The Red Hat Summit 2017 (Boston, May 2-4) could be a good place, where me and Christian Posta (Principal Architect, Red Hat) will have the session “Red Hat JBoss A-MQ and Apache Kafka : which to use ?” … so what are you waiting for ? See you there !

Vert.x and IoT in Rome : what a meetup !

Yesterday I had a great day in Rome for a meetup hosted by Meet{cast} (powered by dotnetpodcast community) and Codemotion, speaking about Vert.x and how we can use it for developing “end to end” Internet of Things solutions.

17352445_10208955590111131_6229030843024604532_n

17352567_10208955588791098_766816304298598626_n

I started with an high level introduction on Vert.x and how it works, its internals and its main usage then I moved to dig into some specific components useful for developing IoT applications like the MQTT server, AMQP Proton and Kafka client.

17342690_10208955588751097_8818320599257580571_n

17352571_10208955588951102_2851165399929439718_n

It was interesting to know that even in Italy a lot of developers and companies are moving to use Vert.x for developing microservices based solutions. A lot of interesting questions came out … people seem to like it !

Finally, in order to prove the Vert.x usage in enterprise applications I showed two real use cases that today work thanks to the above components : Eclipse Hono and EnMasse. I had few time to explain better how EnMasse works in details, the Qpid Dispatch Router component in particular and for this reason I hope to have a future meetup on that, the AMQP router concept is quite new today ! In any case, knowing that such a scalable platform is based (even) on Vert.x was a great news for the attendees.

17264802_10208955590191133_8923182437405273553_n

If you are interested to know more about that, you can take a look to the slides and the demo. Following the link to the video of the meetup but only in Italian (my apologies for my English friends :-)). Hope you’ll enjoy the content !

Of course, I had some networking with attendees after the meetup and … with some beer 🙂

17310150_1421561734583219_8414988688301135801_o

Designing MQTT over AMQP

mqtt_over_amqp

Let’s think about what we can consider some weaknesses of the MQTT protocol …

  • it provides native publish/subscribe pattern only; having request/reply is quite cumbersome with “correlation injection” inside topics and payload messages;
  • it doesn’t provide flow control;
  • it doesn’t have metadata information inside messages (i.e. content type);
  • it’s fully broker centric;

but there are some unique features that we have to consider as strengths for this protocol as well …

  • retain message also known as “last well known” message;
  • last will testament (LWT);
  • session handling;

Now, let’s think about the main features that fill the gap for the MQTT protocol but provided by AMQP …

  • native support for both publish/subscribe and request/reply patterns (correlation for free);
  • flow control at different levels (with max window size and message credits);
  • a full type system and metadata information on each message;
  • peer-to-peer model (the broker is just a peer);

but it lacks of the above three MQTT features !

So how greater could be AMQP protocol having such features on top of it ?

Under the open source EnMasse project, I have been working on having a design (so a kind of “specification”) for having retain message, last will testament and session handling over AMQP. At same time I have been developing an MQTT “gateway” in order to allow remote MQTT clients to connect to an AMQP based “Message as a Service” like EnMasse.

Having such a design means that not only an MQTT client can leverage on receiving retain message after subscribing to a topic, sending its LWT on connection or receiving messages for a topic when it was offline; it means having the above features for native AMQP clients as well.

Each feature is made by a well defined AMQP message using specific subject, annotations and payload in order to bring all MQTT related information like retain flag, will QoS, will topic and so on but using AMQP semantic.

This sort of “specification” doesn’t force to use a specific implementation; the EnMasse project leverages on the Qpid Dispatch Router for connections and Apache Artemis brokers where state is needed (but other implementations could use something different like a simple file system or a database). Of course, some additional services are needed in order to handle LWT and subscriptions (we called them just “Will Service” and “Subscription Service”).

If you are so curious and want to give some feedback on that, you can find all the open source stuff on GitHub in the MQTT over AMQP documentation section.

Feel free to enjoy the related EnMasse implementation ! 😉

Node-RED : AMQP 1.0 support with the RHEA nodes library

I’m very happy to announce that I have just released the first version of a new messaging and IoT library for providing AMQP 1.0 protocol support on Node-RED that is a very interesting tool for wiring Internet of Things.

This library is based on the official JavaScript RHEA library which provides AMQP 1.0 support for NodeJS applications (developed by one of my best colleagues in Red Hat, Gordon Sim).

Up to now, the available nodes are :

  • sender : for sending messages to an AMQP address
  • receiver : for receiving messages from an AMQP address
  • requester : for executing a request to an AMQP address and waiting a response on the address specified in the “reply-to” field
  • responder : for receiving a request on an AMQP address and sending the response on the address specified in the “reply-to” field

Of course it’s open source (available on GitHub) with a getting started guide and documentation available on the related Wiki.

amqp_nodes

I published it on NPM (Node-RED is completely based on NodeJS) at following link but you can find it even on the official Node-Red web site in the flows section (searching for “amqp” for example).

Enjoying AMQP messaging and IoT with Node-RED flow programming !