MS Certificering: AZ-204, overnieuw beginnen, deel 09

Ingediend door Dirk Hornstra op 21-dec-2021 15:40

Na het 1e blok: Azure App Service web apps (link), het 2e blok: Implement Azure Functions (link), het 3e blok: Develop solutions that use Blob Storage (link) en het 4e blok: Develop solutions that use Azure Cosmos DB (link), het 5e blok: Implement infrastructure as a service solutions: (link), het 6e blok Implement user authentication and authorization (link), het 7e blok Implement secure cloud solutions (link), het 8e blok: Implement API Management (link) is het nu tijd voor het 9e learning-block: Develop event-based solutions, link.

Module 1: Explore Azure Event Grid, link.

Azure Event grid is bijna overal beschikbaar in Azure services en kan ook geïntegreerd worden met 3rd party diensten. Het maakt het gebruik makkelijker en verlaagt kosten doordat je niet constant hoeft te pollen.

Event grid werkt met publishers (melden dat er iets gebeurd is) en subscribers (deze willen weten of er iets gebeurd is).

  • Events: wat is er gebeurd
  • Event sources: waar gebeurde het
  • Topics: eindpunt waar publishers hun events heen sturen
  • Event subscriptions: endpoint of ingebouwde mechanisme om events te routeren, soms naar meer dan 1 afhandelaar. Subscriptions worden ook gebruikt om te filteren op binnenkomende events.
  • Event handlers: de app of service die reageert op een event.


Een event bevat informatie over de bron, tijd van event en een unieke ID. Ook specifieke info welke betrekking heeft op het event, dus een nieuwe upload van een bestand, dan ook de lastTimeModified beschikbaar. Een event kan 64 KB groot zijn. Een event met een grootte van 1 MB is inmiddels in preview. Events die groter dan 64 KB zijn worden in 64 KB increments in rekening gebracht.

Je heb System topics, dat zijn standaard Azure zaken. Zijn niet zichtbaar voor jou, maar je kunt je er wel op abonneren. Custom topics zijn topics van applicaties en 3rd party zaken.

De handler, dat kan een eigen webhook zijn.

Events worden soms in een array naar de Event Grid gestuurd, deze array kan tot 1 MB groot zijn. Het JSON schema kun  je hier vinden: link.

Dit zijn de eigenschappen die elke event publisher gebruikt:


[
  {
    "topic": string,
    "subject": string,
    "id": string,
    "eventType": string,
    "eventTime": string,
    "data":{
      object-unique-to-each-publisher
    },
    "dataVersion": string,
    "metadataVersion": string
  }
]

Een ander schema is CloudEvents v1.0
Dit is een open specificatie voor het beschrijven van event data.
Voorbeeld van een event in Blob Storage:


{
    "specversion": "1.0",
    "type": "Microsoft.Storage.BlobCreated",  
    "source": "/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}",
    "id": "9aeb0fdf-c01e-0131-0922-9eb54906e209",
    "time": "2019-11-18T15:13:39.4589254Z",
    "subject": "blobServices/default/containers/{storage-container}/blobs/{new-file}",
    "dataschema": "#",
    "data": {
        "api": "PutBlockList",
        "clientRequestId": "4c5dd7fb-2c48-4a27-bb30-5361b5de920a",
        "requestId": "9aeb0fdf-c01e-0131-0922-9eb549000000",
        "eTag": "0x8D76C39E4407333",
        "contentType": "image/png",
        "contentLength": 30699,
        "blobType": "BlockBlob",
        "url": "https://gridtesting.blob.core.windows.net/testcontainer/{new-file}",
        "sequencer": "000000000000000000000000000099240000000000c41c18",
        "storageDiagnostics": {
            "batchId": "681fe319-3006-00a8-0022-9e7cde000000"
        }
    }
}

Meer informatie over CloudEvents: link.

In de header kun je het verschil zien, bij Event Grid is content-type="application/json; charset=utf-8", bij CloudEvents is dit content-type="application/cloudevents+json;charset=utf-8"

Event Grid probeert het bericht altijd bij de abonnee af te leveren. Let wel, volgorde is niet gegarandeerd!

Als een endpoint niet bereikbaar is, wordt een bericht in de dead-letters bak opgeslagen. Als je dat niet ingesteld hebt, dan wordt het bericht verwijderd;

Azure Resources: 400 Bad Request, 413 Request Entity Too Large, 403 Forbidden
Webhook: 400 Bad Request, 413 Request Entity Too Large, 403 Forbidden, 404 Not Found, 401 Unauthorized

