精华内容
下载资源
问答
  • 一、 Kafka常用操作命令 查看当前服务器中的所有topic[root@hadoop3 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop11:2181 [root@hadoop3 kafka]#信息写入到  创建topic[root@hadoop3 kafka]# bin/...

    一、 Kafka常用操作命令

     查看当前服务器中的所有topic

    [root@hadoop3 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop11:2181
    [root@hadoop3 kafka]#

    信息写入到
     创建topic

    [root@hadoop3 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 1 --topic test
    Created topic "test".
    [root@hadoop3 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop11:2181
    test

    通过上面,可以看到已经创建了一个test的topic

     删除topic

    [root@hadoop2 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 1 --topic test2
    Created topic "test2".
    [root@hadoop2 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop11:2181
    itheima
    test
    test2
    [root@hadoop2 kafka]# bin/kafka-topics.sh --delete --zookeeper hadoop11:2181 --topic test2
    Topic test2 is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    [root@hadoop2 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop11:2181
    itheima
    test
    [root@hadoop2 kafka]#

    需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
     通过shell命令发送消息
    要注意的是要指定topic,表示要在哪个topic中生产消息,这里的topic需要时上面创建的topic

    [root@hadoop3 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic test
    asdfasdfasd
    asdfasdf
    asdfasdf
    toto test
    tuto test2

    注意命令中指定的–block-list hadoop1:9092,当改成hadoop2:9092时,也可以。

     通过shell消费消息
    要指明的是要使用哪个topic中的数据,这里的topic需要时上面创建的topic:

    [root@hadoop3 kafka]# sh bin/kafka-console-consumer.sh --zookeeper hadoop11:2181 --from-beginning --topic test
    asdfasdfasd
    asdfasdf
    asdfasdf
    toto test
    tuto test2

    注意:这里要指定消费那个topic,这里使用的是test.

     查看消费位置

    [root@hadoop3 kafka]# sh bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper hadoop11:2181 --group testGroup

     查看某个Topic的详情

    [root@hadoop3 kafka]# sh bin/kafka-topics.sh --topic test --describe --zookeeper hadoop11:2181
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    [root@hadoop3 kafka]#

     对分区数进行修改

    [root@hadoop3 kafka]# bin/kafka-topics.sh --zookeeper hadoop11:2181 -alter --partitions 15 --topic test
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    [root@hadoop3 kafka]#
    展开全文
  • 1.应用需要在停止的状态下 ...3.执行kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic tpName (kafka服务会自动关闭) 4.删除zk节点上被标注的节点 rmr /brokers/topics/tpName rmr /admin...

    1.应用需要在停止的状态下
    2.确保delete.topic.enable=true,官方文档中标注该属性默认为true
    3.执行

    kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic tpName (kafka服务会自动关闭)
    

    4.删除zk节点上被标注的节点

    	rmr /brokers/topics/tpName  
    	rmr /admin/delete_topics/tpName
    

    5.删除kafka-logs下tpName的分区目录
    6.重启kafka

    展开全文
  • JMS 服务器ActiveMQ Queue和Topic区别

    万次阅读 2015-05-20 20:56:30
    JMS 服务器ActiveMQ Queue和Topic区别    简介:在消息中间件中有两种模式,点对点和订阅,下面程序将区分这两者的关系   1、点对点queue 一个生产者发送的消息只能有一个消费者接受 /** * @author Administrator ...

    JMS 服务器ActiveMQ Queue和Topic区别

     

      简介:在消息中间件中有两种模式,点对点和订阅,下面程序将区分这两者的关系

     

    1、点对点queue 一个生产者发送的消息只能有一个消费者接受

    /**
     * @author Administrator
     * @description Queue实现的是点到点模型,在下面的例子中,启动2个消费者共同监听一个Queue,然后循环给这个Queue中发送多个消息,我们依然采用ActiveMQ
     * 结果表明:可以看出每个消息直被消费了一次,但是如果有多个消费者同时监听一个Queue的话,无法确定一个消息最终会被哪一个消费者消费
     */
    package com.wl.jms;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    
    public class QueueTest {
    
    	/**
    	 * @param args
    	 * @throws JMSException 
    	 */
    	public static void main(String[] args) throws JMSException {
    		// TODO Auto-generated method stub
    		ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory("vm://localhost");
    		Connection connection=factory.createConnection();
    		connection.start();
    		//创建一个Queue
    		Queue queue=new ActiveMQQueue("testQueue");
    		//创建一个Session
    		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		//注册消费者1
    		MessageConsumer consumer1=session.createConsumer(queue);
    		consumer1.setMessageListener(new MessageListener(){
    			public void onMessage(Message m) {
    				try {
    					System.out.println("Consumer1 get: "+((TextMessage)m).getText());
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			
    		});
    		//注册消费者2
    		MessageConsumer consumer2=session.createConsumer(queue);
    		consumer2.setMessageListener(new MessageListener(){
    			public void onMessage(Message m) {
    				try {
    					System.out.println("Consumer2 get: "+((TextMessage)m).getText());
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			
    		});
    		//创建一个生产者,然后发送多个消息。
    		MessageProducer producer=session.createProducer(queue);
    		for(int i=0;i<10;i++){
    			producer.send(session.createTextMessage("Message:"+i));
    		}
    	}
    
    }
    

     

    2、订阅 topic 一个生产者发送的消息有多个消费者接受

    /**
     * @author Administrator
     * @description 与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息
     * 结果表明:说明每一个消息都会被所有的消费者消费
     */
    package com.wl.jms;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQTopic;
    
    public class TopicTest {
    
    	/**
    	 * @param args
    	 * @throws JMSException 
    	 */
    	public static void main(String[] args) throws JMSException {
    		// TODO Auto-generated method stub
    		ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory("vm://localhost");
    		Connection connection=factory.createConnection();
    		connection.start();
    		//创建一个Topic
    		Topic topic=new ActiveMQTopic("testTopic");
    		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		//注册消费者1
    		MessageConsumer consumer1=session.createConsumer(topic);
    		consumer1.setMessageListener(new MessageListener(){
    			public void onMessage(Message m) {
    				try {
    					System.out.println("Consumer1 get: "+((TextMessage)m).getText());
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			
    		});
    		//注册消费者2
    		MessageConsumer consumer2=session.createConsumer(topic);
    		consumer2.setMessageListener(new MessageListener(){
    			public void onMessage(Message m) {
    				try {
    					System.out.println("Consumer2 get: "+((TextMessage)m).getText());
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			
    		});
    		//创建一个生产者,然后发送多个消息。
    		MessageProducer producer=session.createProducer(topic);
    		for(int i=0;i<10;i++){
    			producer.send(session.createTextMessage("Message:"+i));
    		}
    	}
    
    }
    


     

     

    展开全文
  • 大概要解决的问题就是,同一个服务同时监听多个topic,且在每个topic中的group都不相同,具体看问题描述吧。 一、问题背景  前几天部署了一套系统,每个服务都搭建了多个节点,而且是没有主从关系的节点。每个服务...

     标题比较长,实在想不出什么好的描述。大概要解决的问题就是,同一个服务同时监听多个topic,且在每个topic中的group都不相同,具体看问题描述吧。

    一、问题背景

     前几天部署了一套系统,每个服务都搭建了多个节点,而且是没有主从关系的节点。每个服务中有很多东西是放到缓存中的,配置多节点之后,相同服务的不同节点出现了缓存不一致的问题。

    二、问题描述

     刚开始想出一种解决方案,监听同一个topic1,每个节点分到一个group中,这样每次生产者生产消息后,kafka会将消息分发到所有group中,消息中带一个消息类型字段(mq_type)。
    各个节点由于处于不同group中都会消费此消息,然后根据mq_type判断是否该处理此消息。
     然而,pass。原因:由于此系统(系统B)中的服务1还与系统A有消费与生产消息的关系,都放到一个topic下数据不规范。而且如果多个服务1同时消费消息,会进行读表改表操作,还得做处理。

     emmm,又想出了一种解决方案,系统B中每个节点还是分到不同的group中,当某个服务1消费到系统A发送的消息,需要刷新缓存时,该节点对所有节点通过系统B内部的消息队列topic2进行广播,各个服务接收到消费消息后根据消息类型进行缓存的更新。
    具体系统图如下:
    系统图
    图片备用链接
    ps:以上区分两个topic是为了规范来自不同的渠道的数据走不同的topic,如果没有这种要求完全没有必要做如下这种操作,可以直接通过group和消息内容去做区分
     如上图,系统A通过topic1向系统B中的服务1发送消息,系统B中服务1和服务2以及他们的其他节点在系统B中通过topic2发送消息。
     可以看出,系统B中的服务1扮演了三个角色:系统A发送消息的消费者,系统B内部消息的生产者和消费者。可以得出如下问题:

    对于服务1,需要将其配置为监听两个topic,分别监听topic1和topic2
    系统A向系统B发送消息时,服务1以及他的其他节点处于topic1的同一个group下,即只有一个服务1节点会去消费系统A发来的消息
    系统B内部之间发送消息时,每个服务和节点都处于topic2的不同group下
    

     说到这里,其实就清楚很多了。其实就是想让服务1-1和他的其他节点在topic1中都处于group-A2B中,服务1-1在topic2中处于group-service1-1中,服务1-2在topic2中处于group-service1-2中。

    三、需求实现

    3.1 代码基础

     kafka的基础代码请参照我的以下两篇博客,本次修改都是基于这些代码的基础上改造的
    Kafka及Spring&Kafka整合
    kafka动态配置topic

    3.2 生产者

     kafka生产者发送消息时,会向该topic下的所有group发送消息,而每个group只会有一个消费者进行消费。所以生产者不用进行更改。

    3.3 消费者

    3.3.1 消费者的配置

     以下消费者以服务1-1为例,其他节点服务同理。
     由于同一个服务要扮演两个消费者,所以我们需要不同的配置文件用来生成不同的消费者

    //首先是获取公共配置方法
        public Map<String, Object> getCommonPropertis(){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put("auto.offset.reset", "latest");// 一般配置earliest 或者latest 值
            return props;
        }
    
    //然后不同的用来生成不同消费者的工厂
        //topic1的消费者
        public ConsumerFactory<String, String> consumerFactoryA2B() {
            Map<String, Object> properties = getCommonPropertis();
            //所在group
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-A2B");
            return new DefaultKafkaConsumerFactory<String, String>(properties);
        }
        
        
        //系统B内topic2的每个服务的group我这里用服务名+ip+端口命名
        String GROUP_NAME = "service1-1-"+serviceInfoUtil.getIpAddress()+"-"+serviceInfoUtil.getLocalPort();
        
        //topic2的消费者
        public ConsumerFactory<String, String> consumerFactoryB2B(){
                Map<String, Object> properties = getCommonPropertis();
                //所在group
                properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_NAME);
                return new DefaultKafkaConsumerFactory<String, String>(properties);
        }
        
    //再通过不同的配置工厂生成实例bean
        //topic1的消费者bean
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryA2B() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactoryA2B());//通过不同工厂获取实例
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }  
        
        //topic2的消费者bean
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryB2B() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactoryB2B());//通过不同工厂获取实例
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }  
        
    

    3.3.2 消费者的使用

     以上消费者的配置就算完成了,接下来就可以直接使用了。

         /**
         * 监听B2B所有消息
         * @param record
         */
        @KafkaListener(topics = "#{'${kafka.B2B.listener_topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactoryB2B")
        public void B2Bconsume(ConsumerRecord<?, ?> record){
            recordDeal(record);
        }
    
        /**
         * 监听A2B的所有消息
         * @param record
         */
        @KafkaListener(topics = "#{'${kafka.A2B.listener_topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactoryA2B")
        public void A2Bconsume(ConsumerRecord<?, ?> record) {
            recordDeal(record);
        }
        
    //containerFactory = "kafkaListenerContainerFactoryA2B"  主要就是这个containerFactory参数,用它控制是哪个实例
    

    3.3.3 获取服务启动的ip和端口类

    @Configuration
    public class ServiceInfoUtil {
        public static String getIpAddress() throws UnknownHostException {
            InetAddress address = InetAddress.getLocalHost();
            return address.getHostAddress();
        }
        public static String getLocalPort() throws MalformedObjectNameException {
            MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
            Set<ObjectName> objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"),
                    Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
            String port = objectNames.iterator().next().getKeyProperty("port");
            return port;
        }
    }    
    

    3.3.4 最后

     这样修改后启动时,通过配置文件中的kafka.A2B.listener_topics去判断这个消费者该监听哪个topic,通过containerFactory = "kafkaListenerContainerFactoryA2B"判断这个消费者在这个topic中属于哪个group。
    然后发送消息测试,成了。

    四、感谢大佬

    这几个大佬的对于kafka的group的讲解比较好:
    KAFKA 多个消费者同一个GROUPID,只有一个能收到消息的原因
    Kafka消费组(consumer group)
    springboot 集成kafka 实现多个customer不同group

    展开全文
  • Topic 发布一个消息后,就直接去执行后面的程序;而Service 调用一个服务,会一直等待结果。
  • Topic和Partition

    千次阅读 2018-09-14 08:59:11
    从物理上来说,一个Topic是由分散在各个服务节点上的Partition组成的,每个Topic可以有多个Producer向他发送消息,也可以有多个Consumer消费其中的消息。 如图所示,一个Topic是由分散在多台broker上的Pratition...
  • 有没有哪位大神遇到过这种情况,监听ActiveMQ的topic消息时,服务刚启动时是好的,可以收到消息,但几个小时以后就突然收不到了,也不报错,必须重启服务才能再收到消息。我使用的是spring boot集成activemq,但我...
  • AMQ 虚拟topic

    千次阅读 2016-04-07 20:43:22
    为了做到高可用性,topic的consumer服务通常是多台服务。如果用普通的Topic,则多个consumer的服务就会出现重复消费的情况。 解决方案: AMQ引入了虚拟Topic,如果Topic的名字是以"VirtualTopic."开头,则AMQ自动...
  • kafka彻底删除topic

    千次阅读 2017-03-20 15:08:56
    今天发现一个线上kafka(版本为0.8.2.2)多天前已不再消费的topic标记为删除(marked for deletion),而我们每天有定时删除topic的shell脚本,会把无用的topic干掉,但发现这些topic只是被标记为删除,而并没有真正删除
  • kafka删除topic

    千次阅读 2018-09-10 10:53:10
    最近工作中经常遇到要手动或者服务器内存不足 自动关掉spark-streaming读取kafka数据的情况 ,本身只有一个节点,宕掉后会导致kafka当前topic异常,读取与写入都获取不到head,所以要删除topic来解决该问题,实际...
  • kafka中topic基本操作

    千次阅读 2018-07-13 09:36:05
    Topic基本操作 在linux服务器上查找kafka的安装位置 locate kafka-topics.sh kafka配置文件在kafka的安装文件夹下的 ./config/service.properties 中 创建kafka topic bin/kafka-topics.sh –create –...
  • 查看kafka的topic清单以及topic的内容

    千次阅读 2020-11-12 16:54:09
    对于云端服务器上运行着的kafka集群,由于没有开放相关的端口,之前查看都是使用端口转发,ssh练上去之后,使用端口转发,转发到本地查看。 使用kafka时发现,zk连接上...查看kafka中topic清单:kafka-topics.sh --l
  • MQClientException: No route info of this topic, topicTest01 问题原因 maven版本和服务器不一样。按照官方文档执行的案例出错,主要是官方文档推荐的下载包是4.4.0,然后案例引入的maven是4.3.0。所以导致出现...
  • 并且配置添加到服务里面方便管理,并且配置添加到服务里面方便管理 详细配置请访问:https://blog.csdn.net/Joe192/article/details/81215188
  • Kafka彻底删除topic详解

    千次阅读 2018-12-11 17:24:02
    一、前言 ...①停止kafka服务 停止kafka的所有的消费者和生产者,关闭kafka,防止在删除topic之后,程序发现topic不存在而自动创建topic ②修改配置文件 在kafka安装目录的conf目录下,修改server.pr...
  • Kafka配置详解-Topic配置

    千次阅读 2018-12-19 09:54:08
    topic相关的配置,服务器的默认值,也可可选择的覆盖指定的topic。如果没有给出指定topic的配置,则将使用服务器默认值。 可以通过-config选项在topic创建时设置。此示例使用自定义最大消息的大小和刷新率,创建一...
  • 2018-07-23 16:43:22,022 ERROR RocketmqClient - Send message Exception, Message [topic=heaven-mq-test, systemProperties={__KEY=ORDERID_0, __TAG=TEST-TAG}, userProperties=null, body=22] ...
  • Rocketmq 深入研究Topic

    千次阅读 2019-02-01 14:40:02
    1 Topic  首先需要提到的概念是TopicTopic是RocketMQ中的一个重要概念,RocketMQ的各组件都是围绕着Topic建立起对应关系的。  在RocketMQ官方文档和本文中, Topic在不同的语境下被赋予了两种不同的语义:  ...
  • PERSISTENT)是指的mq服务),queue的消费者不在也会给他保留,topic只有持久化订阅者会保留   (1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可 。然后,随便一...
  • 如何永久删除Kafka的Topic

    千次阅读 2019-05-27 14:25:06
    使用kafka-topics --delete命令删除topic时并没有真正的删除,而是把topic标记为:“marked for deletion”,导致重新创建相同名称的Topic时报错“already exists”。 2.问题复现 1.登录Kafka集群所在的服务器,...
  • kafka topic 删除异常

    千次阅读 2020-05-20 17:16:01
    公司kafka集群是允许删除topic,但是前段时间组里小伙伴有个topic总是删不掉,也不知道他是怎么删的;后来我尝试手动删除zookeeper中的元数据,看看能否删掉这个topic,操作如下 rmr /brokers/topics/nginx_clean_...
  • RocketMQ Topic相关命令

    千次阅读 2019-05-13 13:54:26
    1、分配MQ bin/mqadmin allocateMQ -n localhost:9876 -t tst-topic -i ipList ipList 以逗号分隔 2、删除topic bin/mqadmin deleteTopic -n localhost:9876 -t zto-example ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 129,117
精华内容 51,646
关键字:

topic服务