MS Certificering: AZ-204, overnieuw beginnen, deel 10

Ingediend door Dirk Hornstra op 21-dec-2021 15:45

Na het 1e blok: Azure App Service web apps (link), het 2e blok: Implement Azure Functions (link), het 3e blok: Develop solutions that use Blob Storage (link) en het 4e blok: Develop solutions that use Azure Cosmos DB (link), het 5e blok: Implement infrastructure as a service solutions: (link), het 6e blok Implement user authentication and authorization (link), het 7e blok Implement secure cloud solutions (link), het 8e blok: Implement API Management (link), het 9e blok Develop event-based solutions (link) is het nu tijd voor het 10e learning-block: Develop message-based solutions, link.

Dit blok bestaat uit 1 module, Discover Azure message queues, link.

Azure ondersteunt 2 soorten queue's: Service Bus queue's en Storage queue's.

Service Bus is deel van een grote infrastructuur die queue-ing, publiceren, abonneren en meer zaken ondersteunt. Deze zijn ontworpen om applicaties of applicatie componenten te integreren en die qua opzet meerdere soorten communicatieprotocollen gebruiken, data contracten, netwerkomgevingen.

Storage queue's zijn een deel van de Azure infrastructuur. Hiermee kun je een grote hoeveelheid berichten opslaan. Via HTTP en HTTPS kun je wereldwijd bij je berichten komen via geauthenticeerde verbindingen. Een bericht kan tot 64 KB groot zijn. Een queue kan miljoenen berichten bevatten, afhankelijk van de totale grootte van het storage account. Queue's worden voornamelijk gebruikt om een backlog te maken of zaken asynchroon af te handelen.

Welke optie moet je kiezen? Hier de voordelen van Service Bus:

  • je applicatie moet berichten ontvangen zonder een queue te hoeven pollen. Met Service Bus kan dit via een long-polling receive operation via TCP protocollen.
  • je applicatie met een garantie hebben dat het FIFO werkt: first in, first out.
  • je applicatie moet automatisch duplicaten kunnen detecteren.
  • je applicatie moet de berichten verwerken als parallel langlopende streams. Op zo'n stream kan de applicatie transacties uitvoeren.
  • je applicatie moet transacties kunnen ondersteunen en "atomicity" als er berichten ontvangen en verstuurd worden.
  • je berichten kunnen groter zijn dan 64 KB maar zullen waarschijnlijk de 256 KB niet halen.


De Storage queue biedt je deze opties:

  • je applicatie moet meer dan 80 GB aan berichten in een queue kunnen bevatten.
  • je applicatie wil de voortgang van het verwerken van berichten in een queue kunnen zien. Dat is handig als de "worker" tijdens het verwerken crasht. Een andere werker kan dan doorgaan op het punt waar de vorige gestopt is.
  • je moet server side logs hebben van alle transacties die op je queue uitgevoerd zijn.
     

We gaan eerst naar de Service Bus kijken. De data die een bericht kan bevatten kan JSON, XML, Apache Avro, plain text zijn.

Je hebt de keuze uit een standaard of een premium abonnement. Zijn bijna gelijk, behalve:

  • doorvoer: bij premium hoog, bij standaard variabel
  • performance: bij premium voorspelbaar, bij standaard variabel
  • prijs-model: bij premium fixed price, bij standaard pay as you go
  • schalen: bij premium op- en afschalen mogelijk, bij standaard niet beschikbaar
  • grootte: bij premium tot 1MB, 100MB momenteel in preview, bij standaard 256 KB


We zien hier nog even de features:

