# Big Data Tool Integration Example (Kafka)

YashanDB supports importing data into Kafka through the Kafka-Connect-Jdbc connector.

This document describes how this feature was verified. The test environment is CentOS 7.9.2009.

# Kafka Installation

This document only verifies that Kafka can connect to YashanDB, so Kafka is started as a single node.

  1. Download the Kafka package. This example uses kafka_2.13-3.2.0.tgz.

    wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
    
  2. Extract the package and rename the extracted directory to kafka.

    tar -zxvf kafka_2.13-3.2.0.tgz 
    mv kafka_2.13-3.2.0  kafka
    
  3. From the directory that contains kafka, start ZooKeeper. The script runs in the foreground, so leave this terminal open.

    kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
    
  4. In a second terminal, from the same directory, start Kafka.

    kafka/bin/kafka-server-start.sh kafka/config/server.properties 
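Steps 3 and 4 each occupy a terminal because both scripts run in the foreground. Both scripts also accept a `-daemon` option that starts the process in the background. The sketch below uses that option and then waits until the default ports accept connections (2181 for ZooKeeper and 9092 for Kafka, assuming zookeeper.properties and server.properties are left at their defaults). The helper names `wait_for_port` and `start_kafka_stack` are hypothetical, and the port check relies on bash's `/dev/tcp` redirection, so run it with bash.

```shell
#!/usr/bin/env bash
# Sketch: start ZooKeeper and Kafka in the background, then wait for their ports.
# Helper names are hypothetical; ports 2181/9092 assume the default config files.

# Return 0 as soon as host:port accepts a TCP connection, or 1 after `tries`
# one-second attempts. Uses bash's /dev/tcp pseudo-device.
wait_for_port() {
  local host=$1 port=$2 tries=${3:-30} i=0
  while [ "$i" -lt "$tries" ]; do
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}

# Run from the directory that contains the extracted kafka directory.
start_kafka_stack() {
  kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
  if ! wait_for_port 127.0.0.1 2181 30; then
    echo "ZooKeeper is not listening on 2181" >&2
    return 1
  fi
  kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
  if ! wait_for_port 127.0.0.1 9092 60; then
    echo "Kafka is not listening on 9092" >&2
    return 1
  fi
  echo "ZooKeeper and Kafka are running"
}
```

In a bash shell in that directory, source the file and call `start_kafka_stack`. Running `kafka/bin/kafka-server-stop.sh` and then `kafka/bin/zookeeper-server-stop.sh` stops both processes again.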
    

# Kafka-Connect-Jdbc Configuration

  1. Download the Kafka-Connect-Jdbc package.

    wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.5.1/confluentinc-kafka-connect-jdbc-10.5.1.zip
    ls                                                                
    confluentinc-kafka-connect-jdbc-10.5.1.zip  kafka  kafka_2.13-3.2.0.tgz
    
  2. Create a plugins directory and extract confluentinc-kafka-connect-jdbc-10.5.1.zip into it.

    mkdir plugins
    unzip confluentinc-kafka-connect-jdbc-10.5.1.zip  -d  plugins/
    ls plugins/
    confluentinc-kafka-connect-jdbc-10.5.1
    
  3. Copy source-quickstart-sqlite.properties from the confluentinc-kafka-connect-jdbc-10.5.1/etc directory to kafka/config and rename it to source-quickstart-yasdb.properties.

    cp plugins/confluentinc-kafka-connect-jdbc-10.5.1/etc/source-quickstart-sqlite.properties kafka/config/
    cd kafka/config
    mv source-quickstart-sqlite.properties source-quickstart-yasdb.properties
    
  4. Edit source-quickstart-yasdb.properties to provide the JDBC connection information.

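    Replace host_ip and dbname in connection.url, as well as the connection.user and connection.password settings, with those of your YashanDB instance.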

    vi source-quickstart-yasdb.properties
    
    name=test-source-yasdb-jdbc-connect
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    # The remaining configs are specific to the JDBC source connector. In this example, we connect to
    # YashanDB over JDBC and run the custom query below in bulk mode, which re-reads the full result set
    # on every poll. Because a custom query is used, records are written to the topic named exactly by
    # topic.prefix, i.e. 'test-yasdb-jdbc-'.
    connection.url=jdbc:yasdb://host_ip:1688/dbname
    connection.user=sys
    connection.password=sys
    query=select * from user1 limit 100
    mode=bulk
    # Used only with mode=incrementing or mode=timestamp+incrementing; ignored with mode=bulk.
    incrementing.column.name=id
    topic.prefix=test-yasdb-jdbc-
    
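  5. Add the plugins directory to plugin.path in connect-standalone.properties, which is also in kafka/config.

    Set plugin.path to the absolute path of the plugins directory created in step 2; /home/kkadmin/kafka/tmp/plugins below is only an example.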

    vi connect-standalone.properties
    
    # Examples: 
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/home/kkadmin/kafka/tmp/plugins
    
  6. Return to the kafka directory (for example, run cd .. from kafka/config) and start the connector in standalone mode.

    bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-yasdb.properties
    
  7. Use the Kafka client tools to verify that the data from YashanDB is correctly written to the topic.
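The check in step 7 can be sketched as follows, assuming curl, the default Kafka Connect REST port 8083, and the default broker listener localhost:9092. With a custom `query`, the JDBC source connector publishes to a topic named exactly by `topic.prefix`, so in this example the topic is `test-yasdb-jdbc-`. The helpers `topic_from_config`, `connector_state`, `check_connector`, and `show_topic_data` are hypothetical. `connector_state` extracts the state with sed and assumes the compact JSON returned by the REST API; jq is more robust if it is installed.

```shell
#!/usr/bin/env bash
# Sketch for step 7. Helper names are hypothetical; run from the kafka directory.

# Print the topic.prefix value from a connector .properties file. With a custom
# `query`, the JDBC source connector uses this value as the full topic name.
topic_from_config() {
  sed -n 's/^topic\.prefix=//p' "$1" | tail -n 1
}

# Read a Connect REST status document on stdin and print the connector state
# (e.g. RUNNING or FAILED). Assumes compact JSON without extra whitespace.
connector_state() {
  sed -n 's/.*"connector":{[^}]*"state":"\([A-Z_]*\)".*/\1/p'
}

# Ask the standalone worker's REST API (default port 8083) for the state.
check_connector() {
  curl -s "http://localhost:8083/connectors/$1/status" | connector_state
}

# Print up to 10 records from the topic, starting at the earliest offset.
show_topic_data() {
  local topic
  topic=$(topic_from_config "$1")
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic "$topic" --from-beginning --max-messages 10
}
```

With the connector from step 6 running, `check_connector test-source-yasdb-jdbc-connect` should print `RUNNING`, and `show_topic_data config/source-quickstart-yasdb.properties` prints rows from user1 as Kafka records. `bin/kafka-topics.sh --bootstrap-server localhost:9092 --list` shows whether the topic was created at all.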

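Steps 4 and 5 edit the files with vi. When the setup must be repeated, the same two edits can be scripted. The sketch below is an assumption-based alternative, not part of the original procedure: `write_yasdb_source_config` and `set_plugin_path` are hypothetical helper names, the values passed to them are placeholders, and `set_plugin_path` relies on GNU `sed -i` (as shipped with CentOS). `incrementing.column.name` is omitted because it is ignored with `mode=bulk`.

```shell
#!/usr/bin/env bash
# Hypothetical helpers that script steps 4 and 5 instead of editing with vi.

# Write the JDBC source connector config.
# Arguments: file host dbname user password (all placeholders).
write_yasdb_source_config() {
  local file=$1 host=$2 db=$3 user=$4 pass=$5
  # Unquoted heredoc: ${...} is expanded, while '*' in the query is kept as-is.
  cat > "$file" <<EOF
name=test-source-yasdb-jdbc-connect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:yasdb://${host}:1688/${db}
connection.user=${user}
connection.password=${pass}
query=select * from user1 limit 100
mode=bulk
topic.prefix=test-yasdb-jdbc-
EOF
}

# Set plugin.path in a worker config, replacing any active plugin.path line.
# Commented example lines such as "# plugin.path=..." are left untouched.
set_plugin_path() {
  local file=$1 dir=$2
  sed -i '/^plugin\.path=/d' "$file"
  # Make sure the file ends with a newline before appending.
  if [ -n "$(tail -c 1 "$file")" ]; then
    echo >> "$file"
  fi
  printf 'plugin.path=%s\n' "$dir" >> "$file"
}
```

From the directory that contains kafka and plugins, a call such as `write_yasdb_source_config kafka/config/source-quickstart-yasdb.properties <host_ip> <dbname> sys sys` followed by `set_plugin_path kafka/config/connect-standalone.properties "$PWD/plugins"` produces the same settings as steps 4 and 5.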