Als de error niet in deze lijst staat, dan wacht Event Grid 30 seconden op je reactie. Nog geen reactie, dan in de queue voor een retry-poging.
Er wordt een exponentiele policy voor gebruikt. Als het eindpunt binnen 3 minuten reageert zal Event Grid proberen het even van de retry-queue te verwijderen. Dat lukt niet altijd, dus soms kan een duplicaat afgeleverd worden.

Je kunt de Retry Policy naar eigen wens aanpassen. Het aantal pogingen, tussen 1 en 30, standaard is 30. Time-to-live van een event, tussen de 1 en 1440 minuten, standaard is 1440 minuten, dus 1 dag.

Voorbeeld om het maximum op 18 pogingen in te stellen:


az eventgrid event-subscription create \
  -g gridResourceGroup \
  --topic-name <topic_name> \
  --name <event_subscription_name> \
  --endpoint <endpoint_URL> \
  --max-delivery-attempts 18

Je kunt output batching instellen, om HTTP performance bij hoge belasting te optimaliseren. Standaard staat batching uit.
Batching heeft 2 instellingen, maximum aantal events per batch, moet tussen 1 en 5.000 zijn (het aantal zal nooit overschreden worden, maar kan wel minder bevatten als de tijd om is en er nog niet genoeg events zijn) en de voorkeur van de batch grootte in KB. Dit is de hoogste waarde voor de batch size. Het is mogelijk dat een batch hoger is dan deze waarde als er 1 event is dat hoger is dan de max (bijvoorbeeld je max is 4 KB, maar 1 event is 10 KB, dan wordt die 10 KB in 1 losse batch verzonden).

Als een eindpunt problemen heeft zorgt Event Grid dat er "vertraagde aflevering" actief wordt, zodat de boel niet vastloopt.

De dead-letter bak kan gevuld worden doordat een event niet afgeleverd kan worden binnen zijn time-to-live of het aantal pogingen om een event af te leveren heeft het maximum aantal pogingen bereikt. Bij een 400 of 413 status wordt een event meteen een dead-letter want die kan nooit afgeleverd worden.

Je kunt in het afleveren van events maximaal 10 eigen headers toevoegen, per header maximaal 4.096 bytes. Voor als je het stuurt naar webhooks, Azure Service Bus topics en queues, Azure Event Hubs en Relay Hybrid Connections.

Het beheren van de zaken zoals een overzicht van event subscriptions, aanmaken en keys genereren, dat wordt ingericht met rollen (RBAC). EventGrid ondersteunt niet het publiceren van events naar topics of domeinen met rollen, daarvoor moet je een Shared Access Signature (SAS) gebruiken.

Je hebt een aantal rollen:

  • Event Grid Subscription Reader, lees subscriptions.
  • Event Grid Subscription Contributor, beheer subscription operations.
  • Event Grid Contribtor, maak en beheer Event Grid resources.
  • Event Grid Data Sender, stuur events naar Event Grid topics.


Als je een event handler hebt die geen webhook is, maar bijvoorbeeld een event hub of queue storage, dan moet je code schrijven om toegang te krijgen tot die resource. Hiervoor heb je Mirosoft.EventGrid/EventSubscriptions/Write rechten op de resource nodig welke de event source is. Je schrijft namelijk "vanaf die resource".

Nog even de scheiding per topic:

Topic Type Description
System topics Need permission to write a new event subscription at the scope of the resource publishing the event. The format of the resource is: /subscriptions/{subscription-id}/resourceGroups/{resource-group-name}/providers/{resource-provider}/{resource-type}/{resource-name}
Custom topics Need permission to write a new event subscription at the scope of the event grid topic. The format of the resource is: /subscriptions/{subscription-id}/resourceGroups/{resource-group-name}/providers/Microsoft.EventGrid/topics/{topic-name}


Je kunt events naar webhook sturen. Maar om te voorkomen dat ik een site van een kennis ga vol spammen, wil Event Grid bewijs dat je eigenaar bent van het webhook eindpunt. Als je 1 van deze drie diensten gebruikt handelt Azure dit zelf af:

  • Azure Logic Apps met Event Grid Connector
  • Azure Automation via webhook
  • Azure Functions met Event Grid Trigger

