当前位置:首页 > 科技  > 软件

SpringBoot整合Flink CDC,实时追踪数据变动,无缝同步至Redis

来源: 责编: 时间:2024-04-09 17:23:02 133观看
导读环境:SpringBoot2.7.16 + Flink 1.19.0 + JDK211. 简介Flink CDC(Flink Change Data Capture)是基于数据库的日志CDC技术,实现了全增量一体化读取的数据集成框架。它搭配Flink计算框架,能够高效实现海量数据的实时集成。Fl

环境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21FQm28资讯网——每日最新资讯28at.com

1. 简介

Flink CDC(Flink Change Data Capture)是基于数据库的日志CDC技术,实现了全增量一体化读取的数据集成框架。它搭配Flink计算框架,能够高效实现海量数据的实时集成。Flink CDC的核心功能在于实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进一步的处理和分析。通过使用Flink CDC,用户可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。FQm28资讯网——每日最新资讯28at.com

具体来说,Flink CDC的应用场景包括但不限于实时数据仓库更新、实时数据同步和迁移、实时数据处理等。它还可以确保数据一致性,并在数据发生变更时能够准确地捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,方便数据的捕获和处理。FQm28资讯网——每日最新资讯28at.com

接下来将详细的介绍关于MySQL CDC的使用。MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。FQm28资讯网——每日最新资讯28at.com

支持的数据库FQm28资讯网——每日最新资讯28at.com

ConnectorFQm28资讯网——每日最新资讯28at.com

DatabaseFQm28资讯网——每日最新资讯28at.com

DriverFQm28资讯网——每日最新资讯28at.com

mysql-cdcFQm28资讯网——每日最新资讯28at.com

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27FQm28资讯网——每日最新资讯28at.com

2. 实战案例

2.1 MySQL开启Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的/my.ini),需要在[mysqld]部分设置相关参数以开启binlog功能,如下:FQm28资讯网——每日最新资讯28at.com

[mysqld]server-id=1# 格式,行级格式binlog-format=Row# binlog 日志文件的前缀log-bin=mysql-bin# 指定哪些数据库需要记录二进制日志binlog_do_db=testjpa

除了开启binlog功能外,Flink CDC还需要其他配置和权限来确保能够正常连接到MySQL并读取数据。例如,需要授予Flink CDC连接MySQL的用户必要的权限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。FQm28资讯网——每日最新资讯28at.com

查看是否开启了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin       | ON    |+---------------+-------+

以上就对mysql相关的配置完成了。FQm28资讯网——每日最新资讯28at.com

2.2 依赖管理

<properties>  <flink.version>1.19.0</flink.version></properties><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-base</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-sql-connector-mysql-cdc</artifactId>  <version>3.0.1</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-clients</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-runtime</artifactId>  <version>${flink.version}</version></dependency>

2.3 代码实现

@Componentpublic class MonitorMySQLCDC implements InitializingBean {  // 该队列专门用来临时保存变化的数据(实际生产环境,你应该使用MQ相关的产品)  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;    private final StringRedisTemplate stringRedisTemplate ;  // 保存到redis中key的前缀  private final String PREFIX = "users:" ;  // 数据发生变化后的sink处理  private final CustomSink customSink ;  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {    this.customSink = customSink ;    this.stringRedisTemplate = stringRedisTemplate ;  }    @Override  public void afterPropertiesSet() throws Exception {    // 启动异步线程,实时处理队列中的数据    new Thread(() -> {      while(true) {        try {          Map<String, Object> result = queue.take();          this.doAction(result) ;        } catch (Exception e) {          e.printStackTrace();        }      }    }).start() ;    Properties jdbcProperties = new Properties() ;    jdbcProperties.setProperty("useSSL", "false") ;    MySqlSource<String> source = MySqlSource.<String>builder()        .hostname("127.0.0.1")        .port(3306)        // 可配置多个数据库        .databaseList("testjpa")        // 可配置多个表        .tableList("testjpa.users")        .username("root")        .password("123123")        .jdbcProperties(jdbcProperties)        // 包括schema的改变        .includeSchemaChanges(true)        // 反序列化设置        // .deserializer(new StringDebeziumDeserializationSchema())        .deserializer(new JsonDebeziumDeserializationSchema(true))        // 启动模式;关于启动模式下面详细介绍        .startupOptions(StartupOptions.initial())        .build() ;    // 环境配置    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;    // 设置 6s 的 checkpoint 间隔    env.enableCheckpointing(6000) ;    // 设置 source 节点的并行度为 4    env.setParallelism(4) ;    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")        // 添加Sink        .addSink(this.customSink) ;    env.execute() ;  }    @SuppressWarnings("unchecked")  private void doAction(Map<String, Object> result) throws Exception {    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;    String op = (String) payload.get("op") ;    switch (op) {      // 更新和插入操作      case "u", "c" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;        String id = after.get("id").toString();        System.out.printf("操作:%s, ID: %s%n", op, id) ;        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;      }      // 删除操作      case "d" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;        String id = after.get("id").toString();        stringRedisTemplate.delete(PREFIX + id) ;      }     }  }  }

启动模式:FQm28资讯网——每日最新资讯28at.com

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

数据处理Sink

@Componentpublic class CustomSink extends RichSinkFunction<String> {  private ObjectMapper mapper = new ObjectMapper();  @Override  public void invoke(String value, Context context) throws Exception {    System.out.printf("数据发生变化: %s%n", value);    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {    };    Map<String, Object> result = mapper.readValue(value, valueType);    Map<String, Object> payload = (Map<String, Object>) result.get("payload");    String op = (String) payload.get("op") ;    // 不对读操作处理    if (!"r".equals(op)) {      MonitorMySQLCDC.queue.put(result);    }  }}

以上就是实现通过FlinkCDC实时通过数据到Redis的所有代码。FQm28资讯网——每日最新资讯28at.com

2.4 Web监控页面

引入flink web依赖FQm28资讯网——每日最新资讯28at.com

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-runtime-web</artifactId>  <version>${flink.version}</version></dependency>

环境配置FQm28资讯网——每日最新资讯28at.com

Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) ;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web监听9090端口。FQm28资讯网——每日最新资讯28at.com

图片图片FQm28资讯网——每日最新资讯28at.com

通过web控制台你可以管理查看到更多的信息。FQm28资讯网——每日最新资讯28at.com

本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-82367-0.htmlSpringBoot整合Flink CDC,实时追踪数据变动,无缝同步至Redis

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 图解 CSS Grid 布局,一起来看看 CSS Grid 布局是如何使用的

下一篇: 架构见解:使用Instagram示例设计高效的多层缓存

标签:
  • 热门焦点
  • MIX Fold3包装盒泄露 新机本月登场

