Note: see this article for some new insights.
After the first two parts (1, 2) now it is time to look at the code to push data to the event hub and process the data from the event hub and push it to the Power BI API. Initially my idea is to use the Microsoft Band sensor to be pushed to Power BI, so I use for that purpose a HeartBeat class as records to be pushed.
Basically we need to code the following steps:
Before we can send data to an Azure event hub we first need the connection information. In the Event Hub the is an option to retrieve the connection information at the bottom Connection Information. This will popup a screen with the connection string for the different defined policies.
To access the event hub in C# we need to create a EventHubClient with the connectionstring and event hub name as parameters. After initializing the object we can fire a SendAsync method to send to event to the event hub. In this case we are creating a random heartbeat and serialize an initialized HeartBeat class to JSON as that is the in and output of the configured Stream Analytics. To initialize the event sending we can define a Task and call that task via the Wait() method.
static async Task SendingRandomMessages() {
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, eventHubName);
while(true) {
try {
List<HeartBeat>heartbeats = new List<HeartBeat>;
{
new HeartBeat {
EventDate = DateTime.Now,
HeartBeatValue = new Random().Next(80,150)
},
};
string message = heartbeats.ToJson(JavaScriptConverter<HeartBeat>
.GetSerializer());
await eventHubClient.SendAsync(
new EventData(Encoding.UTF8.GetBytes(message)));
Console.WriteLine("{0} > Sending message: {1}",
DateTime.Now.ToString(), message);
}
catch(Exception exception) {
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("{0} > Exception: {1}",
DateTime.Now.ToString(), exception.Message);
Console.ResetColor();
}
await Task.Delay(200);
}
}
First we need some basic configuration settings. Besides the event hub policies, retrieved via the same way as the sending event hub, we need the corresponding storage account its name and access key to create a connection sting.
With that information we can register a EventProcessorHost with the information from above and start the receiver with calling the RegisterEventProcessorAsync method with an IEventProcessor class as type.
host.RegisterEventProcessorAsync<PowerBIEventProcessing>().Wait();
To read the events from the event hub you need to define in the IEventProcessor class the IEventProcessor.ProcessEventsAsync method to process the received messages.
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
//Process Recieved Message
}
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromSeconds(10)) {
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
I started with downloading the Power BI API sample for .NET from here. This gives a basic idea of communicating with the API. If you do not have played with the API, please take a look at the sample and the way the API is working. Especially on how the authorization and authentication is working (https://msdn.microsoft.com/library/dn877544).
The biggest problem with the sample is that generic code and application specific code are mixed. For this sample that is not a problem, but to reuse the code for my application I had to rewritten a big part of the code. I converted the class with static methods to a generic class and methods without any specific application logic, like table structures. All that information has to be set during when the class is initialized. Also I changed the logic on how the AccessToken is used. In my code it is possible to retrieve the token after the class is initialized and be set when it is reinitialized. This last is important, because if we want to use it in combination with an Event Hub, multiple instances are created due to the partition size. And if we can eliminate the need to reauthorize the application over and over again, it would help.
After the change of the code to be usable in combination with an Event Hub, the reading of the code needs to be changed to be able to initialize the PowerBIHelper class with the correct AccessToken to eliminate an extra authorization. Problem with the previous way of initializing the event hub processing class is that it is instantiate under the hood when it is registered.
If we want to custom instantiate the helper class we need to use an IEventProcessorFactory which will instantiate the IEventProcessor classes to process the events. After initializing the factory with the preset parameters needed for the Power BI helper class, we can register the factory with a EventProcessorHost instance. When the factory is registered the event CreateEventProcessor is called multiple times regarding the amount of partitions which provides a custom initialized IEventProcessor class.
host = new EventProcessorHost(
eventProcessorHostName,
EventHubName,
EventHubConsumerGroup.DefaultGroupName,
EventHubConnectionString,
storageConnectionString);
PowerBIEventProcessingFactory.PowerBIEventProcessingFactory =
new PowerBIEventProcessingFactory(
ClientID,
DataSetName,
TableName,
JsonTableSchema,
Token,
DataSetID);
await host.RegisterEventProcessorFactoryAsync(
PowerBIEventProcessingFactory);
IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContextcontext) {
return new PowerBIEventProcessing(
clientID,
dataSetName,
tableName,
jsonTableSchema,
token,
dataSetID);
}
After registering the event processing part, the real processing of the events triggers an event that will write the collected json to Power BI. The json that is returned by the event hub, is the original HeartBeat class information (because Stream Analytics doesn’t alter any column structure) wrapped in an event hub class. We need to deserialize that json to an custom object to get the included HeartBeat classes.
public class EventMessage
{
public List<HeartBeat>rows {get;set;}
public DateTime EventProcessedUtcTime {get;set;}
public int PartitionId {get;set;}
public DateTime EventEnqueuedUtcTime {get;set;}
}
List<EventMessage>eventMessages =
JsonConvert.DeserializeObject<List<EventMessage>>(
Encoding.UTF8.GetString(eventData.GetBytes()));
In the snippet above the eventData is one of the messages that is returned by the event hub. After we have it deserialized to a known object we can read all the rows to read and store those in Power BI via the API.
See the attached code for more details: the code.
-JP