Als je een ander HTTP eindpunt hebt is er een "handshake" die afgehandeld moet worden.

  • Een sychrone handshake: op het moment dat het abonnement aangemaakt wordt, stuur event grid een validatie event naar je eindpunt. Zelfde schema als een normaal event. Hierin zit een validationCode. Je applicatie valideert deze en stuurt de code in de response synchroon terug.
  • Of de asynchrone handshake: soms kan dit niet, bijvoorbeeld omdat je een dienst van iemand anders gebruikt (zoals Zapier of IFTTT).


Vanaf versie 2018-05-01-preview wordt een handmatige handshake ondersteund. Je krijgt dan een validationURL eigenschap terug. Die moet je aanroepen met een GET. Die URL is geldig voor 5 minuten. Tijdens die periode is de status AwaitingManualAction. Als het niet binnen die tijd afgerond is wordt de status Failed.

Ook moet de HTTP status code 200 zijn, zodat het request ziet dat er geen validation response in zit, dat de 5 minuten starten.
Je kunt niet met een self signed certificaat valideren, dit moet met een geldig CA certificaat.

Als je een filter op een event uitvoert kun je filteren op event type, waar het subject (onderwerp) mee begint of eindigt, op geavanceerde velden en operatoren.


// voorbeeld event type filtering
 

"filter": {
  "includedEventTypes": [
    "Microsoft.Resources.ResourceWriteFailure",
    "Microsoft.Resources.ResourceWriteSuccess"
  ]
}

// voorbeeld subject filtering

"filter": {
  "subjectBeginsWith": "/blobServices/default/containers/mycontainer/log",
  "subjectEndsWith": ".jpg"
}

// voorbeelden geavanceerde filtering

"filter": {
  "advancedFilters": [
    {
      "operatorType": "NumberGreaterThanOrEquals",
      "key": "Data.Key1",
      "value": 5
    },
    {
      "operatorType": "StringContains",
      "key": "Subject",
      "values": ["container1", "container2"]
    }
  ]
}

Hierna volgt een oefening, waarbij je een custom event naar een web eindpunt routeert met behulp van de Azure CLI.


// maak de variabelen aan

let rNum=$RANDOM*$RANDOM
myLocation=<myLocation>
myTopicName="az204-egtopic-${rNum}"
mySiteName="az204-egsite-${rNum}"
mySiteURL="https://${mySiteName}.azurewebsites.net"

// maak de resource group aan

az group create --name az204-evgrid-rg --location $myLocation

// als je nog geen event grid hebt, maak er 1 aan:

az provider register --namespace Microsoft.EventGrid

// controleer de status

az provider show --namespace Microsoft.EventGrid --query "registrationState"

// maak een topic aan

az eventgrid topic create --name $myTopicName \
    --location $myLocation \
    --resource-group az204-evgrid-rg

// maak een berichten eindpunt

az deployment group create \
    --resource-group az204-evgrid-rg \
    --template-uri "https://raw.githubusercontent.com/Azure-Samples/azure-event-grid-viewer/main/azuredeploy.json" \
    --parameters siteName=$mySiteName hostingPlanName=viewerhost

echo "Your web app URL: ${mySiteURL}"
// abonneer je op een custom topic
endpoint="${mySiteURL}/api/updates"
subId=$(az account show --subscription "" | jq -r '.id')

az eventgrid event-subscription create \
    --source-resource-id "/subscriptions/$subId/resourceGroups/az204-evgrid-rg/providers/Microsoft.EventGrid/topics/$myTopicName" \
    --name az204ViewerSub \
    --endpoint $endpoint

// haal de URL op (om straks data heen te sturen)

topicEndpoint=$(az eventgrid topic show --name $myTopicName -g az204-evgrid-rg --query "endpoint" --output tsv)
key=$(az eventgrid topic key list --name $myTopicName -g az204-evgrid-rg --query "key1" --output tsv)

// bouw de data van het event op

event='[ {"id": "'"$RANDOM"'", "eventType": "recordInserted", "subject": "myapp/vehicles/motorcycles", "eventTime": "'`date +%Y-%m-%dT%H:%M:%S%z`'", "data":{ "make": "Contoso", "model": "Monster"},"dataVersion": "1.0"} ]'

// stuur de data

curl -X POST -H "aeg-sas-key: $key" -d "$event" $topicEndpoint

// als je in de web-app kijkt zou je dit moeten zien:

