Azure Service Bus
The ETLBox Azure Service Bus connector allows you to read messages from queues or topics and write events into them using ServiceBusSource and ServiceBusDestination. It supports both typed and dynamic data, batching, custom serialization, and integration with existing Service Bus clients or session management.
ServiceBusSource
ServiceBusSource
retrieves messages from an Azure Service Bus queue or topic subscription. Messages are received using the Azure SDK and are automatically deserialized into POCOs or dynamic objects using JSON.
Basic Usage
var source = new ServiceBusSource<MyRow> {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "etl-queue"
};
var dest = new MemoryDestination<MyRow>();
source.LinkTo(dest);
Network.Execute(source);
Subscription Support
To read from a topic with subscriptions, specify the subscription name:
var source = new ServiceBusSource<MyRow> {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "etl-topic",
SubscriptionName = "etl-subscription"
};
Receiving Messages in Micro-Batches
When a ServiceBusSource
is used inside an ETLBox data flow, it operates in a micro-batch mode. A trigger (e.g. a scheduler or job) starts the execution via Network.Execute
or Network.ExecuteAsync
. This opens a message receiver and pulls available messages from the queue/topic in a single receive operation (or several consecutive ones), depending on these properties:
MaxMessages
: Maximum number of messages fetched in one call to the queue or topic. Default is 10.MaxWaitTime
: How long the receiver will wait if no messages are immediately available. Ifnull
, waits indefinitely.ContinueReceiving
: Function that determines whether additional receive cycles should happen based on the last message batch.
Example:
var source = new ServiceBusSource<MyRow> {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "etl-topic",
SubscriptionName = "etl-subscription"
};
source.MaxMessages = 5;
source.MaxWaitTime = TimeSpan.FromSeconds(10);
source.ContinueReceiving = lastMessages => lastMessages?.Count > 0;
This mechanism ensures a controlled and efficient batch-oriented processing even in event-driven or scheduled jobs.
Receiver Options
You can pass ReceiverOptions
to configure how the receiver behaves (e.g., receive mode):
var source = new ServiceBusSource<MyRow>();
source.ReceiverOptions = new ServiceBusReceiverOptions {
ReceiveMode = ServiceBusReceiveMode.PeekLock
};
Refer to Azure Service Bus .NET SDK documentation for more configuration options.
Using Dynamic Objects
You can use ExpandoObject
when the message schema is unknown:
var source = new ServiceBusSource {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "etl-queue"
};
var dest = new MemoryDestination();
source.LinkTo(dest);
Network.Execute(source);
foreach (dynamic msg in dest.Data) {
Console.WriteLine(msg.Col1);
}
Shared Configuration
Both ServiceBusSource
and ServiceBusDestination
support shared authentication and connection management.
Connection via Connection String
Use a connection string to auto-create a ServiceBusClient
:
source.ConnectionString = "<service-bus-connection-string>";
Reusing ServiceBusClient
To share a ServiceBusClient
across multiple components or flows:
var client = new ServiceBusClient("<service-bus-connection-string>");
var source = new ServiceBusSource<MyRow> {
Client = client,
QueueOrTopicName = "etl-queue"
};
var dest = new ServiceBusDestination<MyRow> {
Client = client,
QueueOrTopicName = "etl-queue"
};
This avoids re-instantiation and is useful in long-running or parallel scenarios.
Custom Client Options
You can fine-tune the connection using ServiceBusClientOptions
:
source.ClientOptions = new ServiceBusClientOptions {
TransportType = ServiceBusTransportType.AmqpWebSockets
};
ServiceBusDestination
ServiceBusDestination
sends data from the data flow into a queue or topic on Azure Service Bus. Each object is serialized into a ServiceBusMessage
using JSON by default.
Basic Usage
var source = new MemorySource<MyRow>();
source.Data = new[] {
new MyRow { Col1 = 1, Col2 = "Test1" },
new MyRow { Col1 = 2, Col2 = "Test2" }
};
var dest = new ServiceBusDestination<MyRow> {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "etl-queue"
};
source.LinkTo(dest);
Network.Execute(source);
Custom Message Creation
To fully control the message structure, define the MessageCreation
function:
dest.MessageCreation = row => {
var message = new ServiceBusMessage(JsonConvert.SerializeObject(row));
message.Subject = "etl-message";
return message;
};
This allows setting custom headers, content type, and message properties.
Configuring Sender Options
You can customize how messages are sent to Azure Service Bus using the SenderOptions
property. This allows you to fine-tune aspects such as identifier settings or enabling message partitioning.
var dest = new ServiceBusDestination<MyRow> {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "myqueue",
SenderOptions = new ServiceBusSenderOptions {
Identifier = "etlbox-sender"
}
};
The SenderOptions
are passed to the CreateSender
method when the internal ServiceBusSender
is initialized. For a full list of settings, see the official documentation for ServiceBusSenderOptions
.
Dynamic Objects
You can send dynamic data by using ExpandoObject
:
var source = new MemorySource();
dynamic row = new ExpandoObject();
row.Col1 = 42;
row.Col2 = "Dynamic row";
source.DataAsList.Add(row);
var dest = new ServiceBusDestination {
ConnectionString = "<your-connection-string>",
QueueOrTopicName = "etl-queue"
};
source.LinkTo(dest);
Network.Execute(source);