使用Redisson和Zookeeper实现分布式锁模拟模拟抢红包业务

使用Redisson和Zookeeper实现分布式锁模拟模拟抢红包业务

业务场景

模拟1000人在10秒内抢10000(或1000)元红包,金额在1-100不等;

使用的框架或软件:

  • Springboot(基础框架)
  • Redisson(实现分布式锁)
  • Zookeeper(实现分布式锁方案)
  • Ngnix(负载均衡)
  • Redis(红包数据存取数据库)

系统或软件:

  • Linux服务器
  • Jmeter(模拟并发请求)

Jmeter 下载和运行

官方网站:http://jmeter.apache.org/

解压后, 运行 “bin/jmeter.bat”

Jmeter 是支持中文的, 启动Jmeter 后, 点击 Options -> Choose Language 来选择语言

测试需求

以获取城市的天气数据为例

第一步: 获取城市的城市代号

发送request到http://toy1.weather.com.cn/search?cityname=上海

从这个请求的 response 中获取到上海的城市代码. 比如:

上海的地区代码是101020100

上海动物园的地区代码是: 10102010016A

第二步: 得到该城市的天气数据

发送request 到: http://www.weather.com.cn/weather2d/101020100.shtml

测试步骤

步骤一: 新建一个Thread Group

必须新建一个Thread Group, jmeter的所有任务都必须由线程处理,所有任务都必须在线程组下面创建。

img

第二步:新建一个 HTTP Request

img

比如我要发送一个Get 方法的http 请求: http://toy1.weather.com.cn/search?cityname=上海

可以按照下图这么填

img

第三步 添加HTTP Head Manager

选中上一步新建的HTTP request. 右键,新建一个Http Header manager. 添加一个header

img

img

第四步: 添加View Results Tree

View Results Tree 是用来看运行的结果的

img

第五步:运行测试,查看结果

img

img

模拟抢红包业务开发

情况1 - 单机服务——没有任何线程安全考虑

@GetMapping("/get/money")
public String getRedPackage(){

    Map map = new HashMap();
    Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
    int remainMoney = Integer.parseInt(String.valueOf(o));
    if(remainMoney <= 0 ){
        map.put("result","红包已抢完");
        return ReturnModel.success(map).appendToString();
    }
    int randomMoney = (int) (Math.random() * 100);
    if(randomMoney > remainMoney){
        randomMoney = remainMoney;
    }
    int newRemainMoney = remainMoney-randomMoney;
    redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
    String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + " 剩余金额:" + newRemainMoney;
    System.out.println(result);
    map.put("result",result);

    return ReturnModel.success(map).appendToString();
}

输出数据有异常:

原有金额:1000 红包金额:49 剩余金额:951
原有金额:1000 红包金额:62 剩余金额:938
原有金额:1000 红包金额:61 剩余金额:939
原有金额:1000 红包金额:93 剩余金额:907
原有金额:1000 红包金额:73 剩余金额:927
原有金额:939 红包金额:65 剩余金额:874
原有金额:939 红包金额:16 剩余金额:923
原有金额:939 红包金额:30 剩余金额:909

情况2 - 单机服务, 使用Lock锁

public static Lock lock = new ReentrantLock();

@GetMapping("/get/money/lock")
public String getRedPackageLock(){
    Map map = new HashMap();
    lock.lock();
    try{
        Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
        int remainMoney = Integer.parseInt(String.valueOf(o));
        if(remainMoney <= 0 ){
            map.put("result","红包已抢完");
            return ReturnModel.success(map).appendToString();
        }
        int randomMoney = (int) (Math.random() * 100);
        if(randomMoney > remainMoney){
            randomMoney = remainMoney;
        }
        int newRemainMoney = remainMoney-randomMoney;
        redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
        String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + " 剩余金额:" + newRemainMoney;
        System.out.println(result);
        map.put("result",result);
        return ReturnModel.success(map).appendToString();
    }finally {
        lock.unlock();
    }
}

Lock在单服务器是线程安全的, 此时输出数据正常:

原有金额:1000 红包金额:11 剩余金额:989
原有金额:989 红包金额:48 剩余金额:941
原有金额:941 红包金额:17 剩余金额:924
原有金额:924 红包金额:89 剩余金额:835
原有金额:835 红包金额:63 剩余金额:772
原有金额:772 红包金额:77 剩余金额:695
原有金额:695 红包金额:76 剩余金额:619
原有金额:619 红包金额:8 剩余金额:611
原有金额:611 红包金额:67 剩余金额:544
原有金额:544 红包金额:9 剩余金额:535
原有金额:535 红包金额:78 剩余金额:457
......

情况3.1 - 两台服务器, 使用Lock锁

数据异常(代码情况2一样);

Lock在镀钛服务器下是非线程安全的

负载均衡配置

使用Nginx配置负载均衡,部署两个服务分别是8001和8002端口,Nginx暴露8080端口,转发请求到8001和8002;

img

nginx配置

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;
    ##定义负载均衡真实服务器IP:端口号 weight表示权重
    upstream myserver{
        server   XX.XX.XX.XX:8001 weight=1;
        server   XX.XX.XX.XX:8002 weight=1;
     }
    server {
        listen   8080;
        location  / {
            proxy_pass   http://myserver;
            proxy_connect_timeout 10;
        }
    }  
}

情况3.2 - 两台服务器 使用Redisson分布式锁


    io.netty
    netty-all
    4.1.31.Final


    org.redisson
    redisson
    3.6.5
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient getRedisson(){

        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
        return Redisson.create(config);
    }

}
@Autowired
private RedissonClient redissonClient;

//3-抢红包-redisson
@GetMapping("/get/money/redisson")
public String getRedPackageRedison(){
    RLock rLock = redissonClient.getLock("secKill");
    rLock.lock();
    Map map = new HashMap();
    try{
        Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
        int remainMoney = Integer.parseInt(String.valueOf(o));
        if(remainMoney <= 0 ){
            map.put("result","红包已抢完");
            return ReturnModel.success(map).appendToString();
        }
        int randomMoney = (int) (Math.random() * 100);
        if(randomMoney > remainMoney){
            randomMoney = remainMoney;
        }
        int newRemainMoney = remainMoney-randomMoney;
        redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
        String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + "剩余金额:" + newRemainMoney;
        System.out.println(result);
        map.put("result",result);
        return ReturnModel.success(map).appendToString();
    }finally {
        rLock.unlock();
    }
}

情况3.3 - 两台服务器——使用Zookeeper分布式锁——数据正常

@Configuration
public class ZkConfiguration {
    /**
     * 重试次数
     */
    @Value("${curator.retryCount}")
    private int retryCount;
    /**
     * 重试间隔时间
     */
    @Value("${curator.elapsedTimeMs}")
    private int elapsedTimeMs;
    /**
     * 连接地址
     */
    @Value("${curator.connectString}")
    private String connectString;
    /**
     * Session过期时间
     */
    @Value("${curator.sessionTimeoutMs}")
    private int sessionTimeoutMs;
    /**
     * 连接超时时间
     */
    @Value("${curator.connectionTimeoutMs}")
    private int connectionTimeoutMs;

    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        return CuratorFrameworkFactory.newClient(
                connectString,
                sessionTimeoutMs,
                connectionTimeoutMs,
                new RetryNTimes(retryCount,elapsedTimeMs));
    }

    /**
     * Distributed lock by zookeeper distributed lock by zookeeper.
     *
     * @return the distributed lock by zookeeper
     */
    @Bean(initMethod = "init")
    public DistributedLockByZookeeper distributedLockByZookeeper() {
        return new DistributedLockByZookeeper();
    }
}
@Slf4j
public class DistributedLockByZookeeper {
    private final static String ROOT_PATH_LOCK = "myk";

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * The Curator framework.
     */
    @Autowired
    CuratorFramework curatorFramework;

    /**
     * 获取分布式锁
     * 创建一个临时节点,
     *
     * @param path the path
     */
    public void acquireDistributedLock(String path) {
        String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        while (true) {
            try {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(keyPath);
                //log.info("success to acquire lock for path:{}", keyPath);
                break;
            } catch (Exception e) {
                //抢不到锁,进入此处!
                //log.info("failed to acquire lock for path:{}", keyPath);
                //log.info("while try again .......");
                try {
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch = new CountDownLatch(1);
                    }
                    //避免请求获取不到锁,重复的while,浪费CPU资源
                    countDownLatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * 释放分布式锁
     *
     * @param path the  节点路径
     * @return the boolean
     */
    public boolean releaseDistributedLock(String path) {
        try {
            String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            if (curatorFramework.checkExists().forPath(keyPath) != null) {
                curatorFramework.delete().forPath(keyPath);
            }
        } catch (Exception e) {
            //log.error("failed to release lock,{}", e);
            return false;
        }
        return true;
    }

    /**
     * 创建 watcher 事件
     */
    private void addWatcher(String path) {
        String keyPath;
        if (path.equals(ROOT_PATH_LOCK)) {
            keyPath = "/" + path;
        } else {
            keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        }
        try {
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener((client, event) -> {
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    //log.info("上一个节点 " + oldPath + " 已经被断开");
                    if (oldPath.contains(path)) {
                        //释放计数器,让当前的请求获取锁
                        countDownLatch.countDown();
                    }
                }
            });
        } catch (Exception e) {
            log.info("监听是否锁失败!{}", e);
        }
    }

    /**
     * 创建父节点,并创建永久节点
     */
    public void init() {
        curatorFramework = curatorFramework.usingNamespace("lock-namespace");
        String path = "/" + ROOT_PATH_LOCK;
        try {
            if (curatorFramework.checkExists().forPath(path) == null) {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path);
            }
            addWatcher(ROOT_PATH_LOCK);
            log.info("root path 的 watcher 事件创建成功");
        } catch (Exception e) {
            log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
        }
    }

}
    @Autowired
    DistributedLockByZookeeper distributedLockByZookeeper;
    private final static String PATH = "red_package";
    //4-抢红包-zookeeper
    @GetMapping("/get/money/zookeeper")
    public String getRedPackageZookeeper(){

        Boolean flag = false;
        distributedLockByZookeeper.acquireDistributedLock(PATH);
        Map map = new HashMap();
        try {
            Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
            int remainMoney = Integer.parseInt(String.valueOf(o));
            if(remainMoney <= 0 ){
                map.put("result","红包已抢完");
                return ReturnModel.success(map).appendToString();
            }
            int randomMoney = (int) (Math.random() * 100);
            if(randomMoney > remainMoney){
                randomMoney = remainMoney;
            }
            int newRemainMoney = remainMoney-randomMoney;
            redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
            String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + "剩余金额:" + newRemainMoney;
            System.out.println(result);
            map.put("result",result);
            return ReturnModel.success(map).appendToString();
        } catch(Exception e){
            e.printStackTrace();
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            //System.out.println("releaseDistributedLock: " + flag);
            map.put("result","getRedPackageZookeeper catch exceeption");
            return ReturnModel.success(map).appendToString();
        }finally {
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            //System.out.println("releaseDistributedLock: " + flag);
        }
    }

附录

1- 其他配置和类

application.properties文件

server.port=80

#配置redis
spring.redis.host=XX.XX.XX.XX
spring.redis.port=6379
spring.redis.password=xuegaotest1234
spring.redis.database=0
#重试次数
curator.retryCount=5
#重试间隔时间
curator.elapsedTimeMs=5000
# zookeeper 地址
curator.connectString=XX.XX.XX.XX:2181
# session超时时间
curator.sessionTimeoutMs=60000
# 连接超时时间
curator.connectionTimeoutMs=5000

ReturnModel 类

public class ReturnModel implements Serializable{

