Merging & syncing tables

The following article describes how you can use the data from your data flow to insert new records into a destination table, update existing ones, or delete records that were removed.

Syncing tables with Merge

ETLBox can be used to integrate data from different sources and write it into different destinations. Most of the time, you will use database tables as the target. A very common use case is to keep a source and a destination table "in sync".

Please note that for the sake of simplicity we use a database table as the source which we want to sync with a destination database table. But of course the source doesn't necessarily need to be a table - any data that comes from any source or transformation in your data flow can be used as input for syncing.

Full or Delta

First, let's differentiate between two scenarios.

Scenario "Delta": The source table has information about its changes. This can be e.g. a timestamp indicating when a record was inserted or updated. This change information is called the "delta". Getting delta information from the source (e.g. via Change Data Capture) can make data loads significantly faster.

Scenario "Full": The source table does not have any information about its changes, so no delta information is provided. The source always delivers its data in "full".

Scenario "Delta" is a little more tricky when it comes to deletions. In scenario "Full" we always know which objects currently exist and what data they have; objects which are not delivered by the source don't exist anymore. In the Delta scenario, the handling of deletions is more problematic - there is no straightforward way to manage deleted records here. A common approach is a delta record with a delete flag indicating that the record was deleted. Alternatively, deletions are not transferred at all, and from time to time a full load is needed to synchronize them.

DBMerge

Both scenarios are supported by ETLBox. The DbMerge component can be used to tackle this problem. The DbMerge is a destination component and is created on a destination table in your data flow. It will wait for all data from the flow to arrive, and then either insert, update or delete the data in the destination table. Deletion is optional and can be turned on or off by setting the proper MergeMode.

The following MergeModes are supported:

  • Delta: does inserts & updates; deletions only with a delete flag
  • Full: does inserts, updates & deletions (a record is deleted if it is missing from the source)
  • InsertsAndUpdatesOnly: does inserts & updates only
  • UpdatesOnly: only updates, no inserts or deletions

Example

Data and object definition

To implement an example sync between two tables, we will need a DbSource pointing to our source table. In our case we just pass a table name for the source table, but you could also define a SQL query (e.g. one which gives you only the delta records). Also, any other data flow component can be used as the source - either other sources or transformations.
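For instance, a source that delivers only the delta records could be sketched like this. Treat this as an assumption to adapt to your setup: the WHERE clause, the cut-off timestamp and the ChangeDate column are hypothetical and not part of the example tables below.

```csharp
// Sketch: a DbSource that reads only the delta records via a SQL query
// instead of loading the whole source table. The WHERE clause and the
// cut-off timestamp are hypothetical - adjust them to your delta logic.
DbSource<MyMergeRow> deltaSource = new DbSource<MyMergeRow>()
{
    ConnectionManager = connection,
    Sql = @"SELECT Key, Value
            FROM SourceTable
            WHERE ChangeDate > '2019-01-01 12:00:00'"
};
```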

The source is then connected to the DbMerge, and data from the source will be inserted, updated or deleted in the destination.

The DbMerge itself is a non-generic class, but it does inherit from DbMerge<ExpandoObject>. If you don't use the generic class with a type, the default is DbMerge<ExpandoObject> - the ExpandoObject is a dynamic object which can have properties added at runtime. If you decide to use your own plain old CLR object (POCO) with the DbMerge, it expects the object to implement the interface IMergeableRow. This interface requires a ChangeDate and a ChangeAction property on your object, as well as a UniqueId property that describes how objects are compared.

The easiest (and recommended) way to implement the interface on a POCO is to inherit from the class MergeableRow. You will automatically get all the necessary implementation details to pass the object to a DbMerge. Only three things are left to do:

  1. You need to flag the properties that identify the unique id columns with the IdColumn attribute.
  2. Optional: You can flag the properties with the CompareColumn attribute that are used when comparing the values of a record to decide if it really needs to be updated. Otherwise, all non-id columns are used for comparison.
  3. Optional: You can flag the properties that you want to have updated in the target with the UpdateColumn attribute. If this is omitted, all non-id columns are updated.

If you use an ExpandoObject, you can’t use attributes. Instead you can set the MergeProperties.IdColumns, MergeProperties.CompareColumns and optionally the MergeProperties.UpdateColumns directly.
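A minimal sketch of this dynamic approach could look like the following. Note that the exact shape of the MergeProperties collections (here assumed to take IdColumn and CompareColumn objects with IdPropertyName/ComparePropertyName) may vary between ETLBox versions, so verify against the API of the version you use.

```csharp
// Sketch: merging dynamic rows (ExpandoObject) without attributes.
// The id and compare columns are registered on the MergeProperties
// instead. Property names "Key" and "Value" match the example tables.
DbSource source = new DbSource(connection, "SourceTable");
DbMerge merge = new DbMerge(connection, "DestinationTable");
merge.MergeProperties.IdColumns.Add(
    new IdColumn() { IdPropertyName = "Key" });
merge.MergeProperties.CompareColumns.Add(
    new CompareColumn() { ComparePropertyName = "Value" });
source.LinkTo(merge);
Network.Execute(source);
```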

Let's start with a simple object that has one property containing the key column (the id) and one property holding a value:

public class MyMergeRow : MergeableRow
{
    [IdColumn]
    public int Key { get; set; }

    [CompareColumn]
    public string Value { get; set; }
}

In our scenario we have a source table that would look like this:

Key | Value
----|--------------
1   | Test - Insert
2   | Test - Update
3   | Test - Exists

And the destination table would look like this:

Key | Value
----|---------------
2   | XXX
3   | Test - Exists
4   | Test - Deleted

Setting up the data flow

Now we can set up the data flow. It would look like this:

DbSource<MyMergeRow> source = new DbSource<MyMergeRow>(connection, "SourceTable");
DbMerge<MyMergeRow> merge = new DbMerge<MyMergeRow>(connection, "DestinationTable");
merge.MergeMode = MergeMode.Full;
source.LinkTo(merge);
Network.Execute(source);

In this example, we use the scenario "Full". That means we load all data from the source and expect the merge to delete records that aren't delivered anymore.

Now what happens if we let this flow run? First, all records are loaded from the destination into a memory object and compared with the source data. Within the memory object, the DbMerge will identify:

  • which records need to be inserted (ChangeAction: Insert)
  • which records need to be updated (ChangeAction: Update)
  • which records exist and don't need to be updated (ChangeAction: Exists)
  • which records need to be deleted (ChangeAction: Delete), if deletions are allowed

To identify these different cases, the IdColumn is used. In our example the id column is a unique primary key; it is recommended to only use unique columns for this.

As you can see, there is a difference between an update and an existing record that doesn't need to be updated. All records where the IdColumns match are examined based on their values: all properties marked with the CompareColumn attribute are compared. If one property/column differs, the record is marked to be updated. If they are all equal, the record is marked as existing and won't be touched in the database.

After this comparison is done, the DbMerge starts to write the changes into the database in batches. First, it updates the records flagged as Update in the destination table; this is performed batch-wise in a bulk operation. Then it deletes all records flagged as Delete in the destination table, also in batches. Finally, the new records are written (in batches) into the destination table. Records that don't need to be updated are left untouched in the destination.

In our example, after running the DbMerge, our destination table looks like this:

Key | Value
----|--------------
1   | Test - Insert
2   | Test - Update
3   | Test - Exists

Please note that the record with key 4 is now deleted. If the MergeMode had been MergeMode.Delta (which is the default), this entry would still be in the target table.

Delete Column

Let's assume that in our example above we used the MergeMode Delta. We would then still end up with record #4:

Key | Value
----|---------------
4   | Test - Deleted

How can we do deletions in a delta scenario? If you want deletions to be executed when loading a delta from the source, you can use the DeleteColumn attribute in your object.

E.g., consider adding the following property to MyMergeRow:

    [DeleteColumn("YES")]
    public string DoDelete { get; set; }

When executing the deletions, the DbMerge checks the value of the property DoDelete. If there is a matching entry in the target table (identified by the IdColumn attribute), it compares the property value to the expected value ("YES" in our example). If DoDelete holds the value "YES", it deletes the record in the target. If DoDelete is null or holds any other string value, no deletion is executed.

Of course, this also works with boolean values or any other object type. Here is another example:

public class MyMergeRow : MergeableRow
{
    [IdColumn]
    public long Key { get; set; }
    [CompareColumn]
    public string Value { get; set; }
    [DeleteColumn(true)]
    public bool DeleteThisRow { get; set; }
}

In this example object, if the property DeleteThisRow is set to true, the record in the destination will be deleted if there is already a matching record in the destination table.

Cache Mode

By default, the DbMerge will always load all data from the destination into memory first. If you want to avoid this, e.g. because your target table is quite big, consider setting the CacheMode to CacheMode.Partial.

DbMerge<MyMergeRow> merge = new DbMerge<MyMergeRow>(connection, "DestinationTable");
merge.CacheMode = CacheMode.Partial;

Now data from the destination is only loaded into memory for the records that are currently being processed.

Delta table

The DbMerge has a property DeltaTable, which is a List containing additional information about which records were inserted, updated, deleted or already existing. The operation and change date are stored in the corresponding ChangeAction/ChangeDate properties.

This delta table can be accessed if the DbMerge is not treated as a destination, but like a transformation. If the DbMerge is linked to other components, it will write the delta records into its output.

In our example, it would contain the information that one row was inserted (Key: 1), one was updated (Key: 2), one row wasn't changed (Key: 3) and one row was deleted (Key: 4).

This information can be used as a source for further processing in the data flow, simply by connecting the DbMerge to a transformation or another destination. So our complete flow could look like this:

DbSource<MyMergeRow> source = new DbSource<MyMergeRow>(connection, "SourceTable");
DbMerge<MyMergeRow> merge = new DbMerge<MyMergeRow>(connection, "DestinationTable");
DbDestination<MyMergeRow> delta = new DbDestination<MyMergeRow>(connection, "DeltaTable");
source.LinkTo(merge);
merge.LinkTo(delta);
Network.Execute(source);

The DeltaTable now will look like this:

Key | ChangeDate          | ChangeAction
----|---------------------|-------------
1   | 2019-01-01 12:00:01 | Insert (1)
2   | 2019-01-01 12:00:02 | Update (2)
3   | 2019-01-01 12:00:02 | Exists (0)
4   | 2019-01-01 12:00:03 | Delete (3)

Additional configurations

Truncate instead of delete

Because the DbMerge deletes records that need to be deleted or updated using a bulk delete SQL statement, this approach can sometimes become a performance bottleneck if you expect a lot of deletions. The DbMerge supports a truncate approach, enabled by setting the property UseTruncateMethod to true. It will then read all existing data from the destination into memory, identify the changes, truncate the destination table and then write all changes back into the database. This approach can be much faster if you expect a lot of deletions, but you always need to read all data from the destination table and write it back. The CacheMode Partial won't work if you use the truncate method. Also, the truncate method is only allowed for MergeMode "Full".

DbMerge<MyMergeRow> merge = new DbMerge<MyMergeRow>(connection, "DestinationTable");
merge.UseTruncateMethod = true;

ColumnMap attribute

If the columns have different names than our properties, we need to add the ColumnMap attribute to have them mapped correctly. If the columns were named Col1 for the key and Col2 for the value, our object would look like this:

public class MyMergeRow : MergeableRow
{
    [IdColumn]
    [ColumnMap("Col1")]
    public int Key { get; set; }

    [CompareColumn]
    [ColumnMap("Col2")]
    public string Value { get; set; }
}

For dynamic objects, you can define your column mappings via the ColumnMapping property.
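A sketch for dynamic objects could look like this. The element type and its member names (here assumed to be a ColumnMap with DbColumnName and PropertyName) are assumptions that may differ between ETLBox versions, so check them against your version's API.

```csharp
// Sketch: mapping the database columns Col1/Col2 to the dynamic
// properties Key/Value via the ColumnMapping property instead of
// the ColumnMap attribute.
DbMerge merge = new DbMerge(connection, "DestinationTable");
merge.ColumnMapping = new List<ColumnMap>()
{
    new ColumnMap() { DbColumnName = "Col1", PropertyName = "Key" },
    new ColumnMap() { DbColumnName = "Col2", PropertyName = "Value" }
};
```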

Composite Keys

Composite keys are supported: just flag all columns that you want to use as a composite unique key with the IdColumn attribute. Internally, all flagged properties are concatenated into a string using their ToString() methods. This concatenated string is then used as the identifier for each record.

public class MyMergeRow : MergeableRow
{
    [IdColumn]
    public long ColKey1 { get; set; }

    [IdColumn]
    public string ColKey2 { get; set; }

    [CompareColumn]
    public string ColValue1 { get; set; }

    [CompareColumn]
    public string ColValue2 { get; set; }
}

As you can see, you can also use the CompareColumn attribute on multiple properties that you want to use when comparing records.

Using the IMergeableRow interface

Sometimes you want to implement the IMergeableRow interface yourself. Here is an example implementation:

public class MySimpleRow : IMergeableRow
{
    [IdColumn]
    public long Key { get; set; }
    public string Value { get; set; }
    public DateTime ChangeDate { get; set; }
    public ChangeAction? ChangeAction { get; set; }
    public string UniqueId => Key.ToString();
}

Overwriting the Equals method and using the IdColumn attribute are optional. If no IdColumn is defined, you would need to use the UseTruncateMethod.

If you use the ExpandoObject, the properties ChangeDate and ChangeAction are added automatically. You will need to pass at least one IdColumn to the MergeProperties.IdColumns when using the dynamic approach.