精华内容
下载资源
问答
  • SpringBoot 商品下单放进RobbitMQ 和数据用使用redis缓存,提高吞吐和响应速度。这是一个简单的集成rabbitMQ和redis的例子项目结构合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段...

    SpringBoot 商品下单放进RobbitMQ 和数据用使用redis缓存,提高吞吐量和响应速度。

    这是一个简单的集成rabbitMQ和redis的例子

    项目结构

    项目结构

    pom依赖配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.3.0.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    	<groupId>com.tomato</groupId>
    	<artifactId>super-shopping</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<name>super-shopping</name>
    	<description>super-shopping</description>
    
    	<properties>
    		<java.version>1.8</java.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-amqp</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<scope>runtime</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-redis</artifactId>
    		</dependency>
    		<!--<dependency>-->
    			<!--<groupId>org.springframework.boot</groupId>-->
    			<!--<artifactId>spring-boot-starter-security</artifactId>-->
    		<!--</dependency>-->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-thymeleaf</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-devtools</artifactId>
    			<scope>runtime</scope>
    			<optional>true</optional>
    		</dependency>
    		<dependency>
    			<groupId>org.projectlombok</groupId>
    			<artifactId>lombok</artifactId>
    			<optional>true</optional>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    			<exclusions>
    				<exclusion>
    					<groupId>org.junit.vintage</groupId>
    					<artifactId>junit-vintage-engine</artifactId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.amqp</groupId>
    			<artifactId>spring-rabbit-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.security</groupId>
    			<artifactId>spring-security-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    </project>
    
    

    代码

    启动类 SuperShoppingApplication .java

    @SpringBootApplication
    @EnableCaching
    public class SuperShoppingApplication {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(SuperShoppingApplication.class);
    
    	public static void main(String[] args) throws InterruptedException {
    		SpringApplication.run(SuperShoppingApplication.class, args);
    	}
    
    }
    

    Rabbit配置类
    RabbitMqConfig

    @Configuration
    public class RabbitMqConfig {
        public static final String topicExchangeName = "shopping-order-exchange";
        public static final String queueName = "shopping-order";
    
        @Bean
        Queue queue(){
            return new Queue(queueName,false);
        }
    
        @Bean
        TopicExchange topicExchange(){
            return new TopicExchange(topicExchangeName);
        }
    
        @Bean
        Binding binding(Queue queue,TopicExchange topicExchange){
            return BindingBuilder.bind(queue).to(topicExchange).with("shopping.#");
        }
    
        @Bean
        MessageListenerAdapter rabbitListenerAdapter(RabbitReceiver rabbitReceiver) {
            return new MessageListenerAdapter(rabbitReceiver, "receiverMessage");
        }
    
        @Bean
        SimpleMessageListenerContainer rabbitContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setMessageListener(listenerAdapter);
            container.setQueueNames(queueName);
            return container;
        }
    
        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            return new RabbitTemplate(connectionFactory);
        }
    
    
    }
    

    Rabbit MQ 消息接受处理类

    @Component
    public class RabbitReceiver {
        //private CountDownLatch countDownLatch = new CountDownLatch(1);
        private static final Logger LOGGER = LoggerFactory.getLogger(RabbitReceiver.class);
    
        @Autowired
        OrderService orderService;
    
        public void receiverMessage(ShoppingOrder order){
            LOGGER.info(".......MQ receiverMessage.......");
            orderService.save(order);
        }
    
    //    public CountDownLatch getCountDownLatch() {
    //        return countDownLatch;
    //    }
    }
    

    Redis配置
    RedisConfig

    @Configuration
    public class RedisConfig {
        @Bean
        CacheManager cacheManager(RedisConnectionFactory connectionFactory){
            return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(connectionFactory).build();
        }
    
        @Bean
        RedisTemplate template(RedisConnectionFactory connectionFactory) {
            RedisTemplate redisTemplate = new RedisTemplate();
            redisTemplate.setConnectionFactory(connectionFactory);
            return redisTemplate;
        }
    }
    

    数据库dao层 JPA实现
    OrderRepository

    @Cacheable(cacheNames = "shopping-order")
    public interface OrderRepository extends CrudRepository<ShoppingOrder,String> {
        @Cacheable(key="#p0",condition = "#p0.indexOf('mygood')!=-1")
        List<ShoppingOrder> findAllByGoods(String goods);
    
        @Cacheable(key="#p0")
        ShoppingOrder findOrderById(String id);
    
        @CachePut(value = "shopping-order",key="#p0.id")//每次都会执行方法,并将结果存入指定的缓存中
        @Override
        <S extends ShoppingOrder> S save(S s);
    
        //cacheNames 放在类上对所有方法都适用,也可以放在方法上只对某个方法作用
        //key 设置主键 #p0是指第一个参数,也可以 #参数名 比如 #id
        //condition 过滤条件 过滤掉不需要缓存的调用
        // @CacheEvict是用来标注在需要清除缓存元素的方法或类上的。当标记在一个类上时表示其中所有的方法的执行都会触发缓存的清除操作。
    }
    

    service层 接口和实现类

    public interface OrderService {
        List<ShoppingOrder> queryAllOrder(String goodsName);
        ShoppingOrder findOrderById(String id);
        ShoppingOrder save(ShoppingOrder order);
    }
    
    @Service
    public class OrderServiceImpl implements OrderService {
        @Autowired
        OrderRepository repository;
    
        @Override
        public List<ShoppingOrder> queryAllOrder(String goodsName) {
            return repository.findAllByGoods(goodsName);
        }
    
        @Override
        public ShoppingOrder findOrderById(String id) {
            return repository.findOrderById(id);
        }
    
        @Override
        public ShoppingOrder save(ShoppingOrder order) {
            return repository.save(order);
        }
    }
    

    控制层controller实现

    @RequestMapping("/order")
    @Controller
    public class OrderController {
        private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
    
        @Autowired
        OrderService orderService;
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @GetMapping("/")
        public String OrderPage(){
            return "order";
        }
    
        @PostMapping
        public @ResponseBody String addOrder(@RequestBody ShoppingOrder shoppingOrder){
            LOGGER.info("order is ",shoppingOrder);
            String uuid = UUID.randomUUID().toString();
            LOGGER.info("request coming.. uuid is "+uuid);
            shoppingOrder.setId(uuid);
            shoppingOrder.setCreateDate(new Date());
            //mq 缓存订单
            rabbitTemplate.convertAndSend(RabbitMqConfig.topicExchangeName, "shopping.order", shoppingOrder);
            //orderService.save(shoppingOrder);
            return "success";
        }
    }
    

    ShoppingOrder 订单实体类
    实现Serializable接口给redis缓存使用

    
    /**
     * 订单实体类
     * create by tomato
     * date 2020年5月30日
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Entity
    public class ShoppingOrder implements Serializable {
        @Id
        private String id;
        private Date createDate;
        private double price;
        private String goods;
        private int amount;
    }
    

    压力测试

    1.安装Jmeter压力测试工具,具体使用方法百度
    2.创建http请求
    在这里插入图片描述
    这里我只是配置了测试10000次,可以测试多点
    在这里插入图片描述
    3.测试结果
    使用MQ明显能提高吞吐量和响应速度。因为他加入队列是非常快的,后续在做集体业务处理。使用MQ我觉得需要前端提供一个响应接口来反馈业务处理的结果,这里我就没有写了。

    使用MQ测试结果
    在这里插入图片描述
    关闭MQ,测试结果
    在这里插入图片描述
    关闭MQ和redis缓存测试结果
    在这里插入图片描述
    事实上,redis对于下订单接口没有提高吞吐量和响应速度的帮助,使用redis的作用是这些操作的数据都缓存在了redis当再次用到的时候就不需要读取数据库,对于高并发读取数据库是很有帮助的,但需要考虑下缓存和数据库的数据一直性的问题。

    总结

    我只是简单的测试了下,还有很多细节和原理也还没弄明白。继续努力中…

    展开全文
  • redis:数据小于10k的时候,性能特别强悍,一旦大于10k入队速度慢到无法忍受 memcacheQ:并发性能强,兼容memcache完美 MSMQ:最大消息4Mb(这是微软产品唯一被认为有价值的东西),只能发送和接收 zeroMQ:轻级的...

    1. 消息队列理论

    1)消息队列产品介绍:

    redis:数据小于10k的时候,性能特别强悍,一旦大于10k入队速度慢到无法忍受
    memcacheQ:并发性能强,兼容memcache完美
    MSMQ:最大消息4Mb(这是微软产品唯一被认为有价值的东西),只能发送和接收
    zeroMQ:轻量级的,号称最快的消息列队,专门用于高吞吐量/低延迟场景(金融机构)
    kafka:高吞吐量,高达10万/秒,支持快速持久化,高吞吐,普通pc可达到10w/s吞吐率,支持大数据Hadoop
    ActiveMQ:apache开源出来的
    java世界中的中间力量
    跨平台使用
    RabbitMQ:Erlang语言
    最老牌
    最稳定
    支持的协议最多

    2)关于rabbitmq

    AMQP:高级消息列队协议,语言使用的是erlang
    支持python/ruby/.net/java/php/c/xmpp/stomp。
    扩展性好,可用性强
    解决问题:

    1. web高并发
    2. 高并发链接问题
      mysql:大量请求包括insert,update。—行锁、表锁
      提示:error :too many connextions的时候
    3. 程序间的消息发送
      这里要求主机必须开RPC(rpc.statd可以从系统核心中获取系统性能统计的相关信息,将结果返回给调用程序)

    3)原理部分

    在这里插入图片描述
    在这里插入图片描述

    a. 相关概念

    • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
    • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    • Queue:消息的载体,每个消息都会被投到一个或多个队列。
    • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
    • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    • vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
    • Producer:消息生产者,就是投递消息的程序.
    • Consumer:消息消费者,就是接受消息的程序.
    • Channel:消息通道,在客户端的每个连接里,可建立多个channel.

    b. 流程思路

    左边的Client向右边的Client发送消息,流程:
    1) 获取Conection (producer与server建立连接)
    2)获取Channel (建立连接后,建立消息通道)
    3)producer定义Exchange,Queue ()
    4)producer在传递消息的时候还会传递一个routing key,它使用RoutingKey将Queue Binding到一个Exchange上
    5)接收方在接收时也是获取connection,接着获取channel,然后指定一个Queue直接到它关心的Queue上取消息,它对Exchange,RoutingKey及如何binding都不关心,到对应的Queue上去取消息就OK了

    c.通信过程

    假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
    1)P1生产消息,发送给服务器端的Exchange
    2)Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
    3)Queue1收到消息,将消息发送给订阅者C1
    4)C1收到消息,发送ACK给队列确认收到消息
    5)Queue1收到ACK,删除队列中缓存的此条消息

    d. 注意要点

    1)consumer消费消息后要发送ack给broker
    2)如果consumer接收了消息,并发送了ack,那么队列中就会删除这条消息
    3)RabbitMQ它没有用到超时机制.RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有正确处理,也就是说RabbitMQ给了Consumer足够长的时间做数据处理。
    4)如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出情况下数据也不会丢失;如果忘记ack,那么当Consumer退出时,Mesage会重新分发,然后RabbitMQ会占用越来越多的内存
    5)rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,如果拒绝,那么rabbitmq将会把消息发送给下一个注册的consumer

    6)一个client与一个server之间有一个连接;一个连接上可以建立多个channel,可以理解为逻辑上的连接。一般应用的情况下,有一个channel就够用了,不需要创建更多的channel
    7)exchange的类型:

    • direct exchange :完全根据key进行投递,routing key=exchange key
    • fanout exchange: 不需要key的叫做fanout交换机,采用广播模式
    • topic exchange: 采用正则表达式的方式进行投递
    • headers: 按照头部消息进行投递(量太大,区分不明显)

    8)dead-letter选项说明
    dead-letter=requeue-false 直接抛弃异常数据,而不是重新进入队列。重新进入队列可能会造成大量消息积压在队列中,压力较大。
    9)关于vhost

    一个RabbitMQ的实体上可以有多个vhosts,用户与权限设置就是依附于vhosts。
    在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

    2. 操作部分

    2.1 rabbitmq单节点、打模块

    rabbitMQ的集群:

    1. 普通模式(默认):消息只有一份(节省内存空间)
    2. 镜像模式:消息存在多个节点上,消息会主动在节点之间同步
      缺点:如果消息过多,则会损耗通讯信道;会降低系统性能
    Pattern: queue的匹配模式(正则表达式)
    Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
    	ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
    		all:表示在集群中所有的节点上进行镜像
    		exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
    		nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
    	ha-params:ha-mode模式需要用到的参数
    	ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
    priority:可选参数,policy的优先级
    

    集群节点类型:

    1. 内存节点:只保存状态到内存
    2. 磁盘节点:保存状态到内存和磁盘上
      生产中,内存节点不会写入硬盘,但是它执行比磁盘节点好,集群中,一般有一个磁盘节点来持久化就足够了,其他的使用内存节点。如果集群中只有内存节点,那么不能停止它们,否则所有状态、消息都会丢失。

    集群可以共享 user vhost queue exchange policy

    安装rabbitmq
      681  cd
      682  cp /mnt/hgfs/soft/ra/* .
      683  ls
      684  rpm -qa | grep erlang
      685  yum -y localinstall erlang-solutions-1.0-1.noarch.rpm 
      686  yum -y localinstall erlang-18.1-1.el6.x86_64.rpm 
      687  yum -y localinstall socat-1.7.2.3-1.el6.x86_64.rpm 
      688  yum -y localinstall rabbitmq-server-3.6.6-1.el6.noarch.rpm 
      启动rabbitmq,并加入开机自启
      689  /etc/init.d/rabbitmq-server start
      690  chkconfig rabbitmq-server on
      691  chkconfig rabbitmq-server --list
      
      为rabbitmq添加一个admin用户,密码是redhat,设置用户角色为administrator,Tag可以为 administrator,monitoring, management
     
      692  rabbitmqctl add_user admin redhat
      693  rabbitmqctl set_user_tags admin administrator
    
      开启 rabbitmq web管理插件
      694  rabbitmq-plugins enable rabbitmq_management
    
    
    
    rabbitMQ打模块(yum装mysql的LNMP)
    (1)安装LNMP
     639  mount /dev/cdrom /mount
      640  nginx
      641  killall nginx
      642  yum -y install mysql mysql-server mysql-devel
      734  service mysqld restart
      737  mysqladmin -u root password 123456
      738  mysql -uroot -p
      739  ln -s /var/lib/mysql/mysql.sock /tmp/mysql.sock
      740  ln -s /usr/lib64/mysql/libmysqlclient.so.16.0.0 /usr/lib/libmysqlclient.so
      643  tar -zxvf /mnt/hgfs/soft/php-5.3.28.tar.gz -C /usr/src
      644  cd /usr/src/php-5.3.28/
      645  yum -y install gd libxml2-devel libjpeg-devel libpng-devel
      655  ./configure --prefix=/usr/local/php --with-config-file-path=/usr/local/php --with-zlib --with-gd --enable-mbstring --with-mysql=/usr/share/mysql/ --enable-fpm --with-jpeg-dir=/usr/lib
      658  make
      659  make install
      668  cp sapi/fpm/init.d.php-fpm /etc/init.d/php-fpm
      668  chmod +x /etc/init.d/php-fpm
      660  cp php.ini-development /usr/local/php/php.ini
      661  cd /usr/local/php/etc/
      661  cp php-fpm.conf.default php-fpm.conf
      661  vim php-fpm.conf 
           pid = run/php-fpm.pid 取消注释
    	   user = php
           group = php
    	   pm.max_children = 50
           pm.start_servers = 20
    	   pm.min_spare_servers = 5
    	   pm.max_spare_servers = 35
      662  cd ..
      663  vim php.ini 
           short_open_tag = On
    	   default_charset = "utf-8"	   
      664  useradd -M -u 40 -s /sbin/nologin php
      667  ln -s /usr/local/php/bin/* /usr/local/bin/
      668  ln -s /usr/local/php/sbin/* /usr/local/sbin/
    
      669  vim /usr/local/nginx/conf/nginx.conf
           index index.php index.html index.htm;  location里增加index.php
    	   将下边的PHP的location取消注释,并把fastcgi改为fastcgi.conf
      669  cd /usr/local/nginx/html/
      669  vim index.php
           <?php
           phpinfo();
           ?>
      669  vim index1.php
           <?php
           $link=mysql_connect('localhost','root','123456');
    	   if ($link) echo "connection is ok";
    	   mysql_close();
    	   ?>
      670  /etc/init.d/php-fpm start
      671  nginx
      672  firefox 127.0.0.1&
      672  firefox 127.0.0.1/index1.php&
    
     打模块 amqp:advanced message queuing protocol
      695  tar -zxvf rabbitmq-c-0.7.1.tar.gz 
      696  cd rabbitmq-c-0.7.1
      697  ./configure --prefix=/usr/local/rabbitmq-c
      698  make
      699  make install
      700  cd
      701  gunzip amqp-1.6.1.tgz 
      702  tar -xvf amqp-1.6.1.tar 
      703  cd amqp-1.6.1
      704  yum -y install autoconf
      704  phpize
      705  ./configure --with-php-config=/usr/local/php/bin/php-config --with-librabbitmq-dir=/usr/local/rabbitmq-c/
      706  make
      707  make install
      708  ls /usr/local/php/lib/php/extensions/no-debug-non-zts-20090626/
      709  vim /usr/local/php/php.ini 
      710  /etc/init.d/php-fpm restart
      711  firefox 127.0.0.1&
    

    2.2 rabbit集群

    主机1 2 3如下操作
    [root@localhost ~]# yum -y localinstall erlang-solutions-1.0-1.noarch.rpm
    [root@localhost ~]# yum -y localinstall erlang-18.1-1.el6.x86_64.rpm
    [root@localhost ~]# yum -y localinstall socat-1.7.2.3-1.el6.x86_64.rpm
    [root@localhost ~]# yum -y locallinstall rabbitmq-server-3.6.6-1.el6.noarch.rpm  
    [root@localhost ~]# yum -y localinstall rabbitmq-server-3.6.6-1.el6.noarch.rpm
    [root@localhost ~]# chkconfig rabbitmq-server on
    [root@localhost ~]# service rabbitmq-server start
    [root@localhost ~]# vim /etc/hosts
    添加内容:
    192.168.1.17 rabbitmq1
    192.168.1.19 rabbitmq2
    192.168.1.22 rabbitmq3
    ping其他两台主机的主机名 确保可以ping通
     
     
    主机1
    [root@localhost ~]# cat /var/lib/rabbitmq/.erlang.cookie
    FIIUZZNUWRTXICRSMVDG[root@localhost ~]#
    
    主机2 3
    [root@localhost ~]#echo "FIIUZZNUWRTXICRSMVDG" > /var/lib/rabbitmq/.erlang.cookie
    按按钮,重启 三台主机都重启  重启之后hostname变成rabbitmq*  再往下操作
    三台主机启动rabbitmq-server
     
     
    主机1
    [root@rabbitmq1 ~]# rabbitmqctl stop_app
    Stopping node rabbit@rabbitmq1 ...
    [root@rabbitmq1 ~]# rabbitmqctl reset
    Resetting node rabbit@rabbitmq1 ...
    [root@rabbitmq1 ~]# rabbitmqctl start_app
    Starting node rabbit@rabbitmq1 ...
     
     
    主机2 3 操作相同
    [root@rabbitmq2 ~]# rabbitmqctl stop_app
    Stopping node rabbit@rabbitmq2 ...
    [root@rabbitmq2 ~]# rabbitmqctl reset
    Resetting node rabbit@rabbitmq2 ...
    [root@rabbitmq2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq1
    Clustering node rabbit@rabbitmq2 with rabbit@rabbitmq1 ...
    [root@rabbitmq2 ~]# rabbitmqctl start_app
    Starting node rabbit@rabbitmq2 ...
     
    主机1
    [root@rabbitmq1 ~]# rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbitmq1 ...
    [{nodes,[{disc,[rabbit@rabbitmq1]},{ram,[rabbit@rabbitmq3,rabbit@rabbitmq2]}]},
     {running_nodes,[rabbit@rabbitmq3,rabbit@rabbitmq2,rabbit@rabbitmq1]},
     {cluster_name,<<"rabbit@rabbitmq1">>},
     {partitions,[]},
     {alarms,[{rabbit@rabbitmq3,[nodedown]},
              {rabbit@rabbitmq2,[]},
              {rabbit@rabbitmq1,[]}]}]
    [root@rabbitmq1 ~]# rabbitmqctl add_user admin redhat
    Creating user "admin" ...
    [root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
    Setting tags for user "admin" to [administrator] ...
     
    主机 1 2 3
    [root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
     
    主机1
     为rabbitmq添加一个admin用户,密码是redhat,设置用户角色为administrator,Tag可以为 administrator,monitoring, management
      692  rabbitmqctl add_user admin redhat
      693  rabbitmqctl set_user_tags admin administrator
    [root@rabbitmq1 ~]# firefox 127.0.0.1:15672&   rabbitmq管理端口
    

    先建立一个vhost
    在这里插入图片描述

    设置一下vhost的权限
    在这里插入图片描述
    在这里插入图片描述
    建立policy:
    在这里插入图片描述
    在这里插入图片描述

    创建列队:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    创建消息:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    查看列队中有几个消息:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    展开全文
  • 数据缓存

    2018-04-22 17:36:54
    redis是内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件,消息都存放在内存中,写入读取速度快,但是受内存容量的限制,容易造成内存充爆。 Kakfa kafka是一个分布式的流式处理平台,它的设计...

    redis&rabbitMq&kafka

    1、消息订阅模式比较
    • Redis
      • redis是内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件,消息都存放在内存中,写入读取速度快,但是受内存容量的限制,容易造成内存充爆。
    • Kakfa
      • kafka是一个分布式的流式处理平台,它的设计初衷就是一个日志系统,消息内容是可以持久化一段时间的,消息的订阅消费位置的确定是通过消费者offset来定位的,也可以定位到之前消息再次消费。
    • Rabbitmq
      • rabbitmq的一个特殊之处是,生产者并不是直接将消息发送到队列中,而是通过exchange中继,exchange转发到队列。所以rabbitmq在消息的路由方面比之前两者都好很多。
    2、队列机制比较
    • Redis
      • redis的消息数据就存放在内存中,因为redis数据的原子性,数据只能被消费一次,不能重复。
    • Kafka
      • kafka通过consumer group的方式实现了work queue。·同一个组的消费者能够协调完成任务。另外它是一个分布式的文件系统,他的队列可以理解为topic,我们都知道topic是可以划分为多个partition的,多个partiton是可以分布在不同的node节点上的。这样当一个consumer来消费的时候,就可以到不同的节点上去获取消息,也就达到了负载均衡的作用,避免单点压力。于此同时,就会产生另外的一个问题,当有消费者在进行消费的时候,如何保证这个队列同时不背其他的消费者消费,这就需要zookeeper的分布式锁来解决了。
    • Rabbitmq
      • rabbitmq也是有自己的一个机制来实现队列的负载均衡,通过轮询机制给worker发布消息,以此来实现。当有大量的消费者去消费同一个队列的时候,这个队列的性能就会成为一个瓶颈 
    各自特点
    • Rabbitmq
      • 因为它的一个核心exchanger。它可以将消息路由到不同的队列去,比如我既要打印输出,还要持久化,那我就可以直接路由到两个queue中去。
      • 持久化
    • redis
      • redis的多种类型的数据结构,方便我们对队列的插队,删除,置顶的操作 
      • 是一个高性能的key-value数据库,它的出现很大程度补偿了memcached这类key-value存储的不足。虽然它是一个数据库系统,但本身支持MQ功能,完全可以当做一个轻量级的队列服务器使用。
    • Kafka
      • 以时间复杂度为O(1)的方式提供消息持久化能力
      • 高吞吐率
      • 支持Kafka Server间的消息分区,及分布式消费,同时保证消息顺序传输
      • 支持离线数据处理和实时数据处理
      • 支持在线水平扩展
    可用性,
    • rabbitMQ支持miror的queue,主queue失效,miror queue接管。
    • kafka的broker支持主备模式。 
    集群负载均衡方面,
    • Kafka
      • Kafka:天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave
      • kafka的集群依赖于zookeeper,zookeeper支持热扩展
    • rabbitmq
    • rabbitmq:支持简单集群,'复制'模式,对高级集群模式支持不好。
    展开全文
  • 最近项目上线,一直在压测,由于项目中使用了rabbitmq中间件,所以在压测的时候需要额外关注tps的变化,... 2 rabbitmq的请求处理线程池的缓存大小设置 3 消费端采用的消费模式确认,防止大量数据堆积mq队列中 ...

    最近项目上线,一直在压测,关于mq的并发优化问题

    1. 首先是链接模式的区分,这里的链接模式我们默认选用的是channel,但是在压测时候出现了tps波动很大的情况,所以必须要对此进行优化操作

      channel模式:
      使用springboot链接rabbitmq的默认链接模式,此模式的特点是在链接时一个应用与rabbitmq链接只开辟一个connection,在connection中使用connectionFactory链接池开启多个channel链接,这样多个请求可以服用channel链接,节约了传输资源并且提高了效率,但是,在有限的connectionFactory中的链接池中的channel不够用的使用,生产者的主动请求是会被rabbit直接拒绝的,这块需要注意,在高并发的情况下我们需要考虑是否需要这种模式
      connection模式:故名思意,这种模式的特点在于当应用与rabbitmq进行链接时,是可以开启多个connection通道的,这样每个connection通道中又可以缓存多个channel链路复用,可以大大提高我们整个mq的并发量,但是不好的地方在于我们在使用的时候开启多个connnection需要大量的资源消耗,以及当程序中的并发量达不到时,多出来的channel开启也会导致资源的浪费,所以具体使用的规划,是需要我们依据业务的规划来决定的

      我们可以使用一个单例的Connection对象创建多个Channel来实现数据传输,但是在channel信息比较大的情况下,Connection带宽会限制消息的传输。那么需要设计Connection池,将流量分摊到不同的connection上。

    2. 下面来介绍一下我们使用的connectionFactory是个什么呢
      顾名思义,作为Connection的链接工厂,负责应用程序与MQ之间创建Connection链接,表示一个TCP链接,但是实际数据的传输是建立在Connection中的channel通道,Connection中的多个channel共用一个TCP的连接
      频繁建立TCP连接和channel连接是消耗性能的,于是我们希望可以共享connection或者channel。达到连接的复用。

    3. rabbitmq的请求处理线程池的缓存大小设置

    缓存模式为Channel时:
    
      connectionFactory.setChannelCacheSize(10);
    

    表示设置的单个Connection中的channel的大小为10,要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

       connectionFactory.setChannelCheckoutTimeout(600);
    

    单位毫秒,当这个值大于0时,ChannelCacheSize代表的是缓存的数量上限,当缓存获取不到可用的channel时,不会创建新的channel会等待指定的时间,若到时间后还获取不到可用的channel,直接抛出AmqpTimeoutException。

    缓存模式为Connectionl时:
    
       connectionFactory.setConnectionCacheSize(10);
    

    仅在CONNECTION模式下使用,指定connection缓存数量

       connectionFactory.setConnectionLimit(600);
    

    仅在CONNECTION模式下使用,指定connection数量上限。。

    1. 此外,除了MQ自身的参数设置之外,外界的物理条件同样也影响这MQ的并发

      如服务器网络带宽,内存,磁盘等等

    展开全文
  • 如何建立rabbitmq链接

    2020-04-02 10:39:02
    PooledConnectionFactory实现ConnectionFactory接口。...这是一个非常耗性能的地方,特别是大数据量的情况下。因此出现了PooledConnectionFactory。这个类只会缓存connection,session和productor,不会缓存...
  • 消息中间件和RabbitMQ

    2020-08-27 22:33:11
    缓冲能力:MQ应该能缓存大量数据,缓解后端系统压力 良好的伸缩性:MQ可以根据业务需求,很方便的增减MQ集群的节点数量 良好的扩展性:如果需要加入新系统或者系统需要增加新功能,不需要修改原来的系统或者修改很小...
  • Redis 消息队列多应用于即时数据分析、秒杀计数器、缓存等轻级,高并发,延迟敏感场景。 1.2 RabbitMQ  RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在...
  • (4)Redis缓存中间件的实战与缓存雪崩跟缓存穿透等问题的解决实战 (5)RabbitMQ消息中间件在业务模块异步解耦、通信、消息确认机制以及并发配置等的实战 二、RabbitMQ实战教程课程 (1)RabbitMQ的官网权威技术...
  • SpringBoot+Redis+RabbitMQ 限时秒杀系统 原理 秒杀与其他业务最大的...把静态页面直接缓存到用户的浏览器中,当用户需要获取数据时,就从服务端接口动态获取。这样会大大节省网络的流量,如果再加上CDN优化,一般...
  • 使用RabbitMQ半年有余,每天跑的数据量数亿这个量级吧,期间做了些工作,也处理了些问题,稍加总结。 我们使用的场景主要是用于模块件的数据中转和分发,客户端以C++为主,php和python为辅。 工作: 基于...
  • 消息中间件基于队列模型实现异步/同步传输数据 作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。 传统的http请求存在那些缺点 1.Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的...
  • 读写分离,分库分表,对于秒杀这样的很多热点数据可以放到redis 缓存里面。对于搜索的内容还需要搜索引擎(分布式搜索引擎) 订单延时的回顾 比如一个秒杀活动,很多人同时抢,有些人抢到了,但是一直没有支付,...
  • 1.分析 秒杀时大量用户会在同一时间同时进行抢购,网站瞬时访问流量激增。 秒杀一般是访问请求数量远远大于库存数量...使用 redis 缓存秒杀的商品信息,秒杀成功后使用消息队列发送订单信息,然后将更新后数据重...
  • 前几节提到的队列的实现都是基于并发不是特别大的场景,而且需要消息消费的数据数据达到最终一致性做了折中的处理,而本节针对超高并发的场景下使用lMax,这节就重点提及disruptor的使用,以及从各大厂对...
  • 把静态页面直接缓存到用户的浏览器中,当用户需要获取数据时,就从服务端接口动态获取。这样会大大节省网络的流量,如果再加上CDN优化,一般都不会有大问题。 对于系统并发量变大问题,这里的核心在于如何在大...
  • 我需要设计这样一个项目。 我要从两个地方读取数据然后传输给第三方。...c类数据量比较大。一秒钟几百次的传输。 要求每10分钟左右上传一次,或者某些数据达到一定量。 我没设计过系统,现在一脸萌逼。
  • Streaming处理和Streaming处理Flink在JVM内部实现了自己的内存管理 支持迭代计算支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存2. API支持对Streaming数据类应用,提供...
  • 概念小悟

    2019-04-25 20:45:26
    主题 统一日志平台及大规模去中心化的分布式流式并行计算的实际应用 ...使用三级缓存,其中进程缓存最快,64G的内存最多也只能用到64G的缓存,二级缓存是Redis数据库,可达4.8T数据量,三级缓存是Mong...
  • 电商项目订单取消(Redis 延迟队列)--1

    千次阅读 2019-05-29 11:17:28
    现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来说,...
  • 3.1 大量的使用缓存,如何减少对 Redis 的访问(Redis预减库存和本地标记)3.2 Redis 的数据结构和底层实现3.3 大量的使用了缓存,那么就存在缓存的过期时间控制以及缓存击穿以及缓存雪崩等问题?3.4 缓存更新策略...

空空如也

空空如也

1 2 3 4 5 6
收藏数 111
精华内容 44
关键字:

rabbitmq缓存数据量