{
"id": "29078",
"eventType": "recordInserted",
"subject": "myapp/vehicles/motorcycles",
"eventTime": "2019-12-02T22:23:03+00:00",
"data": {
    "make": "Contoso",
    "model": "Northwind"
},
"dataVersion": "1.0",
"metadataVersion": "1",
"topic": "/subscriptions/{subscription-id}/resourceGroups/az204-evgrid-rg/providers/Microsoft.EventGrid/topics/az204-egtopic-589377852"
}

// ruim je spullen op

az group delete --name az204-evgrid-rg --no-wait


Module 2: Explore Azure Event Hubs, link.
 

We gaan door naar Event Hubs, een "big data" streaming platform. Kan miljoenen events per seconde ontvangen en verwerken.

Event Hubs is een PaaS (platform-as-a-service) waarbij je weinig hoeft te configureren, zodat je je kunt richten op de oplossing(en). Zo is de Kafka variant de implementatie waarbij je niet druk hoeft te maken om beheer, configuratie of uitvoeren van de clusters.
Het is real-time en batch processing wordt ondersteund. Het is schaalbaar, Auto-Inflate wordt ondersteund, schaalt het aantal doorvoer-units om aan jouw eisen te voldoen. Een rijk ecosysteem, zoals al genoemd wordt Apache Kafka (1.0 en later) ook ondersteund.

  • Een Event Hub client is de interface voor ontwikkelaars voor interactie met de Event Hubs client library. Er zijn meerdere clients, ieder gericht op een specifiek onderdeel van Event Hub, zoals het publiceren of consumeren van events.
  • Een Event Hub producer is een client die dient als bron van telemetrie data, diagnostische informatie, gebruikerslogs of andere log-data, deel van een embedded device, een mobiel device, een game die op een console uitgevoerd wordt of ander device, client of server gebaseerde business oplossing of web site.
  • Een Event Hub consumer is een client die informatie leest uit de Event Hub en dit verwerkt. Verwerking kan betekenen dat er geaggregeerd (samenvoegen) wordt, complexe berekeningen, filteren. Ook kan het de distributie of opslag van rauwe of aangepast formaat inhouden. Event Hub consumers zijn meestal robuuste en high-scale platform infrastructuur onderdelen met ingebouwde analytics mogelijkheden, zoals Azure Stream Analytics, Apache Spark of Apache Storm.
  • Een partitie is een aantal events dat in volgorde opgeslagen is. Op het moment dat je de Event Hub maakt en het aantal partities instelt dan zit je daar aan vast: dat is niet meer aan te passen.
  • Een consumer group is een view van een complete Event Hub. Deze zorgen dat meerdere consumerende applicaties elk een eigen view van de event stream hebben en deze stroom in hun eigen tempo en vanaf hun eigen positie kunnen bekijken. Er kunnen maximaal 5 concurrent readers op 1 partitie per consumer group zitten, het advies is dat er maar 1 actieve consumer op een partitie en van een groep zit. Elke actieve reader ontvangt alle events van de partitie, als er meerdere lezers zijn zullen ze duplicate events ontvangen.
  • Event receivers, elke entiteit die data leest van een event hub. Alle consumers verbinden via een AMQP 1.0 sessie. Alle Kafka consumers verbinden via het Kafka protocol 1.0 en later.
  • Throughput units of processing units: voor aangeschafte units van capaciteit die zorgen dat de doorvoer van Event Hubs goed is.

Event Hubs zorgen ervoor dat je data automatisch opgeslagen wordt in een Data Lake Storage account of in een Blob storage.
Data die wordt "opgevangen" wordt weggeschreven in Apache Avro formaat, een compact, snel, binair formaat die een rijke datastructuur en inline schema heeft. Dit formaat wordt wereldwijd in het Hadoop ecosysteem gebruikt, Stream Analytics en Azure Data Factory.

Event Hubs Capture stelt je in staat om een "window" op te zetten voor het opslaan van de data. Dat is op basis van minimum grootte en tijd-configuratie, waarbij "de eerste wint".


Conventie voor opslag is:

{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}

// voorbeeld:

https://mystorageaccount.blob.core.windows.net/mycontainer/mynamespace/myeventhub/0/2017/12/08/03/03/17.avro
 

Een enkele "throughput unit" staat 1 MB per seconde toe of 1.000 events per seconde. Standaard Event Hubs kunnen ingesteld worden met 1 tot 20 throughput units en je kunt meer kopen om je quota te verhogen. Event Hubs Capture kopieert rechtstreeks vanaf de interne Event Hubs storage, waarbij dus geen rekening gehouden hoeft te worden met deze quota's waardoor je die capaciteit beschikbaar houdt voor bijvoorbeeld Stream Analytics of Spark.

