Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。
Flink下载地址
其他必需的jar包(cdc、jdbc、mysq和oracle等驱动包)
下载Flink后,直接解压到指定目录下即可;
tar zxvf flink-1.20.0-bin-scala_2.12.tgz
将所有必须的jar包放在lib目录下,我这边的目录为/u01/flink-1.20.0/lib;
启动flink:
[root@gcv-b-test-gmes-oracle bin]# /u01/flink-1.20.0/bin/start-cluster.shStarting cluster.Starting standalonesession daemon on host gcv-b-test-gmes-oracle.Starting taskexecutor daemon on host gcv-b-test-gmes-oracle.
如果需要web登录查看flink,需要修改配置文件(/u01/flink-1.20.0/conf/config.yaml)
address: 10.240.12.219bind-address: 0.0.0.0
登录web界面:
配置Oracle:
必须开启归档(步骤查资料);
测试用户及表create user flink identified by "123456";grant connect ,resource to flink;create table flink.user_info(id number primary key,name varchar2(100),age number);##开启数据库级别补充日志ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;##开启该表的列附加日志ALTER TABLE flink.user_info ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;创建用于cdc解析的表空间CREATE TABLESPACE logminer_tbs DATAFILE '/u01/oradata/sharedb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;创建flinkuser复制用户CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO flinkuser; GRANT SET CONTAINER TO flinkuser; GRANT SELECT ON V_$DATABASE to flinkuser; GRANT FLASHBACK ANY TABLE TO flinkuser; GRANT SELECT ANY TABLE TO flinkuser; GRANT SELECT_CATALOG_ROLE TO flinkuser; GRANT EXECUTE_CATALOG_ROLE TO flinkuser; GRANT SELECT ANY TRANSACTION TO flinkuser; GRANT LOGMINING TO flinkuser; GRANT ANALYZE ANY TO flinkuser; GRANT CREATE TABLE TO flinkuser; -- need not to execute if set scan.incremental.snapshot.enabled=true(default) GRANT LOCK ANY TABLE TO flinkuser; GRANT ALTER ANY TABLE TO flinkuser; GRANT CREATE SEQUENCE TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; GRANT SELECT ON V_$LOG TO flinkuser; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser; GRANT SELECT ON V_$LOGFILE TO flinkuser; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
启动sql-client(SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果)
我的理解就是帮你智能生成java代码,不需要自己写代码。
[root@gcv-b-test-gmes-oracle conf]# /u01/flink-1.20.0/bin/sql-client.sh ▒▓██▓██▒ ▓████▒▒█▓▒▓███▓▒ ▓███▓░░ ▒▒▒▓██▒ ▒ ░██▒ ▒▒▓▓█▓▓▒░ ▒████ ██▒ ░▒▓███▒ ▒█▒█▒ ░▓█ ███ ▓░▒██ ▓█ ▒▒▒▒▒▓██▓░▒░▓▓█ █░ █ ▒▒░ ███▓▓█ ▒█▒▒▒ ████░ ▒▓█▓ ██▒▒▒ ▓███▒ ░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░ ▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒ ███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒ ░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒ ███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░ ██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓ ▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒ ▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒ ▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█ ██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █ ▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓ █▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓ ██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓ ▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒ ██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒ ▓█ ▒█▓ ░ █░ ▒█ █▓ █▓ ██ █░ ▓▓ ▒█▓▓▓▒█░ █▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█ ██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓ ▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██ ░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓ ░▓██▒ ▓░ ▒█▓█ ░░▒▒▒ ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░ ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Command history file path: /root/.flink-sql-historyFlink SQL>
配置Oracle连接器:
CREATE TABLE user_info ( ID INT NOT NULL, NAME STRING, AGE int, PRIMARY KEY(ID) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '10.240.12.219', 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'sharedb', 'schema-name' = 'FLINK', 'table-name' = 'USER_INFO','debezium.log.mining.strategy' = 'online_catalog','debezium.log.mining.continuous.mine' = 'true');
##这里有个坑,字段必须大写啊(因为oracle默认都是大写,这里是严格区分大小写的)
##如果大小写不一致,会识别不到字段。查询的时候报错如下:
##org.apache.flink.table.api.TableException: Column 'id' is NOT NULL,
##however, a null value is being written into it.)
查看数据:
Flink SQL> select * from user_info;[INFO] Result retrieval cancelled.
mysql数据库同步的表:
create database flink;CREATE TABLE `user_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(100) NOT NULL, `age` bigint(20) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
sql-client配置mysql:
CREATE TABLE user_info_mysql ( id INT NOT NULL, name STRING, age int, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.251.93.3:3306/flink', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxxx', 'password' = 'xxxx', 'table-name' = 'user_info');Insert into user_info_mysql select * from user_info;
验证:
oracle:插入数据
mysql验证:
感觉上手难度不大,有些jar包容易漏,导致异常。
参考文档:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/tutorials/oracle-tutorial/
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/oracle-cdc/