跳到主要内容
Flink CDC 一键同步
实时未来技术团队杨林伟
阅读需 12 分钟

Flink CDC 一键同步

在实时数据时代,数据同步早已不是什么新鲜事,但能做到高效、低门槛、可观测性,才是关键。本文将从零开始用 Awestream 平台,手把手带你完成一套 MySQL → Doris 的 Flink CDC 实时数据同步,全流程只需 1 分钟!

🔥 核心亮点

  • 1️⃣ 极简开发:用 Awestream 平台像 “写表格、改配置” 一样轻松开发CDC作业,告别复杂代码!
  • 2️⃣ 5分钟全流程:从零搭建 MySQL+Doris环境 → 配置同步 → 上线运维,新人也能快速上手。
  • 3️⃣ 企业级能力:可视化运维、智能扩缩容、秒级响应,满足实时数据湖仓一体化需求。

Flink CDC 是一个数据集成框架,它基于数据库日志的 CDC(变更数据捕获)技术实现了统一的增量和全量数据读取。结合 Flink 出色的管道能力和丰富的上下游生态系统,Flink CDC 可以高效地实现海量数据的实时集成。 Flink CDC 可以应用在多种场景中。比如数据同步,可以将上游数据库中的数据同步至下游数据仓库、数据湖等。用户还可以借助 Flink CDC source 实现实时物化视图,结合下游 Flink 作业处理逻辑实现更丰富的业务场景。此外用户还可以使用 Flink CDC 捕获的变更数据基于业务逻辑进行数据分发。 作为一款数据集成框架,Flink CDC 对接了非常丰富的上下游数据库、数据湖仓和消息队列等外部系统,如 MySQL、PostgreSQL、Kafka、Paimon 等。

Flink CDC

二、Awestream 是什么

Awestream 是由实时未来科技(由 Apache StreamPark 创始团队成立) 推出的,基于 Apache StreamPark 和 Apache Flink 构建的新一代实时计算产品,专为企业打造,集实时计算、数据湖、数据仓库于一体,彻底解决传统流式计算复杂、门槛高、运维难的问题。 在 Awestream 中开发实时作业就像写 SQL 一样简单,全程极致流程的开发体验,作业运维可视化、自动化、秒级响应,资源调度自动扩缩容,降本增效... 不论是业务监控、实时运营分析、数据同步,还是数据湖仓一体化建设,Awestream 都能提供开箱即用、企业级标准的流计算方案,让复杂的大数据开发、实时数据同步,变得像写表格、配公式一样简单高效, 助力企业轻松应对 AI 时代的实时数据挑战。

Awestream 组件架构图

Awestream

接下来通过 Awestream 平台来演示一整套实时数据同步流程,包含:

  • 数据库环境部署( MySQL + Doris )
  • CDC 作业开发
  • 作业发布与运维

环境准备

首先需要准备一台安装了 DockerDocker ComposeLinuxMacOS 计算机。

MySQL 安装

在终端执行如下命令,使用 docker 安装 MySQL

# 拉取 MySQL 镜像
docker pull --platform linux/amd64 mysql:8.0.25

# 创建并运行 MySQL 容器
docker run -d -p 30025:3306 --name mysql8.0.25 -e MYSQL_ROOT_PASSWORD=root mysql:8.0.25

# 使用如下命令校验 MySQL 是否安装成功
docker ps -a |grep mysql
温馨提示

安装完整后,MySQL 的登录账号密码为:root/root

Doris 安装

首先需要设置 max_map_count,该参数用于控制进程可以映射的最大内存区域数量。

Doris 在运行时会使用大量的内存映射(mmap)操作来管理数据,如果 max_map_count 的值太低,可能会导致 Doris 无法正常启动或运行。

在主机上执行以下命令:

sysctl -w vm.max_map_count=2000000

MacOS 上,由于 Docker 容器的运行方式与 Linux 不同,直接在宿主机上修改 max_map_count 可能不会生效。这是因为 Docker 容器在 MacOS 上运行时,实际上是通过一个虚拟的 Linux 内核来管理容器,而不是直接使用 MacOS 的内核,所以需要 “创建特权容器”,命令如下:

# 1. 创建特权容器
docker run -it --privileged --pid=host --name=privileged_container debian nsenter -t 1 -m -u -n -i sh

# 2. 设置max_map_count参数
sysctl -w vm.max_map_count=2000000

# 3. 退出
exit

新建 docker-compose.yaml 编排文件,写入如下内容:

温馨提示

镜像地址需要根据系统来选择(本文使用的是 Mac M 系列的系统,所以这里使用的是 arm 架构)

services:
docker-fe:
image: "apache/doris:1.2.2-fe-arm"
container_name: "doris-fe"
hostname: "fe"
environment:
- FE_SERVERS=fe1:172.28.0.2:9001
- FE_ID=1
ports:
- 8030:8030
- 9030:9030
volumes:
- ./data/fe/doris-meta:/opt/apache-doris/fe/doris-meta
- ./data/fe/conf:/opt/apache-doris/fe/conf
- ./data/fe/log:/opt/apache-doris/fe/log
networks:
doris_net:
ipv4_address: 172.28.0.2
docker-be:
image: "apache/doris:1.2.2-be-arm"
container_name: "doris-be"
hostname: "be"
depends_on:
- docker-fe
environment:
- FE_SERVERS=fe1:172.28.0.2:9001
- BE_ADDR=172.28.0.3:9050
- vm.max_map_count=2000000
ports:
- 8040:8040
volumes:
- ./data/be/storage:/opt/apache-doris/be/storage
- ./data/be/conf:/opt/apache-doris/be/conf
- ./data/be/script:/docker-entrypoint-initdb.d
- ./data/be/log:/opt/apache-doris/be/log
networks:
doris_net:
ipv4_address: 172.28.0.3