Feature Description
Message sessions To create a first-in, first-out (FIFO) guarantee in Service Bus, use sessions. Message sessions enable exclusive, ordered handling of unbounded sequences of related messages.
Autoforwarding The autoforwarding feature chains a queue or subscription to another queue or topic that is in the same namespace.
Dead-letter queue Service Bus supports a dead-letter queue (DLQ). A DLQ holds messages that can't be delivered to any receiver. Service Bus lets you remove messages from the DLQ and inspect them.
Scheduled delivery You can submit messages to a queue or topic for delayed processing. You can schedule a job to become available for processing by a system at a certain time.
Message deferral A queue or subscription client can defer retrieval of a message until a later time. The message remains in the queue or subscription, but it's set aside.
Batching Client-side batching enables a queue or topic client to delay sending a message for a certain period of time.
Transactions A transaction groups two or more operations together into an execution scope. Service Bus supports grouping operations against a single messaging entity within the scope of a single transaction. A message entity can be a queue, topic, or subscription.
Filtering and actions Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules.
Autodelete on idle Autodelete on idle enables you to specify an idle interval after which a queue is automatically deleted. The minimum duration is 5 minutes.
Duplicate detection An error could cause the client to have a doubt about the outcome of a send operation. Duplicate detection enables the sender to resend the same message, or for the queue or topic to discard any duplicate copies.
Security protocols Service Bus supports security protocols such as Shared Access Signatures (SAS), Role Based Access Control (RBAC) and Managed identities for Azure resources.
Geo-disaster recovery When Azure regions or datacenters experience downtime, Geo-disaster recovery enables data processing to continue operating in a different region or datacenter.
Security Service Bus supports standard AMQP 1.0 and HTTP/REST protocols.

 

Al eens eerder gezien, het protocol wat gebruikt wordt is Advances Messaging Queueing Protocol (AMQP) 1.0: link.
Met het protocol kun je ook code schrijven voor ActiveMQ en RabbitMQ. Meer inforatie hier: link.

En een lijstje met client libraries:


Queue's bieden de mogelijkheid om met First In, First Out te werken. Er is dan maar 1 message consumer die ontvangt en verwerkt. Een gerelateerd voordeel is load-leveling, waardoor producers en consumers met verschillende rates kunnen ontvangen of versturen. De verwerkingstijd voor elke "unit of work" blijft wel gelijk. Je kunt een queue maken via de portal, PowerShell, CLI of Resource Manager templates. Het verzenden en ontvangen, de clients daarvoor schrijf je in C#, Java, Python of JavaScript.

Je kunt kiezen uit 2 ontvang-modes: "receive and delete" of "peek lock". Bij receive and delete haalt een proces het bericht op en zou dit moeten verwerken. Crasht dit en start de boel opnieuw op dan is dat bericht weg: en dus niet verwerkt. Met een peek lock haal je een bericht binnen en zet je een lock op het bericht in de queue. Gaat het verwerken goed, dan markeer je het als "consumed". Als het niet lukt om deze te verwerken, dan wordt een abandon uitgevoerd en kan een ander het bericht proberen te verwerken. Ook heb je een time-out die hetzelfde doet.

Een queue werkt met 1 consumer. Topics en subscriptions bieden een 1-op-veel relatie. Met filter actions kun je waardes aanpassen. Als je een subscription (abonnement) aanmaakt, dan kun je een filter invoeren.

Berichten bevatten een payload en metadata. Metadata is in de vorm van key-value paren gekoppeld. Het bericht bevat een sectie met een binaire payload en 2 soorten eigenschappen. De broker properties zijn gedefinieerd door het systeem. De user properties zijn de key-value paren.

Een aantal eigenschappen worden gebruikt om de route te bepalen, de To, ReplyTo, ReplyToSessionId, MessageId, CorrelationId en SessionId.

  • Simpel request-reply: een publisher stuurt een bericht naar de queue en verwacht een antwoord van de consumer. Om die te ontvangen heeft een publisher een queue waar het antwoord-bericht in geplaatst moet worden. Dat adres staat in de ReplyTo eigenschap. Als de consumer reageert komt de MessageId van het originele bericht in het CorrelationId veld en stuurt deze naar de ReplyTo.
  • Multicast request-reply: een publisher stuurt een bericht naar een topic en meerdere abonnees willen deze verwerken. De reactie kan een zelfde zijn als hierboven. Dit kan gebruikt worden als een soort discovery patroon.
  • Multiplexing: de sessie feature zorgt dat een groep gerelateerde berichten, via het veld SesionId gerouteerd worden naar een specifieke ontvanger terwijl die ontvanger de sessie gelockt houdt.
  • Multiplexed request-reply: met de sessie feature kunnen nu meerdere publishers een reply queue delen. Door ReplyToSessionId instrueert de publisher de consumers om die waarde in de SessionId van de reply-message te plaatsen. De publishing queue of topic hoeft niets van die sessie te weten. Als het bericht verzonden wordt kan de publisher wachten op een sessie met het SessionId waarde.


