What we're building

Our project needs real-time data synchronization for DM8 (Dameng) databases across remote sites, and it has to be a free solution, so we went with DM8 + Flink CDC.

In my scenario, data from 20 sub-nodes must be synchronized to one central node, and the network between the sub-nodes and the central node is unreliable. That led to the following design:

Sub-node 1 (DM) ──► Flink CDC Job 1 ──► Kafka topic (node1)
Sub-node 2 (DM) ──► Flink CDC Job 2 ──► Kafka topic (node2)
...
Sub-node 20 (DM) ──► Flink CDC Job 20 ──► Kafka topic (node20)

Kafka ──► Flink merge job (×1) ──► Central node (DM)

In short, the flow is:

1. Each sub-node has its own DM8 database, with archive mode enabled to support CDC;

2. Each sub-node runs its own Flink CDC job, which reads change records from the DM8 database, converts them into SQL statements, and writes them to Kafka;

3. The central node subscribes to the records from Kafka and applies them to its own database via JDBC.

How to do it

Following the flow above, there are three major steps:

Enable archive mode on DM8

  • Enable archive mode as the SYSDBA user
-- 1. Switch the database to MOUNT state
ALTER DATABASE MOUNT;

-- 2. Enable archive mode
ALTER DATABASE ARCHIVELOG;

-- 3. Add a local archive destination (adjust directory, file size, and space limit as needed;
--    the DEST directory must exist beforehand and be writable by the database user)
ALTER DATABASE ADD ARCHIVELOG 'DEST = /dm8/arch, TYPE = LOCAL, FILE_SIZE = 1024, SPACE_LIMIT = 20480';

-- 4. Switch back to OPEN state
ALTER DATABASE OPEN;
  • Verify that it worked
-- Run as SYSDBA
SELECT para_name, para_value
FROM v$dm_ini
WHERE para_name IN ('ARCH_INI', 'RLOG_APPEND_LOGIC');

The query result should look like this:

LINEID PARA_NAME         PARA_VALUE
------ ----------------- ----------
1      RLOG_APPEND_LOGIC 2
2      ARCH_INI          1

Both values should be non-zero. If either comes back as 0, change it with the following SQL:

-- 1. Switch the database to MOUNT state
ALTER DATABASE MOUNT;

-- 2. Set the parameters (recommended values, consistent with most tutorials)
SP_SET_PARA_VALUE(1, 'ARCH_INI', 1);
SP_SET_PARA_VALUE(1, 'RLOG_APPEND_LOGIC', 2);   -- 1 or 2 both work; 2 covers UPDATE/DELETE more fully

-- 3. Switch back to OPEN state
ALTER DATABASE OPEN;

Important:

Some articles verify archive mode with nothing more than select arch_mode from v$database;. When arch_mode is Y, archive mode is indeed enabled and historical data is archived, but if RLOG_APPEND_LOGIC (described above) is 0, UPDATE and DELETE statements are not actually captured. This is why many people report that their update records never arrive.
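To avoid that trap, verify both conditions together:

-- Archive mode must be on AND logical logging must be non-zero,
-- otherwise UPDATE/DELETE events will be missing from the CDC stream
SELECT arch_mode FROM v$database;                                       -- expect Y
SELECT para_value FROM v$dm_ini WHERE para_name = 'RLOG_APPEND_LOGIC';  -- expect 1 or 2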

Set up the Flink + Kafka environment (Docker)

We take the simplest possible approach to environment setup: a one-shot docker-compose startup.

It doesn't need many resources; it starts fine on a local 4 GB VM. The only real concern is whether your network can pull the images.

docker-compose.yaml

Note: before starting, change the IP in the Kafka section to your own server's IP. Mine is currently 192.168.20.220; it appears in exactly one place.

version: "3.8"

services:
  # ==================== Flink JobManager ====================
  jobmanager:
    image: flink:1.20.3-scala_2.12-java17
    container_name: flink-jobmanager
    hostname: jobmanager
    restart: unless-stopped
    ports:
      - "8081:8081"
      - "6123:6123"
    command: jobmanager
    environment:
      TZ: Asia/Shanghai
      FLINK_JOBMANAGER_RPC_ADDRESS: jobmanager
      FLINK_JOBMANAGER_RPC_PORT: 6123
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        jobmanager.heap.size: 1024m
        taskmanager.numberOfTaskSlots: 4
        parallelism.default: 4
        state.backend: rocksdb
        state.checkpoints.dir: file:///opt/flink/checkpoints
    volumes:
      - ./flink-logs:/opt/flink/log
      - ./checkpoints:/opt/flink/checkpoints
      - ./jars:/opt/flink/usrlib
      - /etc/localtime:/etc/localtime:ro
    networks:
      - flink-net

  # ==================== Flink TaskManager ====================
  taskmanager:
    image: flink:1.20.3-scala_2.12-java17
    hostname: taskmanager
    restart: unless-stopped
    deploy:
      replicas: 2
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      TZ: Asia/Shanghai
      FLINK_JOBMANAGER_RPC_ADDRESS: jobmanager
      FLINK_JOBMANAGER_RPC_PORT: 6123
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        taskmanager.heap.size: 1536m          
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m  
    volumes:
      - ./flink-logs:/opt/flink/log
      - ./jars:/opt/flink/usrlib
      - /etc/localtime:/etc/localtime:ro
    networks:
      - flink-net

  # ==================== Kafka ====================
  kafka:
    image: apache/kafka:3.8.0
    container_name: kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
    environment:
      TZ: Asia/Shanghai
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.20.220:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    networks:
      - flink-net

  # ==================== Flink SQL Client ====================
  sql-client:
    image: flink:1.20.3-scala_2.12-java17
    container_name: flink-sql-client
    restart: unless-stopped
    command: /opt/flink/bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      TZ: Asia/Shanghai
      FLINK_JOBMANAGER_RPC_ADDRESS: jobmanager
      FLINK_JOBMANAGER_RPC_PORT: 6123
    volumes:
      - ./jars:/opt/flink/usrlib
      - ./sql:/opt/flink/sql
    stdin_open: true
    tty: true
    networks:
      - flink-net

networks:
  flink-net:
    driver: bridge
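Bring everything up with:

docker-compose up -d

(With the Compose v2 plugin, use docker compose up -d instead.)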

Once everything is up, open port 8081 on your server to see the Flink web UI.

For version selection, use the table below as a reference (from reference [1]):

Flink CDC version   Flink version                            JDK version       DM8 version
2.4.1               1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*   1.8.0             8.1.3.77 and above
3.0.0               1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*   1.8.0 and above   8.1.3.77 and above
3.3.0               1.19.*, 1.20.*                           1.8.0 and above   8.1.3.77 and above

Flink job: write database change records to Kafka

A Flink job can be defined either with Flink SQL or with the DataStream API; this article uses the DataStream API in Java code. (If you need the Flink SQL approach, see the references at the end of the article.)

It's simple: three files in total.

Pull in the required dependencies:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.bg</groupId>
    <artifactId>DmCdc</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>DmCdc</name>
    <description>达梦8 Flink CDC</description>

    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.20.3</flink.version>
        <flinkcdc.version>3.3.0</flinkcdc.version>
        <version.infinispan>13.0.17.Final</version.infinispan>
        <version.infinispan.protostream>4.4.4.Final</version.infinispan.protostream>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink CDC dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-base</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-debezium</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-dm-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>

        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.3.0-1.20</version>
        </dependency>

        <!-- DM (Dameng) JDBC driver -->
        <dependency>
            <groupId>com.dameng</groupId>
            <artifactId>DmJdbcDriver18</artifactId>
            <version>8.1.3.140</version>
        </dependency>

        <!-- Presumably JsonDebeziumDeserializationSchema uses fastjson during deserialization;
             without this dependency, deserialization fails at runtime. It must be fastjson, not fastjson2. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- The following dependencies are only for logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <!-- API bridge between log4j 1 and 2 -->
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Strip signature files to avoid security errors in the shaded jar -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- Main class -->
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.bg.dm.cdc.DmCdcApplication</mainClass>
                                </transformer>
                            </transformers>
                            <!-- Do not generate the dependency-reduced POM -->
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Configure logging

Put log4j2.xml under src/main/resources:

<?xml version="1.0" encoding="UTF-8"?>
<!-- monitorInterval: how often (seconds) this config is automatically re-read -->
<Configuration status="warn" monitorInterval="5">
    <properties>
        <property name="LOG_HOME">D:/java/logs</property>
        <property name="FILE_NAME">${date:yyyy-MM-dd}</property>
    </properties>
    <Appenders>
        <!-- Console output -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- Output pattern -->
            <PatternLayout pattern="%d{HH:mm:ss.SSS}[%-5level]%m%n"/>
        </Console>
        <!-- Rolling file storage: fileName is the active log file; filePattern names the rolled-over files -->
        <RollingRandomAccessFile name="MyFile" fileName="D:/test.log"
                     filePattern="${LOG_HOME}/${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd HH-mm}-%i.log">
            <!-- Output pattern -->
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
            <Policies>
                <!-- How often to roll to a new log file (in minutes) -->
                <TimeBasedTriggeringPolicy interval="120"/>
                <!-- Roll to a new file once this size is exceeded -->
                <SizeBasedTriggeringPolicy size="10 MB"/>
            </Policies>
            <!-- Maximum number of files to keep -->
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- Async logger; additivity=false prevents duplicate log lines -->
        <AsyncLogger name="com.bg" level="trace" additivity="false">
            <AppenderRef ref="MyFile"/>
            <AppenderRef ref="Console"/>
        </AsyncLogger>
        <!-- Root logger; level sets the global log level -->
        <Root level="info">
            <AppenderRef ref="MyFile"/>
            <AppenderRef ref="Console"/>
        </Root>
        <!-- Scoped logger -->
        <Logger name="mylog" level="trace" additivity="false">
            <AppenderRef ref="MyFile" />
        </Logger>
    </Loggers>
</Configuration>

Write the CDC application code

Note: the code contains database connection details; change them to your own.

I recommend running this simple version first for debugging:

DmCdcApplication1.java

package org.bg.dm.cdc;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class DmCdcApplication1 {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "offline_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        properties.setProperty("lob.enabled", "true");

        // Schema history configuration (must be set correctly)
        properties.setProperty("schema.history.internal", "io.debezium.storage.kafka.history.KafkaSchemaHistory");
        properties.setProperty("schema.history.internal.kafka.bootstrap.servers", "192.168.20.220:9092");
        properties.setProperty("schema.history.internal.kafka.topic", "dm-history-topic");

        // Stability-related settings
        properties.setProperty("snapshot.mode", "initial");           // use "initial" for the first test
        properties.setProperty("snapshot.fetch.size", "512");
        properties.setProperty("connect.timeout.ms", "60000");
        properties.setProperty("heartbeat.interval.ms", "3000");

        JdbcIncrementalSource<String> changeEventSource =
                new DMSourceBuilder<String>()
                        .hostname("192.168.128.5")
                        .port(5236)
                        .databaseList("DMDB")
                        .tableList("BACKGROUND_DATA.OPEN_API_APP")   // 推荐这样写多表
                        .schemaList("BACKGROUND_DATA")
                        .username("SYSDBA")
                        .password("123456")
                        .startupOptions(StartupOptions.initial())   // use "initial" for the first test
                        .dmProperties(properties)
                        .includeSchemaChanges(true)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .sliceSize(20)
                        .scanNewlyAddedTableEnabled(true)
                        .build();

        Configuration configuration = new Configuration();
        // Checkpointing setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        env.enableCheckpointing(20 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new FsStateBackend("file:///opt/flink/dmcdc/ck"));

        DataStream<String> sourceStream = env.fromSource(
                changeEventSource,
                WatermarkStrategy.noWatermarks(),
                "DmSource"
        );
        sourceStream.print();

        env.execute();
    }
}

Under normal circumstances, if the configured table contains data, the corresponding records will be printed in your logs, and any update you run against the database will also be captured and printed.
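For reference, each printed change event is a Debezium-style JSON envelope, which the Kafka version below parses. An abridged example of an UPDATE event, with made-up values:

{
  "before": { "ID": 1, "APP_NAME": "old-name" },
  "after":  { "ID": 1, "APP_NAME": "new-name" },
  "source": { "schema": "BACKGROUND_DATA", "table": "OPEN_API_APP" },
  "op": "u",
  "ts_ms": 1718000000000
}

The op field is r (snapshot read), c (insert), u (update), or d (delete).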

Once the version above runs cleanly, you can move on to the Kafka-writing version (read change records from DM8, convert them to SQL, then write to Kafka):

DmCdcApplication.java

package org.bg.dm.cdc;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class DmCdcApplication {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "offline_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        properties.setProperty("lob.enabled", "true");

        // Schema history configuration (must be set correctly)
        properties.setProperty("schema.history.internal", "io.debezium.storage.kafka.history.KafkaSchemaHistory");
        properties.setProperty("schema.history.internal.kafka.bootstrap.servers", "192.168.20.220:9092");
        properties.setProperty("schema.history.internal.kafka.topic", "dm-history-topic");

        // Stability-related settings
        properties.setProperty("snapshot.mode", "initial");           // use "initial" for the first test
        properties.setProperty("snapshot.fetch.size", "512");
        properties.setProperty("connect.timeout.ms", "60000");
        properties.setProperty("heartbeat.interval.ms", "3000");

        JdbcIncrementalSource<String> changeEventSource =
                new DMSourceBuilder<String>()
                        .hostname("192.168.128.5")
                        .port(5236)
                        .databaseList("DMDB")
                        .tableList("BACKGROUND_DATA.OPEN_API_APP")   
                        .schemaList("BACKGROUND_DATA")
                        .username("SYSDBA")
                        .password("123456")
                        .startupOptions(StartupOptions.initial())   // use "initial" for the first test
                        .dmProperties(properties)
                        .includeSchemaChanges(true)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .sliceSize(20)
                        .scanNewlyAddedTableEnabled(true)
                        .build();

        Configuration configuration = new Configuration();
        // Checkpointing setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        env.enableCheckpointing(20 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new FsStateBackend("file:///opt/flink/dmcdc/ck"));

        DataStream<String> sourceStream = env.fromSource(
                changeEventSource,
                WatermarkStrategy.noWatermarks(),
                "DmSource"
        );

        // Transform CDC events into JSON messages carrying the generated SQL
        DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {
            private transient ObjectMapper objectMapper;
            private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;

            @Override
            public String map(String value) throws Exception {
                if (objectMapper == null) {
                    objectMapper = new ObjectMapper();
                }

                JsonNode rootNode = objectMapper.readTree(value);
                String op = rootNode.path("op").asText();
                JsonNode sourceNode = rootNode.path("source");
                String schema = sourceNode.path("schema").asText();
                String table = sourceNode.path("table").asText();
                long tsMs = rootNode.path("ts_ms").asLong();

                // Build the output JSON structure
                ObjectNode json = nodeFactory.objectNode();

                // 1. Add the metadata section
                ObjectNode metadata = json.putObject("metadata")
                        .put("schema", schema)
                        .put("table", table)
                        .put("source_timestamp", tsMs);

                // Handle the timestamp field
                JsonNode afterNode = rootNode.path("after");
                if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {
                    long timestampNs = afterNode.path("TIMESTAMP").asLong();
                    metadata.put("event_time", formatTimestamp(timestampNs));
                } else {
                    metadata.put("event_time", formatTimestamp(tsMs * 1000000L));
                }

                // 2. Generate SQL for the operation type and add it to the JSON
                String sql = "";
                switch (op) {
                    case "r":
                        sql = generateInsertSQL(schema + "." + table, afterNode);
                        json.putObject("sql").put("dml", sql);
                        break;
                    case "c":
                        sql = generateInsertSQL(schema + "." + table, afterNode);
                        json.putObject("sql").put("dml", sql);
                        break;
                    case "u":
                        JsonNode beforeNode = rootNode.path("before");
                        sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);
                        json.putObject("sql").put("dml", sql);
                        break;
                    case "d":
                        JsonNode beforeNodeDelete = rootNode.path("before");
                        sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);
                        json.putObject("sql").put("dml", sql);
                        break;
                    default:
                        json.put("error", "UNKNOWN OPERATION: " + op);
                }
                // For compact output, use objectMapper.writeValueAsString(json) instead of the pretty printer
                return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
            }


            private String generateInsertSQL(String tableName, JsonNode afterNode) {
                StringBuilder fields = new StringBuilder();
                StringBuilder values = new StringBuilder();

                afterNode.fieldNames().forEachRemaining(fieldName -> {
                    if (fields.length() > 0) {
                        fields.append(", ");
                        values.append(", ");
                    }
                    fields.append(fieldName);

                    JsonNode valueNode = afterNode.get(fieldName);
                    if (valueNode.isNull()) {
                        values.append("NULL");
                    } else if (valueNode.isTextual()) {
                        // Text values (with escaping)
                        values.append("'").append(escapeSQL(valueNode.asText())).append("'");
                    } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                        // Convert timestamp values (nanosecond/millisecond precision)
                        values.append("'").append(formatTimestamp(valueNode.asLong())).append("'");
                    } else if (valueNode.isNumber()) {
                        // Other numeric types
                        values.append(valueNode.asText());
                    } else {
                        // Default handling (booleans, etc.)
                        values.append(valueNode.asText());
                    }
                });

                return String.format("INSERT INTO %s (%s) VALUES (%s)",
                        tableName, fields.toString(), values.toString());

            }

            private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {
                StringBuilder setClause = new StringBuilder();
                StringBuilder whereClause = new StringBuilder();

                // Build the SET clause
                afterNode.fieldNames().forEachRemaining(fieldName -> {
                    if (!fieldName.equals("ID")) { // 假设ID是主键,不更新
                        if (setClause.length() > 0) {
                            setClause.append(", ");
                        }

                        JsonNode valueNode = afterNode.get(fieldName);
                        if (valueNode.isNull()) {
                            setClause.append(fieldName).append(" = NULL");
                        } else if (valueNode.isTextual()) {
                            setClause.append(fieldName)
                                    .append(" = '")
                                    .append(escapeSQL(valueNode.asText()))
                                    .append("'");
                        } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                            // Convert timestamp values (nanosecond/millisecond precision)
                            setClause.append(fieldName)
                                    .append(" = '")
                                    .append(formatTimestamp(valueNode.asLong()))
                                    .append("'");
                        } else {
                            setClause.append(fieldName)
                                    .append(" = ")
                                    .append(valueNode.asText());
                        }
                    }
                });

                // Build the WHERE clause (use every field as a condition for accuracy)
                beforeNode.fieldNames().forEachRemaining(fieldName -> {
                    if (whereClause.length() > 0) {
                        whereClause.append(" AND ");
                    }

                    JsonNode valueNode = beforeNode.get(fieldName);
                    if (valueNode.isNull()) {
                        whereClause.append(fieldName).append(" IS NULL");
                    } else if (valueNode.isTextual()) {
                        whereClause.append(fieldName)
                                .append(" = '")
                                .append(escapeSQL(valueNode.asText()))
                                .append("'");
                    } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                        // Convert timestamp values (nanosecond/millisecond precision)
                        whereClause.append(fieldName)
                                .append(" = '")
                                .append(formatTimestamp(valueNode.asLong()))
                                .append("'");
                    } else {
                        whereClause.append(fieldName)
                                .append(" = ")
                                .append(valueNode.asText());
                    }
                });

                return String.format("UPDATE %s SET %s WHERE %s",
                        tableName, setClause.toString(), whereClause.toString());
            }

            private String generateDeleteSQL(String tableName, JsonNode beforeNode) {
                StringBuilder whereClause = new StringBuilder();

                beforeNode.fieldNames().forEachRemaining(fieldName -> {
                    if (whereClause.length() > 0) {
                        whereClause.append(" AND ");
                    }

                    JsonNode valueNode = beforeNode.get(fieldName);
                    if (valueNode.isNull()) {
                        whereClause.append(fieldName).append(" IS NULL");
                    } else if (valueNode.isTextual()) {
                        whereClause.append(fieldName)
                                .append(" = '")
                                .append(escapeSQL(valueNode.asText()))
                                .append("'");
                    } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                        // Convert timestamp values (nanosecond/millisecond precision)
                        whereClause.append(fieldName)
                                .append(" = '")
                                .append(formatTimestamp(valueNode.asLong()))
                                .append("'");
                    } else {
                        whereClause.append(fieldName)
                                .append(" = ")
                                .append(valueNode.asText());
                    }
                });

                return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());
            }

            // Decide whether a field is a timestamp field (by naming convention / business logic)
            private boolean isTimestampField(String fieldName) {
                return fieldName.toLowerCase().contains("time") ||
                        fieldName.toLowerCase().contains("date") ||
                        fieldName.toLowerCase().contains("ts");
            }

            // Format a timestamp (nanosecond/microsecond/millisecond/second precision)
            private String formatTimestamp(long timestamp) {
                // Guess the precision (assume > 1e16 means nanoseconds, > 1e12 microseconds, otherwise milliseconds/seconds)
                if (timestamp > 1e16) {
                    timestamp /= 1_000_000; // nanoseconds -> milliseconds
                } else if (timestamp > 1e12) {
                    timestamp /= 1_000; // microseconds -> milliseconds
                } else if (timestamp < 1e10) {
                    timestamp *= 1_000; // seconds -> milliseconds
                }

                // Format with the java.time API (thread-safe)
                return Instant.ofEpochMilli(timestamp)
                        .atZone(ZoneId.systemDefault())
                        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            }

            // Escape SQL special characters (against injection)
            private String escapeSQL(String input) {
                return input.replace("'", "''");
            }

        });


        // Create the Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.20.220:9092") // Kafka broker address
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("dm-cdc-topic") // Kafka topic name
                        .setValueSerializationSchema(new SimpleStringSchema()) // plain string serialization
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once delivery
                .setProperty("transaction.timeout.ms", "300000") // 5 minutes (must stay below the broker's 15-minute cap)
                .setProperty("acks", "all") // maximize durability
                .build();

        // Wire up the pipeline: send the JSON messages to Kafka
        jsonStream.sinkTo(kafkaSink);

        env.execute();
    }
}

Under normal circumstances your data will now land in Kafka. If the logs show errors, try creating the dm-cdc-topic topic in Kafka manually first. Also note that with EXACTLY_ONCE delivery, records only become visible to read_committed consumers after a checkpoint completes, so expect a delay of up to one checkpoint interval.

The Kafka plugin in IDEA makes it easy to inspect the messages in the topic.
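If you prefer the command line, the scripts bundled in the apache/kafka image work as well. A sketch, assuming that image ships its scripts under /opt/kafka/bin (adjust paths to your setup):

# Create the topic manually
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic dm-cdc-topic --partitions 1 --replication-factor 1

# Watch the messages; read_committed matters because the sink writes transactionally
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic dm-cdc-topic --from-beginning --isolation-level read_committed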

Read records from Kafka and write them to the database via JDBC

This part is flexible: consume and process the Kafka records however your use case requires. So far I have been verifying consumption with a consumer created via the IDEA Kafka plugin; a minimal merge-job sketch follows.
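As a starting point, here is a minimal sketch of such a merge job for the central node, assuming the JSON envelope produced above: it consumes from dm-cdc-topic and replays each embedded sql.dml statement over JDBC. The class name DmMergeApplication, the central-node host, and the credentials are placeholders, and a production job would still need ordering, batching, idempotency, and retry handling.

package org.bg.dm.cdc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class DmMergeApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.enableCheckpointing(20 * 1000);

        // Consume the JSON envelopes written by the CDC job above
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.20.220:9092")
                .setTopics("dm-cdc-topic")              // add one topic per sub-node as needed
                .setGroupId("dm-merge-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setProperty("isolation.level", "read_committed") // the sink writes transactionally
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource")
                .addSink(new JdbcReplaySink());

        env.execute("dm-merge-job");
    }

    /** Replays each sql.dml statement against the central DM8 database. */
    static class JdbcReplaySink extends RichSinkFunction<String> {
        private transient Connection conn;
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Central-node connection details are placeholders; change them
            conn = DriverManager.getConnection("jdbc:dm://192.168.128.6:5236", "SYSDBA", "123456");
            mapper = new ObjectMapper();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            JsonNode dml = mapper.readTree(value).path("sql").path("dml");
            if (dml.isMissingNode()) {
                return; // skip malformed or error records
            }
            try (Statement stmt = conn.createStatement()) {
                stmt.execute(dml.asText());
            }
        }

        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
        }
    }
}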

Next steps

Once local debugging passes, you can package the jar and upload it to Flink for online testing, or run it from the command line (upload the packaged jar into the jars directory next to docker-compose.yaml, then run):

docker-compose exec jobmanager flink run -d -c org.bg.dm.cdc.DmCdcApplication /opt/flink/usrlib/DmCdc-0.0.1-SNAPSHOT.jar
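For completeness, building and staging the jar might look like this (the ./jars directory is the one mounted in docker-compose.yaml):

mvn clean package -DskipTests
cp target/DmCdc-0.0.1-SNAPSHOT.jar ./jars/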

References

[1] DM8 Flink CDC connector tool

[2] Flink CDC real-time synchronization for the DM (Dameng) database

[3] Flink architecture