    private int code;
    private String msg;
    private Object data;

    public static ReturnModel success(Object obj){
        return new ReturnModel(200,"success",obj);
    }

    public String appendToString(){
        return  JSON.toJSONString(this);
    }

    public ReturnModel() {
    }

    public ReturnModel(int code, String msg, Object data) {
        this.code = code;
        this.msg = msg;
        this.data = data;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

SeckillController类

    public final static String KEY_RED_PACKAGE_MONEY  = "key_red_package_money";

    @Autowired
    private RedisTemplate redisTemplate;

    //1-设置红包
    @GetMapping("/set/money/{amount}")
    public String setRedPackage(@PathVariable Integer amount){
        redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,amount);
        Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
        Map map = new HashMap();
        map.put("moneyTotal",Integer.parseInt(String.valueOf(o)));
        return ReturnModel.success(map).appendToString();
    }

流程解析

Zookeeper分布锁

1- 在ZkConfiguration类中加载CuratorFramework时,设置参数,实例化一个CuratorFramework类; 实例化过程中,执行CuratorFrameworkImpl类中的的start(),其中CuratorFrameworkImpl类是CuratorFramework的实现类;根据具体的细节可以参考博客

@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
    return CuratorFrameworkFactory.newClient( connectString,  sessionTimeoutMs, connectionTimeoutMs,  new RetryNTimes(retryCount,elapsedTimeMs));
}

2- 在ZkConfiguration类中加载DistributedLockByZookeeper时;执行其中的init()方法;init()方法中主要是创建父节点和添加监听

/**
 * 创建父节点,并创建永久节点
 */
public void init() {
    curatorFramework = curatorFramework.usingNamespace("lock-namespace");
    String path = "/" + ROOT_PATH_LOCK;
    try {
        if (curatorFramework.checkExists().forPath(path) == null) {
            curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path);
        }
        addWatcher(ROOT_PATH_LOCK);
        log.info("root path 的 watcher 事件创建成功");
    } catch (Exception e) {
        log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
    }
}

3- 在具体业务中调用distributedLockByZookeeper.acquireDistributedLock(PATH);获取分布式锁

/**
 * 获取分布式锁
 * 创建一个临时节点,
 *
 * @param path the path
 */
