What we are building
The project needs real-time, cross-site synchronization of Dameng databases, and the solution has to be free of charge, so we went with DM8 (达梦8) + Flink CDC.
In my scenario, data from 20 child nodes has to be synchronized into one central node, and the network between the child nodes and the central node is not stable. That leads to the following design:
Child node 1 (DM) ──► Flink CDC Job1 ──► Kafka Topic (node1)
Child node 2 (DM) ──► Flink CDC Job2 ──► Kafka Topic (node2)
...
Child node 20 (DM) ──► Flink CDC Job20 ──► Kafka Topic (node20)
Kafka ──► Flink Merge Job (x1) ──► Top-level node (DM)
In short, the flow is:
1. Every child node has its own DM8 database, with archive mode enabled in the Dameng database to support CDC;
2. Every child node runs its own Flink CDC job, which reads change records from the DM8 database, converts them into SQL statements, and writes them to Kafka;
3. The top-level node subscribes to the records from Kafka and writes them into its own database via JDBC.
How to do it
Following the flow above, there are three major steps:
Enable archive mode on DM8
- Enable archive mode as the SYSDBA user
-- 1. Switch the database to MOUNT state
ALTER DATABASE MOUNT;
-- 2. Enable archive mode
ALTER DATABASE ARCHIVELOG;
-- 3. Add a local archive destination (adjust the directory, file size and space limit as needed; the DEST directory must already exist and be writable by the database user)
ALTER DATABASE ADD ARCHIVELOG 'DEST = /dm8/arch, TYPE = LOCAL, FILE_SIZE = 1024, SPACE_LIMIT = 20480';
-- 4. Switch back to OPEN state
ALTER DATABASE OPEN;
- Verify that it worked
-- Run as SYSDBA
SELECT para_name, para_value
FROM v$dm_ini
WHERE para_name IN ('ARCH_INI', 'RLOG_APPEND_LOGIC');
The query result should look like this:
LINEID PARA_NAME PARA_VALUE
---------- ----------------- ----------
1 RLOG_APPEND_LOGIC 2
2 ARCH_INI 1
Both values should be non-zero. If either of them is 0, you can change it with the following SQL:
-- 1. Switch the database to MOUNT state
ALTER DATABASE MOUNT;
-- 2. Set the parameters (recommended values, consistent with most tutorials)
SP_SET_PARA_VALUE(1, 'ARCH_INI', 1);
SP_SET_PARA_VALUE(1, 'RLOG_APPEND_LOGIC', 2); -- 1 or 2 both work; 2 gives fuller support for UPDATE/DELETE
-- 3. Switch back to OPEN state
ALTER DATABASE OPEN;
A special note:
Some articles only use select arch_mode from v$database; to verify that archive mode is enabled. When arch_mode is Y, archive mode is indeed on and historical data is archived, but if the RLOG_APPEND_LOGIC parameter mentioned above is still 0, UPDATE and DELETE statements are effectively not captured. This is why many people report that their update records never show up.
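If you prefer to run this check from application code instead of a SQL console, a minimal JDBC sketch could look like the following (the host, port, user and password are placeholders; it assumes the DmJdbcDriver18 driver used later in this article is on the classpath):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ArchiveModeCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection info -- replace with your own DM8 instance
        String url = "jdbc:dm://192.168.128.5:5236";
        try (Connection conn = DriverManager.getConnection(url, "SYSDBA", "123456");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT para_name, para_value FROM v$dm_ini " +
                     "WHERE para_name IN ('ARCH_INI', 'RLOG_APPEND_LOGIC')");
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // Both values should be non-zero, as explained above
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}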
Set up the Flink + Kafka environment (Docker)
We take the simplest possible route and bring the whole environment up with docker-compose in one shot.
It does not need many resources; it starts fine on a local 4 GB VM. The only thing to worry about is whether your network can pull the images smoothly.
docker-compose.yaml
Note: before starting, change the IP in the Kafka configuration to your own server's IP. I currently have 192.168.20.220 there; it appears in only one place.
version: "3.8"
services:
# ==================== Flink JobManager ====================
jobmanager:
image: flink:1.20.3-scala_2.12-java17
container_name: flink-jobmanager
hostname: jobmanager
restart: unless-stopped
ports:
- "8081:8081"
- "6123:6123"
command: jobmanager
environment:
TZ: Asia/Shanghai
FLINK_JOBMANAGER_RPC_ADDRESS: jobmanager
FLINK_JOBMANAGER_RPC_PORT: 6123
FLINK_PROPERTIES: |
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/checkpoints
volumes:
- ./flink-logs:/opt/flink/log
- ./checkpoints:/opt/flink/checkpoints
- ./jars:/opt/flink/usrlib
- /etc/localtime:/etc/localtime:ro
networks:
- flink-net
# ==================== Flink TaskManager ====================
taskmanager:
image: flink:1.20.3-scala_2.12-java17
hostname: taskmanager
restart: unless-stopped
deploy:
replicas: 2
depends_on:
- jobmanager
command: taskmanager
environment:
TZ: Asia/Shanghai
FLINK_JOBMANAGER_RPC_ADDRESS: jobmanager
FLINK_JOBMANAGER_RPC_PORT: 6123
FLINK_PROPERTIES: |
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
taskmanager.heap.size: 1536m
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 2048m
volumes:
- ./flink-logs:/opt/flink/log
- ./jars:/opt/flink/usrlib
- /etc/localtime:/etc/localtime:ro
networks:
- flink-net
# ==================== Kafka ====================
kafka:
image: apache/kafka:3.8.0
container_name: kafka
restart: unless-stopped
ports:
- "9092:9092"
environment:
TZ: Asia/Shanghai
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.20.220:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
networks:
- flink-net
# ==================== Flink SQL Client ====================
sql-client:
image: flink:1.20.3-scala_2.12-java17
container_name: flink-sql-client
restart: unless-stopped
command: /opt/flink/bin/sql-client.sh
depends_on:
- jobmanager
environment:
TZ: Asia/Shanghai
FLINK_JOBMANAGER_RPC_ADDRESS: jobmanager
FLINK_JOBMANAGER_RPC_PORT: 6123
volumes:
- ./jars:/opt/flink/usrlib
- ./sql:/opt/flink/sql
stdin_open: true
tty: true
networks:
- flink-net
networks:
flink-net:
driver: bridge
Once the stack is up, open port 8081 on your server and you should see the Flink Web UI.
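Besides opening the Web UI in a browser, you can also poke the JobManager's REST API programmatically; a minimal sketch (the address is a placeholder, /overview is Flink's standard cluster-overview endpoint):
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkClusterCheck {
    public static void main(String[] args) throws Exception {
        // Replace with the address of your own JobManager
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://192.168.20.220:8081/overview"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // A healthy cluster answers 200 with a JSON body containing taskmanager/slot counts
        System.out.println(response.statusCode() + " " + response.body());
    }
}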
For version compatibility, refer to the table below (from reference [1]):
| Flink CDC version | Flink version | JDK version | DM8 version |
|---|---|---|---|
| 2.4.1 | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* | 1.8.0 | 8.1.3.77 and above |
| 3.0.0 | 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* | 1.8.0 and above | 8.1.3.77 and above |
| 3.3.0 | 1.19.*, 1.20.* | 1.8.0 and above | 8.1.3.77 and above |
Flink job: write database change records to Kafka
A Flink job can be defined either with Flink SQL or with the DataStream API; in this article we use the DataStream API with Java code. (If you want the Flink SQL route, see the references at the end of the article.)
It is quite simple: three files in total.
Pull in the required dependencies:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.bg</groupId>
<artifactId>DmCdc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>DmCdc</name>
<description>DM8 Flink CDC</description>
<properties>
<java.version>17</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.20.3</flink.version>
<flinkcdc.version>3.3.0</flinkcdc.version>
<version.infinispan>13.0.17.Final</version.infinispan>
<version.infinispan.protostream>4.4.4.Final</version.infinispan.protostream>
</properties>
<dependencies>
<!-- Flink core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-dm-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<!-- Flink Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<!-- Dameng (DM) JDBC driver -->
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>8.1.3.140</version>
</dependency>
<!-- JsonDebeziumDeserializationSchema apparently relies on fastjson during deserialization; without this dependency it fails with a deserialization error, and fastjson2 is not an acceptable substitute -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Logging only -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<!-- API bridge between log4j 1 and 2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Exclude signature files to avoid signature conflicts -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- Main class -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.bg.dm.cdc.DmCdcApplication</mainClass>
</transformer>
</transformers>
<!-- Do not generate the dependency-reduced pom -->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Configure logging
Put log4j2.xml under the resources directory
<?xml version="1.0" encoding="UTF-8"?>
<!-- monitorInterval: how often (in seconds) the configuration is re-read automatically -->
<Configuration status="warn" monitorInterval="5">
<properties>
<property name="LOG_HOME">D:/java/logs</property>
<property name="FILE_NAME">${date:yyyy-MM-dd}</property>
</properties>
<Appenders>
<!-- Console output -->
<Console name="Console" target="SYSTEM_OUT">
<!-- Output pattern -->
<PatternLayout pattern="%d{HH:mm:ss.SSS}[%-5level]%m%n"/>
</Console>
<!-- Rolling file output; fileName is the active file, filePattern names the rolled-over files -->
<RollingRandomAccessFile name="MyFile" fileName="D:/test.log"
filePattern="${LOG_HOME}/${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd HH-mm}-%i.log">
<!-- Output pattern -->
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
<Policies>
<!-- How often (in minutes, per the filePattern) a new log file is rolled -->
<TimeBasedTriggeringPolicy interval="120"/>
<!-- Roll over to a new file when this size is exceeded -->
<SizeBasedTriggeringPolicy size="10 MB"/>
</Policies>
<!-- Maximum number of rolled files to keep -->
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<!-- Async logger; additivity=false prevents duplicate log entries -->
<AsyncLogger name="com.bg" level="trace" additivity="false">
<AppenderRef ref="MyFile"/>
<AppenderRef ref="Console"/>
</AsyncLogger>
<!-- Root logger; level is the global log level -->
<Root level="info">
<AppenderRef ref="MyFile"/>
<AppenderRef ref="Console"/>
</Root>
<!-- Named logger -->
<Logger name="mylog" level="trace" additivity="false">
<AppenderRef ref="MyFile" />
</Logger>
</Loggers>
</Configuration>
Write the CDC business code
Note: the code contains database connection details; change them to your own.
It is recommended to run this simpler version first for debugging:
DmCdcApplication1.java
package org.bg.dm.cdc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
public class DmCdcApplication1 {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("database.tablename.case.insensitive", "false");
properties.setProperty("log.mining.strategy", "offline_catalog");
properties.setProperty("log.mining.continuous.mine", "true");
properties.setProperty("lob.enabled", "true");
// Schema history configuration (must be set correctly)
properties.setProperty("schema.history.internal", "io.debezium.storage.kafka.history.KafkaSchemaHistory");
properties.setProperty("schema.history.internal.kafka.bootstrap.servers", "192.168.20.220:9092");
properties.setProperty("schema.history.internal.kafka.topic", "dm-history-topic");
// Extra stability settings
properties.setProperty("snapshot.mode", "initial"); // use initial for the first test
properties.setProperty("snapshot.fetch.size", "512");
properties.setProperty("connect.timeout.ms", "60000");
properties.setProperty("heartbeat.interval.ms", "3000");
JdbcIncrementalSource<String> changeEventSource =
new DMSourceBuilder<String>()
.hostname("192.168.128.5")
.port(5236)
.databaseList("DMDB")
.tableList("BACKGROUND_DATA.OPEN_API_APP") // recommended form, also for multiple tables
.schemaList("BACKGROUND_DATA")
.username("SYSDBA")
.password("123456")
.startupOptions(StartupOptions.initial()) // use initial for the first test
.dmProperties(properties)
.includeSchemaChanges(true)
.deserializer(new JsonDebeziumDeserializationSchema())
.sliceSize(20)
.scanNewlyAddedTableEnabled(true)
.build();
Configuration configuration = new Configuration();
// Checkpoint storage
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(20 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new FsStateBackend("file:///opt/flink/dmcdc/ck"));
DataStream<String> sourceStream = env.fromSource(
changeEventSource,
WatermarkStrategy.noWatermarks(),
"DmSource"
);
sourceStream.print();
env.execute();
}
}
Normally, if the configured tables contain data, the corresponding records will be printed in your logs, and any update you run against the database will also be captured and printed.
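For reference, each record printed here is a Debezium-style change-event envelope; the sketch below shows how the fields the next version relies on (op, before, after, source.schema, source.table, ts_ms) can be pulled out with Jackson. The sample string is purely illustrative, not a real record captured from DM8:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EnvelopePeek {
    public static void main(String[] args) throws Exception {
        // Illustrative sample only -- field values are made up
        String sample = "{\"op\":\"u\",\"ts_ms\":1700000000000,"
                + "\"source\":{\"schema\":\"BACKGROUND_DATA\",\"table\":\"OPEN_API_APP\"},"
                + "\"before\":{\"ID\":1,\"NAME\":\"old\"},"
                + "\"after\":{\"ID\":1,\"NAME\":\"new\"}}";
        JsonNode root = new ObjectMapper().readTree(sample);
        // op: "c" = insert, "u" = update, "d" = delete, "r" = snapshot read
        System.out.println("op     : " + root.path("op").asText());
        System.out.println("table  : " + root.path("source").path("schema").asText()
                + "." + root.path("source").path("table").asText());
        System.out.println("before : " + root.path("before"));
        System.out.println("after  : " + root.path("after"));
        System.out.println("ts_ms  : " + root.path("ts_ms").asLong());
    }
}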
Once the version above runs, you can move on to the version that writes to Kafka (read change records from DM8, convert them to SQL, and write them to Kafka):
DmCdcApplication.java
package org.bg.dm.cdc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
public class DmCdcApplication {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("database.tablename.case.insensitive", "false");
properties.setProperty("log.mining.strategy", "offline_catalog");
properties.setProperty("log.mining.continuous.mine", "true");
properties.setProperty("lob.enabled", "true");
// Schema history configuration (must be set correctly)
properties.setProperty("schema.history.internal", "io.debezium.storage.kafka.history.KafkaSchemaHistory");
properties.setProperty("schema.history.internal.kafka.bootstrap.servers", "192.168.20.220:9092");
properties.setProperty("schema.history.internal.kafka.topic", "dm-history-topic");
// Extra stability settings
properties.setProperty("snapshot.mode", "initial"); // use initial for the first test
properties.setProperty("snapshot.fetch.size", "512");
properties.setProperty("connect.timeout.ms", "60000");
properties.setProperty("heartbeat.interval.ms", "3000");
JdbcIncrementalSource<String> changeEventSource =
new DMSourceBuilder<String>()
.hostname("192.168.128.5")
.port(5236)
.databaseList("DMDB")
.tableList("BACKGROUND_DATA.OPEN_API_APP")
.schemaList("BACKGROUND_DATA")
.username("SYSDBA")
.password("123456")
.startupOptions(StartupOptions.initial()) // use initial for the first test
.dmProperties(properties)
.includeSchemaChanges(true)
.deserializer(new JsonDebeziumDeserializationSchema())
.sliceSize(20)
.scanNewlyAddedTableEnabled(true)
.build();
Configuration configuration = new Configuration();
// Checkpoint storage
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(20 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new FsStateBackend("file:///opt/flink/dmcdc/ck"));
DataStream<String> sourceStream = env.fromSource(
changeEventSource,
WatermarkStrategy.noWatermarks(),
"DmSource"
);
// Process the CDC records and produce JSON messages containing the generated SQL
DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {
private transient ObjectMapper objectMapper;
private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;
@Override
public String map(String value) throws Exception {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
}
JsonNode rootNode = objectMapper.readTree(value);
String op = rootNode.path("op").asText();
JsonNode sourceNode = rootNode.path("source");
String schema = sourceNode.path("schema").asText();
String table = sourceNode.path("table").asText();
long tsMs = rootNode.path("ts_ms").asLong();
// Build the JSON structure
ObjectNode json = nodeFactory.objectNode();
// 1. Add the metadata section
ObjectNode metadata = json.putObject("metadata")
.put("schema", schema)
.put("table", table)
.put("source_timestamp", tsMs);
// Handle the timestamp field
JsonNode afterNode = rootNode.path("after");
if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {
long timestampNs = afterNode.path("TIMESTAMP").asLong();
metadata.put("event_time", formatTimestamp(timestampNs));
} else {
metadata.put("event_time", formatTimestamp(tsMs * 1000000L));
}
// 2. Generate the SQL according to the operation type and add it to the JSON
String sql = "";
switch (op) {
case "r":
sql = generateInsertSQL(schema + "." + table, afterNode);
json.putObject("sql").put("dml", sql);
break;
case "c":
sql = generateInsertSQL(schema + "." + table, afterNode);
json.putObject("sql").put("dml", sql);
break;
case "u":
JsonNode beforeNode = rootNode.path("before");
sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);
json.putObject("sql").put("dml", sql);
break;
case "d":
JsonNode beforeNodeDelete = rootNode.path("before");
sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);
json.putObject("sql").put("dml", sql);
break;
default:
json.put("error", "UNKNOWN OPERATION: " + op);
}
// To emit compact JSON instead (ObjectMapper's default), use writeValueAsString directly:
// ObjectMapper mapper = new ObjectMapper();
// String jsonString = mapper.writeValueAsString(json);
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
}
private String generateInsertSQL(String tableName, JsonNode afterNode) {
StringBuilder fields = new StringBuilder();
StringBuilder values = new StringBuilder();
afterNode.fieldNames().forEachRemaining(fieldName -> {
if (fields.length() > 0) {
fields.append(", ");
values.append(", ");
}
fields.append(fieldName);
JsonNode valueNode = afterNode.get(fieldName);
if (valueNode.isNull()) {
values.append("NULL");
} else if (valueNode.isTextual()) {
// Text values (with escaping)
values.append("'").append(escapeSQL(valueNode.asText())).append("'");
} else if (valueNode.isLong() && isTimestampField(fieldName)) {
// Convert timestamps (nanosecond/millisecond precision)
values.append("'").append(formatTimestamp(valueNode.asLong())).append("'");
} else if (valueNode.isNumber()) {
// Other numeric values
values.append(valueNode.asText());
} else {
// Default handling (booleans etc.)
values.append(valueNode.asText());
}
});
return String.format("INSERT INTO %s (%s) VALUES (%s)",
tableName, fields.toString(), values.toString());
}
private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {
StringBuilder setClause = new StringBuilder();
StringBuilder whereClause = new StringBuilder();
// Build the SET clause
afterNode.fieldNames().forEachRemaining(fieldName -> {
if (!fieldName.equals("ID")) { // assume ID is the primary key and should not be updated
if (setClause.length() > 0) {
setClause.append(", ");
}
JsonNode valueNode = afterNode.get(fieldName);
if (valueNode.isNull()) {
setClause.append(fieldName).append(" = NULL");
} else if (valueNode.isTextual()) {
setClause.append(fieldName)
.append(" = '")
.append(escapeSQL(valueNode.asText()))
.append("'");
} else if (valueNode.isLong() && isTimestampField(fieldName)) {
// Convert timestamps (nanosecond/millisecond precision)
setClause.append(fieldName)
.append(" = '")
.append(formatTimestamp(valueNode.asLong()))
.append("'");
} else {
setClause.append(fieldName)
.append(" = ")
.append(valueNode.asText());
}
}
});
// Build the WHERE clause (use all fields as conditions for accuracy)
beforeNode.fieldNames().forEachRemaining(fieldName -> {
if (whereClause.length() > 0) {
whereClause.append(" AND ");
}
JsonNode valueNode = beforeNode.get(fieldName);
if (valueNode.isNull()) {
whereClause.append(fieldName).append(" IS NULL");
} else if (valueNode.isTextual()) {
whereClause.append(fieldName)
.append(" = '")
.append(escapeSQL(valueNode.asText()))
.append("'");
} else if (valueNode.isLong() && isTimestampField(fieldName)) {
// Convert timestamps (nanosecond/millisecond precision)
whereClause.append(fieldName)
.append(" = '")
.append(formatTimestamp(valueNode.asLong()))
.append("'");
} else {
whereClause.append(fieldName)
.append(" = ")
.append(valueNode.asText());
}
});
return String.format("UPDATE %s SET %s WHERE %s",
tableName, setClause.toString(), whereClause.toString());
}
private String generateDeleteSQL(String tableName, JsonNode beforeNode) {
StringBuilder whereClause = new StringBuilder();
beforeNode.fieldNames().forEachRemaining(fieldName -> {
if (whereClause.length() > 0) {
whereClause.append(" AND ");
}
JsonNode valueNode = beforeNode.get(fieldName);
if (valueNode.isNull()) {
whereClause.append(fieldName).append(" IS NULL");
} else if (valueNode.isTextual()) {
whereClause.append(fieldName)
.append(" = '")
.append(escapeSQL(valueNode.asText()))
.append("'");
} else if (valueNode.isLong() && isTimestampField(fieldName)) {
// Convert timestamps (nanosecond/millisecond precision)
whereClause.append(fieldName)
.append(" = '")
.append(formatTimestamp(valueNode.asLong()))
.append("'");
} else {
whereClause.append(fieldName)
.append(" = ")
.append(valueNode.asText());
}
});
return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());
}
// Decide whether a field is a timestamp field (by naming convention / business logic)
private boolean isTimestampField(String fieldName) {
return fieldName.toLowerCase().contains("time") ||
fieldName.toLowerCase().contains("date") ||
fieldName.toLowerCase().contains("ts");
}
// Format a timestamp (supports nanosecond/microsecond/millisecond/second precision)
private String formatTimestamp(long timestamp) {
// Guess the precision (assume > 1e16 is nanoseconds, > 1e12 is microseconds, the rest is milliseconds/seconds)
if (timestamp > 1e16) {
timestamp /= 1_000_000; // nanoseconds to milliseconds
} else if (timestamp > 1e12) {
timestamp /= 1_000; // microseconds to milliseconds
} else if (timestamp < 1e10) {
timestamp *= 1_000; // seconds to milliseconds
}
// Format with the Java 8 time API (thread-safe)
return Instant.ofEpochMilli(timestamp)
.atZone(ZoneId.systemDefault())
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
// Escape special SQL characters (basic injection protection)
private String escapeSQL(String input) {
return input.replace("'", "''");
}
});
// Create the Kafka sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.20.220:9092") // Kafka broker address
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("dm-cdc-topic") // Kafka topic name
.setValueSerializationSchema(new SimpleStringSchema()) // serialize as plain strings
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once delivery
.setProperty("transaction.timeout.ms", "300000") // 5 minutes (must stay below the broker's 15-minute limit)
.setProperty("acks", "all") // high reliability
.setProperty("schema.history.internal.kafka.topic", "dm-history-topic")
.build();
// Wire up the pipeline
jsonStream.sinkTo(kafkaSink); // send the data to Kafka
env.execute();
}
}
Normally your data will now be written to Kafka. If the logs show errors, try creating the topic dm-cdc-topic in Kafka manually first.
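If you would rather create the topic up front instead of relying on auto-creation, here is a minimal sketch using the Kafka AdminClient (the broker address is a placeholder; one partition and replication factor 1 match the single-broker compose file above):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCdcTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Replace with your own broker address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.220:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- enough for the single-broker setup above
            NewTopic topic = new NewTopic("dm-cdc-topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}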
With the Kafka plugin in IDEA you can inspect the data in the topic.
Read records from Kafka and write them to the database via JDBC
This part is flexible: consume and process the Kafka data however your use case requires. For now I simply created a consumer with the Kafka plugin in IDEA to verify that the messages can be consumed; a minimal consumer sketch is shown below.
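A minimal sketch of what the center-node side could look like: a plain Kafka consumer that reads the JSON messages produced above, extracts sql.dml, and executes it against the top-level DM8 database over JDBC. The broker address, group id, host and credentials are placeholders, and a real implementation should add at least retries, ordering and idempotency handling:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CdcSqlApplier {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.20.220:9092"); // replace with your broker
        props.put("group.id", "dm-cdc-applier");                // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        ObjectMapper mapper = new ObjectMapper();
        // Replace host and credentials with the top-level node's DM8 instance
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection conn = DriverManager.getConnection("jdbc:dm://192.168.20.221:5236", "SYSDBA", "123456")) {
            consumer.subscribe(Collections.singleton("dm-cdc-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode dmlNode = mapper.readTree(record.value()).path("sql").path("dml");
                    if (dmlNode.isMissingNode() || dmlNode.asText().isEmpty()) {
                        continue; // no generated statement (e.g. unknown operation)
                    }
                    try (Statement stmt = conn.createStatement()) {
                        stmt.execute(dmlNode.asText()); // apply the generated INSERT/UPDATE/DELETE
                    }
                }
                consumer.commitSync(); // commit offsets only after the batch has been applied
            }
        }
    }
}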
Next steps
Once local debugging passes, you can package the jar, upload it to Flink and test it there, or run it from the command line (copy the packaged jar into the jars directory next to docker-compose.yaml, then run):
docker-compose exec jobmanager flink run -d -c org.bg.dm.cdc.DmCdcApplication /opt/flink/usrlib/DmCdc-0.0.1-SNAPSHOT.jar