Month: July 2017

Apache Kafka consumer groups … don’t use them in the “wrong” way !

8In this blog post I’d like to focus the attention on how the “automatic” and “manual” partitions assignment can interfere with each other even breaking things. I’d like to give an advice on using them in the right way avoiding to mix them in the same scenario or being aware of what you are doing.

The consumer group experience

In Apache Kafka, the consumer group concepts is a way for achieving two things :

  • having consumers as part of the same consumer group means providing the “competing consumers” pattern with whom the messages from topic partitions are spread across the members of the group; each consumer receive messages from one or more partitions (“automatically” assigned to it) and the same messages won’t be received by the other consumers (assigned to different partitions). In this way we can scale the number of the consumers up to the number of the partitions (having one consumer reading only one partition); in this case a new consumer joining the group will be in a idle state without being assigned to any partition.
  • having consumers as part of different consumer groups means providing the “publish/subscribe” pattern where the messages from topic partitions are sent to all the consumers across the different groups. It means that inside the same consumer group we’ll have the rules explained above but across different groups, the consumers will receive the same messages. It’s useful when the messages inside a topic are of interest for different applications which will process them in different ways; we want that the all the interested applications will receive all the same messages from the topic.

Another great advantage of consumers grouping is the re-balancing feature. When a consumer joins a group, if there are still enough partitions available (so we haven’t reached the limit of one consumer per partition), a re-balancing starts and the partitions will be reassigned to the current consumers plus the new one. In the same way, if a consumer leaves a group, the partitions will be reassigned to the remaining consumers.

What I have told so far it’s really true using the subscribe() method provided by the KafkaConsumer API. This method enforces you to assign the consumer to a consumer group, setting the “group.id” property, because it’s needed for re-balancing. In any case, it’s not the consumer to decide the partitions it wants to read for; in general the first consumer which joins the group doing the assignment while other consumers join the group.

How things can be “broken”

Other than using the subscribe() method, there is another way for a consumer for reading from topic partitions : it’s using the assign() method and in this case, the consumer is able to specify the topic partitions it wants to read for.

This type of approach can be useful when you know exactly where some specific messages will be written (the partition) and you want to read directly from there. Of course, you lose the re-balancing feature in this case and it is the first big difference with using the subscribe way.

Another difference is that with “manual” assignment you can avoid to specify a consumer group (so the “group.id” property) for the consumer : it will be just empty. In any case it’s better specifying it.

Most of the people use the subscribing way, leveraging “automatic” assignment and re-balancing feature; using both the way together can brake something … let’s see.

Imagine to have a single “test” topic with only two partitions (P0 and P1) and a consumer C1 which subscribe to the topic as part of the consumer group G1. This consumer will be assigned to both the partitions receiving messages from them. Now, let’s start a new consumer C2 which is configured to be part of the same consumer group G1 but … it uses the assign way for asking for the partitions P0 and P1 explicitly.

subscribe_assignNow we have broken something ! What ? Maybe you are asking …

Both the consumer C1 and C2 will receive messages from the topic, so from both partitions P0 and P1, but … they are part of the same consumer group G1 ! So we have “broken” what we said in the previous paragraph about “competing consumers” when they are part of the same consumer group. So you are experiencing a “publish/subscribe” pattern but with consumers within the same consumer group.

What about offsets commits ?

In general you should avoid a scenario like the one described before even because there is a real side effect on that. Starting from the version 0.8.2.0, the offsets committed by the consumers aren’t saved in Zookeeper but on a partitioned and replicated topic named “__consumer_offsets” which is hosted on the Kafka brokers in the cluster.

When a consumer commits some offsets (for different partitions), it really sends a message to the broker to the “__consumer_offsets” topic and such message has the following structure :

  • key = [group, topic, partition]
  • value = offset

Coming back to the previous scenario what does it mean ?

Having C1 and C2 as part of the same consumer group but able to receive from the same partitions (both P0 and P1) it could happen something like the following :

  • C1 commits offset X for partition P0 writing a message like this :
    • key = [G1, “test”, P0], value = X
  • C2 commits offset Y for partition P0 writing a message like this :
    • key = [G1, “test”, P0], value = Y

So the consumer C2 has overwritten the committed offset for the same partition P0 of the consumer C1 and maybe X was less than Y; if C1 crashes and restarts it will lose messages starting to read from Y (remember Y > X).

Something like that can’t happen with consumers which use only the subscribe way for being assigned to partitions, because as part of the same consumer group they’ll receive different partitions so the key for the offset commit message will be always different.

[Update 28/07/2017]

