RowKindExtractor is a transformation plugin in Apache SeaTunnel that can convert CDC data streams to Append-Only mode and extract the original RowKind information as a new field. This article introduces the core functionalities of RowKindExtractor, its usage in CDC data synchronization scenarios, configuration options, precautions, and multiple application examples.

RowKindExtractor

The RowKindExtractor transformation plugin is used to convert CDC (Change Data Capture) data streams into Append-Only mode while extracting the original RowKind information into a new field.

Core Functions:

Why is this plugin needed?

In CDC data synchronization scenarios, each data row carries a RowKind flag (+I-U+U-D), representing different types of changes. Some downstream systems (such as data lakes or analytical platforms) only support Append-Only mode and do not support UPDATE or DELETE operations. In such cases, it is necessary to:

  1. Convert all rows to INSERT type (Append-Only).
  2. Store the original change type in a regular field for downstream analysis.

Transformation Example:

Input (CDC data):
 RowKind: -D (DELETE)
 Data: id=1, name="test1", age=20

Output (Append-Only data):
 RowKind: +I (INSERT)
 Data: id=1, name="test1", age=20, row_kind="DELETE"

Typical Use Cases:

Configuration Options

Parameter NameTypeRequiredDefault ValueDescription
custom_field_namestringNorow_kindName of the new field used to store the original RowKind information
transform_typeenumNoSHORTOutput format of RowKind; possible values: SHORT (short format) or FULL (full format)

custom_field_name [string]

Specifies the name of the new field used to store the original RowKind information.

Default: row_kind

Notes:

Example:

custom_field_name = "operation_type"  # Use a custom field name

transform_type [enum]

Specifies the output format of the RowKind field value.

Options:

FormatDescriptionOutput Values
SHORTShort format (symbolic)+I, -U, +U, -D
FULLFull format (English name)INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE

Default: SHORT

Meaning of each value

RowKind TypeSHORT FormatFULL FormatDescription
INSERT+IINSERTInsert operation
UPDATE_BEFORE-UUPDATE_BEFOREValue before update
UPDATE_AFTER+UUPDATE_AFTERValue after update
DELETE-DDELETEDelete operation

Usage Recommendation:

Example:

transform_type = FULL  # Use full format

Full Examples

Example 1: Using Default Configuration (SHORT Format)

Using default configuration, CDC data is converted to Append-Only mode, with RowKind saved in short format.

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # Default configuration:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}

sink {
  Console {
    plugin_input = "append_only_data"
  }
}

Data Transformation Process:

Input Data (CDC Format):
 1. RowKind=+I, id=1, name="Zhang San", age=25
 2. RowKind=-U, id=1, name="Zhang San", age=25
 3. RowKind=+U, id=1, name="Zhang San", age=26
 4. RowKind=-D, id=1, name="Zhang San", age=26

Output Data (Append-Only Format):
 1. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="+I"
 2. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="-U"
 3. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="+U"
 4. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="-D"

Example 2: Using FULL Format with Custom Field Name

Output RowKind in full format and use a custom field name.

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # Custom field name
    transform_type = FULL                 # Use full format
  }
}

sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # The Iceberg table will include the operation_type field to record the change type of each row
  }
}

Data Transformation Process:

Input Data (CDC Format):
 1. RowKind=+I, order_id=1001, amount=100.00
 2. RowKind=-U, order_id=1001, amount=100.00
 3. RowKind=+U, order_id=1001, amount=150.00
 4. RowKind=-D, order_id=1001, amount=150.00

Output Data (Append-Only Format, FULL):
 1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
 2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
 3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
 4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"

Example 3: Full Test Using FakeSource

Generate test data using FakeSource to demonstrate transformation of various RowKind values.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      { kind = INSERT, fields = [1, "A", 100] },
      { kind = INSERT, fields = [2, "B", 100] },
      { kind = UPDATE_BEFORE, fields = [1, "A", 100] },
      { kind = UPDATE_AFTER, fields = [1, "A_updated", 95] },
      { kind = UPDATE_BEFORE, fields = [2, "B", 100] },
      { kind = UPDATE_AFTER, fields = [2, "B_updated", 98] },
      { kind = DELETE, fields = [1, "A_updated", 95] }
    ]
  }
}

transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}

sink {
  Console {
    plugin_input = "transformed_data"
  }
}

Expected Output:

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"