Shared Features

ETLBox Streaming Connectors provide a unified approach to reading, transforming, and writing data from various sources such as files, web services (HTTP), and Azure Blob Storage. While each format (CSV, JSON, XML, Excel, Text, Parquet) has specific features, all connectors share a common set of functionalities that make them highly flexible and configurable.

This article covers the shared properties and behaviors of all streaming connectors.

Supported Resource Types

Each connector can read from or write to different resource types, allowing integration with files, APIs, and cloud storage. The following resource types are supported:

  • File-Based Processing – Local files and network shares
  • HTTP-Based Processing – REST APIs and web services
  • Azure Blob Storage – Cloud-based storage integration

File-Based Processing

By default, all connectors read from and write to local files or network shares.

CsvSource source = new CsvSource("C:/data/input.csv");
source.ResourceType = ResourceType.File; // Default setting
CsvDestination dest = new CsvDestination("C:/data/output.csv");

HTTP-Based Processing (REST APIs, Web Services)

Streaming connectors support REST APIs and web services by changing the ResourceType to Http.

JsonSource source = new JsonSource("https://api.example.com/data");
source.ResourceType = ResourceType.Http;
JsonDestination dest = new JsonDestination("https://api.example.com/submit");
dest.ResourceType = ResourceType.Http;
dest.HttpContentType = "application/json";
dest.HttpRequestMessage.Method = HttpMethod.Post;
dest.HttpRequestMessage.Headers.Connection.Add("keep-alive");

Connectors allow custom HTTP configurations, including headers, authentication, timeouts, and compression. In the following example, we override the default HttpClient so that it accepts compressed response streams, and we configure some common HTTP headers for the request message.

JsonSource source = new JsonSource("https://api.example.com/data");
source.ResourceType = ResourceType.Http;
source.HttpClient = new HttpClient(new HttpClientHandler() {
    AutomaticDecompression = System.Net.DecompressionMethods.All
});
source.HttpClient.Timeout = TimeSpan.FromSeconds(60);
source.HttpRequestMessage.Headers.Connection.Add("keep-alive");
source.HttpRequestMessage.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("*/*"));
source.HttpRequestMessage.Headers.AcceptEncoding.Add(new System.Net.Http.Headers.StringWithQualityHeaderValue("gzip"));
source.HttpRequestMessage.Headers.AcceptEncoding.Add(new System.Net.Http.Headers.StringWithQualityHeaderValue("deflate"));
source.HttpRequestMessage.Headers.AcceptEncoding.Add(new System.Net.Http.Headers.StringWithQualityHeaderValue("br"));
source.HttpRequestMessage.Headers.UserAgent.Add(new System.Net.Http.Headers.ProductInfoHeaderValue("ETLBox", "2.0"));
source.HttpRequestMessage.Headers.CacheControl = new System.Net.Http.Headers.CacheControlHeaderValue() {
  NoCache = true
};

Azure Blob Storage Integration

For cloud-based storage, ResourceType.AzureBlob is used.

CsvSource source = new CsvSource("dataset.csv");
source.ResourceType = ResourceType.AzureBlob;
source.AzureBlobStorage.ConnectionString = "<your_connection_string>";
source.AzureBlobStorage.ContainerName = "data-container";
CsvDestination dest = new CsvDestination("output.csv");
dest.ResourceType = ResourceType.AzureBlob;
dest.AzureBlobStorage.ConnectionString = "<your_connection_string>";
dest.AzureBlobStorage.ContainerName = "processed-data";
dest.AzureBlobStorage.BlockBlobOpenWriteOptions = new BlockBlobOpenWriteOptions() {
    BufferSize = 1 * 1024 * 1024 // 1MB
};

Streaming and Buffering

Streaming means data flows into ETLBox while it is being read. For example, when processing a large JSON file from a web service, ETLBox starts sending data into the pipeline before the file is fully downloaded.

If the source sends data faster than it can be processed, ETLBox stores the incoming rows in an internal buffer.

source.MaxBufferSize = 10000; // Change buffer size if needed
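
A minimal sketch of how the buffer limit fits into a complete flow (file names are placeholders; the wiring follows the usual ETLBox LinkTo/Execute/Wait pattern):

CsvSource source = new CsvSource("C:/data/large_input.csv");
source.MaxBufferSize = 10000; // limit the internal buffer to 10,000 rows

CsvDestination dest = new CsvDestination("C:/data/large_output.csv");

source.LinkTo(dest);
source.Execute(); // starts reading and streaming rows into the buffer
dest.Wait();      // completes once all buffered rows have been written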

Working with Dynamic Objects

ETLBox supports ExpandoObject for flexible, schema-free processing.

CsvSource source = new CsvSource("data.csv");
source.RowModificationAction = (row, meta) => {
    dynamic r = row;
    Console.WriteLine($"Dynamic ID: {r.Id}, Name: {r.Name}");
};
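
Because the non-generic connectors above produce and consume ExpandoObject rows, data can also be moved between formats without defining a class first. A minimal sketch (file names are placeholders):

CsvSource source = new CsvSource("data.csv");            // emits ExpandoObject rows
JsonDestination dest = new JsonDestination("data.json"); // accepts ExpandoObject rows

source.RowModificationAction = (row, meta) => {
    dynamic r = row;
    r.Imported = true; // add a field that did not exist in the source file
};

source.LinkTo(dest);
source.Execute();
dest.Wait();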

Handling Large Data Sets with Paging

When working with paginated REST APIs, ETLBox automates multiple API requests until all data is retrieved.

JsonSource<MyRow> source = new JsonSource<MyRow>();
int page = 1;
source.GetNextUri = streamMetaData => $"https://api.example.com/data?page={page++}";
source.HasNextUri = streamMetaData => streamMetaData.ProcessedRows > 0;

The same pattern is used for splitting large output files.

var dest = new CsvDestination<MyRow>();
dest.HasNextUri = (streamMetaData, currentRow) => true;
dest.GetNextUri = (streamMetaData, currentRow) => $"SplitFile_{streamMetaData.ProgressCount}.csv";

Handling Multipart HTTP Responses

If an API returns multiple content types in a single response, ETLBox can filter and extract only the relevant part.

From the following response, only the relevant CSV part is extracted:

Content-Type: multipart/mixed; boundary=boundary
--boundary
Content-Type: text/plain

Ignore this
--boundary
Content-Type: text/csv

Header1,Header2
1,Test1
2,Test2
3,Test3

--boundary
Content-Type: text/plain

    Ignore this also
--boundary--
CsvSource source = new CsvSource("https://api.example.com/multipart");
source.UseMulitpartContent = content => content.Headers.ContentType.MediaType == "text/csv";

Skipping Initial Rows in Streaming Sources

Some sources include headers, metadata, or garbage rows at the beginning. ETLBox allows skipping them.

source.SkipRows = 2; // Ignore the first 2 rows
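
A sketch of a typical case, assuming the skipped lines are discarded before the CSV header is parsed (the file content shown in the comments is illustrative):

// input.csv:
//   Report generated 2024-01-01
//   Exported by internal reporting system
//   ID,Name
//   1,John
//   2,Jane

CsvSource source = new CsvSource("input.csv");
source.SkipRows = 2; // drop the two metadata lines so parsing starts at the CSV header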

Encoding Support

All text-based connectors (CSV, JSON, XML, Text) support custom encoding.

source.Encoding = Encoding.ASCII;
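
A short sketch reading a legacy-encoded file and writing UTF-8 output (file names are placeholders, and the destination is assumed to expose the same Encoding property as the source):

// On .NET Core / .NET 5+, code-page encodings such as Windows-1252 require
// the System.Text.Encoding.CodePages package and a one-time registration:
Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

CsvSource source = new CsvSource("legacy_export.csv");
source.Encoding = Encoding.GetEncoding(1252); // Windows-1252 input

CsvDestination dest = new CsvDestination("utf8_output.csv");
dest.Encoding = Encoding.UTF8;

source.LinkTo(dest);
source.Execute();
dest.Wait();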

Using an Existing Stream

By default, all streaming sources and destinations in ETLBox create a StreamReader or StreamWriter based on their configuration. For example, if a file path is provided, the component automatically creates a new FileStream to read or write data.

However, in scenarios where an existing stream needs to be used — such as reading from an in-memory stream or a custom data source — ETLBox allows injecting a custom stream using:

  • CreateStreamReader for sources
  • CreateStreamWriter for destinations

Example: Using an Existing Stream in a Source

MemoryStream memStream = new MemoryStream();
StreamWriter writer = new StreamWriter(memStream);
writer.WriteLine("ID,Name");
writer.WriteLine("1,John");
writer.WriteLine("2,Jane");
writer.Flush();
memStream.Seek(0, SeekOrigin.Begin);

StreamReader reader = new StreamReader(memStream, Encoding.UTF8);

CsvSource<MyRow> source = new CsvSource<MyRow>("memoryStream");
source.CreateStreamReader = url => reader;
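
To complete the picture, the in-memory source can be linked to any destination. A sketch using ETLBox's MemoryDestination to collect the parsed rows (MyRow is assumed to be a POCO whose properties match the ID and Name columns):

MemoryDestination<MyRow> memDest = new MemoryDestination<MyRow>();

source.LinkTo(memDest);
source.Execute();
memDest.Wait();

foreach (MyRow row in memDest.Data)
    Console.WriteLine($"{row.Id}: {row.Name}");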

Example: Using an Existing Stream in a Destination

MemoryStream outputStream = new MemoryStream();
StreamWriter streamWriter = new StreamWriter(outputStream, Encoding.UTF8);

CsvDestination<MyRow> dest = new CsvDestination<MyRow>("memoryStream");
dest.CreateStreamWriter = url => streamWriter;
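
After the flow has run, the generated CSV can be read back from the MemoryStream. A sketch, assuming the rows come from a CsvSource linked to this destination (MemoryStream.ToArray() still works even if the destination has closed the injected writer):

CsvSource<MyRow> source = new CsvSource<MyRow>("data.csv");
source.LinkTo(dest);
source.Execute();
dest.Wait();

// ToArray() can be called even after the underlying stream has been closed.
string csvText = Encoding.UTF8.GetString(outputStream.ToArray());
Console.WriteLine(csvText);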

Row-Level Data Modification

Each streaming source offers the RowModificationAction property, which allows modifying a record immediately after it is read from the source and before it enters the data flow pipeline.

This action is defined as: Action<TOutput, StreamMetaData>

Unlike transformations that occur asynchronously in the data flow, RowModificationAction is executed synchronously within the source component itself. This means:

  • Each record is modified immediately after being read, before any other component processes it.
  • The action is performed in the same thread as the source reading process.
  • Data transformations using this method do not introduce concurrency issues since they occur before the data reaches the main ETL pipeline.

Example: Adding a Timestamp to Each Row

CsvSource<MyRow> source = new CsvSource<MyRow>("data.csv");
source.RowModificationAction = (row, meta) => {
    row.ProcessedAt = DateTime.UtcNow;
};

Example: Tracking the Request URI for API Data

For HTTP-based sources, RowModificationAction can be used to store metadata about the request that retrieved the row.

JsonSource<MyRow> source = new JsonSource<MyRow>("https://api.example.com/data?page=1");
source.GetNextUri = meta => $"https://api.example.com/data?page={meta.RequestCount + 1}";
source.HasNextUri = meta => meta.ProcessedRows > 0;

source.RowModificationAction = (row, meta) => {
    row.SourceUri = meta.RequestUri;
    row.ProcessingBatch = meta.RequestCount;
};

Accessing Unparsed Data

Each streaming source in ETLBox automatically collects unparsed data while reading from a file, web service, or cloud storage. This data is stored in the UnparsedData property and includes any skipped rows, metadata, or unread portions of the source.

By default, unparsed data collection is enabled for most sources, but it can be turned off by setting CollectUnparsedData = false. (For XmlSource, this setting defaults to false.)

Example: Accessing Unparsed Data

CsvSource<MyRow> source = new CsvSource<MyRow>("data.csv");
source.CollectUnparsedData = true;

source.SkipRows = 2; // Skip the first two rows

source.RowModificationAction = (row, meta) => {
    Console.WriteLine($"Processing Row: {row}");
    Console.WriteLine($"Unparsed Data: {source.UnparsedData}");
};

If unparsed data is not needed, disabling its collection can reduce memory usage.

Unparsed Data in Multi-Page Requests

For paginated HTTP responses or multiple file reads, unparsed data is collected for each request. The StreamMetaData object stores the unparsed portion of the current page:

JsonSource<MyRow> source = new JsonSource<MyRow>();
source.GetNextUri = meta => $"https://api.example.com/data?page={meta.RequestCount + 1}";
source.HasNextUri = meta => meta.ProcessedRows > 0;
source.CollectUnparsedData = true;

source.RowModificationAction = (row, meta) => {
    Console.WriteLine($"Current Page: {meta.RequestCount}");
    Console.WriteLine($"Unparsed Data: {meta.UnparsedData}");
};

This allows tracking any unused or ignored data during multi-page API requests.