high-concurrent-cache
  • Introduction
  • Redis
    • redis常用命令
      • 生产环境谨慎使用keys命令
      • Redis 命令参考
    • redis客户端
      • Jedis使用指南
      • redisson
      • redis连接池
      • jedisCluster详解
      • jedisCluster+SpringMVC整合
    • redis主从模式
      • 配置方式
      • 一主多从读写分离架构
    • redis持计划机制的讲解
      • 持久化
      • RDB详解
      • AOF详解
      • RDB与AOF异同比较
    • 哨兵机制详解(sentinel)
      • 哨兵使用架构(sentinel)
      • 哨兵工作原理/哨兵如何监控redis
      • 哨兵如何高可用
      • 哨兵内部通讯原理/自动发现 Sentinel 和从服务器
      • 基于哨兵的redis高可用架构
    • redisCluster集群
      • 架构分析
      • 搭建高可用集群
      • 分区存储详解
      • java客户端使用redisCluster
    • 实战场景
      • 高性能分布式缓存
      • 实现定时消息通知
      • 简单高速队列的实现与应用
      • 实现去重幂等性
      • 数据计数/订单号生成
      • 基于redis实现分布式锁
      • 排行榜
      • 使用Redis bitmaps进行快速、简单、实时统计
      • 利用Redis集合(Set)统计新增用户和次日留存率
    • Redis回收进程如何工作的? Redis回收使用的是什么算法?
    • Redis持久化数据和缓存怎么做扩容?
    • Redis 分区的优势、不足以及分区类型
    • Redis 大量数据插入
    • redis实现简单延时队列
    • 使用 redis 如何设计分布式锁?说一下实现思路?使用 zk 可以吗?如何实现?这两种有什么区别?
    • Redis的bitmap讲解
    • 分布式锁的漫画教程
  • Memcached
    • Memcached概念
    • 安装配置
      • Linux安装
      • Windows安装
    • 集群搭建
    • 常用场景
    • 客户端命令
      • 客户端连接命令
      • Memcached 存储命令
        • Memcached set 命令
        • Memcached add 命令
        • Memcached replace 命令
        • Memcached append 命令
        • Memcached prepend 命令
        • Memcached CAS 命令
      • Memcached 查找命令
        • Memcached get 命令
        • Memcached gets 命令
        • Memcached delete 命令
        • Memcached incr 与 decr 命令
      • Memcached 统计命令
        • Memcached stats 命令
        • Memcached stats items 命令
        • Memcached stats slabs 命令
        • Memcached stats sizes 命令
        • Memcached flush_all 命令
    • Java客户端
    • memcached特点与redis区别
    • 一致性哈希算法原理
    • Memcache 高可用集群之magent实现主从
  • 互联网缓存架构设计
    • 常见的缓存架构方案
    • 缓存雪崩的解决方案
    • 缓存穿透的解决方案
    • 缓存击穿的解决方案
Powered by GitBook
On this page
  • 实现定时消息通知
  • 一、需求分析:
  • 二、序言:
  • 三、Redis介绍
  • 四、Key过期事件的Redis配置
  • 五、Jedis实现Publish/Subscribe功能
  • 定义入口代码
  • 输出
  • 参考资料

Was this helpful?

  1. Redis
  2. 实战场景

实现定时消息通知

实现定时消息通知

一、需求分析:

  1. 设置了生存时间的Key,在过期时能不能有所提示?

  2. 如果能对过期Key有个监听,如何对过期Key进行一个回调处理?

  3. 如何使用 Redis 来实现定时任务?

二、序言:

本文所说的定时任务或者说计划任务并不是很多人想象中的那样,比如说每天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。

这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时我们要对这个动作做点什么。那么如果有 1000 个用户触发了这个动作,就会有 1000 个定时任务。于是这就不是 Cron 范畴里面的内容了。

举个最简单的例子,一个用户推荐了另一个用户,我们定一个二十四小时之后的任务,看看被推荐的用户有没有来注册,如果没注册就给他搞一条短信过去。

三、Redis介绍

在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成这个定时任务的操作了,不过定时的单位是秒。

(1)Publish / Subscribe

Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是说一边给 Redis 的特定频道发送消息,另一边从 Redis 的特定频道取值——形成了一个简易的消息队列。

(2)Redis Keyspace Notifications

在 Redis 里面有一些事件,比如键到期、键被删除等。然后我们可以通过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。

大致的流程就是我们给 Redis 的某一个 db 设置过期事件,使其键一旦过期就会往特定频道推消息,我在自己的客户端这边就一直消费这个频道就好了。

