How can you efficiently synchronize MySQL data to S3 with Apache SeaTunnel's S3File sink? Every step below has been fully tested and is suited to building a data platform on object storage, offering flexible deployment and strong scalability, so this guide should be useful to anyone who needs MySQL-to-S3 integration.
Step 1: Create the Hive Table
CREATE EXTERNAL TABLE ods_ekp.`ods_sys_notify_todo_bak` (
`fd_id` STRING,
`fd_app_name` STRING,
`fd_model_name` STRING,
`fd_model_id` STRING,
`fd_key` STRING,
`fd_parameter1` STRING,
`fd_parameter2` STRING,
`fd_create_time` TIMESTAMP,
`fd_subject` STRING,
`fd_type` INT,
`fd_link` STRING,
`fd_mobile_link` STRING,
`fd_pad_link` STRING,
`fd_bundle` STRING,
`fd_replace_text` STRING,
`fd_md5` STRING,
`fd_del_flag` STRING,
`fd_level` INT,
`doc_creator_id` STRING,
`fd_extend_content` STRING,
`fd_lang` STRING,
`fd_cate_name` STRING,
`fd_cate_id` STRING,
`fd_template_name` STRING,
`fd_template_id` STRING,
`fd_hierarchy_id` STRING
)
COMMENT 'sys_notify_todo_bak data'
PARTITIONED BY (
`dctime` STRING COMMENT 'partition year-month-day'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
STORED AS PARQUET
LOCATION 's3a://seatunnel/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak'
TBLPROPERTIES (
'parquet.compression'='ZSTD'
);
Notes:
- The delimiter setting ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' must be configured identically in SeaTunnel later; otherwise the format will be wrong.
- The compression algorithm 'parquet.compression'='ZSTD' also needs the same setting in SeaTunnel.
- The file format STORED AS PARQUET likewise must match the SeaTunnel configuration.
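All three of the settings above hinge on the same on-disk byte: '\001' is the ASCII SOH control character, written in octal. A quick sketch (assuming a POSIX shell with od available) shows what the escape resolves to:

```shell
# '\001' is an octal escape for the ASCII SOH control character (byte 0x01).
# printf expands the escape; od -b prints the resulting byte in octal.
printf '\001' | od -An -b | tr -d ' \n'
```

This is the single byte that Hive's FIELDS TERMINATED BY '\001' and SeaTunnel's field_delimiter = "\\001" must both describe.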
Below is the SeaTunnel job configuration. Remove the comments before use:
env {
job.mode = "BATCH"
parallelism = 2
}
source {
Jdbc {
url = "jdbc:mysql://[server-ip]:3306/[database]?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "[username]"
password = "[password]"
# dctime must be cast to a string, because the corresponding Hive column is a string; include the partition field in the query, and SeaTunnel will handle it automatically in the sink
query = "select fd_id, fd_app_name, fd_model_name, fd_model_id, fd_key, fd_parameter1, fd_parameter2, fd_create_time, fd_subject, fd_type, fd_link, fd_mobile_link, fd_pad_link, fd_bundle, fd_replace_text, fd_md5, fd_del_flag, fd_level, doc_creator_id, fd_extend_content, fd_lang, fd_cate_name, fd_cate_id, fd_template_name, fd_template_id, fd_hierarchy_id, cast(date_format(fd_create_time, '%Y-%m-%d') as char) as dctime from sys_notify_todo_bak"
}
}
transform {
}
sink {
S3File {
bucket = "s3a://seatunnel"
fs.s3a.endpoint = "[minio-host/ip]:9000"
access_key = "[username]"
secret_key = "[password]"
fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
# directory path
path = "/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak"
tmp_path = "/data/tmp/seatunnel"
# Mandatory; because my MinIO is not SSL-enabled, set as follows
hadoop_s3_properties {
"fs.s3a.connection.ssl.enabled" = "false"
"fs.s3a.path.style.access" = "true"
}
# Parquet file format
file_format_type = "parquet"
# Must use \\ to represent \
field_delimiter = "\\001"
# Required for Parquet or it will fail
parquet_avro_write_timestamp_as_int96 = true
# Compression algorithm
compress_codec = "zstd"
have_partition = true
partition_by = ["dctime"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = false
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
}
}
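With the configuration saved to a file (the name mysql_to_s3.conf below is my own choice), the job can be launched with SeaTunnel's bundled launcher; in 2.3.x the Zeta engine supports a local mode. A sketch, assuming SEATUNNEL_HOME points at your SeaTunnel installation:

```shell
# Run the batch job with the Zeta engine in local mode.
# SEATUNNEL_HOME and the config path are assumptions; adjust for your setup.
cd "$SEATUNNEL_HOME"
./bin/seatunnel.sh --config ./config/mysql_to_s3.conf -m local
```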
Step 2: Run the SeaTunnel Sync and Execute in Hive
-- Refresh the physical directory mapping
MSCK REPAIR TABLE ods_ekp.ods_sys_notify_todo_bak;
-- Query the Hive table to confirm data
SELECT * FROM ods_ekp.ods_sys_notify_todo_bak LIMIT 100;
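If the sync wrote the partition directories correctly, MSCK REPAIR will have registered them; you can list them explicitly as an extra check (using the table created in Step 1):

```sql
-- Optional check: list the partitions registered by MSCK REPAIR
SHOW PARTITIONS ods_ekp.ods_sys_notify_todo_bak;
```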
Step 3: Create the Doris Hive Catalog External Database
CREATE CATALOG hive PROPERTIES (
'type' = 'hms',
'hive.metastore.uris' = 'thrift://[hive-metastore-ip]:9083',
's3.endpoint' = 'http://[minio-host/ip]:9000',
's3.region' = 'us-east-1',
's3.access_key' = '[username]',
's3.secret_key' = '[password]',
's3.connection.ssl.enabled' = 'false',
'use_path_style' = 'true',
'hive.version' = '2.1.1'
);
REFRESH CATALOG hive;
SHOW DATABASES FROM hive;
SELECT * FROM hive.ods_ekp.ods_sys_notify_todo_bak LIMIT 100;
Explanation:
- Because I am using CDH 6.3.2 with Hive 2.1.1, you need to specify 'hive.version' = '2.1.1' when creating the catalog.
- Since my MinIO is not SSL-enabled, configure 's3.connection.ssl.enabled' = 'false'.
- MinIO uses path-style addressing, so set 'use_path_style' = 'true'.
- SeaTunnel version: 2.3.11
- Doris version: 2.0.15