Shared Features
ETLBox provides various components to build ETL pipelines. Each component shares a common structure, behavior, and set of properties, ensuring consistency across sources, transformations, and destinations. This article explains the core properties, methods, and behaviors of all ETLBox components.
Component Types
Sources and Destinations
Sources extract data from files, databases, APIs, or in-memory collections and push it into the data flow. Destinations receive processed data and write it to files, databases, APIs, or memory.
Transformations
Transformations modify data as it flows through the pipeline. They can be categorized as:
- Non-blocking transformations – Process rows as they arrive, requiring minimal memory.
- Blocking transformations – Load and process all data before producing an output (e.g., sorting operations).
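The difference between the two categories can be illustrated without ETLBox at all. The following is a minimal sketch in plain C#, assuming rows are simple integers; the class and method names (`TransformationSketch`, `NonBlockingDouble`, `BlockingSort`) are hypothetical and are not part of the ETLBox API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only; these are not ETLBox types.
public static class TransformationSketch
{
    // Non-blocking: each row is transformed and passed on as soon as it arrives,
    // so only one row has to be held in memory at a time.
    public static IEnumerable<int> NonBlockingDouble(IEnumerable<int> rows)
    {
        foreach (var row in rows)
            yield return row * 2;
    }

    // Blocking: a sort cannot emit its first row before it has seen the last
    // input row, so the complete input is loaded into memory first.
    public static IEnumerable<int> BlockingSort(IEnumerable<int> rows)
    {
        var all = rows.ToList(); // materializes every row
        all.Sort();
        return all;
    }

    public static void Main()
    {
        var input = new[] { 3, 1, 2 };
        Console.WriteLine(string.Join(",", NonBlockingDouble(input))); // prints 6,2,4
        Console.WriteLine(string.Join(",", BlockingSort(input)));      // prints 1,2,3
    }
}
```

The memory cost of the blocking variant grows with the size of the input, while the non-blocking variant stays constant. This is why blocking transformations deserve extra attention on large data sets.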
Input and Output Buffers
Each component has buffers to optimize data flow:
- Sources have an output buffer, temporarily storing extracted data.
- Destinations have an input buffer, holding records before writing.
- Transformations have both input and output buffers.
If a transformation is slower than its predecessor, its input buffer prevents data loss by temporarily holding excess records.
Configuring Buffer Size
You can limit buffer sizes globally or per component using MaxBufferSize:
var component = new RowTransformation<MyRow>(row => row);
component.MaxBufferSize = 50000; // Set buffer limit for this component
Settings.MaxBufferSize = 100000; // Set default for all components
Backpressure in Data Flow
Backpressure occurs when a slower component in the pipeline causes data to accumulate in upstream buffers. Once all buffers reach their limit, the source stops reading new data until space becomes available. This prevents excessive memory usage and ensures controlled data processing.
Example Scenario:
- A database source reads data and pushes it into a transformation component.
- The transformation performs a complex operation and is slower than the source.
- As the transformation processes data, its input buffer fills up.
- Once full, the source pauses reading until the transformation frees space in its buffer by processing rows.
If MaxBufferSize is set too high, memory usage may increase. Setting it too low can cause unnecessary pauses in data flow. Tuning buffer sizes based on processing speed helps maintain a balanced flow.
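The scenario above can be reproduced in isolation with a bounded buffer. The sketch below does not use ETLBox and does not show how ETLBox implements its buffers. It uses a bounded channel from System.Threading.Channels as a stand-in, and the names `BackpressureSketch`, `RunAsync`, `totalRows`, and `maxBufferSize` are assumptions made for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration only; a bounded channel stands in for a component buffer.
public static class BackpressureSketch
{
    // Runs a fast producer against a slow consumer and returns the number of
    // processed rows plus the largest buffer fill level the producer observed.
    public static async Task<(int processed, int maxBuffered)> RunAsync(int totalRows, int maxBufferSize)
    {
        var buffer = Channel.CreateBounded<int>(new BoundedChannelOptions(maxBufferSize)
        {
            FullMode = BoundedChannelFullMode.Wait // writer waits while the buffer is full
        });

        int maxBuffered = 0;

        // "Source": produces rows as fast as the buffer accepts them.
        var source = Task.Run(async () =>
        {
            for (int i = 0; i < totalRows; i++)
            {
                await buffer.Writer.WriteAsync(i); // pauses here once the buffer is full
                maxBuffered = Math.Max(maxBuffered, buffer.Reader.Count);
            }
            buffer.Writer.Complete();
        });

        // "Transformation": slower than the source, frees one slot per processed row.
        int processed = 0;
        await foreach (var row in buffer.Reader.ReadAllAsync())
        {
            await Task.Delay(1); // simulate a slow operation
            processed++;
        }

        await source;
        return (processed, maxBuffered);
    }

    public static async Task Main()
    {
        var (processed, maxBuffered) = await RunAsync(totalRows: 50, maxBufferSize: 5);
        Console.WriteLine($"Processed {processed} rows; at most {maxBuffered} rows were buffered.");
    }
}
```

However fast the producer is, the number of buffered rows never exceeds the configured limit; the producer simply waits. A larger limit lets the producer run further ahead at the cost of memory, which is the same trade-off described for MaxBufferSize.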
Global Settings
The Settings class provides global configuration options that affect all ETLBox components. These settings allow fine-tuning of logging, buffering, and execution behavior.
Logging Configuration
- LogThreshold – Defines how often logging occurs during data processing (default: 1,000 rows). This prevents excessive log output.
- LogInstance – Sets a global ILogger instance for logging. If a component has no specific logger set, it will use this instance.
- DisableAllLogging – If true, disables logging for all components globally. Individual components can override this with their own logging settings.
- AdditionalScope – A dictionary for defining custom scope variables that are included in all log messages when using ILogger.
Buffering and Performance
- MaxBufferSize – Defines the default buffer size for all components (default: 100,000 rows). Each component can override this individually.
Database Connection
- DefaultDbConnection – Stores a default database connection. If a task or component does not specify a connection, it will use this default.
Settings.DefaultDbConnection = new SqlConnectionManager("Server=myServer;Database=myDB;User=myUser;Password=myPass;");
Execution Behavior
- AllowMultipleExecutions – If true, components can be executed multiple times within the same or different networks. Setting this to false prevents potential side effects from reusing components.
Resetting Settings
You can reset all settings to their default values with:
Settings.ClearSettings();
Shared Component Properties
Progress and Error Tracking
Each component tracks both the number of processed records and the number of encountered errors.
- ProgressCount – Indicates how many records have been processed.
- ErrorCount – Counts how many errors occurred during execution.
var component = new RowTransformation<MyRow>(row => row);
Console.WriteLine($"Rows processed: {component.ProgressCount}");
Console.WriteLine($"Errors: {component.ErrorCount}");
Limiting Rows in Sources
The Limit property restricts the number of rows read from a source. This is useful for debugging or when only a subset of data is needed for processing.
var source = new DbSource<MyRow>(conn, "SourceTable") {
    Limit = 3 // Read only the first 3 rows
};
This property applies only to source components and does not affect transformations or destinations.
Execution Timing
Components track when they start and finish processing data:
- ExecutionStartTime – The timestamp when execution begins.
- ExecutionEndTime – The timestamp when execution completes.
var component = new RowTransformation<MyRow>(row => row);
Console.WriteLine($"Started at: {component.ExecutionStartTime}");
Console.WriteLine($"Finished at: {component.ExecutionEndTime}");
Exception Handling
If a component encounters an error, the Exception property captures the failure details.
var component = new RowTransformation<MyRow>(row => row);
if (component.Exception != null)
    Console.WriteLine($"Error: {component.Exception.Message}");
Completion Tracking
Each component has a Completion property, returning a task that completes when the component finishes processing.
var component = new RowTransformation<MyRow>(row => row);
await component.Completion;
Linked Components
Each component tracks its predecessors (input components) and successors (output components) through the Predecessors and Successors properties.
var component = new RowTransformation<MyRow>(row => row);
Console.WriteLine($"Predecessors: {component.Predecessors.Count}");
Console.WriteLine($"Successors: {component.Successors.Count}");
Customizable Execution Hooks
Tag Property
The Tag property allows attaching metadata to a component, useful for tracking or debugging.
var component = new RowTransformation<MyRow>(row => row);
component.Tag = "CustomIdentifier";
Console.WriteLine(component.Tag);
OnProgress Action
Executes a function every time a row is processed:
var component = new RowTransformation<MyRow>(row => row);
component.OnProgress = count => Console.WriteLine($"Processed {count} rows");
OnInitialization Action
Executes a function when the component starts processing:
var component = new RowTransformation<MyRow>(row => row);
component.OnInitialization = () => Console.WriteLine("Component initialized");
OnCompletion Action
Executes a function when the component finishes processing:
var component = new RowTransformation<MyRow>(row => row);
component.OnCompletion = () => Console.WriteLine("Processing complete");
OnException
The OnException action is triggered when an error occurs in the component. This allows logging or handling errors without immediately stopping execution.
var component = new RowTransformation<MyRow>(row => row);
component.OnException = (ex, recordAsJson) => Console.WriteLine($"Error in {component.TaskName}: {ex.Message}. Flawed record: '{recordAsJson}'");