以后一来一条定时任务,我们就把这个任务状态压缩成一个键,并且过期时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 自然会把过期消息推去,我们的客户端就能接收到了。这样一来就起到了定时任务的作用。

四、Key过期事件的Redis配置

这里需要配置 notify-keyspace-events 的参数为 “Ex”。x 代表了过期事件。notify-keyspace-events "Ex" 保存配置后,重启Redis服务,使配置生效。 重启Reids服务器:

root@iZ23s8agtagZ:/etc/redis# service redis-server restart redis.conf
Stopping redis-server: redis-server.
Starting redis-server: redis-server.

添加过期事件订阅 开启一个终端,redis-cli 进入 redis 。开始订阅所有操作,等待接收消息。

tinywan@iZ23a7607jaZ:~$ redis-cli -h 127.0.01.4 -p 63789
127.0.0.1:63789> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1

再开启一个终端,redis-cli 进入 redis,新增一个 20秒过期的键:

1270.01.1.1:63789> SETEX coolName 123 20
OK
121.41.188.109:63789> get coolName
"20"
121.41.188.109:63789> ttl coolName
(integer) 104

另外一边执行了阻塞订阅操作后的终端,20秒过期后有如下信息输出:

121.141.188.209:63789> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@0__:expired"
3) "__keyevent@0__:expired"
4) "coolName"

说明:说明对过期Key信息的订阅是成功的。

五、Jedis实现Publish/Subscribe功能

Redis为我们提供了publish/subscribe(发布/订阅)功能。我们可以对某个channel(频道)进行subscribe(订阅),当有人在这个channel上publish(发布)消息时,redis就会通知我们,这样我们可以收到别人发布的消息。 作为Java的redis客户端,Jedis提供了publish/subscribe的接口。本文讲述如何使用Jedis来实现redis的publish/subscribe。

  • 定义Subscriber类

Jedis定义了抽象类JedisPubSub,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。

在以下例子中,我们定义了Subscriber类,这个类继承了JedisPubSub类,并重新实现了其中的回调方法。

Subscriber.java

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;


public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
  • 定义SubThread线程类

由于Jedis的subscribe操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。

SubThread.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Publisher {
    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void start() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
         }
    }
}

在上面的代码中,我们从JedisPool获取一个Jedis实例,并使用这个Jedis实例进行subscribe的操作。 Jedis的subscribe的声明如下:

public void subscribe(final JedisPubSub jedisPubSub, final String… channels)

第一个参数接受一个JedisPubSub对象,第二个参数指定对哪个频道进行订阅。上例中,我们把我们定义的Subscriber对象传给subscribe方法。 当publish/subscribe的事件发生时,会自动调用我们Subscriber的方法。

  • 定义Publisher类

Publisher类接受用户的输入,并将输入发布到channel。当用户输入”quit”后,输入结束。

Publisher.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Publisher {
    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void start() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
         }
    }
}

定义入口代码

如下是我们的程序入口代码。

PubSubDemo.java

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class PubSubDemo 
{
    public static void main( String[] args )
    {
        // 替换成你的reids地址和端口
        String redisIp = "192.168.229.154";
        int reidsPort = 6379;
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort));

        SubThread subThread = new SubThread(jedisPool);
        subThread.start();

        Publisher publisher = new Publisher(jedisPool);
        publisher.start();
    }
}

输出

redis pool is starting, redis ip 192.168.229.154, redis port 6379 subscribe redis, channel mychannel, thread will be blocked subscribe redis channel success, channel mychannel, subscribedChannels 1

这时输入

hello

控制窗口中输出

receive redis published message, channel mychannel, message hello

参考资料

Previous高性能分布式缓存Next简单高速队列的实现与应用

Last updated 5 years ago

Was this helpful?

setex 用法 见官方文档:

在上面的代码中,我们首先生成了一个JedisPool的redis连接池,这是由于Jedis不是线程安全的,JedisPool是线程安全的。而我们的程序在主线程和订阅线程(SubThread)均需要使用Jedis,故在程序中我们使用JedisPool。具体也可以参考。 由于Jedis的subcribe操作是阻塞的,故我们另起了一个线程来进行subcribe操作。 通过调用Publisher::start()方法,接受用户的输入,并publish到指定的channel。

java的实现方式

http://doc.redisfans.com/string/setex.html
在多线程环境中使用Jedis
https://github.com/xetorthio/jedis/wiki/AdvancedUsage
http://basrikahveci.com/a-simple-jedis-publish-subscribe-example/
https://blog.csdn.net/canot/article/details/51938955