环境说明:
flink 1.15.2
mysql 版本5.7 注意:需要开启binlog,因为增量同步是基于binlog捕获数据
windows11 IDEA 本地运行
具体前提设置,请看这篇,包含 binlog 设置、Maven......
Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客
经过不懈努力,终于从阿里help页面找到了支持无主键同步的参数:
MySQL_实时计算 Flink版-阿里云帮助中心
然后就开始一顿模式,各种参数调试,终于达到了目的,无主键表实时同步,只不过在sink表关联目标表时,要指定几个字段为主键,这样就不会有重复的覆盖情况了,多给几个字段作为主键,不就避免重复冲突了嘛。比如id+date+local等,具体看表字段。
demo如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class MysqlToMysqlNonePrimaryKey { public static void main(String[] args) { //1.获取stream的执行环境 StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); senv.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); //2.创建表执行环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings); String sourceTable = "CREATE TABLE mysql_cdc_source (" + " id INT,\n" + " username STRING,\n" + " password STRING\n" + ") WITH (\n" + "'connector' = 'mysql-cdc',\n" + "'hostname' = 'localhost',\n" + "'port' = '3306',\n" + "'username' = 'root',\n" + "'password' = 'root',\n" + "'database-name' = 'test_cdc',\n" + "'debezium.snapshot.mode' = 'initial',\n" + "'scan.incremental.snapshot.enabled' = 'false',\n" + //如果开启增量快照,必须设置主键。 //默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括: //读取全量数据时,Source可以是并行读取。 //读取全量数据时,Source支持chunk粒度的检查点。 //读取全量数据时,Source不需要获取全局读锁(FLUSH TABLES WITH read lock)。 //如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。 "'scan.incremental.snapshot.chunk.key-column' = 'id' ,\n" + //可以指定某一列作为快照阶段切分分片的切分列。无主键表必填,选择的列必须是非空类型(NOT NULL)。 //有主键的表为选填,仅支持从主键中选择一列。 " 'table-name' = 'user'\n" + ")"; tEnv.executeSql(sourceTable); // tEnv.executeSql("select * from mysql_cdc_source").print(); String sinkTable = "CREATE TABLE mysql_cdc_sink (" + " id INT,\n" + " username STRING,\n" + " password STRING\n" + " ,PRIMARY KEY (id,username,password) NOT ENFORCED\n" + ") WITH (\n" + "'connector' = 'jdbc',\n" + "'driver' = 'com.mysql.cj.jdbc.Driver',\n" + "'url' = 'jdbc:mysql://localhost:3306/" + "test_cdc" + "?rewriteBatchedStatements=true',\n" + "'username' = 'root',\n" + "'password' = 'root',\n" + "'table-name' = 'user_new'\n" + ")"; tEnv.executeSql(sinkTable); tEnv.executeSql("insert into mysql_cdc_sink select id,username,password from mysql_cdc_source"); } }
由于无主键, debezium.snapshot.mode' = 'initial',这个参数会导致,程序运行几次,源表数据就会同步几次到目标表,并不会去重,如果想一直这个参数运行,需要在插入前先清空表,但是如果是数据量大的,推荐还是先用这个参数同步历史数据,完成后,再改为 schema_only,启动程序,然后把上面一个程序干掉。
上面设置的主键是三个字段,id、username、password,这三个字段不能为null,如果有数据为null,程序在启动的时候,就会报错,虽然没有打印到控制台上,但是可以看到控制台程序结束了,不是一直在运行,并且数据也是同步不过去的。所以挑选主键字段时一定要确定此字段一定不为null,如果为null的话,就需要能接受转换处理,比如:varchar 类型 将null值转换为空字符串
insert into mysql_cdc_sink select case when id is null then 0 else id end,case when username is null then '' else username end,case when password is null then '' else password end from mysql_cdc_source
具体如何处理,还看业务需求。不过,在数据同步时,尽量要做到不对数据做任何变动。如果是可以加入清洗,那就随便玩。
具体数据变化时同步的情况还需自行探索。
猜你喜欢
- 14天前(a级景区评定机构)全国A级旅游景区创建与提升培训班在敦煌市举办
- 14天前(希尔顿2021活动)希尔顿集团618盛夏大促开启
- 14天前(中旅酒店 维景)中旅酒店首次AI数字人直播亮相南京维景
- 14天前(瑞士大酒店-自助餐怎么样)瑞意心旅,以食为先 瑞士酒店开启全新"瑞士早餐计划"
- 14天前(七尚酒店百度百科)Lohkah七尚酒店首度开创充满新知的闽地研学旅程
- 14天前(上海迪士尼 夏天)酷爽夏日,奇妙相伴!来上海迪士尼度假区清凉入夏
- 14天前(福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情)福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情
- 14天前(锦江 iu)锦江荟APP原生鸿蒙版正式上线打造全场景旅行服务新体验
- 14天前(芜宣机场国际航班)新华丝路:芜宣机场开通至越南首都河内的国际货运航线
- 14天前(海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章