Control flow tasks

ETLBox comes with a fair set of ControlFlow tasks that let you perform the most common tasks on a relational database. This page gives an overview of these tasks.

Create, Drop and IfExists tasks

Many tasks help you create, drop, or check the existence of database objects. The following examples show how to create tables, views, procedures, indexes, or databases, how to drop them, and how to check whether they exist.

Tables

CreateTableTask

CreateTableTask will help you to create a table on the database. You can pass either a TableDefinition object or a table name and a list of table columns.

Here is an example that passes the table name and a list of table columns:

List<TableColumn> columns = new List<TableColumn>() {
    new TableColumn("Id", "INT", allowNulls: false, isPrimaryKey: true, isIdentity: true),
    new TableColumn("value2", "DATE", allowNulls: true),
    new TableColumn("value3", "DECIMAL(10,2)", allowNulls: false) { DefaultValue = "3.12" },
    // A computed column may only reference columns that exist in the same table
    new TableColumn("compValue", "BIGINT", allowNulls: true) { ComputedColumn = "(Id * 2)" }
};

CreateTableTask.Create(connectionManager, "tablename", columns);

A TableColumn describes a column in the database through its most commonly used attributes: name, data type, whether the column is nullable, whether it is a primary key, and whether it is an identity column (serial for Postgres, auto increment for MySQL). For the data type, use the notation you are most familiar with; ETLBox will attempt to convert it into the database-specific format. Additionally, you can specify that the column is a computed column or that it has a default value.

Here is an example that creates a TableDefinition and passes it to the CreateTableTask:

TableDefinition CustomerTableDef = new TableDefinition("customer",
    new List<TableColumn>() {
        new TableColumn("CustomerKey", "int",allowNulls: false, isPrimaryKey:true, isIdentity:true),
        new TableColumn("Name","nvarchar(200)", allowNulls: false),
    });
CreateTableTask.Create(connectionManager, CustomerTableDef);

Implicit data type conversions:

CreateTableTask will automatically try to convert the given data type into a type that your database actually understands. E.g., if you pass “DATETIME” as data type for a column, and you want to create your table on a Postgres connection manager, CreateTableTask will automatically convert “DATETIME” into “TIMESTAMP”.
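The idea behind such a conversion can be pictured as a per-database lookup table. The following is only a minimal sketch of that idea, not ETLBox's actual implementation: the class `TypeMappingSketch`, its `Convert` method, and the string used to identify the database are hypothetical, and the only rule it contains is the DATETIME to TIMESTAMP example mentioned above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a data type converter; not ETLBox's real code.
public static class TypeMappingSketch
{
    // Per-database replacements for generic type names (illustrative entry only).
    private static readonly Dictionary<string, Dictionary<string, string>> Map =
        new Dictionary<string, Dictionary<string, string>>(StringComparer.OrdinalIgnoreCase)
        {
            ["Postgres"] = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
            {
                ["DATETIME"] = "TIMESTAMP"
            }
        };

    // Returns the database-specific name, or the input unchanged if no rule applies.
    public static string Convert(string typeName, string database)
    {
        if (Map.TryGetValue(database, out var rules) &&
            rules.TryGetValue(typeName.Trim(), out var converted))
        {
            return converted;
        }
        return typeName;
    }
}
```

Calling `TypeMappingSketch.Convert("DATETIME", "Postgres")` yields "TIMESTAMP", while a type name without a rule is passed through unchanged.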

Adding your own data type conversion

If you need to change the default data type conversion, you can override it with your own implementation. To do so, pass an object that implements IDataTypeConverter. The interface has one method, TryConvertDbDataType, with two parameters: dbSpecificTypeName is the type name that you defined in your TableColumn, and connectionType identifies the connection manager type (e.g. SqlServer or Oracle). The return value is the converted data type. Here is an example implementation:

public class MyDataTypeConverter : IDataTypeConverter
{
    public string TryConvertDbDataType(string dbSpecificTypeName, ConnectionType connectionType)
    {
        if (dbSpecificTypeName == "ABC")
            return "DATETIME";
        else
            return DataTypeConverter.TryGetDbSpecificType(dbSpecificTypeName, connectionType);
    }
}

List<TableColumn> columns = new List<TableColumn>() {
    new TableColumn("somedate", "ABC"),
    new TableColumn("sometext", "TEXT")
};

var ctt = new CreateTableTask("CreateTableIDataTypeConverter", columns)
{
    ConnectionManager = SqlConnection,
    DataTypeConverter = new MyDataTypeConverter()
};
ctt.Create();

DropTableTask

This task simply drops a table (and optionally checks first whether the table exists):

DropTableTask.Drop(connectionManager, "DropTableTest");
DropTableTask.DropIfExists(connectionManager, "DropTableTest");

IfTableOrViewExistsTask

This task checks if a table or view exists and returns true or false.

bool exists = IfTableOrViewExistsTask.IsExisting(connectionManager, "tablename");

Table Definition

If you are interested in retrieving a TableDefinition object from an existing database table, you can use the static method FromTableName on the TableDefinition class:

TableDefinition.FromTableName(connectionManager, "demoTable");

You can also directly pass the type of an object or class, and ETLBox will try its best to create the most suitable TableDefinition for it. The returned TableDefinition will vary for different database types.

TableDefinition td = TableDefinition.FromCLRType(ConnectionType.SqlServer, typeof(MyClass));

Views

CreateViewTask

Creates a view on the database. If the view already exists, it will alter (or replace) the existing view.

CreateViewTask.CreateOrAlter(connectionManager, "View1", "SELECT 1 AS Test");

DropViewTask

Drops a view (and optionally checks if the view exists).

DropViewTask.Drop(connectionManager, "viewname");
DropViewTask.DropIfExists(connectionManager, "viewname");

Indexes

As with views and tables, you can also (re)create and drop indexes or check whether they exist.

//Create an index
CreateIndexTask.CreateOrRecreate(connectionManager, "indexname", "tablename",
    new List<string>() { "index_column_1", "index_column_2" });

//Drop an index
DropIndexTask.DropIfExists(connectionManager, "indexname", "tablename");

//Check if an index exists
bool exists = IfIndexExistsTask.IsExisting(connectionManager, "indexname", "tablename");

Procedures

//Create a procedure
List<ProcedureParameter> pars = new List<ProcedureParameter>() {
    new ProcedureParameter("Par1", "VARCHAR(10)"),
    new ProcedureParameter("Par2", "INT", "7"),
};
CreateProcedureTask.CreateOrAlter(connectionManager, "ProcedureName", "SELECT 1;", pars);

//Drop a procedure
DropProcedureTask.DropIfExists(connectionManager, "ProcedureName");

//Check if a procedure exists
bool exists = IfProcedureExistsTask.IsExisting(connectionManager, "ProcedureName");

Schema

Schemas are only available for SQL Server and Postgres databases. For MySQL, use the Create/Drop/IfExists database tasks instead.

//Create a schema
CreateSchemaTask.CreateOrAlter(connectionManager, "SchemaName");

//Drop a schema
DropSchemaTask.DropIfExists(connectionManager, "SchemaName");

//Check if a schema exists
bool exists = IfSchemaExistsTask.IsExisting(connectionManager, "SchemaName");

Databases

This is not supported with SQLite.

//Create a database
CreateDatabaseTask.CreateOrAlter(connectionManager, "DBName");

//Drop a database
DropDatabaseTask.DropIfExists(connectionManager, "DBName");

//Check if a database exists
bool exists = IfDatabaseExistsTask.IsExisting(connectionManager, "DBName");

Retrieving the connection without catalog or database

In some cases, you might want to get a connection string without a catalog, e.g. because you need to create the database first. This is where you can use the connection string wrapper for your database. E.g., for Postgres you could run the following code:

PostgresConnectionString conStringWrapper = new PostgresConnectionString("Server=10.37.128.2;Database=ETLBox_DataFlow;User Id=postgres;Password=etlboxpassword;");
PostgresConnectionString connectionWithoutCatalog = conStringWrapper.GetMasterConnection();
PostgresConnectionManager connectionManager = new PostgresConnectionManager(connectionWithoutCatalog);

Your connection manager would now connect to the “postgres” database (which would be the “master” database in SQL Server and the “mysql” database in MySQL).
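To make the effect visible, the following sketch swaps the database entry of a connection string by hand, using the standard .NET `DbConnectionStringBuilder`. It is not how ETLBox implements `GetMasterConnection`; the class `MasterConnectionSketch` and its method `WithDatabase` are hypothetical names introduced only for this illustration.

```csharp
using System.Data.Common;

// Hypothetical helper; it only illustrates what "connection without catalog" means.
public static class MasterConnectionSketch
{
    // Replaces the Database entry of a connection string with a maintenance database.
    public static string WithDatabase(string connectionString, string maintenanceDb)
    {
        var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };
        builder["Database"] = maintenanceDb;   // keyword lookup is case-insensitive
        return builder.ConnectionString;
    }
}
```

For the Postgres example above, `MasterConnectionSketch.WithDatabase(original, "postgres")` returns a connection string that keeps the server and credentials but points to the “postgres” database.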

RowCount

You can count rows in a table using the RowCountTask.

Here is an example of a simple count:

SqlConnectionManager connectionManager = new SqlConnectionManager("Data Source=.; Database=Sample; Integrated Security=SSPI");
int count = RowCountTask.Count(connectionManager, "demotable");

You can optionally add a condition for the count:

int count = RowCountTask.Count(connectionManager, "demotable", "Col1 >= 3 AND Col2 = 'Test'");

Truncate a table

Truncating a table is as simple as:

TruncateTableTask.Truncate(connectionManager, "demo.table1");

SqlTask

This is the Swiss Army knife for running SQL on your database. It uses the underlying ADO.NET connection manager, which allows you to do almost everything on the database without the “overhead” and boilerplate code that ADO.NET brings with it.

SqlTask always expects a descriptive name when you use it - this name is used for logging purposes.

SqlTask.ExecuteNonQuery(connectionManager, 
    $@"insert into demo.table1 (value) select * from (values ('Text'), ('More text')) as data(v)");

ExecuteNonQuery just executes the SQL code without reading any results from the database.

Using parameters

You can pass parameterized SQL code so that the database can reuse existing execution plans in its cache.

var parameter = new List<QueryParameter>
{
    new QueryParameter("value1", "INT", 1),
    new QueryParameter("value2", "NVARCHAR(100)", "Test1")
};
SqlTask.ExecuteNonQuery(connectionManager, 
    $"INSERT INTO ParameterTest VALUES (@value1, @value2)", parameter);

Reading result sets

Scalar values

If your result set contains only one row with one column, you can use the ExecuteScalar methods to retrieve that value.

//without type conversion
object objectResult = SqlTask.ExecuteScalar(connectionManager,
    $@"SELECT 'Hallo Welt' AS ScalarResult");

//with type conversion
double? doubleResult = SqlTask.ExecuteScalar<double>(connectionManager,
    $@"SELECT CAST(1.343 AS NUMERIC(4,3)) AS ScalarResult");

Result sets

Use the following code to read a result set from your database and store it in a List object. The table to read from would have two columns (ColumnA and ColumnB), and the object MyRow would have two properties (Col1 and Col2).

List<MyRow> result = new List<MyRow>();
MyRow CurRow = new MyRow();

SqlTask.ExecuteReader(connectionManager,
    "Test execute reader",
    $"SELECT ColumnA, ColumnB FROM ResultTable",
    () => CurRow = new MyRow(),                     //this is executed before each row
    () => result.Add(CurRow),                       //this is executed after each row
    colA => CurRow.Col1 = int.Parse(colA.ToString()),  //one action per selected column, in order
    colB => CurRow.Col2 = (string)colB
    );
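The order in which these callbacks fire can be reproduced without a database. The sketch below is an assumption-laden imitation, not ETLBox code: `MyRow` is a guess at the shape of the class used above, and `ReaderCallbackSketch.ReadRows` is a hypothetical helper that walks an in-memory list instead of a data reader.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the row class used in the example above.
public class MyRow
{
    public int Col1 { get; set; }
    public string Col2 { get; set; } = "";
}

// Hypothetical imitation of the callback protocol; not ETLBox's implementation.
public static class ReaderCallbackSketch
{
    // For each row: call beforeRow, then one action per column in order, then afterRow.
    public static void ReadRows(IEnumerable<object[]> rows, Action beforeRow,
        Action afterRow, params Action<object>[] columnActions)
    {
        foreach (object[] row in rows)
        {
            beforeRow();
            for (int i = 0; i < columnActions.Length && i < row.Length; i++)
                columnActions[i](row[i]);
            afterRow();
        }
    }

    // Feeds two in-memory rows through the same callbacks as the example above.
    public static List<MyRow> Demo()
    {
        var fakeResultSet = new List<object[]>
        {
            new object[] { 1, "Test1" },
            new object[] { 2, "Test2" }
        };
        var result = new List<MyRow>();
        MyRow curRow = new MyRow();

        ReadRows(fakeResultSet,
            () => curRow = new MyRow(),
            () => result.Add(curRow),
            colA => curRow.Col1 = int.Parse(colA.ToString()),
            colB => curRow.Col2 = (string)colB);
        return result;
    }
}
```

Because a fresh MyRow is created before each row and added to the list only after all column actions have run, every list entry ends up as a separate, fully populated object.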

Bulk Inserts

Bulk inserts in ADO.NET (for example with SqlBulkCopy) take their data from a DataTable or from an object that implements IDataReader. Usually, a DataTable is used for this purpose. Because the ADO.NET DataTable carries a large overhead and some performance downsides, ETLBox provides its own object that implements IDataReader: TableData, which can be passed to a bulk insert operation.

Here is an example for a bulk insert:

//destTable.TableDefinition describes the columns of the target table
TableData<string[]> data = new TableData<string[]>(destTable.TableDefinition);
string[] values = { "1", "Test1" };
data.Rows.Add(values);
string[] values2 = { "2", "Test2" };
data.Rows.Add(values2);
string[] values3 = { "3", "Test3" };
data.Rows.Add(values3);

//Insert all rows in one bulk operation
SqlTask.BulkInsert(connectionManager, "Bulk insert demo data", data, "TableName");

Bulk deletes

You can use the BulkDelete method exposed on the SqlTask to perform a batch delete on the database. It allows you to delete data from a table by passing a TableData object that contains the column names and, for each row to delete, the values of those columns. The SqlTask will then execute a single DELETE statement on your database that removes all matching records in one execution.

Here is an example:

CREATE TABLE BulkDeleteTable(
  Id INT NOT NULL,
  Value1 VARCHAR(100) NULL,
  Value2 VARCHAR(100) NULL
);
INSERT INTO BulkDeleteTable (Id,Value1,Value2) VALUES (1, 'A', 'Test1');
INSERT INTO BulkDeleteTable (Id,Value1,Value2) VALUES (2, 'B', 'Test2');
INSERT INTO BulkDeleteTable (Id,Value1,Value2) VALUES (3, 'C', 'Test3');
INSERT INTO BulkDeleteTable (Id,Value1,Value2) VALUES (4, 'D', 'Test4');

TableData data = new TableData(new TableDefinition() {
    Name = "BulkDeleteTable",
    Columns = new List<TableColumn>() {
        new TableColumn() { Name = "Id"},
        new TableColumn() { Name = "Value1"}
    }
});

var row1 = new object[] { 1, "A" };
var row2 = new object[] { 2, "B" };
var row3 = new object[] { 4, "X" }; //no match!
data.Rows.Add(row1);
data.Rows.Add(row2);
data.Rows.Add(row3);

SqlTask.BulkDelete(SqlConnection, data);

This deletes the first two rows (with Id 1 and 2), because their values in the columns ‘Id’ and ‘Value1’ match the provided data. The row with Id 4 is kept, because its Value1 is ‘D’ rather than ‘X’.
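The matching rule can be checked with a small in-memory simulation. This is only a sketch of the semantics described above, under the assumption that a row is deleted when all provided column values match. It does not call ETLBox and says nothing about the SQL that BulkDelete actually generates; `BulkDeleteSketch` and its methods are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation of the matching rule; no database or ETLBox involved.
public static class BulkDeleteSketch
{
    // Returns the Ids that remain after deleting every row whose (Id, Value1) pair is in 'keys'.
    public static List<int> RemainingIds(
        IEnumerable<(int Id, string Value1, string Value2)> table,
        IEnumerable<object[]> keys)
    {
        var keySet = new HashSet<(int, string)>(
            keys.Select(k => ((int)k[0], (string)k[1])));

        return table.Where(r => !keySet.Contains((r.Id, r.Value1)))
                    .Select(r => r.Id)
                    .ToList();
    }

    // Uses the same table content and delete keys as the example above.
    public static List<int> Demo()
    {
        var table = new[]
        {
            (1, "A", "Test1"), (2, "B", "Test2"), (3, "C", "Test3"), (4, "D", "Test4")
        };
        var keys = new[]
        {
            new object[] { 1, "A" },
            new object[] { 2, "B" },
            new object[] { 4, "X" }   // Id matches, Value1 does not
        };
        return RemainingIds(table, keys);
    }
}
```

`BulkDeleteSketch.Demo()` returns the Ids 3 and 4: row 3 was never requested, and row 4 survives because only one of its two key columns matches.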