Redis中的发布订阅机制(Pub/Sub)是基于channel这一概念的,这有些类似于Kafka中的基于topic的消息机制,只是不支持持久化。如果publish的消息,没有任何client处于"subscribe"状态,消息将会被丢弃。如果client在subcribe时,链接断开后重连,那么此期间的消息也将丢失。Redis server将会"尽力"将消息发送给处于subscribe状态的client,但是仍不会保证每条消息都能被正确接收。
为了解耦发布者(publisher)和订阅者(subscriber)之间的关系,Redis 使用了 channel (频道)作为两者的中介:发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,发布者和订阅者之间没有相互关系,也不知道对方的存在。
Spring Data Redis组件对Pub/Sub机制进行了抽象,提供了类似JMS的编程模式。Spring Data Redis使用了一个Container(RedisMessageListenerContainer)来解决发布订阅机制。这个Container使用一个Redis链接解决了多个topic订阅的问题。它把其他的订阅、发布者隔离成基本的POJO对象,而不用与Redis对象打交道。这简化了整个编程模型。
1. 使用方法
1.1 Container
首先定义一个Container。
1 @Bean
2 RedisMessageListenerContainer redisContainer() {
3 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
4 container.setConnectionFactory(redisConnectionFactory); // 注入RedisConnectionFactory
5 return container;
6 }
其中的redisConnectionFactory
是通过AutoConfiguration自动创建的对象,按照实际的情况配置spring.redis.*
相关配置参数即可创建。如有需要,可以按照自己的需要进行修改。
1.2 发布
发布使用redisTemplate的convertAndSend方法即可。
1 protected void publish(String channel, String message) {
2 try {
3 redisTemplate.convertAndSend(channel, message);
4 logger.info("publish success! channel={},message={}", channel, message);
5 } catch (Exception e) {
6 logger.error("publish error! channel={},message={}, exception: {}", channel, message, e);
7 }
8 }
channel
1.3 订阅
订阅需要指定channel和对应的Handler即可。Handler需要实现MessageListener
接口。下面是订阅函数的示例:
1 protected void subscribe(String channel, MessageListener handle) {
2 try {
3 container.addMessageListener(handle, new ChannelTopic(channel));
4 logger.info("subscribe success! channel={}", channel);
5 } catch (Exception e) {
6 logger.error("subscribe error! channel={}. Exception: {}", channel, e);
7 }
8 }
一个实现MessageListener
接口的例子如下:
1 public void onMessage(Message message, byte[] pattern) {
2 long begin = System.currentTimeMillis();
3 listCardBin = bankcardBinDal.selectAll();
4 long timeUsed = System.currentTimeMillis() - begin;
5 logger.info("receive message & init cardBin success! timeUsed={}ms,[{}:{}]", timeUsed, new String(pattern), message);
6 }
收到订阅消息后的处理可以在上面接口中实现。