Messaging

Apache Kafka on Kubernetes and OpenShift : Barnabas is dead … long live Strimzi !

Almost a year and a half ago, I started my journey of running Apache Kafka on Kubernetes and OpenShift. At that time, these container orchestration platforms were focused on “stateless” (micro)services, so there was no real support for a technology like Apache Kafka, which is “stateful” by definition.

I wrote this blog post about my “investigation”, highlighting the problems to address in order to have Apache Kafka running on such platforms. The solution tried to “mimic” something that would be added in the following months : PetSets (later renamed StatefulSets).

I created a new project named “Barnabas” (the name came from a character in a Franz Kafka novel; he was a messenger) with the objective of giving developers the resources (i.e. all the needed YAML files) for deploying Apache Kafka on Kubernetes and OpenShift.

A few people used “Barnabas” for their demos and proofs of concept, and I received feedback and improvements; for example, the Debezium team started to use it for deploying Apache Kafka with their supported Kafka Connect connectors, and some people from the Aerogear project used it for some POCs as well.

Barnabas is dead … long live Strimzi !

Today … “Barnabas” isn’t here anymore 😦

It’s sad, but it’s not entirely true ! It just has a new name : Strimzi !

The objective is still the same : providing a way to run an Apache Kafka cluster on Kubernetes and OpenShift. Of course, the project is open source and I hope that a new community can be born and grow around it : today I’m not the only contributor and that’s great !

The first early release (0.1.0) provides all the YAML resources needed for deploying the Apache Kafka cluster : StatefulSets (used for the broker and Zookeeper nodes), Services (so that the nodes can communicate with each other and be reached by the clients), Persistent Volume Claims (for storing Kafka logs, in addition to supporting “ephemeral” storage with emptyDir) and finally metrics support, in order to get metrics data from the cluster through Prometheus and show them in a Grafana dashboard.

Other than that, Strimzi also provides a way to deploy Kafka Connect alongside a Kafka cluster. In order to simplify the addition of new connectors when running on OpenShift, the deployment leverages some unique OpenShift features like “Builds” and “S2I” images.

The future … is bright

While the current release already provides a simple way to deploy the Apache Kafka cluster (“templates” are also provided for the OpenShift use case), the future is rich in improvements and features we’d like to add.

First of all, we are working on replacing these YAML resources with the “operator” approach (well known in the Kubernetes world).

Two main components will soon be part of this approach : a cluster controller and a topic controller.

The cluster controller, running on Kubernetes (OpenShift), is in charge of deploying an Apache Kafka cluster based on the configuration provided by the user through a “cluster” ConfigMap resource. Its main job is to watch for a ConfigMap which contains the cluster configuration (i.e. number of broker nodes, number of Zookeeper nodes, healthcheck information, broker configuration properties, metrics configuration and so on) and then deploy the cluster based on that information. During its life, the cluster controller is also in charge of checking for updates to the ConfigMap and reflecting the changes on the already deployed cluster (e.g. the user increases the number of broker nodes in order to scale up the cluster, or changes some metrics parameters and so on).

The topic controller, also running on Kubernetes (OpenShift), provides a way to manage Kafka topics without interacting with the Kafka brokers directly, using a ConfigMap approach instead. In the same way as the cluster controller, the topic controller is in charge of watching for specific ConfigMap resources which describe topics : this means that a user can create a new ConfigMap containing topic information (i.e. name, number of partitions, replication factor and so on) and the topic controller will create the topic in the cluster. As with the cluster controller, the topic controller is also in charge of checking for updates to the “topic” ConfigMap and reflecting its changes to the cluster. Finally, this component is also able to handle scenarios where topic changes don’t happen only on the ConfigMap(s) but also directly in the Kafka cluster. It’s able to run a “3-way reconciliation” process in order to align topic information from these different sources.
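To make the “3-way reconciliation” idea more concrete, here is a minimal sketch in Java. It is not the actual Strimzi code : the TopicSpec and TopicReconciler names are hypothetical, and it assumes that the third source, next to the ConfigMap and the Kafka cluster, is a snapshot of the last state on which both sides agreed (the post doesn’t say what the third source is).

```java
// Hypothetical, simplified view of a topic: only the fields mentioned above.
final class TopicSpec {
    final String name;
    final int partitions;
    final int replicationFactor;

    TopicSpec(String name, int partitions, int replicationFactor) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }
}

final class TopicReconciler {

    // Merges one field. 'base' is the last value both sides agreed on
    // (an assumption of this sketch, see the paragraph above).
    static int mergeField(int base, int fromConfigMap, int fromKafka) {
        if (fromConfigMap == fromKafka) {
            return fromConfigMap;          // both sides already agree
        }
        if (fromConfigMap == base) {
            return fromKafka;              // only Kafka changed
        }
        if (fromKafka == base) {
            return fromConfigMap;          // only the ConfigMap changed
        }
        // Both sides changed in different ways: a real controller needs a policy here.
        throw new IllegalStateException(
                "conflicting changes: " + fromConfigMap + " vs " + fromKafka);
    }

    // Produces the state that should be written back to both the ConfigMap and Kafka.
    static TopicSpec reconcile(TopicSpec base, TopicSpec configMap, TopicSpec kafka) {
        return new TopicSpec(
                base.name,
                mergeField(base.partitions, configMap.partitions, kafka.partitions),
                mergeField(base.replicationFactor, configMap.replicationFactor,
                        kafka.replicationFactor));
    }
}
```

With a base of 3 partitions, a ConfigMap edited to 6 and an untouched Kafka topic, reconcile returns 6 partitions; if the change had been made directly in Kafka instead, the same call would pick up the Kafka value.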

Conclusion

Having these two components will be the next step for Strimzi in the short term, but more improvements will come related to security, authentication/authorization and automatic cluster balancing where, thanks to the metrics, a cluster balancer will be able to balance the load across the different nodes in the cluster, re-assigning partitions when needed.

If you want to know more about the Strimzi project, you can engage with us in different ways, from IRC to the mailing list, or by following the official Twitter account. Thanks to its open source nature, you can easily jump into the project by providing feedback or opening issues and/or PRs … becoming a new contributor !

Looking forward to hearing from you ! 😉

Reactive streams ? AMQP 1.0 is a really good “reactive” protocol !

During the Christmas holidays that have just passed, I decided to spend my spare time digging into the reactive programming paradigm, the “Reactive streams” manifesto and the related ReactiveX implementation (more specifically the RxJava one).

This blog post isn’t meant to be a discussion about what reactive streams are or what reactive programming is, because you can find a lot of really useful resources on these topics on the Internet; but, because I’m a messaging and IoT guy, in this article I’ll try to describe some (really trivial) thoughts I had while “discovering” the reactive streams API and comparing it to the AMQP 1.0 protocol.

On December 30th, I tweeted …

[Screenshot of the tweet]

As you can see, I defined AMQP 1.0 as a “reactive” protocol because I really found all the semantics and the related definitions from the reactive streams API in the AMQP 1.0 specification.

What I’m going to describe is a mapping at 20,000 feet, without digging into all the possible problems we can encounter doing that, simply because it seemed rather trivial to me; I’d like to open a discussion on it or give other people some input for thinking about it.

It could be useful when it comes to using a reactive programming model in a microservices based system, where a “good” messaging protocol for supporting such a model is needed.

The Reactive Streams API

We know that AMQP 1.0 is really a peer-to-peer protocol, so we can establish communication between two clients directly or through one or more intermediaries, such as a broker (for allowing store-and-forward) or a router (providing direct messaging as well). In all these use cases, it’s always about having a “sender” and a “receiver”, which can be mapped to a “publisher” and a “subscriber” in reactive streams API terms (if you think about ReactiveX, then you know them as “observable” and “observer”).

The reactive streams API is defined by four main interfaces, whose methods can be mapped to specific AMQP 1.0 “performatives”.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

The above interfaces describe how a “subscriber” can subscribe in order to receive a stream of events published by a “publisher”, and how the publisher “pushes” events to the subscriber. The API also defines how the subscriber can avoid being overwhelmed by events if it’s slower than the publisher, using a “request for events” mechanism on the subscription. Finally, the subscriber can be notified when the stream is completed (if it’s not infinite) or if an error occurs.

In this post I won’t consider the Processor interface, which enables an entity to be both a “publisher” and a “subscriber” and is mainly used for implementing “operators” in the stream processing chain.

Attaching as … subscribing

The Publisher interface provides a subscribe method which is called by the subscriber when it wants to start receiving all the published events on the related stream (in a “push” fashion).

If we assign a name to the stream which could be an “address” in AMQP terms, then such an operation could be an “attach” performative sent by the subscriber which acts as a receiver on the established link. In the opposite direction, the publisher can reply with an “attach” performative (on the same address) acting as a sender and this operation could be mapped as the onSubscribe method call on the Subscriber interface.

FIG.1 – attach as Publisher.subscribe, Subscriber.onSubscribe

Credits based flow control and transfer … for back-pressure and pushing events

One of the main reactive streams concepts which convinced me that AMQP is really a “reactive” protocol was back-pressure. It provides a way to handle scenarios where a subscriber is slower than the publisher, so that it isn’t overwhelmed by a lot of events it can’t handle (losing them); the subscriber can notify the publisher of the maximum number of events it can receive. In my mind, it’s something that AMQP provides out of the box with credit based flow control (something that isn’t available in the MQTT 3.1.1 protocol, for example).

In terms of the reactive streams API, this feature is provided by the Subscription interface with the request method; by calling this method, the subscriber tells the publisher the maximum number of events it can accept. In AMQP terms, it means that the receiver sends a “flow” performative specifying the number of credits as the maximum number of messages it can handle at a specific moment in time.

At this point, the publisher can start to push events to the subscriber, which receives them through the onNext method call on the Subscriber interface. In AMQP terms, the sender starts to send one or more “transfer” performatives to the receiver with the message payload (representing the event).

FIG.2 – flow as Subscription.request(n) and transfer as Subscriber.onNext
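The mapping of “flow” to request(n) and “transfer” to onNext can be sketched with the java.util.concurrent.Flow interfaces, which mirror the reactive streams ones in the JDK (Java 9 or later). This is only an in-memory illustration, not an AMQP client : the CreditSender and CollectingReceiver names are hypothetical, no frames go on the wire, and the rule that request(n) with n <= 0 must signal an error is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical in-memory "sender": it pushes only as many items as it has credit for.
final class CreditSender implements Flow.Publisher<String> {
    private final List<String> messages;

    CreditSender(List<String> messages) {
        this.messages = messages;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        // The sender answers the "attach": the receiver gets its subscription.
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {            // "flow": n credits granted
                for (long i = 0; i < n && !done && next < messages.size(); i++) {
                    subscriber.onNext(messages.get(next++));   // "transfer"
                }
                if (!done && next == messages.size()) {
                    done = true;
                    subscriber.onComplete();         // "detach" without error
                }
            }

            @Override
            public void cancel() {                   // "detach" sent by the receiver
                done = true;
            }
        });
    }
}

// Hypothetical receiver that simply records what it gets.
final class CollectingReceiver implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    boolean completed = false;
    Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) { subscription = s; }

    @Override
    public void onNext(String item) { received.add(item); }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onComplete() { completed = true; }
}
```

A receiver that subscribes and calls subscription.request(2) on a sender holding three messages gets exactly two of them; the third one is pushed, followed by onComplete, only after more credit is granted.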

Detaching … for cancelling, completed streams or errors

In order to complete this 20,000 feet mapping, there are a few other methods provided by the reactive streams API that I haven’t covered yet.

First of all, the subscriber can decide to stop receiving events by calling the cancel method on the Subscription interface, which in AMQP terms could be a simple “detach” performative sent by the receiver during the “normal” exchange of messages (events).

FIG.3 – detach from receiver as Subscription.cancel

Finally, it’s important to remember that the reactive streams API takes into account finite streams of events and errors as well.

Regarding finite streams of events, the Subscriber interface exposes the onComplete method, which is called when the publisher has no more events to push, so the stream is completed. In AMQP, it could mean a “detach” performative sent by the sender without any error condition.

FIG.4 – detach from sender as Subscriber.onComplete

At the same time, the reactive streams API defines a way to handle errors without catching exceptions, treating them as special events instead. The Subscriber interface provides the onError method, which is called when an error happens so that the subscriber is notified about it (in any case, such an error is represented by a specific Throwable implementation). In AMQP, it could mean a “detach” performative sent by the sender (as happens for a completed stream) but, this time, with an error condition providing specific error information.

FIG.5 – detach from sender as Subscriber.onError
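The two “detach from sender” cases can be sketched as a single dispatch on the presence of an error condition. The Detach, DetachMapper and SignalRecorder classes below are hypothetical stand-ins written for this post, not types from an AMQP library, and wrapping the error in an IllegalStateException is just one possible choice.

```java
import java.util.concurrent.Flow;

// Hypothetical stand-in for a "detach" performative received from the sender.
final class Detach {
    final String errorCondition;   // null when the link is closed without error
    final String description;

    Detach(String errorCondition, String description) {
        this.errorCondition = errorCondition;
        this.description = description;
    }
}

final class DetachMapper {
    // Translates the detach into the terminal signal of the reactive stream.
    static void deliver(Detach detach, Flow.Subscriber<?> subscriber) {
        if (detach.errorCondition == null) {
            subscriber.onComplete();                       // FIG.4 case
        } else {
            subscriber.onError(new IllegalStateException(  // FIG.5 case
                    detach.errorCondition + ": " + detach.description));
        }
    }
}

// Small subscriber used only to observe which terminal signal arrives.
final class SignalRecorder implements Flow.Subscriber<Object> {
    String lastSignal = "none";

    @Override
    public void onSubscribe(Flow.Subscription s) { }

    @Override
    public void onNext(Object item) { }

    @Override
    public void onError(Throwable t) { lastSignal = "error: " + t.getMessage(); }

    @Override
    public void onComplete() { lastSignal = "complete"; }
}
```

A detach carrying a condition such as amqp:internal-error ends up in onError, while a detach without any condition ends up in onComplete.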

Conclusion

You may have a different opinion (and I’d like to hear about it) but, at first glance, it seems to me that AMQP 1.0 is really THE protocol suited for implementing the reactive streams API and the related reactive programming paradigm when it comes to microservices in a distributed system and designing their communication in a reactive way. It provides the main communication patterns (request/reply, but mainly publish/subscribe for this specific use case) and flow control for back-pressure as well. It’s a truly “push” oriented protocol compared, for example, to the “pull” nature of HTTP. MQTT could be another protocol used in a reactive fashion, but it lacks flow control (at least in the current 3.1.1 specification).

IoT weekend 2017 : my session about messaging and IoT … video and slides !

Today I had a really interesting experience thanks to Jorge Maia who, a few weeks ago, invited me to give a session at the online Brazilian event IoT Weekend. Of course, I accepted his invitation in order to speak about “Open sourcing the messaging and IoT”, focusing on IoT protocols, patterns and related platforms like EnMasse and Eclipse Hono.

The event is still going on while I’m writing this blog post (it lasts the whole weekend), but my session ended less than one hour ago, so I’d like to share the material with all the attendees and with the other people who missed the session and could be interested in watching it !

So you can find the video on YouTube here and the slide deck on SlideShare here.

We can have more … EnMasse !

This morning, my working day started in a different way, with an interesting piece of news from AWS re:Invent 2017, the annual Amazon conference …

The news was about Amazon MQ, a new managed message broker service based on ActiveMQ 5.x with all the goodies that it provides in terms of supported protocols like MQTT, JMS, STOMP, … and … yes … AMQP 1.0 !

It seems that this news made Clemens Vasters (from Microsoft) happy as well 🙂

Finally, even Amazon added support for a “real” messaging protocol which is enterprise ready and from my point of view … even IoT ready 🙂

Taking a look at the blog post about this new service, another project came to my mind … guess what ? EnMasse !

We can have more : EnMasse !

What Amazon MQ provides is the possibility to create a new broker instance and then access the console, create queues, topics and so on. It’s great but … we can have more !

For this reason I decided to write something about EnMasse for the first time, even though I have given a lot of sessions about it at different conferences in the past.

EnMasse is an open source “messaging as a service” platform which simplifies the deployment of a messaging infrastructure both “on premise” and in the Cloud. It provides scalability and elasticity in order to address all the problems we can have when the number of connected clients increases (and decreases) even reaching big numbers like in an IoT scenario.

It supports all the well-known messaging patterns (request/reply, publish/subscribe and competing consumers) and, as of today, two main protocols, AMQP 1.0 and MQTT (adding HTTP support is on the roadmap).

It provides multi-tenancy, with different tenants sharing the same infrastructure while being isolated from each other. Finally, it provides security by using the TLS protocol for establishing connections (with clients and between internal components), in addition to authentication using Keycloak as the identity management system.

Store and forward or … direct ?

One of the main features it provides is the support for two different messaging mechanisms, “store and forward” and “direct messaging”.

The “store and forward” mechanism is exactly what messaging brokers provide today. The broker takes ownership of the message sent by a producer before forwarding it to a consumer which is asking for it (by connecting to a queue or a topic on the broker itself). It means that “storing” the message is the first step executed by the broker and “forwarding” is the next one, which can happen later, only when a consumer is online to get the message : it allows asynchronous communication between clients and time decoupling. There is always a double contract, between producer and broker and between broker and consumer, so the producer knows that the messages reached the broker but not the consumer (a new message exchange in the opposite direction is needed for having something like an “acknowledgement” from the consumer).

The “direct messaging” mechanism is not something new, because it means having a sort of “direct” communication between clients, so that the producer is able to send the message only when the consumer is online, with a single contract between the parties : when the producer receives the “acknowledgement”, it means that the consumer has got the message. Of course, EnMasse provides this mechanism in a reliable way : it uses a network of AMQP 1.0 routers (connected in a mesh), so clients aren’t really connected in a direct way but through this mesh. Every router, unlike a broker, doesn’t take ownership of the message but just forwards it to the next hop in the network in order to reach the destination. When a router crashes, the network is automatically re-configured in order to determine a new path for reaching the consumer; it means that high availability is provided in terms of “path redundancy”. Furthermore, thanks to the AMQP 1.0 protocol, a producer doesn’t receive “credits” from a router to send messages if the consumer isn’t online or can’t process more messages.

EnMasse provides these messaging mechanisms using two open source projects : Apache Qpid Dispatch Router, for the router network, and ActiveMQ Artemis (so ActiveMQ 6.x and not 5.x like in Amazon MQ) for the broker side.

I want to know only about “addresses” !

Compared to the new Amazon MQ service, from a developer’s point of view, the interesting part is the abstraction layer that EnMasse adds on top of the underlying messaging infrastructure. You can create a new “address” using the console, specifying a type which can be :

  • queue : backed by a broker, for “store and forward” and for providing competing consumer pattern, asynchronous communication and so on.
  • topic : backed by a broker, for “store and forward” as well but for providing publish/subscribe pattern.
  • anycast : it’s something like a queue but in terms of “direct messaging”. A producer can send messages to such an address only when one or more consumers are listening on it and the routers network will deliver them in a competing consumer fashion.
  • multicast : it’s something like a topic but in terms of “direct messaging”, having a producer publishing messages to more consumers listening on the same address so that all of them receive the same message.

The developer doesn’t have to worry about creating the broker, configuring the routers and so on; using the console and a few simple steps in the wizard, he will have a usable “address” for exchanging messages between clients.

Good for microservices …

The “direct messaging” mechanism is interesting for, but not only for, the “micro-services” world. EnMasse can be used as a messaging infrastructure for handling request/reply and publish/subscribe between micro-services using an enterprise protocol like AMQP 1.0.

You can read more about building an AMQP 1.0 based API and a micro-services infrastructure in this article written by one of my colleagues, Jakub Scholz.

Who orchestrates ? OpenShift and Kubernetes

Another aspect which makes EnMasse more appealing than other solutions is that it’s totally containerized and runs on the main container orchestration platforms like Kubernetes and the enterprise OpenShift (using the OpenShift Origin project as well). It means that your messaging based (or IoT) solution can be deployed “on premise” and then easily moved to the Cloud without any changes to your applications (maybe just the addresses used by the clients for establishing connections).

Conclusion

Of course, this blog post isn’t meant to be an exhaustive guide to what EnMasse is, just a brief introduction that I had wanted to write for a long time. The Amazon news gave me the opportunity to show you that you can really have more than just creating a broker in the Cloud and taking care of it 🙂

EnMasse and Eclipse Hono ? Messaging and IoT ? I have some events for you !

 

Do you want to learn more about the EnMasse project ?

Do you want to learn the same for the Eclipse Hono project ?

Do you want to know more about their relationship and how they simplify the development and deployment of messaging and IoT solutions ?

Well … in the coming weeks my agenda will be packed with events about them. Let’s see !

Eclipse Day Milan 2017

On September 22nd there will be the Eclipse Day Milan 2017 and I’ll be there with the “Eclipse Hono : Connect. Command. Control.” session. What will you see there ?

The open source counterpart to closed and proprietary IoT solutions is called Hono ! Born out of the collaboration of big companies, including Red Hat and Bosch, both members of the Eclipse Foundation, Hono is an open source framework which aims to add features, such as device management and authentication, on top of a highly scalable messaging infrastructure in order to guarantee secured data exchange between devices and cloud applications. Using its APIs, devices can send data (telemetry and event) and can be controlled remotely (command/control). During this session we will see the “bricks” that make up its architecture, the exposed APIs and the integration with other solutions.

There will be a lot of other great sessions focused on the Eclipse Foundation ecosystem so I think that it will be a really awesome event !

Eclipse IoT Virtual meetup

The Eclipse Foundation hosts a lot of virtual meetups on IoT projects which are part of the foundation itself. Of course, Eclipse Hono is one of them !

On October 11th, Kai Hudalla (from Bosch Software Innovations) and I will host this meetup. We hope to reach a really huge “online” audience in order to show how Hono is a really powerful platform for connecting and controlling IoT devices at scale.

It will be simpler for you to be there … just take a seat at home !

JavaSI

On October 16th – 17th there will be the JavaSI conference organized by SIOUG.

My awesome teammate Ulf Lilleengen and I will be there with a session and a workshop.

First of all the “EnMasse : open sourcing the messaging and IoT” session as an introduction to the workshop.

Out there there are a lot of “closed” source products for developing messaging and IoT based solutions. What if you want to have more control on your platform ? EnMasse is the answer!

It’s a totally “open” source messaging-as-a-service platform which can be deployed on-premise, in the cloud or even in a hybrid scenario. At the same time, it aims to be highly scalable in terms of throughput and number of connections, is based on standard protocols like AMQP 1.0 and MQTT, and provides different patterns for message exchange. Its nature makes EnMasse a great solution for IoT deployments, in order to handle millions of connected devices for ingesting telemetry data and controlling them.

Then the “EnMasse – messaging and IoT in your hands” workshop speaking about the EnMasse project and how it’s really great for building messaging and IoT solutions.

Interested in messaging and IoT ? Kubernetes and OpenShift ? In this workshop, we will dive into EnMasse, an open source Messaging-as-a-Service platform, built on top of Kubernetes and OpenShift. You will set up OpenShift or Kubernetes, deploy EnMasse, and build an end-2-end solution with edge devices, messaging, and analytics.

You will learn basic Kubernetes and OpenShift concepts as we go, and learn how you can use EnMasse to implement different messaging patterns in your application.

During this workshop you will touch messaging and IoT stuff with your hands !

MQTT v5 : what is on the way ?

“MQTT is a lightweight protocol for IoT” … “MQTT lacks a lot of features” … how many other sentences like these have you heard when speaking about MQTT with other developers ?

During the last year, the OASIS committee has worked a lot on the new MQTT v5 specification, pushing the protocol to the next level in both directions : on one side, a lot of new features are coming and they will fill (part of) the gap between MQTT and other protocols which already provide them (my opinion is that, from some points of view, the new MQTT v5 is more AMQP-ish 🙂 ); on the other side, don’t tell me that MQTT is as lightweight as before. Adding features means adding complexity and making it heavier, and maybe this is the reason why, today, a lot of IoT developers decide not to use AMQP for their projects … but, repeating myself … more features mean more complexity, and they are very welcome.

By the way, this blog post is about MQTT v5 and the new specification so … let’s start !

(the OASIS committee has opened a public review which will stay open until September 8th; you can find more information here if you want to read the entire specification. There is also an interesting “Big ideas for MQTT v5” document here, with links to the related issues on the OASIS web site).

Why from 3.1.1 to 5 ?

A lot of people ask me why this “jump” from 3.1.1 to 5 ! The answer is in the protocol itself !

The CONNECT packet, which carries the connection information from the client to the broker, has a “protocol version” byte inside the variable header : it’s a single byte which provides the revision level of the protocol used by the client. With version 3.1 it was 3; then, moving to the current 3.1.1, it became 4. When starting to write the new specification, the committee decided to align the “marketing” version of the protocol with the “internal” representation on the wire : from 3.1.1 to 5 … so from “protocol version” 4 to 5 !
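As a small illustration, the sketch below encodes the first bytes of the CONNECT variable header (the length-prefixed protocol name followed by the “protocol version” byte) and maps that byte back to the “marketing” version. The ConnectHeader class and its method names are made up for this example; the protocol names (“MQIsdp” for 3.1, “MQTT” from 3.1.1 on) come from the MQTT specifications rather than from this post.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class ConnectHeader {

    // Encodes the start of the CONNECT variable header:
    // 2-byte name length, protocol name, then the single "protocol version" byte.
    static byte[] encode(String protocolName, int protocolVersion) {
        byte[] name = protocolName.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(name.length >> 8);
        out.write(name.length & 0xFF);
        out.write(name, 0, name.length);
        out.write(protocolVersion);
        return out.toByteArray();
    }

    // Reads the "protocol version" byte and returns the public version name.
    static String marketingVersion(byte[] variableHeader) {
        int nameLength = ((variableHeader[0] & 0xFF) << 8) | (variableHeader[1] & 0xFF);
        int version = variableHeader[2 + nameLength] & 0xFF;
        switch (version) {
            case 3:  return "3.1";
            case 4:  return "3.1.1";
            case 5:  return "5";
            default: return "unknown (" + version + ")";
        }
    }
}
```

For a v5 client, encode("MQTT", 5) produces seven bytes, the last one being 5, and marketingVersion returns "5" for them; the same call with 4 gives "3.1.1".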

You can also see the jump as the sign of a really “huge” specification change, as it really is in terms of new features.

Properties … not only payload

The “variable header” has changed and now contains properties, each defined as a key-value pair. Some properties are fixed and used in specific packets, for example the “content-type”, which describes the type of content in the payload (JSON, XML, …), and the “response topic”, used in the newly supported request/response pattern (as we’ll see in the next paragraphs). It’s possible to add “user properties” as well, so that the developer can add more key-value pairs carrying values that are meaningful at application level : it’s an interesting feature because, in some IoT solutions, it could be useful not to send a payload at all but just values in properties. This is confirmed by the fact that the payload of the PUBLISH message is now defined as “optional”, while it’s “required” in the current 3.1.1 specification.

AMQP already had this kind of feature : system properties (i.e. content-type, reply-to, ttl, …) and application properties.

Error handling : now I know what’s really happened

One of the things missing in the current 3.1.1 specification is support for proper “error handling” at application level : when there is an error, the server just shuts down the connection and the client has no way of knowing the reason. In the new specification, almost all the packets have a single byte “reason code” as part of the “variable header”.

Alongside the “reason code” there is a “reason string”, which can be used for providing more human-readable information about the error.

Such a feature is something that HTTP and AMQP already provided.

Flow control for QoS 1 and 2

Flow control is the main feature missing from the current 3.1.1 specification … something that the AMQP protocol already had, even at different levels (i.e. session window and credits on messages).

The new v5 specification adds a simple flow control based on the “receive maximum” property. With this property, the client can limit the number of QoS 1 and 2 messages that it is willing to process concurrently : it defines a quota for the number of PUBLISH messages which can be sent without having received the acknowledgement. There is no flow control for QoS 0 messages because, as we know, there is no acknowledgement mechanism for them; for QoS 1 and 2, the server uses the acknowledgements to know when it has to hold back further messages to the client. So overwhelming a client with QoS 0 publications is still possible.
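The quota logic can be sketched from the sending side as a simple counter of unacknowledged messages. The SendQuota class is a hypothetical illustration written for this post, not code from a real broker or client library.

```java
// Hypothetical send quota kept by the server for one client,
// initialised with the "receive maximum" the client announced.
final class SendQuota {
    private final int receiveMaximum;
    private int inFlight = 0;   // QoS 1 and 2 messages sent but not yet acknowledged

    SendQuota(int receiveMaximum) {
        this.receiveMaximum = receiveMaximum;
    }

    // Returns true if a PUBLISH with the given QoS may be sent right now.
    boolean trySend(int qos) {
        if (qos == 0) {
            return true;        // no acknowledgement, so no flow control applies
        }
        if (inFlight >= receiveMaximum) {
            return false;       // quota exhausted: wait for an acknowledgement
        }
        inFlight++;
        return true;
    }

    // Called when the acknowledgement completing a QoS 1 or 2 delivery arrives.
    void onAcknowledged() {
        if (inFlight > 0) {
            inFlight--;
        }
    }
}
```

With a “receive maximum” of 2, the third QoS 1 message is held back until one of the first two is acknowledged, while QoS 0 messages always pass, which is exactly why a client can still be overwhelmed by them.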

Request/Response pattern … here we are !

The MQTT protocol is well known for its publish/subscribe nature, without any built-in support for the request/response pattern. With the 3.1.1 specification, there is no “standard” way for a requester to specify the topic where it expects to receive a response from a responder : it’s something that could be encoded inside the message payload. The new v5 introduces the “response topic” property (something that AMQP already had with the “reply-to” system property) : using this property, the requester has a “standard” way to specify the subscribed topic on which it expects replies from a responder.
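Here is a minimal in-memory sketch of the pattern, just to show how the responder uses the “response topic” carried by the request instead of parsing the payload. Message, TinyBroker and RequestResponseDemo are hypothetical names, the topic names are invented, and the “broker” only supports exact topic matches.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical message model: payload plus the "response topic" property.
final class Message {
    final String topic;
    final String payload;
    final String responseTopic;   // null when no reply is expected

    Message(String topic, String payload, String responseTopic) {
        this.topic = topic;
        this.payload = payload;
        this.responseTopic = responseTopic;
    }
}

// Hypothetical in-memory broker with exact topic matching only.
final class TinyBroker {
    private final Map<String, List<Consumer<Message>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<Message> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(Message message) {
        List<Consumer<Message>> handlers =
                subscribers.getOrDefault(message.topic, Collections.emptyList());
        for (Consumer<Message> handler : handlers) {
            handler.accept(message);
        }
    }
}

final class RequestResponseDemo {
    static List<String> run() {
        TinyBroker broker = new TinyBroker();
        List<String> replies = new ArrayList<>();

        // Requester: subscribes to its own reply topic before sending the request.
        broker.subscribe("clients/42/replies", reply -> replies.add(reply.payload));

        // Responder: answers on whatever topic the request names.
        broker.subscribe("services/echo", request ->
                broker.publish(new Message(request.responseTopic,
                        "echo: " + request.payload, null)));

        broker.publish(new Message("services/echo", "hello", "clients/42/replies"));
        return replies;
    }
}
```

Calling RequestResponseDemo.run() returns a single reply, "echo: hello", delivered on the topic the requester put in the “response topic” property.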

Shared subscriptions

The normal way MQTT works is that the broker delivers a received message to ALL the subscribers for the topic on which the message is published : today we can call them “non shared” subscriptions. The v5 specification adds the “shared subscription” concept : a subscription can be shared among different clients and the message load will be spread across them. It means that the broker doesn’t send the received message to all the subscribers but to only one of them. Effectively, the clients are something like “competing consumers” on the shared subscription.

A shared subscription is identified using a special topic filter with the following format :

$share/{ShareName}/{filter}

Where :

  • $share is needed for specifying that the subscription is shared
  • {ShareName} is the name of the subscription used for grouping clients (it sounds to me something like the “consumer group” in Apache Kafka)
  • {filter} is the topic filter and it’s already well known for “non shared” subscription

For example, imagine a publisher P sending messages on topic “foo” and two different applications A1 and A2 which need to get the messages published on this topic for executing different actions (e.g. monitoring, logging, …). Using “non shared” subscriptions, we can just have A1 and A2 subscribe to the topic “foo” and start receiving messages from it. If the load on the topic increases and we want to leverage the huge potential of cloud native, containerized applications by spreading the load across multiple instances of A1 and A2, we can use “shared” subscriptions in the following way.

From the single topic “foo” we can move to having :

  • $share/A1/foo
  • $share/A2/foo

All the instances of application A1 can subscribe to the first subscription and the instances of A2 can subscribe to the second one.

In this way, the A1 instances are competing consumers for messages published on “foo”, and the same goes for the A2 instances. All the messages are still delivered to both applications, but the load is spread across the different instances (of the same application) thanks to the “shared” subscription support.
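The A1/A2 example can be sketched as a parser for the $share/{ShareName}/{filter} format plus a tiny dispatcher. SharedFilter and SharedDispatcher are hypothetical names; the sketch assumes round-robin selection inside a group (only one of several possible policies), one filter per share name and exact topic matching without wildcards. The topic is written “foo”, without a leading slash, so that it matches the filter part of $share/A1/foo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SharedFilter {
    final String shareName;
    final String filter;

    private SharedFilter(String shareName, String filter) {
        this.shareName = shareName;
        this.filter = filter;
    }

    // Splits "$share/{ShareName}/{filter}" into its two variable parts.
    static SharedFilter parse(String topicFilter) {
        String prefix = "$share/";
        if (!topicFilter.startsWith(prefix)) {
            throw new IllegalArgumentException("not a shared subscription: " + topicFilter);
        }
        int slash = topicFilter.indexOf('/', prefix.length());
        if (slash <= prefix.length() || slash == topicFilter.length() - 1) {
            throw new IllegalArgumentException("expected $share/{ShareName}/{filter}");
        }
        return new SharedFilter(topicFilter.substring(prefix.length(), slash),
                topicFilter.substring(slash + 1));
    }
}

final class SharedDispatcher {
    private final Map<String, String> filterByGroup = new LinkedHashMap<>();
    private final Map<String, List<List<String>>> inboxesByGroup = new HashMap<>();
    private final Map<String, Integer> sentByGroup = new HashMap<>();

    // Registers one application instance and returns its inbox.
    List<String> subscribe(String topicFilter) {
        SharedFilter shared = SharedFilter.parse(topicFilter);
        filterByGroup.put(shared.shareName, shared.filter);
        List<String> inbox = new ArrayList<>();
        inboxesByGroup.computeIfAbsent(shared.shareName, k -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Every group sees the message, but only one instance per group receives it.
    void publish(String topic, String payload) {
        for (Map.Entry<String, String> group : filterByGroup.entrySet()) {
            if (!group.getValue().equals(topic)) {
                continue;   // exact match only in this sketch
            }
            List<List<String>> inboxes = inboxesByGroup.get(group.getKey());
            int turn = sentByGroup.merge(group.getKey(), 1, Integer::sum) - 1;
            inboxes.get(turn % inboxes.size()).add(payload);
        }
    }
}
```

With two instances subscribed to $share/A1/foo and one to $share/A2/foo, four messages published on “foo” end up as two messages for each A1 instance and all four for the single A2 instance.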

Session management

With MQTT, a session is represented by the subscriptions for a client and any queued messages (when the client isn’t online). In the 3.1.1 specification, the “clean session” flag is used by the client to specify the behavior : if set to 1, the server deletes any existing session and doesn’t save the new one; if set to 0, the server recovers any existing session on client re-connection and saves it on disconnection.

In the new v5, the behavior has changed. First of all, the flag was renamed to “clean start” : if set to 1, it means that the broker discards any previous session (the client is asking for a “clean” start), otherwise it keeps the session (the client is asking for not “cleaning” the current session).

Other than this change, the “session expiry interval” property was added (in the CONNECT packet) : after the client disconnects, the broker should remove the session information for that client when this time has elapsed.
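A minimal sketch of how a broker could combine the “clean start” flag with the “session expiry interval” follows. Everything here is hypothetical (the `SessionStore` class, its method names, the dictionary layout), and treating an interval of 0 as “drop the session on disconnection” is an assumption of this sketch.

```python
import time


class SessionStore:
    """Hypothetical in-memory session store for a toy broker."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.sessions = {}  # client_id -> {"subscriptions": set, "expires_at": float | None}

    def connect(self, client_id, clean_start):
        """Return (session, session_present) for a connecting client."""
        session = self.sessions.get(client_id)
        expired = (
            session is not None
            and session["expires_at"] is not None
            and self.clock() >= session["expires_at"]
        )
        if clean_start or session is None or expired:
            # "clean" start (or nothing usable to resume): begin from scratch
            session = {"subscriptions": set(), "expires_at": None}
            self.sessions[client_id] = session
            return session, False
        # the client is back online: stop the expiry countdown
        session["expires_at"] = None
        return session, True

    def disconnect(self, client_id, session_expiry_interval):
        """Start the expiry countdown (in seconds) for a disconnecting client."""
        session = self.sessions.get(client_id)
        if session is None:
            return
        if session_expiry_interval == 0:
            # assumption: an interval of 0 means the session ends with the connection
            del self.sessions[client_id]
        else:
            session["expires_at"] = self.clock() + session_expiry_interval
```

Injecting the clock makes it easy to simulate a client coming back before or after the interval has elapsed.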

Delete please, if you can’t deliver on time …

Another really interesting property is the “publication expiry interval” which can be set in the PUBLISH message by the client. It’s something similar to a TTL (Time to Live), as it already exists in the AMQP protocol, and it means : if this time has passed and the server, for any reason, can’t deliver the message to subscribers then it MUST delete this copy of the message.

In the IoT, it’s really common to use this feature for the “command and control” pattern, to prevent offline devices from executing “stale” commands when they come back online : if the command isn’t executed in a specified amount of time, it should never be executed.
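The expiry check can be sketched as a filter applied to the messages queued for an offline device. The message layout (a dictionary with `received_at` and an optional `expiry_interval`, both in seconds) and the function names are assumptions made for this example.

```python
def still_deliverable(message, now):
    """A message without an expiry interval never expires."""
    expiry = message.get("expiry_interval")
    if expiry is None:
        return True
    return now - message["received_at"] < expiry


def flush_queue(queued, now):
    """Keep only the messages that may still be delivered at time `now`."""
    return [m for m in queued if still_deliverable(m, now)]


queued = [
    {"payload": "open-valve", "received_at": 0, "expiry_interval": 30},
    {"payload": "report-status", "received_at": 0},
]
# a device coming back online after 60 seconds no longer gets the stale command
fresh = flush_queue(queued, now=60)
```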

The new enhanced authentication

Today, with the 3.1.1 specification, the binary value 1111 for the higher nibble of the first byte in the “fixed header” is forbidden/reserved. This changes in v5, where it represents the new AUTH packet.

Other than using the already available username/password built-in authentication, the AUTH packet provides a way to use a different authentication mechanism between client and server, including challenge/response style authentication; it’s something that the AMQP protocol supports with SASL mechanisms, for example.

Let others know that I’m dead but … not immediately

The “Last Will and Testament” (LWT) is a really cool feature which lets interested clients know that another client is dead (it went away without sending a “clean” disconnection packet). In the new specification, it’s possible to specify a “will delay” so that when the server detects a “not clean” disconnection from a client, it doesn’t send the “will message” immediately but only after this delay.

Keep alive timeout ? Now the server can decide !

With the current 3.1.1 specification, the client sends a “keep alive timeout” value (in seconds) to the server : it represents the maximum amount of time between two packets sent by the client. If it expires, the server can consider the client dead (so sending the related “will message” for example). Until today, the client decided this value (disabling keep alive with a 0 value) but with the new v5, the server can provide a “keep alive timeout” value in the CONNACK packet for the client : it means that the client MUST use this value instead of the one it sent in the CONNECT packet.
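The negotiation boils down to “the server value wins if present”. The sketch below uses hypothetical function names; the one and a half times grace period used by the server-side check is what the specification allows before the connection is considered dead.

```python
def effective_keep_alive(client_value, server_value=None):
    """Keep alive (seconds) the client must use after receiving the CONNACK.

    `server_value` is None when the CONNACK carries no keep alive value.
    """
    return client_value if server_value is None else server_value


def client_is_dead(last_packet_at, now, keep_alive):
    """Server-side check; a keep alive of 0 disables the mechanism."""
    if keep_alive == 0:
        return False
    return now - last_packet_at > keep_alive * 1.5


# the client asked for 60 seconds but the server imposed 30
keep_alive = effective_keep_alive(60, server_value=30)
```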

Miscellaneous

Password … but no username

It’s now possible to send a password in the CONNECT packet without specifying the corresponding username. This allows using the “password” field for other kinds of credentials and not just for a password.

Maximum packet size

Using the corresponding property, the client can now specify the maximum packet size it can accept.

Pipelining messages

The client can now start to send other messages before receiving the CONNACK after sending the CONNECT packet. Of course, it means that if the server sends the CONNACK with a negative reason code, it MUST NOT process any messages already sent by the client.

Pipelining is one of the features already provided by AMQP but in a much more powerful way.

What identifier have you assigned to me ?

We know that the client-id provided by the client on connection is really useful for maintaining the correlation with session information. It’s also allowed to connect providing a “zero length” client-id, but in this case the server will assign an identifier to the client. Today, the client doesn’t receive this information from the server, while with v5 the server provides the assigned client-id in the CONNACK packet.

I can’t handle higher QoS

Using the “maximum QoS” property in the CONNACK packet, the server can provide the maximum QoS level that it can handle for published messages. It means that if the client sends a packet with a higher QoS, the server will disconnect it and the cool thing, with the v5 specification, is that it will do so not by just closing the TCP connection but by providing a specific reason code (QoS not supported) in the DISCONNECT packet.

From topic name to … alias

A lot of people say that MQTT is lightweight but they don’t consider that the topic name is specified in every message sent by the client (it’s different from AMQP, where the client “attaches” to an address after the connection and can publish without specifying it anymore). The v5 specification adds the concept of “topic alias” through the corresponding property : it seems to be borrowed from the MQTT-SN protocol, which provides a way to assign a short numeric identifier to a topic name, so that in the subsequent PUBLISH packets the client can avoid specifying the entire topic name and use the identifier instead (it reduces the packet size).
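On the publisher side, the alias mechanism can be sketched as a small lookup table. The class below is hypothetical, as is the `max_aliases` limit (assumed to be agreed with the server); the sketch also assumes that the first PUBLISH carries both the full topic name and the alias to establish the mapping, and that later ones carry an empty topic name plus the alias.

```python
class AliasingPublisher:
    """Hypothetical helper deciding what goes into each PUBLISH packet."""

    def __init__(self, max_aliases):
        self.max_aliases = max_aliases  # assumed limit on the number of aliases
        self.aliases = {}               # topic name -> alias (starting from 1)

    def prepare(self, topic):
        """Return (topic_field, alias) for the next PUBLISH on `topic`."""
        if topic in self.aliases:
            # mapping already established: the topic name can be left empty
            return "", self.aliases[topic]
        if len(self.aliases) < self.max_aliases:
            alias = len(self.aliases) + 1
            self.aliases[topic] = alias
            # first use: send the full name together with the alias
            return topic, alias
        # no alias left: fall back to the full topic name
        return topic, None


publisher = AliasingPublisher(max_aliases=1)
first = publisher.prepare("building/floor-3/room-12/temperature")
second = publisher.prepare("building/floor-3/room-12/temperature")
```

Here `second` carries an empty topic field, which is where the saving in packet size comes from.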

Today’s meetup … “Open sourcing the IoT : running EnMasse on Kubernetes”

Yes … I’m at the airport waiting for my flight back home and I’d like to write something about the reason for my trip … as usual.

Today, I had a meetup in Milan hosted at the Microsoft office and organized by my friend Felice Pescatore who leads the AgileIoT project; of course my session was about messaging and IoT … so nothing new there. The title ? “Open sourcing the IoT : running EnMasse on Kubernetes”.

Other friends were there with their sessions, like Felice himself, Valter Minute speaking about how to move from an IoT prototype to a product and Clemente Giorio and Matteo Valoriani with very interesting sessions about real HoloLens scenarios.

I started with an introduction about messaging and how it is related to the IoT, then moved on to the EnMasse project, an open source “messaging as a service” platform that is well suited for being the messaging infrastructure of an IoT solution (for example, it’s applicable inside the Eclipse Hono project).

I showed the main EnMasse features and the new ones coming in the next weeks, and how EnMasse provides a messaging and IoT solution from an “on-premise” deployment to the “cloud” in a Kubernetes or OpenShift cluster. For this reason I said “open sourcing the IoT”, because all the components in such a solution are open source !

To show that, I had a demo with a Kubernetes cluster running on Azure Container Service, with EnMasse and Apache Spark deployed on it. The demo was made of an AMQP publisher sending simulated temperature values to a “temperature” address deployed in EnMasse (as a queue) and a Spark Streaming job reading these values in order to process them in real time, getting the max value in the last 5 seconds and writing the result to the “max” address (another queue); finally, an AMQP receiver was running in order to read and show the values from “max”.

If you want to know more about that you can find the following resources :

Yesterday’s DevDay meetup : “messaging” in Naples !

Yesterday evening I gave the session titled “Messaging as a Service : building a scalable messaging service” during a meetup here in Naples, speaking about the EnMasse project. The event was organized by the DevDay community, which is active in my region in order to get in touch with developers who work with different technologies. I was very pleased to share my experience (as a contributor) of developing a messaging service running “on premise” or in the cloud.

Below you can find the resources for this session :

  • the video published on the DevDay official YouTube channel
  • the slides and the demo code

Last but not least, I’d like to thank Davide Cerbo (from DevDay) who invited me to join the co-working space as a guest during the day and set up this meetup in the best way. Davide … keep up this great work for the next events ! 😉

Let’s talk about EnMasse : the open source “Messaging as a Service”

After the Red Hat Summit, where I spoke about JBoss AMQ and Apache Kafka using the EnMasse project, the coming weeks will be rich in sessions about this “Messaging as a Service” platform.

First of all, I’ll have a meetup on May 22nd in Naples organized by the DevDay community. It will be all around messaging (and I’m not going to speak about WhatsApp, Hangouts, … :-)) and how we are developing a “Messaging as a Service” solution running on Kubernetes and OpenShift : its name is EnMasse.

The other session will be on June 5th in Milan during an IoT meetup organized by the AgileIoT community in the Microsoft House. There, I’ll speak about EnMasse again and how it “democratizes” the IoT, giving you a full open source solution for that : in this case I’ll show how this “Messaging as a Service” platform can run in the Azure cloud as well.

So … if you want to know more about EnMasse just pick one of these events … or both ! 🙂

 

A new “Kafka” novel : the OpenShift & Kubernetes deployment

This blog post isn’t meant to be an exhaustive tutorial on how to deploy Apache Kafka in an OpenShift or Kubernetes cluster; it’s just the story of my journey towards a “working” deployment, to be used as a starting point and improved over time as a daily work in progress. This journey started with Apache Kafka 0.8.0, went through 0.9.0 and finally reached the current 0.10.1.0 version.

From “stateless” to “stateful”

One of the main reasons to use a platform like OpenShift/Kubernetes (let me use OS/K8S from now on) is the scalability we can have for our deployed applications. With “stateless” applications there aren’t many problems in using such a platform for a Cloud deployment; every time an application instance crashes or needs to be restarted (and/or relocated to a different node), just spin up a new instance without any relationship with the previous one and your deployment will continue to work properly as before. There is no need for the new instance to have information or state related to the previous one.

It’s also true that, out there, we have a lot of different applications which need to persist state information if something goes wrong in the Cloud and they need to be restarted. Such applications are “stateful” by nature and their “story” is important so that just spinning up a new instance isn’t enough.

The main challenges we have with the OS/K8S platform are :

  • pods are scaled out and scaled in through Replica Sets (or using Deployment object)
  • pods will be assigned an arbitrary name at runtime
  • pods may be restarted and relocated (on a different node) at any point in time
  • pods may never be referenced directly by the name or IP address
  • a service selects a set of pods that match specific criteria and exposes them through a well-defined endpoint

All the above considerations aren’t a problem for “stateless” applications but they are for “stateful” ones.

The difference between them is also known as the “Pets vs Cattle” meme, where “stateless” applications are just a herd of cattle and when one of them dies, you can just replace it with a new one having the same characteristics but not exactly the same (of course !); the “stateful” applications are like pets, you have to take care of them and you can’t just replace a pet if it dies 😦

Just as a reference, you can read about the history of “Pets vs Cattle” in this article.

Apache Kafka is one of these types of applications … it’s a pet … which needs to be handled with care. Today, we know that OS/K8S offers Stateful Sets (previously known as Pet Sets … for clear reasons!) that can be used in this scenario but I started this journey when they didn’t exist (or weren’t released yet), so I’d like to share with you my story, the main problems I encountered and how I solved them (you’ll see that I have “emulated” something that Stateful Sets offer today out of the box).

Let’s start with a simple architecture

Let’s start in a very simple way using a Replica Set (only one replica) for Zookeeper server and the related service and a Replica Set (with three replicas) for Kafka servers and the related service.

reference_architecture_1st_ver

The Kafka Replica Set has three replicas for “quorum” and leader election (and for topic replication as well). The Kafka service is needed to expose the Kafka servers to clients too. Each Kafka server needs :

  • unique ID (for Zookeeper)
  • advertised host/port (for clients)
  • logs directory (for storing topic partitions)
  • Zookeeper info (for connection)

The first approach is to use the dynamic generation of the broker id, so that when a Kafka server starts and needs to connect to Zookeeper, a new broker id is generated and assigned to it. The advertised host/port are just the container IP and the fixed 9092 port, while the logs directory is predefined (by configuration file). Finally, the Zookeeper connection info is provided through the related Zookeeper service, using the environment variables that OS/K8S creates for us (ZOOKEEPER_SERVICE_HOST and ZOOKEEPER_SERVICE_PORT).
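As a rough sketch of this first approach, a startup helper could turn the service environment variables into broker settings. The function names are hypothetical and the way the container IP is obtained is left out; the property keys are standard Kafka broker settings of that era, and the broker id is deliberately left to Kafka’s auto-generation.

```python
import os


def kafka_overrides(container_ip, env=os.environ, port=9092):
    """Build the broker settings that depend on the running container."""
    zookeeper = f'{env["ZOOKEEPER_SERVICE_HOST"]}:{env["ZOOKEEPER_SERVICE_PORT"]}'
    return {
        "zookeeper.connect": zookeeper,
        "advertised.host.name": container_ip,
        "advertised.port": str(port),
    }


def to_properties(overrides):
    """Render the settings as lines for a .properties file."""
    return "\n".join(f"{key}={value}" for key, value in overrides.items())


# example values standing in for what OS/K8S would inject into the container
example_env = {"ZOOKEEPER_SERVICE_HOST": "172.30.0.10", "ZOOKEEPER_SERVICE_PORT": "2181"}
print(to_properties(kafka_overrides("10.1.2.3", env=example_env)))
```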

Let’s consider the following use case with a topic (1 partition and 3 replicas). The initial situation is having Kafka servers with broker id 1001, 1002, 1003 and the topic with current state :

  • leader : 1001
  • replicas : 1001, 1002, 1003
  • ISR : 1001, 1002, 1003

It means that clients need to connect to 1001 for sending/receiving messages for the topic and that 1002 and 1003 are followers, keeping the topic replicated in order to handle failures.

Now, imagine that the Kafka server 1003 crashes and a new instance is just started. The topic description becomes :

  • leader : 1001
  • replicas : 1001, 1002, 1003 <– it’s still here !
  • ISR  : 1001, 1002 <– that’s right, 1003 is not “in-sync”

Zookeeper still sees the broker 1003 as a host for one of the topic replicas but not “in-sync” with the others. In the meantime, the newly started Kafka server has a new auto generated id 1004. A manual script execution (through the kafka-preferred-replica-election.sh) is needed in order to :

  • add 1004 to the replicas
  • remove 1003 from the replicas
  • elect a new leader for the replicas

use_case_autogenerated_id.png

So what does it mean ?

First of all, the new Kafka server instance needs to have the same id as the previous one and, of course, the same data, that is the partition replica of the topic. For this purpose, a persistent volume can be the solution : it’s used, through a claim, by the Replica Set for storing the logs directories of all the Kafka servers (i.e. /kafka-logs-<broker-id>). It’s important to know that, by Kafka design, a logs directory has a “lock” file locked by the server that owns it.

To search for the “next” broker id to use, avoiding the auto-generation and getting the same data (logs directory) as the previous instance, a script (in my case a Python one) can be run on container startup before launching the related Kafka server.

In particular, the script :

  • searches for a free “lock” file in order to reuse the broker id for the new Kafka server instance …
  • … otherwise a new broker id is used and a new logs directory is created
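The two steps above could look roughly like the following. This is not the original script, just a sketch under some assumptions : the logs directories live under one base directory and are named kafka-logs-<broker-id>, the “lock” file is called .lock and is held through a POSIX file lock, and new ids start from 1001 as in the use case. It relies on the Unix-only fcntl module.

```python
import fcntl
import os

PREFIX = "kafka-logs-"


def lock_is_free(logs_dir):
    """True if no running process holds the lock file in `logs_dir`."""
    lock_path = os.path.join(logs_dir, ".lock")
    if not os.path.exists(lock_path):
        return True
    with open(lock_path, "r+") as lock_file:
        try:
            fcntl.lockf(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            return False  # another process (a running broker) owns it
        fcntl.lockf(lock_file, fcntl.LOCK_UN)
        return True


def pick_broker_id(base_dir, is_free=lock_is_free, first_id=1001):
    """Reuse the id of a logs directory with a free lock, else create a new one."""
    known_ids = sorted(
        int(name[len(PREFIX):])
        for name in os.listdir(base_dir)
        if name.startswith(PREFIX) and name[len(PREFIX):].isdigit()
    )
    for broker_id in known_ids:
        logs_dir = os.path.join(base_dir, f"{PREFIX}{broker_id}")
        if is_free(logs_dir):
            return broker_id, logs_dir
    # every known directory is in use: allocate the next id
    broker_id = (known_ids[-1] + 1) if known_ids else first_id
    logs_dir = os.path.join(base_dir, f"{PREFIX}{broker_id}")
    os.makedirs(logs_dir)
    return broker_id, logs_dir
```

The returned id and directory would then be written into the broker configuration before launching Kafka. Note that the probe releases the lock right away, so two containers starting at the same instant could still pick the same directory; Kafka’s own lock on startup is what finally arbitrates.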

Using this approach, we obtain the following result for the previous use case :

  • the newly started Kafka server instance acquires the broker id 1003 (as the previous one)
  • it’s automatically part of the replicas and ISR

use_case_locked_id

But … what about the Zookeeper side ?

In this deployment, the Zookeeper Replica Set has only one replica and the service is needed to allow connections from the Kafka servers. What happens if Zookeeper crashes (the application or the node fails) ? The OS/K8S platform just starts a new instance (not necessarily on the same node) but what I see is that the currently running Kafka servers can’t connect to the new Zookeeper instance even if it holds the same IP address (through the service usage). The Zookeeper server closes the connections after an initial handshake, probably because of some Kafka servers information that Zookeeper stores locally. When a new instance is started, this information is lost !

Even in this case, using a persistent volume for the Zookeeper Replica Set is a solution. It’s used for storing the data directory that will be the same for each instance restarted; the new instance just finds Kafka servers information in the volume and grants connections to them.

reference_architecture_1st_ver_zookeeper

When the Stateful Sets were born !

At some point, the OS/K8S platform started to offer the Pet Sets (then renamed to Stateful Sets in the 1.5 Kubernetes release), a sort of Replica Sets but for “stateful” applications … but what do they offer ?

First of all, each “pet” has a stable hostname that is always resolved by DNS. Each “pet” is assigned a name with an ordinal index number (i.e. kafka-0, kafka-1, …) and finally a stable storage is linked to that hostname/ordinal index number.

It means that every time a “pet” crashes and it’s restarted, the new one will be the same : same hostname, same name with ordinal index number and same attached storage. The previous running situation is fully recovered and the new instance is exactly the same as the previous one. You could see them as something that I tried to emulate with my scripts on container startup.
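The naming scheme can be made concrete with a couple of helpers. The names used here (`kafka`, `kafka-headless`, `myproject`) are made up for illustration and the `cluster.local` DNS suffix is assumed to be the cluster default; deriving the broker id from the ordinal index is one common way to take advantage of the stable name, not necessarily what the deployment described here does.

```python
def pet_hostnames(statefulset, headless_service, namespace, replicas,
                  cluster_domain="cluster.local"):
    """Stable DNS names of the pods, one per ordinal index."""
    return [
        f"{statefulset}-{index}.{headless_service}.{namespace}.svc.{cluster_domain}"
        for index in range(replicas)
    ]


def broker_id_from_hostname(hostname):
    """Use the ordinal index at the end of the pod name as broker id."""
    pod_name = hostname.split(".")[0]
    return int(pod_name.rsplit("-", 1)[1])


for name in pet_hostnames("kafka", "kafka-headless", "myproject", 3):
    print(broker_id_from_hostname(name), name)
```

Since a restarted “pet” gets the same name back, it also gets the same broker id without any lock file search.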

So today, my current Kafka servers deployment has :

  • a Stateful set with three replicas for Kafka servers
  • a “headless” service (so without an assigned cluster IP) that is needed for the Stateful set to work (so for DNS hostname resolution)
  • a “regular” service for providing access to the Kafka servers from clients
  • one persistent volume for each Kafka server with a claim template defined in the Stateful set declaration

reference_architecture_statefulsets

Other than using a better implementation 🙂 … the current solution doesn’t use a single persistent volume for all the Kafka servers (with a logs directory for each of them); instead, each “pet” gets its own dedicated persistent storage.

It’s great to read about it but … I want to try … I want to play !

You’re right, I told you about my journey, which isn’t finished yet, but you would like to try … to play with some stuff for having Apache Kafka deployed on OS/K8S.

I called this project Barnabas, like one of the main characters of the author Franz Kafka, who was a … messenger in “The Castle” novel :-). It’s part of the bigger EnMasse project which provides a scalable messaging as a service (MaaS) infrastructure running on OS/K8S.

The repo provides different deployment types : from the “handmade” solution (based on bash and Python scripts) to the current Stateful Sets solution that I’ll improve in the coming weeks.

The great thing about that (in the context of the overall EnMasse project) is that today I’m able to use standard protocols like AMQP and MQTT to communicate with an Apache Kafka cluster (using an AMQP bridge and an MQTT gateway) for all the use cases where using Kafka makes sense compared to traditional messaging brokers … which, for their part, have a lot of stories and different scenarios to tell 😉

Do you want to know more about that ? The Red Hat Summit 2017 (Boston, May 2-4) could be a good place, where Christian Posta (Principal Architect, Red Hat) and I will have the session “Red Hat JBoss A-MQ and Apache Kafka : which to use ?” … so what are you waiting for ? See you there !