public void acquireDistributedLock(String path) {
    String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
    while (true) {
        try {
            curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(keyPath);
            break;
        } catch (Exception e) {
            //抢不到锁,进入此处!
            try {
                if (countDownLatch.getCount() <= 0) {
                    countDownLatch = new CountDownLatch(1);
                }
                //避免请求获取不到锁,重复的while,浪费CPU资源
                countDownLatch.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }
}

4- 业务结束时调用distributedLockByZookeeper.releaseDistributedLock(PATH);释放锁

/**
 * 释放分布式锁
 *
 * @param path the  节点路径
 * @return the boolean
 */
public boolean releaseDistributedLock(String path) {
    try {
        String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        if (curatorFramework.checkExists().forPath(keyPath) != null) {
            curatorFramework.delete().forPath(keyPath);
        }
    } catch (Exception e) {
        return false;
    }
    return true;
}

原理图如下

img

期间碰到的问题

问题: 项目启动时:java.lang.ClassNotFoundException: com.google.common.base.Function

原因:缺少google-collections jar包;如下


    com.google.collections
    google-collections
    1.0

问题:项目启动时:org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss

原因:简单说,就是连接失败(可能原因的有很多);依次排查了zookeeper服务器防火墙、application.properties配置文件;最后发现IP的写错了,更正后就好了

问题:Jemter启用多线程并发测试时:java.net.BindException: Address already in use: connect.

Views: 512

图解 Redis 数据结构

Redis 为什么那么快?

除了它是内存数据库,使得所有的操作都在内存上进行之外,还有一个重要因素,它实现的数据结构,使得我们对数据进行增删查改操作时,Redis 能高效的处理。

因此,这次我们就来好好聊一下 Redis 数据结构,这个在面试中太常问了。

注意,Redis 数据结构并不是指 string(字符串)、List(列表)、Hash(哈希)、Set(集合)和 Zset(有序集合),因为这些是 Redis 键值对存储中值的数据类型,下面要将的是这些数据类型对应的底层实现的方式。

Redis 底层的数据结构一共有 6 种,如下图右边部分,它和数据类型对应关系也如下图:

图片

可以看到,有些数据类型可以由两种 数据结构实现,比如:

  • List 数据类型底层数据结构由「双向链表」或「压缩表列表」实现;
  • Hash 数据类型底层数据结构由「压缩列表」或「哈希表」实现;
  • Set 数据类型底层数据结构由「哈希表」或「整数集合」实现;
  • Zset 数据类型底层数据结构由「压缩列表」或「跳表」实现;

好了,不多 BB 了,直接发车!

图片

1. SDS

字符串在 Redis 中是很常用的,键值对中的键是字符串,值有时也是字符串。

Redis 是用 C 语言实现的,但是它没有直接使用 C 语言的 char* 字符数组来实现字符串,而是自己封装了一个名为简单动态字符串(simple dynamic string,SDS) 的数据结构来表示字符串,也就是 Redis 的 String 数据类型的底层数据结构是 SDS。

既然 Redis 设计了 SDS 结构来表示字符串,肯定是 C 语言的 char* 字符数组存在一些缺陷。

要了解这一点,得先来看看 char* 字符数组的结构。

C 语言字符串的缺陷

C 语言的字符串其实就是一个字符数组,即数组中每个元素是字符串中的一个字符。

比如,下图就是字符串“xiaolin”的 char* 字符数组的结构:

图片

没学过 C 语言的同学,可能会好奇为什么最后一个字符是“\0”?

在 C 语言里,对字符串操作时,char * 指针只是指向字符数组的起始位置,而字符数组的结尾位置就用“\0”表示,意思是指字符串的结束

因此,C 语言标准库中字符串的操作函数,就通过判断字符是不是“\0”,如果不是说明字符串还没结束,可以继续操作,如果是则说明字符串结束了,停止操作。

举个例子,C 语言获取字符串长度的函数 strlen,就是通过字符数组中的每一个字符,并进行计数,等遇到字符为“\0”后,就会停止遍历,然后返回已经统计到的字符个数,即为字符串长度。下图显示了 strlen 函数的执行流程:

图片

很明显,C 语言获取字符串长度操作的时间复杂度是 O(N)(*这是一个可以改进的地方*

C 语言的字符串用 “\0” 字符作为结尾标记有个缺陷。假设有个字符串中有个 “\0” 字符,这时在操作这个字符串时就会提早结束,比如 “xiao\0lin” 字符串,计算字符串长度的时候则会是 4,如下图:

图片

还有,除了字符串中不能 “\0” 字符外,用 char* 字符串中的字符必须符合某种编码(比如ASCII)。

这些限制使得 C 语言的字符串只能保存文本数据,不能保存像图片、音频、视频文化这样的二进制数据(这也是一个可以改进的地方)

C 语言标准库中字符串的操作函数是很不安全的,对程序员很不友好,稍微一不注意,就会导致缓冲区溢出。

举个例子,strcat 函数是可以将两个字符串拼接在一起。

c //将 src 字符串拼接到 dest 字符串后面 char *strcat(char *dest, const char* src);

C 语言的字符串是不会记录自身的缓冲区大小的,所以 strcat 函数假定程序员在执行这个函数时,已经为 dest 分配了足够多的内存,可以容纳 src 字符串中的所有内容,而一旦这个假定不成立,就会发生缓冲区溢出将可能会造成程序运行终止,(这是一个可以改进的地方)

而且,strcat 函数和 strlen 函数类似,时间复杂度也很高,也都需要先通过遍历字符串才能得到目标字符串的末尾。然后对于 strcat 函数来说,还要再遍历源字符串才能完成追加,对字符串的操作效率不高。

好了, 通过以上的分析,我们可以得知 C 语言的字符串 不足之处以及可以改进的地方:

  • 获取字符串长度的时间复杂度为 O(N);
  • 字符串的结尾是以 “\0” 字符标识,而且字符必须符合某种编码(比如ASCII),只能保存文本数据,不能保存二进制数据;
  • 字符串操作函数不高效且不安全,比如可能会发生缓冲区溢出,从而造成程序运行终止;

Redis 实现的 SDS 的结构就把上面这些问题解决了,接下来我们一起看看 Redis 是如何解决的。

SDS 结构设计

下图就是 Redis 5.0 的 SDS 的数据结构:

图片

结构中的每个成员变量分别介绍下:

  • len,SDS 所保存的字符串长度。这样获取字符串长度的时候,只需要返回这个变量值就行,时间复杂度只需要 O(1)。
  • alloc,分配给字符数组的空间长度。这样在修改字符串的时候,可以通过 alloc - len 计算 出剩余的空间大小,然后用来判断空间是否满足修改需求,如果不满足的话,就会自动将 SDS 的空间扩展至执行修改所需的大小,然后才执行实际的修改操作,所以使用 SDS 既不需要手动修改 SDS 的空间大小,也不会出现前面所说的缓冲区益处的问题。
  • flags,SDS 类型,用来表示不同类型的 SDS。一共设计了 5 种类型,分别是 sdshdr5、sdshdr8、sdshdr16、sdshdr32 和 sdshdr64,后面再说明区别之处。
  • buf[],字节数组,用来保存实际数据。不需要用 “\0” 字符来标识字符串结尾了,而是直接将其作为二进制数据处理,可以用来保存图片等二进制数据。它即可以保存文本数据,也可以保存二进制数据,所以叫字节数组会更好点。

总的来说,Redis 的 SDS 结构在原本字符数组之上,增加了三个元数据:len、alloc、flags,用来解决 C 语言字符串的缺陷。

支持O(1)复杂度获取字符串长度

C 语言的字符串长度获取 strlen 函数,需要通过遍历的方式来统计字符串长度,时间复杂度是 O(N)。

而 Redis 的 SDS 结构因为加入了 len 成员变量,那么获取字符串长度的时候,直接返回这个变量的值就行,所以复杂度只有 O(1)。

支持二进制安全读写

因为 SDS 不需要用 “\0” 字符来标识字符串结尾了,而且 SDS 的 API 都是以处理二进制的方式来处理 SDS 存放在 buf[] 里的数据,程序不会对其中的数据做任何限制,数据写入的时候时什么样的,它被读取时就是什么样的。

通过使用二进制安全的 SDS,而不是 C 字符串,使得 Redis 不仅 可以保存文本数据,也可以保存任意格式的二进制数据。

不会发生缓冲区溢出

C 语言的字符串标准库提供的字符串操作函数,大多数(比如 strcat 追加字符串函数)都是不安全的,因为这些函数把缓冲区大小是否满足操作的工作交由开发者来保证,程序内部并不会判断缓冲区大小是否足够用,当发生了缓冲区溢出就有可能造成程序异常结束。

所以,Redis 的 SDS 结构里引入了 alloc 和 leb 成员变量,这样 SDS API 通过 alloc - len 计算,可以算出剩余可用的空间大小,这样在对字符串做修改操作的时候,就可以由程序内部判断缓冲区大小是否足够用。

而且,当判断出缓冲区大小不够用时,Redis 会自动将扩大 SDS 的空间大小,以满足修改所需的大小。

在扩展 SDS 空间之前,SDS API 会优先检查未使用空间是否足够,如果不够的话,API 不仅会为 SDS 分配修改所必须要的空间,还会给 SDS 分配额外的「未使用空间」。

这样的好处是,下次在操作 SDS 时,如果 SDS 空间够的话,API 就会直接使用「未使用空间」,而无须执行内存分配,有效的减少内存分配次数。

所以,使用 SDS 即不需要手动修改 SDS 的空间大小,也不会出现缓冲区溢出的问题。

节省内存空间

SDS 结构中有个 flags 成员变量,表示的是 SDS 类型。

Redos 一共设计了 5 种类型,分别是 sdshdr5、sdshdr8、sdshdr16、sdshdr32 和 sdshdr64。

这 5 种类型的主要区别就在于,它们数据结构中的 len 和 alloc 成员变量的数据类型不同

比如 sdshdr16 和 sdshdr32 这两个类型,它们的定义分别如下:

struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len;
    uint16_t alloc; 
    unsigned char flags; 
    char buf[];
};

struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len;
    uint32_t alloc; 
    unsigned char flags;
    char buf[];
};

可以看到:

  • sdshdr16 类型的 len 和 alloc 的数据类型都是 uint16_t,表示字符数组长度和分配空间大小不能超过 2 的 16 次方。
  • sdshdr32 则都是 uint32_t,表示表示字符数组长度和分配空间大小不能超过 2 的 32 次方。

之所以 SDS 设计不同类型的结构体,是为了能灵活保存不同大小的字符串,从而有效节省内存空间。比如,在保存小字符串时,结构头占用空间也比较少。

除了设计不同类型的结构体,Redis 在编程上还使用了专门的编译优化来节省内存空间,即在 struct 声明了 __attribute__ ((packed)) ,它的作用是:告诉编译器取消结构在编译过程中的优化对齐,按照实际占用字节数进行对齐

比如,sdshdr16 类型的 SDS,默认情况下,编译器会按照 16 字节对其的方式给变量分配内存,这意味着,即使一个变量的大小不到 16 个字节,编译器也会给它分配 16 个字节。

举个例子,假设下面这个结构体,它有两个成员变量,类型分别是 char 和 int,如下所示:

#include 

 struct test1 {
    char a;
    int b;
 } test1;

int main() {
     printf("%lu\n", sizeof(test1));
     return 0;
}

大家猜猜这个结构体大小是多少?我先直接说答案,这个结构体大小计算出来是 8。

图片

这是因为默认情况下,编译器是使用字节对其的方式分配内存,虽然 char 类型只占一个字节,但是由于成员变量里有 int 类型,它占用了 4 个字节,所以在成员变量为 char 类型分配内存时,会分配 4 个字节,其中这多余的 3 个字节是为了字节对其而分配的,相当于有 3 个字节被浪费掉了。

如果不想编译器使用字节对其的方式进行分配内存,可以采用了 __attribute__ ((packed)) 属性定义结构体,这样一来,结构体实际占用多少内存空间,编译器就分配多少空间。

比如,我用 __attribute__ ((packed)) 属性定义下面的结构体 ,同样包含 char 和 int 两个类型的成员变量,代码如下所示:

#include 

struct __attribute__((packed)) test2  {
    char a;
    int b;
 } test2;

int main() {
     printf("%lu\n", sizeof(test2));
     return 0;
}

这时打印的结果是 5(1 个字节 char + 4 字节 int)。

图片

可以看得出,这是按照实际占用字节数进行分配内存的,这样可以节省内存空间。


2. 链表

除了数组之外,相信大家最熟悉的数据结构就是链表了。

Redis 的 list 数据类型的底层实现之一就是链表。C 语言本身也是没有链表这个数据结构的,所以 Redis 自己设计了一个链表数据结构。

链表节点结构设计

先来看看链表节点结构的样子:

typedef struct listNode {
    //前置节点
    struct listNode *prev;
    //后置节点
    struct listNode *next;
    //节点的值
    void *value;
} listNode;

有前置节点和后置节点,可以看的出,这个是一个双向链表。

图片

链表结构设计

不过,Redis 在 listNode 结构体基础上又封装了 list 这个数据结构,这样操作起来会更方便,链表结构如下:

typedef struct list {
    //链表头节点
    listNode *head;
    //链表尾节点
    listNode *tail;
    //节点值复制函数
    void *(*dup)(void *ptr);
    //节点值释放函数
    void (*free)(void *ptr);
    //节点值比较函数
    int (*match)(void *ptr, void *key);
    //链表节点数量
    unsigned long len;
} list;

list 结构为链表提供了链表头指针 head、链表尾节点 tail、链表节点数量 len、以及可以自定义实现的 dup、free、match 函数。

举个例子,下面是由 list 结构和 3 个 listNode 结构组成的链表。

图片

Redis 的链表实现优点如下:

  • listNode 链表节点带有 prev 和 next 指针,获取某个节点的前置节点或后置节点的时间复杂度只需O(1),而且这两个指针都可以指向 NULL,所以链表是无环链表
  • list 结构因为提供了表头指针 head 和表尾节点 tail,所以获取链表的表头节点和表尾节点的时间复杂度只需O(1)
  • list 结构因为提供了链表节点数量 len,所以获取链表中的节点数量的时间复杂度只需O(1)
  • listNode 链表节使用 void* 指针保存节点值,并且可以通过 list 结构的 dup、free、match 函数指针为节点设置该节点类型特定的函数,因此链表节点可以保存各种不同类型的值;

链表的缺陷也是有的,链表每个节点之间的内存都是不连续的,意味着无法很好利用 CPU 缓存。

能很好利用 CPU 缓存的数据结构就是数组,因为数组的内存是连续的,这样就可以充分利用 CPU 缓存来加速访问。

因此,Redis 的 list 数据类型在数据量比较少的情况下,会采用「压缩列表」作为底层数据结构的实现,压缩列表就是由数组实现的,下面我们会细说压缩列表。


3. 压缩列表

压缩列表是 Redis 数据类型为 list 和 hash 的底层实现之一。

  • 当一个列表键(list)只包含少量的列表项,并且每个列表项都是小整数值,或者长度比较短的字符串,那么 Redis 就会使用压缩列表作为列表键(list)的底层实现。
  • 当一个哈希键(hash)只包含少量键值对,并且每个键值对的键和值都是小整数值,或者长度比较短的字符串,那么 Redis 就会使用压缩列表作为哈希键(hash)的底层实现。

压缩列表结构设计

压缩列表是 Redis 为了节约内存而开发的,它是由连续内存块组成的顺序型数据结构,有点类似于数组。

图片

压缩列表在表头有三个字段:

  • zlbytes,记录整个压缩列表占用对内存字节数;
  • zltail,记录压缩列表「尾部」节点距离起始地址由多少字节,也就是列表尾的偏移量;
  • zllen,记录压缩列表包含的节点数量;
  • zlend,标记压缩列表的结束点,特殊值 OxFF(十进制255)。

在压缩列表中,如果我们要查找定位第一个元素和最后一个元素,可以通过表头三个字段的长度直接定位,复杂度是 O(1)。而查找其他元素时,就没有这么高效了,只能逐个查找,此时的复杂度就是 O(N) 了。

另外,压缩列表节点(entry)的构成如下:

图片

压缩列表节点包含三部分内容:

  • prevlen,记录了前一个节点的长度;
  • encoding,记录了当前节点实际数据的类型以及长度;
  • data,记录了当前节点的实际数据;

当我们往压缩列表中插入数据时,压缩列表 就会根据数据是字符串还是整数,以及它们的大小会在 prevlen 和 encoding 这两个元素里保存不同的信息,这种根据数据大小进行对应信息保存的设计思想,正是 Redis 为了节省内存而采用的。

连锁更新

压缩列表除了查找复杂度高的问题,压缩列表在插入元素时,如果内存空间不够了,压缩列表还需要重新分配一块连续的内存空间,而这可能会引发连锁更新的问题。

压缩列表里的每个节点中的 prevlen 属性都记录了「前一个节点的长度」,而且 prevlen 属性的空间大小跟前一个节点长度值有关,比如:

  • 如果前一个节点的长度小于 254 字节,那么 prevlen 属性需要用 1 字节的空间来保存这个长度值;
  • 如果前一个节点的长度大于等于 254 字节,那么 prevlen 属性需要用 5 字节的空间来保存这个长度值;

现在假设一个压缩列表中有多个连续的、长度在 250~253 之间的节点,如下图:

图片

因为这些节点长度值小于 254 字节,所以 prevlen 属性需要用 1 字节的空间来保存这个长度值。

这时,如果将一个长度大于等于 254 字节的新节点加入到压缩列表的表头节点,即新节点将成为 e1 的前置节点,如下图:

图片

因为 e1 节点的 prevlen 属性只有 1 个字节大小,无法保存新节点的长度,此时就需要对压缩列表的空间重分配操作,并将 e1 节点的 prevlen 属性从原来的 1 字节大小扩展为 5 字节大小。

多米诺牌的效应就此开始。

图片

e1 原本的长度在 250~253 之间,因为刚才的扩展空间,此时 e1 的长度就大于等于 254 了,因此原本 e2 保存 e1 的 prevlen 属性也必须从 1 字节扩展至 5 字节大小。

正如扩展 e1 引发了对 e2 扩展一样,扩展 e2 也会引发对 e3 的扩展,而扩展 e3 又会引发对 e4 的扩展…. 一直持续到结尾。

这种在特殊情况下产生的连续多次空间扩展操作就叫做「连锁更新」,就像多米诺牌的效应一样,第一张牌倒下了,推动了第二张牌倒下;第二张牌倒下,又推动了第三张牌倒下….

连锁更新一旦发生,就会导致压缩列表 占用的内存空间要多次重新分配,这就会直接影响到压缩列表的访问性能。

所以说,虽然压缩列表紧凑型的内存布局能节省内存开销,但是如果保存的元素数量增加了,或是元素变大了,压缩列表就会面临「连锁更新」的风险。

因此,压缩列表只会用于保存的节点数量不多的场景,只要节点数量足够小,即使发生连锁更新,也是能接受的。

4. 哈希表

哈希表是一种保存键值对(key-value)的数据结构。

哈希表中的每一个 key 都是独一无二的,程序可以根据 key 查找到与之关联的 value,或者通过 key 来更新 value,又或者根据 key 来删除整个 key-value等等。

在讲压缩列表的时候,提到过 Redis 的 hash 数据类型的底层实现之一是压缩列表。hash 数据类型的另外一个底层实现就是哈希表。

那 hash 数据类型什么时候会选用哈希表作为底层实现呢?

当一个哈希键包含的 key-value 比较多,或者 key-value 中元素都是比较长多字符串时,Redis 就会使用哈希表作为哈希键的底层实现。

Hash 表优点在于,它能以 O(1) 的复杂度快速查询数据。主要是通过 Hash 函数的计算,就能定位数据在表中的位置,紧接着可以对数据进行操作,这就使得数据操作非常快。

但是存在的风险也是有,在哈希表大小固定的情况下,随着数据不断增多,那么哈希冲突的可能性也会越高。

解决哈希冲突的方式,有很多种。Redis 采用了链式哈希,在不扩容哈希表的前提下,将具有相同哈希值的数据链接起来,以便这些数据在表中仍然可以被查询到。

接下来,详细说说哈希冲突以及链式哈希。

哈希冲突

哈希表实际上是一个数组,数组里多每一个元素就是一个哈希桶。

当一个键值对的键经过 Hash 函数计算后得到哈希值,再将(哈希值 % 哈希表大小)取模计算,得到的结果值就是该 key-value 对应的数组元素位置,也就是第几个哈希桶。

举个例子,有一个可以存放 8 个哈希桶的哈希表。key1 经过哈希函数计算后,再将「哈希值 % 8 」进行取模计算,结果值为 1,那么就对应哈希桶 1,类似的,key9 和 key10 分别对应哈希桶 1 和桶 6。

图片

此时,key1 和 key9 对应到了相同的哈希桶中,这就发生了哈希冲突。

因此,当有两个以上数量的 kay 被分配到了哈希表数组的同一个哈希桶上时,此时称这些 key 发生了冲突。

链式哈希

Redis 采用了「链式哈希」的方法来解决哈希冲突。

实现的方式就是每个哈希表节点都有一个 next 指针,多个哈希表节点可以用 next 指针构成一个单项链表,被分配到同一个哈希桶上的多个节点可以用这个单项链表连接起来,这样就解决了哈希冲突。

还是用前面的哈希冲突例子,key1 和 key9 经过哈希计算后,都落在同一个哈希桶,链式哈希的话,key1 就会通过 next 指针指向 key9,形成一个单向链表。

图片

不过,链式哈希局限性也很明显,随着链表长度的增加,在查询这一位置上的数据的耗时就会增加,毕竟链表的查询的时间复杂度是 O(n)。

要想解决这一问题,就需要进行 rehash,就是对哈希表的大小进行扩展。

接下来,看看 Redis 是如何实现的 rehash 的。

rehash

Redis 会使用了两个全局哈希表进行 rehash。

在正常服务请求阶段,插入的数据,都会写入到「哈希表 1」,此时的「哈希表 2 」 并没有被分配空间。

随着数据逐步增多,触发了 rehash 操作,这个过程分为三步:

  • 给「哈希表 2」 分配空间,一般会比「哈希表 1」 大 2 倍;
  • 将「哈希表 1 」的数据迁移到「哈希表 2」 中;
  • 迁移完成后,「哈希表 1 」的空间会被释放,并把「哈希表 2」 设置为「哈希表 1」,然后在「哈希表 2」 新创建一个空白的哈希表,为下次 rehash 做准备。

为了方便你理解,我把 rehash 这三个过程画在了下面这张图:

图片

这个过程看起来简单,但是其实第二步很有问题,如果「哈希表 1 」的数据量非常大,那么在迁移至「哈希表 2 」的时候,因为会涉及大量的数据拷贝,此时可能会对 Redis 造成阻塞,无法服务其他请求。

渐进式 rehash

为了避免 rehash 在数据迁移过程中,因拷贝数据的耗时,影响 Redis 性能的情况,所以 Redis 采用了渐进式 rehash,也就是将数据的迁移的工作不再是一次性迁移完成,而是分多次迁移。

渐进式 rehash 步骤如下:

  • 给「哈希表 2」 分配空间;
  • 在 rehash 进行期间,每次哈希表元素进行新增、删除、查找或者更新操作时,Redis 除了会执行对应的操作之外,还会顺序将「哈希表 1 」中索引位置上的所有 key-value 迁移到「哈希表 2」 上
  • 随着处理客户端发起的哈希表操作请求数量越多,最终会把「哈希表 1 」的所有 key-value 迁移到「哈希表 2」,从而完成 rehash 操作。

这样就巧妙地把一次性大量数据迁移工作的开销,分摊到了多次处理请求的过程中,避免了一次性 rehash 的耗时操作。

在进行渐进式 rehash 的过程中,会有两个哈希表,所以在渐进式 rehash 进行期间,哈希表元素的删除、查找、更新等操作都会在这两个哈希表进行。

比如,查找一个 key 的值的话,先会在哈希表 1 里面进行查找,如果没找到,就会继续到哈希表 2 里面进行找到。

另外,在渐进式 rehash 进行期间,新增一个 key-value 时,会被保存到「哈希表 2 」里面,而「哈希表 1」 则不再进行任何添加操作,这样保证了「哈希表 1 」的 key-value 数量只会减少,随着 rehash 操作的完成,最终「哈希表 1 」就会变成空表。

rehash 触发条件

介绍了 rehash 那么多,还没说什么时情况下会触发 rehash 操作呢?

rehash 的触发条件跟负载因子(load factor)有关系。

负载因子可以通过下面这个公式计算:

图片

触发 rehash 操作的条件,主要有两个:

  • 当负载因子大于等于 1 ,并且 Redis 没有在执行 bgsave 命令或者 bgrewiteaof 命令,也就是没有执行 RDB 快照或没有进行 AOF 重写的时候,就会进行 rehash 操作。
  • 当负载因子大于等于 5 时,此时说明哈希冲突非常严重了,不管有没有有在执行 RDB 快照或 AOF 重写,都会强制进行 rehash 操作。

参考资料:《redis设计与实现》、《Redis 源码剖析与实战》。

以上文章来源于作者小林coding
小林coding.图解得了技术,谈吐得了烟火。

Views: 170

Storm 集成 Redis

官方文档: Storm/Trident 集成 Redis

Storm-redis 使用 Jedis 作为 Redis client.

maven 依赖配置如下:


    org.apache.storm
    storm-redis
    ${storm.version}
    jar

一般情况

storm-redis提供的三种基本的bolt实现:

  1. RedisLookupBolt 通过 key 从 redis 获取对应的 value
  2. RedisStoreBolt 存储 key / value 到 Redis
  3. RedisFilterBolt 用于过滤key或者field在 Redis 中不存在的 tuple.

Tuple和Redis中的键值对的映射关系可以通过TupleMapper来进行定义.

通过RedisDataTypeDescription这个类可以选择数据类型.参考 RedisDataTypeDescription.RedisDataType可以查看有哪些支持的数据类型.

接口 RedisLookupMapperRedisStoreMapperRedisFilterMapper 分别用来适配 RedisLookupBoltRedisStoreBolt、和RedisFilterBolt. (当实现 RedisFilterMapper时要记得在declareOutputFields() 方法中定于输入流的字段, 因为 FilterBolt 会把 Redis 中的数据作为输入的 tuples 发送出去)

RedisLookupBolt 用法

class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

RedisFilterBolt 用法

RedisFilterBolt 用于当Redis不存在给定数据类型的给定 key/field 时进行过滤. 类似白名单, 即只保留Redis中存在的数据, 如果 Redis 上存在 key/field,则此 bolt 会将作为输入的元组根据指定输出格式转发到默认流。

支持的数据类型:STRING、HASH、SET、SORTED_SET、HYPER_LOG_LOG、GEO。

注1:对于 STRING,它会检查KEY SPACE中是否存在此KEY。 对于 HASH 和 SORTED_SET 和 GEO,它会检查该数据结构上是否存在此类FIELD。 对于 SET 和 HYPER_LOG_LOG,需要指定一个AdditionalKey作为KEY, 可以事先在SET 或 HYPER_LOG_LOG 中添加一些值作为白名单。这样过滤器会使用白名单和上游发来的 tuple 中的某个字段(通过RedisFilterMapper#getKeyFromTuple()方法来指定)的值进行比对,包含则放行.

注2:如果只想查询key是否在Redis中存在而不管实际数据类型为何,可将RedisFilterMapper的数据类型指定STRING。

class WhitelistWordFilterMapper implements RedisFilterMapper {

    private RedisDataTypeDescription description;

    private final String setKey = "whitelist";

    public BlacklistWordFilterMapper() {
        // 设置Redis的数据类型为SET(集合)
        // setKey 作为 AdditionalKey(集合的KEY)
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.SET, setKey);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    // 从Redis白名单集合中比对是否包含tuple中的"word"字段的值
    // 如果存在就放行
    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

使用

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisFilterMapper filterMapper = new WhitelistWordFilterMapper();
RedisFilterBolt filterBolt = new RedisFilterBolt(poolConfig, filterMapper);

RedisStoreBolt 用法

class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
            RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(host).setPort(port).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

WordCount示例(将统计结果持久化到Redis)

public class WordCountRedisTopology {

    public static class RandomSentenceSpout extends BaseRichSpout {

        SpoutOutputCollector collector;
        Random rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
        }

        @Override
        public void nextTuple() {

            Utils.sleep(1000);
            String[] sentences = new String[]{
                    "the cow jumped over the moon",
                    "an apple a day keeps the doctor away",
                    "four score and seven years ago",
                    "snow white and the seven dwarfs",
                    "i am at two with nature"
            };

            String sentence = sentences[rand.nextInt(sentences.length)];
            collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    public static class SentenceSplitBolt extends BaseRichBolt {

        private OutputCollector collector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            collector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {

            String line = tuple.getStringByField("line");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                collector.emit(new Values(word));
            }
        }
    }

    public static class DataRedisStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "word-count";

        public DataRedisStoreMapper() {
            description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }

        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }

        @Override
        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("word");
        }

        @Override
        public String getValueFromTuple(ITuple tuple) {
            return tuple.getIntegerByField("count") + "";
        }
    }

    public static class WordCountBolt extends BaseBasicBolt {

        Map counts = new HashMap<>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getStringByField("word");
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) {

        Config conf = new Config();
        conf.setNumAckers(0);
        conf.setDebug(true);

        conf.setNumWorkers(2);  // 设置为1个topology创建2个worker进程
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout());
        builder.setBolt("split", new SentenceSplitBolt(), 2).setNumTasks(4).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 3).setNumTasks(6).fieldsGrouping("split", new Fields("word"));

        // storm to redis store
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setPassword("niit1234").setHost("hadoop001").setPort(6379).build();
        RedisStoreMapper storeMapper = new DataRedisStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

        builder.setBolt("store", storeBolt).shuffleGrouping("count");

        String topologyName = "word-count";

        if (args != null && args.length > 0) {
            topologyName = args[0];
        }

        try {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topologyName, conf, builder.createTopology());
            // StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

