Linking Components

Before you can execute a data flow, you need to link your sources, transformations, and destinations. Linking is straightforward: every source and every transformation offers a LinkTo() method, which accepts a link target. A link target is either another transformation or a destination.

Basic Linking

Here’s an example of linking a CsvSource to a RowTransformation and then to a MemoryDestination:

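// Create components (non-generic variants; rows are passed as ExpandoObject)
CsvSource source = new CsvSource("data.csv");
RowTransformation rowTrans = new RowTransformation(row => row); // passes each row through unchanged
MemoryDestination dest = new MemoryDestination();

// Link components
source.LinkTo(rowTrans);
rowTrans.LinkTo(dest);

// Strongly typed row class used by the generic examples in the following sections
public class MyRow {
    public int Id { get; set; }
    public string Value { get; set; }
}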

This creates a flow:

(Diagram: CsvSource > RowTransformation > MemoryDestination)

Your source file might look like this:

Id,Value
0,-
1,A
2,B
-1,C

Fluent Notation

You can write the same flow with fewer lines using fluent notation:

source.LinkTo(rowTrans).LinkTo(dest);

Note: This works in most cases, except for components with multiple inputs/outputs like Multicast or MergeJoin.

If your transformation changes the data type, adjust the linking like this:

source.LinkTo<OutputType>(rowTrans).LinkTo(dest);

This is the fluent notation you would use if the example above used a RowTransformation<ExpandoObject, OutputType> instead of the non-generic RowTransformation.

Using Predicates for Conditional Linking

You can add filter expressions (predicates) to links. Predicates evaluate each row, directing data to specific components based on conditions.

CsvSource<MyRow> source = new CsvSource<MyRow>("data.csv");
MemoryDestination<MyRow> destA = new MemoryDestination<MyRow>();
MemoryDestination<MyRow> destB = new MemoryDestination<MyRow>();
MemoryDestination<MyRow> destOther = new MemoryDestination<MyRow>();

source.LinkTo(destA, row => row.Value == "A");
source.LinkTo(destB, row => row.Value == "B");
source.LinkTo(destOther, row => row.Value != "A" && row.Value != "B");

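Each row is sent only to the first link whose predicate evaluates to true, so predicates never duplicate data. To send the same rows to several targets, use Multicast.

Make sure every row matches at least one predicate. A row that no link accepts has nowhere to go, and your data flow may hang waiting for all records to be processed.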
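To make the routing rule concrete, here is a plain C# sketch that does not use ETLBox. It models links as an ordered list of (target name, predicate) pairs and hands each sample row from data.csv to the first link whose predicate is true. The names destA, destB and destOther are illustrative only; destOther's predicate deliberately overlaps with the others, and no predicate accepts the row with Value "-".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sample rows from data.csv as (Id, Value) tuples
var rows = new[] { (Id: 0, Value: "-"), (Id: 1, Value: "A"), (Id: 2, Value: "B"), (Id: -1, Value: "C") };

// Links in creation order: target name and predicate (a simplified model, not ETLBox types).
// destOther overlaps with destA and destB; no link accepts Value "-".
var links = new List<(string Target, Func<(int Id, string Value), bool> Predicate)>
{
    ("destA", r => r.Value == "A"),
    ("destB", r => r.Value == "B"),
    ("destOther", r => r.Value != "-"),
};

var delivered = new Dictionary<string, List<int>>();
var unmatched = new List<int>();

foreach (var row in rows)
{
    // The first link whose predicate is true receives the row; later links never see it
    var match = links.FirstOrDefault(l => l.Predicate(row));
    if (match.Target is null)
    {
        unmatched.Add(row.Id); // no target accepts this row
        continue;
    }
    if (!delivered.TryGetValue(match.Target, out var ids))
        delivered[match.Target] = ids = new List<int>();
    ids.Add(row.Id);
}

foreach (var (target, received) in delivered)
    Console.WriteLine($"{target}: {string.Join(", ", received)}");
Console.WriteLine($"unmatched: {string.Join(", ", unmatched)}");
```

The row with Value "A" goes to destA even though destOther's predicate also matches it, and the row with Id 0 ends up unmatched. In a real ETLBox flow, a row like that is what can keep the flow from completing.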

VoidDestination: Ignoring Data in ETLBox

When you have data that doesn’t need further processing, use VoidDestination to ensure your flow completes properly:

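CsvSource<MyRow> source = new CsvSource<MyRow>("data.csv");
MemoryDestination<MyRow> dest = new MemoryDestination<MyRow>();
VoidDestination<MyRow> voidDest = new VoidDestination<MyRow>();
source.LinkTo(dest, row => row.Id > 0);
source.LinkTo(voidDest, row => row.Id <= 0);

Rows with an Id greater than 0 are written to dest; rows with Id <= 0 are discarded by the VoidDestination.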

Filtering Data with FilterTransformation

The FilterTransformation component in ETLBox filters rows in your data flow based on a condition defined by a predicate. The predicate is a function that returns true for rows that should pass through and false for rows that should be discarded.

Example usage:

var source = new CsvSource<MyRow>("data.csv");

var filter = new FilterTransformation<MyRow>();
filter.FilterPredicate = row => row.Id > 0;

var dest = new MemoryDestination<MyRow>();

source.LinkTo(filter).LinkTo(dest);
Network.Execute(source);

foreach (var row in dest.Data)
    Console.WriteLine($"Id:{row.Id}, Value:{row.Value}");

In this example, only rows with an Id greater than zero pass the filter and are written to the destination. Use FilterTransformation when filtering is a distinct step in your pipeline. Alternatively, you can filter while linking components, using predicates together with a VoidDestination as described above.
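Both filtering styles give the same result. The following plain C# sketch (no ETLBox types; lists stand in for the destinations) applies the same Id > 0 condition once as a filter step and once as a pair of complementary link predicates, with a discard list in place of a VoidDestination.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var rows = new[] { (Id: 0, Value: "-"), (Id: 1, Value: "A"), (Id: 2, Value: "B"), (Id: -1, Value: "C") };

// Same condition as FilterPredicate in the example: true keeps the row
Func<(int Id, string Value), bool> keep = r => r.Id > 0;

// Variant 1, like FilterTransformation: rows failing the predicate are dropped
var viaFilter = rows.Where(keep).ToList();

// Variant 2, like predicate links: kept rows go to dest, the rest to a discard list
// (the discard list stands in for a VoidDestination)
var viaLinks = new List<(int Id, string Value)>();
var discarded = new List<(int Id, string Value)>();
foreach (var row in rows)
{
    if (keep(row)) viaLinks.Add(row);
    else discarded.Add(row);
}

foreach (var row in viaFilter)
    Console.WriteLine($"Id:{row.Id}, Value:{row.Value}");
```

Both variants keep the rows with Ids 1 and 2. The difference lies in where the condition lives: in a dedicated pipeline step, or on the links themselves.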

Splitting Data with ConditionalSplit

The ConditionalSplit transformation directs rows to different outputs based on a condition. You define a MatchPredicate that is evaluated for each row. Rows that match are sent to the output linked with LinkTo; rows that don’t match can optionally be routed to a second output with LinkUnmatchedTo.

Example usage:

var source = new CsvSource<MyRow>("data.csv");
var split = new ConditionalSplit<MyRow>();
split.MatchPredicate = row => row.Id > 1;
var validIds = new MemoryDestination<MyRow>();
var invalidIds = new MemoryDestination<MyRow>();

source.LinkTo(split);
split.LinkTo(validIds);
split.LinkUnmatchedTo(invalidIds);

Network.Execute(source);

foreach (var row in validIds.Data)
    Console.WriteLine($"Valid: {row.Id}, {row.Value}");
foreach (var row in invalidIds.Data)
    Console.WriteLine($"Invalid: {row.Id}, {row.Value}");

With a single MatchPredicate, ConditionalSplit divides one data stream into two, so you don’t have to write a complementary predicate for the second output.
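As a rough model of this behavior, the following plain C# sketch (not ETLBox code) groups the sample rows by the result of the MatchPredicate. The two groups correspond to what the validIds and invalidIds destinations receive in the example.

```csharp
using System;
using System.Linq;

var rows = new[] { (Id: 0, Value: "-"), (Id: 1, Value: "A"), (Id: 2, Value: "B"), (Id: -1, Value: "C") };

// MatchPredicate from the example
Func<(int Id, string Value), bool> matchPredicate = r => r.Id > 1;

// Every row lands in exactly one of the two groups, in its original order
var groups = rows.ToLookup(matchPredicate);
var validIds = groups[true].ToList();    // what split.LinkTo(validIds) receives
var invalidIds = groups[false].ToList(); // what split.LinkUnmatchedTo(invalidIds) receives

foreach (var row in validIds)
    Console.WriteLine($"Valid: {row.Id}, {row.Value}");
foreach (var row in invalidIds)
    Console.WriteLine($"Invalid: {row.Id}, {row.Value}");
```

Note that the row with Id 1 lands in the unmatched output, because the condition is Id > 1, not Id > 0.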

Implicit VoidDestination

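Instead of creating and linking a VoidDestination yourself, you can pass a second predicate to LinkTo. Rows matching the first predicate are sent to dest; rows matching the second are discarded by an implicitly created VoidDestination: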

source.LinkTo(dest, row => row.Id > 0, row => row.Id <= 0);

Handling Errors in Data Flows

Exceptions in ETLBox bubble up by default, faulting the component and canceling the flow. Consider the following example:

CsvSource<MyRow> source = new CsvSource<MyRow>("test.csv");
RowTransformation<MyRow> rowTrans = new RowTransformation<MyRow>();
rowTrans.TransformationFunc = row => {
    if (row.Id <= 0)
        throw new ArgumentException("Id must have a value > 0!");
    else
        return row;
};
MemoryDestination<MyRow> dest = new MemoryDestination<MyRow>();

source.LinkTo(rowTrans).LinkTo(dest);
try {
    Network.Execute(source); //<-- the exception will be rethrown here!
} catch(Exception e) {
    Console.WriteLine(e.Message);
}

In this example, the RowTransformation throws an exception as soon as it receives a row with an Id less than or equal to 0. The component that raised the error is faulted, the entire data flow is canceled, and Network.Execute() rethrows the exception.

Each ETLBox component has an Exception property. If a component throws an exception, a reference to it is stored in this property for later inspection.

If your data flow runs asynchronously, exceptions may be wrapped in an AggregateException.
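The AggregateException wrapping is standard .NET task behavior rather than something specific to ETLBox. The sketch below uses Task.Run with a hypothetical FailingComponent method as a stand-in for an asynchronously running data flow. A blocking Wait() surfaces an AggregateException whose InnerExceptions contain the original exception, while await rethrows the original exception directly.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a component that fails while the flow runs asynchronously
static void FailingComponent() => throw new ArgumentException("Id must have a value > 0!");

Task flow = Task.Run(() => FailingComponent());

string blockingMessage = "";
try
{
    flow.Wait(); // blocking wait: the exception arrives wrapped
}
catch (AggregateException ae)
{
    // Flatten() unwraps nested AggregateExceptions; InnerExceptions holds the originals
    foreach (var inner in ae.Flatten().InnerExceptions)
        blockingMessage = $"{inner.GetType().Name}: {inner.Message}";
}

string awaitedType = "";
try
{
    await flow; // await rethrows the first inner exception, unwrapped
}
catch (ArgumentException e)
{
    awaitedType = e.GetType().Name;
}

Console.WriteLine(blockingMessage);
Console.WriteLine(awaitedType);
```

If you block on a task, inspect InnerExceptions to find the error raised by your component. If you await it, you can catch the specific exception type directly.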

Redirecting Errors

ETLBox allows you to handle errors gracefully by redirecting faulty records to an error output. Use LinkErrorTo to send erroneous records to a separate component. We can now extend our previous example:

CsvSource<MyRow> source = new CsvSource<MyRow>("test.csv");
RowTransformation<MyRow> rowTrans = new RowTransformation<MyRow>();
rowTrans.TransformationFunc = row => {
    if (row.Id <= 0)
        throw new ArgumentException("Id must have a value > 0!");
    else
        return row;
};
MemoryDestination<MyRow> dest = new MemoryDestination<MyRow>();

CsvDestination<ETLBoxError> errorDest = new CsvDestination<ETLBoxError>("error.csv");

source.LinkTo(rowTrans).LinkTo(dest);

rowTrans.LinkErrorTo(errorDest);

Network.Execute(source); //<-- no exception will be thrown

ETLBoxError stores details about the exception, including the error message, timestamp, exception type, and a JSON representation of the record:

ErrorText                 | ReportTime | ExceptionType     | RecordAsJson
Id must have a value > 0! | 2021-03-26 | ArgumentException | {"Id":0,"Value":"-"}

This approach keeps your data flow running even when errors occur, while logging the error details for later analysis. You can link errors to any type of destination, or even to a completely new pipeline for advanced error processing.

Running the example data flow from before with errors linked into the CsvDestination produces the following content in error.csv:

ErrorText,ReportTime,ExceptionType,RecordAsJson
Id must have a value > 0!,2021-03-26 14:41:26.952,System.ArgumentException,"{""Id"":0,""Value"":""-""}"
Id must have a value > 0!,2021-03-26 14:41:27.679,System.ArgumentException,"{""Id"":-1,""Value"":""C""}"
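The effect of LinkErrorTo can be modeled in plain C#: instead of letting the exception fault the flow, each failing row is caught and turned into an error record. This is a sketch of the idea, not ETLBox's implementation. The tuple fields are only named after the ETLBoxError columns above, and System.Text.Json is used for the RecordAsJson value as an assumption (ETLBox may serialize records differently).

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

var rows = new[] { (Id: 0, Value: "-"), (Id: 1, Value: "A"), (Id: 2, Value: "B"), (Id: -1, Value: "C") };

// Same logic as the TransformationFunc in the example
(int Id, string Value) Transform((int Id, string Value) row) =>
    row.Id <= 0 ? throw new ArgumentException("Id must have a value > 0!") : row;

var dest = new List<(int Id, string Value)>();
// Fields named after the ETLBoxError columns (a plain tuple, not the ETLBoxError type)
var errors = new List<(string ErrorText, DateTime ReportTime, string ExceptionType, string RecordAsJson)>();

foreach (var row in rows)
{
    try
    {
        dest.Add(Transform(row));
    }
    catch (Exception e)
    {
        // Instead of faulting the flow, record the failure and continue with the next row
        string json = JsonSerializer.Serialize(new { Id = row.Id, Value = row.Value });
        errors.Add((e.Message, DateTime.Now, e.GetType().FullName ?? "", json));
    }
}

foreach (var err in errors)
    Console.WriteLine($"{err.ErrorText} | {err.ExceptionType} | {err.RecordAsJson}");
```

With the sample data, the rows with Ids 1 and 2 reach dest, and the rows with Ids 0 and -1 produce error records whose RecordAsJson values, {"Id":0,"Value":"-"} and {"Id":-1,"Value":"C"}, match the error file above.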

Multiple Inputs & Outputs

ETLBox allows you to build complex networks with multiple inputs and outputs. You can link sources and transformations to several components simultaneously, enabling data joins, splits, and broadcasts.

The following snippet links two database sources, a JSON source, a MergeJoin, a Multicast, an aggregation, and several destinations into one network:

// Component declarations omitted
dbSource1.LinkTo(rowTransformation);
dbSource2.LinkTo(rowTransformation);              // two inputs into one transformation
rowTransformation.LinkTo(mergeJoin.LeftInput);
jsonSource.LinkTo(mergeJoin.RightInput);
mergeJoin.LinkTo(multicast, row => row.IsValid);
mergeJoin.LinkTo(xmlDestination, row => !row.IsValid);
multicast.LinkTo(dbDestination);
multicast.LinkTo(aggregation);
aggregation.LinkTo(csvDestination);

This example highlights the flexibility ETLBox offers for creating intricate data pipelines, including merging data from different sources, broadcasting to multiple destinations, and applying conditional logic during the flow.

With normal predicates, you can only split your data. To combine two data streams into joined rows, use the MergeJoin component. To broadcast the same data to multiple outputs, use the Multicast component.

Merging Data with MergeJoin

The MergeJoin transformation allows you to combine data from two input sources based on a matching condition. It supports both simple joins (without comparison logic) and advanced joins where you define how rows are compared.

Example:

var source1 = new MemorySource<MyLeftRow>();
source1.DataAsList.Add(new MyLeftRow() { FirstName = "Elvis" });
source1.DataAsList.Add(new MyLeftRow() { FirstName = "Marilyn" });

var source2 = new MemorySource<MyRightRow>();
source2.DataAsList.Add(new MyRightRow() { LastName = "Presley" });
source2.DataAsList.Add(new MyRightRow() { LastName = "Monroe" });

var join = new MergeJoin<MyLeftRow, MyRightRow, MyOutputRow>(
    (left, right) => new MyOutputRow { FullName = left.FirstName + " " + right.LastName }
);

var dest = new MemoryDestination<MyOutputRow>();

source1.LinkTo(join.LeftInput);
source2.LinkTo(join.RightInput);
join.LinkTo(dest);
Network.Execute(source1, source2);

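foreach (var row in dest.Data)
    Console.WriteLine(row.FullName);
// Output:
// Elvis Presley
// Marilyn Monroe

public class MyLeftRow { public string FirstName { get; set; } }
public class MyRightRow { public string LastName { get; set; } }
public class MyOutputRow { public string FullName { get; set; } }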

This component is especially useful when combining data from different sources that share a common key. More details are available in the MergeJoin documentation.
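The example output suggests that a simple join without comparison logic pairs the first left row with the first right row, the second with the second, and so on. Under that assumption, the pairing corresponds to LINQ's Zip in plain C#, as sketched below. This is not ETLBox code, and Zip silently stops at the shorter input; how MergeJoin handles inputs of different lengths is not covered here.

```csharp
using System;
using System.Linq;

var leftRows = new[] { "Elvis", "Marilyn" };   // FirstName values from source1
var rightRows = new[] { "Presley", "Monroe" }; // LastName values from source2

// Assumption: without a comparison function, rows are paired in arrival order
var joined = leftRows.Zip(rightRows, (first, last) => first + " " + last).ToList();

foreach (var fullName in joined)
    Console.WriteLine(fullName);
```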

Broadcasting Data with Multicast

The Multicast transformation replicates data from one input to multiple outputs: every linked target receives every row.

Example:

var source = new MemorySource<MyRow>();
source.DataAsList.Add(new MyRow() { Id = 1, Value = "A" });
source.DataAsList.Add(new MyRow() { Id = 2, Value = "B" });

var dest1 = new MemoryDestination<MyRow>();
var dest2 = new MemoryDestination<MyRow>();

var multicast = new Multicast<MyRow>();

source.LinkTo(multicast);
multicast.LinkTo(dest1);
multicast.LinkTo(dest2);

Network.Execute(source);

In this example, both dest1 and dest2 receive identical copies of the data from source. More details are available in the Multicast documentation.
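To contrast Multicast with predicate routing, the following plain C# sketch (no ETLBox types; lists stand in for destinations) delivers the two sample rows once to every output and once to exactly one output chosen by a predicate.

```csharp
using System;
using System.Collections.Generic;

var source = new[] { (Id: 1, Value: "A"), (Id: 2, Value: "B") };

// Multicast: every row is delivered to every linked output
var dest1 = new List<(int Id, string Value)>();
var dest2 = new List<(int Id, string Value)>();
foreach (var row in source)
{
    dest1.Add(row);
    dest2.Add(row);
}

// Predicate links, for contrast: each row goes to exactly one output
var destA = new List<(int Id, string Value)>();
var destOther = new List<(int Id, string Value)>();
foreach (var row in source)
{
    if (row.Value == "A") destA.Add(row);
    else destOther.Add(row);
}

Console.WriteLine($"Multicast: dest1={dest1.Count}, dest2={dest2.Count}");
Console.WriteLine($"Predicates: destA={destA.Count}, destOther={destOther.Count}");
```

With Multicast, each destination ends up with both rows. With predicates, the two rows are divided between the destinations.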