As a confirmation that mixing subscribe and assign isn’t a good thing to do, after a discussion with one of my colleagues, Henryk Konsek, it turned out that if you try to call both methods on the same consumer, the client library throws the following exception :

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

Conclusion

The consumer groups mechanism in Apache Kafka works really well and leveraging on that for scaling consumers and having “automatic” partitions assignment with re-balancing is a great plus. There are cases where you would need to assign partitions “manually” but in that case pay attention on what could happen if you mix both solutions.

So … let’s consume from Apache Kafka but with judgment and the awareness of what we are doing.

Advertisements

A lot of fun with … AMQP, Spark, Kafka, EnMasse, MQTT, Vert.x & IoT

When I say to someone that I work for Red Hat they say me “Ah ! Are you working on Linux ?” … No, no, no and … no ! I’m not a Linux guy, I’m not a fan boy but I’m just a daily user 🙂

All people know that Red Hat is THE company which provides the best enterprise Linux distribution well known as Red Hat Enterprise Linux (RHEL) but Red Hat is not only Linux today. Its portfolio is huge : the cloud and containers business with the OpenShift effort, the microservices offer with Vert.x, Wildfly Swarm, Spring Boot, the IoT world with the involvement in the main Eclipse Foundation projects.

The objective of this blog is just showing briefly the projects I worked (or I’m working) on since last year when I was hired on March 1st. They are not “my” projects, they are projects I’m involved because the entire team is working on them … collaboration, you know 🙂

You could be surprised about that but … there is no Linux ! I’m on the messaging & IoT team, so you will see only projects about this stuff 🙂

AMQP – Apache Spark connector

This “little” component is strictly related to the “big” radanalytics.io project which takes the powerful of Apache Spark for analytics (batch, real-time, machine learning, …) running on OpenShift.

Because the messaging team works mainly on projects like ActiveMQ Artemis and the Qpid Dispatch Router, where the main protocol is AMQP 1.0, the idea was developing a connector for Spark Streaming in order to ingest data through this protocol so from queues/topics on a broker or through the router in a direct messaging fashion.

You can find the component here and even an IoT demo here which shows how it’s possible to ingest data through AMQP 1.0 using the EnMasse project (see below) and then executing a real time streaming analytics with Spark Streaming, all running on Kubernetes and OpenShift.

AMQP – Apache Kafka bridge

Apache Kafka is one of the best technologies used today for ingesting data (i.e. IoT related scenarios) with an high throughput. Even in this case, the idea was providing a way for having AMQP 1.0 clients and JMS clients pushing messages to Apache Kafka topics without knowing the related custom protocol.

In this way, if you have such clients because you are already using a broker technology but then you need some specific Kafka features (i.e. re-reading streams), you can just switch the messaging system (from the broker to Kafka) and using the bridge you don’t need to update or modify clients. I showed how this is possible at the Red Hat summit as well and the related demo is available here.

MQTT on EnMasse

EnMasse is an open source messaging platform, with focus on scalability and performance. It can run on your own infrastructure (on premise) or in the cloud, and simplifies the deployment of messaging infrastructure.

It’s based on other open source projects like ActiveMQ Artemis and Qpid Dispatch Router supporting the AMQP 1.0 protocol natively.

In order to provide support for the MQTT protocol, we designed how to take “MQTT over AMQP” so having MQTT features on the AMQP protocol. From the design we moved to develop two main components :

  • the MQTT gateway which handles connections with remote MQTT clients translating all messages from MQTT to AMQP and vice versa;
  • the MQTT LWT (Last and Will Testament) service which provides a way for notifying all clients connected to EnMasse that another client is suddenly died sending them its “will message”. The great thing about this service, is that it works with pure AMQP 1.0 clients so bringing the LWT feature on AMQP as well : for this reason the team is thinking to change its name just in AMQP LWT service.

EnMasse is great for IoT scenarios in order to handle a huge number of connections and ingesting a lot of data using AMQP and MQTT as protocols. I used it in all my IoT demos for showing how it’s possible to integrate it with streaming and analytics frameworks. It’s also the main choice as messaging infrastructure in the cloud for the Eclipse Hono project.

Vert.x and the IoT components

Vert.x is a great toolkit for developing reactive applications running on a JVM.

The reactive applications manifesto fits really well for IoT scenarios where responsiveness, resiliency, elasticity and the communication driven by messages are the pillars of all the IoT solutions.

