MySQL 到 MySQL:数据实时同步与迁移
在现代企业的数据架构中,数据库的实时同步和迁移是确保业务连续性和数据一致性的关键任务,本文将探讨如何利用不同工具和方法实现从MySQL到MySQL的数据同步,包括全量同步、增量同步以及实时数据同步,我们将详细介绍操作步骤、注意事项以及常见问题的解决方案,帮助您顺利完成数据迁移和同步任务。
一、MySQL 数据实时同步实操分享
1. 配置 MySQL 连接
要实现 MySQL 数据的实时同步,首先需要在 Tapdata Cloud 操作后台进行相关配置,具体步骤如下:
创建源端连接:点击左侧菜单栏的【连接管理】,然后点击右侧区域【连接列表】右上角的【创建连接】按钮,选择 MySQL 作为连接类型,并输入必要的配置信息,如数据库地址、端口、数据库名称、账号和密码等,测试连接成功后保存连接。
创建目标端连接:重复上述步骤,创建另一个 MySQL 连接作为目标端。
2. 选择同步模式
进入 Tapdata Cloud 操作后台的任务管理页面,点击添加任务按钮进入任务设置流程:
选定源端与目标端:根据刚才建好的连接,选定源端与目标端。
选择同步类型:平台提供全量同步、增量同步、全量+增量同步三种模式,设定写入模式和读取数量,如果选择的是全量+增量同步,在全量任务执行完毕后,Tapdata Agent 会自动进入增量同步状态。
3. 进行数据校验
一般同步完成后,建议进行数据校验,防止踩坑,Tapdata Cloud 提供三种校验模式,常用的是快速 count 校验,只需要选择要校验的表,简单方便。
二、开源 ETL 工具 DataX 实践
DataX 是阿里巴巴开源的一款异构数据源离线同步工具,广泛应用于数据仓库的数据迁移和同步,以下是使用 DataX 从 MySQL 到 MySQL 的全量同步和批量更新的示例:
1. 准备数据库
创建两张表datax_src
和datax_target
,分别用于存储源数据和目标数据:
CREATE TABLEdatax_src
(id
bigint NOT NULL,src_name
varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, PRIMARY KEY (id
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLEdatax_target
(id
bigint NOT NULL,target_name
varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, PRIMARY KEY (id
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
2. 插入数据
创建一个函数,向datax_src
表中插入 5000 条数据:
DELIMITER // CREATE DEFINER=root
@%
FUNCTIONAUTO_INSERT
() RETURNS int BEGIN DECLARE index_num int DEFAULT 0; WHILE index_num < 5000 DO SET index_num = index_num + 1; INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num)); END WHILE; RETURN 0; END // DELIMITER ;
3. 修改 DataX Job 配置文件
根据文档中的mysqlreader
和mysqlwriter
的示例,修改配置文件以适应您的数据同步需求。
三、基于 Flink CDC 实时同步数据
Flink CDC 是一种基于流处理技术的实时数据同步解决方案,适用于复杂的数据链路场景,以下是基于 Flink CDC 实现 MySQL 到 MySQL 实时同步的步骤:
1. 环境准备
确保已安装并配置好 Flink 和相关的连接器包,下载并放置flink-sql-connector-mysql-cdc-2.3.0.jar
到{flink-1.16.1}/lib/
目录下。
2. SQL CLI 开发作业
启动 Flink SQL CLI,并创建相应的 Flink 表:
SET execution.checkpointing.interval = 3s; CREATE TABLE source_test ( user_id STRING, user_name STRING, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.3.31', 'port' = '3306', 'username' = 'root', 'password' = '******', 'database-name' = 'flink_source', 'table-name' = 'source_test' ); CREATE TABLE sink_test ( user_id STRING, user_name STRING, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = '******', 'table-name' = 'sink_test' );
3. 运行同步任务
通过执行 SQL 语句将source_test
表中的数据实时同步到sink_test
表中:
INSERT INTO sink_test SELECT * FROM source_test;
四、CloudCanal 在线迁移与同步
CloudCanal 是一款专注于数据库在线迁移和同步的工具,支持多种数据库类型和复杂的数据链路场景,以下是使用 CloudCanal 进行 MySQL 到 MySQL 在线迁移与同步的基本步骤:
1. 启动增量数据程序
在源端启动造增量数据程序,模拟 IUD(插入、更新、删除)操作的比例为 30:50:20。
2. 创建任务
在 CloudCanal 操作后台创建任务,选择源数据库和目标数据库,并进行库映射和表映射,设置数据过滤条件,如id < 3000
,以确保只同步特定范围内的数据。
3. 数据校验
任务运行后,可以通过创建单独的数据校验任务来验证源端和目标端的数据一致性,校验任务会跑完增量任务,获取差异数据,并抽样对比,确保数据一致。
在实际操作中,选择合适的工具和方法至关重要,对于简单的全量同步和批量更新,DataX 是一个不错的选择;而对于复杂的实时数据链路,Flink CDC 提供了强大的流处理能力;CloudCanal 则适用于需要在线迁移和复杂数据链路的场景,无论选择哪种方案,都需要注意以下几点:
数据一致性:确保源端和目标端的数据完全一致,特别是在高并发环境下。
性能优化:根据数据量和业务需求,调整同步任务的性能参数,如并行度、批次大小等。
错误处理:建立完善的错误处理机制,及时监控和处理同步过程中的异常情况。
安全性:保护数据库账号和密码,避免敏感信息泄露。
通过合理规划和实施,您可以实现高效、可靠的 MySQL 到 MySQL 数据同步和迁移,保障业务的连续性和数据的安全性,希望本文能为您提供有价值的参考和指导。