networks:
doris_net:
ipam:
config:
- subnet: 172.28.0.0/16

执行启动命令:

# 启动命令
docker-compose -f docker-compose.yaml up -d

# 检查是否启动成功
docker-compose ps --all

如果出现类似如下提示,表示启动成功:

NAME       IMAGE                       COMMAND                  SERVICE     CREATED       STATUS          PORTS
doris-be apache/doris:1.2.2-be-arm "bash entry_point.sh" docker-be 3 weeks ago Up 30 seconds 0.0.0.0:8040->8040/tcp
doris-fe apache/doris:1.2.2-fe-arm "/opt/apache-doris/f…" docker-fe 3 weeks ago Up 31 seconds 0.0.0.0:8030->8030/tcp, 0.0.0.0:9030->9030/tcp

浏览器访问:http://127.0.0.1:8030,登录账号为:admin,无需密码。

doris-login

数据准备

使用客户端(例如 Navicat )连接 MySQL 数据库,执行如下 SQL 插入数据:

-- 创建测试数据库
CREATE DATABASE IF NOT EXISTS db_test;

-- 使用该数据库
USE db_test;

-- 创建 t_students 表
CREATE TABLE IF NOT EXISTS t_students (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(20)
);

-- 插入学生数据
INSERT INTO t_students (name) VALUES ('小明'), ('小红'), ('小刚');

-- 创建 t_classes 表
CREATE TABLE IF NOT EXISTS t_classes (
id INT AUTO_INCREMENT PRIMARY KEY,
class_name VARCHAR(20)
);

-- 插入班级数据
INSERT INTO t_classes (class_name) VALUES ('一班'), ('二班'), ('三班');

执行成功后,可以看到客户端已经创建如下数据:

mysql-test-db

CDC 作业开发

使用预置模板

进入"作业开发"模块,选择 demo_cdc 草稿:

demo-cdc

作业内容

# 定义源端,同步内容为 db_test 数据库的所有表结构及表数据
source:
type: mysql
name: source_mysql
hostname: localhost
password: root
username: root
tables: db_test.\.*
server-time-zone: UTC
server-id: 5400-5404

# 定义 doris 目标端,单机版注意设置 replication_num 副本数量为 1
sink:
type: doris
name: sink_doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.replication_num: 1

pipeline:
name: demo_cdc

作业发布

发布准备

如果按照本教程安装与配置,你无需修改任何内容,点击"上线"按钮,提示需要选择 Flink环境,如下图所示:

job-dev-cdc-error-tips

配置环境

  • Flink版本:flink-1.19.1(系统预置)
  • 部署模式:Kubernetes Application

发布验证

成功发布后将显示如下提示,并提示跳转至运维中心:

job-draft-cdc-release

作业运维

启动作业

进入到作业运维,可以看到列表已经有了已上线的 demo_cdc 草稿,点击操作栏的启动按钮:

start-cdc-job

状态监控

成功启动后可见 Doris Playgroud 已经同步好 db_test 数据库的表结构和表数据了:

doris-sync-success

同时在作业运维也可以看到:

  • 实时运行指标面板
  • 资源使用情况(JobManager/TaskManager 内存、可用作业槽等)
  • 作业运行状态
start-cdc-job-success

也可以点击作业名称,进入到详情查看作业的部署信息、作业状态、作业监控、作业日志等信息。

job-operation-cdc-detail

文末总结

在本次教程中,我们通过 Awestream 平台,仅用 5分钟 就完成了从 MySQL 到 Doris 的实时数据同步全流程。整个过程无需复杂编码,只需简单配置,即可实现高效、稳定的企业级数据同步。

核心收获

  1. 极速搭建环境:借助 Docker 快速部署 MySQL 和 Doris,降低学习成本,100% 可复现。
  2. 零代码开发:Awestream 提供开箱即用的 CDC 模板,像填写表格一样完成数据同步配置。
  3. 一键上线 & 智能运维:15秒完成作业部署,可视化监控、日志回溯、自动扩缩容,运维无忧。
  4. 实时同步验证:数据变更秒级同步,确保 Doris 与 MySQL 数据强一致,满足业务需求。

Awestream 的独特优势

  1. 低门槛:即使没有大数据经验,也能轻松上手。
  2. 高效率:告别传统流式计算的复杂流程,5分钟完成企业级同步方案。
  3. 可观测性:全链路监控,实时掌握作业运行状态,问题排查更便捷。

适用场景

  1. 📌 实时数据分析(如运营大盘、风控监控)
  2. 📌 数据湖仓建设(MySQL→Doris/Hudi/Iceberg)
  3. 📌 业务系统数据同步(多数据库异构同步)

🚀 现在就用 Awestream,让你的实时数据同步变得前所未有的简单