Cloud

A new “Kafka” novel: the OpenShift & Kubernetes deployment

This blog post isn’t meant to be an exhaustive tutorial on deploying Apache Kafka in an OpenShift or Kubernetes cluster; it’s just the story of my journey towards a “working” deployment, used as a starting point to improve over time with daily work in progress. This journey started with Apache Kafka 0.8.0, went through 0.9.0, and finally reached today’s 0.10.1.0 version.

From “stateless” to “stateful”

One of the main reasons to use a platform like OpenShift/Kubernetes (let me use OS/K8S from now on) is the scalability we can get for our deployed applications. With “stateless” applications there aren’t many problems in using such a platform for a Cloud deployment; every time an application instance crashes or needs to be restarted (and/or relocated to a different node), just spin up a new instance with no relationship to the previous one and your deployment will continue to work properly as before. There is no need for the new instance to have information or state related to the previous one.

It’s also true that, out there, we have a lot of different applications which need to persist state information, so that if something goes wrong in the Cloud and they need to be restarted, that state isn’t lost. Such applications are “stateful” by nature and their “story” is important, so just spinning up a new instance isn’t enough.

The main challenges we have with the OS/K8S platform are:

  • pods are scaled out and scaled in through Replica Sets (or using a Deployment object)
  • pods will be assigned an arbitrary name at runtime
  • pods may be restarted and relocated (to a different node) at any point in time
  • pods should never be referenced directly by name or IP address
  • a service selects a set of pods that match specific criteria and exposes them through a well-defined endpoint

All the above considerations aren’t a problem for “stateless” applications but they are for “stateful” ones.

The difference between them is also known as the “Pets vs Cattle” meme: “stateless” applications are just a herd of cattle, and when one of them dies you can replace it with a new one having the same characteristics (but not exactly the same one, of course!); “stateful” applications are like pets, you have to take care of them and you can’t just replace a pet if it dies 😦

Just as a reference, you can read about the history of “Pets vs Cattle” in this article.

Apache Kafka is one of these types of applications … it’s a pet … which needs to be handled with care. Today, we know that OS/K8S offers Stateful Sets (previously known as Pet Sets … for clear reasons!) that can be used in this scenario, but I started this journey when they didn’t exist (or weren’t released yet), so I’d like to share with you my story, the main problems I encountered and how I solved them (you’ll see that I have “emulated” something that Stateful Sets offer today out of the box).

Let’s start with a simple architecture

Let’s start in a very simple way, using a Replica Set (with only one replica) for the Zookeeper server and its related service, and a Replica Set (with three replicas) for the Kafka servers and their related service.

reference_architecture_1st_ver

The Kafka Replica Set has three replicas for “quorum” and leader election (and for topic replication). The Kafka service is needed to expose the Kafka servers to clients. Each Kafka server needs:

  • unique ID (for Zookeeper)
  • advertised host/port (for clients)
  • logs directory (for storing topic partitions)
  • Zookeeper info (for connection)

The first approach is to use dynamic broker id generation, so that when a Kafka server starts and connects to Zookeeper, a new broker id is generated and assigned to it. The advertised host/port are just the container IP and the fixed 9092 port, while the logs directory is predefined (by configuration file). Finally, the Zookeeper connection info is provided through the related Zookeeper service, using the environment variables that OS/K8S creates for us (ZOOKEEPER_SERVICE_HOST and ZOOKEEPER_SERVICE_PORT).

Let’s consider the following use case with a topic (1 partition and 3 replicas). The initial situation is Kafka servers with broker ids 1001, 1002 and 1003, and the topic in the following state:

  • leader : 1001
  • replicas : 1001, 1002, 1003
  • ISR : 1001, 1002, 1003

It means that clients need to connect to 1001 for sending/receiving messages for the topic, and that 1002 and 1003 are followers that replicate the topic in order to handle failures.

Now, imagine that Kafka server 1003 crashes and a new instance is started. The topic description becomes:

  • leader : 1001
  • replicas : 1001, 1002, 1003 <– it’s still here !
  • ISR  : 1001, 1002 <– that’s right, 1003 is not “in-sync”

Zookeeper still sees broker 1003 as a host for one of the topic replicas, but not “in-sync” with the others. Meanwhile, the newly started Kafka server has a new auto-generated id, 1004. A manual intervention (through scripts like kafka-reassign-partitions.sh and kafka-preferred-replica-election.sh) is needed in order to:

  • add 1004 to the replicas
  • remove 1003 from the replicas
  • trigger a new leader election for the replicas

use_case_autogenerated_id.png

So what does it mean?

First of all, the new Kafka server instance needs to have the same id as the previous one and, of course, the same data, i.e. the partition replicas of the topic. For this purpose, a persistent volume can be the solution: it’s used, through a claim, by the Replica Set for storing the logs directories of all the Kafka servers (i.e. /kafka-logs-<broker-id>). It’s important to know that, by Kafka design, each logs directory has a “lock” file locked by the owning server.

To search for the “next” broker id to use, avoiding the auto-generation and getting the same data (logs directory) as the previous instance, a script (in my case a Python one) can be run on container startup before launching the related Kafka server.

In particular, the script:

  • searches for a free “lock” file in order to reuse the broker id for the new Kafka server instance …
  • … otherwise a new broker id is used and a new logs directory is created

Using this approach, we obtain the following result for the previous use case:

  • the newly started Kafka server instance acquires broker id 1003 (like the previous one)
  • it’s automatically part of the replicas and the ISR

use_case_locked_id

But … what about the Zookeeper side?

In this deployment, the Zookeeper Replica Set has only one replica and the service is needed to allow connections from the Kafka servers. What happens if Zookeeper crashes (the application or the node fails)? The OS/K8S platform just starts a new instance (not necessarily on the same node), but what I see is that the currently running Kafka servers can’t connect to the new Zookeeper instance even if it holds the same IP address (thanks to the service). The Zookeeper server closes the connections after an initial handshake, probably because of some Kafka server information that Zookeeper stores locally. Restarting a new instance, this information is lost!

Even in this case, using a persistent volume for the Zookeeper Replica Set is a solution. It’s used for storing the data directory, which will be the same for every restarted instance; the new instance just finds the Kafka server information in the volume and accepts connections from them.

reference_architecture_1st_ver_zookeeper

When Stateful Sets were born!

At some point (from the Kubernetes 1.5 release), the OS/K8S platform started to offer Pet Sets, later renamed Stateful Sets: a sort of Replica Set, but for “stateful” applications. But … what do they offer?

First of all, each “pet” has a stable hostname that is always resolved by DNS. Each “pet” is assigned a name with an ordinal index number (i.e. kafka-0, kafka-1, …) and, finally, stable storage is linked to that hostname/ordinal index number.

It means that every time a “pet” crashes and is restarted, the new one will be the same: same hostname, same name with ordinal index number and same attached storage. The previously running situation is fully recovered and the new instance is exactly the same as the previous one. You can see that they provide out of the box what I tried to emulate with my scripts on container startup.

So today, my current Kafka servers deployment has:

  • a Stateful Set with three replicas for the Kafka servers
  • a “headless” service (without an assigned cluster IP) that is needed for the Stateful Set to work (i.e. for DNS hostname resolution)
  • a “regular” service for providing access to the Kafka servers from clients
  • one persistent volume for each Kafka server, with a claim template defined in the Stateful Set declaration

reference_architecture_statefulsets

Other than being a better implementation 🙂 … the current solution doesn’t use a single persistent volume for all the Kafka servers (with a logs directory for each of them); instead, each “pet” gets its own dedicated persistent storage.

It’s great to read about it but … I want to try … I want to play !

You’re right, I told you about my journey, which isn’t finished yet, but you’d like to try … to play with some stuff for deploying Apache Kafka on OS/K8S.

I called this project Barnabas, after one of the characters of the author Franz Kafka, who was a … messenger in the novel “The Castle” :-). It’s part of the bigger EnMasse project, which provides a scalable messaging-as-a-service (MaaS) infrastructure running on OS/K8S.

The repo provides different deployment types: from the “handmade” solution (based on bash and Python scripts) to the current Stateful Sets solution that I’ll improve in the coming weeks.

The great thing about that (in the context of the overall EnMasse project) is that today I’m able to use standard protocols like AMQP and MQTT to communicate with an Apache Kafka cluster (using an AMQP bridge and an MQTT gateway) for all the use cases where using Kafka makes sense compared to traditional messaging brokers … which, on their side, have a lot of stories and different scenarios to tell 😉

Do you want to know more about that? The Red Hat Summit 2017 (Boston, May 2-4) could be a good place: Christian Posta (Principal Architect, Red Hat) and I will have the session “Red Hat JBoss A-MQ and Apache Kafka: which to use?” … so what are you waiting for? See you there!

IoT developer survey: my point of view

A few days ago, the Eclipse Foundation published the report of the latest IoT developer survey, sponsored by the foundation itself together with IEEE IoT and Agile IoT. The survey’s main objective is to understand the preferred technologies used by developers in terms of languages, standards and operating systems; furthermore, it shows the main concerns about IoT and how companies are shipping IoT solutions today.

Great content about this report was published by Ian Skerrett (Vice President of Marketing at Eclipse Foundation) on his blog and on SlideShare, with a summary of all the main information.

I’d just like to add my 2 cents with some absolutely personal considerations about the results …

Companies are investing …

Regarding how companies are delivering IoT solutions, it’s clear that the IoT market is growing. A lot of companies already have IoT products in the field and the others are planning to develop them in the coming months. It’s not a surprise: more than a buzzword, IoT is a real business opportunity for all the companies strictly related to embedded devices (silicon vendors, OEMs, …) and for software companies (on the cloud and application side), and it is rapidly changing how their business is done.

Security and interoperability: the big concerns

The result related to the main concerns about IoT is very clear: people and companies are worried about security. All the data flowing from our personal life, or owned by companies, to the cloud needs to be protected so that nobody can steal it. The concern about security involves both software protocols (i.e. SSL/TLS, …) and hardware (i.e. crypto chips, …), and today it seems that a really good solution isn’t available yet. The same goes for interoperability: having a lot of IoT “standard” protocols means having NO standard protocol. A lot of consortia are trying to define standard specifications and frameworks but … there are too many of them; the big companies are split across different consortia, and some are part of more than one: this is a big deal and, as for protocols … it means NO standard.

Developers prefer Java and C … what about JavaScript?

Java’s first place as the preferred language and C’s second place aren’t a surprise: Java is used in a lot of cloud solutions based on open source products, and C is the best language for developing on the device side, with great performance at low cost (at least from a hardware point of view). The first strange result is JavaScript as the third most used language: I hope this position is related to its huge usage with NodeJS on the server side and not as an “embedded” language on devices … that scares me.

Protocols: the current know-how is leading

Now, the protocols …

HTTP/1.1 as the most used protocol makes sense because today it’s the only really well-known protocol in the developer world; in order to develop and deliver an IoT solution with a quick time to market, companies leverage internal know-how and sometimes don’t invest in figuring out how other protocols work and whether they have advantages. That, together with HTTP/1.1’s simplicity and its ASCII/text-based nature, explains this position to me: a lot of developers don’t like binary formats much. The last point is that the REST architecture is a very good solution in a lot of scenarios and HTTP/1.1 is the most used protocol (the only one?) for that.

MQTT and CoAP are used a lot thanks to the available open source projects and their simplicity; MQTT is very lightweight and works great on tiny embedded devices, while CoAP tries to overcome some HTTP/1.1 limitations (i.e. with server push, the observer pattern, …) with new features and its binary nature.

A lot of developers are scared of AMQP because, I have to admit, it’s not as simple as the previous ones, but it’s powerful and everyone should give it a try. If you want to start with it, you can find a lot of links and resources here.

I’m surprised by HTTP/2.0 in fourth position! I mean … how many developers know, love and use HTTP/2 today? I expected it behind “in-house, proprietary”, AMQP and XMPP. I suppose companies are prototyping solutions with this protocol because they think that, thanks to their HTTP/1.1 knowledge, it’s quite simple to move to the next version: I think that’s totally wrong, because HTTP/2.0 is completely different from HTTP/1.1. I love it … I’ll invest in it.

OS: Linux and RTOS on bare metal

Regarding operating systems, Linux’s first position isn’t a surprise, but we have to consider it both on the server side and on the device side (and there are a lot of Linux-based embedded devices). The other OSes are only for embedded (low-constrained) devices, so their percentages get no help from the cloud side. Finally, Linux is useful for IoT gateways too (as we know with Kura), even if Microsoft, for example, is investing in its Windows IoT Core and will release an IoT Gateway SDK in the coming months.

All the services in the cloud

Amazon AWS in first position as Cloud services provider is not a surprise, but I don’t think it’s about their relatively new AWS IoT platform; it’s more about all the IoT open source stuff that developers prefer to run on Amazon VMs rather than Azure VMs.

Conclusion

Here the great news is that the IoT market is growing and developers/companies are investing in it, trying to get to market as soon as possible. The “bad” news is that too many different protocols and frameworks are in use, and the road to interoperability and interconnection is quite long or … infinite?

Azure IoT Hub is GA: the news!

Yesterday, the Microsoft Azure IoT Hub was released in GA !

The public preview was quite successful, with a lot of people (makers) and companies (professionals) trying it for developing their end-to-end IoT solutions.

In a previous blog post, I already discussed its main features with a comparison with AWS IoT, the Internet of Things platform from Amazon.

Compared to that article, these are the differences it’s important to focus on:

  • Azure IoT Hub now supports MQTT 3.1.1 natively! There is no need for a field gateway to translate MQTT to AMQP (or HTTP) in order to communicate with the Hub. Now your MQTT-enabled devices can connect directly to the Cloud and you can use the SDK provided by Microsoft (with an API abstraction layer on top of MQTT) or any MQTT library (M2Mqtt is a good choice for C# applications; see the sketch after this list). Of course, the connection must always be encrypted with the SSL/TLS protocol. More information on the official documentation page here.
  • The pricing has changed: first of all, the pricing isn’t related to the number of devices (as in the public preview) but only to the total number of messages/day. The bad news is that starting from April 1st the S1 and S2 plans will have double the price. Of course, the Free plan … will still be free!
  • AMQP over WebSockets: the AMQP protocol is now supported over WebSockets too (like Event Hubs, for example).
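
Here is a minimal, hypothetical sketch of such a direct MQTT connection using the M2Mqtt library (not the official Azure IoT SDK). The host name, device id and SAS token are placeholders; the username/password and topic formats follow the IoT Hub MQTT support documentation, while the exact M2Mqtt constructor overload may vary with the library version.

using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

class MqttToIoTHubSample
{
    static void Main()
    {
        string hubHost = "IOT_HUB_NAME.azure-devices.net";
        string deviceId = "DEVICE_ID";
        // SAS token generated from the device key (placeholder)
        string sasToken = "SharedAccessSignature sr=...&sig=...&se=...";

        // TLS connection on the standard secure MQTT port 8883
        MqttClient client = new MqttClient(hubHost, 8883, true, null, null, MqttSslProtocols.TLSv1_2);

        // client id is the device id, username is "{hub host}/{device id}", password is the SAS token
        client.Connect(deviceId, hubHost + "/" + deviceId, sasToken);

        // telemetry (D2C) messages are published to the device events topic
        client.Publish("devices/" + deviceId + "/messages/events/",
            Encoding.UTF8.GetBytes("{ \"temperature\": 21.5 }"),
            MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);

        client.Disconnect();
    }
}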

With the first two major changes above, the Azure IoT Hub offering is closer to the AWS IoT offering: it supports MQTT and the device limit in the pricing has been removed.

The news is not only on the Cloud side but on the device side too!

In the last few months, a lot of OEMs and hardware companies have worked hard to support Windows 10 IoT Core and the Azure IoT Hub connection on their platforms. Today the number of Azure Certified IoT Partners has increased considerably!

 new_iothub_partners

It’s great to see that the Hub ecosystem is growing … now we have to wait for real IoT solutions based on it !

To start learning about Azure IoT Hub, I recommend the Azure IoT Hub Learning Path, which will guide you through all the steps needed to use the Hub in the best way.

The “hybrid” Internet of Things

The title of this blog post could sound strange to you … what’s the “hybrid” Internet of Things? Why speak about a hybrid nature of IoT? As we’ll see, it’s not something new: we can consider it either a new way to approach solutions already running or an old way to see new solutions in the IoT space … it’s up to you to choose the interpretation you like. In the Internet of Things nothing is new; today is simply the right time for connecting “objects” on a large scale.

So … what’s the “hybrid” Internet of Things?

The hybrid cloud

For several years, cloud computing has been the most used foundation for a lot of enterprise architectures, and companies have decided to move all their data, computation, processing and so on from “on premise” infrastructures to the cloud itself.

The cloud seems to offer infinite storage space and computation scaling without any concerns from the company’s point of view, since all the features can be configured to adjust automatically. The result could be less time spent handling “on premise” infrastructures and less (?) money to invest.

hybrid-cloud

Even if all the available cloud platforms (Microsoft Azure, Amazon AWS, IBM Bluemix, Google App Engine, …) have certifications for data storage and related protection, there is one big concern for companies investing in a distributed architecture … the security and privacy of data.

Data is the money of the new century, and for this reason a lot of companies today prefer to store sensitive data on their private servers while relying on the scaling and computational features of public servers in the cloud: the hybrid cloud was born in order to connect these two infrastructures.

In the hybrid architecture, the company protects sensitive data in house and leverages the computation of the public cloud, exchanging only non-sensitive data, over encrypted connections of course.

One of the most important players in the hybrid cloud is Red Hat, which leverages all the main open source projects to develop its offering.

The idea of Internet of Things

When Kevin Ashton coined the term “Internet of Things” in 1999, he wanted to describe the connection between the physical world and the Internet, speaking about “things” (not only people) on the public network.

iot

At that time, the hardware was too expensive, too big, not so powerful and poorly connected. Nowadays we have very tiny devices with a lot of CPU power and memory, plus all kinds of connectivity: all of this at a very low cost. These conditions have favored the birth of the “maker” movement: a lot of non-professional people can now develop on embedded devices in a very simple way, with the programming language they love. Not only the “old” but powerful C/C++, but also many other high-level languages like C#, Java, Python and, of course, JavaScript.

In this scenario, the main idea is to connect all these things directly to the Internet, sending data to the cloud for processing and receiving commands from the cloud itself to control the devices. Connectivity is the main required feature; it is provided by the embedded devices themselves, if they have enough resources, or through a more powerful node called a “field gateway”.

Direct connection to the Internet means a huge number of devices connected to the big network, but it means a lot of problems too. Of course, it’s feasible for all the devices that are TCP/IP capable (via Ethernet or Wi-Fi), but not for the huge number of “old” devices with legacy connections (RS232, RS485, …), devices with PAN (Personal Area Network) support (BLE, ZigBee, Z-Wave, …) or industrial protocols (OPC-UA, Modbus, CAN, …). In these scenarios, the field gateway becomes the bridging point between the devices and the cloud.

Moving IoT in the “fog”

As we can see, in most cases there is the need to add another node at the edge of an IoT solution to bring real-world data and control to the cloud. There are too many problems to rely only on a public server infrastructure; this is true on both the device and the cloud side.

Moving part of the intelligence to the edge of an IoT solution, close to the “T” side, is an old practice in industrial environments, but it has huge value today considering the growing number of connected devices. This approach has a well-defined name today: fog computing.

fog

A lot of the concerns about cloud computing are solved by the fog. Let’s try to summarize them!

The protocols Babel tower

The first role of a field gateway is protocol translation. This is always true: it’s true for devices which are already TCP/IP capable and can connect to the Internet, but it’s also true for legacy and low-constrained devices which need a bridging point to the cloud. Devices may be able to connect directly to the cloud from a network-capabilities point of view, but they may not speak the same “language”, meaning the same protocol. The Babel tower is home to a lot of protocols today and the online IoT platforms can’t speak all of them. The first need is a local translation from the device protocol to the cloud protocol, whether it’s based on TCP/IP (see MQTT, AMQP, HTTP, …) or on a personal area network (see BLE, for example).

Reducing cloud workload

Using an IoT field gateway, part of the work is done in the fog, at the edge of our complex solution, and that means reducing the cloud workload. Data centers are huge, with powerful public servers, but their resources aren’t unlimited; speaking about millions/billions/trillions of connected devices is a problem for the cloud, and reducing this number could be a very good solution. Thanks to a central node at the edge, we can establish only a few connections while at the same time sending data that represents many devices in a local network. Sometimes not all the produced data is needed for processing in the cloud, so pre-processing is executed at the edge to analyze, filter, reduce and elaborate the data to send.

Real time reaction

A lot of IoT solutions need a near real-time reaction time. Connecting to the cloud introduces latency: to control a device, the data is sent to the cloud through the “big net”, the server processes it and replies with a command to start an action on the device itself. There is a non-negligible round trip related to the connection and to the server workload too, because the server is serving not just one device but a huge number of them.

Offline handling

Without an Internet connection, an IoT solution which is entirely based on an online platform can’t work! Of course, all cloud providers offer an SLA (Service Level Agreement) very close to 100%, but we still have to consider the very small percentage of failure. With fog computing we have a local node and we only have to deal with the local network connection. The same is true when the cloud platform is available but the connection isn’t reliable (which is like having the platform offline) or the bandwidth is low. Thanks to the central node at the edge we are able to handle a lot of offline scenarios: we can store data locally when the connection isn’t available.

Security and privacy

In a lot of scenarios protecting data is a must. Even if all cloud connections are encrypted with the SSL/TLS protocol, many companies building IoT solutions prefer to keep data on their private servers to protect themselves and their customers (who are the data owners). Using a field gateway, we can filter data to avoid sending all of it to the cloud; we can hold sensitive data in our local network and send only the non-sensitive data to the public servers.

Reducing price

IoT cloud platforms aren’t free: they have a cost related to the number of connected devices and the number of messages exchanged per hour/day. Thanks to the field gateway we can connect many local devices using only a single connection (for the gateway itself), and thanks to pre-processing we can filter and reduce the amount of data to send: it means reducing cost. It’s even more important when the traffic isn’t “free” but has a higher cost, like when using a GSM connection, for example.

“Fog” computing is all around us

If we think about the IoT market nowadays, almost 100% of the solutions are “fog” solutions and don’t rely on a pure cloud architecture. Think about connected cars: all the sensors speak a specific protocol on a CAN bus, and a central, unique gateway is the only one with an Internet connection to send the gathered data to the cloud.

connected_cars

The same goes for industrial environments (think about a car production line) where all the devices use industrial protocols like OPC-UA and need near real-time reaction (think about the robots in the production line).

production_line

The smart home is another example, made of BLE, ZigBee and Z-Wave based devices connected to the Internet through a central router. In general, smart grid solutions like smart cities are based on an architecture made of many local networks connected to the cloud through a single point. A last simple example is the wearables market: wearable devices are very low-constrained and aren’t TCP/IP capable, so we need a gateway to send their data to the cloud; in most cases this gateway is our smartphone!

Conclusion

All around the world, big companies talk about their great online IoT platforms (you can read about two of them in this article). Of course, we need them, because building an “on premise” IoT solution is almost impossible. It’s also true that we can’t rely on a “pure” cloud architecture. We saw that the solutions already in the field aren’t so pure: they are “fog” solutions, and this could be considered the only reliable approach for future implementations.

The future is not “pure” … it’s “hybrid” … for this reason I like to speak about the “hybrid” Internet of Things!

WPC 2015 Milan: Azure IoT Hub and IoT Suite

wpc2015

Organized by Overnet in collaboration with Microsoft, WPC is the most important Italian conference focused on Microsoft technologies. This year it will take place over two full-immersion days, December 1st and 2nd, with 70 sessions across 8 tracks.

I’m honoured to be part of the speaker team this year as a Microsoft MVP on Windows Embedded and IoT; on December 2nd, I’ll have a session about Microsoft Azure IoT Hub, with an overview of the new Azure cloud gateway and the related Azure IoT Suite.

For sure, the conference will be great for content and for networking with experts on Microsoft technologies. Don’t forget the “Ask The Expert” corner, with a “bunch” of Microsoft MVPs ready to answer your questions.

All the information and details about registration and the conference are on the official web site.

Azure IoT Hub: get telemetry data using an AMQP stack and Azure SB Lite

To complete the last two articles in the series (data from device to cloud, command/feedback from cloud to device) on using an AMQP protocol stack to interact with Azure IoT Hub, we need to get telemetry data from the devices.

When devices send data through their D2C endpoints, these data flow into the IoT Hub system and are made available to the service through its D2C endpoint, which is Event Hubs compatible; this means we can use any Event Hubs client to get data from it.

eventhubcompatible

On the new Azure portal we can see a lot of information about that in the related “Messaging” tab for our IoT Hub. The main pieces of information are:

  • Partitions: the number of partitions through which data from devices are ingested by the IoT Hub;
  • Event Hub-compatible name: the name of the event hub;
  • Event Hub-compatible endpoint: the complete path (with namespace) of the event hub;
  • Retention time: the time the messages are retained inside the event hub;
  • Consumer groups: the available consumer groups for reading messages from the event hub using related receivers (there is always the $Default consumer group);

To read from this event hub endpoint we can use any shared access policy that has the ServiceConnect permission. The portal provides us with a default policy named “service” and its related shared access key. Using three of the above pieces of information, we are able to build the connection string needed to connect to this auto-generated event hub:

Endpoint={Event Hub-compatible endpoint};SharedAccessKeyName={iot hub policy name};SharedAccessKey={iot hub policy key}

Of course, in addition to the above connection string, we have to use the Event Hub-compatible name.

Now … how do we get data from the event hub? What’s the code we have to write?

The simplest way to do that is to use the Event Processor Host provided by Microsoft as a NuGet package: it instantiates receivers for us on all the available partitions and handles their leases, providing us with a checkpoint feature. The “only” big problem we have with this awesome software component is that it works only on the full .Net Framework, so we can use it only in PC-based and Web-based service applications.

UWP apps? AMQP and Azure SB Lite are the solution

What can we do for UWP apps? What can we do if we want to monitor telemetry data from a UWP app on Windows 10 without bridging data through a Web application?

As in the previous articles, the solution is to use a good C# implementation of the AMQP protocol stack such as AMQP .Net Lite, but in this case, to avoid low-level AMQP stuff, we can use the Azure SB Lite library (available on NuGet too), which wraps the protocol stack and exposes the same APIs as the official Service Bus SDK to access Event Hubs (as well as queues and topics/subscriptions).

If you know the above APIs for interacting with Event Hubs, the following code will be familiar to you:

// namespaces assumed here: the Azure SB Lite types live in the ppatierno.AzureSBLite
// and ppatierno.AzureSBLite.Messaging namespaces (same class names as the official SDK)
using System;
using System.Diagnostics;
using System.Text;
using ppatierno.AzureSBLite;
using ppatierno.AzureSBLite.Messaging;

static string ConnectionString = "Endpoint=[EVENT_HUB_COMPATIBLE_ENDPOINT];SharedAccessKeyName=[IOT_HUB_POLICY_NAME];SharedAccessKey=[IOT_HUB_POLICY_KEY]";
static string eventHubEntity = "[EVENT_HUB_COMPATIBLE_NAME]";
static string partitionId = "[PARTITION_ID]";
static DateTime startingDateTimeUtc;

static void Main(string[] args)
{
   ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(ConnectionString);
   builder.TransportType = TransportType.Amqp;

   MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);

   // event hub client on the Event Hub-compatible endpoint, using the $Default consumer group
   EventHubClient client = factory.CreateEventHubClient(eventHubEntity);
   EventHubConsumerGroup group = client.GetDefaultConsumerGroup();

   // start reading the stream from this point in time
   startingDateTimeUtc = new DateTime(2015, 10, 31, 16, 00, 00);

   EventHubReceiver receiver = group.CreateReceiver(partitionId, startingDateTimeUtc);

   while (true)
   {
        EventData data = receiver.Receive();
        Debug.WriteLine("{0} {1} {2}", data.PartitionKey, data.EnqueuedTimeUtc.ToLocalTime(), Encoding.UTF8.GetString(data.GetBytes()));
   }

   // never reached in this endless-loop sample
   receiver.Close();
   client.Close();
   factory.Close();
}

The above example uses Azure SB Lite and connects to my current IoT Hub; it produces the following output:

iot_hub_event_hub_sample

Without the Event Processor Host available for UWP apps, the big deal is to create one or more receivers on all the partitions by ourselves and to handle the checkpoint of the position in the stream we have already read. In that case, it’s useful to save the date/time of the last read and use it as the offset for starting a new read on the stream.
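
The following is a minimal sketch of that idea (it is not the Event Processor Host): one receiving loop per partition, using the last EnqueuedTimeUtc as a naive checkpoint. It only uses the Azure SB Lite calls shown above, plus System.Collections.Generic and System.Threading.Tasks; the partition count comes from the portal’s “Messaging” tab, and the in-memory dictionary stands in for the real persistence (e.g. local storage) that a UWP app would use.

// in-memory checkpoint store; a real application would persist this (e.g. to local storage)
static readonly Dictionary<string, DateTime> checkpoints = new Dictionary<string, DateTime>();

static void ReceiveFromAllPartitions(EventHubClient client, int partitionCount)
{
    EventHubConsumerGroup group = client.GetDefaultConsumerGroup();

    for (int i = 0; i < partitionCount; i++)
    {
        string partitionId = i.ToString();

        // one background receiving loop per partition
        Task.Run(() =>
        {
            DateTime startingDateTimeUtc;
            lock (checkpoints)
            {
                // start from the last saved checkpoint, or from the last hour by default
                if (!checkpoints.TryGetValue(partitionId, out startingDateTimeUtc))
                    startingDateTimeUtc = DateTime.UtcNow.AddHours(-1);
            }

            EventHubReceiver receiver = group.CreateReceiver(partitionId, startingDateTimeUtc);

            while (true)
            {
                EventData data = receiver.Receive();
                if (data == null)
                    continue;

                Debug.WriteLine("{0} {1} {2}", partitionId,
                    data.EnqueuedTimeUtc.ToLocalTime(),
                    Encoding.UTF8.GetString(data.GetBytes()));

                // save the enqueued time as the new checkpoint for this partition
                lock (checkpoints)
                {
                    checkpoints[partitionId] = data.EnqueuedTimeUtc;
                }
            }
        });
    }
}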

Conclusion

If you’re scared of developing a monitoring UWP application because the related IoT Hub SDK doesn’t work on it … don’t worry, you always have a solution based on using the underlying AMQP protocol stack. In this case, the solution is even simpler thanks to the work I already did for the community with Azure SB Lite. Of course, the choice to use a UWP application on a Windows 10 device instead of a .Net / Web application for monitoring your telemetry data is up to you and depends on the scenario, so it may or may not make sense.

Connecting to the Azure IoT Hub using an AMQP stack

Accessing the Azure IoT Hub service is quite simple using all the available SDKs that Microsoft provides as open source on GitHub. We can leverage different languages for different platforms: from C# for .Net and UWP (Universal Windows Platform) applications to Java, from NodeJS (using JavaScript) to C. In this way, we are able to cover different hardware, from higher-level devices like the BeagleBoard, Raspberry Pi 2 and MinnowBoard MAX to very low-constrained devices like the Kinetis K64F (Freescale) and CC3200 (Texas Instruments).

Of course, using the HTTP and AMQP protocols directly, we can access the IoT Hub from other platforms (not certified yet), which could be the scenario for the .Net Micro Framework because, up to now, there isn’t an official SDK for it. The current C# client implementation uses the Task and async/await programming model very heavily, which isn’t supported (yet) by the “little” child of the .Net frameworks family. One option to connect to the IoT Hub from a .Net Micro Framework board is to use the underlying AMQP protocol directly, without the abstraction layer provided by an SDK. In this case you need to know some protocol concepts and a bunch of paths to the IoT Hub entities used for communication between devices and the Cloud.

The purpose of this post is to show how to do this using the AMQP .Net Lite library that provides an AMQP protocol stack written in C# for a lot of different platforms like .Net Framework, .Net Compact Framework, .Net Micro Framework, WinRT (so UWP applications too), Mono, Windows Phone and so on; of course it’s available as open source on GitHub.

IoT Hub: connection and device endpoints

The IoT Hub is reachable using an address that has the following format

<IOT_HUB_NAME>.azure-devices.net

which we can retrieve from the Azure portal after creating the new IoT Hub instance. As for all the services in the Service Bus family (queues, topics/subscriptions and event hubs), the IoT Hub needs an SSL/TLS connection for data encryption and server authentication; it means that we have to connect to the host address on the default AMQPS (AMQP Secure) port, which is 5671.

We can create a new device inside the identity registry and get the related credentials using the Device Explorer application that you can download here. After getting all the needed information, we can set it in the code.

private const string HOST = "IOT_HUB_NAME.azure-devices.net";
private const int PORT = 5671;
private const string DEVICE_ID = "DEVICE_ID";
private const string DEVICE_KEY = "DEVICE_KEY";

Using the above information we can create an instance of the Address class and use it to establish the connection with the host thanks to the Connection class.

address = new Address(HOST, PORT, null, null);
connection = new Connection(address);

Inside the IoT Hub architecture, each device has two endpoints for accessing the Cloud:

  • D2C (device to cloud): the device uses this endpoint to send messages to the cloud, both telemetry data and feedback for a command received on the other endpoint (see below). It means that when we send a command to the device, it replies with feedback at application level to confirm that the command has been acquired and is going to be executed. Of course, the same applies to a command rejected by the device;
  • C2D (cloud to device): the device receives commands on this endpoint and executes the requested actions. As already said, the device sends a confirmation (or rejection) of a received command to the cloud using the other endpoint (D2C);

At the AMQP level the endpoints are accessible at different entity paths; if you know Service Bus queues, topics/subscriptions and event hubs, you can think of them in the same way.

The entity path for sending telemetry data is defined in the following way:

/devices/<DEVICE_ID>/messages/events

where <DEVICE_ID> is the id assigned to the device when we create it inside the identity registry.

The entity path for receiving commands from the Cloud is defined in the following way:

/devices/<DEVICE_ID>/messages/deviceBound

and as for the previous entity you need to provide the <DEVICE_ID> in the path.

It means that after creating a connection and a session to our IoT Hub host, we need to create two links to the above entities (or nodes, as defined in the AMQP spec). Using the programming model provided by the AMQP .Net Lite library we have:

  • A SenderLink to the /devices/<DEVICE_ID>/messages/events node;
  • A ReceiverLink to the /devices/<DEVICE_ID>/messages/deviceBound node;

Authentication: sending the SAS token

IoT Hub offers per-device authentication through a SAS token that we can generate starting from the device id and device key. After establishing the connection, we need to send this token to a specific CBS (Claims Based Security) endpoint to authorize access to the specific entity.

As usual for Azure services, the token has the following format :

SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}

The big difference is that the skn field is absent in our case, since we use device credentials. To get the SAS token I used the same code from my Azure SB Lite library, because it’s built almost in the same way.

string audience = Fx.Format("{0}/devices/{1}", HOST, DEVICE_ID);
string resourceUri = Fx.Format("{0}/devices/{1}", HOST, DEVICE_ID);
            
string sasToken = GetSharedAccessSignature(null, DEVICE_KEY, resourceUri, new TimeSpan(1, 0, 0));
bool cbs = PutCbsToken(connection, HOST, sasToken, audience);
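
For reference, here is a minimal, hypothetical sketch of what such a GetSharedAccessSignature could look like on the full .Net Framework (using System, System.Net, System.Security.Cryptography and System.Text); this is not the Azure SB Lite code, and on the .Net Micro Framework the HMAC-SHA256 and URL-encoding parts need dedicated implementations.

static string GetSharedAccessSignature(string keyName, string key, string resourceUri, TimeSpan ttl)
{
    // expiry expressed as seconds since the Unix epoch
    long expiry = (long)(DateTime.UtcNow.Add(ttl) - new DateTime(1970, 1, 1)).TotalSeconds;

    string encodedUri = WebUtility.UrlEncode(resourceUri);
    string stringToSign = encodedUri + "\n" + expiry;

    // the Base64 encoded key (the device key in our case) is the HMAC-SHA256 secret
    using (HMACSHA256 hmac = new HMACSHA256(Convert.FromBase64String(key)))
    {
        string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));

        string token = String.Format("SharedAccessSignature sr={0}&sig={1}&se={2}",
            encodedUri, WebUtility.UrlEncode(signature), expiry);

        // skn is omitted when authenticating with device credentials (keyName is null)
        if (keyName != null)
            token += "&skn=" + keyName;

        return token;
    }
}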

PutCbsToken creates a new session and a new link to connect to the specific $cbs node, always using the same TCP connection. The content of the message is well defined by the AMQP CBS draft spec. After sending the token, we are authorized to access the IoT Hub from the device.

Just a note: I’m using the Fx class provided by the AMQP .Net Lite library to have the Format method, which doesn’t exist in the String class on the .Net Micro Framework.
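
A possible shape of PutCbsToken with AMQP .Net Lite (types from the Amqp and Amqp.Framing namespaces) is sketched below. The request/response application properties (“operation”, “name”, “status-code”) follow the AMQP CBS draft spec mentioned above; the exact “type” value and the success status codes used here are assumptions, not taken from the official IoT Hub documentation.

static bool PutCbsToken(Connection connection, string host, string sasToken, string audience)
{
    bool result = true;
    Session session = new Session(connection);

    // sender and receiver links to the $cbs node, over the same TCP connection
    string replyTo = "cbs-receiver";
    SenderLink cbsSender = new SenderLink(session, "cbs-sender", "$cbs");
    ReceiverLink cbsReceiver = new ReceiverLink(session, replyTo, "$cbs");

    // "put-token" request carrying the SAS token as body
    Message request = new Message(sasToken);
    request.Properties = new Properties();
    request.Properties.MessageId = Guid.NewGuid().ToString();
    request.Properties.ReplyTo = replyTo;
    request.ApplicationProperties = new ApplicationProperties();
    request.ApplicationProperties["operation"] = "put-token";
    request.ApplicationProperties["type"] = "azure-devices.net:sastoken"; // assumed token type
    request.ApplicationProperties["name"] = audience;
    cbsSender.Send(request);

    // the response carries a "status-code" application property (2xx on success)
    Message response = cbsReceiver.Receive();
    if ((response == null) || (response.ApplicationProperties == null))
    {
        result = false;
    }
    else
    {
        int statusCode = (int)response.ApplicationProperties["status-code"];
        result = (statusCode == 200) || (statusCode == 202);
    }

    cbsSender.Close();
    cbsReceiver.Close();
    session.Close();

    return result;
}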

Sending telemetry data

Using the SenderLink instance, the device sends data by calling the simple Send() method, passing it a Message class instance containing the data to send.

The sender link is created inside a new AMQP Session (using the related class from the AMQP .Net Lite library) and the great news is that, thanks to the multiplexing feature of the AMQP protocol, we can use the same session for both the sender and the receiver links, all inside the same TCP connection.

static private void SendEvent()
{
    // D2C (telemetry) entity for this device
    string entity = Fx.Format("/devices/{0}/messages/events", DEVICE_ID);

    SenderLink senderLink = new SenderLink(session, "sender-link", entity);

    // raw binary payload as the message body
    var messageValue = Encoding.UTF8.GetBytes("i am a message.");
    Message message = new Message()
    {
        BodySection = new Data() { Binary = messageValue }
    };

    senderLink.Send(message);
    senderLink.Close();
}

Running the code, we can interact with the device using the Device Explorer application to receive the messages it sends.

dev_exp_iot_hub_1

Receiving commands and sending feedback

Using the ReceiverLink instance, the device can receive commands from the service in the Cloud by calling the Receive() method. In addition to the command-sending feature, the IoT Hub provides an application-level feedback feature for them; it means that the device is able to send a confirmation for a received command to the service, accepting or rejecting it. If the device is offline and doesn’t receive the command, the IoT Hub provides a TTL (Time To Live) you can set on every single message, so that the command isn’t delivered to the device when it comes back online if the timeout has expired; this feature avoids delivering a command that makes sense only if it’s executed on the device within a short time.

The device doesn’t need to send the feedback as a message on a specific AMQP node/entity; it’s handled by the IoT Hub when the ReceiverLink accepts or rejects the command. Using AMQP .Net Lite we can call the Accept() or Reject() methods on the ReceiverLink instance; at the AMQP level it means that a “disposition” performative is sent to the IoT Hub with an outcome of “accepted” or “rejected”. Receiving this outcome, the IoT Hub sends a feedback message to the D2C endpoint on the Cloud service side. With these outcomes the message goes into a completed state (positive feedback to the Cloud) or a dead-letter state (negative feedback).

static private void ReceiveCommands()
{
    // C2D (commands) entity for this device
    string entity = Fx.Format("/devices/{0}/messages/deviceBound", DEVICE_ID);

    ReceiverLink receiveLink = new ReceiverLink(session, "receive-link", entity);

    // wait for a command and accept it (positive feedback to the Cloud)
    Message received = receiveLink.Receive();
    if (received != null)
         receiveLink.Accept(received);

    receiveLink.Close();
}

Pay attention to the Release() method available in the library; in this case the outcome is “released” and the message returns to the command queue (enqueued state), ready to be re-delivered to the device if it calls the Receive() method again. If the device receives the message multiple times and always calls the Release() method, the IoT Hub moves it into the dead-letter state (removing it from the command queue) when the message reaches the max delivery count; the same happens if the device calls neither the Accept() nor the Reject() method and the TTL expires.

IoT Hub message lifecycle

Executing the code and using the Device Explorer to send a command, we can see the feedback from the device too.

dev_exp_iot_hub_2

The full source code

The full source code I showed in the previous paragraphs is available on GitHub and it has projects for the .Net Framework (so you can test it very quickly on your PC), the generic .Net Micro Framework (for testing on a real device) and a third project for the Netduino 3 WiFi as an example of an embedded device.

Of course, you can use any other board that supports the .Net Micro Framework and the SSL/TLS protocol, which is needed to connect to the IoT Hub. Other than the Netduino 3 board, there are the FEZ Raptor and FEZ Spider from GHI Electronics (an example using them is coming soon).

Conclusion

Knowing some AMQP concepts and some node paths you can find in the official IoT Hub Developer Guide, we are able to create an IoT Hub client using whatever AMQP stack implementation we like, in this example AMQP .Net Lite; you can see that this is what the official IoT Hub client does, creating an abstraction layer on top of it.

In this way we can add any other platform (not officially supported) like the .Net Micro Framework to the set of available devices for our Internet of Things solution based on IoT Hub.