精华内容
下载资源
问答
  • kafka本地查看数据

    2019-07-17 10:25:36
    kafka本地代码展示 package com.kafkaSimple; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer...

    kafka本地代码展示

    package com.kafkaSimple;
    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.FileWriter;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * kafka简单消费者实例
     *
     * 消费者组:当多个消费者的group.id相同的时候那么他们就是属于同一个
     * 消费者组。同一个消费者组中消费主题中位于不同分区的消息。每一个分区
     * 只能分配给消费者组中的一个消费者。
     * 如果创建主题的时候只制定了一个分区那么,将只有一个消费者消费所有消息。
     * 如果主题有多个分区,那么消息将分布到不同的分区上,那么消费者组中的
     * 所有消费者分摊所有消息。
     *
     * 当本例单个使用的时候是单个消费者。
     * 如果本实例代码作为另外一个新类运行的话,那么根本类就是同属于test组的消费者。
     *
     * 消费者组通过指定group.id来确定。
     */
    public class SimpleConsumer {
        public static void main(String[] args) throws Exception{
    
            String topicName = "ecarx_log_analysis";
            Properties props = new Properties();
            // 制定要连接的代理
    //        props.put("bootstrap.servers","pro01.cdh.ecarx.local:9092,pro02.cdh.ecarx.local:9092,pro03.cdh.ecarx.local:9092,pro04.cdh.ecarx.local:9092,pro05.cdh.ecarx.local:9092 ");
    //        props.put("bootstrap.servers","10.160.25.137:9092,10.160.25.138:9092,10.160.25.139:9092");
            props.put("bootstrap.servers","10.161.31.65:9092,10.161.31.83:9092,10.161.31.234:9092,10.160.26.85:9092");
            // 将单个消费者分配给组
            props.put("group.id","test1111");
            // 如果值为true,则为偏移启动自动落实,否则不提交
            props.put("enable.auto.commit","true");
    //        props.put("auto.offset.reset","earliest");
            props.put("auto.offset.reset","latest");
            // 更新偏移量的频率
            props.put("auto.commit.interval.ms","1000");
            // 超时时间
            props.put("session.timeout.ms","30000");
            // 键值序列化
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            // 创建kafkaconsumer实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    
            // 指定主题
            consumer.subscribe(Arrays.asList(topicName));
    
            System.out.println("Subscribed to topic" + topicName);
    
            while(true){
                // 拉取主题中的消息
                ConsumerRecords<String,String> records = consumer.poll(1000);
                for(ConsumerRecord<String,String> record : records){
                    System.out.printf("offset=%d,key=%s,value=%s\n",record.offset(),record.key(),record.value());
    
    
                }
    
    
    
    
    
    
            }
    
    
    
        }
    }
    
    展开全文
  • 本博文介绍如何一步步搭建起Kafka本地环境。 下载Kafka 0.9.0.0并配置软链接  下载好后,放入电脑本地安装目录,mac下我放在/usr/local下,解压Kafka。 tar -xzf kafka_2.11-0.9.0.0.tgz  然后建立一个...

    本博文介绍如何一步步搭建起Kafka本地环境。

    下载Kafka 0.9.0.0 并配置软链接

      下载好后,放入电脑本地安装目录,mac下我放在/usr/local下,解压Kafka。

    tar -xzf kafka_2.11-0.9.0.0.tgz

      然后建立一个当前版本的kafka的软链接,这样在配置的时候系统环境配置的时候不需要写版本号,也有利于以后的版本更新。

    ln -s kafka_2.11-0.9.0.0 kafka

      接着cd kafka进入主目录。

     

    启动Kafka Server

      在启动前,需要安装并配置Zookeeper,不过在kafka中带有一个单节点示例的Zookeeper实例脚本包,所以我们也可以先不安装配置Zookeeper,在启动Kafka Server前,我们先启动Zookeeper:

    bin/zookeeper-server-start.sh config/zookeeper.properties

      在Zookeeper成功启动以后,就可以启动Kafka Server了:

    bin/kafka-server-start.sh config/server.properties

     

    创建topic话题

      通过下面的脚本,我们创建了一个名为test的topic话题,该话题分区数为1,冗余份数为1。然后通过list展示当前话题,会列出test。  

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    bin/kafka-topics.sh --list --zookeeper localhost:2181

     

    启动生产者Producer和消费者Consumer

      通过以下脚本启动一个Producer,然后可以输入生产的消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    this is a test of producer

       通过下面的命令启动消费者Consumer来消费,然后会打印出上述输入的消息:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    this is a test of producer

      如果上述两个命令,在不同的终端启动,这时候你在生产者终端输入消息并回车,消费者端可以看到。

     

    启动多broker的集群

      由于Kafka是分布式的消息系统,目前我们只是启动了一个Kafka Server,并看不到分布式的效果,下面在本地启动多个Kafka Server,首先我们需要将server的配置文件复制两份,因为每个Kafka服务启动都是依赖配置文件,在配置文件中设置了诸如服务启动端口等信息。

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties

      修改上述复制出来的配置文件,由于默认服务启动在9092端口,所以后续两个server启动在9093和9094;然后默认的server的broker id为0,由于id是borker的唯一标识,所以后续的id依次为1和2.默认的几个属性如下:

    broker.id=0
    
    ############################# Socket Server Settings #############################
    
    listeners=PLAINTEXT://:9092
    
    # The port the socket server listens on
    #port=9092
    
    # A comma seperated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs

      server1和server2的关于这几个属性的配置修改如下:

    broker.id=1
    listeners=PLAINTEXT://:9093
    port=9093
    log.dirs=/tmp/kafka-logs-1
    
    broker.id=2
    listeners=PLAINTEXT://:9094
    port=9094
    log.dirs=/tmp/kafka-logs-2

      然后依次启动两个broker的服务:

    bin/kafka-server-start.sh config/server-1.properties 
    bin/kafka-server-start.sh config/server-2.properties

      最好在多个终端启动,分别看看效果。

      接着我们启动一个冗余数为3的topic并describe这个topic,效果显示如下:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replication-topic
    Created topic "my-replication-topic".
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replication-topic
    Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replication-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1

      显示的内容一共是两行,第一行是对所有partition的总结,后续各行是每个partition的信息,由于我们这里只有一个partition,所以后续只有1行(也就是第二行)

    • Leader:表示负责该partition所有读写的节点
    • Replicas:所有拥有该partition 日志的备份数据的节点,不管这些节点是否是Leader或者活着。
    • Isr:是所有同步Replicas集合。 这是Replicas列表的子集合,在这个集合中的都是活着并且赶上Leader进度的。

    多broker的容错性测试

      上面的my-replication-topic的Leader是0号服务,那我们尝试找到0号broker并将该进程杀死,脚本如下:

    Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$ ps | grep server.properties
     2366 ttys001    1:28.51 /Library/Java/JavaVirtualMachines。。。
     2686 ttys006    0:00.00 grep server.properties
    Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$ kill -9 2366

      再describe my-replication-topic的时候,发现leader已经变了,此时变成了2号broker,而且同步Replicas集合中也少了0号broker。

    Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replication-topic
    Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:
        Topic: my-replication-topic    Partition: 0    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$

      当然,再次启动消费者来消费消息,消息依然还是存在的。

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replication-topic

     

    转载于:https://www.cnblogs.com/gslyyq/p/5060037.html

    展开全文
  • kafka本地部署

    2020-04-18 23:07:44
    kafka安装 kafka安装地址 版本对应关系 kafka zookeeper kafka_2.12-2.4.0 apache-zookeeper-3.6.0-bin zookeeper和kafka的版本不对应好像是会出问题 启动 使用zookeeper集群 首先启动zookeeper 进入到...

    安装

    1. zookeeper安装
      zookeeper安装地址
    2. kafka安装
      kafka安装地址
    3. 版本对应关系
    kafka zookeeper
    kafka_2.12-2.4.0 apache-zookeeper-3.6.0-bin

    zookeeper和kafka的版本不对应好像是会出问题

    启动

    使用zookeeper集群
    • 首先启动zookeeper
      进入到zookeeper的bin目录下,然后执行
    ./zkServer.sh start
    

    可以使用 ./zkServer.sh status 或者 ./zkCli.sh 查看zookeeper是否启动成功

    ./zkServer.sh status // 查看zookeeper启动状态
    ./zkCli.sh			// 进入zookeeper客户端
    
    • 启动kafka
      进入到kafka的安装目录下,执行下面命令
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    
    • 关闭kafka
    bin/zookeeper-server-stop.sh config/zookeeper.properties
    
    
    • 创建Topic
    bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic firstTopic
    
    

    replication-facto 和 partitions的参数不可以超过kafka集群的数量,我这里值启动了一个实例所以数量都是1

    • 查看Topic
    bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    
    • 查看指定topic详情
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic firstTopic
    
    • 删除topic
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic firstTopic
    
    • 打开消息发送控制台
    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic firstTopic
    
    • 打开消息接受控制台
    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic firstTopic --from-beginning
    
    展开全文
  • kafka本地调试

    2018-05-01 23:01:44
    主要本地调试命令 nohup bin/kafka-server-start.sh config/server.properties & bin/kafka-server-stop.sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --...

    主要本地调试命令


    nohup bin/kafka-server-start.sh config/server.properties &

    bin/kafka-server-stop.sh

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testSun

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testSun

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testSun



    参考:

    基于Kafka的生产者消费者消息处理本地调试

    心得总结:
    1. produce启动的时候參数使用的是kafka的port

    consumer启动的时候使用的是zookeeper的port;
    2.必须先创建topic才干使用。
    3.topic本质是以文件的形式储存在zookeeper上的。




    展开全文
  • kafka 本地安装

    2018-05-22 16:07:20
    windows 版mac 版kafka mac 安装教程 :http://blog.csdn.net/u010046908/article/details/62229015最好下载和公司一个版本的 kafka ,避免公司api冲突。 公司版本 kafka 安装包 :kafka_2.10-0.8.2.2.tgzkafka 监控...
  • kafka本地安装方法

    2020-10-27 13:56:23
    一、本地安装Kafka 1、 下载 kafka_2.13-2.5.1.tgz 文件 (带src是源码 如:kafka-2.5.1-src.tgz ,不能安装) 2、解压文件,进入目录,创建 logs文件夹。 进入config目录,打开 server.properties 文件: 注释 #...
  • Kafka简介Kafka是一个分布式流平台,可以如同消息队列或者企业级消息系统一样发布和订阅流记录,能够以容错持久化的方式存储流记录,并且实时处理流记录。目前来说主要用在两个方面:当作消息队列,在不同应用和系统...
  • Kafka本地单机部署

    2020-08-20 19:05:51
    Kafka必须依赖ZooKeeper,所以应该确保ZooKeeper已成功运行。Kafka内置了ZooKeeper,故如果未安装ZooKeeper可以使用其内置ZooKeeper。但我们一般使用外部zookeeper服务。 1.下载Kafka(本人是kafka_2.11-2.0.0.tgz...
  • kafka本地环境搭建与本地java代码测试 1、下载kafka https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.1/kafka_2.9.2-0.8.2.1.tgz 2、解压 tar -zxf kafka_2.9.2-0.8.2.1.tgz 3.修改配置文件 ( ######...
  • kafka本地测试环境搭建

    千次阅读 2017-03-20 11:29:03
    Kafka(二):环境搭建&测试 需求 软件 步骤 本地zk启动 ...需求由于共有云的kafka集群只对测试机(阡陌机器等)开放,本地是无法访问的,所以为了开发方便搭建一套kafka的测试环境是有必要的软件 kafka_2.11-0.
  • Kafka本地集群搭建

    千次阅读 2018-04-17 16:07:24
    工作需要,本地搭建了kafka的集群做测试,并简单使用。 本地环境: - VMware Workstation 14 Pro - CentOS 7 - jdk-8u161-linux-x64.tar.gz - zookeeper-3.4.10.tar.gz - kafka_2.11-1.0.1.tgz ...
  • 这里的配置主要就是adserver.lisennerts这个配置,plan协议啥的一定要写成公网ip地址就可以了
  • Kafka简介Kafka是一个分布式流平台,可以如同消息队列或者企业级消息系统一样发布和订阅流记录,能够以容错持久化的方式存储流记录,并且实时处理流记录。目前来说主要用在两个方面:当作消息队列,在不同应用和系统...
  • Kafka本地安装笔记

    2019-08-03 16:02:49
    本地手工玩一下kakfa,这里我用的MacOS,因此用brew安装。这里整理了下自己的安装笔记。首先java环境这里就不说了。 一、安装zookeeper brew的安装这里就不说了,直接安装zookeeper。运行以下命令即可。这里可能...
  • kafka本地单机安装部署

    千次阅读 2017-06-16 11:39:22
    kafka是一种高吞吐量的分布式发布订阅消息系统,这几天要上kafka,只在其中的一个节点使用,结合具体的项目实践在此将kafka本地安装部署流程记录下来与各位同仁分享交流。 准备工作: 上述的文件除了jdk以外...
  • kafka本地环境测试

    千次阅读 2019-04-26 09:41:19
    set COMMAND=%JAVA%%KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS%%KAFKA_LOG4J_OPTS% -cp%CLASSPATH% %KAFKA_OPTS% %*    #修改后 set COMMAND=%JAVA%%KAFKA_HEAP_OPTS% %KAFKA_JVM_...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,836
精华内容 734
关键字:

kafka本地