精华内容
下载资源
问答
  • Java实现Kafka生产者和消费者的示例

    万次阅读 多人点赞 2021-01-05 10:06:08
    Java实现Kafka生产者和消费者的示例

    文章持续更新,微信搜索「万猫学社」第一时间阅读。
    关注后回复「电子书」,免费获取12本Java必读技术书籍。

    Kafka简介

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

    方式一:kafka-clients

    引入依赖

    在pom.xml文件中,引入kafka-clients依赖:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    

    生产者

    创建一个KafkaProducer的生产者实例:

    @Configuration
    public class Config {
    
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        @Bean(destroyMethod = "close")
        public KafkaProducer<String, String> kafkaProducer() {
            Properties props = new Properties();
            //设置Kafka服务器地址
            props.put("bootstrap.servers", bootstrapServers);
            //设置数据key的序列化处理类
            props.put("key.serializer", StringSerializer.class.getName());
            //设置数据value的序列化处理类
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            return producer;
        }
    }
    

    在Controller中进行使用:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaProducer<String, String> kafkaProducer;
    
        @RequestMapping("/kafkaClientsSend")
        public String send() {
            String uuid = UUID.randomUUID().toString();
            RecordMetadata recordMetadata = null;
            try {
            	//将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
                recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get();
                log.info("recordMetadata: {}", recordMetadata);
                log.info("uuid: {}", uuid);
            } catch (Exception e) {
                log.error("send fail, uuid: {}", uuid, e);
            }
            return uuid;
        }
    }
    

    消费者

    创建一个KafkaConsumer的消费者实例:

    @Configuration
    public class Config {
    
        public final static String groupId = "kafka-clients-group";
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        @Bean(destroyMethod = "close")
        public KafkaConsumer<String, String> kafkaConsumer() {
            Properties props = new Properties();
            //设置Kafka服务器地址
            props.put("bootstrap.servers", bootstrapServers);
            //设置消费组
            props.put("group.id", groupId);
            //设置数据key的反序列化处理类
            props.put("key.deserializer", StringDeserializer.class.getName());
            //设置数据value的反序列化处理类
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅名称为“one-more-topic”的Topic的消息
            kafkaConsumer.subscribe(Arrays.asList("one-more-topic"));
            return kafkaConsumer;
        }
    }
    

    在Controller中进行使用:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaConsumer<String, String> kafkaConsumer;
    
        @RequestMapping("/receive")
        public List<String> receive() {
        	从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            List<String> messages = new ArrayList<>(records.count());
            for (ConsumerRecord<String, String> record : records.records("one-more-topic")) {
                String message = record.value();
                log.info("message: {}", message);
                messages.add(message);
            }
            return messages;
        }
    }
    

    方式二:spring-kafka

    使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。

    引入依赖

    在pom.xml文件中,引入spring-kafka依赖:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.12.RELEASE</version>
    </dependency>
    

    生产者

    在application.yml文件中增加配置:

    spring:
      kafka:
      	#Kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092
        producer:
          #设置数据value的序列化处理类
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    在Controller中注入KafkaTemplate就可以直接使用了,代码如下:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaTemplate<String, String> template;
    
        @RequestMapping("/springKafkaSend")
        public String send() {
            String uuid = UUID.randomUUID().toString();
            //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
            this.template.send("one-more-topic", uuid);
            log.info("uuid: {}", uuid);
            return uuid;
        }
    }
    

    消费者

    在application.yml文件中增加配置:

    spring:
      kafka:
        #Kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          #设置数据value的反序列化处理类
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:

    @Component
    @Slf4j
    public class Receiver {
    
        @KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group")
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                String message = (String) kafkaMessage.get();
                log.info("message: {}", message);
            }
        }
    }
    

    文章持续更新,微信搜索「万猫学社」第一时间阅读。
    关注后回复「电子书」,免费获取12本Java必读技术书籍。

    展开全文
  • 深入分析Kafka生产者和消费者

    千次阅读 2020-10-29 22:43:00
    深入Kafka生产者和消费者Kafka生产者消息发送的流程发送方式生产者属性配置序列化器分区器Kafka消费者消费者群组消费者属性配置消费者基础概念消费者核心概念 Kafka生产者 消息发送的流程 生产者每发送一条消息需要...

    Kafka生产者

    消息发送的流程

    在这里插入图片描述
    生产者每发送一条消息都需要先创建一个ProducerRecord对象,并且需要指定目标主题、消息内容,当然还可以指定消息键和分区。之后就会调用send()方法发送该对象,由于生产者需要与Kafka Broker建立网络传输,必然需要先通过序列化器对消息的键和值对象序列化成字节数组,才能进行传输。
    之后,分区器就会接收到数据,然后先确认ProducerRecord对象中是否指定了分区,如果指定分区那么就直接把指定的分区返回,如果未指定分区,分区器就会根据消息键进行选择分区。确认分区后,那么消息才能确认发送到哪个主题的哪个分区上。接下来,消息又会被添加到批次里,同一个批次的消息总是发送到同一个主题和分区上,最后生产者端会有一个独立线程负责将批次发送到相应的Broker上。
    Broker接收到消息后会进行响应,消息写入成功,会返回生产者端一个RecordMetaData对象,这个对象记录了消息在哪个主题的分区上,同时还记录了消息在分区中的偏移量。消息如果写入失败,则会返回一个错误,而生产者会根据配置的重试次数进行重试,当超过重试次数还是失败,就会将错误信息返回给生产者端。

    发送方式

    发送并忘记

    producer.send(record);//忽略返回值
    

    同步发送

    //接收返回值
    Future<RecordMetadata> future = producer.send(record);
    //调用get方法进行阻塞,获取结果
    RecordMetadata recordMetadata = future.get();
    

    异步发送

    //发送时,指定Callback回调
    producer.send(record, new Callback() {
       public void onCompletion(RecordMetadata metadata,Exception exception) {
             if(null!=exception){
               //异常处理
             }
             if(null!=metadata){
                  System.out.println("message offset:"+metadata.offset()+" "
                                    +"message partition:"+metadata.partition());
             }
       }
    );
    

    生产者属性配置

    创建KafkaProducer时都需要为其指定属性,属性的配置可以参考org.apache.kafka.clients.producer 包下的 ProducerConfig 类,大部分属性都配置了合理的默认值,如果对内存使用、性能和可靠性方面有要求可以相应调整一些属性,下面介绍一些常用的配置属性:

    1. acks: 确认机制,控制生产者发送消息时,必须有指定多少个分区副本接收到信息后,生产者才能认为消息写入成功,可选配置如下:
      acks=0:生产者在成功写入消息之前是不会等待任何的来自服务器的响应。如果在此期间出现了异常,造成Broker没能收到消息,而此时生产者又得不到反馈,消息也就丢失了。但是因为生产者不需要去等待服务器的响应,吞吐量相对更高;
      acks=1:只要集群中分区的首领节点接收到消息,生产者就会收到来自服务器的成功响应。如果消息无法到达首领节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。缺省使用这个配置;
      acks=all:只有当集群中所有的分区副本都接收到消息后,生产者才会受到一个来自服务器的成功响应。
    2. batch.size: 对于发往同一个分区的消息,生产者发送过程中会先将消息记录在一个批次中。该参数的作用就是控制一个批次占用的内存大小。一个批次内存被填满后,会一次性将批次里的所有消息发送给Broker,但生产者并一定会等到批次被填满后才进行发送,即使是一条消息也有可能被发送。该参数的大小是按照字节数计算的,缺省为16384(16k),批次内存满了新的消息就写不进去了。
    3. linger.ms: 该参数用于配合batch.size使用,作用是控制生产者在发送批次前等待更多消息加入批次的时间。当然如果batch.size指定的批次内存已经填满,就不会进行等待而是直接发送,反之发送的消息字节数远比batch.size小,那么就能够在linger.ms指定的时间内获得更多的消息,从而减少请求次数,提升消息的吞吐量。
    4. max.request.size: 该参数用来控制生产者发送请求最大大小,缺省为1M。如果请求只有一条消息,则约束消息大小不能超过1M,如果请求是一个批次,则约束批次中所有消息的总大小不能超过1M。需要注意这个参数与Kafka的server.properties配置文件中指定的message.max.bytes参数有关,如果生产者发送的消息超过 message.max.bytes 设置的大小,就会被 Kafka Broker拒绝。
    5. buffer.memory: 控制生产者内存缓冲区的大小。
    6. retries: 控制生产者在消息发送失败后,可以进行重试的次数,默认情况下每次重试过程都会等待100ms再进行重试,重试等待时间可以通过retry.backoff.ms 参数来调整。
    7. request.timeout.ms: 控制生产者发送消息后等待请求响应的最大时间,超过这个时间没有收到响应,那么生产者端就会重试,超过重试次数将会抛出异常,缺省为30s。
    8. max.in.flight.requests.per.connection: 控制生产者在接收到服务器响应之前可以发送多少个消息,如果需要保证消息在一个分区上的严格顺序,这个值应该设为 1,不过这样会严重影响生产者的吞吐量。
    9. compression.type: 该参数控制生产者进行压缩数据的压缩类型,可选值包括:[none,gzip,snappy],缺省是none。压缩数据适用于消息批次处理,处理的批次消息越多,压缩性能就越好。snappy 占用 cpu 少,提供较好的性能和可观的压缩比,更关注性能和网络带宽建议使用这个。而如果网络带宽紧张,可以用gzip,虽然会占用较多的 cpu,但提供更高的压缩比。

    序列化器

    创建KafkaProducer对象时,必须指定键和值的序列化器,一些业务场景可能需要自定义序列化器,那么只需要实现org.apache.kafka.common.serialization.Serializer 接口,重写serialize()方法定义序列化逻辑即可。但自定义序列化器可能更多会去结合特定业务场景使用,所以容易导致程序的脆弱性,如果需求做了调整相应的序列化器实现也可能需要调整。因此使用序列化器更推荐使用自带格式描述以及语言无关的序列化框架,比如Kafka 官方推荐的 Apache Avro。
    Avro在文件的读写是依据schema而进行的,而schema是通过一个JSON文件进行描述数据的,可以把这个schema 内嵌在数据文件中。这样,不管数据格式如何变动,消费者都知道如何处理数据。但是内嵌的消息,自带格式,会导致消息的大小不必要的增大,消耗了资源。我们可以使用 schema 注册表机制,将所有写入的数据用到的 schema 保存在注册表中,然后在消息中引用 schema 的标识符,而读取的数据的消费者程序使用这个标识符从注册表中拉取 schema 来反序列化记录。

    分区器

    生产者在发送消息时需要创建ProducerRecord对象,ProducerRecord对象可以指定一个消息键。指定了消息键,那么分区器就会将拥有相同键的消息指定给同一个主题的同一个分区。如果没有指定消息键,那么会通过默认分区器,使用轮询算法将消息均衡发布到主题下的各个分区。默认分区器会对消息键进行散列,然后根据散列值将消息映射到特定的分区上,这样同一个消息键总是能够被映射到同一个分区,但是只有不改变主题分区数量的情况下,键和分区之间的映射才能保持不变,一旦增加了新的分区,就无法保证了,所以如果要使用键来映射分区,那就要在创建主题的时候把分区规划好,不要增加新分区。

    自定义分区器

    一些业务场景中数据可能会有侧重,比如按地区进行划分数据时,不同地区的消息量是不同的,那么这种情况下就可以根据消息值中的一些标识,去针对消息值进行做分区,会更适合对应的业务场景。自定义一个分区器只需要去实现org.apache.kafka.clients.producer.Partitioner该接口,重写partition()方法完成相应的分区逻辑。

    Kafka消费者

    消费者属性配置

    消费者需要创建KafkaConsumer,创建该对象时也需要指定消费者相关属性,可以参考org.apache.kafka.clients.consumer 包下 ConsumerConfig 类,大部分属性都配置了合理的默认值,如果需要关注内存使用、性能和可靠性方面可以相应调整一些属性,下面介绍一些常用的配置属性:

    1. auto.offset.reset: 控制消费者读取一个没有偏移量的分区或者偏移量无效的情况下,消费者应该如何处理。可选值包括:[latest、earliest],缺省为latest,表示从最新的记录开始读取。而earliest则表示消费者从起始位置开始读取分区的记录。
    2. enable .auto.commit: 表示消费者是否自动提交偏移量,缺省为true。这个参数很关键,通常情况都需要设置为false,自行控制何时提交偏移量,这样可以尽量避免消息重复处理和消息丢失。
    3. partition.assignment.strategy: 指定分区分配给消费者的策略。可选值包括:[Range、RoundRobin],缺省为Range,表示当分区数量无法被消费者数整除时,会把主题下的连续分区分配给消费者,第一个消费者通常会分到更多的分区。而RoundRobin则表示会把主题下的分区轮询分配给消费者。
      在这里插入图片描述
    4. max.poll.records: 控制执行poll() 方法返回的记录数量。
    5. max.partition.fetch.bytes:指定了服务器从每个分区里返回给消费者的最大字节数,缺省为1MB。需要注意,这个参数要比服务器的 message.max.bytes 更大,否则消费者可能无法读取消息。

    消费者基础概念

    消费者群组

    在一些高并发的情况下,当Kafka生产者发送消息的速度远快于消费者消费速度时,如果只配置单个消费者,容易造成消息堆积,消息不能及时处理。这种情况下通常考虑的就是对消费者进行横向伸缩,通过增加消费者个数对同一个主题多个分区的消息进行分流。而Kafka中多个消费者通常会构成一个消费者群组,往群组中增加消费者是进行横向伸缩的主要方式。
    在这里插入图片描述
    在一个消费者群组中所有消费者都是订阅同一个主题,主题下一个分区只能由一个消费者消费,而一个消费者可以消费多个分区。
    在这里插入图片描述

    订阅主题

    //消费者订阅主题(可以多个),主题值允许使用正则表达式  
    consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
    

    消费端创建KafkaConsumer对象后,会使用subscribe()方法进行订阅主题,而一个消费者是可以订阅多个主题的,该方法可以传递一个主题列表或者正则表达式作为参数。正则表达式也能够匹配多个主题,比如,想订阅所有order相关的主题,可以使用subscribe(“order.*”) 。
    需要注意: 在通过正则表达式订阅主题时,如果新建的一个主题正好与表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题了。

    轮询拉取

    //轮询获取消息
    while(true){
          //拉取
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
          for(ConsumerRecord<String, String> record:records){
               System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
                                record.offset(),record.key(),record.value()));
           }
    }
    

    Kafka消费端是通过拉取的方式获取消息,消费者为了不断获取消息,只能在循环中不断调用poll()方法进行拉取。其中poll()方法需要指定超时时间,它会让消费者在指定的毫秒数内一直等待 broker 返回数据。poll()方法会返回一个ConsumerRecords列表对象,而其中每一个ConsumerRecord对象都包含了消息所属的主题信息、所在分区信息、在分区里的偏移量,以及键值对。

    提交和偏移量

    消费者可以使用 Kafka来追踪消息在分区里的位置,称之为偏移量。消费者更新自己读取到哪个消息的操作,称之为提交。消费者提交偏移量本质上就是向一个_consumer_offset 的特殊主题发送一个消息,里面会包括每个分区的偏移量。

    提交偏移量带来的问题

    在这里插入图片描述

    如果提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理。
    在这里插入图片描述

    如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
    在这里插入图片描述

    自动提交

    auto.commit. offset缺省情况下为true,消费者会自动提交偏移量,自动提交存在一个时间间隔由auto.commit.interval.ms进行控制,缺省为5s。自动提交是在轮询拉取过程中触发的,消费者每次轮询时都会检查是否提交偏移量,如果是,则会将poll()方法返回的最新偏移量进行提交。
    注意:自动提交由于是基于时间间隔的提交,如果在未达到提交时间时触发了分区再均衡,就容易造成在此之前一部分已经处理的消息被其它消费者重复处理了。并且自动提交总是将poll()方法返回的最新偏移量进行提交,它并不知道哪些消息处理成功了,所以再次调用之前最好确保所有当前调用poll()方法返回的消息都处理完成,否则可能造成消息丢失。

    手动提交

    将auto.commit. offset设置为false,然后调用commitsync()方法提交偏移量。这个方法会提交调用poll()方法返回的最新偏移量,只要没有发生不可恢复的错误,该方法会一直阻塞,直到提交成功后返回,如果提交失败就会抛出异常。
    注意:手动提交由于也是提交poll()方法返回的最新偏移量,所以在处理完所有的消息后要确保调用了commitsync()方法,否则可能造成消息丢失。

    异步提交

    调用commitAsync()方法进行异步提交,相比与手动提交,它不会使应用程序阻塞,无需等待Broker响应。并且它支持回调,能够在Broker响应时执行相应回调方法。

    //异步提交偏移量
    consumer.commitAsync();
    //支持回调
    consumer.commitAsync(new OffsetCommitCallback() {
         public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                Exception exception) {
               if(exception!=null){
                    System.out.print("Commmit failed for offsets "+ offsets);
               }
         }
    });
    

    同步和异步提交

    一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。这个时候就需要使用同步异步组合提交。

    try {
          while(true){
              ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));
              for(ConsumerRecord<String, String> record:records){
                   System.out.println(String.format("topic= %s,partition= %d,offset= %d,key= %s,value= %s",
                        record.topic(),record.partition(),record.offset(),
                        record.key(),record.value()));
                       
              }
              //每次轮询进行异步提交
              consumer.commitAsync();
          }
    }  finally {
           try {
             //同步提交下
             consumer.commitSync();
           } finally {
             consumer.close();
           }
    }
    

    特定提交

    支持在批次中间进行提交偏移量,在调用 commitsync()和 commitAsync()方法时传递希望提交的分区和偏移量构成的一个Map参数。

    Map<TopicPartition, OffsetAndMetadata> currOffsets= 
                       new HashMap<TopicPartition, OffsetAndMetadata>();
    int count = 0;
    try {
         while(true){
            ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));
            for(ConsumerRecord<String, String> record:records){
                System.out.println(String.format("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
                                record.topic(),record.partition(),record.offset(),
                                record.key(),record.value()));
                currOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"null")); 
                if(count%10==0){
                     //特定提交,指定一个记录希望提交的分区和偏移量的map
                     consumer.commitAsync(currOffsets,null);
                }
                 count++;
            }
    }
    } finally {
        try {
             //同步提交
             consumer.commitSync();
        } finally {
             consumer.close();
        }
    }
    

    消费者核心概念

    群组协调

    在这里插入图片描述
    消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息, 只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化,主题中分区发生了变化时发生。

    分区再均衡

    在Kafka中,消费者群组中存在着消费者对分区的所有权关系,这样在一个群组中如果新增一个消费者,那么新的消费者会分配到原先由其他消费者读取的分区,而减少一个消费者,那原本由它负责的分区就会分配给其它消费者。除此之外,如果增加了分区,新增的分区也需划分由哪个消费者读取,这一系列的行为,都会导致分区所有权的变化,这种变化就称为分区再均衡。
    在消费者群组中我们介绍了它有一个群组协调器,而群组协调器它会接收群组中每个消费者发来的心跳,然后维持每个消费者和群组的从属关系以及对分区所有权关系。如果长时间未收到消费者发送的心跳,群组协调器就会认为当前消费者已经死亡,就会触发一次再均衡。
    分区再均衡在Kafka中是非常重要的,这是消费者群组带来高可用性和伸缩性的关键所在。但是发生分区再均衡的期间,消费者会无法接收到消息,会造成整个群组一段时间的不可用,因此都需要尽量减少发生分区再均衡。
    在这里插入图片描述

    再均衡监听器

    消费者调用subscribe()订阅主题时,指定一个ConsumerRebalanceListener,在再均衡开始之前和分区再均衡完成之后做一些操作。

    //指定一个ConsumerRebalancelistener
    consumer.subscribe(Collections.singletonList("test1"), new ConsumerRebalanceListener() {
                
         //分区再均衡之前
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             //1、将偏移量提交到Kafka
             //2、偏移量写入数据库
         }
                
         //分区再均衡完成以后
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             //1、从数据库中获取偏移量
             //2、通过seek()方法从指定偏移量位置开始读取
         }
    });
    

    从特定偏移量处开始记录

    通常情况下消费者没有通过seek()方法指定读取位置时,调用poll()方法默认都会从分区的最新偏移量处开始读取消息。当然如果想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可以使 seekToBeginning(Collection tp)和 seekToEnd( Collectiontp)这两个方法。而调用seek()是可以从从特定的偏移量处开始读取消息的。

    //从指定分区中的指定偏移量开始消费
    consumer.seek(topicPartition,2);
    

    优雅退出

    如果确定要退出循环,需要通过另一个线程调用 consumer. wakeup()方法。如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。要记住, consumer. wakeup()是消费者唯一一个可以从其他线程里安全调用的方法。

    反序列化器

    创建KafkaConsumer对象时需要指定反序列化器,将从Kafka接收到的字节数组转换成 java对象,发送消息指定的序列化器必须与接收消息使用的反序列化器一一对应的。一些业务场景可能需要自定义反序列化器,那么只需要实现org.apache.kafka.common.serialization.Deserializer接口,重写deserialize()方法定义反序列化逻辑即可。

    独立消费者

    一个消费者从一个主题的所有分区或者某个特定的分区读取数据,不需要消费者群组和再均衡,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。

    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //独立消费者(不需要订阅主题,只需要分配主题中分区即可)
    KafkaConsumer<String,String>  consumer= new KafkaConsumer<String, String>(properties);
    //拿到主题的分区信息
    List<PartitionInfo> partitionInfos = consumer.partitionsFor("independ-consumer");
    List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
    if(null!=partitionInfos){
       for(PartitionInfo partitionInfo:partitionInfos){
           topicPartitionList.add(new TopicPartition(partitionInfo.topic(),
                            partitionInfo.partition()));
           }
    }
    //独立消费者需要执行哪些分区(这里全部的分区分配给一个消费者)
    consumer.assign(topicPartitionList);
    try {
    
        while(true){
           ConsumerRecords<String, String> records
                            = consumer.poll(Duration.ofMillis(1000));
           for(ConsumerRecord<String, String> record:records){
               System.out.println(String.format(
                                "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                                record.topic(),record.partition(),record.offset(),
                                record.key(),record.value()));
           }
        }
    } finally {
         consumer.close();
    }
    
    展开全文
  • kafka生产者和消费者(java示例)

    千次阅读 2018-12-02 16:43:56
    本文主要讲解kafka生产者和消费者的API使用,以及部分配置说明 目录 一 引入依赖 二 生产者 2.1 代码 2.2 生产者配置说明 2.3结果-生产者消费者 3.1 代码 3.2 消费者配置说明 3.3 结果-消费者 ...

    kafka笔记1--集群搭建:https://blog.csdn.net/u010452388/article/details/84674454

    本文主要讲解kafka生产者和消费者的API使用,以及部分配置说明

    目录

    一 引入依赖

    二 生产者

    2.1 代码

    2.2 生产者配置说明

    2.3 结果-生产者

    三 消费者

    3.1 代码

    3.2 消费者配置说明

    3.3 结果-消费者


    一 引入依赖

    这里引入客户端依赖的时候尽量保持与服务端版本一致,不然会出现奇怪的错误,看下服务端版本

    从上面的图可以看出,服务端版本为1.1.0版本,前面的2.12是Scala版本,所以客户端引入下面的依赖

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
            </dependency>

    下图为生产者和消费者的工程结构图

    二 生产者

    2.1 代码

    这里的生产者采用的是每隔1秒钟发送一条消息,总共发送19条

    public class KafkaProducerDemo {
    
        public static Properties kafkaProperties() {
    
            Properties properties = new Properties();
    
            /*设置集群kafka的ip地址和端口号*/
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "192.168.2.231:9092,192.168.2.232:9092");
            /*发送的消息需要leader确认*/
            properties.put(ProducerConfig.ACKS_CONFIG, "1");
            /*用户id*/
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
            /*对key进行序列化*/
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.IntegerSerializer");
            /*对value进行序列化*/
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            return properties;
        }
    
        public static void main(String[] args) throws InterruptedException {
            /*创建一个kafka生产者*/
            KafkaProducer<Integer, String> kafkaProducer =
                    new KafkaProducer<Integer, String>(kafkaProperties());
            /*主题*/
            String topic = "test";
            /*循环发送数据*/
            for (int i = 0; i < 20; i++) {
                /*发送的消息*/
                String message = "我是一条信息" + i;
                /*发出消息*/
                kafkaProducer.send(new ProducerRecord<>(topic, message));
                System.out.println(message + "->已发送");
                Thread.sleep(1000);
            }
    
        }
    }

    2.2 配置-生产者

    官网生产者配置详情:http://kafka.apache.org/11/documentation.html#producerconfigs

    下面是根据官网的配置进行翻译的,如果有翻译的不对的,可以留言沟通

    属性名 描述 类型 默认值 有效值 重要程度
    key.serializer

    为key序列化的类,这个类要实现org.apache.kafka.common.serialization.Serializer

    这个接口 

    class    
    value.serializer

    为value序列化的类,这个类要实现org.apache.kafka.common.serialization.Serializer

    这个接口

    class    
    acks

    在请求完成之前,生产者要求kafka集群的leader已经接收的确认数,这个可以控制发送记录的持久化能力(我们可以认为持久能力越高,也就是数据丢失率就越低)

    • acks=0 ,生产者不会等待服务器的任何确认,记录将会立即添加到套接字缓存以及等待发送,在这种情况下,服务器不能保证会接收到记录,以及retries配置将不会生效(因为客户端不想知道任何失败的情况)。每条记录返回的偏移量始终为-1,
      结:性能高,但是会数据丢失
    • acks=1,集群中的leader会把记录写到本地日志中,然后响应返回,并不会等待所有followers确认。在这种情况下,leader确认过记录之后突然宕机了,所有的followers还没来得及复制记录,这个时候,记录将会丢失
      总结:只需要集群中的leader确认,但是也有可能会数据丢失
    • acks=all,集群中的leader将会等待所有同步副本集去确认这个记录,只要保证至少有一个同步副本还存活着,记录就不会丢失。这是最有效的保证。设置acks=-1的话,会有同样的效果
      总结:需要leader和所有副本集去确认,性能会降低,但是数据最安全
    string 1 [all,-1,0,1]
    bootstrap.servers

    主机IP/端口成对的一组列表,用来与kafka集群建立初始化连接。这个列表的的形式为:host1:port1,host2:port2,...。因为这些服务端列表只是用来初始化连接并用来发现集群的成员的(可以动态的改变),所以这些列表不是必须包含所有服务器的集合(你应该超过1个,因为,一个服务器有可能宕机)

    list ""  
    buffer.memory

    总的缓存大小(单位:字节),生产者用来缓存发送的记录,此记录会等待被发送到服务端。如果生产的记录速度超过发送到服务端的速度,生产者可能会阻塞,阻塞的时间超过max.block.ms配置的值的话,就会抛出异常。

    这个设置应该大致的符合生产者需要使用的总缓存空间,但不是一个硬性限制,因为不是所有的缓存空间都是给生产者用作缓存。一些额外的空间既要被用作压缩(如果compression是enabled的话)也要维持快速的请求。

    long 33554432 [0,...]
    compression.type

    为生产者数据提供的压缩类型。默认是none(也就是没有压缩)。有效值为none,gzip,snappy,or lz4.压缩是针对数据的批量压缩的,所以批量数据的效率将会影响压缩的效率(批量数据越多意味着越好的压缩)

    string none  
    retries

    设置为大于0的值的时候,如果客户端发送任何记录遇到临时的错误的话,客户端会重新发送。没有将max.in.flight.requests.per.connection配置成1的话重新发送可能潜在的改变记录的发送顺序,因为如果两批记录发送到单一的分区(partition),第一批失败了,正在重新发送,但是第二批成功了,那么第二批的记录有可能出现在前面

    int 0 [0,...,2147483647]
    ssl.key.password

    key存储文件的私钥密码。对于客户端来说,可选参数

    password null  

    ssl.keystore.loca

    tion

    key存储文件的位置。对于客户端来说,可选参数,以及可以被用作对客户端的双向认证

    string null  

    ssl.keystore.pass

    word

    key存储文件的存储密码。对客户端来说,可选参数,只有ssl.keystore.location配置了才需要此参数

    password null  

    ssl.truststore.loc

    ation

    信任存储文件的位置

    string null  

    ssl.truststore.pass

    word

    信任存储文件的密码,如果一个密码没有被设置可访问信任存储,这个密码也是有效的,但是完整性的检查是无效的

    password null  
    batch.size

    每当有多个记录要发送到同样的分区的时候,生产者将尝试将记录批处理到一起以至于减少请求。这有助于客户端和服务端的性能。这个配置默认单位是字节。

    记录大于这个大小的话,不会尝试去批处理。

    小批量处理不怎么常见,并有可能减少吞吐量(批处理大小为0将禁止使用批处理)。

    一个非常大的批量大小会使用很多内存,会造成浪费,在预计额外记录的情况下,因为我们总是分配指定的缓冲大小

    int 16384 [0,...]
               
               


    1.acks(默认为1)
        "0":消息发送给broker以后,不需要确认(性能高,但是会出现数据丢失)
        "1":只需要获得kafka集群中leader节点的确认即可返回(leader/follower)
        all或者"-1" 需要ISR中的所有Replica(副本)进行确认(需要集群中所有节点确认)

    2.batch.size(默认16KB) 调优的重要参数
        producer对于同一个分区来说,会按照batch.size的大小进行统一收集,然后批量发送
        就是说我们如果发送的消息,不会直接发出去,等达到batch.size之后,再发出去

    3.linger.ms(默认0毫秒)
        延迟发送,假如设置的值是1000的话,就是每隔1秒钟积累之前的信息,然后再发送
        
    4.max.request.size(默认1M)
        消息最大发送的字节数,超过默认值1M的话,就会拒绝,抛异常

     

    2.3 结果展示

    发送的结果如下图:

     

    三 消费者

    3.1 代码

    public class KafkaConsumerDemo extends Thread {
    
        private final KafkaConsumer<Integer,String> consumer;
    
        public KafkaConsumerDemo(String topic) {
            Properties props = new Properties();
            /*设置集群kafka的ip地址和端口号*/
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "192.168.2.231:9092,192.168.2.232:9092");
            /*设置消费者的group.id*/
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo1");
            /*消费信息以后自动提交*/
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
            /*控制消费者提交的频率*/
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
            /*key反序列化*/
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.IntegerDeserializer");
            /*value反序列化*/
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            /*创建kafka消费者对象*/
            consumer = new KafkaConsumer<Integer, String>(props);
            /*订阅主题*/
            consumer.subscribe(Collections.singleton(topic));
    
        }
    
        @Override
        public void run() {
            while (true) {
                /*每隔一段时间到服务器拉取数据*/
                ConsumerRecords<Integer, String> consumerRecords = consumer.poll(1000);
                for (ConsumerRecord record : consumerRecords) {
                    System.out.println(record.value());
                }
            }
        }
        public static void main(String[] args) {
            new KafkaConsumerDemo("test").start();
        }
    }

    3.2 配置说明

    1.group.id
        同一个gropu.id中的消费者,只能有一个消费者可以消费到信息
        但是不同的group.id都会去消费消息(消息是持久化的)
        
    2.enable.auto.commit 
        如果位true的话,消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到

    3.auto.commit.interval.ms
        控制消费者提交的频率,默认单位是毫秒,一般配合enable.auto.commit

    4.auto.offset.reset
        这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来
        消费指定的 topic 时,对于该参数的配置,会有不同的语义

        auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的offset 处开始消费 Topic 下的消息
        auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始消费
        auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
        
    5.max.poll.records
        设置限制每次消费者poll返回的消息数,通过调整此值,可以减少poll的间隔

    官网消费者配置详情:http://kafka.apache.org/11/documentation.html#consumerconfigs

    3.3 结果展示

    消费的结果如下图:

     

     

     

     

    展开全文
  • kafka 商业环境实战及参数调优进阶系列2-kafka生产者和消费者吞吐量测试 本套系列博客从真实商业环境抽取案例进行总结和分享,并给出kafka商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。...

    版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。

    kafka线上真实环境实战及调优进阶系列

    1. kafka生产者吞吐量测试指标

    kafka-producer-perf-test :是kafka提供的测试Producer性能脚本,通过脚本,可以计算出Producer在一段时间内的平均延时和吞吐量。

    1.1 kafka-producer-perf-test

    在kafka安装目录下面执行如下命令,生产环境中尽量让脚本运行较长的时间,才会有意义:

    bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 200 --througthput -1 --producer-props bootstrap.servers=bd-master:9092,bd-slave1=9092,bd-slave3=9092 acks=1

    1.2 测试结果分析如下:

    500000 records sent ,41963 records/sec (8.00 MB/sec),2362.85 ms/avg latency ,3513.00 ms max latency ,2792ms 50h ,3144ms 95th ,3364 ms 99h,3503ms 99.9th

    看到上面的结果肯定蒙了,看我细细讲来:
    kafka 的平均吞吐量是8.00 MB/sec ,即占用64Mb/s左右的带宽,平均每一秒发送41963条消息。平均延时为2362.85 ms,最大延时为3513.00 ms,95%的消息发送需要3144ms,99%的消息发送需要3364ms,99.9%的消息发送需要3503ms。

    2. kafka消费者吞吐量指标说明:

    2.1 kafka-consumer-perfs

    我们总共测试500万条数据量
    bin/kafka-consumer-perfs-test.sh --broker-list bd-master:9092,bd-slave1=9092,bd-slave3=9092 --message-size 200 --messages 500000 --topic test

    2.2 得到如下结果:

    2018-10-28 9:39:02 95.4188 92.2313 500271 484289
    看到上面的结果肯定蒙了,看我细细讲来:
    该环境下,1s内总共消费了95.4188MB消息,吞吐量为92.2313MB/s,也即736Mb/s。

    3 结语

    秦凯新 于深圳 2018-10-28

    展开全文
  • kafka生产者和消费者端的数据不一致

    千次阅读 2017-06-09 16:41:04
    今天测试遇到了问题,kafka生产者和消费者端的数据不一致,而且数据相差还比较大,测试生产10000条数据...kafka生产者是异步生产数据,我写了个测试方法在main函数里面 用for循环模拟发送10000条数据,就是因为这个main
  • Kafka生产者和消费者的工作原理

    万次阅读 2020-08-06 19:52:15
    探究Kafka生产者的工作原理 主题日志 对于每个主题,Kafka群集都会维护一个分区日志,如下所示: 每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到...
  • Kafka生产者和消费者--

    千次阅读 2019-03-14 11:22:00
    文章目录Kafka生产者:向broker写数据生产者概览创建生产者构造ProducerRecord发送消息到broker序列化器分区顺序性可靠性Kafka消费者基本概念消费过程创建消费者订阅主题轮询提交反序列化器 Kafka生产者:向broker...
  • Java测试Kafka生产者和消费者

    千次阅读 2018-06-10 17:21:34
    一、环境准备请看上篇...在Gradle中引入kafka-client依赖 compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'二、生产者创建使用topic为testimport java.util.Properties; import org.a...
  • Kafka 生产者和消费者的笔记

    万次阅读 多人点赞 2016-12-23 11:27:37
    Maven依赖: org.apache.kafka kafka-clients ...一、生产者 首先先看官方API示例: Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all
  • java kafka 生产者和消费者的代码实例

    千次阅读 2020-07-26 17:35:28
    生产者 import com.google.common.collect.Lists; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Test; import java....
  • springboot配置kafka生产者和消费者详解

    万次阅读 热门讨论 2018-10-19 11:02:16
    在原有pom.xml依赖下新添加一下kafka依赖ar包 <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> ...
  • Kafka生产者和消费者相关命令行操作

    千次阅读 2019-07-15 09:40:06
    启动kafka: bin/kafka-server-start.sh config/server.properties & 1,开启zookeeper集群 startzk.sh 2,开启kafka集群 start-kafka.sh 2,开启kafka可视化界面 kafka-manager ...3,生产者操作: kafka-co...
  • kafka 生产者和消费者实例

    万次阅读 2016-09-27 20:14:00
    1、去官网下载kafka 我下载的版本是 kafka_2.11-0.10.0.1.tgz,下面的实例也是基于该版本。 2、解压安装 tar -xzf kafka_2.11-0.10.0.1.tgz mv kafka_2.11-0.10.0.1 /root 3、修改配置文件 cd /root/kafka_2.11-...
  • 在学习kafka集群之前,先来学习下单节点kafka的一些基本操作,包括安装及一些基本命令,以便后续集群环境的学习。 #1.单节点安装 kafka必须依赖于zookeeper,假定当前zookeeper集群已搭建完成(如不熟悉zookeeper...
  • 话不多说,直接上代码,在运行之前请...(一)生产者代码: Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); pr...
  • 一、准备的环境之前配置好的单节点的zookeeper单节点kafka,安装是在腾讯云上的。前两篇博客有zookeeper的安装和kafka的安装。zk和kafka同时跑在一台机器上,因为我没有太多的服务器,而且只是简单的java demo,没...
  • kafka生产者和消费者的javaAPI demo

    万次阅读 2017-02-21 10:33:39
    写了个kafka的java demo 顺便记录下,仅供参考 1.创建maven项目 目录如下: pom文件: xsi:schemaLocation="http://maven.apac
  • pykafka是Samsa的升级版本,使用samsa连接zookeeper,生产者直接连接kafka服务器列表,消费者才用zookeeper。 使用kafka Cluster。 二、pykafka (1) pykafka安装 根据机器环境从以下三种方式中选择进行一种...
  • Kafka 生产者和消费者 demo (java&scala)

    千次阅读 2017-06-30 22:39:10
    前几天完成了kafka ubuntu单机的搭建,后来就尝试写写kafka的简单代码,网上也有很多例子,但是如果自己编写运行还是有一些坑在里面,我也简单记录以下自己遇到的问题。
  • 创建两个maven工程,produceconsume: pom: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 51,461
精华内容 20,584
关键字:

kafka生产者和消费者