Efficient Incremental Data Processing for Redshift Using Time Stamps, CDC, File Systems and MD5

Typical options for incremental logic

  1. Time stamp based incremental logic
  2. Version based incremental logic
  3. MD5 uniqueness for each row
  4. Change Data Capture (CDC) using Talend
  5. Using the AWS S3 file system and Redshift computation
  6. Using the AWS S3 file system and Talend as the processing engine

Time stamp based incremental logic

  • Use the lastModified timestamp column on the incoming SQL Server table to fetch records modified since the last run.
  • Talend can track the last run timestamp (for example, in a context variable); otherwise, fetch MAX(lastModified) from the Redshift target and use it as the watermark (see the sketch after this list).
  • Example: SELECT * FROM source_table WHERE LastModified > @LastRunTime
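  • A minimal sketch of deriving the watermark from the Redshift target when Talend does not persist it (target_table and last_modified are placeholder names):
    • SELECT NVL(MAX(last_modified), '1900-01-01'::timestamp) AS last_run_time FROM target_table; /* placeholder table and column names */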

Version based Incremental logic

  • Use a version or sequence column to fetch records with a higher version or sequence than the last one loaded (see the example after this list).
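  • A minimal example, assuming the source table exposes a monotonically increasing version column and the job tracks @LastMaxVersion:
    • SELECT * FROM source_table WHERE version > @LastMaxVersion; /* version column and @LastMaxVersion are illustrative */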

MD5 Uniqueness for each row

  • Calculate an MD5 hash for each row. After the first full ingestion all rows and their hashes are available in Redshift; on subsequent runs, compare the incoming MD5 against the stored one and apply only the rows that differ.
  • Example (SQL Server side, computed column): ALTER TABLE source_table ADD MD5Hash AS HASHBYTES('MD5', CONCAT(column1, column2, column3, …))
  • SELECT primary_key, MD5Hash, column1, column2, column3, … FROM source_table WHERE LastModified > @LastRunTime
  • On the Redshift side, apply an upsert. Redshift does not support ON CONFLICT, so use the staging-table pattern: delete the changed rows, then insert the new and changed rows.
    • BEGIN; DELETE FROM target_table USING staging_table s WHERE target_table.primary_key = s.primary_key AND target_table.MD5Hash <> s.MD5Hash; INSERT INTO target_table (primary_key, MD5Hash, column1, column2, column3, …) SELECT s.primary_key, s.MD5Hash, s.column1, s.column2, s.column3, … FROM staging_table s LEFT JOIN target_table t ON t.primary_key = s.primary_key WHERE t.primary_key IS NULL; END;

Change Data Capture (CDC)

  • Utilize SQL Server's native CDC feature to identify changed records, or use Talend's CDC components; downstream, the changes can be applied as SCD Type 1 or Type 2 loads. A sketch of enabling SQL Server CDC is shown below.
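  • A minimal sketch of enabling CDC on the SQL Server source (schema and table names are placeholders):
    • EXEC sys.sp_cdc_enable_db; /* run once per database */
    • EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'source_table', @role_name = NULL; /* placeholder schema and table names */
  • Changed rows can then be read from the generated function (by default cdc.fn_cdc_get_all_changes_dbo_source_table) between two LSNs.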

Using the AWS S3 file system and Redshift computation

  • Using Amazon S3 files for ETL (Extract, Transform, Load) operations in Amazon Redshift is a common and efficient approach. S3 serves as a staging area where raw or transformed data is stored before loading it into Redshift. This process typically involves extracting data into S3, optionally transforming it, and then loading it into Redshift for analysis.
  • The following COPY command loads the S3 file into a staging (temp) table; creating that table is sketched after this list.
    • COPY staging_table FROM 's3://my-bucket/path/to/s3_file.csv' IAM_ROLE 'arn:aws:iam::account-id:role/MyRedshiftRole' CSV IGNOREHEADER 1 DELIMITER ',' TIMEFORMAT 'auto' TRUNCATECOLUMNS EMPTYASNULL DATEFORMAT 'auto';
  • Once the data is in the staging table, insert the new records into the target table (table_a):
    • INSERT INTO table_a (columns…) SELECT b.columns… FROM staging_table b LEFT JOIN table_a a ON a.primary_key = b.primary_key WHERE a.primary_key IS NULL;
  • Update existing records
    • UPDATE table_a SET column1 = b.column1, column2 = b.column2, … FROM staging_table b WHERE table_a.primary_key = b.primary_key AND table_a.last_modified < b.last_modified;
  • Here, the compute happens on Redshift.
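  • A minimal sketch of creating the staging table before the COPY, assuming the target table table_a already exists (CREATE TEMP TABLE … LIKE copies its column definitions):
    • CREATE TEMP TABLE staging_table (LIKE table_a); /* table_a is the existing target table from the examples above */
  • Wrapping the subsequent INSERT and UPDATE in one transaction (BEGIN … END) keeps the target consistent if the load fails midway.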

Using the AWS S3 file system and Talend as the processing engine

  • As in the previous approach, S3 serves as the staging area where raw or transformed data is stored, but here the data is processed on Talend remote engines before being inserted or upserted into Redshift.
  • Use the Talend Redshift output component with its "Insert or update" / "Update or insert" actions so the upsert logic runs on Talend's compute.
  • This approach will be slow if there are too many daily updates.
  • If daily updates are on the order of 10,000 to 100,000 rows this is fine; beyond that, the approach should be changed.

One recommended approach if you want to save money on AWS while using Talend ETL

  • Create a Redshift Table for Staging
    • CREATE TABLE staging_table ( primary_key INT, column1 VARCHAR, column2 VARCHAR, MD5Hash VARCHAR, /* other columns */ PRIMARY KEY (primary_key) );
    • CREATE TABLE target_table ( primary_key INT, column1 VARCHAR, column2 VARCHAR, MD5Hash VARCHAR, /* other columns */ PRIMARY KEY (primary_key) );
  • Extract Data from S3
    • Load data to staging table
    • COPY staging_table FROM 's3://your-bucket/path/to/datafile.csv' IAM_ROLE 'arn:aws:iam::account-id:role/your-redshift-role' CSV IGNOREHEADER 1 DELIMITER ',';
    • UPDATE staging_table SET MD5Hash = MD5(NVL(column1, '') || NVL(column2, '') || …); /* Redshift's CONCAT takes only two arguments, so use the || operator; NVL guards against NULLs wiping out the hash */
  • Compare and Merge Data (the apply/merge step itself is sketched after this list)
    • Detect New or Updated Records
      • SELECT s.primary_key, s.column1, s.column2, … FROM staging_table s LEFT JOIN target_table t ON s.primary_key = t.primary_key WHERE t.primary_key IS NULL /* new records */ OR s.MD5Hash <> t.MD5Hash /* updated records */;
    • Delete Records Not Present in Staging (only applicable when the staging table holds a full snapshot of the source, not an incremental batch)
      • DELETE FROM target_table WHERE NOT EXISTS (SELECT 1 FROM staging_table s WHERE s.primary_key = target_table.primary_key);
  • Automate the above process and schedule it.
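  • A minimal sketch of the apply/merge step referenced above, using the staging-table pattern (table and column names follow the examples above; the column list is abbreviated):
    • BEGIN; UPDATE target_table SET column1 = s.column1, column2 = s.column2, MD5Hash = s.MD5Hash FROM staging_table s WHERE target_table.primary_key = s.primary_key AND target_table.MD5Hash <> s.MD5Hash; INSERT INTO target_table (primary_key, column1, column2, MD5Hash) SELECT s.primary_key, s.column1, s.column2, s.MD5Hash FROM staging_table s LEFT JOIN target_table t ON t.primary_key = s.primary_key WHERE t.primary_key IS NULL; END; /* column list abbreviated to match the examples above */
  • On newer Redshift versions, the same logic can be expressed in a single MERGE INTO target_table USING staging_table … statement.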

Feel free to reach out to us with any questions at: solutions@thinkartha.com