In-Memory Data
Easily work with in-memory data using ETLBox’s In-Memory Data Connectors. Whether you need to pull data from a list, store results in memory, or handle concurrent access, these connectors provide a simple and efficient way to manage your data flow.
On this page
In-Memory Data Connectors
The MemorySource and MemoryDestination are in-memory components for reading and writing data in ETLBox. They enable handling data from .NET collections efficiently and are useful when working with in-memory datasets.
For concurrent scenarios, ConcurrentMemoryDestination provides thread-safe storage using BlockingCollection<T>
.
Overview
- Part of the ETLBox Core Package – no additional dependencies required
- Works with lists, collections, arrays, and dynamic objects (
ExpandoObject
) - Supports asynchronous execution in ETL workflows
MemoryDestination
stores results, whileMemorySource
provides an in-memory data sourceConcurrentMemoryDestination
enables thread-safe data handling
Memory Source
The MemorySource<T>
acts as an in-memory data provider. It reads from a .NET collection (IEnumerable<T>
) and sends records to the next component in the ETL pipeline. It is ideal when your data is already in memory and does not require external storage or retrieval.
By default, it initializes an empty List<T>
, which can be modified dynamically. You can assign an existing collection or use the built-in list to add records as needed.
Using an Array (POCOs)
MemorySource<MyRow> source = new MemorySource<MyRow>();
source.Data = new
[
new MyRow() { Id = 1, Value = "Test1" },
new MyRow() { Id = 2, Value = "Test2" },
new MyRow() { Id = 3, Value = "Test3" }
];
This approach allows you to assign a predefined list of objects (MyRow) directly to the source. The data is then available for processing in your ETL pipeline.
Using the Internal List
MemorySource<MyRow> source = new MemorySource<MyRow>();
source.DataAsList.Add(new MyRow() { Id = 1, Value = "Test1" });
source.DataAsList.Add(new MyRow() { Id = 2, Value = "Test2" });
source.DataAsList.Add(new MyRow() { Id = 3, Value = "Test3" });
DataAsList
returns the current Data
property cast to List<T>
. If casting is not possible (e.g., because Data
is assigned an IEnumerable<T>
that does not implement IList<T>
), it will return null
.
Using Dynamic Objects
dynamic row = new ExpandoObject();
row.Id = 1;
row.Value = "Test1";
MemorySource source = new MemorySource();
source.Data = new List<ExpandoObject> { row };
If your data structure is not predefined, you can use ExpandoObject
to create a flexible, dynamic data source. This is helpful when working with variable or loosely structured data.
Memory Destination
The MemoryDestination<T>
is an in-memory storage that collects incoming records into a .NET List
By default, MemoryDestination
initializes an internal List<T>
to store incoming data. However, it can be reassigned to other collection types such as SortedSet<T>
, arrays, or any other ICollection<T>
, depending on the required data structure and retrieval behavior.
Basic Usage (POCOs)
MemoryDestination<MyRow> dest = new MemoryDestination<MyRow>();
// Incoming Data is stored in dest.Data
foreach (var row in dest.Data)
Console.WriteLine($"Id: {row.Id}, Value: {row.Value}");
This creates a MemoryDestination
that stores records in an internal list. Data can be accessed from Data
property once the pipeline execution is complete.
Using Dynamic Objects
var dest = new MemoryDestination();
// Incoming Data is stored in dest.Data
foreach (dynamic row in dest.Data)
Console.WriteLine($"Dynamic property 1: {row.Prop1}, Dynamic property 2: {row.Prop2}");
By default, MemoryDestination
can store dynamic objects (ExpandoObject), making it flexible when working with non-fixed schemas.
Using a Custom Collection
var dest = new MemoryDestination<MyRow>();
dest.Data = new SortedSet<MyRow>(new MyRowComparer());
Assigning a custom collection such as SortedSet<T>
allows records to be stored in a specific order based on a comparison rule.
Example: Processing In-Memory Data
MemorySource<MyRow> source = new MemorySource<MyRow>();
source.Data = new List<MyRow>
{
new MyRow { Id = 1, Value = "Test" },
new MyRow { Id = 2, Value = "Test" }
};
var trans = new RowTransformation<MyRow>();
trans.TransformationFunc = row => {
row.Value += row.Id;
return row;
};
var dest = new MemoryDestination<MyRow>();
source.LinkTo(trans);
trans.LinkTo(dest);
Network.Execute(source);
foreach (var row in dest.Data)
Console.WriteLine($"Id: {row.Id}, Value: {row.Value}");
This example demonstrates how in-memory data can be transformed using an ETLBox pipeline before being stored in a MemoryDestination
.
This approach enables a true ETL pipeline for in-memory data, making it easier to extend the process with additional steps like error handling, filtering, merging, or multicasting. Instead of manually iterating over a collection with a foreach
loop, the pipeline ensures that transformations, validations, and data flow control are handled in a structured and scalable way.
Concurrent Memory Destination
The ConcurrentMemoryDestination<T>
is a thread-safe alternative to MemoryDestination<T>
.
- Uses a
BlockingCollection<T>
instead ofList<T>
- Supports concurrent reading and writing while data is still arriving
var dest = new ConcurrentMemoryDestination<MyRow>();
// Data is stored in a thread-safe BlockingCollection<T>
foreach (var row in dest.Data)
Console.WriteLine($"Id: {row.Id}, Value: {row.Value}");
Example: Concurrent Processing
var source = new MemorySource<MyRow>();
for (int i = 0; i < 1000000; i++)
source.DataAsList.Add(new MyRow() { Id = i, Value = "Test" + i });
var dest = new ConcurrentMemoryDestination<MyRow>();
source.LinkTo(dest);
var runTask = Network.ExecuteAsync(source);
Console.WriteLine("Adding completed? " + dest.Data.IsAddingCompleted);
dest.Data.TryTake(out MyRow firstTry);
Console.WriteLine("Data found? " + (firstTry != null));
await runTask;
dest.Data.TryTake(out MyRow secondTry);
Console.WriteLine("Adding completed? " + dest.Data.IsAddingCompleted);
Console.WriteLine("Data found? " + (secondTry != null));
/* Ouput:
Adding completed?False
Data found?False
Adding completed?True
Data found?True
*/