Azure Event Hub SDK Internals - Part 1 (Overview & Control Flow)
May 23, 2020
Azure Event Hub SDK Series
This post is part 1 of a series on the Azure Event Hub SDK for .NET.
- Azure Event Hub SDK Internals - Part 1 (Overview & Control Flow)
- Azure Event Hub SDK Internals - Part 2 (Partition Manager & Lease Management)
- Azure Event Hub SDK Internals - Part 3 (Pumping Data & AMQP Links)
Do you think there is more that I should cover, or something I should fix? Please raise an issue and let me know.
Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters. You can read more about Event Hubs here
The series focuses on the Microsoft.Azure.EventHubs SDK, v4.2. This maps to Azure SDK for .NET commit ID 00d8f23cffe22afb0e574909039556d8ca891be9
Event processor
We will focus on the Event Hub Processor, since that is the part which deals with checkpointing, leasing, etc.
There are 2 broad steps to receiving an event, and we will dive into the details of each.
RegisterEventProcessorAsync - Initialization
Here is the typical code that you would use to register an event processor.
var eventProcessorHost = new EventProcessorHost(
    EventHubName,
    PartitionReceiver.DefaultConsumerGroupName,
    EventHubConnectionString,
    StorageConnectionString,
    StorageContainerName);

// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
Here is what happens as a part of the registration.
- An Azure Storage CloudBlobClient is created with a maximum execution time of 2 minutes.
- The PartitionManager is started, which initializes the blob store.
- As a part of the initialization, it first checks whether the container exists.
- If not, it creates the Azure blob container.
- Then it gets the runtime information through an AMQP request to the Event Hub.
- The runtime information in the AMQP response has the partition count and the partition IDs.
- It extracts the partition IDs and then creates a lease for each of the partitions.
- The lease blobs get created under <containername>/$Default/<PartitionName>
- The blob contents are
{ "Offset":null, "SequenceNumber":0, "PartitionId":"0", "Owner":"", "Token":"", "Epoch":0 }
- Verify that the checkpoint store exists. In this library the checkpoint store is the same as the lease store, so checkpoint operations turn into lease operations under the covers. Read the comment in the code here
- Similar to leases, it checks for the existence of checkpoints. Since checkpoints are stored in the lease blobs, this step is marked as a no-op.
Initialization is now complete. The host kicks off RunAsync and returns without waiting for RunAsync to complete.
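The lease-creation step above can be sketched without any Azure dependency. This is a minimal model, not the SDK's code: `LeaseSketch`, `LeaseInitSketch`, and `BuildInitialLeases` are hypothetical names, the blob store is replaced by an in-memory dictionary, and System.Text.Json stands in for whatever serializer the library uses. It only shows how one blob name and one initial lease document are derived per partition ID.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical model of the lease blob contents shown above (not the SDK's own class).
public sealed class LeaseSketch
{
    public string Offset { get; set; }          // null until the first checkpoint
    public long SequenceNumber { get; set; }
    public string PartitionId { get; set; }
    public string Owner { get; set; } = "";     // empty string means "nobody owns this lease"
    public string Token { get; set; } = "";
    public long Epoch { get; set; }
}

public static class LeaseInitSketch
{
    // Returns blob name -> initial JSON content, one entry per partition.
    // The dictionary is an assumption that stands in for the blob container.
    public static Dictionary<string, string> BuildInitialLeases(
        string containerName,
        string consumerGroup,
        IEnumerable<string> partitionIds)
    {
        var blobs = new Dictionary<string, string>();
        foreach (string partitionId in partitionIds)
        {
            var lease = new LeaseSketch { PartitionId = partitionId };
            string blobName = $"{containerName}/{consumerGroup}/{partitionId}";
            blobs[blobName] = JsonSerializer.Serialize(lease);
        }
        return blobs;
    }
}
```

Calling `LeaseInitSketch.BuildInitialLeases("leases", "$Default", new[] { "0", "1" })` yields two entries, keyed by the container, consumer group, and partition ID, each holding an unowned lease with a null offset.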
RegisterEventProcessorAsync - Run Async
As mentioned above, the RunAsync part of the process runs asynchronously after RegisterEventProcessorAsync returns.
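The "initialize, then start the loop without awaiting it" shape can be illustrated with a small stand-alone sketch. Everything here is an assumption made for illustration: `BackgroundLoopSketch` and its members are hypothetical and do not mirror the SDK's classes. The point is only that the start method awaits initialization, keeps the loop's Task in a field instead of awaiting it, and a later stop call cancels and awaits that Task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the host; none of these names come from the SDK.
public sealed class BackgroundLoopSketch
{
    private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
    private Task runTask = Task.CompletedTask;
    private int iterations;

    public bool Initialized { get; private set; }
    public int Iterations => Volatile.Read(ref iterations);

    // Awaits initialization, then starts the loop and returns immediately.
    public async Task StartAsync()
    {
        await InitializeAsync();
        // Deliberately not awaited: the Task is stored so StopAsync can wait for it.
        runTask = RunAsync(cancellation.Token);
    }

    // Signals the loop to stop and waits until it has finished.
    public async Task StopAsync()
    {
        cancellation.Cancel();
        await runTask;
    }

    private Task InitializeAsync()
    {
        Initialized = true; // placeholder for "create container, create leases"
        return Task.CompletedTask;
    }

    private async Task RunAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            Interlocked.Increment(ref iterations); // placeholder for one lease-management pass
            try
            {
                await Task.Delay(TimeSpan.FromMilliseconds(10), token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation requested while waiting
            }
        }
    }
}
```

Storing the Task rather than discarding it matters: a shutdown path can then wait for the loop to finish cleanly instead of abandoning it mid-iteration.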
- The library checks the $Default blob directory and gets all the leases.
- It tries to renew the downloaded leases.
- Then it starts the lease management process. As a part of lease management, the Partition Manager downloads the lease blob, rechecks it, and then acquires the lease. It sets the owner to the current hostId and uploads the blob back.
- For each lease that is owned by the host, the Partition Manager creates a pump. The pump is responsible for querying the event hub for messages and pushing them to the subscriber.
- Refresh the blob again so that we get the latest state, then check the owning host name and the lease expiry.
- If everything looks alright, we create the pump.
- We open the pump using OpenAsync.
- Then the pump fetches the lease blob and checks for the offset.
- The pump keeps checking for messages and keeps pushing them to the Event Processor.
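The acquire-then-pump steps above can be modelled in memory as follows. This is a simplified sketch under stated assumptions, not the library's algorithm: `LeaseState` and `PartitionManagerSketch` are hypothetical types, blob downloads and uploads are replaced by direct mutation of in-memory objects, a pump is reduced to a recorded partition ID, and lease balancing between hosts is ignored.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory view of one lease blob.
public sealed class LeaseState
{
    public string PartitionId { get; set; }
    public string Owner { get; set; } = "";
    public DateTimeOffset ExpiresAt { get; set; }

    public bool IsExpired(DateTimeOffset now) => ExpiresAt <= now;
}

// Hypothetical, simplified lease-management pass for a single host.
public sealed class PartitionManagerSketch
{
    private readonly string hostId;
    private readonly TimeSpan leaseDuration;
    private readonly HashSet<string> pumps = new HashSet<string>();

    public PartitionManagerSketch(string hostId, TimeSpan leaseDuration)
    {
        this.hostId = hostId;
        this.leaseDuration = leaseDuration;
    }

    public int PumpCount => pumps.Count;

    public bool HasPump(string partitionId) => pumps.Contains(partitionId);

    public void RunOnce(IEnumerable<LeaseState> leases, DateTimeOffset now)
    {
        foreach (LeaseState lease in leases)
        {
            // Acquire leases that nobody owns or that have expired:
            // set the owner to this host (the real flow uploads the blob back).
            if (lease.Owner == "" || lease.IsExpired(now))
            {
                lease.Owner = hostId;
                lease.ExpiresAt = now + leaseDuration;
            }

            // Recheck owner and expiry before creating a pump for the partition.
            bool ownedAndValid = lease.Owner == hostId && !lease.IsExpired(now);
            if (ownedAndValid)
            {
                pumps.Add(lease.PartitionId); // no-op if a pump already exists
            }
        }
    }
}
```

With one unowned lease, one lease held by another host that is still valid, and one lease held by another host that has expired, a single `RunOnce` pass creates pumps for the first and third partitions and leaves the second untouched.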
Overall, there are 3 main parts in the receive path of the Azure Event Hub SDK:
- Partition Manager - This takes care of Lease Management & Creation of Pumps
- Event Hub Partition Pump - This is responsible for pushing the events to the Event Processor
- AMQPPartitionReceiver - This handles the AMQP side of things when receiving a message.