环境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21
Flink CDC(Flink Change Data Capture)是基于数据库的日志CDC技术,实现了全增量一体化读取的数据集成框架。它搭配Flink计算框架,能够高效实现海量数据的实时集成。Flink CDC的核心功能在于实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进一步的处理和分析。通过使用Flink CDC,用户可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。
具体来说,Flink CDC的应用场景包括但不限于实时数据仓库更新、实时数据同步和迁移、实时数据处理等。它还可以确保数据一致性,并在数据发生变更时能够准确地捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,方便数据的捕获和处理。
接下来将详细的介绍关于MySQL CDC的使用。MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。
支持的数据库
Connector | Database | Driver |
mysql-cdc |
| JDBC Driver 8.0.27 |
在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的/my.ini),需要在[mysqld]部分设置相关参数以开启binlog功能,如下:
[mysqld]server-id=1# 格式,行级格式binlog-format=Row# binlog 日志文件的前缀log-bin=mysql-bin# 指定哪些数据库需要记录二进制日志binlog_do_db=testjpa
除了开启binlog功能外,Flink CDC还需要其他配置和权限来确保能够正常连接到MySQL并读取数据。例如,需要授予Flink CDC连接MySQL的用户必要的权限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。
mysql> SHOW VARIABLES LIKE 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | ON |+---------------+-------+
以上就对mysql相关的配置完成了。
<properties> <flink.version>1.19.0</flink.version></properties><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version></dependency><dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>3.0.1</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version></dependency>
@Componentpublic class MonitorMySQLCDC implements InitializingBean { // 该队列专门用来临时保存变化的数据(实际生产环境,你应该使用MQ相关的产品) public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ; private final StringRedisTemplate stringRedisTemplate ; // 保存到redis中key的前缀 private final String PREFIX = "users:" ; // 数据发生变化后的sink处理 private final CustomSink customSink ; public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) { this.customSink = customSink ; this.stringRedisTemplate = stringRedisTemplate ; } @Override public void afterPropertiesSet() throws Exception { // 启动异步线程,实时处理队列中的数据 new Thread(() -> { while(true) { try { Map<String, Object> result = queue.take(); this.doAction(result) ; } catch (Exception e) { e.printStackTrace(); } } }).start() ; Properties jdbcProperties = new Properties() ; jdbcProperties.setProperty("useSSL", "false") ; MySqlSource<String> source = MySqlSource.<String>builder() .hostname("127.0.0.1") .port(3306) // 可配置多个数据库 .databaseList("testjpa") // 可配置多个表 .tableList("testjpa.users") .username("root") .password("123123") .jdbcProperties(jdbcProperties) // 包括schema的改变 .includeSchemaChanges(true) // 反序列化设置 // .deserializer(new StringDebeziumDeserializationSchema()) .deserializer(new JsonDebeziumDeserializationSchema(true)) // 启动模式;关于启动模式下面详细介绍 .startupOptions(StartupOptions.initial()) .build() ; // 环境配置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ; // 设置 6s 的 checkpoint 间隔 env.enableCheckpointing(6000) ; // 设置 source 节点的并行度为 4 env.setParallelism(4) ; env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL") // 添加Sink .addSink(this.customSink) ; env.execute() ; } @SuppressWarnings("unchecked") private void doAction(Map<String, Object> result) throws Exception { Map<String, Object> payload = (Map<String, Object>) result.get("payload") ; String op = (String) payload.get("op") ; switch (op) { // 更新和插入操作 case "u", "c" -> { Map<String, Object> after = (Map<String, Object>) payload.get("after") ; String id = after.get("id").toString(); System.out.printf("操作:%s, ID: %s%n", op, id) ; stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ; } // 删除操作 case "d" -> { Map<String, Object> after = (Map<String, Object>) payload.get("before") ; String id = after.get("id").toString(); stringRedisTemplate.delete(PREFIX + id) ; } } } }
启动模式:
@Componentpublic class CustomSink extends RichSinkFunction<String> { private ObjectMapper mapper = new ObjectMapper(); @Override public void invoke(String value, Context context) throws Exception { System.out.printf("数据发生变化: %s%n", value); TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() { }; Map<String, Object> result = mapper.readValue(value, valueType); Map<String, Object> payload = (Map<String, Object>) result.get("payload"); String op = (String) payload.get("op") ; // 不对读操作处理 if (!"r".equals(op)) { MonitorMySQLCDC.queue.put(result); } }}
以上就是实现通过FlinkCDC实时通过数据到Redis的所有代码。
引入flink web依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version></dependency>
环境配置
Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) ;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;
web监听9090端口。
图片
通过web控制台你可以管理查看到更多的信息。
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-82367-0.htmlSpringBoot整合Flink CDC,实时追踪数据变动,无缝同步至Redis
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com