精华内容
下载资源
问答
  • Java Topic

    2014-11-21 17:01:24
    每个Java开发者都应该知道的5个JDK工具. http://www.csdn.net/article/2014-11-20/2822750-5-JDK-Tools-Every-Java-Developer-Should-Know?reload=1

    每个Java开发者都应该知道的5个JDK工具.

    http://www.csdn.net/article/2014-11-20/2822750-5-JDK-Tools-Every-Java-Developer-Should-Know?reload=1

    介绍了:

    javap 反汇编程序,可以查看Java编译器生成的字节码.

    Jvisualvm JVM监控和分析工具.

    Jcmd 用来把诊断命令请求发送到Java JVM.

    Jhat Java Heap analysis tool. 诊断和浏览堆文件.

    Oracle Java Mission Control .主要用来统一HotSpot、JRockit VMs

     

    Java JDK Tools.

    http://javapapers.com/java/java-jdk-tools/

    简单介绍了jdk 的工具






    展开全文
  • //创建操作客户端 //创建名称为test1的topic,有5个分区 NewTopic topic = new NewTopic("test1", 5, (short) 1); client.createTopics(Arrays.asList(topic)); client.close();//关闭 } 查看结果,在zookeeper路径/...

    环境

    JDK 1.8
    Zookeeper 3.6.1
    Kafka 2.6.0

    引入依赖

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      
    展开全文
  • Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)

    千次阅读 多人点赞 2019-07-04 09:34:55
    目录 基本概念 代码与实例 ...Topic exchange:将路由和某模式匹配 ...交换机设置为topic模式,生产者生成的消息的路由键值为goods.XXXX 其中XXXX,可能为add、delete、update、modify等 队列一绑定的是goo...

    目录

     

     

    基本概念

    代码与实例


     

    基本概念

    实现的就是官方给出的这个模型:

    Topic exchange:将路由和某模式匹配

    其中

    #:匹配一个或多个

    *:匹配一个

    比如下面要举得这个例子

    交换机设置为topic模式,生产者生成的消息的路由键值为goods.XXXX

    其中XXXX,可能为add、delete、update、modify等

    队列一绑定的是goods.add

    队列二绑定的是goods.#

    这样话,如果生产者生产一个路由键值为goods.add的消息,辣么2个队列都将会收到。

    如果生产者生成一个路由键值为goods.delete的消息,辣么只有1个队列将会收到。

     

    代码与实例

    当生产者和消费者跑起来后,对应的RabbitMQ交换机如下:

    可见有2个队列,一个绑定的路由键为goods.add

    一个绑定的路由键为goods.#

    当生产者发送的键值为goods.add时:

    两个消费者都可以收到:

    当生产者发送的键值为goods.delete时:

    只有消费者二可以收到,消费者一和以前一样

    源码如下:

    Recv1.java

    package topic;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Recv1 {
    
        private static final String EXCHANGE_NAME = "test_exchange_topic";
        private static final String QUEUE_NAME = "test_queue_topic_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
    
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg : " + msg);
    
    
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    Recv2.java

    package topic;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Recv2 {
    
        private static final String EXCHANGE_NAME = "test_exchange_topic";
        private static final String QUEUE_NAME = "test_queue_topic_2";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
    
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg : " + msg);
    
    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    Send.java

    package topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import util.ConnectionUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Send {
    
        private static final String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            String msgString = "goods ... ... ...";
            channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msgString.getBytes());
            System.out.println("send msg :" + msgString);
            channel.close();
            connection.close();
        }
    }
    

    ConnectionUtils.java

    package util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtils {
    
        public static Connection getConnection() throws IOException, TimeoutException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setVirtualHost("/vhost_cff");
            factory.setUsername("cff");
            factory.setPassword("123");
    
            return factory.newConnection();
        }
    }
    

    源码打包下载地址:

    https://github.com/fengfanchen/Java/tree/master/TopicModel

    展开全文
  • 队列需要有一个routingKey与交换机exchange相匹配,此时交换机的type为topic 通过*(只匹配一个单词)或是#(可以匹配多个单词)匹配对应的队列 新建连接RabbitMQ的工具类utils import ...

    主题模式

    队列需要有一个routingKey与交换机exchange相匹配,此时交换机的type为topic

    通过*(只匹配一个单词)或是#(可以匹配多个单词)匹配对应的队列

    新建连接RabbitMQ的工具类utils

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
     
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    public class utils {
        public  static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/vhost_zdy");
            factory.setUsername("username");
            factory.setPassword("password");
     
            return factory.newConnection();
        }
    }
    

    新建生产者类TopicSend 分别发送 orange.add 和 lazy.pink.rabbit

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class TopicSend {
        private final static String EXCHANGE_NAME = "Topic_exchange";
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = utils.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            //声明key
            //String routingKey="orange.add";
            String routingKey="lazy.pink.rabbit";
    
            for(int i=0;i<3;i++)//发3条消息
            {
                String msg="msg"+":"+routingKey+i;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                Thread.sleep(20);//假设每20ms发送一次消息
                System.out.println("send:"+ msg);
            }
    
            channel.close();
            connection.close();
    
        }
    }

    消费者1接受orange.* 的所有消息

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class TopicReceive_1 {
        private final static String QUEUE_NAME = "Topic_queue_1";
        private final static String EXCHANGE_NAME = "Topic_exchange";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = utils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"orange.*");
            channel.basicQos(1);//保证一次只分发一个\n" +
            System.out.println(" [1] Waiting for messages.");
    
            DefaultConsumer defaultConsumer= new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String msg = new String(body,"UTF-8");
                    System.out.println("[1]receive:"+ msg);
                    try {
                        Thread.sleep(2000);//假设消费者1处理时间为2s
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("[1]done:"+ msg);
                        //手动回执
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            };
            boolean autoack=false;
            channel.basicConsume(QUEUE_NAME,autoack,defaultConsumer);
        }
    }

    消费者2分别可以接受 *.*.rabbit 或是lazy.#的所有消息

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class TopicReceive_2 {
        private final static String QUEUE_NAME = "Topic_queue_2";
        private final static String EXCHANGE_NAME = "Topic_exchange";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = utils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定多个key
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
            //channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
            channel.basicQos(1);//保证一次只分发一个\n" +
            System.out.println(" [2] Waiting for messages.");
    
            DefaultConsumer defaultConsumer= new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String msg = new String(body,"UTF-8");
                    System.out.println("[2]receive:"+ msg);
                    try {
                        Thread.sleep(1000);//假设消费者2处理时间为1s
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("[2]done:"+ msg);
                        //手动回执
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            };
            boolean autoack=false;
            channel.basicConsume(QUEUE_NAME,autoack,defaultConsumer);
        }
    }

    运行查看效果

    更多详情请参考官方文档

    https://www.rabbitmq.com/tutorials/tutorial-five-java.html

    展开全文
  • Java使用kafka的API来监控kafka的某些topic的数据量增量,offset,定时查总量之后,然后计算差值,然后就可以算单位间隔的每个topic的增量,kafka监控一般都是监控的吞吐量,即数据量的大小,而不在意这个count,...
  • 这种方式使用的是链接zk来创建topic。在所有使用的zk的版本中都是可以用的。 需要用的依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</...
  • Java获取指定topic每个分区的当前偏移量 首先引入pom.xml <dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</...
  • topic是在路由的基础上更方便了一点,可以使用通配符来路由,如下图 *.orange.* *表示1个单词的通配符 如果写成add.orange.delete.update就匹配不成功,因为后面有俩单词了 lazy.# #表示无论后面跟几个单词都可以 ...
  • RabbitMQ四种Exchange类型之Topic (Java)

    千次阅读 2016-12-29 10:20:24
    Topic类型的Exchange是要进行路由键匹配的。此时需要通过路由键将队列绑定要一个交换器上。规则如下: 符号“#”匹配一个或多个词,例如:“logs.#”能够匹配到“logs.error”、“logs.info.toc” 符号“*”只能...
  • 使用JAVA获取KAFKA中指定TOPIC的OFFSET

    千次阅读 2018-07-25 20:55:00
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), consumer.clientId()); OffsetResponse response = consumer.getOffsetsBefore(request)...
  •  其他创建Topic得方式Java API: https://blog.csdn.net/meng984611383/article/details/80500761 Kafka生产数据:  Producer, String> producer = new KafkaProducer(props);  for(int i = 0; i ; i++) ...
  • JavaAPI创建kafka topic 删除及修改分区

    千次阅读 2021-11-12 09:36:44
    使用JavaAPI写一个Kafka topic创建及修改 目前发现有两种方法: 一种通过注册zookeeper来管理kafka,这是一种很老的方法,这里不做过多赘述 另一种是adminclient 说明:在Kafka0.11.0.0版本之后,多了一个...
  • kafka java客户端获取topic列表

    千次阅读 2020-07-16 17:39:34
    需要加入的依赖jar包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>...public static void main(String[] args) throws InterruptedException, ExecutionException { ...
  • 我就废话不多说了,直接 上代码吧! import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse;...import kafka.javaapi.consumer.SimpleConsu
  • 使用Java API进行kafka topic开发

    万次阅读 2018-05-29 19:21:49
    package org.sunny.two; import kafka.admin.AdminClient; import kafka.admin.AdminUtils...import kafka.admin.TopicCommand; import kafka.server.ConfigType; import kafka.utils.ZkUtils; import org.apache....
  • Rocketmq在Java代码之中手动创建Topic

    千次阅读 2019-12-31 19:00:56
    本文仅限RocketMQ 4.5.*版本,其他版本可能有区别,仅供参考 本文仅限单 name server 的情况,nameserver集群的情况下不确定能否...org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute org.ap...
  • java批量创建删除Kafka的topic 一、前言 参考资料:Kafka 0.11客户端集群管理工具AdminClient https://blog.csdn.net/u012501054/article/details/80594374 2.批量创建删除Kafka的topic 2.集群管理工具AdminClient ...
  • java.util.Properties; import org.apache.kafka.common.security.JaasUtils; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZkUtils; public class ...
  • import kafka.api.OffsetRequest; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition...import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import...
  • kafka java 主题及分区 副本操作代码

    千次阅读 2018-09-28 16:18:51
    package com.jingshan.topic...import java.util.Properties; import org.apache.kafka.common.security.JaasUtils; import kafka.admin.AdminUtils; import kafka.admin.BrokerMetadata; import kafka.server.Con...
  • Topic Exchange交换机也叫通配符交换机,我们在发送消息到Topic Exchange的时候不能随意指定route key(应该是由一系列点号连接的字符串,一般会与binding key有关联,route key的长度一般不能超过255个字节)。...
  • 1.怎样用java实现短信发送 Copy to clipboard Posted by:creatyangPosted on:2003-04-18 12:58我现在有一siemense手机及数据线,我想编写一个java程序能控制手机能输入发送的内容及发送短信,希望那高手帮帮忙 2....
  • 1.获取所有topic package com.example.demo; import java.io.IOException; import java.util.List; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import or....
  • Kafka Java API操作topic

    千次阅读 2017-09-28 11:10:05
    Kafka官方提供了两个脚本来管理topic,包括topic的增删改查。其中kafka-topics.sh负责topic的创建与删除;...上一篇文章中提到了如何使用客户端协议(client protocol)来创建topic,本文则使用服务器端的Java API对t
  • MQTT topic匹配规则

    千次阅读 2018-08-01 16:55:49
    主题层级分隔符 “/”:...注:使用通配符时需要保证创建的policy里有相应规则的topic,比如a/+(设为只订阅),如果没有是无法在连接时通过订阅a/+匹配到该policy里其他相关的诸如a/b a/c等主题 主要参考链接 点我
  • 基于java实现通过zookeeper创建kafka topic 之前有个需求,通过接口创建kafka topic,网上也找了好多资料,最开始通过python实现了一版,思路是通过ssh连接zookeeper服务器,然后执行创建topic的命令,不是理想的...
  • 在网上找了很多的topic持久化的Demo做了很多的测试,现把熟肉呈上。
  • 可以一次性创建多个Topic,每个topic需要指定名称、Partition数量和Replicas数量。 Replicas数量不能超过broker数量。本文使用的kafka只有一个broker。 //创建topic:副本数不能超过broker数量 client . ...
  • ActiveMQ Topic 实例

    2014-12-27 21:28:24
    欢迎下载ActiveMQ Topic 实例!
  • 笔者主要是将在自己的应用当中所用到资料整合在一起,并且加上自己的理解从而写下了这篇文章,文章主要包括了Kafka的简单介绍、Kafka的部署、Kafka的java代码应用。如果写的不好的地方还请指正。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 157,191
精华内容 62,876
关键字:

javatopic

java 订阅