精华内容
下载资源
问答
  • Producer指定分区

    千次阅读 2016-08-21 18:27:57
    我们用Kafka生产者时系统会默认进行分区,但是我们也可以通过控制key值得方式让消息存放到指定的partitions中。 首先我们创建一个SimplePartitioner类 packagecom.teamsun.kafka.m001; importkafka.producer....

    我们用Kafka生产者时系统会默认进行分区,但是我们也可以通过控制key值得方式让消息存放到指定的partitions中。

    首先我们创建一个SimplePartitioner类

    package com.teamsun.kafka.m001;

     

    import kafka.producer.Partitioner;

    import kafka.utils.VerifiableProperties;

     

    public class SimplePartitioner implements Partitioner {

    public SimplePartitioner(VerifiableProperties props) {

    }

     

    @Override

    public int partition(Object key, int numPartitions) {

    int partition = 0;

    String k = (String) key;

    partition = Math.abs(k.hashCode()) % numPartitions;

    System.out.println(partition);

    return partition;

    }

    }

    在这个类中我们可以通过key值来控制指定分区,这里是通过hash值来控制的。

    创建Kafka配置文件

    package com.teamsun.kafka.m001;

    public interface KafkaProperties {

     

    final static String zkConnect = "hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182";

    final static String groupId1= "group1";

    final static String topic = "test3";

    final static String kafkaServerURL = "hadoop0,hadoop1,hadoop2,hadoop3";

    final static int kafkaServerPort = 9092;

    final static int kafkaProducerBufferSize = 64 * 1024;

    final static int connectionTimeOut = 20000;

    final static int reconnectInterval = 10000;

    final static String clientId = "SimpleConsumerDemoClient";

    }

    创建生产者

    package com.teamsun.kafka.m001;

     

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;

    import kafka.producer.KeyedMessage;

    import kafka.producer.ProducerConfig;

     

    public class PartitionerProducer {

     

    public static void main(String[] args) {

    Properties props = new Properties();

    props.put("serializer.class", "kafka.serializer.StringEncoder");

    props.put("metadata.broker.list",

    "hadoop0:9092,hadoop1:9092,hadoop2:9092,hadoop3:9092");

    props.put("partitioner.class", "com.teamsun.kafka.m001.SimplePartitioner");

    props.put("request.required.acks", "1");

     

     

    Producer<String, String> producer = new Producer<String, String>(

    new ProducerConfig(props));

    String topic = "test3";

    for (int i = 0; i <= 1000000; i++) {

    String k = "key" + i;

    String v = k + "--value" + i;

    producer.send(new KeyedMessage<String, String>(topic, k, v));

    System.out.println(k+v);

    }

    producer.close();

    }

    }

    这里注意红色部分引用之前创建的SimplePartitioner类

    创建消费者

    package com.teamsun.kafka.m001;

     

    import java.util.HashMap;

    import java.util.List;

    import java.util.Map;

    import java.util.Properties;

     

    import com.teamsun.kafka.m001.KafkaProperties;

     

    import kafka.consumer.ConsumerConfig;

    import kafka.consumer.ConsumerIterator;

    import kafka.consumer.KafkaStream;

    import kafka.javaapi.consumer.ConsumerConnector;

     

    public class KafkaConsumer1 extends Thread {

    private final ConsumerConnector consumer;

    private final String topic;

     

    public KafkaConsumer1(String topic) {

    consumer = kafka.consumer.Consumer

    .createJavaConsumerConnector(createConsumerConfig());

    this.topic = topic;

    }

     

    private static ConsumerConfig createConsumerConfig() {

    Properties props = new Properties();

    props.put("zookeeper.connect", KafkaProperties.zkConnect);

    props.put("group.id", KafkaProperties.groupId1);

    props.put("zookeeper.session.timeout.ms", "40000");

    props.put("zookeeper.sync.time.ms", "200");

    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);

    }

     

    @Override

    public void run() {

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

    topicCountMap.put(topic, new Integer(1));

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer

    .createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    while (it.hasNext()) {

    System.out.println("1receive:" + new String(it.next().message()));

    //try {

    ////sleep(300);      // 每条消息延迟300ms

    //} catch (InterruptedException e) {

    //e.printStackTrace();

    //}

    }

    }

    }

    多consumer同时消费时指定相同组即可同时消费消息。

     

     

    ---
    更多文章关注公众号

    展开全文
  • 1.Kafka选择由producer向broker push消息并由consumer从broker pull消息。 push模式的目标是尽可能以最快速度传递消息; pull模式则可以根据consumer的消费能力以适当的速率消费消息 Kafka API实现Producer $...

    回顾: Kafka“推拉”模式

        1.Kafka选择由producer向broker push消息并由consumer从broker pull消息。
          push模式的目标是尽可能以最快速度传递消息;
          pull模式则可以根据consumer的消费能力以适当的速率消费消息

     


    Kafka API实现Producer

        $>kafka-console-producer.sh --topic my-topic --broker-list localhost:9092   

            0.将Kafka的maven依赖复制到pom.xml中

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>

            1.编写Producer类 (java类)

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerRecord;
    
        import java.util.Properties;
    
        public class KafkaProducerDemo {
            public static void main(String[] args) {
                Properties props = new Properties();
                
                //windows host修改
                props.put("bootstrap.servers", "master:9092");
                props.put("acks", "all");
                props.put("retries", 0);
                props.put("batch.size", 16384);
                props.put("linger.ms", 1);
                props.put("buffer.memory", 33554432);
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
                Producer<String, String> producer = new KafkaProducer<String, String>(props);
                for (int i = 0; i < 100; i++) {
                    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
                }
                producer.close();
            }
        }

     

        8.消费者能同时接收API的生产消息
        $>kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic --from-beginning

           



    Kafka API实现Producer向指定分区生产数据

        1.开启zookeeper
           $>zookeeper-server-start.sh /home/hyxy/soft/kafka/config/zookeeper.properties

        2.开启Kafka Broker
           $>kafka-server-start.sh /home/hyxy/soft/kafka/config/server.properties
           $>kafka-server-start.sh /home/hyxy/soft/kafka/config/server-1.properties
        3.创建主题
           $>kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic-partition2 --partitions 2 --replication-factor 1
        
        5. 默认按hash分区将key值放到不同的分区中
           需求循环消息内容200-300,将小于220的数字放入分区1中, 其它220-300放入分区2中

    public class KafkaProducerDemo {
            public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 200; i < 300; i++) {
                Thread.sleep(1000);
                if(i<=220){
                    producer.send(new ProducerRecord<String, String>("my-topic-partition2",0, Integer.toString(i), Integer.toString(i)));
                }else{
                    producer.send(new ProducerRecord<String, String>("my-topic-partition2",1, Integer.toString(i), Integer.toString(i)));
                }
    
            }
            producer.close();
            }
        }

       运行 !!!

        6.kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic-partition2 --from-beginning

        7.查看kafka_logs(存放数据大小)
        [hyxy@master my-topic-partition2-0]$ ll【840byte 200-220】
        total 4
        -rw-rw-r--. 1 hyxy hyxy 10485760   00000000000000000000.index
        -rw-rw-r--. 1 hyxy hyxy      840   00000000000000000000.log

        [hyxy@master my-topic-partition2-1]$ ll【3160byte 220-300】
        total 4
        -rw-rw-r--. 1 hyxy hyxy 10485760  00000000000000000000.index
        -rw-rw-r--. 1 hyxy hyxy     3160  00000000000000000000.log


     查看默认分区

    【public class KafkaProducer<K, V> implements Producer<K, V> {】
    【private final Partitioner partitioner;】 右键查看实现类
    【org.apache.kafka.clients.producer.internals.DefaultPartitioner】
    ---------------------------------
        Kafka默认分区策略
        1.If a partition is specified in the record, use it
          如果记录中指定了分区,使用当前分区;如上案例!!!
        2.If no partition is specified but a key is present choose a partition based on a hash of the key
          如果没有指定分区,但存在一个键,则根据键的哈希值选择分区
        3.If no partition or key is present choose a partition in a round-robin fashion
          如果没有分区或键存在,请以循环方式选择分区
        
       


    Kafka API实现Consumer

    1.  kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic-partition2 --from-beginning
        编写Consumer 

        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import java.util.Arrays;
        import java.util.Properties;
    
       public class KafkaConsumerDemo {
           public static void main(String[] args) {
               Properties props = new Properties();
               props.put("bootstrap.servers", "master:9092");
               props.put("group.id", "test");
               //设置enable.auto.commit意味着自动提交偏移量,频率由配置auto.commit.interval.ms控制
               props.put("enable.auto.commit", "true");
               props.put("auto.commit.interval.ms", "1000");
               props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
               KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
               consumer.subscribe(Arrays.asList("my-topic-partition2"));
               while (true) {
                   //100ms  poll   拉取时超时时间,100ms没有消费超时
                   ConsumerRecords<String, String> records = consumer.poll(100);
                   for (ConsumerRecord<String, String> record : records)
                       System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
               }
           }
        }


        2.开启zookeeper
        3.开启Broker,并确保Topic创建成功!
        4.开启Consumer java程序
          测试监控!!!


        record下更多的方法,eg:打印partitions:

    System.out.printf("partition = %s,offset = %d, key = %s, value = %s%n",record.partition(),record.offset(), record.key(),record.value());
       
    // partition = 1,offset = 200, key = 262, value = 262

     
        5.集群端发送消息,KafkaConsumerDemo获得消息
        kafka-console-producer.sh --topic my-topic-partition2 --broker-list localhost:9092 
     

    展开全文
  • 消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner...

      消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用默认的方式进行数据分区。

      有没有指定partition可以从源码中看出:

     public ProducerRecord(String topic, Integer partition, K key, V value) {}

    如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用默认的分区策略来进行数据分区

    1.根据key进行分区

    public class CustomPartitioner {
        
        private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class);
        
        public static void main(String[] args) {
            //1.加载配置信息
            Properties prop = loadProperties();
            
            //2.创建生产者
            KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop);
            
            String sendContent = "hello_kafka";
            IntStream.range(0, 10).forEach(i ->{
                try {
                    ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);  //topic key value
                    Future<RecordMetadata> future = producer.send(record);
                    RecordMetadata recordMetadata = future.get();
                    LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            });
            
        }
         //配置文件的设置
        public static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
            prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("acks", "all");    //发送到所有的ISR队列中
            return prop;
        }
    }

     2.自定义分区

      同样在使用自定义分区的时候,需要写实现类和在producer中配置引用

      我们在这个示例中,根据key来分区,key在序列化的时候用的是IntegerSerializer,在ProducerRecord中我们没有指定partition

      自定义分区器

    public class CustomPartition implements Partitioner{
    
        @Override
        public void configure(Map<String, ?> configs) {
            // TODO Auto-generated method stub
            
        }
    
        @SuppressWarnings({ "null", "unused" })
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            
            int partitionNum = cluster.partitionsForTopic(topic).size();
            int partition = (Integer)key%partitionNum;
            return key == null? 0:partition;
        }
    
        @Override
        public void close() {
            // TODO Auto-generated method stub
            
        }
    }

      生产者

    public class ProducerDemo {
        
        private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);
            
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            //1.加载配置信息
            Properties prop = loadProperties();
            
            //2.创建生产者
            KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop);
            
            //3.发送内容
            String sendContent = "hello_kafka";
            IntStream.range(0, 10).forEach(i ->{
                try {
                    ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);
                    Future<RecordMetadata> future = producer.send(record);
                    RecordMetadata recordMetadata = future.get();
                    LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            });                
            producer.close();    //回调拦截器中的close方法
            
        }
            
        //配置文件的设置
        public static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
            prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("partitioner.class", "com.zpb.partitioner.CustomPartition");
            prop.put("acks", "all");
            return prop;
        }
    }

     

     

     

     

     

    转载于:https://www.cnblogs.com/MrRightZhao/p/11345846.html

    展开全文
  • kafka指定分区消费

    千次阅读 2020-05-22 18:48:51
    消费者组中不同的消费者需要从某个topic下的不同分区接收消息,并确保消息的有序性,或者producer端需要根据一定的业务规则对消息进行分类然后发送到不同的分区,然后不同的消费者就可以获取指定分区的消息了 ...

    前言

    在某些场景中,消费者组中不同的消费者需要从某个topic下的不同分区接收消息,并确保消息的有序性,或者producer端需要根据一定的业务规则对消息进行分类然后发送到不同的分区,然后不同的消费者就可以获取指定分区的消息了

    在上一篇中,我们讨论过如何在producer端将消息发送到指定的分区的两种实现方式,结合上面的需求,我们继续在此基础上实现消费端消费指定分区的消息

    代码流程说明

    1、创建测试topic

    提前在开启了kafka服务的虚拟机或服务器上创建一个名为 "zcy3"的topic,为3个分区,可执行下面的命令进行创建

    ./kafka-topics.sh --zookeeper localhost:2181 --create --topic zcy3 --partitions 3 --replication-factor 1
    

    2、编写producer代码

    public class ParProducer {
    
        private final KafkaProducer<String, String> producer;
    
        public final static String TOPIC = "zcy3";
    
        public ParProducer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "101.15.37.145:9092");
            //所有follower都响应了才认为消息提交成功,即"committed"
            pro
    展开全文
  • kafka producer 分区

    2021-03-15 23:46:58
    //策略二:这个地方就是指定了key, hash取模,相同的key打到同一个分区上 int partition = partition(record, serializedKey, serializedValue, cluster); -> return partition != null ? partition : //使用...
  • 所以想象将分区做个自定义,然后消费者组的消费者消费指定分区,达到这一目的。 一 自定义kafka分区 public class SimplePartitioner implements Partitioner { private final AtomicInteger atomicInteger = new ...
  • 生产者指定分区规则

    2020-08-12 11:19:09
    生产者指定分区规则1.编写分区规则类2.生产者类(指定分区) 1.编写分区规则类 import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public ...
  • 关于kafka producer 分区策略的思考

    万次阅读 2016-04-07 16:24:53
    在上面的程序中,我在producer中没有定义分区策略,也就是说程序采用默认的kafka.producer.DefaultPartitioner,来看看源码中是怎么定义的: class DefaultPartitioner (props: VerifiableProperties = null)...
  • kafka发送消息至指定分区

    千次阅读 2020-05-12 22:51:56
    在实际使用中,我们可能需要对某个topic下不同的消息进行分类管理,比如确保消费的顺序性,在这种场景下,我们可以首先确保生产者发送消息到指定分区即可 本文的测试基于docker搭建的一个双节点的简单集群,有兴趣...
  • 前言 在前面的示例中,我们创建的ProducerRecord对象包括主题名称,键和值。 Kafka消息是键值对,...它们还用于决定将消息写入哪个主题分区。 具有相同Key的所有消息将转到同一分区。 这意味着如果进程只读取主题...
  • 2.写入指定分区 Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面。 每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition...
  • SpringBoot整合kafka代码,两个工程 一个消费者 一个生产者,利用定时任务和restapi发送消息,自动分配分区消费和指定分区消费,再也不用写死分区了真好。代码简洁。欢迎大家下载。
  • 我们在前面提到过,kafka的topic是个逻辑概念,实际处理消息处理的是topic的partition。本篇我们将介绍kafka消息发送时是如何分区的以及如何自定义分区。...当我们在发送消息时,如果不指定key,则kafka内部默...
  • 一、 kafka发送消息的三种方式 public class MyProducer implements Job { private static KafkaProducer<String,String> producer; static { Properties properties = new Properties(); ...
  • 第1章 简介 Kafka生产这端分区分配规则, 第2章 根据qian元数据信息,确定数据发往哪个partition
  • Kafka Producer Inteceptor ...Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作。一般来说最好不要修改消息ProducerRecord的topic、key以及partition等信息,如
  • 先建立好分区的topic kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --...建立分区为3 ,并行度为3 的producer 2.然后在flink中连接这个topic treamExecutionEnvironment...
  • kafka发送、消费指定分区消息

    千次阅读 2020-09-01 16:40:41
    在实际使用中,我们可能需要对某个topic下不同的消息进行分类管理,比如确保消费的顺序性,在这种场景下,我们可以首先确保生产者发送消息到指定分区即可 本文的测试基于docker搭建的一个双节点的简单集群,有...
  • springboot集成kafka指定分区发送

    千次阅读 2019-12-04 14:12:13
    springboot 版本为 2.0.6 1.配置文件 kafka: bootstrap-servers: kafka地址 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafk...
  • springboot 版本为 2.0.6 ... producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3
  • 三、Kafka Producer发送消息及分区策略

    千次阅读 2020-08-18 16:07:50
    1、Producer代码实现 ps:不建议使用自定义序列化和反序列化,他们会把生产者和消费者耦合在一起,且容易出错 // 同步发送消息、// 异步发送消息 public class KafkaProducerDemo { private static Properties ...
  • 设置分区与指定分区消费 public class Patition1 implements Partitioner{ //设置 public void configure(Map, ?> configs) { } //分区逻辑 public int partition(String topic, Object key, byte[] ...
  • 【Kafka】Kafka指定分区消费

    千次阅读 2020-02-25 16:08:00
    //消费topic 消费全部分区 while ( true ) { ConsumerRecords < String , String > poll = consumer . poll ( Duration . ofSeconds ( 10 ) ) ; //消费一定时间的数据 Thread . sleep ( 3000 ) ; ...
  • 发送分区设置 默认配置 The default partitioning strategy: If a partition is specified in the record, use it If no partition is specified but a key is present choose a partition based on a hash of ...
  • kafka 在生产数据时指定 分区,生产数据时带有回调函数 1、配置文件 #此处是broker的地址 bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092 acks=all retries=0 batch.size= 16384 linger.ms...
  • deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.retries=3 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 17,669
精华内容 7,067
关键字:

producer指定分区