Introduction
Application being develop frequently now a days which consist of different parts executing on separate computers and devices, which are in different location around the world. The main concern with these distributed application is how to communicate reliably between the components of a distributed application. we can use azure messaging to deliver the same.
Azure Service bus is a type of messaging system. Its main function is to provide interaction between application and services. It is a multi-tenant cloud messaging service.
In this blog we gonna learn how Azure Service Bus can help build an application that stays reliable during high demand.
Common Messaging Scenarios
- Messaging: transfer business data, such as sales or purchase orders, journals, or inventory movements.
- Decouple applications: improve reliability and scalability of applications and services (client and service do not have to be online at the same time).
- Topics and subscriptions: enable 1:n relationships between publishers and subscribers.
- Message sessions: implement workflows that require message ordering or message deferral.
Azure Messaging Services Selections
Azure provides three type of messaging service for delivering event messages throughout the solutions.
- Event Grid
- Event Hubs
- Service Bus
Event Grid
It uses a publish-subscribe model. Publishers emit events, but have no expectation about which events are handled. Subscribers decide which events they want to handle. Event Grid isn't a data pipeline, and doesn't deliver the actual object that was updated.
Characteristics:
- Dynamically scalable
- Low cost
- Serverless
- At least once delivery
Event Hubs
Azure Event Hubs is a big data pipeline. It provides services to capture, retention, replay of telemetry and event stream data. The data can come from many concurrent sources. Event Hubs allows telemetry and event data to be made available to a variety of stream-processing infrastructures and analytics services. It is available either as data streams or bundled event batches. This service provides a single solution that enables rapid data retrieval for real-time processing as well as repeated replay of stored raw data. It can capture the streaming data into a file for processing and analysis.
Characteristics:
- Low latency
- Capable of receiving and processing millions of events per second
- At least once delivery
Service Bus
Service Bus enables cloud-native applications to provide reliable state transition management for business processes. When handling high-value messages that cannot be lost or duplicated, use Azure Service Bus. Service Bus also facilitates highly secure communication across hybrid cloud solutions and can connect existing on-premises systems to cloud solutions.Service Bus is a brokered messaging system. It stores messages in a "broker" (for example, a queue) until the consuming party is ready to receive the messages.
Characteristics:
- Reliable asynchronous message delivery (enterprise messaging as a service) that requires polling
- Advanced messaging features like FIFO, batching/sessions, transactions, dead-lettering, temporal control, routing and filtering, and duplicate detection
- At least once delivery
- Optional in-order delivery
Service Bus Learning objectives
- Choose whether to use Service Bus queues, topics, or relays to communicate in a distributed application
- Create a Service Bus queue and use it to send and receive messages
- Create a Service Bus topic and use it to send and receive messages
Service Bus topics, queues, and relays
Azure Service Bus can exchange messages in three different ways: queues, topics, and relays.
Queue:
A queue is a simple temporary storage location for messages. A sending component adds a message to the queue. A destination component picks up the message at the front of the queue. Under ordinary circumstances, each message is received by only one receiver.
During peak times, messages may come in faster than destination components can handle them. Because source components have no direct connection to the destination, the source is unaffected and the queue will grow. Destination components will remove messages from the queue as they are able to handle them. When demand drops, destination components can catch up and the queue shortens.
Topic:
A topic is similar to a queue but can have multiple subscriptions. This means that multiple destination components can subscribe to a single topic, so each message is delivered to multiple receivers. Subscriptions can also filter the messages in the topic to receive only messages that are relevant. Subscriptions provide the same decoupled communications as queues and respond to high demand in the same way. Use a topic if you want each message to be delivered to more than one destination component.Relay:
A relay is an object that performs synchronous, two-way communication between applications. Unlike queues and topics, it is not a temporary storage location for messages. Instead, it provides bidirectional, unbuffered connections across network boundaries such as firewalls. Use a relay when you want direct communications between components as if they were located on the same network segment but separated by network security devices.Send messages to the queue
Launch Visual Studio and create a new Console App (.NET Core) project.Add the Service Bus NuGet package
- Right-click the newly created project and select Manage NuGet Packages.
- Click the Browse tab, search for Microsoft.Azure.ServiceBus, and then select the Microsoft.Azure.ServiceBus item. Click Install to complete the installation, then close this dialog box.
Write code to send messages to the queue
namespace CoreSenderApp
{
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
// Connection String for the namespace can be obtained from the Azure portal under the
// 'Shared Access policies' section.
const string ServiceBusConnectionString = "";
const string QueueName = "";
static IQueueClient queueClient;
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
static async Task MainAsync()
{
const int numberOfMessages = 10;
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
Console.WriteLine("======================================================");
// Send Messages
await SendMessagesAsync(numberOfMessages);
Console.ReadKey();
await queueClient.CloseAsync();
}
static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
try
{
for (var i = 0; i < numberOfMessagesToSend; i++)
{
// Create a new message to send to the queue
string messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
// Write the body of the message to the console
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the queue
await queueClient.SendAsync(message);
}
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
}
}
}
}
Receive messages from the queue
To receive the messages you sent, create another .NET Core console application and install the Microsoft.Azure.ServiceBus NuGet package, similar to the previous sender application.
namespace CoreReceiverApp
{
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
// Connection String for the namespace can be obtained from the Azure portal under the
// 'Shared Access policies' section.
const string ServiceBusConnectionString = "";
const string QueueName = "";
static IQueueClient queueClient;
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
static async Task MainAsync()
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
Console.WriteLine("======================================================");
// Register QueueClient's MessageHandler and receive messages in a loop
RegisterOnMessageHandlerAndReceiveMessages();
Console.ReadKey();
await queueClient.CloseAsync();
}
static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
// Register the function that will process messages
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Process the message
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
Send messages to the topic
Launch Visual Studio and create a new Console App (.NET Core) project.Add the Service Bus NuGet package
- Right-click the newly created project and select Manage NuGet Packages.
- Click the Browse tab, search for Microsoft.Azure.ServiceBus, and then select the Microsoft.Azure.ServiceBus item. Click Install to complete the installation, then close this dialog box.
Write code to send messages to the topic
namespace CoreSenderApp
{
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
const string ServiceBusConnectionString = "";
const string TopicName = "";
static ITopicClient topicClient;
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
static async Task MainAsync()
{
const int numberOfMessages = 10;
topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
Console.WriteLine("======================================================");
// Send messages.
await SendMessagesAsync(numberOfMessages);
Console.ReadKey();
await topicClient.CloseAsync();
}
static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
try
{
for (var i = 0; i < numberOfMessagesToSend; i++)
{
// Create a new message to send to the topic
string messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
// Write the body of the message to the console
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the topic
await topicClient.SendAsync(message);
}
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
}
}
}
}
Receive messages from the subscription
To receive the messages you sent, create another .NET Core console application and install the Microsoft.Azure.ServiceBus NuGet package, similar to the previous sender application.Write code to receive messages from the subscription
namespace CoreReceiverApp
{
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
const string ServiceBusConnectionString = "";
const string TopicName = "";
const string SubscriptionName = "";
static ISubscriptionClient subscriptionClient;
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
static async Task MainAsync()
{
subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
Console.WriteLine("======================================================");
// Register subscription message handler and receive messages in a loop.
RegisterOnMessageHandlerAndReceiveMessages();
Console.ReadKey();
await subscriptionClient.CloseAsync();
}
static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
// Register the function that processes messages.
subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Process the message.
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}