De To is gereserveerd voor toekomstig gebruik.

Tijdens transport of opgeslagen in de Service Bus is de payload altijd een binair blok data. In ContentType staat aangegeven wat het type is, bijvoorbeeld application/json;charset=utf-8

Het .NET Framework ondersteunt BrokeredMessage instanties door .NET objecten in de constructor mee te geven. Java en .NET standard ondersteunen dit niet!

Als je het legacy SBMP protocol gebruikt, dan worden de objecten geserialiseerd met de standaard binary serializer of met een externe serializer. Met het AMQP protocol wordt het een AMQP object. Met GetBody<T>() kan de ontvanger de objecten binnen krijgen. AMQP, hierbij worden ze geserialiseerd in een AMQP graph of ArrayList en IDictionary<string,object> object.

We gaan door met een voorbeeld, het versturen en ontvangen van een bericht met Service Bus queue via .NET.


// maak eerst een resource-group aan
 

myLocation=<myLocation>
myNameSpaceName=az204svcbus$RANDOM

az group create --name az204-svcbus-rg --location $myLocation

// maak een service bus namespace aan

az servicebus namespace create \
    --resource-group az204-svcbus-rg \
    --name $myNameSpaceName \
    --location $myLocation

// maak vervolgens de service bus queue aan

az servicebus queue create --resource-group $myResourceGroup \
    --namespace-name $myNameSpaceName \
    --name az204-queue

// in de portal kun je bij de shared access policies, settings de RootManageSharedAccessKey policy selecteren
// kopieer hier de Primary Connection String

// vervolgens maak je een C# project in visual studio code

dotnet new console
dotnet add package Azure.Messaging.ServiceBus

// in program.cs

using System.Threading.Tasks;    
using Azure.Messaging.ServiceBus;

// connection string to your Service Bus namespace
static string connectionString = "<NAMESPACE CONNECTION STRING>";

// name of your Service Bus topic
static string queueName = "az204-queue";

// the client that owns the connection and can be used to create senders and receivers
static ServiceBusClient client;

// the sender used to publish messages to the queue
static ServiceBusSender sender;

// number of messages to be sent to the queue
private const int numOfMessages = 3;

// vervang de main methode met onderstaande

static async Task Main()
    {
        // Create the clients that we'll use for sending and processing messages.
        client = new ServiceBusClient(connectionString);
        sender = client.CreateSender(queueName);

        // create a batch
        using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

        for (int i = 1; i <= 3; i++)
        {
            // try adding a message to the batch
            if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
            {
                // if it is too large for the batch
                throw new Exception($"The message {i} is too large to fit in the batch.");
            }
        }

        try
        {
            // Use the producer client to send the batch of messages to the Service Bus queue
            await sender.SendMessagesAsync(messageBatch);
            Console.WriteLine($"A batch of {numOfMessages} messages has been published to the queue.");
        }
        finally
        {
            // Calling DisposeAsync on client types is required to ensure that network
            // resources and other unmanaged objects are properly cleaned up.
            await sender.DisposeAsync();
            await client.DisposeAsync();
        }

        Console.WriteLine("Press any key to end the application");
        Console.ReadKey();
    }

// voer bovenstaande uit en kijk daarna in de Portal hoe het ervoor staat
// hierna gaan we de code aanpassen om data uit de queue te halen

// verwijder deze regel
static ServiceBusClient client;

// voeg deze toe

// the processor that reads and processes messages from the queue
static ServiceBusProcessor processor;

// voeg deze code toe

// handle received messages
static async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");

    // complete the message. messages is deleted from the queue.
    await args.CompleteMessageAsync(args.Message);
}

// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

// en vervang de main

