精华内容
下载资源
问答
  • kafka 创建topic

    2017-09-22 23:15:02
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test09527
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test09527
    
    展开全文
  • 创建 topic javatestbin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic javatest --partitions 3 --replication-factor 1pom.xmlorg.apache.kafkakafka-clients2.3.0特别注意的需要设置:...

    创建 topic javatest

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic javatest --partitions 3 --replication-factor 1

    725bd8fb7ccaf62cfd019d120bb7a5b3.png

    pom.xml

    org.apache.kafka

    kafka-clients

    2.3.0

    特别注意的需要设置:advertised.listeners

    0345fe787cc579b46b104c820d63bc5e.png

    然后附上连接的 Producer 和Consumer 的代码

    import java.util.Properties;

    import java.util.Random;

    import org.apache.kafka.clients.producer.KafkaProducer;

    import org.apache.kafka.clients.producer.ProducerConfig;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import org.apache.kafka.common.serialization.StringSerializer;

    public class Producer {

    public static String topic = "javatest";//定义主题

    public static void main(String[] args) throws InterruptedException {

    Properties p = new Properties();

    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.top1novel.com:9092");//kafka地址,多个地址用逗号分割

    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    KafkaProducer kafkaProducer = new KafkaProducer(p);

    try {

    while (true) {

    String msg = "Hello," + new Random().nextInt(100);

    ProducerRecord record = new ProducerRecord(topic, msg);

    kafkaProducer.send(record);

    System.out.println("消息发送成功:" + msg);

    Thread.sleep(500);

    }

    } finally {

    kafkaProducer.close();

    }

    }

    }

    import java.util.Collections;

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.apache.kafka.clients.consumer.ConsumerRecords;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import org.apache.kafka.common.serialization.StringDeserializer;

    public class Consumer {

    public static void main(String[] args) {

    Properties p = new Properties();

    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.top1novel.com:9092");

    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    p.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");

    KafkaConsumer kafkaConsumer = new KafkaConsumer(p);

    kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息

    while (true) {

    ConsumerRecords records = kafkaConsumer.poll(100);

    for (ConsumerRecord record : records) {

    System.out.println(String.format("topic:%s,offset:%d,消息:%s", //

    record.topic(), record.offset(), record.value()));

    }

    }

    }

    }

    使用kafka-net 也可以连接。

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using KafkaNet;

    using KafkaNet.Model;

    using KafkaNet.Protocol;

    namespace ConsoleApp1

    {

    class Program

    {

    static void Main(string[] args)

    {

    Console.WriteLine("this ");

    var options = new KafkaOptions(new Uri("http://www.top1novel.com:9092"));

    var router = new BrokerRouter(options);

    var producer = new Producer(router);

    List msgArr = new List();

    msgArr.Add(new Message("你好"));

    producer.SendMessageAsync("javatest", msgArr.ToArray()).Wait(3000);

    Console.ReadKey();

    }

    }

    }

    展开全文
  • kafka创建topic报错

    2020-12-30 16:43:53
    kafka创建topic报错 kafka-topics.sh \ --zookeeper mypc01:2181,mypc02:2181,mypc03:2181/kafka-2020 \ --create \ --topic news203 \ --partitions 1 \ --replication-factor 1 ...

    kafka创建topic报错

    kafka-topics.sh \
    --zookeeper mypc01:2181,mypc02:2181,mypc03:2181/kafka-2020 \
    --create \
    --topic news \
    --partitions 1 \
    --replication-factor 1
    
    InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
     (kafka.admin.TopicCommand$)
    

    排查原因,
    1 kafka正常启动
    2 发现是zookeeper路径不正确.
    就是创建topic时的zookeeper路径要和kafka配置server.properties中的一样

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=mypc01:2181,mypc02:2181,mypc03:2181/kafka-2020
    
    展开全文
  • kafka创建topic报错ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException 在kafka创建topic中输入以下命令 bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 -...

    kafka创建topic报错ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException

    在kafka创建topic中输入以下命令

     bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first
    

    报错

    ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 0.
     (kafka.admin.TopicCommand$)
    

    出现上述的原因是原因是kafka的zookeeper.connect设置为:

    zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka
    

    解决办法:zookeeper后面要加上/kafka

     bin/kafka-topics.sh --zookeeper node01:2181/kafka --create --replication-factor 3 --partitions 1 --topic first
    

    以前的没有添加这个/kafka,是因为zookeeper.connect中没有配置

    展开全文
  • 创建 topic javatestbin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic javatest --partitions 3 --replication-factor 1pom.xmlorg.apache.kafkakafka-clients2.3.0特别注意的需要设置:...
  • 创建 topic javatestbin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic javatest --partitions 3 --replication-factor 1pom.xmlorg.apache.kafkakafka-clients2.3.0特别注意的需要设置:...
  • 一个kafka创建topic失败的问题

    万次阅读 2017-11-22 14:44:55
    关于kafka创建topic时出现 ERROR kafka.common.KafkaException: Failed to parse the broker info from zookeeper 错误的处理
  • java spring-boot kafka 创建topic

    千次阅读 2018-01-17 14:26:00
    java spring-boot kafka 创建topic Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_...
  • Kafka创建Topic报错原因有很多种,我主要讲一下我所遇到的问题的解决方法,各位可以尝试一下,希望对各位有用。 第一次使用kafka,搭建环境后,使用kafka创建Topic报错,报错信息如下: Error while executing ...
  • kafka 创建topic,查看topic

    万次阅读 2018-09-28 16:46:21
    创建kafka topic bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181  方法一: 执行linux命令: bin/kafka-topics.sh --create --topic ...
  • 使用kafka创建topic时,当选择的partition数量>1且小于集群总broker数量时有的leader为none <p><img alt="" height="504" src="https://img-ask.csdnimg.cn/upload/1616751445198.png" width="738" /></p>
  • windowsKAFKA创建TOPIC

    2019-12-24 18:36:10
    创建topic kafka-topics.bat --create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "test" 创建生产者 kafka-console-producer.bat --broker-list localhost:9092 --t...
  • kafka创建topic异常

    2019-02-25 09:00:00
     kafak运行在weblogic账户下,jdk1.8,当在root账户下创建topic(当前账户下的jdk1.6)导致创建topic失败 ./bin/kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 --partitions 8 --...
  • 1.Kafka中的ISR、AR又代表什么? ISR:与leader保持同步的follower集合 AR:分区的所有副本2.Kafka中的HW、LEO等分别代表什么? LEO:没个副本的最后条消息的offset HW:一个分区中所有副本最小的offset3.Kafka中是...
  • 本文是 Kafka 源码解析的第三篇,主要讲述一个 topic创建过程,从 topic 是如何创建topic 真正创建成功的中间详细过程,文章主要内容可以分为以下几个部分:topic 是如何创建的?命令行创建;Producer 发送...
  • 背景书接上回,想在kafka执行命令,其实是为删除一个无用的topic,但是在不知道正确删除姿势的情况下,只能先创建再删除,验证一下,也是去年的经验了,今年用的时候又翻半天,记录一下创建topic用的命令是kafka-topics.sh,...
  • kafka创建topic失败

    千次阅读 2018-03-08 10:24:56
    本人初次接触kakfa,创建topic后生产消息失败,生产消息的命令为: sh kafka-console-producer.sh --broker-list localhost:9092 --topic test 报错日志为:[2018-03-06 21:58:24,978] ERROR Error when ...
  • kafka服务启动$KAFKA_HOME/bin/kafka-server-start.sh-daemonconfig/server.properties创建Topic$KAFKA_HOME/bin/kafka-topics.sh --create --topic test0--zookeeper 127.0.0.1:2181 --config max.message.bytes=.....
  • 1.Kafka中的ISR、AR又代表什么? ISR:与leader保持同步的follower集合 AR:分区的所有副本2.Kafka中的HW、LEO等分别代表什么? LEO:没个副本的最后条消息的offset HW:一个分区中所有副本最小的offset3.Kafka中是...
  • Linux Kafka 创建Topic 命令

    千次阅读 2020-05-08 17:25:18
    ./kafka-topics.sh --create --topic "email-topic"--replication-factor 1 --partitions 1 --zookeeper localhost:2181 这里:email-topictopic名称。 需注意zookeeper的ip和端口 要确保正确。 如果是...
  • 本文公众号来源:柳树的絮叨叨作者:靠发型吃饭...用Kafka官方的话来说就是:Kafka is used for buildingreal-time datapipelines and streaming apps. It ishorizontally scalable,fault-tolerant,wicked fast, and...
  • 使用docker搭建kafka 创建 topic 报错 Error: Exception thrown by the agent : java.rmi.server.ExportException: Port alrea 先查看卡夫卡容器有没有启动 docker ps 进入到kafka容器 docker exec -it kafka1 /...
  • Kafka 创建topic时抛出如下错误信息: Error while executing topic command : replication factor: 3 larger than available brokers: 0 [2019-05-26 13:17:23,160] ERROR org.apache.kafka.common.errors....
  • 原标题:Kafka创建Topic时如何将分区放置到不同的Broker中熟悉 Kafka 的同学肯定知道,每个主题有多个分区,每个分区会存在多个副本,本文今天要讨论的是这些副本是怎么样放置在 Kafka 集群的 Broker 中的。...
  • 最近工作中遇到需要使用kafka的场景,测试消费程序启动后,要莫名的过几十秒乃至几分钟才能成功获取到到topic的partition和offset,而后开始消费数据,于是学习了一下查看kafka broker里topic和consumer group状态的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,855
精华内容 1,142
关键字:

kafka创建topic