Executing a Network
ETLBox networks can be executed synchronously or asynchronously, allowing flexible control over data processing. This article explains various execution methods, how to monitor progress, handle errors, and cancel a running network when needed.
Creating a Network
A Network object represents the entire data flow and provides methods for execution, monitoring, and structural analysis. There are two ways to create a network:
Using Network.Init()
The preferred way to initialize a network is with Network.Init():
var network = Network.Init(source1);
This is particularly useful in combination with other network methods and allows for a simple and fluent notation when constructing networks.
Using new Network()
Alternatively, a network can be created explicitly by passing components to the constructor:
var network = new Network(source1, rowTrans, dest1);
Both approaches ensure that the network is aware of all connected components and can be used for execution, error handling, and monitoring.
Once created, the Network object provides additional functionality beyond execution, such as retrieving the structure as JSON:
string jsonStructure = network.PrintAsJson();
Console.WriteLine(jsonStructure);
After creation, the network can be executed synchronously or asynchronously.
Synchronous Execution
A data flow can be executed synchronously, meaning the program waits until all data has been processed. This simplifies debugging since no asynchronous handling is required.
Internally, the data flow is always executed asynchronously. The synchronous Execute method waits for the completion of the network tasks before proceeding.
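The relationship between the two execution modes can be illustrated without ETLBox. The following is a minimal sketch in plain .NET, not ETLBox's actual implementation: RunFlowAsync and RunFlow are hypothetical names standing in for an asynchronous run and its blocking wrapper.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an asynchronous data flow run; not an ETLBox API.
async Task<int> RunFlowAsync()
{
    await Task.Delay(50); // simulate some processing time
    return 3;             // pretend three rows were processed
}

// Blocking wrapper: start the asynchronous operation, then wait until it has finished.
int RunFlow() => RunFlowAsync().GetAwaiter().GetResult();

int processed = RunFlow();
Console.WriteLine($"Processed {processed} rows");
```

In a console application this blocking wait is harmless. In code that runs under a synchronization context (for example a UI thread), blocking on asynchronous work can deadlock, so awaiting is usually the safer choice there.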
Example for a synchronous network execution:
// Prepare the flow
DbSource source1 = new DbSource("SourceTable1");
DbSource source2 = new DbSource("SourceTable2");
RowTransformation rowTrans = new RowTransformation(row => row);
DbDestination dest1 = new DbDestination("DestTable");
DbDestination dest2 = new DbDestination("DestTable");
// Link the flow
source1.LinkTo(rowTrans);
source2.LinkTo(rowTrans);
rowTrans.LinkTo(dest1, row => row.Value < 10);
rowTrans.LinkTo(dest2, row => row.Value >= 10);
// Execute the network
Network.Execute(source1, source2);
It is not necessary to pass all components to Network.Execute(). Providing at least one node within the network is sufficient:
Network.Execute(source1);
Network.Execute(rowTrans);
Network.Execute(source1, source2, dest1, dest2);
Using the Execute() Shortcut
Each data flow source has an Execute() method, which internally calls Network.Execute().
source1.Execute();
Starting execution on multiple sources manually is not recommended. Use Network.Execute(...) or Network.ExecuteAsync(...) instead.
Asynchronous Execution
Asynchronous execution allows processing to continue while the network runs in the background. The previous example can be started asynchronously like this:
await Network.ExecuteAsync(source1, source2);
Multiple networks can run in parallel:
Task t1 = Network.ExecuteAsync(source1);
Task t2 = Network.ExecuteAsync(source2);
await Task.WhenAll(t1, t2);
If you are not familiar with the async/await pattern, there is a good article on Microsoft Docs.
Canceling a Network
A running network can be canceled at any time:
Network.Cancel(source1);
Alternatively, a cancellation token can be passed to ExecuteAsync() for finer control:
CancellationTokenSource cts = new CancellationTokenSource();
Task networkTask = Network.ExecuteAsync(cts.Token, source1);
// Cancel execution when needed
cts.Cancel();
This allows external conditions to trigger cancellation while keeping task execution under control.
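As an illustration of an external condition triggering cancellation, the following plain .NET sketch cancels a long-running task after a timeout. The task here is a hypothetical stand-in built with Task.Delay, not an ETLBox network, and how ETLBox reports a cancellation to the awaiting code is not covered in this article, so the exception handling shown is an assumption based on standard .NET task behavior.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// Hypothetical stand-in for a long-running network task; not an ETLBox API.
Task longRunning = Task.Delay(Timeout.Infinite, cts.Token);

// The external condition in this sketch is a simple timeout.
cts.CancelAfter(TimeSpan.FromMilliseconds(100));

bool wasCanceled = false;
try
{
    await longRunning;
}
catch (OperationCanceledException)
{
    // A canceled .NET task surfaces as OperationCanceledException when awaited.
    wasCanceled = true;
}

Console.WriteLine($"Canceled: {wasCanceled}");
```

The same CancellationTokenSource could just as well be canceled from a user action or a shutdown signal instead of a timer.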
Extending a Network
By default, data flows follow explicitly defined LinkTo() and LinkErrorTo() connections. However, the Network object allows applying additional behavior to the entire network without modifying individual components.
Adding Global Error Handling
Instead of linking errors for each component manually, LinkAllErrorTo() ensures all unhandled errors in the network are redirected to a common destination:
var errorDest = new MemoryDestination<ETLBoxError>();
var network = Network.Init(source1);
network.LinkAllErrorTo(errorDest);
await network.ExecuteAsync();
This complements existing LinkErrorTo() connections without overriding them.
Applying Actions to All Components
With ApplyToAllNodes(), a function can be applied to every component in the network:
var network = Network.Init(source);
network.ApplyToAllNodes(n => n.DisableLogging = true);
await network.ExecuteAsync();
This ensures that all nodes in the network have logging disabled without modifying them individually.
Automatic Complement Void Links
If a network contains predicate-based links or void predicate links, some data might not be sent to any component. Since ETLBox waits for all records to be processed, an incomplete network can cause execution to hang indefinitely.
To prevent this, setting TryAddComplementVoidLinks = true automatically adds a complementary void link for each predicate link, so that every row flows to some component.
Example
// Define source and destinations
var source = new MemorySource<MyRow>();
source.DataAsList = new List<MyRow> {
    new MyRow { Value = 5 },
    new MyRow { Value = 15 },
    new MyRow { Value = 25 }
};
// Define destinations
var destLow = new MemoryDestination<MyRow>();
var destHigh = new MemoryDestination<MyRow>();
// Apply predicate-based linking
source.LinkTo(destLow, row => row.Value < 10);
source.LinkTo(destHigh, row => row.Value > 20);
// Create a network and ensure all records are processed
var network = Network.Init(source);
network.TryAddComplementVoidLinks = true;
await network.ExecuteAsync();
- The first link sends rows with Value < 10 to destLow.
- The second link sends rows with Value > 20 to destHigh.
- The row with Value = 15 is not covered by any predicate, which would cause the network to hang. TryAddComplementVoidLinks = true automatically adds a void link for uncovered records, ensuring the execution completes successfully.
This guarantees that all records are processed and prevents execution from stalling due to unhandled data.
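To see which rows a complementary link would have to absorb, the predicates from the example can be evaluated with plain LINQ. This is only a sketch of the idea, independent of ETLBox and of how the library builds its void links internally; the names isLow, isHigh, and uncovered are introduced here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same sample values as in the example above.
var values = new List<int> { 5, 15, 25 };

// The two link predicates from the example.
Func<int, bool> isLow = v => v < 10;
Func<int, bool> isHigh = v => v > 20;
var predicates = new[] { isLow, isHigh };

// The complement: rows that match none of the predicates.
var uncovered = values.Where(v => !predicates.Any(p => p(v))).ToList();

Console.WriteLine(string.Join(", ", uncovered)); // prints "15"
```

Only the value 15 falls through both predicates, which matches the row described above as not covered by any link.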
Fluent Notation
The Network class provides a fluent way to define and execute data flows:
await Network.InitAndStartWith(source1)
    .LinkTo(rowTrans)
    .LinkTo(dest1)
    .AddComplementVoidLinks()
    .ApplyToAllNodes(n => n.DisableLogging = true)
    .ExecuteAsync();
This eliminates the need for separate linking, configuration, and execution steps.
When using linking in the fluent notation, all linked components must have compatible data types. For example, if source1 outputs MyRow, rowTrans must accept MyRow as input, and dest1 must store MyRow as well.
Advanced Network Features
Handling Unlinked Nodes
By default, all nodes in the network must be connected. If a node is unlinked, network creation or execution will fail. To allow unlinked nodes:
var network = Network.Init(source1);
network.IgnoreUnlinkedNodes = true;
await network.ExecuteAsync();
Completion Task on Components
Each component exposes a Completion property, which returns a task that completes when the component finishes processing.
Task sourceTask1 = source1.ExecuteAsync();
Task sourceTask2 = source2.ExecuteAsync();
Task rowTask = rowTrans.Completion;
Task destTask1 = dest1.Completion;
Task destTask2 = dest2.Completion;
await Task.WhenAll(destTask1, destTask2, sourceTask1, sourceTask2, rowTask);
It is not necessary to wait for every task. Waiting for the destination tasks is often sufficient:
await Task.WhenAll(destTask1, destTask2);
In most cases, manually handling completion tasks is unnecessary because the Network object itself provides control over execution and tracking.
Monitoring Execution
The Network class allows tracking execution progress:
var network = Network.Init(source1);
network.OnInitialization = () => Console.WriteLine("Started");
network.OnCompletion = () => Console.WriteLine("Completed");
network.OnException = (ex, msg) => Console.WriteLine($"Error: {msg}");
await network.ExecuteAsync();
Executing Multiple Networks in Parallel
With ParallelExecute(), multiple networks can run concurrently, improving performance for independent data flows.
Example
// Define first network
var source1 = new MemorySource<MyRow>();
var transform1 = new RowTransformation<MyRow>(row => { row.Value *= 2; return row; });
var dest1 = new MemoryDestination<MyRow>();
source1.LinkTo(transform1).LinkTo(dest1);
// Define second network
var source2 = new MemorySource<MyRow>();
var transform2 = new RowTransformation<MyRow>(row => { row.Value += 10; return row; });
var dest2 = new MemoryDestination<MyRow>();
source2.LinkTo(transform2).LinkTo(dest2);
// Execute both networks in parallel
await Network.ParallelExecute(
    Network.Init(source1),
    Network.Init(source2)
);
Both networks are executed simultaneously, allowing independent data flows to run in parallel.
Using a Log Instance for Network Execution
A LogInstance can be assigned to a network to collect and manage log output during execution. By default, ETLBox uses the logger set in Settings.LogInstance, which applies to all networks. However, individual networks can override this setting with a custom logger.
Example
// Configure Serilog and convert it to an ILogger
Log.Logger = new LoggerConfiguration()
    .WriteTo.Console()
    .CreateLogger();
ILogger logger = new SerilogLoggerFactory(Log.Logger).CreateLogger("ETLBox");
// Set the logger only for a particular network
var network = Network.Init(source1);
network.SetLogInstance(logger);
await network.ExecuteAsync();
// Alternative: Set the default logger for all networks
// Settings.LogInstance = logger;
Retrieving Network Structure
ETLBox allows you to inspect the structure of a network by printing it as JSON or as a list of nodes. This is useful for debugging, logging, or visualizing the data flow.
Printing as JSON
PrintAsJson() returns the entire network structure in JSON format, making it easy to store or analyze.
var network = Network.Init(source1);
string jsonStructure = network.PrintAsJson();
Console.WriteLine(jsonStructure);
Printing as Nodes
PrintAsNodes() provides a structured list of all components in the network, showing how they are linked.
var network = Network.Init(source1);
var nodes = network.PrintAsNodes();
foreach (var node in nodes)
    Console.WriteLine(node);
These methods help visualize how data flows through the network without executing it.