# Big Data Tool Integration Example: Kafka

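YashanDB supports importing data into Kafka through the Kafka-Connect-Jdbc connector.

This article walks through the verification of this feature. The test environment is CentOS 7.9.2009.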

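# Kafka Installation

This article only verifies that Kafka can connect to YashanDB, so Kafka is started as a single node.

1. Get the Kafka installation package, taking kafka_2.13-3.2.0.tgz as an example.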

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz

2. Extract the package and rename the extracted directory to kafka.

tar -zxvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0 kafka

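3. Start ZooKeeper. The commands in this step and the next use paths relative to the directory that contains kafka. Each command runs in the foreground, so run each one in its own terminal (or pass -daemon as the first argument to run it in the background).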

kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties

4. Start Kafka.

kafka/bin/kafka-server-start.sh kafka/config/server.properties 
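
Before moving on, it can help to confirm that both services are accepting connections. The following is a minimal sketch, assuming the stock configuration files are unchanged, so that ZooKeeper listens on port 2181 and the Kafka broker on port 9092. `port_open` is a helper name introduced here for illustration.

```shell
# Assumption: default ports from zookeeper.properties (2181) and server.properties (9092).
# port_open is a hypothetical helper: it succeeds if a TCP connection to host:port
# can be opened within 2 seconds (uses bash's /dev/tcp and the coreutils timeout command).
port_open() {
  timeout 2 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

for entry in "ZooKeeper:2181" "Kafka:9092"; do
  name="${entry%%:*}"
  port="${entry##*:}"
  if port_open localhost "$port"; then
    echo "$name is accepting connections on port $port"
  else
    echo "$name is not reachable on port $port"
  fi
done
```

If either service is reported as unreachable, check the terminal in which it was started for errors before continuing.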

# Kafka-Connect-Jdbc Configuration

1. Download the Kafka-Connect-Jdbc installation package.

wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.5.1/confluentinc-kafka-connect-jdbc-10.5.1.zip
ls
confluentinc-kafka-connect-jdbc-10.5.1.zip  kafka  kafka_2.13-3.2.0.tgz

2. Create a plugins directory and extract confluentinc-kafka-connect-jdbc-10.5.1.zip into it.

mkdir plugins
unzip confluentinc-kafka-connect-jdbc-10.5.1.zip  -d  plugins/
ls plugins/
confluentinc-kafka-connect-jdbc-10.5.1
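
A JDBC connector can only reach a database whose driver it can load, so the YashanDB JDBC driver JAR presumably has to sit next to the connector JARs. The sketch below is not part of the original procedure. It assumes the extracted archive contains a `lib` directory (the usual Confluent Hub layout) and uses a hypothetical `YASDB_DRIVER_JAR` path; the actual driver file name and location depend on your YashanDB installation.

```shell
# Assumption: the connector JARs live in the archive's lib directory.
PLUGIN_LIB="${PLUGIN_LIB:-plugins/confluentinc-kafka-connect-jdbc-10.5.1/lib}"
# Hypothetical path: point this at the JDBC driver JAR shipped with YashanDB.
YASDB_DRIVER_JAR="${YASDB_DRIVER_JAR:-/path/to/yashandb-jdbc.jar}"

if [ ! -d "$PLUGIN_LIB" ]; then
  echo "plugin lib directory not found: $PLUGIN_LIB"
elif [ ! -f "$YASDB_DRIVER_JAR" ]; then
  echo "driver JAR not found: $YASDB_DRIVER_JAR"
else
  # Copy the driver next to the connector JARs and list the result.
  cp "$YASDB_DRIVER_JAR" "$PLUGIN_LIB/"
  ls "$PLUGIN_LIB"
fi
```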

3. Copy source-quickstart-sqlite.properties from the confluentinc-kafka-connect-jdbc-10.5.1/etc directory to kafka/config and rename it to source-quickstart-yasdb.properties.

cp plugins/confluentinc-kafka-connect-jdbc-10.5.1/etc/source-quickstart-sqlite.properties kafka/config/
cd kafka/config
mv source-quickstart-sqlite.properties source-quickstart-yasdb.properties

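4. Edit source-quickstart-yasdb.properties to provide the JDBC connection information. Replace host_ip and dbname in the example below, as well as the connection.user and connection.password values, with actual values.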

vim source-quickstart-yasdb.properties
name=test-source-yasdb-jdbc-connect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# YashanDB database, run the query below in bulk mode, and write the result to the topic
# 'test-yasdb-jdbc-' (with a custom query, topic.prefix is used as the full topic name).
connection.url=jdbc:yasdb://host_ip:1688/dbname
connection.user=sys
connection.password=sys
query=select * from user1 limit 100
mode=bulk
incrementing.column.name=id
topic.prefix=test-yasdb-jdbc-
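
The same file can also be generated from shell variables, which keeps the connection details in one place. This is a sketch rather than part of the original procedure: the `YASDB_*` and `CONFIG_DIR` variable names are made up for illustration, the values are placeholders, and the file is written to a scratch directory unless `CONFIG_DIR` is set (for example to kafka/config).

```shell
# Hypothetical placeholder values; replace them with the real connection details.
YASDB_HOST="host_ip"
YASDB_PORT="1688"
YASDB_DB="dbname"
YASDB_USER="username"
YASDB_PASSWORD="password"
# Assumption: write to a scratch directory unless CONFIG_DIR is set by the caller.
CONFIG_DIR="${CONFIG_DIR:-$(mktemp -d)}"

# Write the connector configuration with the same keys as the example above.
cat > "$CONFIG_DIR/source-quickstart-yasdb.properties" <<EOF
name=test-source-yasdb-jdbc-connect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:yasdb://${YASDB_HOST}:${YASDB_PORT}/${YASDB_DB}
connection.user=${YASDB_USER}
connection.password=${YASDB_PASSWORD}
query=select * from user1 limit 100
mode=bulk
incrementing.column.name=id
topic.prefix=test-yasdb-jdbc-
EOF

echo "wrote $CONFIG_DIR/source-quickstart-yasdb.properties"
```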

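5. Set plugin.path in connect-standalone.properties to the plugins directory. The path below comes from the test environment; use the absolute path of the plugins directory created in step 2.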

vim connect-standalone.properties
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/kkadmin/kafka/tmp/plugins
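
Using an absolute path keeps the setting independent of the directory from which Kafka Connect is started, and deriving it with the shell avoids typos. The sketch below defines a hypothetical helper, `set_plugin_path`, and demonstrates it on a scratch copy. It is an illustration, not a tool that ships with Kafka.

```shell
# set_plugin_path FILE DIR: hypothetical helper that rewrites FILE so it holds
# exactly one active plugin.path line pointing at the absolute path of DIR.
set_plugin_path() {
  props_file="$1"
  plugins_dir="$(cd "$2" && pwd)" || return 1
  # Drop any existing active plugin.path line (commented examples are kept).
  grep -v '^plugin\.path=' "$props_file" > "$props_file.tmp"
  echo "plugin.path=$plugins_dir" >> "$props_file.tmp"
  mv "$props_file.tmp" "$props_file"
}

# Demonstration on a scratch copy, so no real configuration is touched.
workdir="$(mktemp -d)"
mkdir "$workdir/plugins"
printf '%s\n' 'bootstrap.servers=localhost:9092' \
  '# plugin.path=/usr/local/share/java' \
  'plugin.path=/old/location' > "$workdir/connect-standalone.properties"
set_plugin_path "$workdir/connect-standalone.properties" "$workdir/plugins"
cat "$workdir/connect-standalone.properties"
```

In the layout used in this article, the call would be `set_plugin_path kafka/config/connect-standalone.properties plugins`, run from the directory that contains both kafka and plugins.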

6. Return to the kafka directory and start the connector.

bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-yasdb.properties
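
While the connector is running, its state can be queried from a second terminal through the Kafka Connect REST interface. This is a minimal sketch, assuming the REST listener is on its default port 8083 and that curl is installed. The connector name is the `name` value from source-quickstart-yasdb.properties; `CONNECT_URL` is a variable introduced here for illustration.

```shell
# Assumption: Kafka Connect's REST interface listens on the default port 8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
CONNECTOR_NAME="test-source-yasdb-jdbc-connect"
STATUS_URL="$CONNECT_URL/connectors/$CONNECTOR_NAME/status"

if command -v curl >/dev/null 2>&1; then
  # Prints a JSON document with the connector and task states on success.
  curl -s --max-time 5 "$STATUS_URL" || echo "Kafka Connect is not reachable at $CONNECT_URL"
else
  echo "curl is not installed; open $STATUS_URL in another HTTP client"
fi
```

A task state of FAILED in the response usually comes with a stack trace that points at the cause, such as a missing driver or wrong connection details.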

7. Use the Kafka client tools to check that the topic data can be read correctly.
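
One way to carry out this check is with the command-line tools shipped in kafka/bin. The sketch below assumes it is run from the kafka directory (or that `KAFKA_HOME` points to it), that the broker listens on localhost:9092, and that the topic is named `test-yasdb-jdbc-`: when a custom `query` is configured, the connector documentation describes `topic.prefix` as the full topic name rather than a prefix.

```shell
# Assumptions: run from the kafka directory, broker on the default port 9092.
KAFKA_HOME="${KAFKA_HOME:-$PWD}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
# With a custom query, topic.prefix is used as the complete topic name.
TOPIC="test-yasdb-jdbc-"

if [ -x "$KAFKA_HOME/bin/kafka-console-consumer.sh" ]; then
  # List topics; the connector's topic should appear once data has been written.
  "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP" --list
  # Print up to five records from the start of the topic, giving up after 10 s of silence.
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$BOOTSTRAP" \
    --topic "$TOPIC" --from-beginning --max-messages 5 --timeout-ms 10000
else
  echo "Kafka command-line tools not found under $KAFKA_HOME/bin"
fi
```

Each printed record should correspond to one row returned by the configured query.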
