A new “Kafka” novel : the OpenShift & Kubernetes deployment

This blog post isn’t meant to be an exhaustive tutorial on how to deploy Apache Kafka in an OpenShift or Kubernetes cluster; it’s just the story of my journey towards a “working” deployment, used as a starting point to improve over time as a daily work in progress. This journey started with Apache Kafka 0.8.0, went through 0.9.0, and finally reached today’s version.

From “stateless” to “stateful”

One of the main reasons to use a platform like OpenShift/Kubernetes (let me use OS/K8S from now on) is the scalability we can have for our deployed applications. With “stateless” applications there aren’t many problems in using such a platform for a Cloud deployment; every time an application instance crashes or needs to be restarted (and/or relocated to a different node), just spin up a new instance without any relationship with the previous one and your deployment will continue to work properly as before. There is no need for the new instance to have information or state related to the previous one.

It’s also true that, out there, we have a lot of different applications which need to persist state information if something goes wrong in the Cloud and they need to be restarted. Such applications are “stateful” by nature and their “story” matters, so just spinning up a new instance isn’t enough.

The main challenges we have with the OS/K8S platform are :

  • pods are scaled out and scaled in through Replica Sets (or using a Deployment object)
  • pods are assigned an arbitrary name at runtime
  • pods may be restarted and relocated (on a different node) at any point in time
  • pods must never be referenced directly by name or IP address
  • a service selects a set of pods matching specific criteria and exposes them through a well-defined endpoint

All the above considerations aren’t a problem for “stateless” applications but they are for “stateful” ones.

The difference between them is also known as the “Pets vs Cattle” meme, where “stateless” applications are just a herd of cattle and when one of them dies, you can just replace it with a new one having the same characteristics but not exactly the same (of course !); the “stateful” applications are like pets, you have to take care of them and you can’t just replace a pet if it dies 😦

Just as a reference, you can read about the history of “Pets vs Cattle” in this article.

Apache Kafka is one of these types of applications … it’s a pet … which needs to be handled with care. Today, we know that OS/K8S offers Stateful Sets (previously known as Pet Sets … for obvious reasons!) that can be used in this scenario, but I started this journey when they didn’t exist (or weren’t released yet), so I’d like to share with you my story, the main problems I encountered and how I solved them (you’ll see that I “emulated” something that Stateful Sets offer today out of the box).

Let’s start with a simple architecture

Let’s start in a very simple way using a Replica Set (only one replica) for the Zookeeper server and its related service, and a Replica Set (with three replicas) for the Kafka servers and their related service.


The Kafka Replica Set has three replicas for “quorum” and leader election (as well as for topic replication). The Kafka service is needed to expose the Kafka servers to clients. Each Kafka server needs :

  • unique ID (for Zookeeper)
  • advertised host/port (for clients)
  • logs directory (for storing topic partitions)
  • Zookeeper info (for connection)

The first approach is to use dynamic broker id generation, so that when a Kafka server starts and connects to Zookeeper, a new broker id is generated and assigned to it. The advertised host/port are just the container IP and the fixed 9092 port, while the logs directory is predefined (by the configuration file). Finally, the Zookeeper connection info is provided through the related Zookeeper service, using the environment variables that OS/K8S creates for us (ZOOKEEPER_SERVICE_HOST and ZOOKEEPER_SERVICE_PORT).
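As a rough sketch (not the actual deployment scripts), this is how the configuration above could be assembled on container startup, reading the service environment variables that OS/K8S injects; the property names follow the Kafka 0.8/0.9 era configuration:

```python
import os

def build_kafka_config(broker_id, advertised_host):
    """Assemble the per-broker settings described above; illustrative sketch."""
    # Zookeeper connection info comes from the environment variables
    # that OS/K8S creates for the "zookeeper" service.
    zk_host = os.environ.get("ZOOKEEPER_SERVICE_HOST", "localhost")
    zk_port = os.environ.get("ZOOKEEPER_SERVICE_PORT", "2181")
    return {
        "broker.id": broker_id,                      # unique ID (for Zookeeper)
        "advertised.host.name": advertised_host,     # container IP (for clients)
        "advertised.port": "9092",                   # fixed Kafka port
        "log.dirs": "/kafka-logs-%s" % broker_id,    # logs directory (topic partitions)
        "zookeeper.connect": "%s:%s" % (zk_host, zk_port),
    }

config = build_kafka_config(1001, "172.17.0.3")
```

The same values would then be written into the server.properties file before launching the broker.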

Let’s consider the following use case with a topic (1 partition and 3 replicas). The initial situation is having Kafka servers with broker ids 1001, 1002 and 1003, and the topic in the following state :

  • leader : 1001
  • replicas : 1001, 1002, 1003
  • ISR : 1001, 1002, 1003

It means that clients need to connect to 1001 for sending/receiving messages on the topic, while 1002 and 1003 are followers that keep this topic replicated for handling failures.

Now, imagine that the Kafka server 1003 crashes and a new instance is just started. The topic description becomes :

  • leader : 1001
  • replicas : 1001, 1002, 1003 <– it’s still here !
  • ISR  : 1001, 1002 <– that’s right, 1003 is not “in-sync”

Zookeeper still sees broker 1003 as a host for one of the topic replicas but not “in-sync” with the others. Meanwhile, the newly started Kafka server has a new auto-generated id, 1004. A manual partition reassignment (through the kafka-reassign-partitions.sh tool) is needed in order to :

  • add 1004 to the replicas
  • remove 1003 from the replicas
  • elect a new leader for the partition
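For reference, such a manual reassignment is described with a JSON document; a hypothetical example for our one-partition topic (the topic name “my-topic” is an assumption):

```python
import json

# Hypothetical reassignment: replace crashed broker 1003 with the new 1004
# for partition 0 of "my-topic" (the topic name is an assumption).
reassignment = {
    "version": 1,
    "partitions": [
        {"topic": "my-topic", "partition": 0, "replicas": [1001, 1002, 1004]}
    ]
}
reassignment_json = json.dumps(reassignment)
```

This document would then be fed to the reassignment tool to move the replica and trigger a new leader election.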


So what does it mean ?

First of all, the new Kafka server instance needs to have the same id as the previous one and, of course, the same data, i.e. the partition replicas of the topic. For this purpose, a persistent volume can be the solution, used through a claim by the Replica Set for storing the logs directories of all the Kafka servers (i.e. /kafka-logs-<broker-id>). It’s important to know that, by Kafka design, a logs directory has a “lock” file locked by the owning server.

In order to find the “next” broker id to use, avoiding auto-generation, and to get the same data (logs directory) as the previous instance, a script (in my case a Python one) can be run on container startup before launching the related Kafka server.

In particular, the script :

  • searches for a free “lock” file in order to reuse the broker id for the new Kafka server instance …
  • … otherwise a new broker id is used and a new logs directory is created
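A minimal sketch of that startup logic could look like the following, assuming the shared volume is mounted at /kafka-logs with one kafka-logs-<id> directory per broker (this is an illustration, not the actual project script):

```python
import fcntl
import os

def is_locked(lock_path):
    """Check whether a ".lock" file is held by a running broker.
    Kafka keeps an flock on <logs-dir>/.lock while the server is alive."""
    if not os.path.exists(lock_path):
        return False
    with open(lock_path, "a") as f:
        try:
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            fcntl.flock(f, fcntl.LOCK_UN)
            return False    # we could grab it: no broker owns this directory
        except IOError:
            return True     # a running broker holds the lock

def find_broker_id(base="/kafka-logs"):
    """Reuse the broker id of a logs directory whose lock file is free,
    otherwise allocate a new id (and a new logs directory will be created)."""
    ids = []
    for name in sorted(os.listdir(base)):
        if not name.startswith("kafka-logs-"):
            continue
        broker_id = int(name.split("-")[-1])
        ids.append(broker_id)
        if not is_locked(os.path.join(base, name, ".lock")):
            return broker_id                 # a crashed broker left this one free
    return max(ids) + 1 if ids else 1001     # 1001 mimics Kafka's auto-generated range
```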

Using this approach, we obtain the following result for the previous use case :

  • the new started Kafka server instance acquires the broker id 1003 (as the previous one)
  • it’s just automatically part of the replicas and ISR


But … what on Zookeeper side ?

In this deployment, the Zookeeper Replica Set has only one replica and the service is needed to allow connections from the Kafka servers. What happens if Zookeeper crashes (application or node failure) ? The OS/K8S platform just restarts a new instance (not necessarily on the same node), but what I saw is that the currently running Kafka servers can’t connect to the new Zookeeper instance, even if it holds the same IP address (thanks to the service). The Zookeeper server closes the connections after an initial handshake, probably because of some Kafka servers information that Zookeeper stores locally. Restarting a new instance, this information is lost !

Even in this case, using a persistent volume for the Zookeeper Replica Set is the solution. It’s used for storing the data directory, which will be the same for each restarted instance; the new instance just finds the Kafka servers information in the volume and grants connections to them.


When the Stateful Sets were born !

At some point (since the 1.5 Kubernetes release), the OS/K8S platform started to offer Pet Sets, later renamed Stateful Sets: a sort of Replica Sets but for “stateful” applications. But … what do they offer ?

First of all, each “pet” has a stable hostname that is always resolved by DNS. Each “pet” is assigned a name with an ordinal index number (i.e. kafka-0, kafka-1, …) and, finally, a stable storage is linked to that hostname/ordinal index number.

It means that every time a “pet” crashes and is restarted, the new one will be the same : same hostname, same name with ordinal index number and same attached storage. The previous running situation is fully recovered and the new instance is exactly the same as the previous one. You can see them as something that I tried to emulate with my scripts on container startup.

So today, my current Kafka servers deployment has :

  • a Stateful Set with three replicas for the Kafka servers
  • a “headless” service (so without an assigned cluster IP) that is needed for the Stateful Set to work (i.e. for DNS hostname resolution)
  • a “regular” service for providing access to the Kafka servers from clients
  • one persistent volume for each Kafka server, with a claim template defined in the Stateful Set declaration
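As a purely illustrative sketch (names, image and storage size are assumptions, not the actual project manifests), the deployment above could look like:

```yaml
# Hypothetical sketch of the Stateful Set deployment described above;
# names, image and storage size are assumptions.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None          # "headless" service for stable DNS names (kafka-0, kafka-1, ...)
  ports:
    - port: 9092
  selector:
    app: kafka
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: my-kafka-image     # assumption: your Kafka image
          ports:
            - containerPort: 9092
          volumeMounts:
            - name: kafka-logs
              mountPath: /kafka-logs
  volumeClaimTemplates:            # one persistent volume claim per "pet"
    - metadata:
        name: kafka-logs
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi
```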


Other than being a better implementation 🙂 … the current solution doesn’t use a single persistent volume for all the Kafka servers (with a logs directory for each of them); a persistent storage dedicated to a single “pet” is preferred.

It’s great to read about it but … I want to try … I want to play !

You’re right, I told you about my journey, which isn’t finished yet, but you’d like to try … to play with some stuff for having Apache Kafka deployed on OS/K8S.

I called this project Barnabas after one of the characters of the writer Franz Kafka: he was a … messenger in “The Castle” novel :-). It’s part of the bigger EnMasse project which provides a scalable “messaging as a service” (MaaS) infrastructure running on OS/K8S.

The repo provides different deployment types : from the “handmade” solution (based on bash and Python scripts) to the current Stateful Sets solution that I’ll improve in the coming weeks.

The great thing about that (in the context of the overall EnMasse project) is that today I’m able to use standard protocols like AMQP and MQTT to communicate with an Apache Kafka cluster (using an AMQP bridge and an MQTT gateway) for all the use cases where using Kafka makes sense over traditional messaging brokers … which, on their side, have a lot of different stories and scenarios to tell 😉

Do you want to know more about that ? The Red Hat Summit 2017 (Boston, May 2-4) could be a good place, where Christian Posta (Principal Architect, Red Hat) and I will present the session “Red Hat JBoss A-MQ and Apache Kafka : which to use ?” … so what are you waiting for ? See you there !

Vert.x and IoT in Rome : what a meetup !

Yesterday I had a great day in Rome at a meetup hosted by Meet{cast} (powered by the dotnetpodcast community) and Codemotion, speaking about Vert.x and how we can use it for developing “end to end” Internet of Things solutions.



I started with a high level introduction on Vert.x, how it works, its internals and its main usages; then I moved on to dig into some specific components useful for developing IoT applications, like the MQTT server, AMQP Proton and the Kafka client.



It was interesting to learn that even in Italy a lot of developers and companies are moving to Vert.x for developing microservices based solutions. A lot of interesting questions came out … people seem to like it !

Finally, in order to show Vert.x usage in enterprise applications, I presented two real use cases that work today thanks to the above components : Eclipse Hono and EnMasse. I had little time to explain in detail how EnMasse works, the Qpid Dispatch Router component in particular, and for this reason I hope to have a future meetup on that; the AMQP router concept is quite new today ! In any case, knowing that such a scalable platform is based (even) on Vert.x was great news for the attendees.


If you are interested in knowing more, you can take a look at the slides and the demo. In the coming days, the video of the meetup will be available online, but it will be in Italian (my apologies to my English-only friends :-)). Hope you’ll enjoy the content !

Of course, I had some networking with attendees after the meetup and … with some beer 🙂


Designing MQTT over AMQP


Let’s think about what we can consider some weaknesses of the MQTT protocol …

  • it provides the native publish/subscribe pattern only; having request/reply is quite cumbersome, with “correlation injection” inside topics and message payloads;
  • it doesn’t provide flow control;
  • it doesn’t have metadata information inside messages (i.e. content type);
  • it’s fully broker centric;

but there are some unique features that we have to consider as strengths for this protocol as well …

  • the retained message, also known as the “last known good” message;
  • last will testament (LWT);
  • session handling;

Now, let’s think about the main features that fill the gap for the MQTT protocol but provided by AMQP …

  • native support for both publish/subscribe and request/reply patterns (correlation for free);
  • flow control at different levels (with max window size and message credits);
  • a full type system and metadata information on each message;
  • peer-to-peer model (the broker is just a peer);

but it lacks the above three MQTT features !

So how much greater could the AMQP protocol be with such features on top of it ?

Under the open source EnMasse project, I have been working on a design (a kind of “specification”) for having retained messages, last will testament and session handling over AMQP. At the same time, I have been developing an MQTT “gateway” in order to allow remote MQTT clients to connect to an AMQP based “messaging as a service” platform like EnMasse.

Having such a design means that not only can an MQTT client rely on receiving the retained message after subscribing to a topic, sending its LWT on connection, or receiving the messages published on a topic while it was offline; it means having the above features for native AMQP clients as well.

Each feature is implemented by a well defined AMQP message, using specific subjects, annotations and payloads in order to carry all the MQTT related information (retain flag, will QoS, will topic and so on) but with AMQP semantics.
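Just to make the idea concrete, here is a purely hypothetical sketch of how an MQTT PUBLISH could be mapped onto AMQP message sections; the subject and annotation names are assumptions for illustration, not the actual “specification”:

```python
def mqtt_publish_to_amqp(topic, payload, qos, retain):
    """Hypothetical mapping of an MQTT PUBLISH onto AMQP message sections;
    subject and annotation names are assumptions, for illustration only."""
    return {
        "to": topic,                         # the AMQP "to" carries the MQTT topic
        "subject": "publish",                # a well defined subject per feature
        "message-annotations": {
            "x-opt-retain-message": retain,  # retain flag travels as an annotation
            "x-opt-mqtt-qos": qos,
        },
        "body": payload,
    }

msg = mqtt_publish_to_amqp("sensors/temp", b"21.5", 1, True)
```

An AMQP client publishing to the same address would simply set the equivalent annotations itself, getting the MQTT-style behavior “for free”.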

This sort of “specification” doesn’t force a specific implementation; the EnMasse project leverages the Qpid Dispatch Router for connections and Apache Artemis brokers where state is needed (but other implementations could use something different, like a simple file system or a database). Of course, some additional services are needed in order to handle LWT and subscriptions (we call them just “Will Service” and “Subscription Service”).

If you are curious and want to give some feedback, you can find all the open source stuff on GitHub in the MQTT over AMQP documentation section.

Feel free to enjoy the related EnMasse implementation ! 😉

Node-RED : AMQP 1.0 support with the RHEA nodes library

I’m very happy to announce that I have just released the first version of a new messaging and IoT library providing AMQP 1.0 protocol support for Node-RED, a very interesting tool for wiring the Internet of Things.

This library is based on the official RHEA JavaScript library, which provides AMQP 1.0 support for Node.js applications (developed by one of my best colleagues at Red Hat, Gordon Sim).

Up to now, the available nodes are :

  • sender : for sending messages to an AMQP address
  • receiver : for receiving messages from an AMQP address
  • requester : for sending a request to an AMQP address and waiting for the response on the address specified in the “reply-to” field
  • responder : for receiving a request on an AMQP address and sending the response on the address specified in the “reply-to” field

Of course it’s open source (available on GitHub), with a getting started guide and documentation available on the related Wiki.


I published it on NPM (Node-RED is completely based on Node.js) at the following link, but you can also find it on the official Node-RED web site in the flows section (searching for “amqp”, for example).

Enjoy AMQP messaging and IoT with Node-RED flow programming !

My Raspberry Pi runs the Qpid Dispatch Router

This morning my “embedded” soul had an idea for supporting my “messaging” soul and spending some time in a productive way … trying to compile the Qpid Dispatch Router on a different machine with a different architecture. It’s C/C++ code … so it’s portable by definition (even if Linux-only for now) … it uses Python, but today this language is available on a lot of different platforms … so there seemed to be no problem.

Searching for some embedded stuff in my closet and discarding the Cortex-Mx based boards (for obvious reasons, i.e. no Linux on them), I got my Raspberry Pi … the first version (ARM11 based … not Cortex-Ax) 🙂


I have the Pi 2 as well (and I have to buy the third version) but I preferred to stop my search and start immediately. I followed all the needed steps (explained in my previous article), using the Pi as a normal PC (connected via SSH), and after a while spent compiling directly on the board, the router was up and running !


The only tweak needed was to force cmake to use the Python 2.7 library with the following command line :

cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DPYTHON_LIBRARY=/usr/lib/arm-linux-gnueabihf/ -DPYTHON_INCLUDE_DIR=/usr/include/python2.7 -DPYTHON_EXECUTABLE=/usr/bin/python

because the 3.x version is installed on the Pi as well but we can’t use it for the router.

I know … it’s not the right way to compile source code for embedded devices, and cross-compilation from my PC would be the better way to do that, but I preferred this approach in order to move fast and avoid setting up a complete ARM toolchain on the laptop; of course, be patient … building Qpid Proton took me about one hour ! I suggest you have a good coffee …

Before starting the router, another simple tweak was needed in order to make the value of the PYTHONPATH environment variable persistent, writing the following line to the .bashrc file :

export PYTHONPATH=/usr/lib/python2.7/site-packages

Now I have an idea … the Pi with its Linux version is SSL/TLS capable, but there are a lot of resource constrained devices which are able to “speak” AMQP but can’t support SSL/TLS connections. Why not use the Pi as a “shadow” IoT gateway, leveraging its security capabilities in order to let those constrained devices reach SSL/TLS and AMQP based cloud platforms even if they can’t “speak” SSL/TLS ?

Routing mechanisms for AMQP protocol

In the previous article, we installed the Qpid Dispatch Router and had a quick overview of the available tools inside the installation package for both router management and monitoring.

Now it’s time to start using the router from simple to complex configurations and network topologies with some examples which will involve AMQP senders, receivers and/or brokers. The broker part won’t be always necessary because the AMQP protocol is a “peer to peer” protocol and it works great connecting two clients directly without the “store and forward” mechanism provided by a broker in the middle. For more information about that you can read the first article of this series.

In this article, I’ll use the router with the default configuration showing how a sender and a receiver can connect to it and exchange messages through the router itself.

Routing mechanisms

First of all, it’s interesting to note that the router supports two different routing mechanisms.

Message routing

When the router receives a message on a link, it uses the address specified in the target terminus when the sender attached the link; if that address wasn’t specified, the destination address is taken from the “To” property of the message. Based on this information, the router inspects its routing table to determine the route for delivering the message : it could be a link attached by a receiver directly connected to the router, or another router in the network that will be the next hop towards the destination. Of course, the message could be sent to different receivers, all interested in the same address. The main point here is that the routing decision is made for each received message, and there is always a communication between internal router nodes and external clients.


As you can see in the above picture, a link is established between sender and router and another between router and receiver. They are two completely distinct links that the router uses for exchanging messages between sender and receiver, through the routing mechanism on a per-message basis.

For example, it means that there is a different flow control between the router (with its internal receiver) and the sender, and between the router (with its internal sender) and the receiver : if the receiver grants only a few credits (i.e. 10) while the router (its internal receiver) grants more to the sender (i.e. 250 by default), the router takes this difference into account. If for any reason the receiver closes the connection after receiving its 10 messages, while the sender has already sent 50 (all acknowledged with an “accepted” disposition), the router will reply with a “released” disposition for the remaining 40 messages because they can’t be delivered to the closed receiver.
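The credit arithmetic above can be sketched with a toy model (an illustration of the behavior, not the router implementation):

```python
def route_deliveries(sent, receiver_credits):
    """Toy model of the dispositions the sender sees: the router accepted
    `sent` messages on its own credit window, but the receiver only granted
    `receiver_credits` before closing the connection."""
    accepted = min(sent, receiver_credits)
    released = sent - accepted      # undeliverable messages are "released"
    return accepted, released

# Sender pushed 50 messages (the router granted it 250 credits), the receiver
# granted 10 credits and then closed: 10 accepted, 40 released.
accepted, released = route_deliveries(50, 10)
```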

Another interesting point is related to message “settlement” : the router always propagates the delivery and its settlement along the network. On receiving a “pre-settled” message, its nature is propagated to the destination. The same is true for “unsettled” messages : in that case, the router needs to track the incoming delivery and send the message unsettled to the destination; when it receives the disposition (settlement) from the final receiver, it replies in the same way to the original sender.

The last interesting aspect of message routing is the available routing patterns, which define the paths followed by messages across the network :

  • closest : even if there are more receivers for the same address, the message is sent on the shortest path to the destination. It means that only one receiver gets the message;
  • balanced : when more receivers are attached to the same address, the messages sent to that address are spread across the receivers. It means that each receiver receives a different message at a time, in a sort of “competing consumers” way;
  • multicast : it’s like a “publish/subscribe” pattern where all the receivers receive the same message published to the address they are attached to;

When the receivers for a specific address are all connected to the same router, we could think that “closest” and “balanced” have the same behavior, because all the paths have the same length and the receivers are equally close to the router. That’s not quite true : with “closest” the router uses a strict round-robin distribution across receivers, while with “balanced” it takes into account the number of unsettled deliveries for each receiver and favors the “fastest” of them (the one settling messages faster than the others).
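The three patterns can be summarized with another toy model (again, just an illustration of the distribution logic, not the router code):

```python
import itertools

def distribute(messages, receivers, pattern, unsettled=None):
    """Toy model of the three routing patterns (illustration only)."""
    deliveries = {r: [] for r in receivers}
    if pattern == "multicast":
        for m in messages:
            for r in receivers:
                deliveries[r].append(m)          # everybody gets every message
    elif pattern == "closest":
        rr = itertools.cycle(receivers)          # same path length: strict round-robin
        for m in messages:
            deliveries[next(rr)].append(m)
    elif pattern == "balanced":
        counts = dict(unsettled or {r: 0 for r in receivers})
        for m in messages:
            r = min(receivers, key=lambda x: counts[x])  # favor the "fastest" receiver
            deliveries[r].append(m)
            counts[r] += 1
    return deliveries
```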

To be clearer, suppose we have a sender S, two receivers R1 and R2, and a router network where R1 is connected to the same router as S and R2 is connected to a different router (connected to the previous one). We can say that R1 is “one hop away” from S and R2 is “two hops away” from S.

In the following scenario, with the “closest” distribution all the messages sent by S will be always delivered to R1.


“closest” message routing

Using the “balanced” distribution, the messages are spread across both receivers and there is no relation to the path length.


“balanced” message routing

Finally, with “multicast” distribution all messages are sent to all receivers.


“multicast” message routing

Link routing

When the router receives an attach request, it’s propagated through the network in order to reach the destination node and establish the real link. When the sender starts to send messages to the router, the router propagates them through the established link to the destination without making any decision at the message level. You can think of it as a sort of virtual connection, or a tunnel, between sender and receiver through a router network.


As you can see in the above picture, the link is established directly between the two peers and all performatives go through it.

From a flow control point of view, it’s handled directly between sender and receiver; any credit granted by the receiver arrives at the sender with a flow performative, without any interference from the router in the middle. The link through the router is like a “tunnel” and it seems that the two peers are directly connected.

The same is true for the disposition about settlement of “unsettled” messages, which the sender receives directly from the receiver.

The concept of different routing patterns doesn’t apply here, because there is a direct link between sender and receiver, so the router doesn’t make any decision on a single-message basis; it only has to propagate the frames along the link.

Let’s start … simple !

Now it’s time for a very simple example with a router started using the default configuration, and only one sender and one receiver connected to it and attached to the “my_address” address.

I used the “simple_recv” and “simple_send” C++ client examples for that and you can find them inside the Qpid Proton installation folder.

First of all let’s start the receiver specifying the address and the number of messages it wants to receive (it will grant link credits for that), i.e. 10 messages.


Using the qdstat management tool we can see that an endpoint for the “my_address” address is defined with “out” direction (from router to the receiver) with no messages delivered yet.


After that let’s start the sender in order to send some auto generated messages, i.e. 5 messages.


As you can see, the sent messages are all settled and confirmed. What happened on the receiver side ?


All messages sent by sender are now received and the simple application doesn’t close the connection because it’s waiting for the other 5 messages in relation to the 10 credits granted (of course it’s only an application behavior and not related to the router mechanisms).

Inspecting the router with the qdstat management tool, we can see that the output endpoint for the “my_address” address has 5 delivered messages. What we can’t see is the endpoint on the same address with the opposite “in” direction (from the sender to the router), because after sending the messages the sender closed the connection to the router and the endpoint was deleted. You can trust me … it was alive all the time the sender was sending messages !


As you can see we have directly connected sender and receiver without the need for a broker in the middle with its “store and forward” mechanism. In the above scenario, when the messages are settled and confirmed to the sender it means that they are really received by the receiver.


With this article I introduced the different message routing mechanisms that the Qpid Dispatch Router provides. For every scenario we can choose the best approach and what we need in order to distribute messages in a useful way for our distributed application. We saw a simple example of connecting sender and receiver through the router without the need for a broker in the middle. In the next articles, I’ll increase the complexity, starting to use non-default configuration files and exploring different ways to connect routers with clients and brokers.

Qpid Dispatch Router installation on your Linux machine

In the previous article I introduced the Dispatch Router from the Apache Qpid project, its main features and capabilities, and the scenarios where it can be useful in order to develop highly scalable AMQP based messaging solutions.

The first step towards using the router is installation, and I’ll explain how to do that in this short post. I’ll use some personal Docker files in order to build fully functional images, but you can find the official ones in the router’s GitHub account here.

Qpid Proton : the dispatch router foundation

This router is based on the Qpid Proton project, a messaging library developed in C, C++ and Java (the pure ProtonJ implementation) with bindings for different languages like Python, PHP and Ruby; in order to work properly, the router only needs the base ProtonC implementation and the Python binding.

First of all, we need the main build tools like gcc, cmake and make. The UUID library is needed for unique identifier generation (i.e. container name, message id, …) and the OpenSSL library for encryption and for handling SSL/TLS connections. Furthermore, the latest library version leverages the Cyrus library for the SASL protocol, used for supporting different authentication mechanisms in the AMQP protocol.

To simplify the installation process, I wrote two Docker files, available here on GitHub for both Fedora and Ubuntu, that you can use as a reference for reproducing the steps on your machine or for generating a Docker image.

Now it’s router time

The Qpid Dispatch Router uses the same tools (like gcc, cmake, make, …) needed for Qpid Proton; if you can compile the messaging library without problems then you are ready to compile and install the router as well.

At the time I’m writing this article, the official released version is 0.5, with the 0.6 version in beta. We can use both versions : in the former case we can download the released package, in the latter we can clone the official GitHub repository and compile the bits from there, getting in this way the latest updated code and features under development. The new 0.6 version has gone through major architectural changes to make it highly scalable; furthermore, the router configuration is now more intuitive and easier to understand than in the previous versions.

Even in that case I wrote both Docker files for the official released version and for compiling the bits (on both Fedora and Ubuntu of course) in order to have a Docker image and the router up and running in a related container.

Let’s start the router

To check the installation, we can start the router with the simple “qdrouterd” command; it is launched automatically when starting the Docker image.


Typing that command, the router starts with the default configuration file. We will dig into it in the next articles to understand the meaning of all the main configuration options for tuning the router behavior.

In the above output, the main points are :

  • The router instance is named Qpid.Dispatch.Router.A;
  • The router operates in “standalone” mode, which means that it doesn’t cooperate with other routers and won’t be used in a router network;
  • It exposes a management endpoint we can use in order to interact with the router itself and change its internal configuration. It’s a pure AMQP endpoint whose available operations are defined by the AMQP management specification (still a draft, here). For all developers who don’t know it : you can think of it as a sort of RESTful interface with CRUD operations for managing resources, but instead of using HTTP as the transport protocol, it uses AMQP and its semantics;
  • A listener is started on all available network interfaces, listening for connections on the standard AMQP port (5672, so not encrypted);
  • The instance uses 4 threads for handling message traffic and all other internal operations;
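For reference, a configuration fragment matching that startup output could look like the following; the key names are illustrative of the 0.6-era configuration format, not copied from the actual default file:

```
# Hypothetical configuration sketch matching the startup output above;
# key names and values are illustrative.
router {
    mode: standalone            # no cooperation with other routers
    id: Qpid.Dispatch.Router.A
    workerThreads: 4
}

listener {
    host: 0.0.0.0               # all available network interfaces
    port: amqp                  # standard 5672, no encryption
}
```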

The package tools

Other than the router itself, the project provides a couple of tools useful for showing the main runtime information and for interacting with it.

The “qdstat” tool shows all statistics information, endpoints and router traffic.


As we can see from the above picture (the -l option shows the router’s AMQP links), the router exposes the $management endpoint and a local temporary endpoint that is used for communicating with the qdstat tool.

Indeed, when we start the tool, it opens an AMQP connection to the router and attaches a link on the management endpoint for sending requests as you can see in the following picture with traffic capture (using Wireshark).


At same time another link is attached for a temporary local endpoint used for receiving information to show on the console.


When the tool sends a request to the router through a transfer performative, it specifies the local reply address using the “Reply-To” AMQP property.

The “qdmanage” tool is a general purpose AMQP management tool that we can use to interact with a running router and change its internal configuration. It’s very useful because even if our router starts with a static configuration written in the related file, we can change it dynamically at runtime to adapt the router behavior to our current use case.

For example we can show all the active connections on the router.


In this case, there are no connections other than the one related to the tool itself which connects to the router via AMQP in order to query and receive the related information.


With this post, we have now installed the dispatch router on our machine (or in a Docker image) and checked that it runs properly, at least with the default configuration. We peeked at the tools available with the installation and all the related operations and information provided in order to interact with the router at runtime. Starting from the next article, we’ll see how to use the router in different scenarios of increasing complexity, from a standalone router to a router network.