精华内容
下载资源
问答
  • org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency> 二、新建topic // 新建一个名为kafkaDemo的topic。分区数为3...

    一、新建maven项目

    添加依赖

    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.0.0</version>
    </dependency>
    

    二、新建topic

    // 新建一个名为kafkaDemo的topic。分区数为3,副本数为1
    kafka-topics.sh --create --zookeeper 192.168.233.133:2181 --topic kafkaDemo --partitions 1 --replication-factor 1
    

    三、编写代码

    生产者

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    public class MyProducer {
        public static void main(String[] args) {
            Properties prop = new Properties();
            // kafka集群的一个地址和端口
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");
            // 设置key的序列化器为StringSerializer
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 设置value的序列化器为StringSerializer
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
            // 设置数据可靠性的级别 1 0 -1
            prop.put(ProducerConfig.ACKS_CONFIG,"-1");
    
            // 创建生产者对象
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
    
            for (int i = 230; i<240;i++){
                // 构建消息
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kb09two", "hi","hello world" + i);
                // 发送消息
                producer.send(producerRecord);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("消息发送完成!!!!");
        }
    }
    

    消费者

    单线程模拟一个消费者

    public class MyConsumer {
        public static void main(String[] args) {
            final Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");
            // 指定key的反序列化器
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // 指定value的反序列化器
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            // 判定连接超时的时间间隔
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
            // 提交方式,false为手动提交
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
            // 自动提交offset到zookeeper的时间间隔
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
            
    		// 创建消费者对象
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
            consumer.subscribe(Collections.singleton("kb09two"));
            // 一个消费者组G1里只有一个消费者
            while(true){
                ConsumerRecords<String,String> poll = consumer.poll(100);
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.offset()+"\t"+record.key()+"\t"+record.value());
                }
            }
        }
    }
    

    多线程模拟多个消费者

    public class MyConsumer {
        public static void main(String[] args) {
            final Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");
            // 指定key的反序列化器
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // 指定value的反序列化器
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            // 判定连接超时的时间间隔
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
            // 提交方式,false为手动提交
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
            // 自动提交offset到zookeeper的时间间隔
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
    //        模拟多个消费者在同一个分组里
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G2");
    
            // 多线程,为每个分区创建一个消费者,但需要在注意的是每条消息只能被一个消费者消费
            for (int i=0;i<4;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
                        consumer.subscribe(Collections.singleton("kb09two"));
                        while(true){
                            ConsumerRecords<String,String> poll = consumer.poll(100);
                            for (ConsumerRecord<String, String> record : poll) {
                                System.out.println(Thread.currentThread().getName()+"\t"+record.offset()+"\t"+record.key()+"\t"+record.value());
                }
            }
                    }
                }).start();
            }
        }
    }
    
    展开全文
  • //使用java编写kafka生产者打成jar包,将windows下的文件写到kafka的主题中。 //在windows下运行jar包所需的4个参数 public class MyExecute { public static void main(String[] args) throws Exception { if ...

    1.新建maven工程——mykafka

    2.配置pom.xml文件。

     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <kafka.version>2.0.0</kafka.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>${kafka.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>${kafka.version}</version>
        </dependency>
    
      </dependencies>
    
      <build>
        <finalName>mykafka</finalName>
        <plugins>
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>org.alisa.common.MyExecute</mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
    
      </build>

    3.创建一个接口类——Dbinfo.java

    package cn.alisa.commons;
    
    import java.util.Map;
    
    //配置类的接口
    public interface Dbinfo {
        public String getIp();
        public int getPort();
        public String dbName();
        public Map<String,String> getOther();
    }
    

    4.创建配置项类——KafkaConfiguration.java,实现上述接口。

    package org.alisa.common;
    
    import java.util.Map;
    
    public class KafkaConfiguration implements Dbinfo {
        private String ip;
        private int port;
        private String dbname;
    
        public String getIp() {
            return ip;
        }
    
        public void setIp(String ip) {
            this.ip = ip;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public String getDbname() {
            return dbname;
        }
    
        public void setDbname(String dbname) {
            this.dbname = dbname;
        }
    
        @Override
        public String getIP() {
            return this.ip;
        }
    
        @Override
        public int getPort() {
            return this.port;
        }
    
        @Override
        public String dbName() {
            return this.dbname;
        }
    
        @Override
        public Map<String, String> getOther() {
            return null;
        }
    }
    

    5.创建KafkaConnector.java,用来生产者发送消息。

    package org.alisa.common;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DescribeTopicsResult;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.LineNumberReader;
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaConnector {
        private Dbinfo dbinfo;
        private int totalRow=0;
        private List<Integer> rowSize=new ArrayList<>();
        Properties prop = new Properties();
    
        //构造器
        public KafkaConnector(Dbinfo info){
            this.dbinfo=info;
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,dbinfo.getIP()+":"+dbinfo.getPort());
            prop.put(ProducerConfig.ACKS_CONFIG, "all");
            //生产者发送失败后的重试次数,默认0
            prop.put(ProducerConfig.RETRIES_CONFIG, "0");
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
        }
    
        //发送
        public void sendkafka(String path) throws FileNotFoundException {
            //获取总行数和每行最后的字节数
            getFileInfo(path);
            //获取分区数量
            int partitionSize=getTopicPartitionNumber();
            //根据分区数量分割文件 找到每个线程的开始position和行数
            Map<Long,Integer> threadInfo=calcPosAndRow(partitionSize);
            int name=0;
            for (Long key:threadInfo.keySet()) {
                new UserKafkaProducer(name+"",key,threadInfo.get(key),prop,dbinfo.dbName(),path).start();
                name++;
            }
        }
    
        /**
         * 计算partitionnum个线程 每个线程的开始字节位置和行数
         * 计算每一段的位置和每一段的总行数
         * @param partitionnum 线程数
         * @return
         */
        private Map<Long,Integer> calcPosAndRow(int partitionnum){
            Map<Long,Integer> result = new HashMap<>();
            //计算平均每个线程所用的行数
            int rows=totalRow/partitionnum;
            //循环计算每个开始的节点
            for (int i = 0; i < partitionnum; i++) {
                if (i==(partitionnum-1)){
                //计算第N次的行数,最后一个
                result.put(getPos(rows*i+1),rows+totalRow%partitionnum);
            }else {
                result.put(getPos(rows*i+1),rows);
                }
            }
            return result;
        }
    
        /**
         * 根据用户的行号计算对应行的开始位置
         * 计算下一次位置
         * @param lineNumber
         * @return
         */
        private Long getPos(int lineNumber){
            return (long)rowSize.get(lineNumber-1)+(lineNumber-1);
        }
    
        /**
         * 获取文件的信息
         * 文件的行数
         * 文件每行的结尾字节数
         * 文件路径
         * @param path
         */
        private void getFileInfo(String path) throws FileNotFoundException {
            LineNumberReader reader = new LineNumberReader(new FileReader(path));
            try {
                String str=null;
                //total:一行有多长
                int total=0;
                //reader.readLine()一次读取一行
                while((str=reader.readLine())!=null){
                    total+=str.getBytes().length+1;
                    rowSize.add(total);
                }
    
                //总行数
                totalRow=reader.getLineNumber();
                rowSize.add(0,0);
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 获取kafka消息主题的分区数量
         * @return
         */
        private int getTopicPartitionNumber(){
            //KafkaAdminClient kafka管理客户端
            AdminClient client = KafkaAdminClient.create(prop);
            //describeTopics:查询topic列表,Arrays.asList:将数组或一些元素转为集合
            DescribeTopicsResult result = client.describeTopics(Arrays.asList(dbinfo.dbName()));
            //主题的信息
            KafkaFuture<Map<String, TopicDescription>> kf = result.all();
            //分区数
            int count=0;
            try {
                count=kf.get().get(dbinfo.dbName()).partitions().size();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return count;
        }
    }
    

    6.创建简单分区器类——SimplePartition.java,用来生产者将数据发送到指定分区内。

    package org.alisa.common;
    
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
    import java.util.List;
    import java.util.Map;
    
    //简单的分区器
    public class SimplePartition implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //key线程名            
        return Integer.parseInt(key.toString());
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    

    7.创建线程UserKafkaProducer.java

    package org.alisa.common;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Properties;
    
    public class UserKafkaProducer extends Thread{
        private long beginPos;
        private int row;
        private Properties prop;
        private String topic;
        private String path;
    
        //构造器
        public UserKafkaProducer(String threadName,long begin,int row,Properties prop,String topic,String path){
            this.beginPos=begin;
            this.row=row;
            this.prop=prop;
            this.topic=topic;
            this.setName(threadName);
            this.path=path;
        }
        @Override
        public void run() {
            //向指定分区发送消息,用的是SimplePartition类
            prop.put("partitioner.class","org.alisa.common.SimplePartition");
            prop.put("batch.size",524288);
            prop.put("linger.ms",10);
            KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
            try {
                RandomAccessFile raf = new RandomAccessFile(new File(path), "r");
                //seek偏移量
                raf.seek(beginPos);
                for (int line = 0; line < row; line++) {
                    //更换编码格式,因为文件中有中文,出来会是乱码
                    String ln = new String(raf.readLine().getBytes("iso-8859-1"), "utf-8");
    //                System.out.println(Thread.currentThread().getName()+":"+ln);
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>(topic,Thread.currentThread().getName(), ln);
                    //发送
                    producer.send(record);
                }
                producer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    8.创建测试类——MyExecute.java

    package org.alisa.common;
    
    //使用java编写kafka生产者打成jar包,将windows下的文件写到kafka的主题中。
    //在windows下运行jar包所需的4个参数
    public class MyExecute {
        public static void main(String[] args) throws Exception {
            if (args.length==4) {
                KafkaConfiguration configuration = new KafkaConfiguration();
                configuration.setIp(args[0]);
                configuration.setPort(Integer.parseInt(args[1]));
                configuration.setDbname(args[2]);
                new KafkaConnector(configuration).sendkafka(args[3]);
            }else{
                throw new Exception("the arguments need 4 but "+args.length);
            }
        }
    }
    

    9.生成jar包。先双击clean,再双击package。

    在项目左边就会形成jar包。将jar包放到本地某个路径下。

    10.在windows下执行jar包命令。不过在此之前需要现在linux下创建一个主题。

    命令为:

    kafka-topics.sh --zookeeper 192.168.21.130:2181 --create --topic mydemo --replication-factor 1 --partitions 3
    

    可以打开consumer消费端命令控制台查看:

    kafka-console-consumer.sh --bootstrap-server 192.168.21.130:9092 --from-beginning --topic mydemo

    然后查看主题的偏移量,查看数据是否写到主题中:

    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list single:9092 --topic mydemo --time -1

    现在开始执行jar包命令:

    java -jar mykafka-jar-with-dependencies.jar 192.168.21.130 9092 mydemo e:/logs/log_2020-01-01.log

    然后可以在consumer消费端查看到进度:

    查看主题偏移量:

     

    展开全文
  • Java实现Kafka生产者和消费者的示例

    万次阅读 多人点赞 2021-01-05 10:06:08
    Java实现Kafka生产者和消费者的示例

    文章持续更新,微信搜索「万猫学社」第一时间阅读。
    关注后回复「电子书」,免费获取12本Java必读技术书籍。

    Kafka简介

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

    方式一:kafka-clients

    引入依赖

    在pom.xml文件中,引入kafka-clients依赖:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    

    生产者

    创建一个KafkaProducer的生产者实例:

    @Configuration
    public class Config {
    
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        @Bean(destroyMethod = "close")
        public KafkaProducer<String, String> kafkaProducer() {
            Properties props = new Properties();
            //设置Kafka服务器地址
            props.put("bootstrap.servers", bootstrapServers);
            //设置数据key的序列化处理类
            props.put("key.serializer", StringSerializer.class.getName());
            //设置数据value的序列化处理类
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            return producer;
        }
    }
    

    在Controller中进行使用:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaProducer<String, String> kafkaProducer;
    
        @RequestMapping("/kafkaClientsSend")
        public String send() {
            String uuid = UUID.randomUUID().toString();
            RecordMetadata recordMetadata = null;
            try {
            	//将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
                recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get();
                log.info("recordMetadata: {}", recordMetadata);
                log.info("uuid: {}", uuid);
            } catch (Exception e) {
                log.error("send fail, uuid: {}", uuid, e);
            }
            return uuid;
        }
    }
    

    消费者

    创建一个KafkaConsumer的消费者实例:

    @Configuration
    public class Config {
    
        public final static String groupId = "kafka-clients-group";
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        @Bean(destroyMethod = "close")
        public KafkaConsumer<String, String> kafkaConsumer() {
            Properties props = new Properties();
            //设置Kafka服务器地址
            props.put("bootstrap.servers", bootstrapServers);
            //设置消费组
            props.put("group.id", groupId);
            //设置数据key的反序列化处理类
            props.put("key.deserializer", StringDeserializer.class.getName());
            //设置数据value的反序列化处理类
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅名称为“one-more-topic”的Topic的消息
            kafkaConsumer.subscribe(Arrays.asList("one-more-topic"));
            return kafkaConsumer;
        }
    }
    

    在Controller中进行使用:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaConsumer<String, String> kafkaConsumer;
    
        @RequestMapping("/receive")
        public List<String> receive() {
        	从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            List<String> messages = new ArrayList<>(records.count());
            for (ConsumerRecord<String, String> record : records.records("one-more-topic")) {
                String message = record.value();
                log.info("message: {}", message);
                messages.add(message);
            }
            return messages;
        }
    }
    

    方式二:spring-kafka

    使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。

    引入依赖

    在pom.xml文件中,引入spring-kafka依赖:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.12.RELEASE</version>
    </dependency>
    

    生产者

    在application.yml文件中增加配置:

    spring:
      kafka:
      	#Kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092
        producer:
          #设置数据value的序列化处理类
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    在Controller中注入KafkaTemplate就可以直接使用了,代码如下:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaTemplate<String, String> template;
    
        @RequestMapping("/springKafkaSend")
        public String send() {
            String uuid = UUID.randomUUID().toString();
            //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
            this.template.send("one-more-topic", uuid);
            log.info("uuid: {}", uuid);
            return uuid;
        }
    }
    

    消费者

    在application.yml文件中增加配置:

    spring:
      kafka:
        #Kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          #设置数据value的反序列化处理类
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:

    @Component
    @Slf4j
    public class Receiver {
    
        @KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group")
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                String message = (String) kafkaMessage.get();
                log.info("message: {}", message);
            }
        }
    }
    

    文章持续更新,微信搜索「万猫学社」第一时间阅读。
    关注后回复「电子书」,免费获取12本Java必读技术书籍。

    展开全文
  • 一,定义常量public class Constant{ public static final String TOPIC = ... //kafka创建的topic public static final String CONTENT = "This is a single message"; //要发送的内容 public static final String

    一,定义常量

    public class Constant{
        public static final String TOPIC = "test"; //kafka创建的topic
        public static final String CONTENT = "This is a single message"; //要发送的内容
        public static final String BROKER_LIST = "xx.xx.xx.xx:xxxx"; //broker的地址和端口
        public static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder"; // 序列化类
    }

    二,定义kafka的Producer

    import com.constant.Constant;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import java.util.Properties;
    
    public class KafkaProducer {
        public void kafkaProducerSentOneJsonMessage(String content){
            Properties props = new Properties();
            props.put("serializer.class", Constant.SERIALIZER_CLASS);
            props.put("metadata.broker.list", Constant.BROKER_LIST);
    
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
    
            //Send one message.
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(Constant.TOPIC, content);
            producer.send(message);
    
    //        //Send multiple messages.
    //        List<KeyedMessage<String,String>> messages =
    //                new ArrayList<KeyedMessage<String, String>>();
    //        for (int i = 0; i < 5; i++) {
    //            messages.add(new KeyedMessage<String, String>
    //                    (TOPIC, "Multiple message at a time. " + i));
    //        }
    //        producer.send(messages);
        }
    }

    三,编写单元测试调用producer

    import com.kafka.KafkaProducer;
    import org.testng.annotations.Test;
    
    
    public class KafkaProducerTest {
    
        @Test
        public void post1() throws Exception {
            KafkaProducer kafkaProducer = new KafkaProducer();
            String content = "this is a test message for kafkaProducerTesting";
            content = content.replace("\n","").replace(" ","");
            System.out.println("content = " + content);
            kafkaProducer.kafkaProducerSentOneJsonMessage(content);
        }
    }
    展开全文
  • 卡夫卡制片人Hello World 用Java编写Kafka生产者的基本示例。 更改BROKER_ADDR和TOPIC以匹配所需的Kafka配置。 默认值旨在通过将其作为VM运行来连接到Hortonworks沙箱。
  • 转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties b...
  • 一、前期准备工作: ...二、编写代码: 1、创建Maven工程并导入相关依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId>
  • Java实现Kafka生产消费

    2021-04-14 11:27:05
    Java实现Kafka生产消费 Kafka核心API Kafka有四个核心Api producer API发布消息到1个或多个topic consumer API来订阅1个或多个topic,并处理产生的消息 streams API充当一个流处理器,从1个或多个topic消费输入流,...
  • 一、准备工作 框架:springboot 电脑查看网关是否开启工具:telnet...--添加kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</...
  • 以创建生产者代码思路为例: 1.创建和Kafka能够连接的对象 producer  如果要创建producer对象需要进行一下操作:  1.1要通过properties对象来对producer对象的参数进行初始化  1.2由于properties对象是java提供...
  • Kafka的结构与RabbitMQ类似,消息生产者Kafka服务器发送消息,Kafka接收消息后,再投递给消费者。 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键、值进行保存。 每一个Topic中都包含...
  • javakafka生产者与消费者代码

    千次阅读 2017-08-15 18:37:22
    kafka、producer、consumer
  • 一、写入kafka Linking Denpency 导入poml依赖: <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <...
  • 上一篇博客介绍了如何安装Kafka,该篇将介绍如何在Java中创建生产者,并向Kafka写入数据。 环境: Kafka 集群 + Eclipse + Kafka-2.1.1 1. 创建项目并配置依赖 注:博主目前还不会Maven 配置,因此所有依赖都是导入...
  • zk和kafka同时跑在一台机器上,因为我没有太多的服务器,而且只是简单的java demo,没必要上来就搞集群。这里碰到的一个问题Exception in thread "main" kafka.common.FailedToSendMessag...
  • kafka集群配置和java编写生产者消费者操作例子 kafka 安装 修改配置文件 java操作kafka kafka kafka的操作相对来说简单很多 安装 下载kafka http://kafka.apache.org/downloads tar -...
  • 使用 Vert.x 编写Kafka 消费者和可选的 Kafka 生产者 使用 vertx 开发项目真的很有趣! 尤其是当您意识到 vertx 的有效性和全部 - 或至少 - 某些优势时! 这个项目为你提供了一个 kafka 消费者的框架,包括通过 ...
  • 可以用来作为编写Kafka producer和consumer的模板。 1.如果是mvn项目pom依赖: org.apache.kafka kafka-clients 0.8.2.1 org.apache.kafka kafka_2.10 0.8.2.1 2.如果是sbt项目build.sbt添加 ...
  • Kafka生产者

    2019-09-24 23:56:00
    生产者就是负责向 Kafka发送消息的应用程序。在 Kafka的历史变迁中,一共有两个大版本的生产...第二个是从 Kafka 0.9.x版本开始推出的使用Java语言编写的客户端,我们可以称之为新生产者客户端(New Producer)或Java版...
  • Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了 两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator...
  • 使用idea工具,使用mven编译工具,kafka生产者和消费者的java版本的实现。
  • 概念: Storm上游数据源之Kakfa 1、 kafka是什么?...6、 Kafka生产者Java API 7、 Kafka消费者Java API 1、Kafka是什么 在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。KAF
  • 一、添加依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </depen...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,294
精华内容 8,517
关键字:

java编写kafka生产者

java 订阅