1x ingesteld en het eerste event gestuurd, dan blijft Event Hubs Capture draaien. Event Hubs schrijft lege bestanden als er geen data is (zodat je weet dat het proces nog wel loopt).

Om je event processing applicatie te schalen kun je meerdere instanties van de applicatie opstarten en die zelf de load laten verdelen. In de oude versie was dat EventProcessorHost die dat toestond. In de nieuwe versie (5.0 en volgende) EventProcessorClient (.NET en Java) en EventHubConsumerClient (Python en JavaScript) doen hetzelfde.

We krijgen een voorbeeld scenario. Er is een beveiligingsbedrijf welke 100.000 huizen in de gaten houdt. Elke minuut komt er data van verschillende sensoren binnen. Elke sensor pusht de data naar een event hub. In de event hub heb je 15 partities. We willen aan het "consumerende deel" een mechanisme om die zaken uit te lezen, consolideren en een aggregatie naar een blob te sturen, die dan weer in een nette interface in een website getoond wordt.

Specs:

  • Schaal: er moeten meerdere consumers zijn, elke consumer neemt een deel van de partities onder de loep
  • Load balance: verhoog of verlaag het aantal consumers dynamisch. Er komt een nieuwe sensor, meer data, de partities moeten dan opnieuw uitgebalanceerd worden.
  • Seamless resume on failures: als klant A zijn virtual machine crasht, dan kan een andere consumer het overnemen. Dit punt, checkpoint/offset moet hetzelfde punt zijn waar klant A crashte.
  • Consume events: er moet code zijn die de events oppakt en er wat nuttigs mee doet, bijvoorbeeld aggregatie van data en uploaden naar blob storage.


De EventProcessorClient (.NET/Java) en EventHubConsumerClient (Python/JavaScript) bieden dit al.

Elke event processor krijg een unieke ID en claimt het eigenaarschap van een partitie door een entry in een checkpoint store toe te voegen of bij te werken.

Azure Event Hubs ondersteunen Active Directory en shared access signature (SAS) om authenticatie en authorizatie te regelen. Azure biedt de volgende rollen aan voor Event Hubs:
Owner: alle toegang tot Event Hubs resources, link
Sender: toegang om te "zenden naar" Event Hubs resources, link
Receiver: toegang om te "kunnen ontvangen van" Event Hubs resources, link

Ook hier kun je weer managed identities gebruiken.

Het voordeel van AD en Event Hubs is dat je credentials niet in code opgeslagen hoeven worden. Je kunt een OAuth 2.0 access token opvragen en gebruiken. AD authenticates de security principal (user, group, service principal).

Microsoft Identity Platform
Een event publisher definieert een virtueel eindpunt voor een Event Hub. De publisher kan alleen berichten sturen, niet ontvangen. Je hebt 1 publisher per client.

Shared Access Signature - autoriseer toegang tot publishers
Elke Event Hubs client heeft een uniek token welke geupload is naar de client. Een client die dat token heeft kan naar 1 publisher zenden, niet naar anderen. Als meerdere clients hetzelfde token hebben dan delen zij dezelfde publisher.

Alle tokens zijn toegewezen met shared access signature keys. Normaal worden alle tokens met dezelfde key getekend. Clients hebben geen weet van deze key, waardoor zij geen tokens kunnen aanmaken. Clients gebruiken hetzelfde token tot ze verlopen.

Shared Access Signature - autoriseer toegang tot consumers
Om back-end applciaties te authenticeren die data opvragen die gegenereerd is door producers, hiervoor moet de client of manage rechten of listen rechten hebben toegewezen aan de Event Hubs namespace of Event Hub instantie of topic. Data wordt geconsumeerd met consumer groups. SAS geeft je een algemene scope, maar alleen op het entity level en not op het consumer level. Dat betekent dat privileges ingesteld op het namespace level, event hub instance of topic level toegepast worden op de consumer groups van die entiteit.

Er is een client library en we krijgen een aantal voorbeelden wat je daarmee kunt:


// event hub inspecteren
 

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

// event naar een event hub publiceren

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

// events lezen van een event hub
// de site geeft aan dat onderstaande alleen voor test geschikt is. voor productie:
// https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs.Processor

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

// events van een partitie lezen

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

// het verwerken van events met een client

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}