基于 Flink CDC 官网 “Streaming ELT from MySQL to Doris” 的示例,实践并观察 CDC 在捕获 MySQL 变更时生成的 Debezium JSON 格式。
搭建步骤
docker-compose.yml
# Single-service Compose file: a MySQL instance used as the CDC source.
version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      # Publish MySQL on the host so a client on the host can reach 127.0.0.1:3306.
      - "3306:3306"
    environment:
      # Root password; clients connecting as root must use the same value.
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
# Start the MySQL container defined in docker-compose.yml in the background.
docker-compose up -d

# Create the demo database, tables and seed rows.
# -T disables pseudo-TTY allocation, which is required when stdin is a
# here-document instead of a terminal. The delimiter is quoted ('SQL') so the
# shell does not treat the SQL backticks as command substitution.
# If this fails with a connection error, the server is probably still
# starting up; wait a few seconds and run it again.
docker-compose exec -T mysql mysql -uroot -p123456 <<'SQL'
-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
  `id` INT NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
  `id` INT NOT NULL,
  `city` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
  `id` INT NOT NULL,
  `product` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
SQL
获取源码,选择master分支
# Fetch the Flink CDC sources, checking out the master branch explicitly.
git clone --branch master https://github.com/apache/flink-cdc.git
在flink-connector-mysql-cdc模块的test目录下创建测试类:
java自动换行:关放大阅读展开代码package org.apache.flink.cdc.connectors.mysql.debug; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Minimal IntelliJ entry to observe MySQL CDC JSON events directly. * * <p>This uses the MySQL connector API directly, without Doris and without an external Flink * cluster. Every captured record is printed to the IDEA Run console as Debezium JSON. */ public class MysqlChangeJsonDebugMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(3000L); MySqlSource<String> source = MySqlSource.<String>builder() .hostname(get("mysql.hostname", "127.0.0.1")) .port(Integer.parseInt(get("mysql.port", "3306"))) .databaseList(get("mysql.database.list", "app_db")) .tableList( get( "mysql.table.include.list", "app_db.orders,app_db.products,app_db.shipments") .split(",")) .username(get("mysql.username", "root")) .password(get("mysql.password", "123456")) .serverId(get("mysql.server.id", "5401")) .serverTimeZone(get("mysql.server.time.zone", "UTC")) .startupOptions(resolveStartup(get("mysql.startup.mode", "latest"))) .includeSchemaChanges(Boolean.parseBoolean(get("mysql.include.schema.changes", "true"))) .deserializer(new JsonDebeziumDeserializationSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-debug") .setParallelism(1) .print() .setParallelism(1); env.execute("Print MySQL CDC JSON Events"); } private static String get(String key, String defaultValue) { return System.getProperty(key, defaultValue); } private static StartupOptions resolveStartup(String mode) { switch (mode.toLowerCase()) 
{ case "initial": return StartupOptions.initial(); case "earliest": return StartupOptions.earliest(); case "latest": default: return StartupOptions.latest(); } } }
启动测试类:
执行新增
INSERT INTO app_db.orders (id, price) VALUES (4, 100.00);
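观察测试类日志(注意:DECIMAL 类型的 price 被输出为 Base64 编码的字节串,内容是去掉小数点后的整数值;例如 "JxA=" 解码为 0x2710 = 10000,按 scale=2 还原即 100.00):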
{
  "before": null,
  "after": {
    "id": 4,
    "price": "JxA="
  },
  "source": {
    "version": "1.9.8.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1775561073000,
    "snapshot": "false",
    "db": "app_db",
    "sequence": null,
    "table": "orders",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 6005,
    "row": 0,
    "thread": 6,
    "query": null
  },
  "op": "c",
  "ts_ms": 1775561073408,
  "transaction": null
}
执行修改(注意:如果 UPDATE 语句没有实际改变任何数据,则不会产生变更事件,CDC 也就捕获不到该操作):
UPDATE app_db.orders SET price=200.00 WHERE id=4;
{
  "before": {
    "id": 4,
    "price": "JxA="
  },
  "after": {
    "id": 4,
    "price": "TiA="
  },
  "source": {
    "version": "1.9.8.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1775561316000,
    "snapshot": "false",
    "db": "app_db",
    "sequence": null,
    "table": "orders",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 6812,
    "row": 0,
    "thread": 6,
    "query": null
  },
  "op": "u",
  "ts_ms": 1775561316581,
  "transaction": null
}
执行删除:
DELETE FROM app_db.orders WHERE id=4;
{
  "before": {
    "id": 4,
    "price": "TiA="
  },
  "after": null,
  "source": {
    "version": "1.9.8.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1775561440000,
    "snapshot": "false",
    "db": "app_db",
    "sequence": null,
    "table": "orders",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 7092,
    "row": 0,
    "thread": 6,
    "query": null
  },
  "op": "d",
  "ts_ms": 1775561440920,
  "transaction": null
}
执行表结构变更:
alter table orders add order_time datetime not null;
{
  "source": {
    "version": "1.9.8.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1775561634829,
    "snapshot": "false",
    "db": "app_db",
    "sequence": null,
    "table": "orders",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 7233,
    "row": 0,
    "thread": null,
    "query": null
  },
  "historyRecord": "{\"source\":{\"file\":\"mysql-bin.000003\",\"pos\":7233,\"server_id\":223344},\"position\":{\"transaction_id\":null,\"ts_sec\":1775561634,\"file\":\"mysql-bin.000003\",\"pos\":7406,\"server_id\":223344},\"databaseName\":\"app_db\",\"ddl\":\"alter table orders\\n add order_time datetime not null\",\"tableChanges\":[{\"type\":\"ALTER\",\"id\":\"\\\"app_db\\\".\\\"orders\\\"\",\"table\":{\"defaultCharsetName\":\"latin1\",\"primaryKeyColumnNames\":[\"id\"],\"columns\":[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"length\":11,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"price\",\"jdbcType\":3,\"typeName\":\"DECIMAL\",\"typeExpression\":\"DECIMAL\",\"charsetName\":null,\"length\":10,\"scale\":2,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"order_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]}]},\"comment\":null}]}"
}
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!