#YashanDB Flink Connector
YashanDB连接器允许从YashanDB数据库读取数据和写入数据。
本文档将介绍如何设置YashanDB连接器以对YashanDB数据库运行SQL查询。
# 版本配套说明
Connector Version | Flink Version | YashanDB Version | YashanDB Jdbc Version | Java Version |
---|---|---|---|---|
1.1.0 | 1.15 | 23.2及以上 | 1.6.0以及以上 | jdk11以及以上 |
1.2.0 | 1.16、1.17、1.18、1.19 | 23.2及以上 | 1.6.0以及以上 | jdk11以及以上 |
# 前提条件
已安装Flink,具体操作请查阅Flink官方文档 (opens new window)。
已获取下列jar包并存放至<FLINK_HOME>/lib/目录下:
可自行使用构建自动化工具(例如Maven)将项目构建打包,也可联系我们的技术支持获取。
flink-connector-yashandb.jar
yashandb-jdbc.jar
# 使用限制
YashanDB flink connector不对时间类型的时区进行处理,如遇时区问题导致时间差异,可参考Flink官网文档相关介绍 (opens new window)。
# 配置YashanDB
为了让连接器能够正常运行,您需要为连接器的连接用户授予相关的权限。
YashanDB CDC connector的连接用户需要相关权限,请授予如下权限保障任务能够正常运行:
GRANT CREATE SESSION TO username;
GRANT SELECT ANY TABLE TO username;
GRANT INSERT ANY TABLE TO username;
# 创建YashanDB表
YashanDB表可以定义如下:
-- register an YashanDB table 'products' in Flink SQL
Flink SQL> CREATE TABLE products (
ID INT NOT NULL,
NAME STRING,
DESCRIPTION STRING,
WEIGHT DECIMAL(10, 3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'yashandb',
'url' = 'jdbc:yasdb://192.168.3.216:1665/test',
'port' = '1688',
'username' = 'flinkuser',
'password' = 'flinkpw',
'schema-name' = 'inventory',
'table-name' = 'products');
-- 查看YashanDB表中的数据
Flink SQL> SELECT * FROM products;
# 连接器参数
参数名 | 是否必选 | 默认值 | 数据类型 | 参数描述 |
---|---|---|---|---|
connector | 必选 | (none) | String | 指定需要使用的连接器,固定为yashandb |
username | 必选 | (none) | String | YashanDB数据库的连接用户名 |
password | 必选 | (none) | String | YashanDB数据库的连接用户的登录密码 |
schema-name | 必选 | (none) | String | YashanDB数据库的schema名称 |
table-name | 必选 | (none) | String | YashanDB数据库的table名称 |
url | 可选 | (none) | String | YashanDB数据库的JDBC URL |
sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush前缓存记录的最大值,设置为'0' 表示禁用 |
sink.buffer-flush.interval | 可选 | 1s | Duration | flush间隔时间,超过该时间后异步线程将flush数据,设置为 '0' 表示禁用为了完全异步地处理缓存的flush事件,可以将 'sink.buffer-flush.max-rows' 设置为'0' 并配置适当的flush时间间隔 |
sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数 |
sink.parallelism | 可选 | (none) | Integer | 用于定义JDBC sink算子的并行度,默认情况下并行度由框架决定(即使用与上游链式算子相同的并行度) |
bulk-load-enable | 可选 | false | boolean | 是否开启bulkload插入模式,该模式仅对YashanDB的LSC表生效 若设置为true,原普通插入将变成bulkInsert、原upsert插入将变成bulkupsert,使用BULKLOAD模式的详细介绍请查阅hint |
invert-bit-type-data | 可选 | true | boolean | 是否开启反转BIT类型的BYTE数组,默认开启(兼容MySQL2YashanDB因BIT数据类型的大小端的差异),设置为false则不反转 如果其他数据库作为源,且BIT数据类型的大小端跟YashanDB相同可使用该参数 |
scan.partition.column | 可选 | (none) | String | 用于将输入进行分区的列名,分区扫描相关介绍请查阅Flink官方文档 (opens new window) |
scan.partition.num | 可选 | (none) | Integer | 分区数 |
scan.partition.lower-bound | 可选 | (none) | Integer | 第一个分区的最小值 |
scan.partition.upper-bound | 可选 | (none) | Integer | 最后一个分区的最大值 |
# 功能特性
# 键处理
当写入数据到外部数据库时,Flink会使用DDL中定义的主键。如果定义了主键,则连接器将以upsert模式工作,否则连接器将以append模式工作。
在upsert模式下,connector将根据主键判断插入新行或更新已存在的行,该模式可确保幂等性。建议为表定义主键并确保该主键为底层数据库中表的唯一键或主键。
在append模式下,connector会把所有记录解释为INSERT消息,如果违反了底层数据库中主键或者唯一约束,INSERT插入可能会失败。
PRIMARY KEY语法的更多详情,请查阅Flink官方文档 (opens new window)。
# 分区扫描
分区扫描相关介绍,请查阅Flink官方文档 (opens new window)。
# 幂等写入
如果在DDL中定义了主键,JDBC sink将使用upsert语义(而非普通的INSERT语句),如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,从而确保幂等性。
如果出现故障,Flink作业会从上次成功的checkpoint恢复并重新处理,基于该机制可能会在恢复过程中重复处理消息。如果使用upsert模式,在需要重复处理记录的场景中可有效避免违反数据库主键约束和产生重复数据。
除了故障恢复场景外,数据源(kafka topic)也可能随着时间推移自然地包含多个具有相同主键的记录,因此upsert模式将更符合预期。
# BulkLoad模式
为提高YashanDB的LSC表的写入性能,本连接器提供bulkload模式,可通过bulk-load-enable
设置,若开启bulkload模式(即设置为true),原普通插入将变成bulkInsert、原upsert插入将变成bulkupsert,使用BULKLOAD模式的详细介绍请查阅hint。
# 数据类型映射
YashanDB type | Flink SQL type |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
NUMBER | DECIMAL |
BIT | bytes |
BOOLEAN | BOOLEAN |
DATE | TIMESTAMP |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
INTERVAL YEAR TO MONTH | BIGINT |
INTERVAL DAY TO SECOND | BIGINT |
CHAR | STRING |
VARCHAR | STRING |
NCHAR | STRING |
NVARCHAR | STRING |
BLOB | bytes |
CLOB | STRING |
NCLOB | STRING |
RAW | bytes |
ROWID | STRING |
UROWID | bytes |