static async Task Main()
{
    // Create the client object that will be used to create sender and receiver objects
    client = new ServiceBusClient(connectionString);

    // create a processor that we can use to process the messages
    processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());

    try
    {
        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;

        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;

        // start processing
        await processor.StartProcessingAsync();

        Console.WriteLine("Wait for a minute and then press any key to end the processing");
        Console.ReadKey();

        // stop processing
        Console.WriteLine("\nStopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    }
    finally
    {
        // Calling DisposeAsync on client types is required to ensure that network
        // resources and other unmanaged objects are properly cleaned up.
        await processor.DisposeAsync();
        await client.DisposeAsync();
    }
}

// vervolgens een dotnet build en dotnet run

// daarna kun je in de Portal checken dat de berichten verwerkt zijn.

// ruim je spullen op

az group delete --name az204-svcbus-rg --no-wait

Hierna kijken we naar de Azure Queue Storage. Deze is bedoeld om heel veel berichten op te slaan/verwerken.

Een queue krijgt een URL met formaat https://<storage account>.queue.core.windows.net/<queue>

Alle toegang loopt via een Storage Account.

De naam van een queue moet lowercase zijn.

Vanaf versie  2017-07-29 of later kan een bericht een time-to-live hebben van -1 (verloopt niet) of een hogere waarde. Als deze waarde niet gevuld is, is de standaard waarde 7 dagen.

Hierna gaan we met C# code een Azure Queue Storage queue maken en met berichten werken.

Je kunt hier een aantal nuget packages voor gebruiken, hier de linkjes van de site:


En dan nu de code:


QueueClient queueClient = new QueueClient(connectionString, queueName);
 

// maak een queue aan

// Get the connection string from app settings
string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

// Instantiate a QueueClient which will be used to create and manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

// Create the queue
queueClient.CreateIfNotExists();

// plaats een bericht in de queue

// Get the connection string from app settings
string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

// Instantiate a QueueClient which will be used to create and manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

// Create the queue if it doesn't already exist
queueClient.CreateIfNotExists();

if (queueClient.Exists())
{
    // Send a message to the queue
    queueClient.SendMessage(message);
}

// kijk of er een bericht is

// Get the connection string from app settings
string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

// Instantiate a QueueClient which will be used to manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

if (queueClient.Exists())
{
    // Peek at the next message
    PeekedMessage[] peekedMessage = queueClient.PeekMessages();
}

// pas de eigenschappen aan van een item in de queue. zo kun je bijvoorbeeld de verwerkingstijd verlengen
// zodat de huidige taak iets meer de tijd krijgt

// Get the connection string from app settings
string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

// Instantiate a QueueClient which will be used to manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

if (queueClient.Exists())
{
    // Get the message from the queue
    QueueMessage[] message = queueClient.ReceiveMessages();

    // Update the message contents
    queueClient.UpdateMessage(message[0].MessageId,
            message[0].PopReceipt,
            "Updated contents",
            TimeSpan.FromSeconds(60.0)  // Make it invisible for another 60 seconds
        );
}

// pak een item uit de queue
// deze wordt 30 seconden onzichtbaar voor andere processen
// als je geen deletemessage aan het einde doet komt ie dus weer terug

// Get the connection string from app settings
string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

// Instantiate a QueueClient which will be used to manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

if (queueClient.Exists())
{
    // Get the next message
    QueueMessage[] retrievedMessage = queueClient.ReceiveMessages();

    // Process (i.e. print) the message in less than 30 seconds
    Console.WriteLine($"Dequeued message: '{retrievedMessage[0].MessageText}'");

    // Delete the message
    queueClient.DeleteMessage(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
}

// vraag op hoeveel berichten er in de queue staan

/// Instantiate a QueueClient which will be used to manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

if (queueClient.Exists())
{
    QueueProperties properties = queueClient.GetProperties();

    // Retrieve the cached approximate message count.
    int cachedMessagesCount = properties.ApproximateMessagesCount;

    // Display number of messages.
    Console.WriteLine($"Number of messages in queue: {cachedMessagesCount}");
}

// verwijder een queue

/// Get the connection string from app settings
string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

// Instantiate a QueueClient which will be used to manipulate the queue
QueueClient queueClient = new QueueClient(connectionString, queueName);

if (queueClient.Exists())
{
    // Delete the queue
    queueClient.Delete();
}