Azure IoT Hub : get telemetry data using AMQP stack and Azure SB Lite

To complete the series started with the last two articles (data from device to cloud, command/feedback from cloud to device) on using an AMQP protocol stack to interact with Azure IoT Hub, we need to get telemetry data from the devices.

When devices send data through their D2C endpoints, the data flows into the IoT Hub and is made available to the service through its own D2C endpoint. That endpoint is Event Hubs compatible, which means we can use any Event Hubs client to read from it.

[Screenshot: Event Hub compatible settings in the “Messaging” tab]

On the new Azure portal we can find the related settings in the “Messaging” tab of our IoT Hub. The main pieces of information are :

  • Partitions : the number of partitions through which data from devices are ingested by the IoT Hub;
  • Event Hub compatible name : it’s the name of the event hub;
  • Event Hub compatible endpoint : it’s the complete path (with namespace) of the event hub;
  • Retention time : it’s the time the messages are retained inside the event hub;
  • Consumer groups : the available consumer groups for reading messages from event hub using related receivers (there is always the $Default consumer group);

To read from this event hub endpoint we can use any shared access policy that has the ServiceConnect permission. The portal provides a default policy named “service” with its related shared access key. Using the Event Hub compatible endpoint together with the policy name and its key, we can build the connection string needed to connect to this auto-generated event hub :

Endpoint={Event Hub-compatible endpoint};SharedAccessKeyName={iot hub policy name};SharedAccessKey={iot hub policy key}

Besides the connection string, we also need the Event Hub compatible name, which identifies the entity to open.
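As a minimal sketch of the format above, the connection string can be assembled from the three values with plain string formatting. The class and method names are hypothetical (they are not part of any SDK), and the endpoint used in the usage note is only a placeholder :

```csharp
using System;

// Hypothetical helper, not part of Azure SB Lite or the official SDK.
public static class EventHubCompatibleConnection
{
    // Builds "Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=..."
    // from the values shown in the portal.
    public static string Build(string endpoint, string policyName, string policyKey)
    {
        if (string.IsNullOrEmpty(endpoint))
            throw new ArgumentException("endpoint is required", "endpoint");
        if (string.IsNullOrEmpty(policyName))
            throw new ArgumentException("policyName is required", "policyName");
        if (string.IsNullOrEmpty(policyKey))
            throw new ArgumentException("policyKey is required", "policyKey");

        return string.Format(
            "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
            endpoint, policyName, policyKey);
    }
}
```

For example, EventHubCompatibleConnection.Build("sb://example.servicebus.windows.net/", "service", "<key>") returns a string ready to be passed to the messaging factory, while the Event Hub compatible name is passed separately when the client is created.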

Now … how do we get data from the event hub ? What’s the code we have to write ?

The simplest way is to use the Event Processor Host, provided by Microsoft as a Nuget package : it instantiates receivers for us on all available partitions and handles their leases, providing a checkpoint feature. The “only” big problem with this awesome software component is that it works only on the .Net Framework, so we can use it only in PC based and Web based service applications.

UWP apps ? : AMQP and Azure SB Lite the solution

What can we do for UWP apps ? What can we do if we want to monitor telemetry data from a UWP app on Windows 10 without bridging data through a Web application ?

As in the previous articles, the solution is to use a good C# implementation of the AMQP protocol stack like AMQP .Net Lite. In this case, however, we can avoid dealing with AMQP directly by using the Azure SB Lite library (available on Nuget too), which wraps the protocol stack and exposes the same APIs as the official Service Bus SDK to access Event Hubs (as well as queues and topics/subscriptions).

If you know the above APIs to interact with Event Hubs, the following code will be familiar to you :

static string ConnectionString = "Endpoint=[EVENT_HUB_COMPATIBLE_ENDPOINT];SharedAccessKeyName=[IOT_HUB_POLICY_NAME];SharedAccessKey=[IOT_HUB_POLICY_KEY]";
static string eventHubEntity = "[EVENT_HUB_COMPATIBLE_NAME]";
static string partitionId = "[PARTITION_ID]";
static DateTime startingDateTimeUtc;

static void Main(string[] args)
{
   // With the official SDK, pass builder.ToString() to the factory so that this
   // setting takes effect; Azure SB Lite is built on AMQP, so the raw string works too
   ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(ConnectionString);
   builder.TransportType = TransportType.Amqp;

   MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);

   EventHubClient client = factory.CreateEventHubClient(eventHubEntity);
   EventHubConsumerGroup group = client.GetDefaultConsumerGroup();

   // Read only the events enqueued after this UTC date/time
   startingDateTimeUtc = new DateTime(2015, 10, 31, 16, 00, 00, DateTimeKind.Utc);

   EventHubReceiver receiver = group.CreateReceiver(partitionId, startingDateTimeUtc);

   try
   {
        while (true)
        {
             EventData data = receiver.Receive();
             if (data == null)
                  continue; // Receive() can return null when no event arrives in time

             Debug.WriteLine("{0} {1} {2}", data.PartitionKey, data.EnqueuedTimeUtc.ToLocalTime(), Encoding.UTF8.GetString(data.GetBytes()));
        }
   }
   finally
   {
        // Reached only when the loop is interrupted by an exception
        receiver.Close();
        client.Close();
        factory.Close();
   }
}

The above example uses Azure SB Lite to connect to my current IoT Hub and produces the following output :

[Screenshot: output of the sample, one line per received event]

Without the Event Processor Host available in UWP apps, the big deal is that we have to create one or more receivers on all partitions by ourselves and handle the checkpoint, that is, the position inside the stream we have already read. In that case, it’s useful to save the date/time of the last event read and use it as the offset for starting a new read on the stream.
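The idea of saving the last reading date/time can be sketched as follows. This is only an illustration : the PartitionCheckpoints class is hypothetical, it keeps the values in memory (persisting them, for example to a file or to the app settings, is left out), and it assumes one entry per partition id.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers, for each partition, the enqueued time
// of the most recent event that has been processed.
public class PartitionCheckpoints
{
    private readonly Dictionary<string, DateTime> lastRead = new Dictionary<string, DateTime>();
    private readonly DateTime defaultStartUtc;

    // defaultStartUtc is used for partitions that have no checkpoint yet.
    public PartitionCheckpoints(DateTime defaultStartUtc)
    {
        this.defaultStartUtc = defaultStartUtc;
    }

    // Call after an event has been processed; older timestamps never
    // move the checkpoint backwards.
    public void Update(string partitionId, DateTime enqueuedTimeUtc)
    {
        DateTime current;
        if (!lastRead.TryGetValue(partitionId, out current) || enqueuedTimeUtc > current)
        {
            lastRead[partitionId] = enqueuedTimeUtc;
        }
    }

    // Date/time to use as the starting point of the next receiver
    // created on the given partition.
    public DateTime GetStart(string partitionId)
    {
        DateTime saved;
        return lastRead.TryGetValue(partitionId, out saved) ? saved : defaultStartUtc;
    }
}
```

In the receive loop shown earlier, Update(partitionId, data.EnqueuedTimeUtc) would be called after each event, and GetStart(partitionId) would provide the value passed to CreateReceiver() at the next start. Since several events can share the same timestamp, the processing code should probably tolerate receiving an event twice.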

Conclusion

If you are scared to develop a monitoring UWP application because the related IoT Hub SDK doesn’t work on it … don’t worry, you always have a solution based on the underlying AMQP protocol stack. In this case, the solution is simpler thanks to the work I have already done for the community with Azure SB Lite. Of course, the choice to use a UWP application on a Windows 10 device instead of a .Net / Web application for monitoring your telemetry data is up to you and depends on the scenario, so it can make sense or not.

Azure IoT Hub : commands and feedback using AMQP .Net Lite

In the previous article, I described how it’s possible to send telemetry data from a device and receive commands (sending feedback) from the cloud using the AMQP .Net Lite library to connect to the Azure IoT Hub. This approach is useful for platforms that aren’t supported by the official SDKs, like the .Net Micro Framework, due to the Task and async/await programming model used in the C# implementation.

Having covered the device side, it’s now time to see how we can use the same AMQP .Net Lite library on the service side. You could ask … “Why ?!! We have the great Microsoft.Azure.Devices Nuget package that provides the ServiceClient class to handle the connection, send commands to devices and receive feedback from them !!”

You are right ! It’s true but … only if your service runs in a Web or a .Net Framework based application. If you try to install the above package in a UWP application you get the following error !

[Screenshot: Nuget error when installing the package in a UWP application]

As we can see, the official package uses some libraries that aren’t compatible with UAP, like Microsoft.AspNet.WebApi.Core and Microsoft.Azure.Amqp. The latter sounds strange to me ! It seems to be another AMQP stack implementation from Microsoft that can only run on the .Net Framework. What are the differences with the AMQP .Net Lite library ? I’m sorry but I don’t know … the above stack isn’t open source and we can’t dig into it. However, there is a big difference that should be considered a strength of AMQP .Net Lite : it works on all .Net platforms (micro, compact and full), on WinRT/UWP and on Mono !

For this reason, the AMQP .Net Lite library gives us a way to develop an IoT Hub service inside a UWP application, a scenario that isn’t officially supported by Microsoft. As I love to say, we can do that “knowing some AMQP protocol concepts and a bunch of IoT Hub node paths” !

IoT Hub service endpoints

Inside the IoT Hub architecture, the service has two endpoints to communicate with devices :

  • C2D (cloud to device) : the back end system can use this endpoint to send messages (for example commands) to the devices. This endpoint acts like a queue and each message has a TTL (Time To Live), so it’s removed from the queue when the timeout expires. This is useful for commands that must be executed within a short period of time : a command should not be executed too late, when an offline device comes back online, because at that point it may no longer be needed or could even be harmful. The back end system can receive a confirmation message or a delivery fault to understand whether the device has received the command or not;
  • D2C (device to cloud) : it’s an Event Hubs compatible endpoint used by the back end system to retrieve messages from devices (telemetry data) and feedback on command delivery (successful or not). “Event Hubs compatible” means that we can use an Event Hub client to receive messages from this endpoint (for example using an Event Processor Host implementation);

At AMQP level the endpoints are accessible through different entity paths; if you know Service Bus queues, topics/subscriptions and event hubs, you can think of them in the same way.

The entity path for sending command to devices is defined in the following way :

/messages/devicebound

while the entity path for receiving feedback (on commands sent) from devices is the following :

/messages/servicebound/feedback

As for the previous article, it means that after creating a connection and a session to our IoT Hub host we need to create two links to above entities (or nodes as defined in the AMQP spec). Using the programming model provided by AMQP .Net Lite library we have :

  • A SenderLink to the /messages/devicebound node;
  • A ReceiverLink to the /messages/servicebound/feedback node;

Authentication : sending the SAS token

The authentication mechanism is the same as on the device side. In this scenario, we need to send two SAS tokens, one for each of the two AMQP nodes used for sending commands and receiving feedback.

The SAS token audience and resource URI for sending command are the same and defined in the following way :

string audience = Fx.Format("{0}/messages/devicebound", HOST);
string resourceUri = Fx.Format("{0}/messages/devicebound", HOST);

string sasToken = GetSharedAccessSignature(SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, resourceUri, new TimeSpan(1, 0, 0));
bool cbs = PutCbsToken(connection, HOST, sasToken, audience);

For receiving feedback, they are the following :

string audience = Fx.Format("{0}/messages/servicebound/feedback", HOST);
string resourceUri = Fx.Format("{0}/messages/servicebound/feedback", HOST);

string sasToken = GetSharedAccessSignature(SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, resourceUri, new TimeSpan(1, 0, 0));
bool cbs = PutCbsToken(connection, HOST, sasToken, audience);

Sending command

Using the SenderLink instance, the service sends a command by calling the simple Send() method and passing it a Message class instance that contains the data to send.

The sender link is created inside a new AMQP Session (using the related class of the AMQP .Net Lite library) and the great news is that, thanks to the multiplexing feature of the AMQP protocol, we can use the same session for both sender and receiver links, all inside the same TCP connection.

The corresponding class in the official SDK is the ServiceClient class, which provides the SendAsync() method. The Message class of the official SDK (not the AMQP .Net Lite one) exposes the Ack property with the following possible values :

  • none (default) : the service doesn’t want any feedback on command received by the device;
  • positive : the service receives a feedback message if the message was completed;
  • negative : the service receives a feedback message if the message expired (or max delivery count was reached) without being completed by the device;
  • full : the service receives both positive and negative feedbacks;

For more information you can refer to the previous article with a clear explanation of the message life cycle.

Using the AMQP .Net Lite library we don’t have an Ack property on the Message class, so we need to use the application properties collection at AMQP level. The Ack property (at high level) translates into an application property named “iothub-ack” (at AMQP level), which can take the values above. If we don’t set this application property, the behavior is the same as with the “none” value, so no feedback.

static private void SendCommand()
{
    string audience = Fx.Format("{0}/messages/devicebound", HOST);
    string resourceUri = Fx.Format("{0}/messages/devicebound", HOST);

    string sasToken = GetSharedAccessSignature(SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, resourceUri, new TimeSpan(1, 0, 0));
    bool cbs = PutCbsToken(connection, HOST, sasToken, audience);

    if (cbs)
    {
         string to = Fx.Format("/devices/{0}/messages/devicebound", DEVICE_ID);
         string entity = "/messages/devicebound";

         SenderLink senderLink = new SenderLink(session, "sender-link", entity);

         var messageValue = Encoding.UTF8.GetBytes("i am a command.");
         Message message = new Message()
         {
              BodySection = new Data() { Binary = messageValue }
         };
         message.Properties = new Properties();
         message.Properties.To = to;
         message.Properties.MessageId = Guid.NewGuid().ToString();
         message.ApplicationProperties = new ApplicationProperties();
         message.ApplicationProperties["iothub-ack"] = "full";

         senderLink.Send(message);
         senderLink.Close();
    }
}

As we can see, the sending path “/messages/devicebound” doesn’t carry any information about the target device. To address the device, the service needs to set the To AMQP system property to the following value :

/devices/<DEVICE_ID>/messages/devicebound

where <DEVICE_ID> is the id assigned to the device when we create it inside the identity registry.

Finally, it’s important to notice that the C2D endpoint queue can hold at most 50 messages.

Receiving feedback

Using the ReceiverLink instance the service can receive feedback from the device calling the Receive() method.

static private void ReceiveFeedback()
{
     string audience = Fx.Format("{0}/messages/servicebound/feedback", HOST);
     string resourceUri = Fx.Format("{0}/messages/servicebound/feedback", HOST);

     string sasToken = GetSharedAccessSignature(SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, resourceUri, new TimeSpan(1, 0, 0));
     bool cbs = PutCbsToken(connection, HOST, sasToken, audience);

     if (cbs)
     {
          string entity = "/messages/servicebound/feedback";

          ReceiverLink receiveLink = new ReceiverLink(session, "receive-link", entity);

          Message received = receiveLink.Receive();
          if (received != null)
          {
               receiveLink.Accept(received);
               System.Diagnostics.Trace.WriteLine(Encoding.UTF8.GetString(received.GetBody<byte[]>()));
          }

          receiveLink.Close();
     }
}

The received message has a body in JSON format with an array of records (feedback from several different devices), each with the following properties :

  • OriginalMessageId : it’s the MessageId of the original command (message) sent from the service to the device;
  • Description : description result that is related to the possible outcomes (success, message expired, maximum delivery count exceeded, message rejected);
  • DeviceGenerationId : device generation id related to the device that sent the feedback for a specific command;
  • DeviceId : device id related to the device that sent the feedback for a specific command;
  • EnqueuedTimeUtc : timestamp related to the outcome (it means when the feedback was enqueued);

For a single feedback, the JSON should look like the following :

[{"originalMessageId":"5aac3169-af00-4536-acdb-cb9ea6b3980e","description":"Success","deviceGenerationId":"635794823643795743","deviceId":"<device_id>","enqueuedTimeUtc":"2015-10-29T07:59:00.9772497Z"}]
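As a minimal sketch, the feedback body can be parsed into records as follows. It assumes the System.Text.Json package is available on the target platform (it isn’t on the .Net Micro Framework, for example), and the FeedbackRecord and FeedbackParser names are hypothetical; the property names are the ones of the JSON above.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical container for a subset of the feedback properties.
public class FeedbackRecord
{
    public string OriginalMessageId;
    public string Description;
    public string DeviceId;
}

public static class FeedbackParser
{
    // Parses the JSON array carried in the body of a feedback message.
    public static List<FeedbackRecord> Parse(string json)
    {
        var records = new List<FeedbackRecord>();
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            foreach (JsonElement item in doc.RootElement.EnumerateArray())
            {
                records.Add(new FeedbackRecord
                {
                    OriginalMessageId = item.GetProperty("originalMessageId").GetString(),
                    Description = item.GetProperty("description").GetString(),
                    DeviceId = item.GetProperty("deviceId").GetString()
                });
            }
        }
        return records;
    }
}
```

The OriginalMessageId of each record can then be matched against the MessageId set on the command when it was sent, to know which command a feedback refers to.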

The full source code

As for all examples related to my blog posts, I updated the sample from the previous article on GitHub. Now you can find a simple console application and a UWP application that are able to send a command to a device and receive the related feedback.

Conclusion

It’s clear that for a service running in a Web application or a .Net Framework based application, the best solution is to use the official Nuget package. With this article, I covered the NON officially supported possibility of using a UWP application to control devices through the IoT Hub, thanks to the AMQP .Net Lite library and a bunch of AMQP paths.

If you consider the great portability of this AMQP stack implementation, you could control your devices using a .Net Compact Framework app (on Windows Embedded Compact 2013) or … another device based on .Net Micro Framework !

🙂

Connecting to the Azure IoT Hub using an AMQP stack

Accessing the Azure IoT Hub service is simple using the SDKs that Microsoft provides as open source on GitHub. We can leverage several languages for different platforms : from C# for .Net and UWP (Universal Windows Platform) applications to Java, from NodeJS (using JavaScript) to C. In this way, we are able to cover different hardware, from higher level devices like BeagleBoard, Raspberry Pi 2 and Minnowboard Max to very constrained devices like Kinetis K64F (Freescale) and CC3200 (Texas Instruments).

Of course, by using the HTTP and AMQP protocols directly, we can access the IoT Hub from other platforms (not certified yet). This is the scenario of the .Net Micro Framework, because up to now there isn’t an official SDK for it. The current C# client implementation relies very heavily on the Task and async/await programming model, which isn’t supported (yet) by the “little” child of the .Net frameworks family. One way to connect to the IoT Hub from a .Net Micro Framework board is to use the underlying AMQP protocol directly, without the abstraction layer provided by an SDK. In this case you need to know some protocol concepts and a bunch of paths to the IoT Hub entities used for communication between devices and the Cloud.

The purpose of this post is to show how to do this using the AMQP .Net Lite library, which provides an AMQP protocol stack written in C# for a lot of different platforms like .Net Framework, .Net Compact Framework, .Net Micro Framework, WinRT (so UWP applications too), Mono, Windows Phone and so on; of course it’s available as open source on GitHub.

IoT Hub : connection and device endpoints

The IoT Hub is reachable using an address that has the following format

<IOT_HUB_NAME>.azure-devices.net

that we can retrieve from the Azure portal after creating the new IoT Hub instance. As for all the services inside the Service Bus family (queues, topics/subscriptions and event hubs), the IoT Hub needs an SSL/TLS connection for data encryption and server authentication; it means that we have to connect to the host address on the default AMQPS (AMQP Secure) port, which is 5671.

We can create a new device inside the registry and get related credentials information using the Device Explorer application you can download here. After getting all needed information we can set them into the code.

private const string HOST = "IOT_HUB_NAME.azure-devices.net";
private const int PORT = 5671;
private const string DEVICE_ID = "DEVICE_ID";
private const string DEVICE_KEY = "DEVICE_KEY";

Using the above information we can create an instance of the Address class and use it to establish the connection with the host thanks to the Connection class.

address = new Address(HOST, PORT, null, null);
connection = new Connection(address);

Inside the IoT Hub architecture, each device has two endpoints for accessing the Cloud :

  • D2C (device to cloud) : the device uses this endpoint to send messages to the cloud, both as telemetry data and as feedback for a received command (on the other endpoint, see below). It means that when we send a command to the device, it replies with a feedback at application level to confirm that the command has been acquired and is going to be executed. Of course, the same applies to a command rejected by the device;
  • C2D (cloud to device) : the device receives commands on this endpoint for executing the requested action. As already said, the device sends a confirmation (or rejection) of the received command to the cloud using the other endpoint (D2C);

At AMQP level the endpoints are accessible through different entity paths; if you know Service Bus queues, topics/subscriptions and event hubs, you can think of them in the same way.

The entity path for sending data for telemetry purpose is defined in the following way :

/devices/<DEVICE_ID>/messages/events

where <DEVICE_ID> is the id assigned to the device when we create it inside the identity registry.

The entity path for receiving command from the Cloud is defined in the following way :

/devices/<DEVICE_ID>/messages/deviceBound

and as for the previous entity you need to provide the <DEVICE_ID> in the path.

It means that after creating a connection and a session to our IoT Hub host we need to create two links to above entities (or nodes as defined in the AMQP spec). Using the programming model provided by AMQP .Net Lite library we have :

  • A SenderLink to the /devices/<DEVICE_ID>/messages/events node;
  • A ReceiverLink to the /devices/<DEVICE_ID>/messages/deviceBound node;

Authentication : sending the SAS token

IoT Hub offers per-device authentication through a SAS token that we can generate starting from the device id and the device key. After the connection is established, we need to send this token to a specific CBS (Claims Based Security) endpoint to authorize the access to the specific entity.

As usual for Azure services, the token has the following format :

SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}

The big difference is that the skn field is absent in our case, since we are using device credentials. To get the SAS token I used the same code from my Azure SB Lite library because it’s processed almost in the same way.

string audience = Fx.Format("{0}/devices/{1}", HOST, DEVICE_ID);
string resourceUri = Fx.Format("{0}/devices/{1}", HOST, DEVICE_ID);
            
string sasToken = GetSharedAccessSignature(null, DEVICE_KEY, resourceUri, new TimeSpan(1, 0, 0));
bool cbs = PutCbsToken(connection, HOST, sasToken, audience);
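The GetSharedAccessSignature helper isn’t listed in this post. As a rough sketch of what such a helper might do on the full .Net Framework, the following code signs the URL-encoded resource URI and the expiry with HMAC-SHA256 and produces a token in the format shown above. The SasTokenSketch class is hypothetical and not the code from Azure SB Lite; it assumes the key is a Base64 string that is decoded before signing, and the skn field is added only when a policy name is provided.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Security.Cryptography;
using System.Text;

// Hypothetical sketch, not the GetSharedAccessSignature used in the samples.
public static class SasTokenSketch
{
    private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    public static string Create(string keyName, string base64Key, string resourceUri, DateTime expiryUtc)
    {
        // Expiry is expressed in seconds since 1970-01-01 UTC
        long expiry = (long)(expiryUtc - Epoch).TotalSeconds;
        string expiryText = expiry.ToString(CultureInfo.InvariantCulture);

        // The signed string is "<URL-encoded resource URI>\n<expiry>"
        string encodedUri = WebUtility.UrlEncode(resourceUri);
        string stringToSign = encodedUri + "\n" + expiryText;

        string signature;
        using (var hmac = new HMACSHA256(Convert.FromBase64String(base64Key)))
        {
            signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
        }

        string token = "SharedAccessSignature sig=" + WebUtility.UrlEncode(signature) + "&se=" + expiryText;
        if (!string.IsNullOrEmpty(keyName))
        {
            // Present with a shared access policy, absent with device credentials
            token += "&skn=" + WebUtility.UrlEncode(keyName);
        }
        return token + "&sr=" + encodedUri;
    }
}
```

With device credentials the call would look like SasTokenSketch.Create(null, DEVICE_KEY, resourceUri, DateTime.UtcNow.AddHours(1)), which matches the one hour validity used in the snippet above.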

The PutCbsToken creates a new session and a new link to connect to the specific $cbs node always using the same TCP connection. The content of the message is well defined by the AMQP CBS draft spec. After sending the token we are authorized to access IoT Hub from the device.

Just a note : I’m using the Fx class provided by AMQP .Net Lite library to have the Format method that doesn’t exist in the String class for the .Net Micro Framework.

Sending telemetry data

Using the SenderLink instance, the device sends data by calling the simple Send() method and passing it a Message class instance that contains the data to send.

The sender link is created inside a new AMQP Session (using the related class of AMQP .Net Lite library) and the great news is that, thanks to the multiplexing feature of AMQP protocol, we can use the same session for both sender and receiver links all inside the same TCP connection.

static private void SendEvent()
{
    string entity = Fx.Format("/devices/{0}/messages/events", DEVICE_ID);

    SenderLink senderLink = new SenderLink(session, "sender-link", entity);

    var messageValue = Encoding.UTF8.GetBytes("i am a message.");
    Message message = new Message()
    {
        BodySection = new Data() { Binary = messageValue }
    };

    senderLink.Send(message);
    senderLink.Close();
}

Running the code, we can interact with the device using the Device Explorer application to receive the messages it sends.

[Screenshot: Device Explorer receiving the messages sent by the device]

Receiving commands and sending feedback

Using the ReceiverLink instance, the device can receive commands from the service in the Cloud by calling the Receive() method. In addition to sending commands, the IoT Hub provides a feedback feature at application level for them; it means that the device is able to send a confirmation of the received command to the service, to accept or reject it. If the device is offline and doesn’t receive the command, the IoT Hub provides a TTL (Time To Live) you can set on every single message, so that the command isn’t delivered to the device when it comes back online if the timeout has expired; this feature avoids delivering a command that makes sense only if it’s executed on the device within a short time.

The device doesn’t need to send the feedback as a message on a specific AMQP node/entity : it’s handled by the IoT Hub when the ReceiverLink accepts or rejects the command. Using AMQP .Net Lite we can call the Accept() or Reject() methods on the ReceiverLink instance; at AMQP level it means that a “disposition” performative is sent to the IoT Hub with an outcome of “accepted” or “rejected”. On receiving this outcome, the IoT Hub sends a feedback message to the D2C endpoint on the Cloud service side. With such outcomes the message goes into a completed state (positive feedback to the Cloud) or a dead letter state (negative feedback).

static private void ReceiveCommands()
{
    string entity = Fx.Format("/devices/{0}/messages/deviceBound", DEVICE_ID);

    ReceiverLink receiveLink = new ReceiverLink(session, "receive-link", entity);

    Message received = receiveLink.Receive();
    if (received != null)
         receiveLink.Accept(received);

    receiveLink.Close();
}

Pay attention to the Release() method available in the library; in this case the outcome is “released” and the message returns into the command queue (enqueued state), ready to be re-delivered to the device if it calls the Receive() method again. If the device receives the message several times and always calls the Release() method, the IoT Hub moves it into the dead letter state (removing it from the command queue) when the message reaches the max delivery count; the same happens if the device calls neither the Accept() nor the Reject() method and the TTL expires.
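The transitions described above can be summarized with a small state machine. This is only a model of the text, not code that talks to the IoT Hub : the type names are hypothetical, the max delivery count is a constructor parameter, and the exact comparison used when the count is reached is an assumption.

```csharp
using System;

public enum CommandState { Enqueued, Completed, DeadLettered }
public enum DeviceOutcome { Accepted, Rejected, Released }

// Hypothetical model of the life cycle of a single command.
public class CommandLifecycle
{
    private readonly int maxDeliveryCount;

    public CommandState State { get; private set; }
    public int DeliveryCount { get; private set; }

    public CommandLifecycle(int maxDeliveryCount)
    {
        this.maxDeliveryCount = maxDeliveryCount;
        State = CommandState.Enqueued;
    }

    // Models one delivery to the device followed by the outcome it sends back.
    public void Deliver(DeviceOutcome outcome)
    {
        if (State != CommandState.Enqueued)
            throw new InvalidOperationException("The command is no longer in the queue.");

        DeliveryCount++;
        switch (outcome)
        {
            case DeviceOutcome.Accepted:
                State = CommandState.Completed;      // positive feedback
                break;
            case DeviceOutcome.Rejected:
                State = CommandState.DeadLettered;   // negative feedback
                break;
            case DeviceOutcome.Released:
                // Back in the queue, unless the max delivery count is reached
                if (DeliveryCount >= maxDeliveryCount)
                    State = CommandState.DeadLettered;
                break;
        }
    }

    // Models the TTL expiring while the command is still in the queue.
    public void ExpireTtl()
    {
        if (State == CommandState.Enqueued)
            State = CommandState.DeadLettered;
    }
}
```

For instance, with a max delivery count of 2, a first Deliver(DeviceOutcome.Released) leaves the command enqueued, while a second one moves it to the dead letter state.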

IoT Hub message lifecycle

Executing the code and using Device Explorer to send the command we can see the feedback from the device too.

[Screenshot: Device Explorer showing the feedback from the device]

The full source code

The full source code I showed in the previous paragraphs is available on GitHub and it has projects for the .Net Framework (so you can test it very quickly on your PC), for the generic .Net Micro Framework (for testing on your real device) and a third project for Netduino 3 WiFi as an example of an embedded device.

Of course, you can use any other board that supports the .Net Micro Framework and the SSL/TLS protocol, which is needed to connect to the IoT Hub. Other than the Netduino 3 board, there are the FEZ Raptor and FEZ Spider from GHI Electronics (soon an example using them).

Conclusion

Knowing some AMQP concepts and some node paths you can find in the official IoT Hub Developer Guide, we are able to create an IoT Hub client using any AMQP stack implementation, which in this example is AMQP .Net Lite; you can understand that this is what the official IoT Hub client does, creating an abstraction layer on top of it.

In this way we can add any other platform (not officially supported) like the .Net Micro Framework to the set of available devices for our Internet of Things solution based on IoT Hub.