精华内容
下载资源
问答
  • redis消息队列

    2018-09-05 15:13:41
    redis 消息队列源码示例 redis 消息队列源码示例 redis 消息队列源码示例
  • Redis消息队列

    2018-07-04 17:26:00
    1.消息队列有两种模式:发布者订阅者模式,生产者消费者模式。 发布者订阅者模式:发布者发送消息到队列...2.Redis消息队列使用场景 可以使用消息队列来实现短信的服务化,任何需要发送短信息的模块,都可以直接调...

    1.消息队列有两种模式:发布者订阅者模式,生产者消费者模式

    发布者订阅者模式:发布者发送消息到队列,每个订阅者都能收到一样的信息。

    生产者消费者模式:生产者将消息放入队列,多个消费者共同监听,谁先抢到资源,谁就从队列中取走消息去处理,注意,每个消息最多只能被一个消费者接收。

    2.Redis消息队列使用场景

    可以使用消息队列来实现短信的服务化,任何需要发送短信息的模块,都可以直接调用短信服务来完成短信的发送。比如用户系统登录注册短信,订单系统的下单成功的短信等。

    3.Spring MVC中实现Redis消息队列(GitHub地址:Redis_MQ)

    3.1 引入Redis相应的依赖包,jedis-2.9.0.jar    commons-pool2-2.5.0.jar    spring-data-redis-1.6.0.RELEASE.jar (安利个下jar包的网站:http://www.mvnjar.com/search.html

    3.2 新建redis.properties文件

    host=10.0.20.251
    port=6379

    3.3配置application-redis.xml文件(applicationContext-redis.xml)

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache"
        xmlns:redis="http://www.springframework.org/schema/redis"
        xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
                                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd     
                                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                                http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop.xsd
                                http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                                http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
        <description>spring-data-redis配置</description>
        <context:component-scan base-package="com.fpc.Entity"></context:component-scan>
        <!-- 引入配置文件 -->  
        <bean id="propertyConfigurer"  
            class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
            <property name="location" value="classpath:redis.properties" />  
        </bean> 
    
        <bean id="redisConnectionFactory"
            class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
            <property name="hostName" value="${host}"></property>
            <property name="port" value="${port}"></property>
            <property name="usePool" value="true"></property>
        </bean>
    
        <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
            <property name="connectionFactory" ref="redisConnectionFactory"></property>
        </bean>
        <!-- 序列化:一般我们想Redis发送一个消息定义的Java对象,这个对象需要序列化。这里使用JdkSerializationRedisSerializer -->
        <bean id="jdkSerializer"
            class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
    
        <!-- 监听器 -->
        <bean id="smsMessageListener"
            class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
            <property name="delegate" ref="smsMessageDelegateListener" />
            <property name="serializer" ref="jdkSerializer" />
        </bean>
    
        <!-- 发送者 -->
        <bean id="sendMessage" class="com.fpc.Entity.sendMessage">
            <property name="redisTemplate" ref="redisTemplate"/>
        </bean>
    
        <!-- redis:listener-container:定义消息监听,method:监听消息执行的方法,serializer:序列化,topic:监听主题(可以理解为队列的名称) -->
        <redis:listener-container>
            <redis:listener ref="smsMessageListener" method="handleMessage"
                serializer="jdkSerializer" topic="sms_queue_web_online" />
        </redis:listener-container>
    
        <!-- jedis -->
        <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
            <property name="maxIdle" value="300" /> <!-- 最大能够保持idel状态的对象数  -->
            <property name="maxTotal" value="60000" /> <!-- 最大分配的对象数 -->
            <property name="testOnBorrow" value="true" /> <!-- 当调用borrow Object方法时,是否进行有效性检查 -->
        </bean>
    
        <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
            <constructor-arg index="0" ref="jedisPoolConfig" />
            <constructor-arg index="1" value="${host}" type="java.lang.String"/>
            <constructor-arg index="2" value="${port}" type="int" />
        </bean>
    
    </beans>

    3.4新建一个消息实体(Message)

    package com.fpc.Entity;
    
    import java.io.Serializable;
    import java.util.Date;
    
    public class Message implements Serializable {
        private Integer messageId;
        private String mobileNumber;//电话号码
        private Byte type;//消息类型,1:登录验证码,2:订单信息
        
        private Date createDate;//消息创建的时间
        
        //消息处理时间
        private Date processTime;
        
        //消息状态:1:未发送,2:发送成功,3:发送失败
        private Byte status;
        
        private String content;//消息主体
    
        public Integer getMessageId() {
            return messageId;
        }
    
        public void setMessageId(Integer messageId) {
            this.messageId = messageId;
        }
    
        public String getMobileNumber() {
            return mobileNumber;
        }
    
        public void setMobileNumber(String mobileNumber) {
            this.mobileNumber = mobileNumber;
        }
    
        public Byte getType() {
            return type;
        }
    
        public void setType(Byte type) {
            this.type = type;
        }
    
        public Date getCreateDate() {
            return createDate;
        }
    
        public void setCreateDate(Date createDate) {
            this.createDate = createDate;
        }
    
        public Date getProcessTime() {
            return processTime;
        }
    
        public void setProcessTime(Date processTime) {
            this.processTime = processTime;
        }
    
        public Byte getStatus() {
            return status;
        }
    
        public void setStatus(Byte status) {
            this.status = status;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    }

    3.5定义一个发送消息的对象(sendMessage

    package com.fpc.Entity;
    
    import java.io.Serializable;
    
    import org.springframework.data.redis.core.RedisTemplate;
    
    //定义一个消息发送对象
    public class sendMessage {
        private RedisTemplate<String, Object> redisTemplate;
    
        public RedisTemplate<String, Object> getRedisTemplate() {
            return redisTemplate;
        }
    
        public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
        
        public void sendMessages( String channel,Serializable message ){
            redisTemplate.convertAndSend(channel, message);
        }
    }

    3.6定义一个监听器(smsMessageDelegateListener

    package com.fpc.Entity;
    
    import java.io.Serializable;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    import org.springframework.stereotype.Component;
    
    import redis.clients.jedis.Jedis;
    
    //监听消息
    @Component("smsMessageDelegateListener")
    public class smsMessageDelegateListener {
        //监听Redis消息
        public void handleMessage( Serializable message ){
            Message mess = (Message) message;
            
            //发送短信
            //手机号: mess.getMobileNumber()
            //短信内容:mess.getContent();
            //send,发送状态sendStatus
            //如果发送不成功则直接return,离开该方法,或者继续重试
            //如果发送成功则需要,异步改写短信的状态;
            Jedis jedis = new Jedis("10.0.20.251");
            jedis.set("message", mess.getContent());
            Executor executor = Executors.newSingleThreadExecutor();
            executor.execute(new Runnable() {
                
                @Override
                public void run() {
                    // TODO 自动生成的方法存根
                    //读写短信数据表,将短信的发送状态改为已发送
                }
            });
            
        }
    }

    注:该监听器中注释已经写得很明白了,你可以在此处对数据进行持久化即写库操作,由于本demo中没有写库(建一张message表,然后写dao,service,serviceImpl等,就跟正常操作别的表是一样的,所以此处做数据持久化了)

    然后在该监听器可以开启一个异步线程,异步改写短信的状态。

    3.7.在controller中写一个get_message方法(get_message

    @RequestMapping("/get_message")
        public ModelAndView get_message( @RequestParam("mobileNumber") String moblieNumber ){
            //取得电话号码,构造消息对象,然后通过短信服务器生成验证码发送到该手机上
            Message message = new Message();
            message.setMobileNumber(moblieNumber);
            message.setType((byte)1); //验证消息
            message.setContent("23456"); //消息的内容
            message.setStatus((byte)1);//未发送状态
            message.setCreateDate(new Date());
            //1.可以把待发送的消息存库,也可以不存库,现在先不存库
            
            //2.异步发送短信到redis队列
            //2.1先构造一个消息发送对象
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-redis.xml");
            sendMessage sendm = (sendMessage)context.getBean("sendMessage");
            sendm.sendMessages("sms_queue_web_online", message);
            System.out.println(message.getContent());
            ModelAndView modelAndView = new ModelAndView();
            modelAndView.addObject("code",message.getContent());
            modelAndView.setViewName("user");
            return modelAndView;
    }

    注:此处没有真正调用发短信的方法,而是硬编码为“23456”

    3.8编写jsp页面,user.jsp

    <%@ page language="java" contentType="text/html; charset=ISO-8859-1"
        pageEncoding="UTF-8"%>
    <%@ taglib prefix="shiro" uri="http://shiro.apache.org/tags" %>
    <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
    <title>Insert title here</title>
    </head>
    <body>
        <shiro:user>
            welcome[<shiro:principal/>]login,<a href="Shiro/logout">logout</a>
        </shiro:user>
        <form action="Shiro/get_message" method="POST">
            phonenumber: <input type="text" name="mobileNumber"/>
            <br>
            code: <input type="text" name="password" value="${code}"/>
            <br>
            <input type="submit" value="get code">
        </form>
    </body>
    </html>

    运行效果:

    然后我们去Redis服务器端看下,数据有没有被保存在Redis中,然后借此判断监听器有没有执行:

    可以看到Redis服务器中已经有值存在了,说明消息队列监听器是确实在监听的,你可以根据实际的业务改变监听器中的代码和controller中的代码。

    转载于:https://www.cnblogs.com/fangpengchengbupter/p/9262977.html

    展开全文
  • redis 消息队列

    2020-08-21 18:10:52
    如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。 如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。 blpop,brpop可以...

    redis 异步队列实现

    • 一般使用list结构作为队列,rpush生产消息,lpop消费消息。当lpop没有消息的时候,要适当sleep一会再重试。

     

    如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间

    如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

    • blpop,brpop可以代替sleep,在没有消息的时候,它会阻塞住直到消息到来。

     

    redis pub/sub发布订阅模式

    生产者和消费者通过相同的一个信道(Channel)进行交互。信道其实也就是队列。

    通常会有多个消费者。多个消费者订阅同一个信道,当生产者向信道发布消息时,该信道会立即将消息逐一发布给每个消费者。可见,该信道对于消费者是发散的信道,每个消费者都可以得到相同的消息。典型的对多的关系。

    优点:

    1.典型的广播模式,一个消息可以发布到多个消费者

    2.多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息

    3.消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息

    缺点:

    1.消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。

    2.不能保证每个消费者接收的时间是一致的

    3.若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时。

     

    由此可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务

     

    redis如何实现延时队列

    (1) 拿时间戳作为score,消息内容作为key调用zadd来生产消息,存放在zset中;

    (2) 用一个进程定时查询zset的score分数最小的元素,可以用ZRANGEBYSCORE key -inf +inf limit 0 1 withscores命令来实现;

    (3) 如果最小的score时间戳小于等于当前时间戳,就将该任务取出来执行,否则休眠sleep一段时间后再查询

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    展开全文
  • SpringBoot整合Redis消息队列

    万次阅读 2020-06-28 11:59:32
    队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。 只有一个消费者将获得消息 生产者不需要在接收...

    消息模型

    1)队列模型

    队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。

    • 只有一个消费者将获得消息
    • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
    • 每一个成功处理的消息都由接收者签收

    队列模型

    2)发布/订阅模型

    发布/订阅模型如图所示,不用说,和订阅公众号是一样的。

    • 多个消费者可以获得消息
    • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个topic,以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布

    发布订阅模型

    Redis实现

    1. 对于队列模型,我们可以使用redis的list数据结构,通过LPUSH和RPOP来实现一个队列。
    2. 发布/订阅模型就更简单了,redis官方就支持,而且还可以使用PSUBSCRIBE支持模式匹配,使用如下命令,即可订阅所有f开头的订阅,具体可查看文档。
    PSUBSCRIBE f*

         3. keyspace notifications(键空间通知)

    该功能是在redis2.8之后引入的,即客户端可以通过pub/sub机制,接收key的变更的消息。换句话说,就是redis官方提供了一些topic,帮助我们去监听redis数据库中的key,我曾经就使用其中的‘keyevent@0:expired’实现了定时任务。

    和SpringBoot整合

    首先得介绍一下spring-data-redis中的两种template的默认Serializer,当然Spring还提供其他的序列化器,具体可查看文档,也可以自己实现RedisSerializer接口,构建自己的序列化器。

    template default serializer serialization
    RedisTemplate JdkSerializationRedisSerializer 序列化String类型的key和value
    StringRedisTemplate StringRedisSerializer 使用Java序列化

    消息队列模型

    消息队列,这个就需要自己造轮子了,在Spring中使用redisTemlate操作数据库,而对于不同的数据类型则需要不同的操作方式,如下表格所示,具体还是请看官方文档

    实现队列选择list数据结构,redisTemplate.opsForList()使用起来非常简单,和redis命令基本一致。

    数据类型 操作方式
    string redisTemplate.opsForValue()
    hash redisTemplate.opsForHash()
    list redisTemplate.opsForList()
    set redisTemplate.opsForSet()

    1.先定义一个消息的POJO(MessageEntity实体类)

    2.配置spring data redis

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.johnfnash.learn.redis.queue.entity.MessageEntity;
    
    @Configuration
    public class RedisConfig {
    
        @Bean
        public RedisTemplate<String, MessageEntity> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, MessageEntity> template = new RedisTemplate<String, MessageEntity>();
            template.setConnectionFactory(factory);
    
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
                    Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setValueSerializer(jackson2JsonRedisSerializer);
    
            template.setKeySerializer(new StringRedisSerializer());
            template.setHashKeySerializer(new StringRedisSerializer());
            template.afterPropertiesSet();
            return template;
        }
        
    }
    

    用到的 redis 配置信息如下:

    #数据库索引(默认为0)
    spring.redis.database=0
    spring.redis.host=localhost
    spring.redis.port=6379
    # Redis服务器连接密码(默认为空)
    spring.redis.password=
    #连接超时时间
    spring.redis.timeout=10000
    #最大连接数
    spring.redis.jedis.pool.max-active=8
    #最大阻塞等待时间(负数表示没限制)
    spring.redis.jedis.pool.max-wait=-1
    #最大空闲
    spring.redis.jedis.pool.max-idle=8
    #最小空闲
    spring.redis.jedis.pool.min-idle=0
    
    
    # redis消息队列键名
    redis.queue.key=queue
    # redis消息队列读取消息超时时间,单位:秒
    redis.queue.pop.timeout=1000

    3.消息的消费者,消费者需要不断轮询队列,有消息便取出来。

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 消息的消费者,消费者需要不断的去轮询队列,有消息就取出来
     */
    @Service
    public class MessageConsumerService extends Thread {
    
    	@Resource
    	private RedisTemplate<String, MessageEntity> redisTemplate;
    
    	private volatile boolean flag = true;
    
    	@Value("${redis.queue.key}")
    	private String queueKey;
    
    	@Value("${redis.queue.pop.timeout}")
    	private Long popTimeout;
    
    	@Override
    	public void run() {
    		try {
    			MessageEntity message;
    			while (flag && !Thread.currentThread().isInterrupted()) {
    				message = redisTemplate.opsForList().rightPop(queueKey, popTimeout, TimeUnit.SECONDS);
    				System.out.println("接收到了" + message);
    			}
    		} catch (Exception e) {
    			System.err.println(e.getMessage());
    		}
    	}
    
    	public boolean isFlag() {
    		return flag;
    	}
    
    	public void setFlag(boolean flag) {
    		this.flag = flag;
    	}
    
    }
    

    4.消息的生产者,这个类提供一个发送消息的方法。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * 生产者:提供一个发送消息的方法
     */
    @Service
    public class MessageProducerService {
    
    	@Autowired
    	private RedisTemplate<String, MessageEntity> redisTemplate;
    
    	@Value("${redis.queue.key}")
    	private String queueKey;
    
    	public Long sendMeassage(MessageEntity message) {
    		System.out.println("发送了" + message);
    		return redisTemplate.opsForList().leftPush(queueKey, message);
    	}
    
    }
    

    测试

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class QueueTest {
    
    	@Autowired
    	private MessageProducerService producer;
    
    	@Autowired
    	private MessageConsumerService consumer;
    
    	/**
    	 * 消息队列模型测试
    	 */
    	@Test
    	public void testQueue() {
    		consumer.start();
    		producer.sendMeassage(new MessageEntity("1", "aaaa"));
    		producer.sendMeassage(new MessageEntity("2", "bbbb"));
    
    		try {
    			Thread.sleep(2000L);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		consumer.interrupt();
    	}
    }

    输出信息如下:

    2020-06-28 11:27:08.136  INFO 81195 --- [           main] DeferredRepositoryInitializationListener : Spring Data repositories initialized!
    2020-06-28 11:27:08.147  INFO 81195 --- [           main] c.test.springboot2_test.queue.QueueTest  : Started QueueTest in 11.012 seconds (JVM running for 12.666)
    发送了MessageEntity [id=1, content=aaaa]
    发送了MessageEntity [id=2, content=bbbb]
    接收到了MessageEntity [id=1, content=aaaa]
    接收到了MessageEntity [id=2, content=bbbb]
    Redis command interrupted; nested exception is io.lettuce.core.RedisCommandInterruptedException: Command interrupted

    至此,消息队列的方式也整合完成了。
    虽然redisTemplate是线程安全的,但是如果一个队列有多个接收者的话,可能也还需要考虑一下并发的问题。

    发布/订阅模型

    发布/订阅模型,因为Spring官方给了示例。但是呢,示例里面的消息是String类型,对于我们的业务来说,可能更需要一个POJO(MessageEntity实体类)

    1.先学习下org.springframework.data.redis.listener.adapter.MessageListenerAdapter源码如下,可以看到,如果使用StringRedisTemplate的话,默认都是使用StringRedisSerializer来反序列化,而如果想主动接收消息,则需要实现MessageListener接口。

        /**
         * Standard Redis {@link MessageListener} entry point.
         * <p>
         * Delegates the message to the target listener method, with appropriate conversion of the message argument. In case
         * of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.
         * 
         * @param message the incoming Redis message
         * @see #handleListenerException
         */
        public void onMessage(Message message, byte[] pattern) {
            try {
                // Check whether the delegate is a MessageListener impl itself.
                // In that case, the adapter will simply act as a pass-through.
                if (delegate != this) {
                    if (delegate instanceof MessageListener) {
                        ((MessageListener) delegate).onMessage(message, pattern);
                        return;
                    }
                }
    
                // Regular case: find a handler method reflectively.
                Object convertedMessage = extractMessage(message);
                String convertedChannel = stringSerializer.deserialize(pattern);
                // Invoke the handler method with appropriate arguments.
                Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };
    
                invokeListenerMethod(invoker.getMethodName(), listenerArguments);
            } catch (Throwable th) {
                handleListenerException(th);
            }
        }
    
        /**
         * Extract the message body from the given Redis message.
         * 
         * @param message the Redis <code>Message</code>
         * @return the content of the message, to be passed into the listener method as argument
         */
        protected Object extractMessage(Message message) {
            if (serializer != null) {
                return serializer.deserialize(message.getBody());
            }
            return message.getBody();
        }
    
        /**
         * Initialize the default implementations for the adapter's strategies.
         * 
         * @see #setSerializer(RedisSerializer)
         * @see JdkSerializationRedisSerializer
         */
        protected void initDefaultStrategies() {
            RedisSerializer<String> serializer = new StringRedisSerializer();
            setSerializer(serializer);
            setStringSerializer(serializer);
        }
    
    

    2.spring data redis实现发布与订阅需要配置以下信息:

    • Topic
    • MessageListener
    • RedisMessageListenerContainer

    1)用到的相关依赖:

            <!-- 集成redis -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>

    2)配置 spring data redis:

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.test.springboot2_test.queue.ConsumerRedisListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cache.CacheManager;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.cache.interceptor.KeyGenerator;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.cache.RedisCacheConfiguration;
    import org.springframework.data.redis.cache.RedisCacheManager;
    import org.springframework.data.redis.cache.RedisCacheWriter;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializationContext;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import java.lang.reflect.Method;
    import java.time.Duration;
    
    /**
     * Redis 缓存配置类
     */
    @Configuration
    @EnableCaching
    public class RedisConfig extends CachingConfigurerSupport {
    
    	@Bean
    	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    		RedisTemplate<String, Object> template = new RedisTemplate<>();
    		template.setConnectionFactory(factory);
    
    		Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
    				Object.class);
    		ObjectMapper om = new ObjectMapper();
    		om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    		om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    		jackson2JsonRedisSerializer.setObjectMapper(om);
    		template.setValueSerializer(jackson2JsonRedisSerializer);
    
    		template.setKeySerializer(new StringRedisSerializer());
    		template.setHashKeySerializer(new StringRedisSerializer());
    		template.afterPropertiesSet();
    		return template;
    	}
    
    	@Autowired
    	private LettuceConnectionFactory connectionFactory;
    
    	@Bean
    	public ConsumerRedisListener consumeRedis() {
    		return new ConsumerRedisListener();
    	}
    
    	@Bean
    	public ChannelTopic topic() {
    		return new ChannelTopic("topic");
    	}
    
    	@Bean
    	public RedisMessageListenerContainer redisMessageListenerContainer() {
    		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    		container.setConnectionFactory(connectionFactory);
    		container.addMessageListener(consumeRedis(), topic());
    		return container;
    	}
    
    }

    3)实现一个Object类型的 topic MessageListener

    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    
    import javax.annotation.Resource;
    
    public class ConsumerRedisListener implements MessageListener {
    
    	@Resource
    	private RedisTemplate<String, Object> redisTemplate;
    
    	@Override
    	public void onMessage(Message message, byte[] pattern) {
    		doBusiness(message);
    	}
    
    	/**
    	 * 打印 message body 内容
    	 * @param message
    	 */
    	public void doBusiness(Message message) {
    		Object value = redisTemplate.getValueSerializer().deserialize(message.getBody());
    		System.out.println("consumer message: " + value.toString());
    	}
    
    }
    

    4)其他(最简单的application.properties配置如下)

    #数据库索引(默认为0)
    spring.redis.database=0
    spring.redis.host=localhost
    spring.redis.port=6379
    # Redis服务器连接密码(默认为空)
    spring.redis.password=
    #连接超时时间
    spring.redis.timeout=10000
    #最大连接数
    spring.redis.jedis.pool.max-active=8
    #最大阻塞等待时间(负数表示没限制)
    spring.redis.jedis.pool.max-wait=-1
    #最大空闲
    spring.redis.jedis.pool.max-idle=8
    #最小空闲
    spring.redis.jedis.pool.min-idle=0

    通过上面四步,简单的订阅者就做好了,通过以下代码可以发布一个消息,同时可以查看到控制台会有订阅者消费信息打印出来:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.annotation.Resource;
    import java.util.Date;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class QueueTest {
    
    	@Resource
    	private RedisTemplate<String, Object> redisTemplate;
    
    	/**
    	 * 发布订阅模型测试
    	 */
    	@Test
    	public void testSubscribe() {
    		String channel = "topic";
    
    		System.out.println("开始发送消息。。。。。。。。。。。。。。");
    		redisTemplate.convertAndSend(channel, "hello world");
    		redisTemplate.convertAndSend(channel, new Date(System.currentTimeMillis()));
    		redisTemplate.convertAndSend(channel, new MessageEntity("1", "object"));
    		System.out.println("发送消息结束。。。。。。。。。。。。。。");
    
    	}
    
    }
    

    这里用到了一个实体类用于测试。

    import java.io.Serializable;
    
    public class MessageEntity implements Serializable {
    	private static final long serialVersionUID = 8632296967087444509L;
    
    	private String id;
    
    	private String content;
    
    	public MessageEntity() {
    		super();
    	}
    
    	public MessageEntity(String id, String content) {
    		super();
    		this.id = id;
    		this.content = content;
    	}
    
    	public static long getSerialVersionUID() {
    		return serialVersionUID;
    	}
    
    	public String getId() {
    		return id;
    	}
    
    	public void setId(String id) {
    		this.id = id;
    	}
    
    	public String getContent() {
    		return content;
    	}
    
    	public void setContent(String content) {
    		this.content = content;
    	}
    
    	@Override
    	public String toString() {
    		return "MessageEntity [id=" + id + ", content=" + content + "]";
    	}
    
    }
    

    输出结果如下:

    consumer message: hello world
    consumer message: Sat Feb 23 13:04:40 CST 2019
    consumer message: MessageEntity [id=1, content=object]
    

    用 spring data redis 来实现 redis 订阅者,本质上还是Listener模式,只需要配置Topic, MessageListener 和 RedisMessageListenerContainer就可以了。同时,发布时,只需要使用 redisTemplate 的 convertAndSend方法即可topic来发布message。

    参考:

    springboot整合redis消息队列

    展开全文
  • 本文实例为大家分享了php+redis消息队列实现抢购的具体代码,供大家参考,具体内容如下 实现功能: 1. 基于redis队列,防止高并发的超卖 2. 基于mysql的事务加排它锁,防止高并发的超卖 基于redis队列工作流程:...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,084
精华内容 2,833
关键字:

redis消息队列

redis 订阅