精华内容
下载资源
问答
  • centos启动kafka命令

    2020-10-12 11:40:00
    在bin的上一级目录执行命令: bin/kafka-server-start.sh -daemon config/server.properties 加守护进程启动 方法二: 在bin的上一级目录执行命令: nohup bin/kafka-server-start.sh config/server....

    方法一:

    在bin的上一级目录执行命令:

    bin/kafka-server-start.sh -daemon config/server.properties

    加守护进程启动

    方法二:

    在bin的上一级目录执行命令:

    nohup bin/kafka-server-start.sh config/server.properties &

    通过后台来启动

    展开全文
  • kafka启动命令

    2021-01-04 21:32:38
    kafka生产者端启动命令 kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic kafka消费者端启动命令 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ...

    kafka启动命令

    kafka-server-start.sh -daemon config/server.properties

    kafka生产者端启动命令

    kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic

    kafka消费者端启动命令

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_streaming_topic --from-beginning

    kafka查看topic命令

    kafka-topics.sh -zookeeper localhost:2181 -list

    kafka创建topic命令

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic  Newtopic

     

    展开全文
  • kafka命令启动

    2020-11-11 14:07:20
    默认内网访问,要在外网访问的话,需要在修改config/server.properties中的配置 将listeners和...以下命令都是摘抄与官网http://kafka.apache.org/quickstart 先启动zookeeper,默认自带的 bin/zookeeper-s

    默认内网访问,要在外网访问的话,需要在修改config/server.properties中的配置

    将listeners和advertised.listeners的值用主机名进行替换,在外用使用java进行生产者或消费者连接的时候,不填写具体的IP,填写安装kafka的主机名,然后,在hosts目录中,配置该主机名对应的真是IP地址即可;

    以下命令都是摘抄与官网http://kafka.apache.org/quickstart

    先启动zookeeper,默认自带的

    bin/zookeeper-server-start.sh config/zookeeper.properties

    然后启动kafka服务

    bin/kafka-server-start.sh config/server.properties

    列举拥有哪些topics

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092

    在服务器上打开一个生产者,然后把输入的每行数据发送到kafka中的命令

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    #后面光标提示数据数据,然后回车就会发送到kafka中了

    打开一个消费者

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    当有数据往kafka的test主题发送消息,这边就会进行消费。

    java调用作为生产者和消费者代码:

    项目需要引入的依赖pom.xml

    复制代码

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.theorydance</groupId>
        <artifactId>kafkademo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>kafkademo</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.1.1</version>
            </dependency>
        </dependencies>
    </project>

    复制代码

    生产者代码ProducerDemo.java

    复制代码

    package com.theorydance.kafkademo;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ProducerDemo {
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "node125:9092");
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = null;
            try {
                producer = new KafkaProducer<String, String>(properties);
                for (int i = 0; i < 100; i++) {
                    String msg = "This is Message " + i;
                    producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                    System.out.println("Sent:" + msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
    
            } finally {
                producer.close();
            }
    
        }
    }

    复制代码

    消费者代码ConsumerDemo.java

    复制代码

    package com.theorydance.kafkademo;
    
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    
    public class ConsumerDemo {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "node125:9092");
            properties.put("group.id", "group-1");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties    .put("auto.offset.reset", "earliest");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            
            while(true){
                Map<String, List<PartitionInfo>> maps = kafkaConsumer.listTopics();
                System.out.println("监听topics="+maps.keySet());
                Set<String> sets = new HashSet<>();
                for (String topic : maps.keySet()) {
                    if(topic.startsWith("Hello")){ // 制定规则,监听哪一些的topic
                        sets.add(topic);
                    }
                }
                kafkaConsumer.subscribe(sets);
                long startTime = System.currentTimeMillis();
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, value = %s, topic = %s", record.offset(), record.value(), record.topic());
                        System.out.println("=====================>");
                    }
                    long endTime = System.currentTimeMillis();
                    if(endTime - startTime > 30000){
                        System.out.println("------------------------------------------------------------------");
                        break;
                    }
                }
            }
            
    
        }
    }

    复制代码

    说明:在实际需求中,我需要收集在不同服务器上的日志(微服务相同模块和不同模块,或其他程序的日志),采用的是flume进行收集,希望能够对收集的日志进行分类(区别是哪个程序产生的),去网上找了一下,在flume进行收集的时候,能不能在日志前面加上应用的标识进行区别,我没有找到,如果有,看到该博客的同行,请不吝赐教。我这边就换了种思路,就像前面我写的消费者示例一样,不同的程序日志,我往不同的topic中进行发送消息,在消费者监听一定规则的topic,然后进行消费,这样就可以区分不同的应用程序的日志了。

    展开全文
  • kafka命令

    2021-02-22 14:03:44
    kafka命令 1. 安装 此处用kafka自带的zookeeper为例 window: # 进入kafka目录 cd kafka\kafka_2.11-0.11.0.0\bin\windows # 修改config中的zookeeper.properties和server.properties,如果不用自带的zookeeper...

    kafka命令

    1. 安装

    此处用kafka自带的zookeeper为例

    window:

    # 进入kafka目录
    cd kafka\kafka_2.11-0.11.0.0\bin\windows
    # 修改config中的zookeeper.properties和server.properties,如果不用自带的zookeeper可以不用修改前者
    # 启动kafka自带的zookeeper
    .\zookeeper-server-start.bat ..\..\config\\zookeeper.properties
    # 启动kafka
    .\kafka-server-start.bat ..\..\config\server.properties
    
    

    修改zookeeper.properties配置文件

    修改server.propertes配置文件

    linux:

    命令和windows类似,执行对应的sh脚本即可,参考命令如下

    # 解压
    tar -zvxf Kafka-xxx.tar.gz -C /usr/local
    cd /usr/local/kafka_2.11-0.11.0.0
    # 启动zookeeper
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
    # 启动kafka
    nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
    # 启动日志内容分别可以从zookeeper.log和kafka.log中查看
    

    2. 命令

    创建topic

    kafka-topics.bat --create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "test"

    topic消费者

    kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

    topic生产者

    kafka-console-producer.bat --broker-list localhost:9092 --topic test

     

     

    展开全文
  • Kafka是一个高性能分布式消息系统,今天就简单介绍下基础入门吧。一、下载安装可以访问... cd kafka_2.12-2.2.0二、启动服务器Kafka使用z...
  • 服务端启动命令# -daemon 表示后台进行 # config/server.properties 指定配置文件 kafka-server-start.sh -daemon config/server.properties创建 topic# replication-factor 2 表示复制两份 # partitions 3 topic 的...
  • Kafka 启动命令

    千次阅读 2018-08-14 18:03:08
    1 .启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/zookeeper-server-stop.sh config/zookeeper....2.启动停止kafka bin/kafka-server-start.sh config/server.properties bi...
  • 启动Kafka

    2019-02-14 12:01:00
    启动zookeeper ...启动kafka Brokerr 使用命令查看kafka Broker是否启动更成功 在kafka中创建topic 'test' bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -...
  • 启动kafka

    2019-11-15 17:03:34
    启动zookeeper 在doc命令中 E:\store\kafka_2.12-2.3.1>.\bin\windows\zookeeper-server-start.bat .\config\...启动kafka 在doc命令中 E:\store\kafka_2.12-2.3.1>.\bin\windows\kafka-server-start.b...
  • Kafka命令

    2019-09-19 02:15:11
    首先进入到kafka的bin目录下: 1、kafka服务启动:./kafka-server-start.sh ../config/server.properties 2、查看有哪些主题: ./kafka-topics.sh --list --zookeeper zk服务IP:2181 ...
  • kafka 命令重新启动When you’re in the same room as a server, shutting it down or rebooting it is simple. You could hold the power button or even unplug the whole machine. Not only are these solutions ...
  • kafka 命令

    2019-02-28 22:05:00
    启动 [root@node1 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties 创建主题 ./bin/kafka-topics.sh --create --zookeeper 192.168.23.101:2181,192.168.23.102:2181,192.168.23.103:2181...
  • kafka命令总结

    2019-09-27 16:31:22
    kafka知识总结 //切换到安装路径命令cd /...//启动kafka服务,三台主机分别输入此指令:./kafka-server-start.sh $KAFKA_HOME/config/server.properties & //以后台的方式启动nohup ./kafka-server-start.s...
  • 1.启动kafka 首先保证zoopeeper已经启动,然后开始启动kafka -daemon:是否已后台方式启动 第二个参数需要指定server.properties ./bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties ...
  • kafka 命令笔记

    2017-12-06 13:11:00
    以下命令都是在kafka根目录下 ...启动kafka(启动完zookeeper后,应再新开个窗口启动kafka,不然会报超时错误,可能由于我用的linux命令有问题,ctrl+z让它后台运行,没有起作用。回头在研究一下) ...
  • Kafka依赖环境:JDK、Zookeeper,首先确保Java环境JDK已安装,其次Zookeeper必须在kafaka启动之前启动。 报错问题: PS C:\DEVELOPERS\ApacheKafka\kafka_2.11-2.1.0&gt; .\bin\windows\kafka-server-start....
  • 本文介绍利用kafka命令进行topic的创建,和生产者,消费者的示例 首先要保证已经安装了zookeeper集群和kafka集群,如果没有安装,可参考:kafka集群搭建 一.环境介绍 本次教程的环境安装了三台zookeeper和三个kafka...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,302
精华内容 520
关键字:

启动kafka命令