Starting to work on the MQTT gateway for EnMasse using Vert.x for that, I decided to develop an MQTT server that was just able to handle communication with remote clients providing an API for interacting with them : this component was used for bridging MQTT to AMQP (in EnMasse) but can be used for any scenario where a sort of protocol translation or integration is needed (i.e. MQTT to Vert.x Event Bus, to Kafka, …). Pay attention, it’s not a full broker !

The other component was the Apache Kafka client, mainly developed by Julien Viet (lead on Vert.x) and then passed to me as maintainer for improving it and adding new features from the first release.

Finally, thanks to the Google Summer of Code, during the last 2 months I have been mentoring a student who is working on developing a Vert.x native MQTT client.

As you can see the Vert.x toolkit is really growing from an IoT perspective other then providing a lot of components useful for developing pure microservices based solutions.

Eclipse Hono

Eclipse Hono is a project under the big Eclipse IoT umbrealla in the Eclipse Foundation. It provides a service interfaces for connecting large numbers of IoT devices to a back end and interacting with them in a uniform way regardless of the device communication protocol.

It supports scalable and secure ingestion of large volumes of sensor data by means of its Telemetry API. The Command & Control API allows for sending commands (request messages) to devices and receive a reply to such a command from a device asynchronously in a reliable way.

This project is mainly developed by Red Hat and Bosch and I gave my support on designing all the API other then implementing the MQTT adapter even in this case using the Vert.x MQTT server component.

Because Eclipse Hono works on top of a messaging infrastructure for allowing messages exchange, the main choice was using ActiveMQ Artemis and the Qpid Dispatch Router even running them using Kubernetes and OpenShift with EnMasse.

Apache Kafka

Finally, I was involved to develop a POC named “barnabas” (a messenger character from a Frank Kafka novel :-)) in order to take Apache Kafka running on OpenShift.

Considering the stetaful nature of a project like Kafka, I started when Kubernetes didn’t offer the StatefulSets feature doing something similar by myself. Today, the available deploy is based on StatefulSets and it’s a work in progress on which I’ll continue to work for pushing the POC to the next level.

Apache Kafka is a really great project which has its own use cases in the messaging world; today it’s more powerful thanks to the new Streams API which allows to execute a real time streaming analytics using topics from your cluster and running simple applications. My next step is to move my EnMasse + Spark demo to an EnMasse + Kafka (and streaming) deployment. I’m also giving my support on the Apache Kafka code.

Conclusion

The variety and heterogeneity of all the above projects is giving me a lot of fun in my day by day work even collaborating with different people with different knowledge. I like learning new stuff and the great thing is that … things to learn are endless ! 🙂

 

How to learn Kubernetes ? “Kubernetes in action” the answer !

Few months ago I started the Manning Access Early Program (MEAP) for one of the books that I think is the best resource for all the newbies who want to start studying Kubernetes and for all the experts who want to dig into its details and internals.

It’s “Kubernetes in action” written by Marko Luksa , one of my colleagues in Red Hat as software engineer in the Cloud Enablement Team.

First of all, I can guarantee on the author ! Marko has a really deep knowledge of Kubernetes and OpenShift and luckily for us, he decided to write a book about that. He is a very nice person, always available to help you to solve any problem that you are facing with Kube. I was lucky to start working with Marko when the EnMasse project was born.

Speaking about the book, it’s awesome !

After the first part introducing Docker and Kubernetes and the first steps with it, the book moves to the core concepts.

You can find all information about what pods are and how you can deploy containers (so your applications) with them and finally how you can replicate pods. Then how to make applications accessible inside and outside the cluster using services and how to use storage for having a persistence layer for your data shared between pods and always available on restarts. Do you want to know how your application is configurable even with sensitive data (i.e. certificates and credentials), you will find such information in this book !

After covering all these core concepts, the last big part has the objective to bring you more deep information about Kubernetes. First of all, the new StatefulSets feature which allows to deploy stateful applications with their stable identity and stored data across restarts. Then a really interesting look to Kubernetes internals speaking about all the components which made it : etcd, the API server, the controller manager and all the other stuff. Managing resources is another interesting chapter describing how you can request specific resources in terms of CPU and memory for the containers even setting limits on them.

But one of the big advantage of using a container orchestrator like Kubernetes is the possibility to scale your infrastructure based on the load you have against the applications. You will find this information about auto-scaling on CPU utilization or custom metrics !

The final part is enriched by best practices for developing cloud native applications which run on Kubernetes. In order to learn a new technology, knowing how it works is the main part but it’s more useful having examples and patterns provided by expert people like Marko.

Finally, the book ends explaining how it’s possible to extend Kubernetes defining new components and custom API objects so showing a powerful feature like its extensibility.

