项目背景
发送一条消息,需要保证消息的可靠性,其中一个很重要的点就是保证不丢失,所以一般我们发送消息的时候都会有一个ack的确认机制,并在客户端为每条消息配置定时器,如果超过一定的时间,还没有收到ack的话,就需要重发。
需求分析
对每条消息设置超时时间,然后超时的话就触发超时事件,第一反应是利用Redis的超时机制,并同时通过Redis的key过期事件监听机制,达成消息超时重发的效果。
Redis配置文件
第一步:配置redis的过期失效监听,需要修改redis.conf配置文件,找到 EVENT NOTIFICATION (事件通知)这个配置
将 notify-keyspace-events ""
修改为 notify-keyspace-events "Ex"
这个代码差不多在1026行左右。
第二步:重启Redis
service redis restart
SpringBoot整合Redis
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置类
这里是监听所有db的过期事件keyevent@*:expired,根据自己的业务需求自行配置,@几号库,@*是所有
@Configuration
public class RedisListenerConfig {
@Autowired
RedisListen redisListen;
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(redisListen, new PatternTopic("__keyevent@*__:expired"));
return container;
}
}
监听类
@Slf4j
@Component
public class RedisListener implements MessageListener {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info("失效的redis是:" + expiredKey);
RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
String channel = String.valueOf(serializer.deserialize(message.getChannel()));
String body = String.valueOf(serializer.deserialize(message.getBody()));
//key过期监听,在处理业务之前校验下自己业务的key和监听的key以及库号,这里是2号库
if ("__keyevent@2__:expired".equals(channel) && body.contains(RedisKey.ACK_MESSAGE_KEY)) {
//TODO:消息重新推送
}
}
}