Flink CDC 读取MySQL的数据

发布时间:2023-12-16 14:30

1、前提背景准备

Flink在1.11之后就已经支持从MySQL增量读取Binlog日志的方式。

pom文件如下:


    2.11
    2.11.12
    1.12.0
    1.2.72
    1.18.6
    2.3.0





    
        com.github.shyiko
        mysql-binlog-connector-java
        0.21.0
    

    
        org.apache.kafka
        kafka-clients
        ${kafka.version}
    

    
        org.apache.flink
        flink-connector-kafka_2.11
        ${flink.version}
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
        
    

    
        org.projectlombok
        lombok
        ${lombok.version}
        provided
    

    
        com.alibaba
        fastjson
        ${fastjson.verson}
    

    
        com.alibaba.ververica
        flink-connector-mysql-cdc
        1.4.0
    

    
        org.apache.flink
        flink-java
        ${flink.version}
        
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
        
    

    
        org.apache.flink
        flink-streaming-java_${scala.binary.version}
        ${flink.version}
        
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
            
                com.google.code.findbugs
                jsr305
            
            
                org.apache.flink
                force-shading
            
        
    

    
        org.apache.flink
        flink-clients_${scala.binary.version}
        ${flink.version}
        
    

    
        org.apache.flink
        flink-scala_${scala.binary.version}
        ${flink.version}
    

    
        org.apache.flink
        flink-streaming-scala_${scala.binary.version}
        ${flink.version}
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
        
    

2、全量读取某个数据库中的所有库中的所有表的Binlog方式代码如下:

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DebeziumSourceFunction MysqlSource = MySQLSource.builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();

        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */

        env.addSource(MysqlSource).print();

        env.execute("flink-cdc");

    }


    public static class MyDeserializationSchema implements DebeziumDeserializationSchema {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");

            String db = source.getString("db");//库名
            String table = source.getString("table");//表名

            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }

            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }


            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);

            collector.collect(json1.toJSONString());

        }

        @Override
        public TypeInformation getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

3、全量读取某个数据库指定DB中的所有表

可以在build之前 ,添加一个

databaseList,用来指定特定的DB
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DebeziumSourceFunction MysqlSource = MySQLSource.builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .databaseList("horse") // 指定某个特定的库
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();

        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */

        env.addSource(MysqlSource).print();

        env.execute("flink-cdc");

    }


    public static class MyDeserializationSchema implements DebeziumDeserializationSchema {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");

            String db = source.getString("db");//库名
            String table = source.getString("table");//表名

            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }

            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }


            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);

            collector.collect(json1.toJSONString());

        }

        @Override
        public TypeInformation getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

4、全量读取某个数据库指定DB中的指定表

可以在build之前 ,添加一个

tableList,用来指定特定的DB中的特定表
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DebeziumSourceFunction MysqlSource = MySQLSource.builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .databaseList("horse") // 指定某个特定的库
                .tableList("horse.t_dri_info") //指定特定的表
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();

        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */

        env.addSource(MysqlSource).print();

        env.execute("flink-cdc");

    }


    public static class MyDeserializationSchema implements DebeziumDeserializationSchema {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");

            String db = source.getString("db");//库名
            String table = source.getString("table");//表名

            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }

            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }


            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);

            collector.collect(json1.toJSONString());

        }

        @Override
        public TypeInformation getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号