  • A walkthrough of a Java Kafka producer/consumer code example; the sample code is covered in detail and makes a useful reference for study or work.
  • A complete Kafka producer/consumer hello-world code example.
  • Sample code for the Kafka producer/consumer Java API, shared here for reference.
  • Kafka producer code: writing to a topic. import java.util.Properties; import scala.collection.Seq; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer....

    Kafka producer code: writing to a topic

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerDemo {

        public static void main(String[] args) throws InterruptedException {
            // Configuration
            Properties props = new Properties();

            // ZooKeeper server list
            props.put("zk.connect", "192.168.146.100:2181,192.168.146.101:2181,192.168.146.102:2181");

            // Broker list
            props.put("metadata.broker.list", "192.168.146.100:9092,192.168.146.101:9092,192.168.146.102:9092");

            // Message serialization: String data needs kafka.serializer.StringEncoder;
            // other types need their own encoder, written by hand if none ships with Kafka
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            // Wrap props into a producer configuration
            ProducerConfig config = new ProducerConfig(props);

            // Both key and value are Strings
            Producer<String, String> producer = new Producer<String, String>(config);
            // Send 100 messages
            for (int i = 1; i <= 100; i++) {
                Thread.sleep(500);
                // Send a message; test1 is an already-created topic
                KeyedMessage<String, String> message = new KeyedMessage<String, String>("test1",
                        "This is test1 Num:" + i);
                producer.send(message);
            }
        }
    }
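
    The code assumes that the topic test1 already exists. With a ZooKeeper-backed Kafka of this era (0.8.x), it can be created from the shell with something like the following (the host is taken from the code above; adjust it to your cluster):

    kafka-topics.sh --create --zookeeper 192.168.146.100:2181 --replication-factor 1 --partitions 1 --topic test1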
    

    Error record: the Producer import must be import kafka.javaapi.producer.Producer. I had imported kafka.producer.Producer instead; that flagged an error on send, and even after applying the suggested fix it still failed at runtime:

    	SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    	SLF4J: Defaulting to no-operation (NOP) logger implementation
    	SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    	Exception in thread "main" java.lang.ClassCastException: kafka.producer.KeyedMessage cannot be cast to scala.collection.Seq
    		at ProducerDemo.main(ProducerDemo.java:26)
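
    The likely root cause: kafka.producer.Producer is the Scala producer, whose send(...) takes a Scala varargs parameter (compiled as scala.collection.Seq), so calling it from Java with a single KeyedMessage produces exactly the cast error above. The Java wrapper accepts a plain KeyedMessage:

    // Wrong: the Scala API; from Java its send(...) wants a scala.collection.Seq
    //import kafka.producer.Producer;

    // Right: the Java API wrapper; its send(...) accepts a single KeyedMessage
    import kafka.javaapi.producer.Producer;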
    

    [Screenshot: real-time view of the messages]

    Kafka consumer code:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;


    public class ConsumerDemo {
        public static void main(String[] args) {

            Properties props = new Properties();
            props.put("zookeeper.connect", "192.168.146.100:2181,192.168.146.101:2181,192.168.146.102:2181");

            // Assign the consumer to a group; one group here, but grouping is optional
            props.put("group.id", "1");

            // Where to start reading: from the beginning
            props.put("auto.offset.reset", "smallest");

            // Wrap props into a consumer configuration
            ConsumerConfig config = new ConsumerConfig(props);
            // Obtain a consumer client
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            // Key is the topic name; the second parameter is the thread count
            topicCountMap.put("test1", 2);

            // Multiple topics can be passed in
            //topicCountMap.put("test2", 1);

            // Get the message streams; createMessageStreams takes the Map built above
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

            // Get the streams for topic test1. The element type is KafkaStream<byte[], byte[]>
            // because each entry carries both the message and its metadata
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test1");

            for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        // Iterate over the kafkaStream object
                        for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                            // The payload comes out as raw bytes and must be converted to a String
                            String msg = new String(mm.message());
                            System.out.println(msg);
                        }
                    }
                }).start();

    /*  An alternative way to write the Runnable:
        Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }};
            Thread thread = new Thread(runnable);
            thread.start();
    */
            }
        }
    }
    

    Messages:
    [Screenshot: the producer sending messages]
    [Screenshot: the messages displayed in real time]

    Runnable is an interface, so the form above is not a direct instantiation: it looks like new-ing an interface, but it is actually an anonymous inner class.

    It is equivalent to the following, which instantiates a subclass of the Runnable interface:

    public class MyRunnable implements Runnable {
        @Override
        public void run() {
            // concrete implementation
        }
    }

    Thread t = new Thread(new MyRunnable());
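
    Since Runnable has a single abstract method, on Java 8 and later the same thread can also be created with a lambda, a shorter equivalent of the anonymous inner class above:

    Thread t = new Thread(() -> {
        // concrete implementation
    });
    t.start();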

  • 1. Install kafka-python: run pip install kafka-python ... 2. Write the Python Kafka producer/consumer code: # test.py import sys import time import json from kafka import KafkaPr...

     

    1. Install kafka-python

    Run:

    pip install kafka-python

    This installed kafka-python 1.4.6 at the time of writing.
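
    To confirm the install, the client version can be printed from Python (kafka-python exposes __version__):

    python -c "import kafka; print(kafka.__version__)"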

     

    2. Write the Python Kafka producer and consumer code

    # test.py

    import sys
    import time
    import json

    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError


    KAFAKA_HOST = "127.0.0.1"
    KAFAKA_PORT = 9092
    KAFAKA_TOPIC = "test123"


    class Kafka_producer():
        '''
        Producer: distinguishes messages by key
        '''

        def __init__(self, kafkahost, kafkaport, kafkatopic, key):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.key = key
            print("producer:h,p,t,k", kafkahost, kafkaport, kafkatopic, key)
            bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                    kafka_host=self.kafkaHost,
                    kafka_port=self.kafkaPort
                    )
            print("boot svr:", bootstrap_servers)
            self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

        def sendjsondata(self, params):
            try:
                params_message = json.dumps(params, ensure_ascii=False)
                producer = self.producer
                print(params_message)
                v = params_message.encode('utf-8')
                k = self.key.encode('utf-8')
                print("send msg:(k,v)", k, v)
                producer.send(self.kafkatopic, key=k, value=v)
                producer.flush()
            except KafkaError as e:
                print(e)


    class Kafka_consumer():
        '''
        Consumer: consumes the topic's messages under a given group id
        '''

        def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.groupid = groupid
            self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
                    bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                        kafka_host=self.kafkaHost,
                        kafka_port=self.kafkaPort)
                    )

        def consume_data(self):
            try:
                for message in self.consumer:
                    yield message
            except KeyboardInterrupt as e:
                print(e)


    def main(xtype, group, key):
        '''
        Exercise the producer and the consumer
        '''
        if xtype == "p":
            # Producer mode
            producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
            print("===========> producer:", producer)
            for _id in range(100):
                # params = '{"msg" : "%s"}' % str(_id)
                params = [{"msg0": _id}, {"msg1": _id}]
                producer.sendjsondata(params)
                time.sleep(1)

        if xtype == 'c':
            # Consumer mode
            consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
            print("===========> consumer:", consumer)
            for msg in consumer.consume_data():
                print('msg---------------->k,v', msg.key, msg.value)
                print('offset---------------->', msg.offset)


    if __name__ == '__main__':
        xtype = sys.argv[1]
        group = sys.argv[2]
        key = sys.argv[3]
        main(xtype, group, key)

     

    3. Start the Kafka server

    Open a terminal and run:

    kafka-server-start /usr/local/etc/kafka/server.properties

    4. Open a new terminal and start the producer

    Change to the script's directory and run:

    python test.py p g k

    5. Open a new terminal and start the consumer

    Change to the script's directory and run:

    python test.py c g k

    (The arguments map to xtype, group, and key in the code: p/c selects producer or consumer mode, g is the consumer group id, and k is the message key.)

    With that, Kafka message sending and receiving is complete.

    Reposted from: https://www.cnblogs.com/BlueSkyyj/p/11429484.html

  • Big data: Kafka producer and consumer code


    Import the dependencies

    <properties>
      <kafka.version>2.0.0</kafka.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
      </dependency>
    </dependencies>

    Producer (Java) + consumer (Scala)

    KafkaProducer: the producer side

    Java version

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;


    public class KafkaKb12Producer {
        public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
            Properties config = new Properties();
            // Connection
            config.setProperty("bootstrap.servers", "192.168.131.200:9092");
            // Fault tolerance
            config.setProperty("retries", "2");
            config.setProperty("acks", "-1");
            // Batching: a batch is sent as soon as either condition is met
            config.setProperty("batch.size", "128"); // batch reaches this many bytes
            config.setProperty("linger.ms", "100");  // batch has waited this long
            // Key and value serialization
            config.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
            config.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<Long, String> producer = new KafkaProducer<Long, String>(config);
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            long count = 0;
            final String TOPIC = "kb12_01"; // topic to send to
            final int PARTITION = 0;        // target partition within the topic

            while (true) {
                String input = reader.readLine();
                if (input.equalsIgnoreCase("exit")) {
                    break;
                }
                ProducerRecord<Long, String> record
                        = new ProducerRecord<Long, String>(TOPIC, PARTITION, ++count, input);
                RecordMetadata rmd = producer.send(record).get();
                System.out.println(rmd.topic() + "\t" + rmd.partition() + "\t" + rmd.offset() + "\t" + count + ":" + input);
            }
            reader.close();
            producer.close();
        }
    }
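
    The producer writes to partition 0 of topic kb12_01, so the topic must exist before the program runs. On Kafka 2.0 it can be created with something along these lines (the ZooKeeper address is an assumption based on the broker host used above):

    kafka-topics.sh --create --zookeeper 192.168.131.200:2181 --replication-factor 1 --partitions 1 --topic kb12_01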
    

    KafkaConsumer: the consumer side

    Scala version

    import java.time.Duration

    import java.util
    import java.util.Properties

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.kafka.common.serialization.LongDeserializer


    object KafkaKB12Consumer {
      def main(args: Array[String]): Unit = {
        val config: Properties = new Properties()
        config.setProperty("bootstrap.servers", "192.168.131.200:9092")
        // Deserializer for the key
        config.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
        // Deserializer for the value
        config.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        // Consumer group id
        config.setProperty("group.id", "test01")
        // enable.auto.commit defaults to true (auto-commit); set it to false to handle commits manually
        config.setProperty("enable.auto.commit", "true")
        // auto.commit.interval.ms defaults to 5000: the consumer commits offsets to Kafka every 5 seconds
        config.setProperty("auto.commit.interval.ms", "5000")
        // earliest: if a partition has a committed offset, resume from it; otherwise start from the beginning
        // latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced data
        // none: resume from committed offsets only if every partition has one; throw if any partition lacks one
        config.setProperty("auto.offset.reset", "earliest")
        val topics = util.Arrays.asList("kb12_01")
        val consumer: KafkaConsumer[Long, String] = new KafkaConsumer(config)
        consumer.subscribe(topics)
        try {
          while (true) {
            consumer.poll(Duration.ofSeconds(5)).records("kb12_01").forEach(e => {
              println(s"${e.key()}\t${e.value()}")
            })
          }
        } finally {
          consumer.close()
        }
      }
    }

    Producer (Scala) + consumer (Java)

    KafkaProducer: the producer side

    Scala version

    import java.util.Properties
    import java.util.concurrent.Future

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

    import scala.util.Random

    object KafkaKB12Producer {
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "192.168.131.200:9092")
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.FloatSerializer")
        properties.setProperty("acks", "1")
        properties.setProperty("retries", "2")
        properties.setProperty("linger.ms", "500")
        properties.setProperty("batch.size", "10")

        val TOPIC = "kb12_02"
        val PARTITION = 0
        val producer: KafkaProducer[Long, Float] = new KafkaProducer[Long, Float](properties)
        var count = 0
        val rand = new Random()
        try {
          while (true) {
            count += 1
            // Random value between 100 and 1099
            val value = 100 + rand.nextInt(1000)
            val producerRecord: ProducerRecord[Long, Float] = new ProducerRecord[Long, Float](TOPIC, PARTITION, count, value)
            // send() is asynchronous; get() blocks until the broker acknowledges
            val send: Future[RecordMetadata] = producer.send(producerRecord)
            val metadata = send.get()
            println(s"${metadata.topic()}\t${metadata.partition()}\t${metadata.offset()}\t$count->$value")
          }
        } finally {
          producer.close()
        }
      }
    }
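
    To spot-check what the producer wrote, the topic can also be read with the console consumer that ships with Kafka (a quick sanity check; the key and value here are binary Long/Float, so the default console output will not be human-readable):

    kafka-console-consumer.sh --bootstrap-server 192.168.131.200:9092 --topic kb12_02 --from-beginning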

    KafkaConsumer: the consumer side

    Java version

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class KafkaKb12Consumer {
        public static void main(String[] args) {
            Properties config = new Properties();
            config.setProperty("bootstrap.servers","192.168.131.200:9092");
            config.setProperty("group.id","test01");
            config.setProperty("key.deserializer","org.apache.kafka.common.serialization.LongDeserializer");
            config.setProperty("value.deserializer","org.apache.kafka.common.serialization.FloatDeserializer");
            config.setProperty("enable.auto.commit","true");
            config.setProperty("auto.offset.reset","earliest");
            config.setProperty("auto.commit.interval.ms","5000");
    
            List<String> TOPIC = Arrays.asList("kb12_02");
            KafkaConsumer<Long, Float> consumer = new KafkaConsumer<Long, Float>(config);
            consumer.subscribe(TOPIC);
            try{
                while (true){
                    ConsumerRecords<Long, Float> records = consumer.poll(Duration.ofSeconds(5));
                    for (ConsumerRecord<Long, Float> record : records) {
                        System.out.println(record.key()+"->"+record.value());
                    }
                }
            }finally {
                consumer.close();
            }
        }
    }
    

  • Implementing Kafka producer and consumer functionality in Java


    Implementing Kafka producer and consumer functionality in Java

    It has been a while since I updated this blog; I have been learning a lot and have had no time to write, so here is a Kafka post covering the most basic usage. Big data really is hard: even a simple feature rests on a lot of groundwork. This post assumes you already have a virtual machine with ZooKeeper from the Hadoop ecosystem configured, Kafka installed and configured, and that you know how to use Maven, Spring Boot, and similar tools; the code is not something you can simply copy and paste cold.

    Make sure your virtual machine is reachable by ping. hmaster is the VM IP alias I configured in my hosts file; change it to your own.

    Open two shell windows and test whether the console producer and consumer can pass messages through the same topic.

    Start Kafka:

    • bin/kafka-server-start.sh config/server.properties

    Use Kafka (single node, single broker). Create a topic (via ZooKeeper):

    • kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic

    List all topics:

    • kafka-topics.sh --list --zookeeper hadoop000:2181

    Send messages (via the broker):

    • kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

    Consume messages (via ZooKeeper):

    • kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning
    [Screenshots: console producer and consumer exchanging messages]

    Maven dependencies

    <dependencies>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.9.0.0</version>
      </dependency>
    </dependencies>
    

    ProducerDemo

    package rain;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // Broker address; hmaster is the hosts-file alias mentioned above
            properties.put("bootstrap.servers", "hmaster:9092");
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = null;

            try {
                producer = new KafkaProducer<String, String>(properties);
                for (int i = 0; i < 100; i++) {
                    String msg = "------Message " + i;
                    producer.send(new ProducerRecord<String, String>("hello_topic", msg));
                    System.out.println("Sent:" + msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }
    
    

    Consumer

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "hmaster:9092");
            properties.put("group.id", "group-1");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "earliest");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            // Subscribe to the same topic the producer writes to
            kafkaConsumer.subscribe(Arrays.asList("hello_topic"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                    System.out.println();
                }
            }
        }
    }
    
    
  • A Java example of Kafka producers and consumers

  • The Kafka producer-consumer model

    With Kafka, messages sent by producers are appended dynamically to disk; one broker corresponds to one Kafka application instance and stores the message queues. Topics, partitions, messages: one partition (Partition) is equivalent to one message queue and stores n messages; one topic (Topic)...
  • Java Kafka producer and consumer code example

    Producer: import com.google.common.collect.Lists; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Test; import java....
  • I recently set up a Kafka cluster to collect application logs and apply custom processing, and along the way learned how to write Kafka producer and consumer programs in Python; this article doubles as my Kafka study notes. 1. Set up the Kafka cluster: for ease of testing, on a single macOS machine...
  • Kafka producer and consumer Java code example

    Kafka producer: package kafkatest; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.ser
  • Kafka producer and consumer (Java example)

    This article explains how to use the Kafka producer and consumer APIs, with notes on some of the configuration. Contents: 1. Dependencies; 2. Producer (2.1 code, 2.2 producer configuration notes, 2.3 producer results); 3. Consumer (3.1 code, 3.2 consumer configuration notes, 3.3 consumer results)...
  • 1. Producer: add to the pom file: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.1...
  • 1. Check the configuration file: open config/server.properties and set broker.id, listeners, port, and log.dirs. vi config/server.properties broker.id=0 listeners=PLAINTEXT://192.168.105.110...log.dirs=kafka-logs zo...
  • Kafka API
  • Kafka producer and consumer Java API programming

    First create a topic, then start a producer and a consumer and pass messages between them; then implement the same flow with the Kafka API. I used a single-node ZooKeeper and Kafka, but a cluster works as well. Start ZooKeeper: zkServer.sh start. Start Kafka: kafka-...
  • 4. Kafka producers & consumers

    Whether for the producer or the consumer, the dependency to pull in is kafka-clients; the Maven coordinates are: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-...
  • A complete Kafka big-data producer/consumer example, implemented in Java code
  • NULL; blog link: https://chengjianxiaoxue.iteye.com/blog/2190488
  • Writing Kafka producers and consumers in Java

    Step 1: download the code. Download the latest stable release. > tar xzf kafka-.tgz > cd kafka- > ./sbt update > ./sbt package. Step 2: start the server. Kafka brokers and consumers use this for co-ordination. bin/...
  • 1. Environment: the single-node ZooKeeper and single-node Kafka configured earlier, installed on Tencent Cloud; the two previous posts cover installing ZooKeeper and Kafka. ZK and Kafka run on the same machine, since I do not have many servers and this is only a simple Java demo...
  • Without further ado, straight to the code; before running it... (1) Producer code: Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); pr...
  • Simulated Kafka producers and consumers in cluster mode; for a standalone setup, change the ip:port list to the single ip:port.
  • Taking the producer code as an example: 1. create a producer object that can connect to Kafka; to create the producer object: 1.1 initialize its parameters through a Properties object; 1.2 since the Properties object is provided by Java...
  • kafka examples can be used as templates for writing Kafka producers and consumers. 1. For a Maven project, the pom dependencies: org.apache.kafka kafka-clients 0.8.2.1, org.apache.kafka kafka_2.10 0.8.2.1. 2. For an sbt project, add to build.sbt...