Abhik's Blog

Azure Event Hub SDK Internals - Part 1 (Overview & Control Flow)

May 23, 2020

Azure Event Hub SDK Series

This post is part 1 of a series on the Azure Event Hubs SDK for .NET.

  1. Azure Event Hub SDK Internals - Part 1 (Overview & Control Flow)
  2. Azure Event Hub SDK Internals - Part 2 (Partition Manager & Lease Management)
  3. Azure Event Hub SDK Internals - Part 3 (Pumping Data & AMQP Links)

Do you think there is more that I should cover, or something I should fix? Please raise an issue and let me know.


Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters. You can read more about Event Hubs in the Azure documentation.

This series focuses on the Microsoft.Azure.EventHubs SDK, v4.2. This maps to commit 00d8f23cffe22afb0e574909039556d8ca891be9 of the Azure SDK for .NET.

Event processor

We will focus on the Event Hub Processor, since that is the part which deals with checkpointing, leasing, etc.

There are two broad steps to receiving an event: initialization, and the asynchronous run that follows it. We will dive into the details of each.

RegisterEventProcessorAsync - Initialization

Here is typical code for registering an event processor.

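using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

// The storage connection string and container name identify the blob
// container that holds the leases and checkpoints.
var eventProcessorHost = new EventProcessorHost(
    EventHubName,
    PartitionReceiver.DefaultConsumerGroupName,
    EventHubConnectionString,
    StorageConnectionString,
    StorageContainerName);

// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();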

Here is what happens as part of the registration.

  • An Azure Storage CloudBlobClient is created with a maximum execution time of 2 minutes.
  • The PartitionManager is started, which initializes the blob store.
  • As part of the initialization, it first checks whether the container exists.
  • If not, it creates the Azure blob container.
  • Then it gets the runtime information through an AMQP request to the Event Hub.
  • The runtime information contains the partition count and the partition IDs.
  • The response of the AMQP request looks like this
  • It extracts the partition IDs and then creates a lease for each of the partitions.

    • The blobs get created under <containername>/$Default/<PartitionName>
    • The initial blob contents are:

          {
              "Offset":null,
              "SequenceNumber":0,
              "PartitionId":"0",
              "Owner":"",
              "Token":"",
              "Epoch":0
          }
  • It verifies that the checkpoint store exists. In this library the checkpoint store is the same as the lease store, so checkpoint operations turn into lease operations under the covers. A comment in the SDK code explains this.
  • As with leases, it checks for the existence of the checkpoints. Since they are the same as the leases, this step is marked as a no-op.

[Sequence diagram. Participants: Main(), EventProcessorHost, Azure Blob Storage, EventHub. Steps: RegisterEventProcessorAsync; Check If Container Exists for leases; AMQP: Gets Runtime Information; loop (For each Partition): Create Lease for Partition 0, Create Lease for Partition 1; Check If Container Exists for Checkpoints; Return.]
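
The lease creation step above can be sketched as follows. This is a minimal illustration, not the SDK's own code: LeaseSketch and LeaseInitSketch are hypothetical names, the blobs are modelled as an in-memory dictionary instead of Azure Blob Storage, and System.Text.Json is used only for convenience, so the SDK's actual serialization may differ. The property names mirror the blob contents shown above.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical model of a lease blob; property names mirror the JSON above.
public sealed class LeaseSketch
{
    public string? Offset { get; set; }          // null until a checkpoint is written
    public long SequenceNumber { get; set; }
    public string PartitionId { get; set; } = "";
    public string Owner { get; set; } = "";      // empty: no host owns the lease yet
    public string Token { get; set; } = "";
    public long Epoch { get; set; }
}

public static class LeaseInitSketch
{
    // Returns a map of blob name -> JSON content, one entry per partition ID.
    // The blob name follows the <consumer group>/<partition> layout described above.
    public static Dictionary<string, string> CreateInitialLeases(
        string consumerGroup, IEnumerable<string> partitionIds)
    {
        var blobs = new Dictionary<string, string>();
        foreach (var id in partitionIds)
        {
            var lease = new LeaseSketch { PartitionId = id };
            blobs[$"{consumerGroup}/{id}"] = JsonSerializer.Serialize(lease);
        }
        return blobs;
    }
}
```

Calling LeaseInitSketch.CreateInitialLeases("$Default", new[] { "0", "1" }) yields two entries, keyed "$Default/0" and "$Default/1", each holding an unowned lease.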

Initialization is now complete. The host kicks off RunAsync and returns without waiting for it to complete.

RegisterEventProcessorAsync - Run Async

As mentioned above, the RunAsync part of the process runs asynchronously after RegisterEventProcessorAsync returns.

  • The library checks the $Default blob directory and gets all the leases.
  • It tries to renew the downloaded leases.

    [Sequence diagram. Participants: EventProcessorHost, Azure Blob Storage. Step: ListBlobsSegmentedAsync.]

    The lease management process then starts. As part of lease management, the Partition Manager downloads the lease blob, rechecks it, and then acquires the lease. It sets the owner to the current hostId and uploads the blob back.

[Sequence diagram. Participants: EventProcessorHost, Azure Blob Storage. Steps: ListBlobsSegmentedAsync; loop (For each Partition - Lease Management): FetchAttributesAsync, DownloadTextAsync, FetchAttributesAsync, AcquireLeaseAsync, SetMetadataAsync, UploadTextAsync.]
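
The download, recheck, acquire, and upload sequence can be sketched roughly as below. This is a simplified in-memory model under several assumptions, not the SDK's implementation: LeaseRecord and LeaseAcquireSketch are hypothetical names, the dictionary stands in for Azure Blob Storage, and the expiry timestamp, the fresh token, and the epoch increment are illustrative guesses at what an acquire involves.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a lease blob.
public sealed class LeaseRecord
{
    public string Owner = "";
    public string Token = "";
    public long Epoch;
    public DateTimeOffset ExpiresAt = DateTimeOffset.MinValue;
}

public static class LeaseAcquireSketch
{
    // Returns true if hostId now owns the lease for partitionId.
    public static bool TryAcquire(
        IDictionary<string, LeaseRecord> store, string partitionId,
        string hostId, DateTimeOffset now, TimeSpan leaseDuration)
    {
        // "Download" the lease.
        if (!store.TryGetValue(partitionId, out var lease))
        {
            return false;
        }

        // Recheck: leave the lease alone if another host holds it and it has not expired.
        bool heldByOther = lease.Owner.Length > 0
            && lease.Owner != hostId
            && lease.ExpiresAt > now;
        if (heldByOther)
        {
            return false;
        }

        // Acquire: take a fresh token, bump the epoch (assumption), set the owner.
        lease.Token = Guid.NewGuid().ToString();
        lease.Epoch += 1;
        lease.Owner = hostId;
        lease.ExpiresAt = now + leaseDuration;

        // "Upload" the updated lease back to the store.
        store[partitionId] = lease;
        return true;
    }
}
```

In this model a second host calling TryAcquire for the same partition is refused until the first host's lease has expired, which is the property that keeps a partition owned by one host at a time.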

For each lease owned by the host, the Partition Manager creates a pump. The pump is responsible for querying the event hub for messages and pushing them to the subscriber.

  • The lease blob is refreshed again to get the latest state, and its owner hostname and expiry are checked.
  • If everything looks all right, the pump is created.
  • The pump is opened using OpenAsync.
  • The pump then fetches the lease blob and checks it for the offset.
  • The pump keeps checking for messages and keeps pushing them to the Event Processor.

[Sequence diagram. Participants: EventProcessorHost, Azure Blob Storage, SimpleEventProcessor, AMQP. Calls, in order: FetchAttributesAsync, DownloadTextAsync, FetchAttributesAsync, DownloadTextAsync, OpenAsync, FetchAttributesAsync, DownloadTextAsync, BeginReceiveMessages, EndReceiveMessages, OnReceiveAsync. Blocks: loop (polls), opt (Open Pump), loop (For each Lease owned by Host).]
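
The polling behaviour of a pump can be sketched as a simple receive-and-dispatch loop. Everything here is hypothetical: PumpSketch, receiveAsync, and onEventsAsync are illustrative names, events are modelled as strings, and the delegates stand in for the AMQP receiver and the registered event processor. The real pump also handles leases, errors, and shutdown, which this sketch leaves out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PumpSketch
{
    // Polls receiveAsync for batches and pushes each non-empty batch to
    // onEventsAsync until cancellation is requested.
    public static async Task RunAsync(
        Func<int, Task<IReadOnlyList<string>>> receiveAsync,
        Func<IReadOnlyList<string>, Task> onEventsAsync,
        CancellationToken cancellation,
        int maxBatchSize = 10)
    {
        while (!cancellation.IsCancellationRequested)
        {
            // Ask the receiver for up to maxBatchSize events.
            IReadOnlyList<string> batch = await receiveAsync(maxBatchSize);
            if (batch.Count == 0)
            {
                continue; // nothing arrived on this poll
            }

            // Hand the batch to the event processor before polling again.
            await onEventsAsync(batch);
        }
    }
}
```

Awaiting the processor callback before the next poll means a slow processor delays further receives for that partition; that is a choice made in this sketch for simplicity.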

Overall, there are 3 main components on the receiving side of the Azure Event Hub SDK:

  1. Partition Manager - This takes care of Lease Management & Creation of Pumps
  2. Event Hub Partition Pump - This is responsible for pushing the events to the Event Processor
  3. AMQPPartitionReceiver - This handles the AMQP side of things when receiving a message.