flink-cdc实时同步(oracle to mysql)

Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。

Flink下载地址

https://flink.apache.org/downloads/

其他必需的jar包(cdc、jdbc、mysq和oracle等驱动包)

 下载Flink后,直接解压到指定目录下即可;

tar zxvf flink-1.20.0-bin-scala_2.12.tgz

 将所有必须的jar包放在lib目录下,我这边的目录为/u01/flink-1.20.0/lib;

启动flink:

[root@gcv-b-test-gmes-oracle bin]# /u01/flink-1.20.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host gcv-b-test-gmes-oracle.
Starting taskexecutor daemon on host gcv-b-test-gmes-oracle.

 

如果需要web登录查看flink,需要修改配置文件(/u01/flink-1.20.0/conf/config.yaml)

address: 10.240.12.219
bind-address: 0.0.0.0

 

登录web界面:

 

配置Oracle:

必须开启归档(步骤查资料);

测试用户及表
create user flink identified by "123456";
grant connect ,resource to flink;
create table flink.user_info(id number primary key,name varchar2(100),age number);

##开启数据库级别补充日志
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
##开启该表的列附加日志
ALTER TABLE flink.user_info ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

创建用于cdc解析的表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/oradata/sharedb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

创建flinkuser复制用户
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT ANALYZE ANY TO flinkuser;

  GRANT CREATE TABLE TO flinkuser;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

  

 

 

启动sql-client(SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果)

我的理解就是帮你智能生成java代码,不需要自己写代码。

[root@gcv-b-test-gmes-oracle conf]# /u01/flink-1.20.0/bin/sql-client.sh

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /root/.flink-sql-history

Flink SQL>

  

配置Oracle连接器:

CREATE TABLE user_info (
     ID INT NOT NULL,
     NAME STRING,
     AGE int,
     PRIMARY KEY(ID) NOT ENFORCED
     ) WITH (
     'connector' = 'oracle-cdc',
     'hostname' = '10.240.12.219',
     'port' = '1521',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'database-name' = 'sharedb',
     'schema-name' = 'FLINK',
     'table-name' = 'USER_INFO',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
);
##这里有个坑,字段必须大写啊(因为oracle默认都是大写,这里是严格区分大小写的)
##如果大小写不一致,会识别不到字段。查询的时候报错如下:
##org.apache.flink.table.api.TableException: Column 'id' is NOT NULL,
##however, a null value is being written into it.)

  

 查看数据:

Flink SQL> select * from user_info;
[INFO] Result retrieval cancelled.

  

 

mysql数据库同步的表:

create database flink;

CREATE TABLE `user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL,
  `age` bigint(20) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

 

sql-client配置mysql:

CREATE TABLE user_info_mysql (
     id INT NOT NULL,
     name STRING,
     age int,
     PRIMARY KEY(id) NOT ENFORCED
 ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://10.251.93.3:3306/flink',
	'driver' = 'com.mysql.cj.jdbc.Driver',
     'username' = 'xxxx',
     'password' = 'xxxx',
     'table-name' = 'user_info');


Insert into user_info_mysql select * from user_info;

  

 

 

验证:

oracle:插入数据

 mysql验证:

 

感觉上手难度不大,有些jar包容易漏,导致异常。

 

参考文档:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/tutorials/oracle-tutorial/

                  https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/oracle-cdc/

 

热门相关:浴火重生:恶魔五小姐   Hello,校草大人!   寒门状元   魅王毒后   美食萌后:皇上,休了你