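# Big Data Tool Integration Example: Kafka
YashanDB supports importing data into Kafka through the Kafka-Connect-Jdbc connector.
This document walks through verifying that capability. The test environment is CentOS 7.9.2009.
# Kafka Installation
The goal here is only to verify that Kafka can connect to YashanDB, so Kafka is started as a single node.
1. Obtain the Kafka installation package. This example uses kafka_2.13-3.2.0.tgz.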
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
2. Extract the package and rename the extracted directory to kafka.
tar -zxvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0 kafka
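3. Start ZooKeeper. The command path is relative to the directory that contains the kafka directory, so run it from there. The script runs in the foreground and keeps the terminal occupied.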
kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
4. Start Kafka in a separate terminal, from the same directory.
kafka/bin/kafka-server-start.sh kafka/config/server.properties
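Both start scripts accept a `-daemon` option that runs them in the background, which avoids keeping two terminals open. A minimal sketch, assuming the broker listens on the default `localhost:9092` and that the script is run from the directory containing `kafka`; the `KAFKA_HOME` variable and the function name `start_single_node` are illustrative and not part of Kafka:

```shell
#!/usr/bin/env bash
# KAFKA_HOME is an illustrative variable; it defaults to the directory created in step 2.
KAFKA_HOME="${KAFKA_HOME:-$PWD/kafka}"

start_single_node() {
    # -daemon detaches the process; console output goes to the logs directory under KAFKA_HOME.
    "$KAFKA_HOME/bin/zookeeper-server-start.sh" -daemon "$KAFKA_HOME/config/zookeeper.properties"
    # Give ZooKeeper a moment to bind its port before the broker connects (5 s is a guess).
    sleep 5
    "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$KAFKA_HOME/config/server.properties"
    sleep 5
    # Listing topics succeeds only if the broker is reachable (9092 is the assumed default port).
    "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server localhost:9092 --list
}
```

Calling `start_single_node` starts both services and then lists topics. An empty list with a zero exit status means the broker is reachable.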
# Kafka-Connect-Jdbc Configuration
1. Download the Kafka-Connect-Jdbc package.
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.5.1/confluentinc-kafka-connect-jdbc-10.5.1.zip
ls
confluentinc-kafka-connect-jdbc-10.5.1.zip kafka kafka_2.13-3.2.0.tgz
2. Create a plugins directory and unzip confluentinc-kafka-connect-jdbc-10.5.1.zip into it.
mkdir plugins
unzip confluentinc-kafka-connect-jdbc-10.5.1.zip -d plugins/
ls plugins/
confluentinc-kafka-connect-jdbc-10.5.1
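The steps here do not show how the YashanDB JDBC driver reaches the connector. Kafka Connect JDBC generally needs the database driver JAR on the plugin's classpath, and a common way to provide it is to copy the JAR into the plugin's lib directory. A minimal sketch under that assumption; the helper name `install_jdbc_driver` is hypothetical, and the driver JAR must be obtained separately from your YashanDB release:

```shell
#!/usr/bin/env bash
# install_jdbc_driver is a hypothetical helper:
#   $1 = path to the JDBC driver JAR, $2 = extracted plugin directory.
install_jdbc_driver() {
    local jar="$1"
    local plugin_dir="$2"
    if [ ! -f "$jar" ]; then
        echo "driver JAR not found: $jar" >&2
        return 1
    fi
    if [ ! -d "$plugin_dir/lib" ]; then
        echo "plugin lib directory not found: $plugin_dir/lib" >&2
        return 1
    fi
    # Place the driver next to the connector's own JARs so it is loaded with the plugin.
    cp "$jar" "$plugin_dir/lib/"
}
```

For example, `install_jdbc_driver /path/to/driver.jar plugins/confluentinc-kafka-connect-jdbc-10.5.1`, where `/path/to/driver.jar` is a placeholder for the actual driver file.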
3. Copy source-quickstart-sqlite.properties from the confluentinc-kafka-connect-jdbc-10.5.1/etc directory into kafka/config and rename the copy to source-quickstart-yasdb.properties.
cp plugins/confluentinc-kafka-connect-jdbc-10.5.1/etc/source-quickstart-sqlite.properties kafka/config/
cd kafka/config
mv source-quickstart-sqlite.properties source-quickstart-yasdb.properties
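4. Edit source-quickstart-yasdb.properties and supply the JDBC connection details. Replace host_ip and dbname in connection.url, and the connection.user and connection.password values (both shown as sys in the example below), with the actual values for your database.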
vim source-quickstart-yasdb.properties
name=test-source-yasdb-jdbc-connect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# YashanDB database over JDBC, run the query below in bulk mode, and publish the rows to the
# topic 'test-yasdb-jdbc-'. Because a custom query is set, topic.prefix is used as the full
# topic name. incrementing.column.name is not used in bulk mode.
connection.url=jdbc:yasdb://host_ip:1688/dbname
connection.user=sys
connection.password=sys
query=select * from user1 limit 100
mode=bulk
incrementing.column.name=id
topic.prefix=test-yasdb-jdbc-
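The same file can also be generated from variables instead of being edited by hand. A minimal sketch, assuming the values from the example above; the variable names `YASDB_HOST`, `YASDB_DB`, `YASDB_USER`, `YASDB_PASSWORD`, and `OUT` are illustrative placeholders. `incrementing.column.name` is omitted because `bulk` mode re-reads the whole query result on every poll and does not use it:

```shell
#!/usr/bin/env bash
# All four connection variables are placeholders for this sketch; set them to real values.
YASDB_HOST="${YASDB_HOST:-host_ip}"
YASDB_DB="${YASDB_DB:-dbname}"
YASDB_USER="${YASDB_USER:-username}"
YASDB_PASSWORD="${YASDB_PASSWORD:-password}"
# Output path is illustrative; the walkthrough keeps this file under kafka/config.
OUT="${OUT:-source-quickstart-yasdb.properties}"

cat > "$OUT" <<EOF
name=test-source-yasdb-jdbc-connect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:yasdb://${YASDB_HOST}:1688/${YASDB_DB}
connection.user=${YASDB_USER}
connection.password=${YASDB_PASSWORD}
# With a custom query, topic.prefix is used as the full topic name.
query=select * from user1 limit 100
# bulk mode re-reads the whole query result on every poll.
mode=bulk
topic.prefix=test-yasdb-jdbc-
EOF
```

Because the password is written in plain text, restricting the file's permissions (for example `chmod 600 "$OUT"`) is a reasonable precaution on shared machines.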
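5. Set plugin.path in connect-standalone.properties to the plugins directory created in step 2. The path below comes from the test environment; use the actual path on your machine.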
vim connect-standalone.properties
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/kkadmin/kafka/tmp/plugins
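The edit can also be scripted. A minimal sketch that resolves the plugins directory to an absolute path, so the setting does not depend on where `connect-standalone.sh` is launched from, and then rewrites the `plugin.path` line; the helper name `set_plugin_path` is hypothetical, and `sed -i` is used in its GNU form as found on CentOS:

```shell
#!/usr/bin/env bash
# set_plugin_path is a hypothetical helper:
#   $1 = worker properties file, $2 = plugins directory to register.
set_plugin_path() {
    local props="$1"
    local plugins_abs
    # Resolve to an absolute path; fail if the directory does not exist.
    plugins_abs="$(cd "$2" && pwd)" || return 1
    if grep -q '^#\{0,1\}plugin\.path=' "$props"; then
        # Rewrite the existing setting, including the commented-out "#plugin.path=" form.
        # Example lines that start with "# " (hash plus space) are left untouched.
        sed -i "s|^#\{0,1\}plugin\.path=.*|plugin.path=${plugins_abs}|" "$props"
    else
        # No setting present yet: append one.
        echo "plugin.path=${plugins_abs}" >> "$props"
    fi
}
```

Run from the directory that contains both `kafka` and `plugins`, the call would be `set_plugin_path kafka/config/connect-standalone.properties plugins`.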
6. Return to the kafka directory and start the connector in standalone mode.
bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-yasdb.properties
7. Use a Kafka client tool to verify that the topic data can be read correctly.
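One way to perform this check is with the command-line tools shipped in kafka/bin. A minimal sketch, assuming the broker listens on `localhost:9092` and that the topic is named exactly `test-yasdb-jdbc-` (with a custom `query`, the connector uses `topic.prefix` as the full topic name); the variable names and the function name `verify_topic` are illustrative:

```shell
#!/usr/bin/env bash
# Defaults assume the script is run from the kafka directory, as in step 6.
KAFKA_HOME="${KAFKA_HOME:-.}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
TOPIC="${TOPIC:-test-yasdb-jdbc-}"

verify_topic() {
    # The topic should appear in this list once the connector has published rows.
    "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP" --list
    # Read a few records from the start of the topic, then exit.
    "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$BOOTSTRAP" \
        --topic "$TOPIC" --from-beginning --max-messages 5 --timeout-ms 10000
}
```

Calling `verify_topic` should print records corresponding to rows of the `user1` table if the connection to YashanDB works. If the consumer times out without output, the Connect worker's log is the first place to look.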