精华内容
下载资源
问答
  • kafka topic

    2019-03-11 14:30:34
    Kafka创建topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test kafka生产者 ./kafka-console-producer.sh --broker-list 192.168.69.64:...

    Kafka创建topic:

     

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test    

    kafka生产者

    ./kafka-console-producer.sh --broker-list 192.168.69.64:9092 --topic queue_command_to_device_sungrow

     

    kafka消费者

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic queue_command_to_device_sungrow

     

    查看topic详细信息

    ./kafka-topics.sh --zookeeper localhost:2181 --topic sungrow_queue_protocol_r2p3 --describe

     

    查看主题

    bin/kafka-topics.sh --list --zookeeper 192.168.69.64:2181

    删除主题

    [root@kafka2 bin]# ./kafka-topics.sh --delete --zookeeper localhost:2181  --topic sungrow_queue_ssmerge_r2p1
    Topic sungrow_queue_ssmerge_r2p1 is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.

     

    展开全文
  • kafka topic 删除

    2019-12-18 15:45:02
    kafka topic 删除方法一:通过kafka 命令来删除方法二:使用zookeeper来删除topic 要删除的topic 名字为 test。 方法一:通过kafka 命令来删除 1.1 通过集群管理工具如ambari设置delete.topic.enable=true delete....

    要删除的topic 名字为 test。

    方法一:通过kafka 命令来删除

    1.1 通过集群管理工具如ambari设置delete.topic.enable=true

    delete.topic.enable
    

    1.2 通过kafka命令 删除topic

    ${KAFKA_HOME}/bin/kafka-topics.sh --delete --zookeeper hostname:2181 --topic  test
    
    

    执行完后,topic会被标记为删除状态,随后被删除。但是如果topic正在被使用,则topic不会被删除,这种情况下需要使用zookeeper来删除,见步骤二。

    方法二:使用zookeeper来删除topic

    2.1
    通过集群管理工具如ambari 暂时关闭kafka。
    2.2
    通过zookeeper 来删除topic信息。

    使用zkCli.sh 连接zookeeper

    ${zookeeper_home}/bin/zkCli.sh 
    

    删除topic

    # 查看topic信息
    get /brokers/topics/test
    # 删除topic信息
    rmr /brokers/topics/test
    rmr /admin/delete_topics/test
    

    删除topic数据。
    删除 kafka-logs 目录下要删除的test topic 的历史数据。

    rm -rf ./test*
    

    2.3 重启 kafka

    展开全文
  • kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小 package com.dtwave.kafka.storage; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin....

    在这里插入图片描述

    1.概述

    kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小

    package com.dtwave.kafka.storage;
    
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DescribeLogDirsResult;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.DescribeLogDirsResponse;
    
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * Description:
     *
     * 获取topic占用的内存
     *
     * @author lcc
     * @version 1.0
     * @date 2019-07-07 17:38
     **/
    public class TopicDiskSizeSummary {
    
        private static AdminClient admin;
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            String brokers = "192.168.30.226:19092";
            initialize(brokers);
            try {
                long topic1InBroker1 = getTopicDiskSizeForSomeBroker("com.dbapp.topic.rawevent", 0);
                System.out.println(topic1InBroker1);
            } finally {
                shutdown();
            }
        }
    
        public static long getTopicDiskSizeForSomeBroker(String topic, int brokerID)
                throws ExecutionException, InterruptedException {
            long sum = 0;
            DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
            for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {
                Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();
                for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {
                    DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();
                    Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;
                    for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {
                        if (topic.equals(replicas.getKey().topic())) {
                            sum += replicas.getValue().size;
                        }
                    }
                }
            }
            return sum;
        }
    
        private static void initialize(String bootstrapServers) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            admin = AdminClient.create(props);
        }
    
        private static void shutdown() {
            if (admin != null) {
                admin.close();
            }
        }
    }
    

    注意事项:

    1. kafka 版本必须是1.0.0 版本以及版本以上的。
    2. brokerId必须是你知道的 参考:【Kafka】BrokerNotAvailableException: Error choosing node for describeLogDirs: no node found.
    展开全文
  • kafka topic acl授权

    千次阅读 2017-10-16 11:27:58
    在前一篇,kafka启用认证(http://blog.csdn.net/zhoudetiankong/article/details/78229416)的基础上,来说明kafka topic的acl权限。1.修改server.propertiesauthorizer.class.name = kafka.security.auth....

    在前一篇,kafka启用认证(http://blog.csdn.net/zhoudetiankong/article/details/78229416)的基础上,来说明kafka topic的acl权限。

    1.修改server.properties

    authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
    
    #设置超级用户
    super.users=User:admin
    

    allow.everyone.if.no.acl.found参数,默认是false。如果一个资源R没有任何acl设置,那么默认是除了超级用户之外,其他用户都不可见。设置为true可以改变这个情况。

    2.授权方法

    Principal P is [Allowed/Denied] Operation O From Host H On Resource R

    需要说明的是,Principal是根据之前的kafka认证中的主体,比如我上篇使用的是SASL/PLAIN,则acl授权体系中 Principal=PLAIN中的用户名。我刚开始打算单独使用kafka acl而不启用kafka authentication,结果是Principal并不是我想象中的linux用户名,而是基于kafka authentication体系中的用户名,这样才能生效。另外,可以通过重写principal.builder.class参数的类,来实现单独使用acl。

    以上一篇中的例子来说明,plain中配置了两个用户,admin和alice。
    现在授权给aclie用户topic:test的生产者和消费者的权限,没有限定ip。

    授权用户alice在任何ip上有topic:test的生产者权限

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice   --producer  --topic test

    授权用户alice在任何ip上有topic:test的消费者权限,并且group是任意名字

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice   --consumer  --group=*  --topic test

    查看kafka集群所有的acl授权信息

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list

    具体授权使用方法可以查看官网:
    http://kafka.apache.org/0102/documentation.html#security_authz_examples

    另外需要注意的是,由于acl信息存储在zookeeper中,所以在没有启用zookeeper权限体系的情况下,任何用户理论上都可以执行/kafka-acls.sh。

    展开全文
  • Kafka topic常见命令解析本文着重介绍几个常用的topic命令行命令,包括listTopic,createTopic,deleteTopic和describeTopic等。由于alterTopic并不是很常用,本文中就不涉及了。另外本文的代码分析是基于kafka_2.10...
  • kafka topic迁移脚本

    2018-11-11 17:49:23
    kafka脚本实现自动化无损热迁移topic。不用停kafka进程,不影响写入。在迁移过程中业务无感知。
  • Scrapy Cluster中Kafka Topic详解demo.incomingdemo.crawled_firehosedemo.outbound_firehose__consumer_offsets 之前我们已经介绍过了Scrapy Cluster中有三大组件,Kafka,Redis和Scrapy spider。Kafka是一种高吞吐...
  • 恢复删除Topic的过程
  • 基于java实现通过zookeeper创建kafka topic 之前有个需求,通过接口创建kafka topic,网上也找了好多资料,最开始通过python实现了一版,思路是通过ssh连接zookeeper服务器,然后执行创建topic的命令,不是理想的...
  • kafka topic的基本操作

    2017-07-28 11:30:51
    转载自 http://www.cnblogs.com/xiaodf/p/6093261.html ... 创建kafka topic bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_cdr --partit
  • kafka topic制定规则

    2016-07-09 01:35:00
    kafka topic的制定,我们要考虑的问题有很多,比如生产环境中用几备份、partition数目多少合适、用几台机器支撑数据量,这些方面如何去考量?笔者根据实际的维护经验,写一些思考,希望大家指正。 1.replicas数目 ...
  • 在kafak-eagle的配置文件里查找 token https://github.com/smartloli/kafka-eagle/blob/master/kafka-eagle-web/src/main/resources/conf/system-config.properties
  • Kafka Topic Partition Replica Assignment实现原理及资源隔离方案 本文共分为三个部分:   Kafka Topic创建方式Kafka Topic Partitions Assignment实现原理Kafka资源隔离方案   1. Kafka Topic创建...
  • 然后将清洗过的数据按照数据中的某一个字段写入指定的topic中,Flink官方给我们提供的有flink-connetor-kafka接口,但是官方提供的FlinkKafkaProducer构造的时候都需要指定数据要写入的kafka topic,也就是说这个...
  • 增加Kafka Topic的分区复本数(0.8.2.1) 2018-06-26 11:25:26weixin_37648944阅读数 51 说明 Kafka提供了一个工具,用于调整Topic中各个分区的复本数据。工具名称叫kafka-reassign-partitions.sh。 过程 创建一...
  • kafka topic手动删除及其他相关

    万次阅读 2015-11-20 16:56:29
    1.手动删除kafka topic 运行./kafka-topic --zookeeper zk1:2181,zk2:2181 --topic topic_name --delete 如果topic没能成功被删除(比如broker挂掉等) 可以用以下方式手动清理 (当然,也可以重启zk和broker...
  • 前言利用kafka命令删除topic不成功。kafka-topics.sh --zookeeper hadoop111:2181 --delete --topic test原因&解决方式1. 配置文件未将delete.topic.enable=true,导致删除命令未及时生效;解决方式:第一种:...
  • 之前的文章讲述了Kafka的分区重分配,该方法同样可以用来增加或减少Kafka topic的副本数。该方法,可灵活地运用于提高topic的高可用性,提高数据的读写等场景。 目录 从一个副本数,增加到2个副本数。 再增加一个...
  • Kafka Topic partition leader 为-1 问题处理

    千次阅读 2018-11-07 20:57:20
    kafka topic R*** partition leader为-1 问题处理 1、进入zookeeper bin 目录 cd /usr/***/***/zookeeper/bin 2、登陆zookeeperServer ./zkCli.sh -server node6.**.cn:2181 3、查看topic R*** 的分区1状态...
  • kafka topic 删除异常

    千次阅读 2020-05-20 17:16:01
    公司kafka集群是允许删除topic,但是前段时间组里小伙伴有个topic总是删不掉,也不知道他是怎么删的;后来我尝试手动删除zookeeper中的元数据,看看能否删掉这个topic,操作如下 rmr /brokers/topics/nginx_clean_...
  • KAFKA topic生成 消费

    2019-10-14 10:00:09
    启动zookeeper 启动kafka,启动kafka时要启动zookeeper集群上所有的kafka,否则会出现找不到leader的错误 ./kafka-server-start.sh ../config/...1.创建一个topic ./kafka-topics.sh --zookeeper hdp-1:2181...
  • Kafka : 查看kafka topic的消息offset范围

    千次阅读 2020-03-24 15:52:01
    1.美图 ...(base) lcc@lcc kafka_2.11-1.1.0$ sh bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic topic_lcc --time -1 --broker-list localhost:9092 topic_lcc:0:9 (base) lcc@lcc...
  • kafka topic常用命令

    2020-04-18 23:08:52
    [root@node01 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first_topic Created topic "first_topic". [root@node01 bin]# 查看topic [roo...
  • kafka topic config 参数

    千次阅读 2019-01-30 14:24:17
    kafka版本:http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz 1、cleanup.policy:过期或达到日志上限的清理策略(delete-..../kafka-topic --alter --zookeeper 192.168.153.128:2181 --to...
  • kafka topic删除失败

    2020-04-30 11:57:04
    kafka-topics --list --zookeeper node1:2181,node2:...kafka-topics --delete --zookeeper node1:2181,node2:2181,node4:2181 --topic ods_uinfo_topic 发现被标记为marked for deletion 登录cdh的cm的web,发现de...
  • Kafka的特性:高吞吐量、低延迟每个topic可以分多个partition, consumer group 对partition进行consume操作。可扩展性:kafka集群支持热扩展。持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失...
  • Kafka topic增加partitions

    千次阅读 2018-10-23 17:07:45
    执行增加命令:kafka-topics --alter --topic zhuzh009 --zookeeper cdh-002/kafka --partitions 3 注意该命令分区数partitions只能增加,不能减少   通过kafka-topics --describe --zookeeper cdh-002/kafka...
  • kafka topic 命令操作

    千次阅读 2018-03-19 09:56:09
    bin/kafka-topics.sh --list --zookeeper localhost:2181(查看topic)bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topictest --from-beginning(查看topic内数据)bin/kafka-topics.sh --...
  • kafka topicPartitions问题

    千次阅读 2019-06-11 16:33:07
    当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息 现象如下: 2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ...
  • Topic主题用来区分不同类型的消息,实际也就是适用于不同的业务场景,默认消息保存一周时间;同一个Topic主题下,默认是一个partition分区,也就是只能有一个消费者来消费,如果想提升消费能力,就需要增加分区;同...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,649
精华内容 6,659
热门标签
关键字:

kafkatopic