精华内容
下载资源
问答
  • Confluent的适用于Apache Kafka TM的Python客户端 confluent-kafka-python提供了与所有兼容的高级Producer,Consumer和AdminClient 经纪人> = v0.8, 和。 客户是: 可靠-它是 (通过二进制车轮自动提供)的包装,...
  • 使用avro和Confluent的Schema Registry的Kafka De / Serializer 从1.1.1-4-> 1.1.1-5 +迁移 1.1.1-5添加了对逻辑类型的支持。 此支持是自动的,将在序列化和反序列化中使用。 如果需要反序列化为基础基本类型,则...
  • 该实现基于Confluent Kafka Go库和Confluent的示例实现的略微扩展,可以在找到。 用法 要构建Kafka生产者,只需执行以下命令: $ make 确保之前已安装。 可以使用以下命令运行示例应用程序: confluent-kafka-go...
  • Confluent Schema Registry为您的元数据提供了一个服务层。 它提供了一个RESTful接口,用于存储和检索Avro:registered:,JSON模式和Protobuf模式。 它根据指定的主题名称策略存储所有架构的版本历史记录,提供多种...
  • 这是一组精选的演示,展示了Confluent平台上的ApacheKafka:registered:事件流处理,该事件流处理平台使您能够处理,组织和管理跨云,本地和无服务器部署的大量流数据。 。 从哪儿开始 最好的演示从开始,该示例在...
  • 适用于Apache Kafka TM的Confluent的.NET客户端 confluent-kafka-dotnet是用于和Confluent的.NET客户端。 特征: 高性能 -kafka-dotnet是围绕 (一种经过微调的C客户端)的轻量级包装。 可靠性-编写Apache Kafka...
  • 一个简单的本地主机,使用confluent-kafka运行Apache Kafka实现。 此实现基于,但我需要更轻便的版本进行开发/研究,并且可以从主机进行访问(我不需要额外的容器)而不会弄乱我的机器。 有助于了解Zookeeper(配置...
  • Ansible提供了一种简单的方法来部署,管理和配置Confluent Platform服务。 该存储库提供了剧本和模板,可轻松启动Confluent Platform安装。 具体来说,此存储库: 安装Confluent Platform软件包。 使用systemd脚本...
  • 该存储库不再由Confluent维护。 它已被专有的取代,后者提供了显着更多的功能,包括针对Confluent平台的ACL,RBAC和审核日志管理。 融合平台CLI 通过命令行启动和管理Confluent平台的CLI。 安装 下载并安装 通过...
  • confluent-kafka-dotnet, 融合 Kafka. NET 客户端的Apache Apache KafkaTM融合客户端的. NET confluent-kafka-dotnet 是 Apache Kafka的汇合客户机的.NET,以及汇合平台。功能:高性能 - confluent-kafka-do
  • Confluent平台中的所有组件都启用了端到端安全性。 使用运行示例。 目录 总览 用例是一个Kafka事件流应用程序,用于实时编辑真实的Wikipedia页面。 Wikimedia的EventStreams会发布连续的实时编辑流,这些实时编辑...
  • kafka_confluent-源码

    2021-02-13 18:56:00
    命令:通过运行启动Confluent平台 $ docker-compose up -d 创建一个主题: $ docker-compose exec broker kafka-topics --create --topic example-topic --bootstrap-server broker:9092 --replication-factor 1 ...
  • 所以我做了 ConfluentConfluent 仅适用于 URL 中带有 /confluence/display/ 或 /confluence/pages/ 的 HTTPS 页面。 它重新设计了一些元素,使文章更易于阅读。 我在哪里可以得到这个? Dope Dope 我很乐意...
  • Confluent.Kafka源代码 测试单元 Confluent.kafka是一款C#客户端的kafka API
  • Confluent用于Apache KafkaTM的Golang客户端confluent-kafka-go是Confluent用于Apache Kafka和Confluent平台的Golang客户端。 特点:高性能-confluent-kafka-go是librd的轻量级包装,用于Apache KafkaTM的Confluent...
  • Confluent用于Apache KafkaTM的Python客户端confluent-kafka-python提供了与所有大于等于v0.8的Apache KafkaTM代理,Confluent Cloud和Confluent平台兼容的高级Producer,Consumer和AdminClient。 用于Apache ...
  • sql server2008 cdc 数据实时同步到kafka,Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时...
  • Confluent-kafka的Docker映像 图像基于python:3.5和python:2.7.12 该映像编译librdkafka ,这是confluent-kafka librdkafka的先决条件
  • ccloud-tools:从Confluent平台运行工具以及您的Confluent Cloud:trade_mark:集群
  • 其中第一种SASL/GSSAPI的认证就是kerberos认证,对于java来说有原生的支持,但是对于python来说配置稍微麻烦一些,下面说一下具体的配置过程,confluent kafka模块底层依赖于librdkafka,这是使用c编写的高性能的...
  • confluent-2.0.0-2.11.7(2)(3-2)
  • 该存储库包含一组docker映像,以演示Kafka和Confluent平台的安全配置。 该存储库的目的不是提供生产就绪图像。 它被设计用作示例,以帮助人们配置Apache Kafka的安全模块。 所有图像都是从头开始创建的,没有重复...
  • 卡夫卡融合角色 这个Ansible角色旨在使用安装和配置Apache Kafka和Apache Zookeeper。 所有配置都可以通过var传递,您可以在查看必要var的列表,并根据需要自定义它们。 入门 先决条件 Ansible 2.2 +,Python和Pip...
  • 一个在Confluent和Spring Boot上组合Kafka Streams的游乐场。 跑步 使用以下命令启动本地的,简化的,融合的堆栈 docker-compose up 然后运行该应用程序。 与实体发布端点和查询端点进行交互,例如 POST ...
  • ##1,获取生效的配置(从confluent local services start 启动后获取) $> find /tmp/confluent.33312/ -name '*.properties' -exec cp {} . \; # 2,创建数据目录 $> grep /var/lib/confluent*.* * -o -h |...

    1,获取配置文件

    • 需求:confluent local 启动服务,数据存放在/tmp 目录,不安全;需要自定义数据目录
    • 避免报错:Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1747974 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration (配置kafka.properties, connect.properties)
    ##1,获取生效的配置(从confluent local services start 启动后获取)
    $> find /tmp/confluent.*/ -name '*.properties'  -exec cp {} . \;
       #只保留有效注释
       ls  |awk '{print "grep -Ev \"^$|^#\" "$1 " > "$1".2"}'   |bash
       rename .2 '' *  
       #修改数据存放目录为:/var/lib/confluent
       sed -i 's@/tmp/confluent.[a-zA-Z0-9]\+/\(.*\)@/var/lib/confluent/\1@'  *
       #修改ip为真实服务器ip
       sed -i 's@localhost:@192.168.56.162:@' ksql-server.properties connect.properties  control-center.properties
    
    # 2,创建数据目录
    $> grep /var/lib/confluent.* *  -o -h |xargs -n 1 mkdir  -p
    
    ##3,保留生效的配置(删除注释行和空行)
    $> cat zookeeper.properties
    dataDir=/var/lib/confluent2/zookeeper/data
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=24
    
    $> cat kafka.properties
    broker.id=0
    num.network.threads=3
    num.io.threads=8
    #100KB
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    #50MB, 40MB
    #socket.request.max.bytes=104857600
    socket.request.max.bytes=524288000
    message.max.bytes=419430400
    log.dirs=/var/lib/confluent2/kafka/data
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=18000
    group.initial.rebalance.delay.ms=0
    confluent.license.topic.replication.factor=1
    confluent.metadata.topic.replication.factor=1
    confluent.security.event.logger.exporter.kafka.topic.replicas=1
    confluent.balancer.enable=true
    confluent.balancer.topic.replication.factor=1
    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=localhost:9092
    confluent.metrics.reporter.topic.replicas=1
    
    $> cat schema-registry.properties
    listeners=http://0.0.0.0:8081
    kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
    kafkastore.topic=_schemas
    debug=false
    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    kafkastore.connection.url=localhost:2181
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
    #####################
    $> cat connect.properties
    bootstrap.servers=192.168.56.7:9092
    group.id=connect-cluster
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://192.168.56.7:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://192.168.56.7:8081
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-statuses
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    #producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    #consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    plugin.path=/opt/confluent/share/java,/opt/confluent/share/confluent-hub-components
    #rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
    producer.max.request.size=524288000
    consumer.max.request.size=524288000
    
    $> cat control-center.properties
    bootstrap.servers=192.168.56.7:9092
    zookeeper.connect=192.168.56.7:2181
    confluent.controlcenter.id=1
    confluent.controlcenter.data.dir=/var/lib/confluent2/control-center/data
    confluent.controlcenter.connect.connect-default.cluster=http://192.168.56.7:8083
    confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.7:8088
    confluent.controlcenter.schema.registry.url=http://192.168.56.7:8081
    confluent.controlcenter.streams.cprest.url=http://192.168.56.7:8090
    confluent.controlcenter.internal.topics.replication=1
    confluent.controlcenter.internal.topics.partitions=2
    confluent.controlcenter.command.topic.replication=1
    confluent.controlcenter.ui.autoupdate.enable=false
    confluent.controlcenter.usage.data.collection.enable=true
    confluent.monitoring.interceptor.topic.partitions=2
    confluent.monitoring.interceptor.topic.replication=1
    confluent.metrics.topic.replication=1
    
    $> cat kafka-rest.properties
    bootstrap.servers=PLAINTEXT://localhost:9092
    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    schema.registry.url=http://192.168.56.7:8081
    zookeeper.connect=localhost:2181
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
    $> cat ksql-server.properties
    listeners=http://0.0.0.0:8088
    ksql.logging.processing.topic.auto.create=true
    ksql.logging.processing.stream.auto.create=true
    bootstrap.servers=192.168.56.7:9092
    compression.type=snappy
    ksql.schema.registry.url=http://192.168.56.7:8081
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    kafkastore.connection.url=192.168.56.7:2181
    state.dir=/var/lib/confluent2/ksql-server/data/kafka-streams
    

    2,CentOS 7 使用systemd 管理服务

    服务启动顺序说明:https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html

    service文件目录自定义配置文件目录
    /opt/confluent/lib/systemd/systemservice目录/profiles
    $> systemctl cat confluent*
    # 1, /usr/lib/systemd/system/confluent-control-center.service
    [Unit]
    Description=Confluent Control Center
    After=network.target confluent-kafka.target
    [Service]
    Type=simple
    #User=cp-control-center
    #Group=confluent
    Environment="LOG_DIR=/var/log/confluent/control-center" "CONTROL_CENTER_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/confluent-control-center/log4j-rolling.properties"
    #ExecStart=/usr/bin/control-center-start /etc/confluent-control-center/control-center-production.properties
    ExecStart=/opt/confluent/bin/control-center-start /opt/confluent/lib/systemd/system/profiles/control-center.properties
    TimeoutStopSec=180
    Restart=always
    LimitNOFILE=100000
    [Install]
    WantedBy=multi-user.target
    # 2,...
    

    管理服务:start,status

    • systemctl start|status|stop confluent*

    在这里插入图片描述

    3, CentOS 6 使用init.d管理服务

    在这里插入图片描述

    下载confluent 5.x版本:https://packages.confluent.io/archive/5.5/confluent-5.5.5-2.12.tar.gz?_ga=2.175838050.1197985.1626658432-1483307143.1622790441

    [root@test-c62 configs]# cat /etc/init.d/confluent-ksqldb
    #!/bin/bash
    #description: ksqldb service script
    #chkconfig: 235 93 87
    #prog: some cmd, but not stdout !
    f(){
         /opt/confluent/bin/ksql-server-start /root/configs/ksql-server.properties &> $log
    }
    
    #调用系统脚本方法
    . /etc/rc.d/init.d/functions > /dev/null
    prog=KsqlServerMain
    pid=$(jps |grep $prog |awk '{print $1}' )
    pid=${pid:-0}
    log=/opt/confluent/logs/confluent-ksql-server.log
    
    #服务管理:start,stop,status
    case $1 in
       stop)
            [ $pid -gt 0 ] && kill $pid
       ;;
       start)
           [ $pid -gt 0 ] || f &
       ;;
       status)
           echo -n "service ${0##*/}: "
           [ $pid -gt 0 ] && success || failure
           echo
    esac
    
    
    
    [root@test-c62 configs]# jps |grep -v Jps
    26913 QuorumPeerMain
    1765 ControlCenter
    32070 KsqlServerMain
    8519 ConnectDistributed
    31146 SchemaRegistryMain
    410   KafkaRestMain
    10603 Ksql
    29628 SupportedKafka
    
    #启动|停止 优先级
    [root@test-c62 init.d]# grep chkconfig: /etc/init.d/confluent-*
    /etc/init.d/confluent-control-center:#chkconfig: 235 96 84
    /etc/init.d/confluent-kafka-connect:#chkconfig: 235 95 85
    /etc/init.d/confluent-kafka-rest:#chkconfig: 235 94 86
    /etc/init.d/confluent-ksqldb:#chkconfig: 235 93 87
    /etc/init.d/confluent-schema-registry:#chkconfig: 235 92 88
    /etc/init.d/confluent-server:#chkconfig: 235 91 89
    /etc/init.d/confluent-zk:#chkconfig: 235 90 90
    
    [root@test-c62 configs]# ls /etc/init.d/confluent-* |awk '{print $1" status"}' |bash
    service confluent-control-center: [  OK  ]
    service confluent-kafka-connect: [  OK  ]
    service confluent-kafka-rest: [  OK  ]
    service confluent-ksqldb: [  OK  ]
    service confluent-schema-registry: [  OK  ]
    service confluent-server: [  OK  ]
    service confluent-zk: [  OK  ]
    
    展开全文
  • Confluent6.0平台搭建

    2020-12-09 18:55:42
    Confluent6.0平台搭建 大家好,我是一拳就能打爆A柱的男人 我搭Confluent的时候也遇上很多问题,所以我也不希望各位把坑都踩一遍,所以给大家带来这篇搭建流程。大家一定要多看官方的文档,里面文件虽然很乱,但是...

    Confluent6.0平台搭建

    大家好,我是一拳就能打爆A柱的男人

    我搭Confluent的时候也遇上很多问题,所以我也不希望各位把坑都踩一遍,所以给大家带来这篇搭建流程。大家一定要多看官方的文档,里面文件虽然很乱,但是确实有整体的搭建流程。我建议各位一边看这篇博客,一边搭配官方文档来做!

    1. 环境介绍

    版本其他
    LinuxCentOS 6.10内存:2GB
    JDK1.8.0_141

    2. Confluent6.0下载

    进入Confluent官网 点击右上角GET STARTED FREE进入下载页面,下拉至Download Confluent Platform(如图1),输入Email、选择Manual、选择File Type为tar下载。

    在这里插入图片描述

    将压缩包confluent-上传至服务器。

    2. 解压、修改配置文件

    在Linux中查看路径下文件:

    [root@spark-03 apps]# ls
    confluent-6.0.0.tar.gz
    

    解压改文件到当前目录:

    [root@spark-03 apps]# tar -zxvf confluent-6.0.0.tar.gz
    

    进入confluent-6.0.0:

    [root@spark-03 apps]# cd confluent-6.0.0/
    

    查看confluent目录结构:

    [root@spark-03 confluent-6.0.0]# ls
    bin  etc  lib  README  share  src
    

    配置kafka、zookeeper的路径

    在目录下新建data文件夹:

    [root@spark-03 confluent-6.0.0]# mkdir data
    [root@spark-03 confluent-6.0.0]# cd data
    [root@spark-03 confluent-6.0.0]# mkdir zkdata kafkadata
    [root@spark-03 data]# ls
    kafkadata  zkdata
    

    复制zkdata文件夹路径,修改…/confluent-6.0.0/etc/kafka/zookeeper.properties文件:

    [root@spark-03 confluent-6.0.0]# cd etc/kafka
    [root@spark-03 kafka]# vi zookeeper.properties
    

    进入zookeeper.properties:

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # the directory where the snapshot is stored.
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
    

    修改dataDir的路径:

    dataDir=/.../confluent-6.0.0/data/zkdata
    

    修改server.properties文件:

    [root@spark-03 kafka]# vi server.properties
    

    查找log.dir(或者手动找):

    :g log.dir
    

    修改对应路径:

    log.dirs=/.../confluent-6.0.0/data/kafkadata
    

    3. 启动Confluent Platform

    进入Confluent的bin目录,启动Confluent(这里虽然会报启动失败,但是查看进程却发现进程成功启动了,所以提示进程启动失败可以查看是否真正启动成功,若成功则重复以上命令将所有进程依次启动完毕):

    • 第一次启动失败:
    [root@spark-03 bin]# ./confluent local services start
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    
    Using CONFLUENT_CURRENT: /tmp/confluent.ktr9tuJJ
    Starting ZooKeeper
    Error: ZooKeeper failed to start
    
    • 查看进程:
    [root@spark-03 bin]# jps
    1445 QuorumPeerMain
    1494 Jps
    
    • 第二次启动zookeeper成功,kafka失败:
    [root@spark-03 bin]# ./confluent local services start
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    
    Using CONFLUENT_CURRENT: /tmp/confluent.727822
    ZooKeeper is [UP]
    Starting Kafka
    Error: Kafka failed to start
    
    • 查看进程:
    [root@spark-03 bin]# jps
    1536 Kafka
    1699 Jps
    1445 QuorumPeerMain
    

    如此反复直至所有进程启动(整个过程可能需要十几分钟):

    [root@spark-03 bin]# ./confluent local services start
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    
    Using CONFLUENT_CURRENT: /tmp/confluent.727822
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]
    

    这些进程就是整个Confluent Platform完全打开的状态,可是我电脑比较差也开不起那么多进程,所以以下几个进程是可以关闭的,如果是做单机kafka的话:

    • 查看内存(建议虚拟机开到3G以上)
    [root@spark-03 bin]# free -h
                 total       used       free     shared    buffers     cached
    Mem:          1.9G       1.9G        59M         0B       1.8M        28M
    -/+ buffers/cache:       1.9G        89M
    Swap:         1.0G       544M       479M
    
    • 查看进程号,建议砍掉ConnectDistributed、KsqlServerMain、ControlCenter:
    [root@spark-03 bin]# jps
    1536 Kafka
    1873 ConnectDistributed
    1810 KafkaRestMain
    1986 KsqlServerMain
    1445 QuorumPeerMain
    2585 Jps
    1741 SchemaRegistryMain
    2063 ControlCenter
    [root@spark-03 bin]# kill 1873 1986 2063
    

    4. 配置Confluent-connectors

    Confluent提供了上百种数据源的连接器(connectors),而要连接达梦8也是类似JDBC的连接方法,所以进入JDBC Connector (Source and Sink) for Confluent Platform 按照官方流程下载JDBC的connector。其实Confluent的组件所以也不一定要按照官方的办法通过confluent-hub来操作,也可以从confluent里下载connector然后上传到服务器。

    4.1和4.2二选一

    4.1 通过confluent-hub下载jdbc-connector

    Confluent6.0平台已经带有confluent-hub,在bin目录中输入命令:

    [root@spark-03 bin]# ./confluent-hub install confluentinc/kafka-connect-jdbc:latest
    # 注:亦可指定版本kafka-connect-jdbc:10.0.0
    
    # 选择安装目录,1是根据CONFLUENT_HOME 2是根据平台安装的路径 这里二者都是一样的
    The component can be installed in any of the following Confluent Platform installations: 
      1. /.../confluent-6.0.0 (based on $CONFLUENT_HOME) 
      2. /.../confluent-6.0.0 (where this tool is installed) 
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /.../confluent-6.0.0/share/confluent-hub-components? (yN) y
    
     
    Component's license: 
    Confluent Community License 
    https://www.confluent.io/confluent-community-license 
    I agree to the software license agreement (yN) y
    
    Downloading component Kafka Connect JDBC 10.0.1, provided by Confluent, Inc. from Confluent Hub and installing into /.../confluent-6.0.0/share/confluent-hub-components 
    Detected Worker's configs: 
      1. Standard: /.../confluent-6.0.0/etc/kafka/connect-distributed.properties 
      2. Standard: /.../confluent-6.0.0/etc/kafka/connect-standalone.properties 
      3. Standard: /.../confluent-6.0.0/etc/schema-registry/connect-avro-distributed.properties 
      4. Standard: /.../confluent-6.0.0/etc/schema-registry/connect-avro-standalone.properties 
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.727822/connect/connect.properties 
    # 选择是否更改1~5个文件的path
    Do you want to update all detected configs? (yN) y
    
    Adding installation directory to plugin path in the following files: 
      /.../confluent-6.0.0/etc/kafka/connect-distributed.properties 
      /.../confluent-6.0.0/etc/kafka/connect-standalone.properties 
      /.../confluent-6.0.0/etc/schema-registry/connect-avro-distributed.properties 
      /.../confluent-6.0.0/etc/schema-registry/connect-avro-standalone.properties 
      /tmp/confluent.727822/connect/connect.properties 
     
    Completed 
    

    以上是使用confluent-hub安装的jdbc-connector的过程。

    4.2 通过官网下载jdbc-connector

    也可以通过官网安装jdbc-connector,进入confluent-hub,在搜索框中搜索jdbc:

    在这里插入图片描述

    在这里插入图片描述

    下载完成后上传至服务器即可。

    4.3 配置jdbc-connector相关文件和路径

    进入/…/confluent-6.0.0/share/confluent-hub-components查看jdbc-connector:

    [root@spark-03 confluent-hub-components]# ls
    confluentinc-kafka-connect-jdbc
    

    进入文件夹,查看目录:

    [root@spark-03 confluentinc-kafka-connect-jdbc]# ls
    assets  doc  etc  lib  manifest.json
    
    • lib目录依赖包路径,也是之后存放DM8connector的路径
    • etc目录是相关配置文件文件夹

    接下来首先要确定依赖路径正确,进入/…/confluent-6.0.0/etc/schema-registry:

    4.3.1 配置依赖路径

    查看connect-avro-standalone.properties,确保其plugin.path包括了/…/confluent-6.0.0/share/confluent-hub-components,如下:

    plugin.path=share/java,/.../confluent-6.0.0/share/confluent-hub-components
    

    也可以更详细的指定到lib中。

    4.3.2 配置数据源相关配置

    进入/…/confluent-6.0.0/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/etc,拷贝source文件:

    [root@spark-03 etc]# cp source-quickstart-sqlite.properties source-dm8.properties
    

    修改source-dm8.properties参数:

    # name必须唯一
    name=test-source-dm-jdbc
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    
    connection.url=jdbc:dm://IP:5236/jc?user=用户名&password=密码&characterEncoding=utf-8
    # 需要查询的表
    table.whitelist=kmeans2
    # 增量查询
    mode=incrementing
    # 增量依据
    incrementing.column.name=id
    # 自动生成的topic的前缀
    topic.prefix=test-dm-jc-
    

    5. 上传达梦driver,启动connect-standalone

    将达梦的驱动放入/…/confluent-6.0.0/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib:

    在这里插入图片描述

    进入confluent的bin目录,启动connect-standalone:

    [root@spark-03 bin]# ./connect-standalone /.../confluent-6.0.0/etc/schema-registry/connect-avro-standalone.properties /.../confluent-6.0.0/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/etc/source-dm8.properties
    

    如果配置正确,则会出现下面这个提示:

    [2020-12-10 01:42:20,071] INFO Using JDBC dialect Generic (io.confluent.connect.jdbc.source.JdbcSourceTask:102)
    [2020-12-10 01:42:20,380] INFO Attempting to open connection #1 to Generic (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
    [2020-12-10 01:42:20,858] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:261)
    [2020-12-10 01:42:20,858] INFO WorkerSourceTask{id=test-source-dm-jdbc-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
    [2020-12-10 01:42:20,939] INFO Begin using SQL query: SELECT * FROM "JC"."kmeans2" WHERE "JC"."kmeans2"."id" > ? ORDER BY "JC"."kmeans2"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
    

    出现了自动生成的SQL语句,表示配置成功。

    展开全文
  • cp-helm-charts:Confluent Platform Helm图表使您能够在Kubernetes上部署Confluent Platform服务以进行开发,测试和概念验证环境
  • Confluent 由kafka的核心成员成立,目前最新版Confluent为v5.5.1版本,对应的kafka版本为Apache Kafka_v2.5.0,在Confluent中不仅包含了kafka,还有下面几个组件,增强了kafka的功能也大大提升了kafka的易用性。...

    目录

    简介

    单机部署

    集群部署

    尾巴


    Kafka系列:

    kafka 2.4.1单机版部署及使用

    kafka监控系统kafka eagle安装使用

    滴滴开源的kafka-manager编译及部署使用

    kafka管理监控系统 CMAK(yahoo的kafka-manager)部署及使用

    Kafka系列(一)、2.6.0版本kafka集群搭建

    Kafka系列(二)、架构原理及存储机制

    Kafka系列(三)、生产者分区策略、ISR、ACK机制、一致性语义

    Kafka系列(四)、消费者策略、Rebalance机制、Offset存储机制 

    Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制

    Kafka系列(六)、Kafka开发套件kafka lenses 安装及使用(带WebUI)


    简介

    Confluent 由kafka的核心成员成立,目前最新版Confluent为v5.5.1版本,对应的kafka版本为Apache Kafka_v2.5.0,在Confluent中不仅包含了kafka,还有下面几个组件,增强了kafka的功能也大大提升了kafka的易用性。

    • KSQL:通过SQL查询topic内的数据;
    • Confluent Connetors:支持配置HDFS,Cassandra,MySQL,Oracle,Postgre,MongoDB,Kudu等数据源的source和sink;
    • Confluent clients:支持Java,Python,C/C++,Go,.Net等客户端;
    • Schema Registry:统一消息格式,提前将topic内的数据注册到schema目录表内,消费端无需判断消息内的数据类型,只需要从目录表内找到该主题对应的schema即可;
    • REST Proxy:允许以RESTful api方式直接访问kafka;
    • Confluent Control Center:提供WebUI可视化监控、运维以及开发,简化配置方式;(商业版的功能,试用期30天)

    官方网址:https://www.confluent.io/

    下载地址:https://www.confluent.io/download/

    单机部署

    本次我们安装的是最新版的v5.5.1版本,单机版的安装非常简单,安装步骤如下:

    0. JDK1.8(略)
    1. 解压tar包
    tar -zxvf confluent-5.5.1-2.11.tar.gz -C /opt/app/
    
    2. 配置环境变量
    vim /etc/profile
    ----------------
    export CONFLUENT_HOME=/opt/app/confluent-5.5.1
    export PATH=$PATH:$CONFLUENT_HOME/bin
    ----------------
    
    3. 刷新环境变量
    source /etc/profile
    
    4. 启动单机confluent
    $CONFLUENT_HOME/bin/confluent local start
    
    
    # ------其他命令-------
    # 关闭单机confluent
    $CONFLUENT_HOME/bin/confluent local stop
    
    # 清空当前confluent下的数据
    $CONFLUENT_HOME/bin/confluent local destroy

    启动成功如图:

    在浏览器中输入 ip:9021 即可打开Confluent Control Center界面:

     

    Control Center 还有很多其他功能,非常方便,如果公司不差钱的话可以考虑一下。

    集群部署

    Confluent套件中除了control center以及部分connector以外的组件都是免费的,比apache kafka 更方便,因此接下来我们来部署集群版的Confluent,对于有部署zookeeper 及kafka 经验的人来说 稍微修改一下配置文件也可以将Confluent部署在已有的zk和kafka之上,这里我就使用之前部署好的zk和kafka了,如果要使用confluent中的zk和kafka也是一样的配置,只是配置文件路径在$CONFLUENT_HOME/etc/kafka 内。

    可参考的配置: 

    Kafka系列(一)、2.6.0版本kafka集群搭建

    Zookeeper系列(三)、zk集群安装部署

    0. 集群规划

    机器角色端口
    wyk01 & wyk02 & wyk03kafka

    9092

    wyk01 & wyk02 & wyk03zookeeper2181
    wyk01 & wyk02 & wyk03

    confluent control center

    9021
    wyk01 & wyk02 & wyk03schema registry8081
    wyk01 & wyk02 & wyk03kafka rest proxy8082
    wyk01 & wyk02 & wyk03confluent connector8083
    wyk01 & wyk02 & wyk03ksql8088

    1. wyk01:解压tar包 并配置环境变量

    0. JDK1.8(略)
    1. 解压tar包
    tar -zxvf confluent-5.5.1-2.11.tar.gz -C /opt/app/
    
    2. 配置环境变量
    vim /etc/profile
    ----------------
    export CONFLUENT_HOME=/opt/app/confluent-5.5.1
    export PATH=$PATH:$CONFLUENT_HOME/bin
    ----------------
    
    3. 刷新环境变量
    source /etc/profile

    2. wyk01:配置zookeeper和kafka

    cd $CONFLUENT_HOME/etc/kafka
    # 备份confluent的配置文件
    mv zookeeper.properties zookeeper.properties.bak
    mv server.properties server.properties.bak
    
    # 将已有的kafka和zookeeper集群的配置文件复制过来
    cp /opt/app/kafka/config/server.properties .
    cp /opt/app/zookeeper/conf/zoo.cfg zookeeper.properties
    
    # 修改kafka配置,新增下列confluent参数
    vim $CONFLUENT_HOME/etc/kafka/server.properties
    --------------------------------------------
    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=wyk01:9092,wyk02:9092,wyk03:9092
    confluent.metrics.reporter.topic.replicas=2
    confluent.support.metrics.enable=true
    confluent.support.customer.id=anonymous
    delete.topic.enable=true
    
    # 如果使用confluent自带的zk和kafka的话,只需要参考kafka和zookeeper集群的方式修改刚才备份的那两个配置文件即可

    3. wyk01:配置schema-registry

    vim $CONFLUENT_HOME/etc/kafka-rest/kafka-rest.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    kafkastore.bootstrap.servers=PLAINTEXT://wyk01:9092,wyk02:9092,wyk03:9092

    4. wyk01:配置ksql

    vim $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    bootstrap.servers=wyk01:9092,wyk02:9092,wyk03:9092
    ksql.schema.registry.url=http://wyk01:8081,http://wyk02:8081,http://wyk03:8081
    

    5. wyk01:配置kafka rest

    vim $CONFLUENT_HOME/etc/kafka-rest/kafka-rest.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    schema.registry.url=http://wyk01:8081,http://wyk02:8081,http://wyk03:8081
    zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181
    bootstrap.servers=PLAINTEXT://wyk01:9092,wyk02:9092,wyk03:9092
    listeners=http://0.0.0.0:8082
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

    6. wyk01:配置confluent connectors

    vim $CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    bootstrap.servers=wyk01:9092,wyk02:9092,wyk03:9092
    key.converter.schema.registry.url==http://wyk01:8081,http://wyk02:8081,http://wyk03:8081
    value.converter.schema.registry.url=http://wyk01:8081,http://wyk02:8081,http://wyk03:8081
    config.storage.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    rest.port=8083
    rest.advertised.port=8083
    plugin.path=$CONFLUENT_HOME/share/java

    7. wyk01:配置control center

    vim $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    bootstrap.servers=wyk01:9092,wyk02:9092,wyk03:9092
    zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181
    confluent.controlcenter.rest.listeners=http://0.0.0.0:9021
    confluent.controlcenter.id=1    # 每台confluent机器的ID不能有重复
    confluent.controlcenter.ksql.ksqlDB.url=http://wyk01:8088  # 每台confluent机器配置各自的IP
    confluent.controlcenter.connect.cluster=http://wyk01:8083,http://wyk02:8083,http://wyk03:8083
    confluent.controlcenter.schema.registry.url=http://wyk01:8081,http://wyk02:8081,http://wyk03:8081
    confluent.controlcenter.internal.topics.replication=2
    confluent.controlcenter.command.topic.replication=2
    confluent.monitoring.interceptor.topic.replication=2
    confluent.metrics.topic.replication=3

    8. wyk01:将confluent 分发到wyk02,wyk03节点,并在wyk02,wyk03也配置环境变量

    1. wyk01:分发confluent 文件夹
    scp -r /opt/app/confluent-5.5.1 root@wyk02:/opt/app/
    scp -r /opt/app/confluent-5.5.1 root@wyk03:/opt/app/
    
    2. wyk02 & wyk03:配置环境变量
    vim /etc/profile
    ----------------
    export CONFLUENT_HOME=/opt/app/confluent-5.5.1
    export PATH=$PATH:$CONFLUENT_HOME/bin
    ----------------
    
    3. wyk02 & wyk03:刷新环境变量
    source /etc/profile

     9. wyk02 & wyk03:修改配置文件中的ID

    # wyk02: 修改kafka和 control center中的id
    vim $CONFLUENT_HOME/etc/kafka/server.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    broker.id=2
    
    vim $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    confluent.controlcenter.id=2    # 每台confluent机器的ID不能有重复
    confluent.controlcenter.ksql.ksqlDB.url=http://wyk02:8088  # 每台confluent机器配置各自的IP
    
    -------------------------------------------------------
    
    # wyk03: 修改kafka和 control center中的id
    vim $CONFLUENT_HOME/etc/kafka/server.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    broker.id=3
    
    vim $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
    ---------------------------------
    #修改下面的参数,其他参数保持不变
    confluent.controlcenter.id=3    # 每台confluent机器的ID不能有重复
    confluent.controlcenter.ksql.ksqlDB.url=http://wyk03:8088  # 每台confluent机器配置各自的IP

    10. 在三台机器上分别启动下列服务:(xcall是我自己写的shell脚本,表示在三台机器同时执行命令)

    # 1. 启动 zookeeper
    xcall.sh '/opt/app/zookeeper/bin/zkServer.sh start'
    
    # 2. 启动 kafka
    xcall.sh '/opt/app/confluent-5.5.1/bin/kafka-server-start -daemon /opt/app/confluent-5.5.1/etc/kafka/server.properties' 
    
    # 3. 启动 schema registry
    xcall.sh '/opt/app/confluent-5.5.1/bin/schema-registry-start -daemon /opt/app/confluent-5.5.1/etc/schema-registry/schema-registry.properties'
    
    # 4. 启动 kafka rest proxy
    xcall.sh '/opt/app/confluent-5.5.1/bin/kafka-rest-start -daemon /opt/app/confluent-5.5.1/etc/kafka-rest/kafka-rest.properties'
    
    # 5. 启动 confluent connect
    xcall.sh '/opt/app/confluent-5.5.1/bin/connect-distributed -daemon /opt/app/confluent-5.5.1/etc/schema-registry/connect-avro-distributed.properties'
    
    # 6. 启动 ksql
    xcall.sh '/opt/app/confluent-5.5.1/bin/ksql-server-start -daemon /opt/app/confluent-5.5.1/etc/ksqldb/ksql-server.properties'
    
    # 7. 启动 control center
    xcall.sh '/opt/app/confluent-5.5.1/bin/control-center-start -daemon /opt/app/confluent-5.5.1/etc/confluent-control-center/control-center-dev.properties'

    11. 启动成功的话,jps进程应该如下图

    全部启动成功后,在浏览器中输入任意control center节点的ip:9021 即可进入管理界面:

    尾巴

    Confluent 推出的kafka connect, schema registry, ksql, rest proxy 让kafka变得更加好用和强大(目前apache kafka 也有kafka connect,但confluent提供了更多了connector),control center提供了web界面对集群进行管理和使用(试用期30天),极大的降低了开发运维成本。

     

    希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!

     

    展开全文
  • C#版Confluent.Kafka源代码,实现了单元测试功能,方便加载到项目中.
  • Confluent介绍及其使用

    千次阅读 2020-06-29 20:41:59
    1 confluent介绍 Confluent是用来管理和组织不同数据源的流媒体平台,可以实时地把不同源和位置的数据集成到一个中心的事件流平台。并且很可靠、性能很高。 Confluent目前提供了社区版(免费)和商业版()收费两个...

    1 confluent介绍

    Confluent是用来管理和组织不同数据源的流媒体平台,可以实时地把不同源和位置的数据集成到一个中心的事件流平台。并且很可靠、性能很高。

    Confluent目前提供了社区版(免费)和商业版(收费)两个版本,社区版提供了Connectors、REST Proxy、KSQL、Schema-Registry等基础服务。商业版为企业提供了控制面板、负载均衡,跨中心数据备份、安全防护等高级特性。

    1.2 服务功能介绍

    1.2.1 Zookeeper

    Zookeeper是一个开放源码的分布式应用程序协调服务,主要功能包扩:维护配置信息、命名、提供分布式同步、组管理等集中式服务 。Kafka使用ZooKeeper对集群元数据进行持久化存储,如果ZooKeeper丢失了Kafka数据,集群的副本映射关系以及topic等配置信息都会丢失,最终导致Kafka集群不再正常工作,造成数据丢失的后果。

    1.2.2 Kafka

    Kafka是一个分布式流处理平台,基于zookeeper协调并支持分区和多副本的分布式消息系统,是一种高吞吐量的分布式发布订阅消息系统,消息队列中间件,主要功能是负责消息传输,Confluent就是依赖Kafka来进行消息传输。Kafka最大的特性就是可以实时的处理大量数据以满足各种需求场景。

    1.2.3 Control Center

    control center可以很容易地管理kafka的连接,创建,编辑,和管理与其他系统的连接。我们可以从producer到consumer监控data streams,保证我们的每一条消息都被传递,还能测量出消息的传输耗时多久。使用confluent control center能让开发人员不写一句代码,也能构建基于kafka的数据生产管道。

    1.2.4 Kafka-rest

    Kafka-rest是Kafka RESTful接口服务组件,可以通过Restful接口而不是本机Kafka协议或客户端的情况下,生成和使用消息,而且还可以查看集群状态以及执行管理操作。

    1.2.5 Schema-Registry

    Schema-Registry是为元数据管理提供的服务,同样提供了RESTful接口用来存储和获取schemas,它能够保存数据格式变化的所有版本,并可以做到向下兼容。Schema-Registry还为Kafka提供了Avro格式的序列化插件来传输消息。Confluent主要用Schema-Registry来对数据schema进行管理和序列化操作。

    1.2.6 Connect

    Kafka Connect是 Kafka的一个开源组件,是用来将Kafka与数据库、key-value存储系统、搜索系统、文件系统等外部系统连接起来的基础框架。通过使用Kafka Connect框架以及现有的连接器可以实现从源数据读入消息到Kafka,再从Kafka读出消息到目的地的功能。

    1.2.7 ksql-server

    KSQL是使用SQL语句对Apache Kafka执行流处理任务的流式SQL引擎,Confluent 使用KSQL对Kafka的数据提供查询服务.

    2 confluent下载

    使用的开源的confluent的5.2.4版本

    下载链接:http://packages.confluent.io/archive/5.2/confluent-5.2.4-2.11.tar.gz

    3 环境准备

    分布式搭建建议至少3个节点,但是由于用于测试及节点紧张这里使用2个节点

    节点zookeeperkafkacontrol-centerkafka-resetschema-registryconnectorksql-server
    10.0.165.8
    10.0.165.9
    2181909290218082808180838088

    4 安装

    4.1 解压

    将下载的文件上传至linux,然后解压至相应的目录下

    tar -zxvf /opt/package/confluent-5.2.4-2.11.tar.gz -C /home/kafka/.local/
    

    修改文件名并进入到相应的目录下

    mv /home/kafka/.local/confluent-5.2.4 /home/kafka/.local/confluent
    cd /home/kafka/.local/confluent
    

    4.2 修改配置

    修改10.0.165.8节点的相应配置

    4.2.1 zookeeper配置

    (1)vim /home/kafka/.local/confluent/etc/kafka/zookeeper.properties

    ##数据存放目录,默认为/tmp/zookeepe存在删除风险
    dataDir=/data/confluent/zookeeper
    clientPort=2181
    maxClientCnxns=0
    initLimit=5
    syncLimit=2
    
     
    ##多个zookeeper server,server的编号1、2等要与myid中的一致
    server.1=10.0.165.8:2888:3888
    server.2=10.0.165.9:2888:3888
    

    (2)生成myid

    echo 1 > /home/kafka/.local/confluent/etc/kafka/myid

    (3)修改confluent服务启动脚本,将myid发布到confluent运行目录下。

    bin/confluent start会启动confluent的各服务,且会将etc下的各配置,复制到confluent运行目录下。

    vim /home/kafka/.local/confluent/bin/confluent

    在config_zookeeper()方法块最后一行,添加

    cp ${confluent_conf}/kafka/myid $confluent_current/zookeeper/data/

    目的是将etc/kafka/myid拷贝到confluent运行目录下,否则会报myid is no found,zookeeper启动失败。

    4.2.2 Kafka配置

    vim /home/kafka/.local/confluent/etc/kafka/server.properties

    broker.id=0
    
    #listeners与advertised.listeners可以只配一个,与当前机器网卡有关系,请注意。advertised.listeners可能通用性更强,值为当前机器的ip与端口,其他机器ip无需配置
    advertised.listeners=PLAINTEXT://10.0.165.8:9092
     
    ##根据实际情况调整
    num.network.threads=8
    num.io.threads=8
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    fetch.purgatory.purge.interval.requests=100
    producer.purgatory.purge.interval.requests=100
    
    #log.dirs是最重要的配置,kafka数据所在
    log.dirs=/data/confluent/kafka-logs
    num.partitions=12
    
    num.recovery.threads.per.data.dir=1
    
    message.max.bytes=10000000
    replica.fetch.max.bytes= 10485760
    auto.create.topics.enable=true
    auto.leader.rebalance.enable = true
    
    ##备份因子数<=kafka节点数,若大于会报错
    default.replication.factor=2
    offsets.topic.replication.factor=2
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    log.flush.interval.messages=20000
    log.flush.interval.ms=10000
    log.flush.scheduler.interval.ms=2000
    log.retention.check.interval.ms=300000
    log.cleaner.enable=true
    
    ##log失效时间,单位小时
    log.retention.hours=48
    zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
    zookeeper.connection.timeout.ms=6000
    zookeeper.sync.time.ms=2000
    
    confluent.metrics.reporter.bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
    confluent.metrics.reporter.topic.replicas=2
    
    confluent.support.metrics.enable=true
    confluent.support.customer.id=anonymous
    
    delete.topic.enable=true
    group.initial.rebalance.delay.ms=0
    

    4.2.3 kafka-rest

    vim /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties

    id=kafka-rest-server-001
    schema.registry.url=http://10.0.165.8:8081
    zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
    bootstrap.servers=PLAINTEXT://10.0.165.8:9092
    port=8082
    consumer.threads=8
    
    access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
    access.control.allow.origin=*
    

    4.2.4 ksql

    confluent-4没有这个

    vim /home/kafka/.local/confluent/etc/ksql/ksql-server.properties

    ksql.service.id=default_
    bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
    listeners=http://0.0.0.0:8088
    ksql.schema.registry.url=http://10.0.165.8:8081,http://10.0.165.9:8081
    ksql.sink.partitions=4
    
    

    4.2.5 confluent-control-center

    vim /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties

    bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
    zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
    confluent.controlcenter.rest.listeners=http://0.0.0.0:9021
    
     
    
    #每个id要唯一,不然只能启动一个
    confluent.controlcenter.id=1
    confluent.controlcenter.data.dir=/data/confluent/control-center
    confluent.controlcenter.connect.cluster=http://10.0.165.8:8083,http://10.0.165.9:8083
    
    ##每台都配置各自的ip
    confluent.controlcenter.ksql.url=http://10.0.165.8:8088
    confluent.controlcenter.schema.registry.url=http:/10.0.165.8:8081,http://10.0.165.9:8081
    
    confluent.controlcenter.internal.topics.replication=2
    confluent.controlcenter.internal.topics.partitions=2
    confluent.controlcenter.command.topic.replication=2
    confluent.monitoring.interceptor.topic.partitions=2
    confluent.monitoring.interceptor.topic.replication=2
    confluent.metrics.topic.replication=2
    
    confluent.controlcenter.streams.num.stream.threads=30
    
    

    4.2.6 schema-registry

    vim /home/kafka/.local/confluent/etc/schema-registry/schema-registry.properties

    listeners=http://0.0.0.0:8081
    kafkastore.bootstrap.servers=PLAINTEXT://10.0.165.8:9092,10.0.165.9:9092
    kafkastore.topic=_schemas
    debug=false
    
    

    4.2.7 connect

    vim /home/kafka/.local/confluent/etc/schema-registry/connect-avro-distributed.properties

    bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
    group.id=connect-cluster
    
    key.converter=org.apache.kafka.connect.storage.StringConverter 
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
     
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-statuses
    
    config.storage.replication.factor=2
    offset.storage.replication.factor=2
    status.storage.replication.factor=2
    
     
    
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    rest.port=8083
    rest.advertised.port=8083
    
    plugin.path=/home/kafka/.local/confluent/share/java
    

    4.2.8 将confluent发送到其他节点

    scp -r confluent/ kafka@10.0.165.9:/home/kafka/.local/

    然后修改其他节点的配置

    vi myid

    2
    

    vi /home/kafka/.local/confluent/etc/kafka/server.properties

    broker.id=1
    advertised.listeners=PLAINTEXT://10.0.165.9:9092
    

    vi /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties

    id=kafka-rest-server-002
    schema.registry.url=http://10.0.165.9:8081
    bootstrap.servers=PLAINTEXT://10.0.165.9:9092
    

    vi /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties

    confluent.controlcenter.id=2
    confluent.controlcenter.ksql.url=http://10.0.165.9:8088
    

    然后在两个节点的/data目录下新建confluent并修改权限

    sudo mkdir /data/confluent
    sudo chown kafka:kafka /data/confluent
    

    4.3 服务启动与停止

    4.3.1 全部服务启动

    启动:bin/confluent start

    查看状态:bin/confluent status

    停止:bin/confluent stop

    4.3.2 单独启动服务

    服务单独启动

    启动kafka-rest

    bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties
    

    上面的这种方式是前台启动,也可以以后台方式启动。

    nohup bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties &
    

    启动zookeeper

    bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties 
    

    启动kafka broker

    bin/kafka-server-start -daemon  etc/kafka/server.properties
    

    启动schema registry

    bin/schema-registry-start -daemon  etc/schema-registry/schema-registry.properties
    

    5 安装过程常见报错

    5.1 KafkaServer启动失败

    [2020-06-27 04:28:15,713] FATAL [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.KafkaException: Socket server failed to bind to 10.0.165.8:9092: Cannot assign requested address.
    	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:331)
    	at kafka.network.Acceptor.<init>(SocketServer.scala:256)
    	at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:97)
    	at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
    	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    	at kafka.network.SocketServer.startup(SocketServer.scala:89)
    	at kafka.server.KafkaServer.startup(KafkaServer.scala:229)
    	at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
    	at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)
    Caused by: java.net.BindException: Cannot assign requested address
    	at sun.nio.ch.Net.bind0(Native Method)
    	at sun.nio.ch.Net.bind(Net.java:433)
    	at sun.nio.ch.Net.bind(Net.java:425)
    	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
    	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)
    	... 9 more
    [2020-06-27 04:28:15,715] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
    [2020-06-27 04:28:15,717] INFO [SocketServer brokerId=2] Shutting down (kafka.network.SocketServer)
    [2020-06-27 04:28:15,718] INFO [SocketServer brokerId=2] Shutdown completed (kafka.network.SocketServer)
    [2020-06-27 04:28:15,721] INFO Shutting down. (kafka.log.LogManager)
    [2020-06-27 04:28:15,760] INFO Shutdown complete. (kafka.log.LogManager)
    [2020-06-27 04:28:15,761] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    [2020-06-27 04:28:15,762] INFO Session: 0x27297ff0225a5a9 closed (org.apache.zookeeper.ZooKeeper)
    [2020-06-27 04:28:15,764] INFO EventThread shut down for session: 0x27297ff0225a5a9 (org.apache.zookeeper.ClientCnxn)
    [2020-06-27 04:28:15,765] INFO [KafkaServer id=2] shut down completed (kafka.server.KafkaServer)
    [2020-06-27 04:28:15,766] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
    

    自己copy了server.properties文件到各个节点没有修改下面的配置 监听器的配置,应该指向节点本身的主机名和端口,我全部四台机器都指向了10.0.165.8,所以导致了只有节点1是正常的

    advertised.listeners=PLAINTEXT://10.0.165.9:9092

    5.2 Confluent schema-registry启动失败

    [2020-06-27 16:09:39,872] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:242)
    [2020-06-27 16:09:50,095] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
    java.lang.IllegalArgumentException: Unable to subscribe to the Kafka topic _schemas backing this data store. Topic may not exist.
    	at io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread.<init>(KafkaStoreReaderThread.java:125)
    	at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:130)
    	at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:199)
    	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:64)
    	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:42)
    	at io.confluent.rest.Application.createServer(Application.java:157)
    	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
    

    因为kafkaserver没有启动

    6 常用操作

    (1)启动

    confluent start

    (2)查看日志文件目录

    confluent current

    (3)列出连接

    confluent list connectors

    (4)查看加载的连接器

    confluent status connectors

    [
    "file-source"
    ]
    

    (5)查看具体连接器状态

    confluent status file-source

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,072
精华内容 1,628
关键字:

confluent