执子之手

与子偕老


  • 首页

  • 分类

  • 归档

  • 标签

  • 关于

  • 搜索
close

迁移到SpringBoot 07 - Redis发布订阅机制

时间: 2018-07-09   |   分类: 开发     |   阅读: 1019 字 ~3分钟   |   访问: 0

Redis中的发布订阅机制(Pub/Sub)是基于channel这一概念的,这有些类似于Kafka中的基于topic的消息机制,只是不支持持久化。如果publish的消息,没有任何client处于"subscribe"状态,消息将会被丢弃。如果client在subcribe时,链接断开后重连,那么此期间的消息也将丢失。Redis server将会"尽力"将消息发送给处于subscribe状态的client,但是仍不会保证每条消息都能被正确接收。

为了解耦发布者(publisher)和订阅者(subscriber)之间的关系,Redis 使用了 channel (频道)作为两者的中介:发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,发布者和订阅者之间没有相互关系,也不知道对方的存在。

Spring Data Redis组件对Pub/Sub机制进行了抽象,提供了类似JMS的编程模式。Spring Data Redis使用了一个Container(RedisMessageListenerContainer)来解决发布订阅机制。这个Container使用一个Redis链接解决了多个topic订阅的问题。它把其他的订阅、发布者隔离成基本的POJO对象,而不用与Redis对象打交道。这简化了整个编程模型。

1. 使用方法

1.1 Container

首先定义一个Container。

1    @Bean
2    RedisMessageListenerContainer redisContainer() {
3        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
4        container.setConnectionFactory(redisConnectionFactory);  // 注入RedisConnectionFactory
5        return container;
6    }

其中的redisConnectionFactory是通过AutoConfiguration自动创建的对象,按照实际的情况配置spring.redis.*相关配置参数即可创建。如有需要,可以按照自己的需要进行修改。

1.2 发布

发布使用redisTemplate的convertAndSend方法即可。

1    protected void publish(String channel, String message) {
2        try {
3            redisTemplate.convertAndSend(channel, message);
4            logger.info("publish success! channel={},message={}", channel, message);
5        } catch (Exception e) {
6            logger.error("publish error! channel={},message={}, exception: {}", channel, message, e);
7        }
8    }

channel

1.3 订阅

订阅需要指定channel和对应的Handler即可。Handler需要实现MessageListener接口。下面是订阅函数的示例:

1    protected void subscribe(String channel, MessageListener handle) {
2        try {
3            container.addMessageListener(handle, new ChannelTopic(channel));
4            logger.info("subscribe success! channel={}", channel);
5        } catch (Exception e) {
6            logger.error("subscribe error! channel={}. Exception: {}", channel, e);
7        }
8    }

一个实现MessageListener接口的例子如下:

1    public void onMessage(Message message, byte[] pattern) {
2        long begin = System.currentTimeMillis();
3        listCardBin = bankcardBinDal.selectAll();
4        long timeUsed = System.currentTimeMillis() - begin;
5        logger.info("receive message & init cardBin success! timeUsed={}ms,[{}:{}]", timeUsed, new String(pattern), message);
6    }

收到订阅消息后的处理可以在上面接口中实现。

附录、参考资料

  • PubSub Messaging with Spring Data Redis
  • Spring Data Redis PubSub Documentation
#Spring# #Springboot#
迁移到SpringBoot 08 - 日志
迁移到SpringBoot 06 - Swagger
  • 文章目录
  • 站点概览
Orchidflower

Orchidflower

Do one thing at a time, and do well.

77 日志
6 分类
84 标签
GitHub 知乎 OSC 豆瓣
  • 1. 使用方法
    • 1.1 Container
    • 1.2 发布
    • 1.3 订阅
  • 附录、参考资料
© 2009 - 2024 执子之手
Powered by - Hugo v0.113.0
Theme by - NexT
ICP - 鲁ICP备17006463号-1
0%