结果

image-20201215015520057

复杂情况

Storm-redis提供了通用的bolt抽象类,当 RedisStoreBoltRedisLookupBoltRedisFilterBolt 不符合你的使用场景时, storm-redis 也提供了抽象的 AbstractRedisBolt 类来供你继承并实现你自己的业务逻辑.

RedisLookupBoltRedisStoreBoltRedisFilterBolt 均继承自 AbstractRedisBolt 抽象类。我们可以通过继承该抽象类,实现自定义 Bolt,进行功能的拓展。

实现自定义Bolt: LookupWordTotalCountBolt

将数据流中字段为"word"的单词作为key在Redis中查找对应的value(即对应词频),并抽样打印结果.

public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
    private static final Random RANDOM = new Random();

    public LookupWordTotalCountBolt(JedisPoolConfig config) {
        super(config);
    }

    public LookupWordTotalCountBolt(JedisClusterConfig config) {
        super(config);
    }

    @Override
    public void execute(Tuple input) {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = getInstance();
            String wordName = input.getStringByField("word");
            String countStr = jedisCommands.get(wordName);
            if (countStr != null) {
                int count = Integer.parseInt(countStr);
                this.collector.emit(new Values(wordName, count));

                // print lookup result with low probability
                if(RANDOM.nextInt(1000) > 995) {
                    LOG.info("Lookup result - word : " + wordName + " / count : " + count);
                }
            } else {
                // skip
                LOG.warn("Word not found in Redis - word : " + wordName);
            }
        } finally {
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
            this.collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // wordName, count
        declarer.declare(new Fields("wordName", "count"));
    }
}