    MIX Fold3包装盒泄露 新机本月登场

    小米的全新折叠屏旗舰MIX Fold3将于本月发布,近日该机的真机包装盒在网上泄露。从图上来看,新的MIX Fold3包装盒在外观设计方面延续了之前的方案,变化不大,这也是目前小米旗舰
  • 6月安卓手机好评榜:魅族20 Pro蝉联冠军

    6月安卓手机好评榜:魅族20 Pro蝉联冠军

    性能榜和性价比榜之后,我们来看最后的安卓手机好评榜,数据来源安兔兔评测,收集时间2023年6月1日至6月30日,仅限国内市场。第一名:魅族20 Pro好评率:95%5月份的时候魅族20 Pro就是
  • 从 Pulsar Client 的原理到它的监控面板

    从 Pulsar Client 的原理到它的监控面板

    背景前段时间业务团队偶尔会碰到一些 Pulsar 使用的问题,比如消息阻塞不消费了、生产者消息发送缓慢等各种问题。虽然我们有个监控页面可以根据 topic 维度查看他的发送状态,
  • 多线程开发带来的问题与解决方法

    多线程开发带来的问题与解决方法

    使用多线程主要会带来以下几个问题:(一)线程安全问题  线程安全问题指的是在某一线程从开始访问到结束访问某一数据期间,该数据被其他的线程所修改,那么对于当前线程而言,该线程
  • 使用LLM插件从命令行访问Llama 2

    使用LLM插件从命令行访问Llama 2

    最近的一个大新闻是Meta AI推出了新的开源授权的大型语言模型Llama 2。这是一项非常重要的进展:Llama 2可免费用于研究和商业用途。(几小时前,swyy发现它已从LLaMA 2更名为Lla
  • 微软邀请 Microsoft 365 商业用户,测试视频编辑器 Clipchamp

    微软邀请 Microsoft 365 商业用户,测试视频编辑器 Clipchamp

    8 月 1 日消息,微软近日宣布即将面向 Microsoft 365 商业用户,开放 Clipchamp 应用,邀请用户通过该应用来编辑视频。微软于 2021 年收购 Clipchamp,随后开始逐步整合到 Microsof
  • 消费结构调整丨巨头低价博弈,拼多多还卷得动吗?

    消费结构调整丨巨头低价博弈,拼多多还卷得动吗?

    来源:征探财经作者:陈香羽随着流量红利的退潮,电商的存量博弈越来越明显。曾经主攻中高端与品质的淘宝天猫、京东重拾&ldquo;低价&rdquo;口号。而过去与他们错位竞争的拼多多,靠
  • 小米MIX Fold 3下月亮相:今年唯一无短板的全能折叠屏

    小米MIX Fold 3下月亮相:今年唯一无短板的全能折叠屏

    这段时间以来,包括三星、一加、荣耀等等有不少品牌旗下的最新折叠屏旗舰都有新的进展,其中荣耀、三星都已陆续发布了最新的折叠屏旗舰,尤其号荣耀Magi
  • OPPO K11评测:旗舰级IMX890加持 2000元档最强影像手机

    OPPO K11评测:旗舰级IMX890加持 2000元档最强影像手机

    【Techweb评测】中端机型用户群体巨大,占了中国目前手机市场的大头,一直以来都是各手机品牌的“必争之地”,其中OPPO K系列机型一直以来都以高品质、
Top
Baidu
map