
Real-time synchronization with Flink CDC (Oracle to MySQL)

2024-10-17 Source: 个人技术集锦

On November 15, 2021, Flink CDC released what was then its latest version, 2.1. By bundling Debezium as a built-in component, that release added support for Oracle.

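Flink download page

Other required JAR files (CDC, JDBC, MySQL, and Oracle connector and driver JARs, among others)

After downloading Flink, simply extract it into the target directory: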

tar zxvf flink-1.20.0-bin-scala_2.12.tgz

Put all the required JAR files in the lib directory; on my machine that is /u01/flink-1.20.0/lib.

Start Flink:

[root@gcv-b-test-gmes-oracle bin]# /u01/flink-1.20.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host gcv-b-test-gmes-oracle.
Starting taskexecutor daemon on host gcv-b-test-gmes-oracle.

To view Flink through the web UI, edit the configuration file (/u01/flink-1.20.0/conf/config.yaml):

address: 10.240.12.219
bind-address: 0.0.0.0

Log in to the web UI:

Configure Oracle:

Archive logging must be enabled (look up the exact steps for your environment).
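For reference, here is a minimal sketch of switching a database to archive log mode, along the lines of what the Oracle CDC connector documentation describes. It assumes a SYSDBA session in SQL*Plus (SHUTDOWN, STARTUP and ARCHIVE LOG LIST are SQL*Plus commands, not plain SQL). The recovery-area size and path are placeholders, not values taken from this setup.

```sql
-- Run as SYSDBA, for example from SQL*Plus.
-- Assumption: 10G and the path below are placeholders; adjust to your storage.
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
ALTER SYSTEM SET db_recovery_file_dest = '/u01/oradata/recovery_area' SCOPE=SPFILE;

-- The log mode can only be changed while the database is mounted but not open.
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

-- Check the result: "Database log mode" should read "Archive Mode".
ARCHIVE LOG LIST;
```

The database restarts during the switch, so on anything other than a test instance this needs a short maintenance window.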

-- Test user and table
create user flink identified by "123456";
grant connect, resource to flink;
create table flink.user_info(id number primary key, name varchar2(100), age number);

-- Enable database-level supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Enable supplemental logging of all columns for this table
ALTER TABLE flink.user_info ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Create the tablespace used for CDC log mining
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/oradata/sharedb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

-- Create the flinkuser replication user
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
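Before moving on, it can be worth confirming that the supplemental logging settings actually took effect. The queries below are a small sketch of such a check, assuming a session with access to the dictionary views (for example SYSDBA); they are not part of the original setup.

```sql
-- Database-level minimal supplemental logging; this should normally report YES.
SELECT supplemental_log_data_min FROM v$database;

-- Table-level log groups for the test table; a row with
-- LOG_GROUP_TYPE = 'ALL COLUMN LOGGING' is what the ALTER TABLE should produce.
SELECT log_group_name, table_name, log_group_type
FROM   dba_log_groups
WHERE  owner = 'FLINK'
AND    table_name = 'USER_INFO';
```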

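Start sql-client. (The SQL Client aims to provide a simple way to write, debug, and submit table programs to a Flink cluster without writing a single line of Java or Scala code. The SQL Client command-line interface (CLI) can retrieve and visualize, on the command line, the real-time results produced by a running distributed application.)

My understanding is that it effectively builds the job for you, so you don't need to write the Java code yourself.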

[root@gcv-b-test-gmes-oracle conf]# /u01/flink-1.20.0/bin/sql-client.sh
(Flink SQL Client ASCII-art banner omitted)
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /root/.flink-sql-history
Flink SQL>

Configure the Oracle connector:

CREATE TABLE user_info (
    ID INT NOT NULL,
    NAME STRING,
    AGE INT,
    PRIMARY KEY(ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.240.12.219',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'sharedb',
    'schema-name' = 'FLINK',
    'table-name' = 'USER_INFO',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true'
);
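-- Pitfall: the column names must be uppercase (Oracle stores unquoted identifiers in uppercase, and the matching here is strictly case-sensitive).
-- If the case does not match, the columns are not recognized, and a query fails with:
-- org.apache.flink.table.api.TableException: Column 'id' is NOT NULL,
-- however, a null value is being written into it.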

Check the data:

Flink SQL> select * from user_info;
[INFO] Result retrieval cancelled.
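By default the SQL client opens query results in a paged table view, which is why leaving it prints the "Result retrieval cancelled" message. As an optional sketch, the client's tableau result mode prints the changelog inline instead, which makes it easier to watch CDC events arrive. This setting is my addition, not something the original setup used.

```sql
-- Print results inline in the terminal instead of the paged table view.
SET 'sql-client.execution.result-mode' = 'tableau';

-- In streaming mode each change is printed as a row with an op column
-- (+I insert, -U/+U update before/after, -D delete).
SELECT ID, NAME, AGE FROM user_info;
```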

The target table in the MySQL database:

create database flink;
use flink;
CREATE TABLE `user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL,
  `age` bigint(20) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

Configure MySQL in sql-client:

CREATE TABLE user_info_mysql (
    id INT NOT NULL,
    name STRING,
    age INT,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.251.93.3:3306/flink',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxx',
    'password' = 'xxxx',
    'table-name' = 'user_info'
);

INSERT INTO user_info_mysql SELECT * FROM user_info;
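A possible variation on the final INSERT statement, sketched under a few assumptions of mine: a checkpoint interval of 3 seconds, a made-up job name, and an explicit column list so that the uppercase source columns are visibly mapped onto the lowercase sink columns. It is meant to be used in place of the plain INSERT ... SELECT *, not in addition to it, since each INSERT submits its own job.

```sql
-- Assumption: 3s is an arbitrary interval; choose one that suits the job.
SET 'execution.checkpointing.interval' = '3s';

-- Hypothetical job name, shown in the Flink web UI.
SET 'pipeline.name' = 'oracle-to-mysql-user-info';

-- Same sync as the SELECT * form, with the column mapping spelled out.
INSERT INTO user_info_mysql (id, name, age)
SELECT ID, NAME, AGE FROM user_info;
```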

Verification:

Oracle: insert data
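A minimal sketch of test statements to run on the Oracle side. The rows are made-up sample data; only the table definition comes from the setup above. The COMMIT matters, because the connector only emits changes from committed transactions.

```sql
-- Hypothetical sample rows for flink.user_info (id, name, age).
INSERT INTO flink.user_info (id, name, age) VALUES (1, 'Alice', 30);
INSERT INTO flink.user_info (id, name, age) VALUES (2, 'Bob', 25);

-- An update and a delete exercise the other change types as well.
UPDATE flink.user_info SET age = 31 WHERE id = 1;
DELETE FROM flink.user_info WHERE id = 2;

-- Changes become visible to the CDC source only after the commit.
COMMIT;
```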

MySQL: verify
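On the MySQL side, a simple query against the target table is enough to confirm that the changes arrived. This is a sketch that assumes the flink database and user_info table created earlier.

```sql
-- Run in the MySQL client against the sync target.
USE flink;
SELECT id, name, age FROM user_info ORDER BY id;
```

Rerunning the query after each change on the Oracle side shows inserts, updates, and deletes being applied, usually within a few seconds.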

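Overall it does not feel hard to get started, but some JAR files are easy to miss, and a missing one leads to exceptions.

References: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/tutorials/oracle-tutorial/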

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/oracle-cdc/