在Trident State集成Redis

  1. RedisStateRedisMapState, 提供了单个的Jedis客户端接口.
  2. RedisClusterStateRedisClusterMapState, 提供了JedisCluster客户端接口用于在拥有Redis集群情况下使用.

RedisState

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                .setHost(redisHost).setPort(redisPort)
                                .build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisLookupMapper lookupMapper = new WordCountLookupMapper();

RedisState.Factory factory = new RedisState.Factory(poolConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory,
                        fields,
                        new RedisStateUpdater(storeMapper).withExpire(86400000),
                        new Fields());

TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
                        new RedisStateQuerier(lookupMapper),
                        new Fields("columnName","columnValue"));

RedisClusterState

Set nodes = new HashSet();
for (String hostPort : redisHostPort.split(",")) {
    String[] host_port = hostPort.split(":");
    nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                .build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory,
                        fields,
                        new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
                        new Fields());

TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
                        new RedisClusterStateQuerier(lookupMapper),
                        new Fields("columnName","columnValue"));

Views: 195

Redis的5种数据类型

Redis 是一种基于键值对的NoSQL数据库,它的值主要由string(字符串),hash(哈希),list(列表),set(集合),zset(有序集合)五种基本数据结构构成,除此之外还支持一些其他的数据结构和算法。其中key都是由字符串构成的.

这些数据类型的使用方式可以查看官方文档, 下面重点说一说数据类型的选择.

Redis五大数据类型的使用场景

1. 字符串(string)

字符串类型是Redis最基础的数据类型,字符串类型可以是JSON、XML甚至是二进制的图片等数据,但是最大值不能超过512MB。

底层数据结构

Redis会根据当前值的类型和长度决定使用哪种底层数据结构来实现。

字符串类型的底层数据结构有3种:

  • int:8个字节的长整型。

  • embstr:小于等于39个字节的字符串。

  • raw:大于39个字节的字符串。

字符串类型的添加和查询

> set mykey somevalue
OK
> get mykey
"somevalue"
> set mykey newval nx   # when not exists
#> SETNX mykey newval   # 和上面命令等效
(nil)
> set mykey newval xx   # only when exists
OK

> set counter 100       # string type includes number also
OK
> incr counter          # atomic increcement by 1
(integer) 101
> incr counter
(integer) 102
> incrby counter 50      # atomic increcement by 50
(integer) 152

# Multple Set/Get 批量查询和获取
> mset a 10 b 20 c 30
OK
> mget a b c
10
20
30

字符串类型的Key空间操作

这里只列出key空间(key space)比较常用的三个方法:

  • del 删除key空间

  • type 返回key空间中所保存数据的数据类型

  • exists 查询key空间是否存在, 返回1为存在, 返回0则不存在

> set mykey hello
OK
> exists mykey
1
> type mykey
string
> del mykey
1
> exists mykey
0
> type mykey
none

设置key的过期时间-方法 1

> set key some-value
OK
> expire key 5
(integer) 1
> get key (immediately)
"some-value"
> get key (after some time)
(nil)

设置key的过期时间-方法 2

> set key 100 ex 10 # set key to 100 with expire time of 10 seconds
OK
> ttl key  # check the remaining time to live for the key
(integer) 9

设置key的过期时间-方法 3

# SETEX key seconds value 
# 给key设置value, 同时设置过期(EXpireTime)时间, 单位秒
# 如果过期则查询结果为null
> SETEX exkey 120 value
> get exkey
valuie
# 120秒后
> get exkey
null

1.2 使用场景

1.2.1 缓存

在web服务中,使用MySQL作为数据库,Redis 作为缓存。由于Redis具有支撑高并发的特性,通常能起到加速读写和降低后端压力的作用。web端的大多数请求都是从Redis中获取的数据,如果Redis中没有需要的数据,则会从MySQL中去获取,并将获取到的数据写入redis。

1.2.2 计数

Redis中有一个字符串相关的命令incr key,incr命令对值做自增操作,返回结果分为以下三种情况:

  • 值不是整数,返回错误

  • 值是整数,返回自增后的结果

  • key不存在,返回1

比如文章的阅读量,视频的播放量等等都会使用redis来计数,每播放一次,对应的播放量就会加1,同时将这些数据异步存储到数据库中达到持久化的目的。

1.2.3 共享Session

在分布式系统中,用户的每次请求会访问到不同的服务器,这就会导致session不同步的问题,假如一个用来获取用户信息的请求落在A服务器上,获取到用户信息后存入session。下一个请求落在B服务器上,想要从session中获取用户信息就不能正常获取了,因为用户信息的session在服务器A上,为了解决这个问题,使用redis集中管理这些session,将session存入redis,使用的时候直接从redis中获取就可以了。

1.2.4 限速

为了安全考虑,有些网站会对IP进行限制,限制同一IP在给定的一段时间内访问次数不能超过n次。

在redis中保存一个count值,key为user:$ip,value为该ip访问的次数,第一次设置key的时候,设置过期时间为给定时间。

count加1之前,判断是否key是否存在,不存在的话,有两种情况:1、该ip未访问过;2、该ip访问过,但是key已经过期了。那么此时需要再次设置一次expires。

如果用户访问的时候,判断count的值是否大于上限,如果低于上限,就处理请求,否则就拒绝处理请求。

2. 哈希

Redis中哈希类型是指一个键值对的存储结构,其中键不能重复,值对应一个map。

可以用来表示一个表中的一列数据, 其中键为主键,值也就是map的key可以作为列名, map的value可以作为列的值.

比如要表示这样的一条结构化记录:

记录的主键: user:1000

username birthyear verified
antirez 1977 1

使用Redis中的哈希类型存储方式如下:

> hmset user:1000 username antirez birthyear 1977 verified 1
OK
> hget user:1000 username
"antirez"
> hget user:1000 birthyear
"1977"
> hgetall user:1000
1) "username"
2) "antirez"
3) "birthyear"
4) "1977"
5) "verified"
6) "1"

> hmget user:1000 username birthyear no-such-field
1) "antirez"
2) "1977"
3) (nil)     # 当key不存在时返回 nil

hincrby 可以对哈希表中的数据进行修改

> hincrby user:1000 birthyear 10
(integer) 1987
> hincrby user:1000 birthyear -10
(integer) 1977

You can find the full list of hash commands in the documentation.

2.1 底层数据结构

哈希类型的底层数据结构有两种:

ziplist(压缩列表):当哈希类型元素个数小于hash-max-ziplist-entries配置(默认512个)同时所有值都小于hash-max-ziplist-value配置(默认64字节)时使用。ziplist使用更加紧凑的结构实现多个元素的连续存储,所以比hashtable更加节省内存。hashtable(哈希表):当ziplist不能满足要求时,会使用hashtable。

2.2 使用场景

由于hash类型存储的是一个键值对,比如数据库有以下一个用户表结构:

表名为user,其中一个记录如下:

id username birthyear verified
1000 antirez 1977 1

将以上信息存入redis,用:作为key:

> hmset user:1000 username antirez birthyear 1977 verified 1

哈希可以让一个Map结构(一系列的键值对)和一个表的主键关联拿起来, 对这样的具有多个字段的记录使用哈希存储会比字符串更加方便直观.

3. 列表

列表类型用来存储多个有序的字符串,一个列表最多可以存储2^32-1个元素,列表的两端都可以插入和弹出元素。

列表的用法

可以列表看成一个数组构成的双端队列, 那么首先我们约定索引小的元素在左边, 索引大的元素在右边.rpush命令表示向数组的最右侧插入元素, lpush表示想数组的最左侧插入元素.

> rpush mylist A
(integer) 1
> rpush mylist B
(integer) 2
> lpush mylist first
(integer) 3
> lrange mylist 0 -1
1) "first"
2) "A"
3) "B"

即:

> rpush mylist A:           1) "A" -
> rpush mylist B            1) "A" -> 2) "B"
> lpush mylist first         1) "first" -> 2) "A" -> 3) "B"

