实现定时消息通知

实现定时消息通知

一、需求分析:

  1. 设置了生存时间的Key,在过期时能不能有所提示?

  2. 如果能对过期Key有个监听,如何对过期Key进行一个回调处理?

  3. 如何使用 Redis 来实现定时任务?

二、序言:

本文所说的定时任务或者说计划任务并不是很多人想象中的那样,比如说每天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。

这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时我们要对这个动作做点什么。那么如果有 1000 个用户触发了这个动作,就会有 1000 个定时任务。于是这就不是 Cron 范畴里面的内容了。

举个最简单的例子,一个用户推荐了另一个用户,我们定一个二十四小时之后的任务,看看被推荐的用户有没有来注册,如果没注册就给他搞一条短信过去。

三、Redis介绍

在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成这个定时任务的操作了,不过定时的单位是秒。

(1)Publish / Subscribe

Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是说一边给 Redis 的特定频道发送消息,另一边从 Redis 的特定频道取值——形成了一个简易的消息队列。

(2)Redis Keyspace Notifications

在 Redis 里面有一些事件,比如键到期、键被删除等。然后我们可以通过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。

大致的流程就是我们给 Redis 的某一个 db 设置过期事件,使其键一旦过期就会往特定频道推消息,我在自己的客户端这边就一直消费这个频道就好了。

以后一来一条定时任务,我们就把这个任务状态压缩成一个键,并且过期时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 自然会把过期消息推去,我们的客户端就能接收到了。这样一来就起到了定时任务的作用。

四、Key过期事件的Redis配置

这里需要配置 notify-keyspace-events 的参数为 “Ex”。x 代表了过期事件。notify-keyspace-events "Ex" 保存配置后,重启Redis服务,使配置生效。 重启Reids服务器:

添加过期事件订阅 开启一个终端,redis-cli 进入 redis 。开始订阅所有操作,等待接收消息。

再开启一个终端,redis-cli 进入 redis,新增一个 20秒过期的键:

setex 用法 见官方文档:http://doc.redisfans.com/string/setex.html

另外一边执行了阻塞订阅操作后的终端,20秒过期后有如下信息输出:

说明:说明对过期Key信息的订阅是成功的。

五、Jedis实现Publish/Subscribe功能

Redis为我们提供了publish/subscribe(发布/订阅)功能。我们可以对某个channel(频道)进行subscribe(订阅),当有人在这个channel上publish(发布)消息时,redis就会通知我们,这样我们可以收到别人发布的消息。 作为Java的redis客户端,Jedis提供了publish/subscribe的接口。本文讲述如何使用Jedis来实现redis的publish/subscribe。

  • 定义Subscriber类

Jedis定义了抽象类JedisPubSub,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。

在以下例子中,我们定义了Subscriber类,这个类继承了JedisPubSub类,并重新实现了其中的回调方法。

Subscriber.java

  • 定义SubThread线程类

由于Jedis的subscribe操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。

SubThread.java

在上面的代码中,我们从JedisPool获取一个Jedis实例,并使用这个Jedis实例进行subscribe的操作。 Jedissubscribe的声明如下:

public void subscribe(final JedisPubSub jedisPubSub, final String… channels)

第一个参数接受一个JedisPubSub对象,第二个参数指定对哪个频道进行订阅。上例中,我们把我们定义的Subscriber对象传给subscribe方法。 当publish/subscribe的事件发生时,会自动调用我们Subscriber的方法。

  • 定义Publisher类

Publisher类接受用户的输入,并将输入发布到channel。当用户输入”quit”后,输入结束。

Publisher.java

定义入口代码

如下是我们的程序入口代码。

PubSubDemo.java

在上面的代码中,我们首先生成了一个JedisPool的redis连接池,这是由于Jedis不是线程安全的,JedisPool是线程安全的。而我们的程序在主线程和订阅线程(SubThread)均需要使用Jedis,故在程序中我们使用JedisPool。具体也可以参考在多线程环境中使用Jedis。 由于Jedis的subcribe操作是阻塞的,故我们另起了一个线程来进行subcribe操作。 通过调用Publisher::start()方法,接受用户的输入,并publish到指定的channel。

输出

redis pool is starting, redis ip 192.168.229.154, redis port 6379 subscribe redis, channel mychannel, thread will be blocked subscribe redis channel success, channel mychannel, subscribedChannels 1

这时输入

hello

控制窗口中输出

receive redis published message, channel mychannel, message hello

参考资料

Last updated

Was this helpful?