# Big Data Tool Integration Example (Kafka)
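YashanDB supports importing data into Kafka through the Kafka-Connect-Jdbc connector.
This document walks through a verification of that feature. The test environment is CentOS 7.9.2009.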
# Kafka Installation
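The goal here is only to verify that Kafka can connect to YashanDB, so Kafka is started as a single node.
Download the Kafka installation package, using kafka_2.13-3.2.0.tgz as an example.

    wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
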
Extract the package and rename the extracted directory to kafka.

    tar -zxvf kafka_2.13-3.2.0.tgz
    mv kafka_2.13-3.2.0 kafka

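Start ZooKeeper. The command paths below are relative to the directory that contains kafka. The process runs in the foreground, so keep this terminal open.

    kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
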
Start Kafka in a second terminal, from the same directory.

    kafka/bin/kafka-server-start.sh kafka/config/server.properties

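Before moving on, it can help to confirm that both services are accepting connections. The following is a minimal sketch, not part of the original procedure: `wait_for_port` is a hypothetical helper name, and ports 2181 and 9092 are assumed to be the defaults from the shipped `zookeeper.properties` and `server.properties`. It relies on bash's `/dev/tcp` pseudo-device, so it needs to run under bash.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [TIMEOUT_SECONDS]
# Hypothetical helper: returns 0 once a TCP connection to HOST:PORT succeeds,
# or 1 if no connection could be made within the timeout (default 30 s).
wait_for_port() {
  host=$1; port=$2; timeout=${3:-30}; elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    # The subshell attempts the connection; the descriptor closes when it exits.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
  return 1
}

# Usage (assumed default ports):
#   wait_for_port localhost 2181 && echo "ZooKeeper is up"
#   wait_for_port localhost 9092 && echo "Kafka is up"
```

If the ports were changed in the configuration files, pass the changed values instead.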
# Kafka-Connect-Jdbc Configuration
Download the Kafka-Connect-Jdbc package. The line after `ls` is its output.

    wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.5.1/confluentinc-kafka-connect-jdbc-10.5.1.zip
    ls
    confluentinc-kafka-connect-jdbc-10.5.1.zip  kafka  kafka_2.13-3.2.0.tgz

Create a plugins directory and extract confluentinc-kafka-connect-jdbc-10.5.1.zip into it. The line after `ls plugins/` is its output.

    mkdir plugins
    unzip confluentinc-kafka-connect-jdbc-10.5.1.zip -d plugins/
    ls plugins/
    confluentinc-kafka-connect-jdbc-10.5.1

Copy source-quickstart-sqlite.properties from the confluentinc-kafka-connect-jdbc-10.5.1/etc directory into kafka/config and rename it to source-quickstart-yasdb.properties.

    cp plugins/confluentinc-kafka-connect-jdbc-10.5.1/etc/source-quickstart-sqlite.properties kafka/config/
    cd kafka/config
    mv source-quickstart-sqlite.properties source-quickstart-yasdb.properties

Edit source-quickstart-yasdb.properties and supply the JDBC connection information.
Replace host_ip and dbname in connection.url, and the username and password (connection.user and connection.password), with the actual values. The comment block is carried over from the SQLite template and still describes the SQLite example.

    vi source-quickstart-yasdb.properties

    name=test-source-yasdb-jdbc-connect
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    # The remaining configs are specific to the JDBC source connector. In this example, we connect to a
    # SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
    # detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
    # a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
    connection.url=jdbc:yasdb://host_ip:1688/dbname
    connection.user=sys
    connection.password=sys
    query=select * from user1 limit 100
    mode=bulk
    incrementing.column.name=id
    topic.prefix=test-yasdb-jdbc-

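A forgotten placeholder in connection.url only shows up later as a connection failure, so a quick check before starting the connector can save a round trip. The sketch below is an illustration under stated assumptions: `get_prop` and `check_placeholders` are hypothetical helper names, and the parser handles only the simple `key=value` form used in this file, returning the first match.

```shell
# get_prop FILE KEY
# Hypothetical helper: print the value of KEY from a Java-style .properties file.
# Lines starting with # or ! are skipped; only "key=value" lines are handled.
get_prop() {
  awk -v key="$2" '
    /^[ \t]*[#!]/ { next }
    {
      pos = index($0, "=")
      if (pos == 0) next
      k = substr($0, 1, pos - 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)
      if (k == key) { print substr($0, pos + 1); exit }
    }
  ' "$1"
}

# check_placeholders FILE
# Fails if connection.url is empty or still contains the template
# placeholders host_ip or dbname; otherwise prints the configured URL.
check_placeholders() {
  url=$(get_prop "$1" connection.url)
  case "$url" in
    ""|*host_ip*|*dbname*)
      echo "connection.url still needs editing: $url" >&2
      return 1
      ;;
  esac
  echo "connection.url=$url"
}

# Usage:
#   check_placeholders kafka/config/source-quickstart-yasdb.properties
```

The check is deliberately crude: a real database whose name contains the string `dbname` would be rejected, so adjust the patterns if that applies.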
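Point connect-standalone.properties at the plugins directory by setting plugin.path. The value below is the path used in the test environment. Replace it with the location of the plugins directory created earlier.

    vi connect-standalone.properties

    # Examples:
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/home/kkadmin/kafka/tmp/plugins
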
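The same edit can be scripted instead of made in vi. This is a minimal sketch assuming the plugins directory already exists; `set_plugin_path` is a hypothetical helper name. It resolves the directory to an absolute path so the setting does not depend on the directory the worker is started from, removes any active plugin.path line, and leaves commented examples untouched.

```shell
# set_plugin_path FILE DIR
# Hypothetical helper: set plugin.path in FILE to the absolute path of DIR.
set_plugin_path() {
  file=$1
  dir=$(cd "$2" && pwd) || return 1      # resolve DIR to an absolute path
  tmp=$(mktemp) || return 1
  # Keep every line except an active (uncommented) plugin.path setting.
  grep -v '^[ \t]*plugin\.path=' "$file" > "$tmp" || true
  printf 'plugin.path=%s\n' "$dir" >> "$tmp"
  # Write back through cat so the original file's permissions are preserved.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage, from the directory that contains both kafka and plugins:
#   set_plugin_path kafka/config/connect-standalone.properties plugins
```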
Return to the Kafka installation directory (kafka) and start the connector in standalone mode.

    bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-yasdb.properties

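Once the worker is running, the Kafka Connect REST interface can report whether the connector and its task started. The sketch below assumes the worker listens on the default REST port 8083 on localhost and that `curl` is installed. `connector_status_url` and `connector_status` are hypothetical helper names, and the connector name is the `name` value from the properties file above.

```shell
# Base URL of the Connect worker's REST interface (assumed default).
CONNECT_REST=${CONNECT_REST:-http://localhost:8083}

# connector_status_url NAME: print the status endpoint URL for connector NAME.
connector_status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_REST" "$1"
}

# connector_status NAME: fetch the status JSON; fails if the worker is
# unreachable or returns an HTTP error (for example, unknown connector).
connector_status() {
  curl -fsS "$(connector_status_url "$1")"
}

# Usage:
#   connector_status test-source-yasdb-jdbc-connect
```

A task state other than RUNNING in the response usually points back to the connection settings or to the worker's console output.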
Use Kafka's client tools to verify that the topic data can be retrieved correctly.
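One way to do this check is with the command-line tools shipped in kafka/bin. The following is a sketch under assumptions: the broker is reachable at localhost:9092, and `kafka_tool`, `list_topics`, and `peek_topic` are hypothetical wrapper names. Because the connector is configured with a custom `query`, the JDBC source connector is expected to use `topic.prefix` as the full topic name, so the topic to look for would be `test-yasdb-jdbc-`. Confirm the actual name with `list_topics` first.

```shell
# Location of the Kafka installation and the broker address (assumed defaults).
KAFKA_HOME=${KAFKA_HOME:-kafka}
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}

# kafka_tool SCRIPT ARGS...: run a script from $KAFKA_HOME/bin, failing with a
# clear message if it is missing or not executable.
kafka_tool() {
  tool="$KAFKA_HOME/bin/$1"; shift
  if [ ! -x "$tool" ]; then
    echo "not found or not executable: $tool" >&2
    return 127
  fi
  "$tool" "$@"
}

# list_topics: list all topics known to the broker.
list_topics() {
  kafka_tool kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list
}

# peek_topic TOPIC [N]: print the first N (default 5) records of TOPIC, then exit.
peek_topic() {
  kafka_tool kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" \
    --topic "$1" --from-beginning --max-messages "${2:-5}"
}

# Usage, from the directory that contains kafka:
#   list_topics
#   peek_topic test-yasdb-jdbc-
```

If records from the `user1` query appear in the consumer output, the path from YashanDB through the connector into Kafka is working.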