In conclusion, I think that this book deserves to be read and to be part of your books collection even because after reading it you’ll become expert on two technologies for developing cloud native applications : not only Kubernetes but OpenShift as well ! 🙂

No winner in the (Industrial) IoT protocols war !

Yesterday, I read this article about declaring MQTT as the winner of the IIoT (Industrial IoT) protocols war and I have a completely different opinion on that so … I totally disagree with the author !

Don’t get me wrong, it’s not because I don’t like MQTT (who knows me, knows that I have done a lot of work around MQTT as well) but just because …

“There is NO winner in the (Industrial) IoT protocols war”

The IoT world is so rich of different use cases, scenarios, features needs and so on that most of the time, the better solution is an “hybrid” one which uses different protocols; even if you focus in the specific IIoT space, that’s true.

IoT has different communication patterns which come from the messaging land and every protocol provides support for one or more of them in different ways; sometimes we have builtin support, sometimes we need to do more work at application level.

MQTT for telemetry ? But …

MQTT fits really well for telemetry because it’s mainly based on publish/subscribe but at same time it has no flow control : what’s happen when the broker is overwhelmed by tons of messages at high rate and it can’t dispatch such messages to the subscribers at the same pace ? It’s even true that most of the time, MQTT devices are tiny sensors which send data with a slow rate (i.e. every second) because they are battery powered and use mobile connection so that they send a message, then go to sleep for few seconds and then wake up for sending the new message. In this case, you don’t have high rate but if you have thousands (millions ?) of these devices, the broker is overwhelmed as well : there is a burst of messages which come and it has to handle all of them.

AMQP doesn’t declare any specific supported pattern and it fits well for all. Regarding telemetry (so publish/subscribe), it provides flow control (even at different levels) so that the receiver node can stop the sender having more time for processing messages received up to now.

Why more complexity for Command & Control ?

Moving to command and control, so speaking about a request/reply pattern, all the MQTT limitations come. In this case, you have to build something on top of the protocol infrastructure defining specific topics for the requests and the related replies and having each client both subscriber (for receiving command) and publisher (for sending reply). There is no correlation between request and reply, it’s all defined at topic level (and/or using payload information).

With AMQP, even this pattern is supported natively. The requester has the possibility to specify a “replyTo” address inside the message, saying to the responder that it expects to receive the reply on such address; even the correlation is supported at protocol level thanks to message and correlation identifiers.

The real feature which makes this difference between AMQP and MQTT is that the former has message metadata (header, annotations and so on) while the latter has just payload (raw bytes) so all the features that it lacks for providing a different pattern from publish/subscribe need to be defined in terms of topics architecture and/or payload structure … so the complexity is moved at application level.

If you want to read more about these differences (even with HTTP protocol) maybe you can find my article “Strengths and weaknesses of IoT communication patterns” on DZone IoT as a useful reference (it’s part of the latest DZone IoT Guide).

Let’s say things as they are

The mentioned article says some wrong things as well.

“AMQP offers robust features like queuing” … to be precise there is no mention about queue in the AMQP specification but container, node, link and so on. This is because AMQP doesn’t specify the network architecture in terms of brokers : pay attention here, I’m speaking about AMQP 1.0 … the only OASIS and ISO/IEC standard (against the AMQP 0.9, used in RabbitMQ). AMQP can be used for RPC without “store and forward” mechanism (provided by brokers) but just with “direct” messaging; AMQP is a peer to peer protocol !

About MQTT … “An example of this optimization is its use of 1 byte keep alive packets.” … no true ! It’s 2 bytes ! … I know I’m a little bit pedantic here 🙂

Finally, it’s not true that only MQTT can work without high-availability and with a low-bandwidth. It’s true even for AMQP, considering the QoS (Quality of Service) levels it supports as well.

Speaking about messages size and computational needs on the devices side.

With MQTT each message carries the topic information, not true with AMQP where the address is specified one time on attaching the link.

When security and encryption come, the SSL/TLS overload minimizes all this differences so that even a 2 bytes packet for keep alive becomes an even bigger message. In this case, it all depends on computational resources you have on your tiny device and the difference between protocols doesn’t matter.

Conclusion

So my conclusion is clear. I have just started with that at the top of this article : there is no winner  in the (Industrial) IoT protocols wars. There are different use cases, scenarios, features needs, limitations … they all drive to the right choice that sometimes means having multiple winners in one solution !

The good news about MQTT is that in the latest v5 specification they are addressing a lot of limitations of the current 3.1.1 version, adding some AMQP-like features 🙂

So stay tuned … the war is endless !