精华内容
下载资源
问答
  • org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency> 二、新建topic // 新建一个名为kafkaDemo的topic。分区数为3...

    一、新建maven项目

    添加依赖

    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.0.0</version>
    </dependency>
    

    二、新建topic

    // 新建一个名为kafkaDemo的topic。分区数为3,副本数为1
    kafka-topics.sh --create --zookeeper 192.168.233.133:2181 --topic kafkaDemo --partitions 1 --replication-factor 1
    

    三、编写代码

    生产者

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    public class MyProducer {
        public static void main(String[] args) {
            Properties prop = new Properties();
            // kafka集群的一个地址和端口
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");
            // 设置key的序列化器为StringSerializer
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 设置value的序列化器为StringSerializer
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
            // 设置数据可靠性的级别 1 0 -1
            prop.put(ProducerConfig.ACKS_CONFIG,"-1");
    
            // 创建生产者对象
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
    
            for (int i = 230; i<240;i++){
                // 构建消息
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kb09two", "hi","hello world" + i);
                // 发送消息
                producer.send(producerRecord);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("消息发送完成!!!!");
        }
    }
    

    消费者

    单线程模拟一个消费者

    public class MyConsumer {
        public static void main(String[] args) {
            final Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");
            // 指定key的反序列化器
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // 指定value的反序列化器
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            // 判定连接超时的时间间隔
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
            // 提交方式,false为手动提交
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
            // 自动提交offset到zookeeper的时间间隔
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
            
    		// 创建消费者对象
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
            consumer.subscribe(Collections.singleton("kb09two"));
            // 一个消费者组G1里只有一个消费者
            while(true){
                ConsumerRecords<String,String> poll = consumer.poll(100);
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.offset()+"\t"+record.key()+"\t"+record.value());
                }
            }
        }
    }
    

    多线程模拟多个消费者

    public class MyConsumer {
        public static void main(String[] args) {
            final Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");
            // 指定key的反序列化器
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // 指定value的反序列化器
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            // 判定连接超时的时间间隔
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
            // 提交方式,false为手动提交
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
            // 自动提交offset到zookeeper的时间间隔
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
    //        模拟多个消费者在同一个分组里
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G2");
    
            // 多线程,为每个分区创建一个消费者,但需要在注意的是每条消息只能被一个消费者消费
            for (int i=0;i<4;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
                        consumer.subscribe(Collections.singleton("kb09two"));
                        while(true){
                            ConsumerRecords<String,String> poll = consumer.poll(100);
                            for (ConsumerRecord<String, String> record : poll) {
                                System.out.println(Thread.currentThread().getName()+"\t"+record.offset()+"\t"+record.key()+"\t"+record.value());
                }
            }
                    }
                }).start();
            }
        }
    }
    
    展开全文
  • 一,定义常量public class Constant{ public static final String TOPIC = ... //kafka创建的topic public static final String CONTENT = "This is a single message"; //要发送的内容 public static final String

    一,定义常量

    public class Constant{
        public static final String TOPIC = "test"; //kafka创建的topic
        public static final String CONTENT = "This is a single message"; //要发送的内容
        public static final String BROKER_LIST = "xx.xx.xx.xx:xxxx"; //broker的地址和端口
        public static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder"; // 序列化类
    }

    二,定义kafka的Producer

    import com.constant.Constant;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import java.util.Properties;
    
    public class KafkaProducer {
        public void kafkaProducerSentOneJsonMessage(String content){
            Properties props = new Properties();
            props.put("serializer.class", Constant.SERIALIZER_CLASS);
            props.put("metadata.broker.list", Constant.BROKER_LIST);
    
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
    
            //Send one message.
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(Constant.TOPIC, content);
            producer.send(message);
    
    //        //Send multiple messages.
    //        List<KeyedMessage<String,String>> messages =
    //                new ArrayList<KeyedMessage<String, String>>();
    //        for (int i = 0; i < 5; i++) {
    //            messages.add(new KeyedMessage<String, String>
    //                    (TOPIC, "Multiple message at a time. " + i));
    //        }
    //        producer.send(messages);
        }
    }

    三,编写单元测试调用producer

    import com.kafka.KafkaProducer;
    import org.testng.annotations.Test;
    
    
    public class KafkaProducerTest {
    
        @Test
        public void post1() throws Exception {
            KafkaProducer kafkaProducer = new KafkaProducer();
            String content = "this is a test message for kafkaProducerTesting";
            content = content.replace("\n","").replace(" ","");
            System.out.println("content = " + content);
            kafkaProducer.kafkaProducerSentOneJsonMessage(content);
        }
    }
    展开全文
  • 以创建生产者代码思路为例: 1.创建和Kafka能够连接的对象 producer  如果要创建producer对象需要进行一下操作:  1.1要通过properties对象来对producer对象的参数进行初始化  1.2由于properties对象是java提供...

    以创建生产者代码思路为例:

    1.创建和Kafka能够连接的对象 producer

              如果要创建producer对象需要进行一下操作:

             1.1要通过properties对象来对producer对象的参数进行初始化

             1.2由于properties对象是java提供 而producer对象是由Kafka提供的,不能识别properties,所以需要producerConfig类来进行类的转换成producer能够识别的资源对象,转换成的producerConfig对象通过producer对象的有参构造来实例producer,实现producer对象的初始化完成

    2.使用Keyedmaessager对象对要发送的数据进行加载

    3.通过producer.send()对Keyedmaessager的数据进行发送。

    4.将资源关闭。

    解释:properties.put("auto.offset.reset","largest");

    数据:1 2 3 4 5 6

    当执行到1234的时候,如果此时消费者挂掉,zk中的消费偏移量已经记录到第四个位置

    当消费者再次启动之后 会询问我们是否从新读取所有的数据,largest就是从4开始,还有一个

    smallest就是从新开始。

    展开全文
  • 一、准备工作 框架:springboot 电脑查看网关是否开启工具:telnet...--添加kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</...

    一、准备工作

     <!--添加kafka -->
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
     </dependency>

    二、生产者

     public static void main(String[] args) {
            //创建生产者配置信息
            Properties properties = new Properties();
            //设置key
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            //设置重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG,10);
            //设置value
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            //设置服务地址
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"http://106.52.174.56:9092");
    
            //设置kafka
            KafkaProducer<String, String> producers = new KafkaProducer<String, String>(properties);
            //设置消息
            ProducerRecord<String,String> producerRecords = new ProducerRecord<>("kafkalearn","learn-info","hello word!");
            //发送消息
            producers.send(producerRecords);
            //关闭
            producers.close();
    
        }

    三、消费者

     public static void main(String[] args) {
    
            //创建消费者配置信息
            Properties properties = new Properties();
            //设置key
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            //设置消费组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group.demo");
            //设置value
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            //设置服务地址
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"http://106.52.174.56:9092");
    
            //设置kafka
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            //订阅消息
            consumer.subscribe(Collections.singletonList("kafkalearn"));
            //监听消息,间隔2秒轮询一次
            while (true){
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(2000));
                poll.forEach(item->{
                    System.out.println("结果呢:----"+item.key()+"--"+item.value());
                });
    
            }

    五、出现的问题

    1. centeros上本地连接不上,报错,
    展开全文
  • kafka集群配置和java编写生产者消费者操作例子 kafka 安装 修改配置文件 java操作kafka kafka kafka的操作相对来说简单很多 安装 下载kafka http://kafka.apache.org/downloads tar -...
  • 一、前期准备工作: ...二、编写代码: 1、创建Maven工程并导入相关依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId>
  • 一、添加依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </depen...
  • Kafka是一个类似于RabbitMQ的消息系统,它的主要功能是消息的发布和订阅、处理和存储。...本篇博文主要介绍如何使用Java编写程序将数据写入到Kafka中,即Kafka生产者,并不涉及Kafka消费者。另外,像Spark,Storm等...
  • 基于JAVA API方式使用Kafka——编写生产者客户端 记录我的学习之旅,每份文档倾心倾力,带我成我大牛,回头观望满脸笑意,望大家多多给予意见,有问题或错误,请联系 我将及时改正;借鉴文章标明出处,谢谢
  • 编写生产者客户端 【有问题或错误,请私信我将及时改正;借鉴文章标明出处,谢谢】 1.导入依赖: <repositories> <repository> <id>nexus-aliyun</id> <name>nexus-aliyun</...
  • Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编写的客户 端,我们可以称之为旧生产者客户端(OldProducer)或 Scala版生产者客户端;第二个是从 Kafka 0.9.x 版本...
  • Kafka生产者

    2019-09-24 23:56:00
    生产者就是负责向 Kafka发送消息的应用程序。在 Kafka的历史变迁中,一共有两个大版本的生产...第二个是从 Kafka 0.9.x版本开始推出的使用Java语言编写的客户端,我们可以称之为新生产者客户端(New Producer)或Java版...
  • kafka生产者客户端

    2019-06-25 17:27:00
    生产者客户端开发​ 熟悉kafka的朋友都应该知道kafka客户端有新旧版本,老版本采用scala编写,新版本采用java编写。随着kafka版本的升级,旧版本客户端已经快被完全替代了。因此,我们以新客户端为例进行介绍。​ ...
  • 2、kafka生产者

    2019-05-07 21:53:52
    本文主要用来整理kafka生产者相关的一些知识点,主要参考自《深入理解kafka核心设计与实践原理》--朱忠华 kafka先后有两个大版本的生产者客户端:第一个是kafka开源之初使用Scala语言编写的客户端,我们称Scala...
  • Kafka 生产者和消费者 demo (java&scala)

    千次阅读 2017-06-30 22:39:10
    前几天完成了kafka ubuntu单机的搭建,后来就尝试写写kafka的简单代码,网上也有很多例子,但是如果自己编写运行还是有一些坑在里面,我也简单记录以下自己遇到的问题。
  • kafka生产者和消费者

    2018-12-26 18:46:00
    这个时候可以借助kafka-console-consumer.sh和kafka-console-producer.sh 这两个工具,它们包装了java客户端,让用户不需要编写整个应用程序就可以与kafka主题发生交互。 生产者 kafka-console-consumer.sh工具...
  •  Kafka诞生至今,产生两个版本的生产者客户端:1是早期基于scala语言编写的客户端;2是随着Java用户的广泛涌入,kafka0.9版本开始退出Java版本的客户端;  一个基本生产者producer逻辑需要具备以下基本条件: ...
  • Kafka是一个类似于RabbitMQ的消息系统,它的主要功能是消息的发布和订阅、处理和存储。...本篇博文主要介绍如何使用Java编写程序将数据写入到Kafka中,即Kafka生产者,并不涉及Kafka消费者。另外,像S...
  • 早期kafka 0.8.22,0.9.X 版本kafka 生产者消费者服务端都是用scala编写。而且放在cores包下,新版本的客户端使用java实现,放在clients包下。  新的生产者客户端是一个进程,使用KafkaProducer对象实例化。这个...
  • 1消息发送1、异步发送导入依赖org.apache.kafkakafka-clients0.11.0.0编写代码需要用到的类:KafkaProducer:需要创建一个生产者对象,用来发送数据ProducerConfig:获取所需的一系列配置参数ProducerRecord:每条...

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 123
精华内容 49
热门标签
关键字:

java编写kafka生产者

java 订阅