lrange 可以查询一段范围内的元素, 元素索引从0开始, 负数表示倒序的第几个

> rpush mylist 1 2 3 4 5 "foo bar"
9                           # 返回总元素数量
> lrange mylist 0 -1          # 列出所有元素
first
A
B
1
2
3
4
5
foo bar
> lrange mylist 1 8            # 效果一样
first
A
B
1
2
3
4
5
foo bar

list也具有栈数据结构类似的pop命令, 不同的是可以选择从左边或者右边弹出元素并返回.

> rpush mylist a b c
3
> rpop mylist
c
> rpop mylist
b
> rpop mylist
a
> rpop mylist
null

同样, 如果要从左边弹出则可以使用LPOP命令.

> rpush mylist a b c
6
> del mylist
1
> rpush mylist a b c
3
> lpop mylist
a
> lpop mylist
b
> lpop mylist
c
> lpop mylist
null

Capped lists - 带上限的列表

ltrim的使用类似lrange,只不过它的作用是将指定区间的元素作为新的列表返回.

> rpush mylist 1 2 3 4 5
(integer) 5
> ltrim mylist 0 2
OK
> lrange mylist 0 -1
1) "1"
2) "2"
3) "3"

上面的LTRIM命令告诉Redis仅从索引0到2列出列表元素,其他所有内容都将被丢弃。将PUSH + TRIM命令搭配可以很容易维护一个有限容量的队列, 以便添加新元素并丢弃超出限制的元素.

LPUSH mylist 
LTRIM mylist 0 999

上面的组合添加了一个新元素,并且仅将1000个最新元素纳入列表。使用LRANGE可以访问最重要的1000个元素,而无需记住旧的数据。

注意:LRANGE从技术上讲是O(N)命令,但朝列表的开头或结尾访问较小范围的元素可以近似为O(1)操作。

3.1 底层数据结构

列表的底层数据结构有两种:

ziplist(压缩列表):当哈希类型元素个数小于list-max-ziplist-entries配置(默认512个)同时所有值都小于list-max-ziplist-value配置(默认64字节)时使用。ziplist使用更加紧凑的结构实现多个元素的连续存储,所以比hashtable更加节省内存。linkedlist(链表):当ziplist不能满足要求时,会使用linkedlist。

3.2 使用场景

3.2.1 消息队列

列表用来存储多个有序的字符串,既然是有序的,那么就满足消息队列的特点。使用lpush + rpop或者rpush + lpop实现消息队列。

除此之外,redis支持阻塞操作,在弹出元素的时候使用阻塞命令来实现阻塞队列。

列表具有一项特殊功能,使其适合于实现队列,并且通常用作进程间通信系统的构建块:阻止操作

想象一下,您想通过一个流程将项目推入列表,然后使用不同的流程来对这些项目进行某种处理工作。这是通常的生产者/使用者设置,可以通过以下简单方式实现:

  • 为了将项目推送到列表中,生产者调用LPUSH
  • 为了从列表中提取/处理项目,消费者调用RPOP

但是,有时列表可能为空,没有任何要处理的内容,因此RPOP仅返回NULL。在这种情况下,消费者被迫等待一段时间,然后使用RPOP重试。这称为轮询,这种情况下这不是一个好主意,因为它有几个缺点:

  1. 强制Redis和客户端处理无用的命令(列表为空时的所有请求将无法完成任何实际工作,它们只会返回NULL)。
  2. 由于消费者在收到NULL之后会等待一段时间,因此会增加项目处理的延迟。

所以,命令BRPOPBLPOP就是带有堵塞功能的RPOPLPOP,作用是能够如果列表是空的就堵塞,如果在指定的超时时间到达前列表有了新的数据则返回新数据,否则超过超时时间则直接返回.

这是BRPOP调用的示例:

> brpop tasks 5
1) "tasks"
2) "do_something"

这意味着:“等待列表中的元素tasks的最后一个元素,但如果5秒钟后没有可用元素,则返回”。

请注意,您可以将0用作超时来永远等待元素,还可以指定多个列表,而不仅仅是一个列表,以便同时等待多个列表,并在第一个列表收到一个元素时得到通知。

注意:

  1. 客户端以有序方式提供服务:第一个阻塞等待列表的客户端,在某个元素被其他客户端推送时首先提供服务,依此类推。
  2. 返回值与RPOP相比有所不同:它是一个包含两个元素的数组,因为它还包含键的名称,因为BRPOPBLPOP能够阻止等待来自多个列表的元素。
    1. 如果达到超时,则返回NULL。

关于列表和阻止操作,建议了解以下内容:

  • 使用 LMOVE 可以实现循环队列.
  • 相对应的堵塞版本命令就是 BLMOVE.
3.2.2 栈

由于列表存储的是有序字符串,满足队列的特点,也就能满足栈先进后出的特点,使用lpush + lpop或者rpush + rpop实现栈。

3.2.3 文章列表

因为列表的元素不但是有序的,而且还支持按照索引范围获取元素。因此我们可以使用lrange命令分页获取文章列表

> lrange key 0 9

4. 集合

集合类型也可以保存多个字符串元素,与列表不同的是,集合中不允许有重复元素并且集合中的元素是无序的。一个集合最多可以存储2^32-1个元素。可以对集合使用SADD命令添加新的元素。还可以对集合进行许多其他操作,例如测试给定元素是否已存在,执行多个集合之间的交集,并集或求差集等等。

> sadd myset 1 2 3              # set::add
(integer) 3
> smembers myset
1. 3                            # not ordered
2. 1
3. 2

> sismember myset 3              # set::is_member
(integer) 1                     # 3 is member
> sismember myset 30 
(integer) 0                     # 30 is not member

提取元素的命令称为SPOP,对于建模某些问题非常方便。例如,为了实现基于Web的扑克游戏,您可能需要用一组字符来代表您的套牌。扑克牌有四种花色, Clubs(梅花),Diamond(方块),Heart(红桃),Spade(黑桃), 使用首字母作为前缀表示花色,则一副扑克牌我们可以使用集合表示:

>  sadd deck C1 C2 C3 C4 C5 C6 C7 C8 C9 C10 CJ CQ CK
   D1 D2 D3 D4 D5 D6 D7 D8 D9 D10 DJ DQ DK H1 H2 H3
   H4 H5 H6 H7 H8 H9 H10 HJ HQ HK S1 S2 S3 S4 S5 S6
   S7 S8 S9 S10 SJ SQ SK
   (integer) 52

现在我们要为每个玩家提供5张卡片。该SPOP命令删除一个随机元素,将其返回到客户端,所以在这种情况下完美运行。

但是,如果我们直接操作deck集合,那么在游戏的下一场比赛中,我们将需要再次填充纸牌。因此,首先我们可以将deck集合复制到新集合game:1:deck中,用于开始第一局游戏。

这可以使用SUNIONSTORE来完成,SUNIONSTORE通常执行多个集合之间的联合,并将结果存储到第一个集合中。语法:

SUNIONSTORE destination key [key ...]

但是,由于单个集合的并集就是它本身,我可以使用以下命令复制我的卡组:

> sunionstore game:1:deck deck
(integer) 52

现在,我准备为第一位玩家提供五张牌:

> spop game:1:deck
"C6"
> spop game:1:deck
"CQ"
> spop game:1:deck
"D1"
> spop game:1:deck
"CJ"
> spop game:1:deck
"SJ"

Redis 的一个命令可以返回集合中元素的数量。在集合理论中元素数量也称为_集合_的基数(cardinality),因此Redis的这个命令叫做SCARD

> scard game:1:deck
(integer) 47

数学原理:52-5 = 47。

当您只需要获取随机元素而不将其从集合中删除时,可以使用SRANDMEMBER命令。它还具有返回重复元素和非重复元素的功能。

4.1 底层数据结构

集合类型的底层数据结构有两种:

intset(整数集合):当集合中的元素都是整数且元素个数小于set-max-intset-entries配置(默认512个)时,redis会选用intset来作为集合的内部实现,从而减少内存的使用。hashtable(哈希表):当intset不能满足要求时,会使用hashtable。

4.2 使用场景

4.2.1 用户标签

集合非常适合表示对象之间的关系。例如,我们可以轻松地使用集合来实现标签功能。

对这个问题进行建模的一种简单方法是为我们要标记的每个对象设置一个集合。该集合包含与对象关联的标签的ID。

以新闻文章来举例。如果ID为1000的新闻带有标签1、2、5和77,实现标签功能需要实现:

  1. 则使用集合可以将这些标签ID与新闻项相关联:
> sadd news:1000:tags 1 2 5 77
(integer) 4
  1. 反过来我们还需要将给定标签关联到所有新闻的列表:
> sadd tag:1:news 1000
(integer) 1
> sadd tag:2:news 1000
(integer) 1
> sadd tag:5:news 1000
(integer) 1
> sadd tag:77:news 1000
(integer) 1

以上2个步骤应放在一个事务中.

有了以上关系后, 要获取给定新闻ID的所有标签很简单:

> smembers news:1000:tags
1. 5
2. 1
3. 77
4. 2

也可以给定标签查询所有包含该标签的新闻

> smembers tag:id:news
... results here ...

注意:如果是完整的工程,还需要有完整的标签表结构和完整的新闻表结构,例如Redis的哈希数据类型,可以将标签ID映射到标签名称, 将新闻ID映射到新闻名称上。

还有其他一些非常简单的操作,使用正确的Redis命令仍然很容易实现。例如,我们可能需要包含标签1、2、10和27的所有对象的列表。我们可以使用SINTER命令执行此操作,该命令执行不同集合之间的交集。我们可以用:

> sinter tag:1:news tag:2:news tag:10:news tag:27:news
... results here ...

除了交集之外,您还可以执行并集,求差,提取随机元素等等。

4.2.2 抽奖功能

集合有两个命令支持获取随机数,分别是:

随机获取count个元素,集合元素个数不变

> srandmember key [count]

随机弹出count个元素,元素从集合弹出,集合元素个数改变

> spop key [count]

用户点击抽奖按钮,参数抽奖,将用户编号放入集合,然后抽奖,分别抽一等奖、二等奖,如果已经抽中一等奖的用户不能参数抽二等奖则使用spop,反之使用srandmember

5. 有序集合

有序集合(sorted-set, zset)是集合和哈希表的结合,这是因为在有序集合也属于集合,和集合一样不能有重复元素。但是有序集合可以排序,它给每个元素设置一个score的浮点值作为排序的依据。最多可以存储2^32-1个元素。

排序规则:

  • 如果A和B是两个 score 不同的元素,如果 A.score > B.score,则有 A>B
  • 如果A和B的score完全相同,那么如果A字符串在字典序上大于B字符串,则有A>B。A和B字符串不能相等,因为有序集合不能保存重复的key。

下面使用zadd将黑客信息添加到有序集合里, 其中出生日期作为排序的score值:

> zadd hackers 1940 "Alan Kay"
(integer) 1
> zadd hackers 1957 "Sophie Wilson"
(integer) 1
> zadd hackers 1953 "Richard Stallman"
(integer) 1
> zadd hackers 1949 "Anita Borg"
(integer) 1
> zadd hackers 1965 "Yukihiro Matsumoto"
(integer) 1
> zadd hackers 1914 "Hedy Lamarr"
(integer) 1
> zadd hackers 1916 "Claude Shannon"
(integer) 1
> zadd hackers 1969 "Linus Torvalds"
(integer) 1
> zadd hackers 1912 "Alan Turing"
(integer) 1

根据生日先后排序

> zrange hackers 0 -1
1) "Alan Turing"
2) "Hedy Lamarr"
3) "Claude Shannon"
4) "Alan Kay"
5) "Anita Borg"
6) "Richard Stallman"
7) "Sophie Wilson"
8) "Yukihiro Matsumoto"
9) "Linus Torvalds"

根据生日先后倒序排序

> zrevrange hackers 0 -1
1) "Linus Torvalds"
2) "Yukihiro Matsumoto"
3) "Sophie Wilson"
4) "Richard Stallman"
5) "Anita Borg"
6) "Alan Kay"
7) "Claude Shannon"
8) "Hedy Lamarr"
9) "Alan Turing"

zrange或者zrevrange后面加上WITHSCORES可以同时返回score数值,如:

> zrange hackers 0 -1 withscores
1) "Alan Turing"
2) "1912"
3) "Hedy Lamarr"
4) "1914"
5) "Claude Shannon"
6) "1916"
7) "Alan Kay"
8) "1940"
9) "Anita Borg"
10) "1949"
11) "Richard Stallman"
12) "1953"
13) "Sophie Wilson"
14) "1957"
15) "Yukihiro Matsumoto"
16) "1965"
17) "Linus Torvalds"
18) "1969"

对排序的区间进行操作,比如找出出生日期早于等于1950年龄的记录

> zrangebyscore hackers -inf 1950 # range by score [-infinite,1950]
1) "Alan Turing"
2) "Hedy Lamarr"
3) "Claude Shannon"
4) "Alan Kay"
5) "Anita Borg"

再比如删除生日区间在1940到1060之间的记录,并返回删除的记录个数

> zremrangebyscore hackers 1940 1960 # remove range by score
(integer) 4

获取"Anita Borg"的生日排名:

> zrank hackers "Anita Borg"
(integer) 4         # 正数第5

如果要获取"Anita Borg"的生日倒序排名可以使用zrevrank命令。

5.1 底层数据结构

有序集合类型的底层数据结构有两种:

ziplist(压缩列表):当有序集合的元素个数小于list-max-ziplist-entries配置(默认128个)同时所有值都小于list-max-ziplist-value配置(默认64字节)时使用。ziplist使用更加紧凑的结构实现多个元素的连续存储,更加节省内存。skiplist(跳跃表):当不满足ziplist的要求时,会使用skiplist

5.2 使用场景

5.2.1 排行榜

用户发布了n篇文章,其他人看到文章后给喜欢的文章点赞,使用score来记录点赞数,有序集合会根据score排序。流程如下

用户发布一篇文章,初始点赞数为0,即score为0

zadd user:article 0 url://a
(integer) 0
zadd user:article 0 url://b
(integer) 0
zadd user:article 0 url://c
(integer) 0

有人给文章点赞,递增1

zincrby user:article 1 url://a
(integer) 1
zincrby user:article 1 url://a
(integer) 2
zincrby user:article 1 url://a
(integer) 3
zincrby user:article 1 url://b
(integer) 1
zincrby user:article 1 url://c
(integer) 1
zincrby user:article 1 url://c
(integer) 2

查询点赞前三篇文章(倒序排序)

> zrevrange user:article 0 -1
1) "url://a"    # score=3
2) "url://c"    # score=2
3) "url://b"    # score=1

查询点赞后三篇文章(正序排序)

> zrange user:article 0 -1
1) "url://b"      # score=1
2) "url://c"      # score=2
3) "url://a"      # score=3

5.2.2 延迟消息队列

下单系统,下单后需要在15分钟内进行支付,如果15分钟未支付则自动取消订单。将下单后的十五分钟后的时间作为score,订单作为value存入redis,消费者轮询去消费,如果消费订单的时间大于等于这笔记录的score,则将这笔记录移除队列,即取消订单。

总结

在开发中,字符串类型是用的最多的数据类型,导致我们忽视了redis的其他四种数据类型,在具体场景下选择具体的数据类型对提升redis性能有非常大的帮助。redis虽然支持消息队列的实现,但是并不支持ack。所以redis实现的消息队列不能保证消息的可靠性,除非自己实现消息确认机制,不过这非常麻烦,所以如果是重要的消息还是推荐使用专门的消息队列去做。

注:另外可以利用Redis的Java客户端Jedis实现分布式锁。加锁实际上就是在redis中,给指定客户端的Key键设置一个值,为避免死锁,并给定一个过期时间。解锁的过程就是将指定客户端的Key键删除。但是实际上要实现真正好用且健壮的分布式锁是比较复杂的,一般会使用Redisson、zookeeper或者DataBase来实现。

Views: 162

Redis的安装,配置和使用

Redis 是什么?

说到Redis,我们的第一印象肯定是作为缓存中间件使用了,这样想可能太片面,我们按官网的介绍来看吧(http://www.redis.cn/),全面的认识下它的用途和功能

image-20211204142501295

这里的描述很简练和清晰了,因此,打算梳理的 Redis 相关知识点如下:

  • Redis 支持丰富的数据类型,每种类型的特点及应用场景;
  • Redis 支持持久化机制,有RDB、AOF等持久化方式;
  • Redis 多机数据库的实现,有复制(replication)、哨兵(Sentinel)和集群(cluster)等功能;
  • Redis 支持事务,Redis事务的ACID特性;
  • Redis 支持的其他独立功能,如发布/订阅、慢日志查询、Lua脚本、排序及二进制位数组等;
  • Redis 不仅仅是缓存中间件,也可以当做nosql数据库、消息队列等使用。

当然了,Redis 的优势也有必要说明一下,比如:

  • 读写性能很好,Redis能读的速度是110000次/s,写的速度是81000次/s,原因有多路复用机制、Resp协议、单线程、基于内存持久化操作等;
  • 数据结构丰富,Redis不仅支持strings、lists、hashes、sets、sorted sets等5种基本数据结构,也支持bitmaps、hyperLogLogs、geospatial等复杂数据结构;
  • 所有操作原子性,所有 Redis 的操作都是原子的,能确保当多个客户同时访问 Redis 服务器时,每个用户得到的是更新后的最新值;
  • 拥有多样的特性,比如:支持多种键过期删除策略、支持多机数据库的实现(主从复制、哨兵和集群等)、可当做消息队列使用等。

同时,Redis 作为缓存使用时,缺点也很明显,会出现以下几种问题:

  • 缓存和数据库双写一致性问题;
  • 缓存雪崩问题;
  • 缓存击穿问题;
  • 缓存的并发竞争问题。

而这些问题也都有对应的解决方法

关于Redis的数据持久化

Redis的所有数据都是保存在内存中,然后不定期的通过异步方式保存到磁盘上(这称为“半持久化模式”);也可以把每一次数据变化都写入到一个append only file(aof)里面(这称为“全持久化模式”)。

由于Redis的数据都存放在内存中,如果没有配置持久化,redis重启后数据就全丢失了,于是需要开启redis的持久化功能,将数据保存到磁盘上,当redis重启后,可以从磁盘中恢复数据。redis提供两种方式进行持久化,一种是RDB持久化(原理是将Reids在内存中的数据库记录定时dump到磁盘上的RDB持久化),另外一种是AOF(append only file)持久化(原理是将Reids的操作日志以追加的方式写入文件)。

Redis版本选择及新版本特性

Redis 版本选择和下载(https://redis.io/download)时,一定要了解下官网提及的版本规则及更新信息

  • Redis 使用标准版本标记进行版本控制:major.minor.patchlevel。偶数的版本号表示稳定的版本, 例如 1.2,2.0,2.2,2.4,2.6,2.8,奇数的版本号用来表示非标准版本,例如2.9.x是非稳定版本,它的稳定版本是3.0。
  • Redis 4.0在2017年7月以GA的形式发布,新用户应该使用Redis 5,但Redis 4是目前最成熟的版本,并将在明年更新,直到Redis 6发布。
  • Redis 5.0是Redis的第一个重大更新版本,引入了新的流数据类型与消费者组,排序集阻塞pop操作,RDB中的LFU/LRU信息,Redis -cli中的集群管理器,活跃的碎片整理V2, HyperLogLogs改进和许多其他改进,2018年10月,Redis 5作为GA发布。
  • Redis 6.0引入了SSL,新的RESP3协议,acl,客户端缓存,无磁盘副本,I/O线程,更快的RDB加载,新的模块api和更多的改进。

经过一番比较后,最终选择使用Redis 5版本!,下载地址:https://download.redis.io/releases/

Windows 安装和使用 Redis

值得注意的是,从Redis3.x开始一直到目前的Redis6.x,官网一直没有提供Windows系统的 Redis安装包,只提供了Linux或Docker的。因此,没必要折腾去寻找能在Windows安装的zip包了。

另外,关于官网为什么不提供Windows的Redis安装包,原因可能是:

  • redis是基于单线程的高性能操作,而redis需要单线程轮询,Windows和Linux的轮询机制有所不同。
  • Windows使用的是selector,而Linux使用的是epoll,从性能上来说 epoll是高于selector 的,所以redis官方推荐使用linux版本。不过,window版本的redis是民间大神或者微软修改过的,可以在网上搜到相关的安装包使用,不过最新版本是不可能和官方保持一致的。

Linux 安装和使用 Redis

使用环境

  • CentOS 7.6
  • redis-5.0.14 【暂用单机版】

安装步骤

下载安装包

去Redis官网下载Linux安装包:

image-20211205141922934

也可以通过wget直接下载到虚拟机中

$ wget https://download.redis.io/releases/redis-5.0.14.tar.gz

检查gcc环境

redis是c语言编写的, 因此下载之后还需要使用gcc(9.3+)进行编译和安装.

查看gcc版本是否在9.3以上, 如果没有则需要安装

# 查看gcc版本是否在9.3以上,centos7.6默认安装4.8.5
$ gcc -v

如果发现gcc版本低于9.3, 我们需要升级到9.3及以上,如下:

$ sudo yum -y install centos-release-scl
$ sudo yum -y install devtoolset-9-gcc devtoolset-9-gcc-c++ devtoolset-9-binutils

$ scl enable devtoolset-9 bash

需要注意的是scl命令切换gcc版本只是临时的,退出shell或重启就会恢复原系统gcc版本。如果要长期使用gcc 9.3的话:

echo "source /opt/rh/devtoolset-9/enable" >>/etc/profile

编译redis

将下载的压缩包上传到/usr/local目录下,解压编译。

解压同时进入目录进行编译:

# 解压
tar zxf redis-5.0.14.tar.gz -C ~/app
# 编译依赖 - redis 5.x 需要
cd ~/app/redis-5.0.14/deps
make hiredis jemalloc linenoise lua
# 编译
cd ~/app/redis-5.0.14
make MALLOC=libc
# 测试
make test

如果编译测试过程发生错误提示

"You need tcl 8.5 or newer in order to run the Redis test"

则还需要安装tcl依赖

sudo yum install tcl -y

编译后产生一个新的src目录,将里面文件cp到/usr/local/bin/

cp -rf src/redis-server /usr/local/bin/
cp -rf src/redis-cli /usr/local/bin/

进入src目录并安装Redis服务:

cd src
sudo make install

这样可以将Redis服务器和客户端命令安装到默认位置/usr/local/bin/下面
也可以在make install的时候指定一个前缀PREFIX,指定一个自定义的安装位置

cd src
sudo make install PREFIX=~/app/redis-5.0.14/

至此,Redis安装完毕。

启动Redis

前台启动

$ redis-server

启动效果:

image-20211204143602980

PS:这种启动方式如果关闭终端,redis就直接退出了,所以我们需要下面的后台启动!

后台启动

  1. 修改 redis.conf 文件

    1. daemonize no 改为 daemonize yes

    2. 注释掉 # bind 127.0.0.1

    3. 修改 protected-mode yes 改为 protected-mode no

    4. 如果需要设置密码添加 requirepass niit1234

    5. 修改后的部分示例如下:

      # By default protected mode is enabled. You should disable it only if
      # you are sure you want clients from other hosts to connect to Redis
      # even if no authentication is configured, nor a specific set of interfaces
      # are explicitly listed using the "bind" directive.
      protected-mode no
      
      # Accept connections on the specified port, default is 6379 (IANA #815344).
      # If port 0 is specified Redis will not listen on a TCP socket.
      port 6379
      
      # By default Redis does not run as a daemon. Use 'yes' if you need it.
      daemonize yes
      
      # IMPORTANT NOTE: starting with Redis 6 "requirepass" is just a compatibility
      # layer on top of the new ACL system. The option effect will be just setting
      # the password for the default user. Clients will still authenticate using
      # AUTH  as usually, or more explicitly with AUTH default 
      # if they follow the new protocol: both will work.
      #
      # requirepass foobared
      requirepass niit1234
  2. 启动和停止后台服务

    ## 启动服务
    [redis-5.0.14]$ redis-server redis.conf
  3. 通过命令查看进程和端口(即启动时显示的PID和PORT)

    ## 查看进程
    [redis-5.0.14]$ ps -ef | grep redis
    ## 查看端口
    [redis-5.0.14]$ netstat -anpt | grep redis
  4. 停止后台服务

    ## 方法1. 如果知道Redis服务的进程ID可以使用Kill命令停止
    kill -9 <进程号>
    
    ## 方法2. 通过Redis Cli命令行客户端停止服务
    ## 如设置密码需要先认证
    [redis-5.0.14]$ bin/redis-cli
    127.0.0.1:6379> auth "niit1234"    # 认证密码
    OK
    127.0.0.1:6379> shutdown

作为系统服务

将配置文件复制到/etc/redis下

$ sudo mkdir /etc/redis
$ sudo cp redis.conf /etc/redis/6379.conf

安装服务

$ cd utils/
Welcome to the redis service installer
This script will help you easily set up a running redis server

Please select the redis port for this instance: [6379]
Selecting default: 6379
Please select the redis config file name [/etc/redis/6379.conf]
Selected default - /etc/redis/6379.conf
Please select the redis log file name [/var/log/redis_6379.log]
Selected default - /var/log/redis_6379.log
Please select the data directory for this instance [/var/lib/redis/6379]
Selected default - /var/lib/redis/6379
Please select the redis executable path [] /usr/local/bin/redis-server
Selected config:
Port           : 6379                           # 默认端口号
Config file    : /etc/redis/6379.conf           # redis服务使用的配置文件
Log file       : /var/log/redis_6379.log        # redis服务器日志
Data dir       : /var/lib/redis/6379            # redis 数据存储目录
Executable     : /usr/local/bin/redis-server    # redis服务命令所在位置
Cli Executable : /usr/local/bin/redis-cli    # redis客户端命令所在位置
Is this ok? Then press ENTER to go on or Ctrl-C to abort.
Copied /tmp/6379.conf => /etc/init.d/redis_6379
Installing service...
Successfully added to chkconfig!
Successfully added to runlevels 345!
Starting Redis server...
Installation successful!

安装完毕后使用chkconfig查看已经安装的服务, 并且此时redis服务已经在运行了.

[hadoop@hadoop000 utils]$ chkconfig

注:该输出结果只显示 SysV 服务,并不包含
原生 systemd 服务。SysV 配置数据
可能被原生 systemd 配置覆盖。

      要列出 systemd 服务,请执行 'systemctl list-unit-files'。
      查看在具体 target 启用的服务请执行
      'systemctl list-dependencies [target]'。

netconsole      0:关    1:关    2:关    3:关    4:关    5:关    6:关
network         0:关    1:关    2:开    3:开    4:开    5:开    6:关
redis_6379      0:关    1:关    2:开    3:开    4:开    5:开    6:关

可以看到, 新安装的服务名称为redis_6379, 并且已经配置了开机自动启动服务

也可以使用命令$ sudo chkconfig redis_6379 off关闭开机自动启动服务.

接下来就可以使用如下命令管理Redis系统服务了.

查看服务状态

$ service redis_6379 status
Redis is running (107335)

关闭服务

$ service redis_6379 stop
Stopping ...
(error) NOAUTH Authentication required.

这里无法关闭服务的原因是我们之前在redis.conf配置文件中设置了requirepasswd
所以解决办法

  1. 取消密码
  2. 修改服务启动文件

    $ sudo vi /etc/init.d/redis_6379
    
    # 修改如下
    36      stop)
    37          if [ ! -f $PIDFILE ]
    38          then
    39              echo "$PIDFILE does not exist, process is not running"
    40          else
    41              PID=$(cat $PIDFILE)
    42              echo "Stopping ..."
    43              # $CLIEXEC -p $REDISPORT shutdown
    44              $CLIEXEC -a "niit1234" -p $REDISPORT shutdown
    45              while [ -x /proc/${PID} ]
    46              do
    47                  echo "Waiting for Redis to shutdown ..."
    48                  sleep 1
    49              done
    50              echo "Redis stopped"
    51          fi
    52          ;;
    

    修改完成后再次关闭服务就可以成功了

    $ service redis_6379 stop
    Stopping ...
    Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
    Redis stopped

    启动服务

    $ sudo service redis_6379 start
    Starting Redis server...

使用客户端连接服务

$ redis-cli
127.0.0.1:6379> auth niit1234
OK

卸载Redis

停服务、删文件即可!

#查看进程
ps aux |grep redis
#杀掉进程
kill -9 <进程号>
#查看相关文件
find / -name "redis*"
# rm -rf 删除redis相关文件, 例如:
sudo rm -rf /etc/redis
sudo rm -rf /var/lib/redis
sudo rm -rf /etc/rc.d/init.d/redis_6379
sudo rm -rf /var/log/redis_6379.log
sudo rm -rf /usr/local/bin/redis-*
sudo rm -rf /home/hadoop/app/redis-5.0.14

其他可能出现的问题及解决办法

执行service redis_6379 start命令时,出现提示:

/var/run/redis_6379.pid exists, process is already running or crashed

删除这个pid即可

rm -rf /var/run/redis_6379.pid

Redis性能测试

模拟N个客户端同时发出M个请求:

  • 通过redis-benchmark命令,测试结果如下:(每秒请求数)
$ ./src/redis-benchmark -n 10000 -q
PING_INLINE: 18115.94 requests per second
PING_BULK: 22988.51 requests per second
SET: 36496.35 requests per second
GET: 22883.29 requests per second
INCR: 24570.02 requests per second
LPUSH: 34722.22 requests per second
RPUSH: 20202.02 requests per second
LPOP: 24330.90 requests per second
RPOP: 30120.48 requests per second
SADD: 25252.53 requests per second
HSET: 28571.43 requests per second
SPOP: 35842.29 requests per second
LPUSH (needed to benchmark LRANGE): 15673.98 requests per second
LRANGE_100 (first 100 elements): 10405.83 requests per second
LRANGE_300 (first 300 elements): 9442.87 requests per second
LRANGE_500 (first 450 elements): 7374.63 requests per second
LRANGE_600 (first 600 elements): 5827.51 requests per second
MSET (10 keys): 31055.90 requests per second

使用Redis客户端

使用redi-cli客户端通过命令操作redis,这里先热身一下:

redis-5.0.8]$ ./src/redis-cli
127.0.0.1:6379> set msg helloworld
OK
127.0.0.1:6379> get msg
"helloworld"
127.0.0.1:6379> set cnt 100
OK
127.0.0.1:6379> incr cnt
(integer) 101
127.0.0.1:6379> incrby cnt 10
(integer) 111
127.0.0.1:6379> mset apple 6.82 orange 3.5 banana 9.9
OK
127.0.0.1:6379> mget apple orange banana
1) "6.82"
2) "3.5"
3) "9.9"
127.0.0.1:6379> rpush list0 java
(integer) 1
127.0.0.1:6379> rpush list0 python
(integer) 2
127.0.0.1:6379> rpush list0 go
(integer) 3
127.0.0.1:6379> lrange list0 0 -1
1) "java"
2) "python"
3) "go"
127.0.0.1:6379>

用可视化工具管理 Redis

名称: AnotherRedisDesktopManager

前身是一个工具叫做 RedisDesktopManager, 由于不再更新, 因此有人又制作了另外一款工具起名为AnotherRedisDesktopManager

下载:

GITHUB地址

exe可执行文件下载链接

使用

  1. 创建连接,输入HOST、端口(默认6379)密码(AUTH),连接名称(Name)点击OK,成功!

    image-20201214235231573

连接成功后可以使用AnotherRedisDesktopManager内置的Redis客户端工具与服务器进行交互:

image-20211205185535140

可视化界面可以方便查看和搜索那些已经存储的键值对.

image-20201214235712329

如果要清空所有缓存的数据, 可以点击flush DB选项

image-20211205185659993

当然还有很多其他功能, 这个软件和其他的数据库管理软件类似, 很快就可以上手了.

Views: 248

Index