黑马点评笔记

基础知识

  • Redis 是一个 key-value 的非关系型数据库,key 一般是 String 类型,而 value 类型多样

String 存单值,Hash 存对象,List 做队列,Set 去重,ZSet 排序。

  1. String(字符串)
    ➤ 最基本的 key-value,value 可以是字符串、数字或二进制数据。
    ✅ 例子:SET name “Alice” → 存一个值。
  2. Hash(哈希)
    ➤ 一个 key 对应多个字段(field)和值(value),像 Java 的 Map<String, String>。
    ✅ 例子:存用户信息:HSET user:1001 name “Alice” age “25”
  3. List(列表)
    ➤ 有序、可重复的字符串列表,支持从两端插入/弹出。
    ✅ 例子:消息队列、最新评论列表。
  4. Set(集合)
    ➤ 无序、不重复的字符串集合,支持交集、并集等操作。
    ✅ 例子:用户标签、好友去重。
  5. Sorted Set(有序集合 / ZSet)
    ➤ 每个元素关联一个分数(score),自动按分数排序。
    ✅ 例子:排行榜(按积分排序)、带权重的任务队列。

Jedis 使用

  1. 导入依赖

  2. 建立连接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private Jedis jedis;

    @BeforeEach
    void setUp() {
    // 建立连接
    jedis = new Jedis("192.168.56.10", 6379);
    // 设置密码
    jedis.auth("123");
    // 选择数据库
    jedis.select(0);
    }
  3. 使用 jedis

    1
    2
    3
    4
    5
    6
    void testString() {
    // 插入数据,方法名称就是redis命令名称
    String result = jedis.set("name", "张三");
    System.out.println(result);
    // 获取数据
    }
  4. 关闭连接,释放资源

    1
    2
    3
    4
    5
    void tearDown() {
    if (jedis != null) {
    jedis.close();
    }
    }

但是 jedis 是线程不安全的,所以一般使用连接池,连接池可以复用连接,避免频繁创建连接,提高性能

Spring Data Redis 使用

苍穹外卖用的就是这种 Redis 的 Java 客户端

  1. 引入 spring-boot-starter-data-redis 依赖
  2. 在 application.yml 中配置 Redis 连接信息
  3. 注入 RedisTemplate
  • 有自动序列化工具,指定序列化器之后,存储时他能将数据序列化为 JSON,取出时也能反序列化为对象

  • 但是这样 redis 里面就要存全类名,占用空间,所以推荐手动序列化和反序列化
    String json = mapper.writeValueAsString(object);序列化
    object = mapper.readValue(json, Object.class);反序列化

  • RedisTemplate 可以处理任意类型键值对,但是他用的 JDK 序列化器,会向 redis 里面存全类名,占用空间

  • StringRedisTemplate 专门处理字符串类型的键值对,用的字符串序列化器而任何对象又都能转成字符串,所以这个项目用的都是这个

项目导入

这个项目用 Java 8,也就是项目结构哪里改成 1.8 就能运行了

单表查询

用了 MybatisPlus 后单表查询非常简单,
query() 是 MP 提供的方法,查询的表名用注解写在实体类上,查询的实体类由当前 Service 的泛型决定

1
2
// 一致,根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();

拦截器

和苍穹外卖一样,每次登录前对登录状态做校验

  1. utils 包下新建一个登录拦截器类,实现 HandlerInterceptor 接口 ctrl + i列出所有未实现的方法
  2. 实现前置拦截方法和后置拦截方法,将 session 的用户信息存入 threadlocal,每次请求都会创建一个线程,请求完归还线程
  • HttpServletRequest request 是由 Servlet 容器创建的,并且开发者不能直接调用或构造它
  • 只有在实现了像拦截器(HandlerInterceptor)这样的容器的特定接口后,框架才会自动把 request 对象“注入”到实现的里方法(如 preHandle)中供你使用。
  • 这是 Facade 模式的应用,容器通过 RequestFacade 向外暴露一个安全、简化的接口,隐藏内部复杂实现

session

  • 默认使用的是 Servlet 容器的 session 管理机制(如 tomcat)存在了 tomcat 的内存里,多台 tomcat 并不共享 session 存储空间,tomcat 会自动将 sessionid 写入 cookie
  • 引入 spring-sessino-data-redis 依赖并配置之后,session 数据就存在 redis 里面,每次用户登录后缓存他的 session,之后每次发出请求都从 redis 的 session 里面获取数据

redis 缓存验证码

  1. 之前配置了 SpringRedisTemplate 和 redis 的连接信息,所以现在直接像 Mapper 那样注入就行了,@Resource是根据类型注入@Autowired是根据名称注入,之后用 srringRedisTemplate.opsForValue()来操作 redis
  2. 发送验证码的时候,以手机号作为 key,验证码作为 value,设置过期时间
  3. 验证成功保存用户到 redis 的时候,以随机的 UUID 作为 key,然后 user 对象转成 userDTO,去掉敏感信息,userDTO 转成的 Map 作为 value,用 hash 类型用多个键值对存储用户的信息,
  4. 然后加一个 token 刷新拦截器,拦截所有请求,只要用户有操作就刷新 token 有效期,原先的登录拦截器就只做登录拦截,其余逻辑(存放用户信息到 ThreadLocal)放到 token 刷新拦截器里
    Another 的 redis 图形化界面,每个 key 的 ttl 显示在最上面,hashMap 里面有一列 ttl 都显示-1,不是真的,还是要看 map 在 redis 里 key 的 ttl

redis 缓存更新策略

  1. 内存淘汰策略
    redis 自动就会使用,不用自己维护,内存不足时自动淘汰掉旧数据
    一致性差,维护成本无
  2. 超时剔除
    给缓存添加 TTL 时间,到期后自动删除
    一致性一般,维护成本低
  3. 主动更新
    自己编写逻辑,在修改数据库时更新缓存
    一致性高,维护成本高

主动更新策略

  1. 调用者更新数据库的时候更新缓存(推荐)
  2. 将缓存与数据库整合为一个服务,由服务来维护一致性,调用者调用就行,无需关心缓存一致性
  3. 调用者只操作缓存,由其他线程异步的将缓存数据持久化到数据库,保证最终一致性
  • 删除缓存还是更新缓存,如果进行上百次更新但中间没有人访问,那就会出现大量无效的写操作,但是删除就不会有这种问题,而且两个线程同时更新也会有线程安全的问题
  • 怎么保证缓存和数据库操作的原子性,单体系统就放在一个事务里,分布式有分布式方案
  • 多线程是先删缓存再操作数据库,还是先操作数据库再删缓存。都可能有问题,但是先操作数据库再删缓存出问题的概率更小,因为更新数据库再删缓存中间的时间间隔更短。此时再加上超时剔除可以兜底

【总结】

  1. 低一致性需求:用 redis 自带的内存淘汰机制
  2. 高一致性需求:主动更新,并用超时剔除兜底
    • 读操作
      • 缓存命中直接返回
      • 缓存未命中则查询数据库,并写入缓存,设定超时时间
    • 写操作
      • 先写数据库,再删除缓存
      • 要确保数据库和缓存操作之间的原子性

缓存穿透

缓存穿透是指客户端请求的数据在缓存和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

【解决方案】

  • 缓存空对象
    数据库中没查到就缓存一个空对象到 redis 中,之后的请求就不会到达数据库了
    简单,易于维护,但是有额外内存消耗(设置 TTL 来减少),可能存在短期不一致
  • 布隆过滤器
    在客户端和 redis 中加一层布隆过滤器,不存在就拒绝,存在则放行
    但是他是有概率的,他说不存在是真的不存在,但是他说存在也有可能不存在然后可能缓存穿透
    空间占用少,但是实现复杂,存在误判可能

【预防方案】

  • 增加 id 的复杂度
  • 做好格式校验
  • 加强用户权限

缓存空对象的流程:

  1. 数据库查到用户不存在时,就缓存一个空对象到 redis 中
  2. 然后新增一个判断空值是null还是""的语句,判断当前是查到的是空对象还是 redis 当前没缓存

缓存雪崩

缓存雪崩是指同一时间段缓存的大量 key 同时失效或者 redis 宕机,导致大量请求都到达服务器,带来巨大压力

【解决方案】

  • 给不同的 key 的 TTL 添加随机值
  • 利用 redis 集群提高可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点 key 问题,就是一个被高并发访问并且缓存重建业务比较复杂的 key 突然失效了,无数的请求都会到达数据库带来巨大压力

  • 互斥锁:给重建缓存的逻辑加锁,让缓存重建的逻辑只有一个线程执行,其他线程就先等待之后重试,直到第一个线程缓存重建完,锁释放才查询成功
    保证一致性,简单,但是性能低,可能死锁
  • 逻辑过期:不设置 TTL 而是设置了一个字段来存储过期时间,之后查询缓存的时候多一个判断是否过期的逻辑,如果过期就获取互斥锁开启一个新线程来重建缓存完成后重置逻辑过期时间,当前和在此时访问的其他线程都直接返回旧数据
    性能好,但不保证一致性,有额外内存消耗,实现复杂

【简单互斥锁解决方案】
用 redis 的 setnx 命令设置一个 key,这个是只有 key 不存在是才能设置这个 key 的值,用这个 key 来标记当前互斥锁的状态

函数作为参数

  • 在封装解决缓存穿透的工具类的时候,没有缓存的时候要查数据库,而这个方法只有调用者知道,所以将其作为入参传入
  • 通过函数式接口(Functional Interface) 和 Lambda 表达式可以达到这种效果
    • 函数式接口:
    • Function<T, R>:接受一个参数,返回一个结果。
    • Consumer<T>:接受一个参数,无返回值。
    • Supplier<T>:无参数,返回一个结果。
    • Predicate<T>:接受一个参数,返回 boolean。
    • Runnable:无参数无返回值(常用于线程)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 先声明泛型列表,名字可以自定义
// 声明一个有参有返回值的Function作为参数,它的入参为ID,返回值为R
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id,
Class<R> type,Function<ID, R> dbFallback,
Long time, TimeUnit unit) {
// 4.缓存不存在,根据id查询数据库
R r = dbFallback.apply(id);
}

// 调用者使用上面这个工具类
// 用 lambda 表达式作为函数
cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, id2 -> getById(id2), CACHE_SHOP_TTL, TimeUnit.MINUTES)
// 这种 lambda 表达式可以简写
id2 -> getById(id2)
this::getById // 直接传入本类的方法

缓存穿透和缓存击穿

  • 这两种情况不会同时发生,因为逻辑过期解决缓存击穿的设计初衷是用于「热点、一定存在的数据」,它默认假设数据库中该 key 对应的数据是存在的。如果数据根本不存在,用逻辑过期模型不仅不合适,还可能引发缓存穿透。
  • 入口过滤非法 id 能一定程度防止缓存穿透,还可以用布隆过滤器拦截访问不存在的资源的请求
  • 热点数据 + 非热点/可能不存在数据时,就该分层处理或动态选择策略

全局唯一 ID

全局唯一 ID 生成策略:

  • Redis 自增:利用 redis 的 incr 命令生成全局唯一 ID(Redis 集群的 key 仍然是唯一的)
  • UUID:直接用自带的工具类生成,返回的是十六进制的字符串
  • 雪花算法:知名,也是 64 位数字,需要维护一个机械 ID
  • 数据库自增:单独用一个数据库表,自增 ID,相当于 redis 自增的数据库版

Redis 自增 ID 策略:

  • 每天一个 key,方便统计订单量
  • ID 构造:时间戳 + 计数器

JMeter

多线程测试工具

  1. 创建线程组
  2. 创建 HTTP 请求
  3. 请求内设置 HTTP 的请求头,authorization
  4. 结果用查看结果树和汇总报告
  5. 结果是否成功可以用 JSON 断言来指定
    指定返回的 JSON 中有 success:true 才算成功
    字段:Assert JSON Path exits: success
    预期结果:Expected value: true(第一个条件断言勾上,第二个正则匹配不勾)

乐观锁

乐观锁的关键是在查询的时候判断之前得到的数据是否被修改过
CAS 法(Compare And Swap)如果当前值和预期值一致,就和新值交换

  • 用版本号来说明,一开始查到的库存数和版本号为 1,在修改时加上where version = 1来确保修改时的数据和之前一样
  • 上面的这个 version 的功能可以和库存整合,查询时确保库存数和之前一样就不用版本号了where stock = 1
  • 但是乐观锁是适合读多写少的场景,一个线程成功此时的其他线程都会失败,成功率太低
  • 对上面乐观锁进行改进,改成只要库存大于 0 就允许修改where stock > 0
  • 但是目前对于数据库压力大

一人一单

一个用户只能买一个券
根据用户 ID 来创建锁,使每个用户多次抢券时只有一个线程能获得锁来创建订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Long userId = UserHolder.getUser().getId();
// 根据userId生成锁,使每个用户购买多个优惠券创建多个线程时,同时只有一个线程能获得锁来创建订单
synchronized (userId.toString().intern()) {// intern()方法将字符串常量池中的字符串对象返回,这样每次toString()返回的都是同一个字符串对象
// 创建订单
// @Transaction加在方法上,此时用this调本类的方法实际上就不会走 Spring AOP 生成的代理对象,所以事务会失效
// 获取Transaction创建的代理对象,强转成接口类
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// 这个方法要在接口里实现,因为这是基于接口的动态代理(JDK)
return proxy.createVoucherOrder(voucherId);
}

// 启动类上要加这个注解来暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)

// 还要加这个依赖,因为底层用的是这个代理,记得更新maven
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

【并发问题】
上面的锁是由 JVM 内部的锁监视器监管,多个 JVM 都各自用自己的锁监视器,所以上面这个在集群下有并发安全问题

IDEA 模拟集群

  1. 左下角服务(alt + 8 也行),点加号,添加 Spring boot,先把当前运行项目加入
  2. 然后复制一份(ctrl + D),修改端口号-Dserver.port=8082,新版 IDEA 要先点修改选项里面的的添加虚拟机选项,才能配置
  3. 修改 nginx.conf 的配置文件,配置负载均衡,当访问 8080 端口时就反向代理到 backend 的 后端服务器

分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
多个 JVM 都共用一个外部锁监视器就能解决上面一人一单的并发问题

分布式锁的实现:

  • MySQL:
    利用 mysql 本身的互斥锁机制(数据库写操作会加锁,并且自带事务)
    高可用,性能一般,安全性高(有事务保证锁的释放)
  • Redis:
    利用 setnx 这样的互斥命令(只在 key 不存在时设置)
    高可用,性能好,安全性一般(可以用 ttl 到期释放)
  • Zookeeper:
    利用内部节点的唯一性和有序性实现互斥
    高可用,性能一般,安全性高(临时节点,断开连接自动释放)

基于 Redis 的分布式锁

要实现两个基本方法:

  • 获取锁:
    • SETNX lock thread1 添加锁利用 setnx 的互斥特性
  • 释放锁:
    • DEL lock 删除 key,其他线程就能获取锁了
    • 超时释放,防止锁永远得 不到释放

【一人一单】

  • 每个用户都有自己的一个订单锁key为lock:order:userId,value 为 uuid-threadId
  • 没有uuid的话,多个实例之间可能会生成相同的线程id,造成线程1阻塞再执行误删线程2的锁
  • key上可没有uuid,key上加uuid那还能锁啥啊,纯自闭

Redis 的 Lua 脚本

  • 但是如果判断value和当前线程之后,释放锁之前的时间段产生了阻塞(full gc可能让程序在任何地方产生阻塞 STW(stop the world))那恢复时就有可能误删其他线程的锁,所以判断和释放必须是原子的
  • Lua脚本查询的是Redis,不是MySql,所以是库存还有下单的所有用户都是预先存在了Redis中的,lua是原子性的,用redis才能保证效率,而且lua本身也不支持MySql

Redis 提供了 Lua 脚本功能/在一个脚本中编写多条 Redis 命令/确保多条命令执行时的原子性

1
2
3
-- 调用脚本
EVAL "return redis.call('setnx', KEYS[1], ARGV[1])" 1 name Rose
-- EVAL "脚本内容"、脚本需要的key类型的参数个数、key类型会放入KEYS数组中、value参数会放入ARGV数组中

将下面的java函数改成 lua 脚本

1
2
3
4
5
6
7
8
9
10
11
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if (threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

将比较线程id和释放锁改成原子性的lua脚本

1
2
3
4
5
6
7
8
-- 这里的KEYS[1]就是锁的key, 这里的ARGV[1]就是当前线程标识
-- 获取所中的表示,判断是否与当前线程表示一致
if redis.call("get", KEYS[1]) == ARGV[1] then
-- 一致则删除锁
return redis.call("del", KEYS[1])
end
-- 不一致则直接返回
return 0

在java中调用lua脚本

1
2
3
4
@Override
public <T> T execute(RedisScript<T> script, List<String> keys, Object... args) {
return stringRedisTemplate.execute(script, keys, args);
}

依照上面的模版,将java代码改成lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 新声明一个脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

// 静态代码块,配置脚本
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));// 从Resource中加载lua脚本
UNLOCK_SCRIPT.setResultType(Long.class);// 返回结果类型
}

// 修改原来的unlock方法,给脚本传入参数
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,// lua脚本
Collections.singletonList(KEY_PREFIX + name),// 给单元素转成集合
ID_PREFIX + Thread.currentThread().getId()); // 释放锁的线程id
}

Redisson

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格 (ln-Memory Data Grid) 它不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
上面的只是自己实现的例子,要用Redis实现分布式式锁直接用这个框架就行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

// 配置类
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient() {
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");// 如果有密码,后面就跟一个setPassword("密码")
// 创建RedissonClient对象
return Redisson.create(config);
}
}

// 注入并使用
@Resource
private RedissonClient redissonClient;

// 创建锁对象,之前是自己实现的,现在直接用redisson提供的
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁
boolean isLock = lock.tryLock();

他还解决了我们自己实现的锁的4个缺陷:

  • 可重入:利用hash结构记录线程id和重入次数,一个线程可以多次获取锁,直到释放锁,每次获取时计数器加一,释放时计数器减一,计数器为0时释放锁
  • 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制
    • 指定一个锁的过期时间(leaseTime),在截止时间内会不停的重试获取锁,直到成功或超时。
    • Redisson 使用 Pub/Sub 通知优化重试,在其他线程释放锁发出信息,当前线程接收后才开始重试。
  • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3 ),重置超时时间
    • Redisson 里获取锁成功的线程并且没有指定leaseTime的线程会开启看门狗(watchdog)机制,不断刷新锁的过期时间,确保锁只会在业务执行完释放而不是超时释放
  • 主从一致性:
    • 正常Redis集群是分主从,写操作由主节点负责,读操作由从节点负责,主节点会不断把数据同步给从节点,但是如果主节点挂了,就会新选一个从节点作为主结点,但是原来一些没有同步的key就会丢失
    • 而Redisson会设置多个主节点,每次写操作会对每一个主节点发出,只有在每一个主节点都获取到锁(MultiLock联锁)时才算成功,然后每个主节点也可以设置自己的从节点来同步数据增强可用性

用阻塞队列优化秒杀

简化业务,将判断库存是否充足和用户是否已经下单都放在redis中,充足并且未下过单就可以购买,加入阻塞队列中异步的减库存和下单

  1. 添加优惠券时会将(优惠券的id:库存)都放到redis中,这样redis就有每一个优惠券的库存了,可以用来判断是否充足
  2. 已经下单的(优惠券id:用户id) 放入redis的集合,只有不在集合中的订单才会可以创建,创建后加入集合
  3. 创建一个阻塞队列,redis中成功的订单会加入队列
  4. 创建一个单线程线程池,线程任务就是不断从阻塞队列中获取订单,并在数据库中 创建订单

【问题】
当前是基于jvm的阻塞队列,如果空间满了数据就丢了,而且队列如果出问题可能有任务丢失,出现数据不一致的问题

消息队列

消息队列(Message Queue),存放消息的队列,包含三个元素

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)(Broker:代理)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列中获取消息并消费

【Redis中消息队列的实现方式】

  1. 基于List结构模拟
    用BRPOP和BLPOP替代RPOP和LPOP来实现阻塞效果
    利用redis存储,有持久化,有序性,但是会丢失,只支持单消费者

  2. 基于PubSub模拟
    发布订阅,消费者可以订阅一个或多个Channel(频道)
    SUBCRIBE channel1 channel2 channel3订阅一个或多个频道
    PUBLISH channel1 message1向一个频道发送消息
    PSUBSCRIBE pattern1 pattern2订阅与pattern格式匹配的所有频道
    支持多生产者,多消费者,但是不支持消息持久化,消息可能,消息堆积有上限

  3. 基于Stream结构模拟
    XADD key id field value [field value ...]向Stream中添加一个元素

    1
    2
    ## 创建名为stream1的队列并发送一个消息,内容是{"name": "Alice", "age": 18},并用Redis自动生成ID
    XADD stream1 * name Alice age 18

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream [ID [ID ...]]

    1
    2
    ## 阻塞读取stream1队列中的消息,最多读取10条,超时10秒(最后的0表示从0开始读取,$表示读取这条命令后收到的最新消息)
    XREAD COUNT 10 BLOCK 10000 STREAMS stream1 0

    消息可回溯,一个消息可以被多个消费者读取,可以阻塞读取,但是有漏读风险

基于Stream的消息队列

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列,有三个特点

  • 消息分流:消息会分流给组内的不同消费者
  • 消息表示:消费者组会维护一个标识,记录最后一个被处理的消息,确保每一个消息都会被消费
  • 消息确认:消费者获取消息后,消息处于pending(待定/待处理)状态并存入pending-list,当处理完消息之后需要通过XACK来确认消息,标记为已处理,消息才会从pending-list中移除

创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]

  • key: 队列名
  • groupName: 消费者组名
  • ID: 队列的起始ID,0代表队列中第一个消息,$代表队列中最新消息
  • MKSTREAM: 如果队列不存在,则创建队列

从消费者组读取消息
XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [ID [ID ...]]

  • groupName: 消费者组名
  • consumerName: 消费者名,不存在会自动村建
  • count: 读取消息数量
  • BLOCK milliseconds: 阻塞时间
  • NOACK: 无需手动ACK,获取到信息后自动确认
  • STREAMS key: 监听的队列名
  • ID: 获取消息的起始ID
    • “>”: 从下一个未消费的消息开始
    • 其他: 其他根据id从pending-list中获取已消费但未确认的消息,比如0就是从pending-list中第一个消息开始

确认消息
XACK key groupName ID [ID ...]

【STREAM类型消息队列的XREADGROUP命令特点】

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读风险
  • 有消息确认机制,保证消息至少被消费一次

点赞

一人一赞

  1. Blog类中添加一个isLike字段,表示是否被当前用户点赞
  2. 利用Redis中的set集合判断是否被当前用户点赞,每个blog都对应一个set集合,所有点赞过的用户id都保存在这个集合中

显示最先点赞的5个人

就像qq,后点赞的为等其他人
用zset(sorted set)实现,以时间戳为score,就能实现自动排序

  • 元素是否存在是用zscore直接查分数,不存在则返回nil
  • zrange key start stop [WITHSCORES]查询排名范围

内容推送

Feed流实现方案:

  • 拉模式:up发内容每次往发件箱发,粉丝读取时从发件箱里拉取(不推荐)
  • 推模式:up发内容时直接将内容推给粉丝,放到粉丝的收件箱(粉丝少时推荐)
  • 推拉结合:up发内容时直接推送给活跃粉丝,不活跃的粉丝从发件箱里读取(粉丝多时推荐)

reids的zset作为粉丝收件箱,up发内容时将博客id推送到每个粉丝的收件箱中

滚动分页

Feed流中的数据会不断更新,所以下标会变,假设刚查完第一页之后,有一个数据插进来了,此时读第二页的时候就会重复读取一个被读取的数据

滚动分页就是记录上一个页的最后一个id,下次读的时候从这里开始

滚动分页查询参数:

  • max: 第一次查询为当前时间戳 | 以后为上次查询的最小时间戳
  • min: 固定为0
  • offset(偏移量): 第一次为0 | 以后为上一次查询结果中与最小时间戳相同的数据条数
    可以像算法一样维护一个当前最小时间戳和相同数量,如果上一个和当前相同就相同数量加一,不相同就重置为1
  • count: 固定为每页条数

GEO数据结构

GEO就是Geolocation的简写,代表地理坐标,存储经纬度信息
这是Redis支持的数据结构,可以直接用下面的命令

  • GEOADD:添加一个地理位置,包含经度(longitude)、纬度(latitude)、值(member)
  • GEODIST:计算两个点之间的距离
  • GEOHASH:将指定member的坐标转换为hash字符串形式返回,空间更小
  • GEOPOS:返回指定member的经纬度信息
  • GEORADIUS:指定一个圆心和半径,返回包含的所有member,6.2后废弃
  • GEOSEARCH:在指定范围内搜索member,可以是圆形可以是矩形,6.2新功能
  • GEORADIUSBYMEMBER:与上面的功能一致,但是会把结果存到一个指定的key,6.2新功能

底层用的zset存储,member就是member,score就是经纬度转成的数

BitMap(位图)

把每一个bit位对应一个数据,用0和1表示数据,这种思路就称为位图(BitMap)
比如某月的签到情况就可以用31位的二进制数表示出来,大大减少数据存储空间

Redis中也支持BitMap,底层用String实现
BitMap的命令有:

  • SETBIT:设置指定bit为1或0
  • GETBIT:获取指定bit的值
  • BITCOUNT:统计BitMap中值为1的个数
  • BITFIELD:操作(查询、修改、自增)BitMap中bit数组中指定位置(offset)的值
  • BITFIELD_RO:获取BitMap中bit数组,并以十进制形式返回
  • BITOP:对多个BitMap进行运算,并返回结果
  • BITPOS:返回bit数组中指定范围内第一个0或1出现的位置

HyperLogLog

  • UV:全称Unique Visitor,即独立访客,用来统计网站访问量
  • PV:全称Page View,即页面浏览量,用来统计网站页面访问量

Hyperloglog(HLL)是一种概率算法,用于确定非常大的集合的基数
Redis中的HLL是基于String结构实现的,内存占用小于16KB,误差只有不到0.81%

HLL的命令有:

  • PFADD:添加元素。添加相同元素会被忽略
  • PFCOUNT:返回集合的基数

单点Redis的问题

  • 数据丢失问题:实现Redis数据持久化
  • 并发能力问题:搭建主从集群,实现读写分离
  • 故障恢复问题:利用Redis哨兵,实现健康检测和自动恢复
  • 存储能力问题:搭建分片集群,利用插槽机制实现动态扩容

Redis数据持久化方案

RDB

RDB:Redis Database Backup file(Redis数据库备份文件)也叫Redis数据快照,就是把内存中的所有数据都记录到磁盘中,故障重启后从磁盘读取快照

  • save由主进程执行RDB,会阻塞所有命令
  • bgsave开启子进程执行RDB,不会阻塞命令

Redis内部有触发RDB的机制,在redis.conf中的save配置项中设置触发条件,其他设置还有名称,路径,是否压缩等
比如save 900 1 表示900秒内如果至少有1个操作,就会触发RDB保存

  • redis操作内存时,操作的实际是操作系统根据物理内存分配的虚拟内存,操作系统又会将这些虚拟内存维护成一个页表
  • bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据,就是将主进程的页表拷贝给子进程。子进程就会读取内存数据并写入新的RDB文件
  • fork采用的是copy-on-write(写时复制),避免了脏读

【缺点】

  • RDB执行耗时长,两次RDB之间写入的数据有丢失的风险
  • fork子进程、压缩、写出RDB文件都耗时长

AOF

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件

AOF默认是关闭的,可以通过修改redis.conf的appendonly yes开启
有三种记录的频率

  • appendfsync always每执行一次写命令都记录
  • appendfsync everysec写命令执行完先放入AOF缓冲区,每隔一秒将缓冲区数据写入AOF文件,这是默认方案
  • appendfsync no放入AOF缓冲区,由操作系统决定何时写入(那不如用RDB)

因为是记录命令,所以AOF文件会比RDB文件大很多。而且一个对一个key的操作,只有最后一个操作有意义。可以通过bgrewriteaof命令重写AOF文件,用最少的命令达到同样的效果,这个也可以设置阈值,监控AOF文件大小和增长百分比

RDB和AOF对比

持久化方式 数据完整性 文件大小 宕机恢复速度 数据恢复优先级 系统资源占用 使用场景
RDB 不完整/两次备份之间会丢失 会有压缩/文件体积小 很快 低/因为数据完整性不如AOF 高/大量CPU和内存消耗 可以容忍数分钟的数据丢失/追求更快的启动速度
AOF 相对完整/取决于刷盘策略 记录每个命令/文件体积很大 高,数据完整性高 低,主要是磁盘IO资源但AOF重写时会占用大量CPU和内存资源 对数据安全性要求较高

总之,可以忍受丢失,追求速度用RDB,追求数据安全用AOF,也可以结合使用

搭建主从集群

假设有A、B两个Redis实例,如何让B最为A的slave节点?
在B节点中执行slaveof A的IP A的端口
之后所有的A中写入的数据都会同步给B,B中可以直接get,同时B会变成只读

【同步原理】
从节点的大体数据通过RDB同步,同时主节点每次执行命令都会存在repl_baklog中,然后发给从节点,从节点执行完这里的命令之后数据就和主节点一样了

从节点第一次同步为全量同步,后续如果从节点重启后同步,则为增量同步。
用从节点的replid判断它的数据是同步的哪个主节点的数据

repl_baklog大小有上限,写满后会覆盖最早的数据,如果slave断开时间过久,导致未备份的数据被覆盖,则无法基于log做增量同步,只能全量同步

Redis哨兵

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复,作用如下

  • 监控:Sentinel会不断监控master和slave是否正常工作
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master,故障实例恢复之后也以新的master为主
  • 通知:Sentinel充当Redis客户端的服务发现来源,master变更时会将最新消息推送给Redis的客户端

Sentinel基于心跳机制监测状态,每隔一秒向集群的每个实例发送ping命令

  • 主观下线:某个Sentinel发现某实例没响应,那它就认为这个实例主观下线了
  • 客观下线:若超过指定数量(quorum)的Sentinel发现某实例没响应,那该实例就客观下线

选新的master是根据与master断开连接的时间长短选的,越短优先级越高

选中一个slave作为新master后故障转移的步骤如下

  • sentinel给slave发送slaveof no one命令,将slave变为主节点
  • sentinel给其他slave发送slaveof 新主节点的IP 新主节点端口命令让其他slave指向新的主节点
  • 最后故障节点标记为slave,恢复之后成为指向新主节点的slave

RedisTemplate的哨兵模式

  1. application.yml中指定sentinel信息,让sentinel作为redis集群的服务发现

  2. 配置读写分离

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Bean
    public LettuceConnectionConfiguration lettuceConnectionConfiguration() {
    // 优先从从节点读取数据,都读取不到再从主节点读取
    return new LettuceConnectionConfiguration() {
    @Override
    public void customize(LettuceClientConfigurationBuilder builder) {
    builder.readFrom(ReadFrom.REPLICA_PREFERRED);
    }
    }
    // 这种lambda表达式也可以
    // return configBuilder -> configBuilder.readFrom(ReadFrom.REPLICA_PREFERRED)
    }
    // Spring Boot 在自动配置 RedisConnectionFactory(基于 Lettuce)时,会查找容器中是否存在 LettuceConnectionConfiguration 类型的 Bean。
    // 如果存在,就会将该配置函数应用到 LettuceClientConfigurationBuilder 上,从而定制 Lettuce 客户端的行为。

分片集群

  • 集群中有多个master,每个master保存不同数据
  • 每个master都可以有多个slave
  • master之间通过ping监测彼此健康状态
  • 客户端可以访问任意节点,都会被转发到正确的节点
  • 因为有监控和转发了 ,所以不再需要sentinel

分片原理

  • redis是将16384个插槽分配到不同的redis实例
  • 根据key有效部分计算哈希值,对16384取摸,计算数据应该存在哪个插槽上
  • 因为数据都是存储在插槽中,所以添加删除redis节点都不影响数据

如何将同一类的数据保存在一个redis实例中?

  • 将这些key的有效部分设为一样,例如key为{typeId}
  • key中{}内的就是有效部分,没有{}整个key就都是有效部分

分配插槽给节点都是有命令的,用的时候查查

多级缓存

  • 客户端缓存,nginx缓存,redis缓存,tomcat进程缓存
  • nginx也可以编写业务逻辑,作为集群,然后单独准备一个nginx做反向代理

JVM进程缓存

分布式缓存:例如redis,容量大,但是有网络开销
本地进程缓存:例如HashMap,读取本地内存,没有网络开销,但是容量小,无法共享

Caffeine

Caffeine是一个高性能本地缓存库,spring用的就是这个
在配置类里面可以配置需要的缓存比如

1
2
3
4
5
6
7
@Bean
public Cache<Long, ItemStock> stockCache() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(1000)
.build();
}

然后在其他地方注入就能使用了

1
2
3
4
5
6
7
8
@Autowired
private Cache<Long, ItemStock> stockCache;

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){
// 先根据id查询缓存,如果没有就查询数据库并存入缓存,第二个参数是一个函数
return stockCache.get(id, key -> stockService.query().eq("item_id", key).one());
}

Lua

Lua数据类型

  • nil:表示一个无效值,条件表达式中相当于false
  • boolean:true或false
  • number:双精度浮点数
  • string:字符串,单括号或双括号
  • function:C或Lua编写的函数
  • table:关联数组,类似python里面的字典

Lua语法

整体类似python

  • local 声明局部变量,不需要声明变量类型
  • local arr = {'java', 'c', 'lua'}声明数组
  • arr[1]访问数组
  • local map = {name='jack', age=21}声明map
  • map['name'] map.name访问map
  • str1 .. str2字符串拼接
  • for key, value in pairs(map) do 语句 end遍历map
  • and or not都和python一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 定义函数
local function print(arr)
for key, value in pairs(arr) do
print(key, value)
end
end

-- 条件控制
if (布尔表达式)
then
-- [ true时执行的语句 ]
else
-- [ false时执行的语句 ]
end

OpenResty

OpenResty是一个基于Lua和Nginx的Web服务器,可以写业务逻辑
Nginx集群就是拿OpenResty组成的,一个Nginx反向代理到这个Nginx集群

修改niginx.conf文件

  1. 在nginx.conf的http下面,添加OpenResty的Lua模块的加载

    1
    2
    3
    4
    # 加载Lua模块
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    # 加载C模块
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
  2. 在nginx.conf的server下面,添加对/api/item这个路径的监听

    1
    2
    3
    4
    5
    6
    7
    # 这个就类似Controller
    location /api/item {
    # 响应类型,这里返回json
    default_type application/json;
    # 响应数据由Lua/item.lua文件决定,这个就类似Service
    content_by_lua_file lua/item.lua;
    }

OpenResty获取请求参数

用不用的ngx.req的方法获取不同参数
local args = ngx.req.get_uri_args()

向Tomcat发送请求

nginx提供了内部api用来发送http请求

1
2
3
4
5
local resp = ngx.location.capture("/path", {
method = ngx.HTTP_POST,
args = { a=1 },
body = "name=jack&age=21",
})

然后在配置文件中拦截这个请求并反向代理到Tomcat

1
2
3
4
location /path {
# 这里是windows的ip和tomcat的端口号,要确保防火墙处于关闭状态
proxy_pass http://192.168.5.1:8081;
}

因为这个http的方法很常用,所以应该封装
在这个目录下创建common.lua文件
vi /usr/local/openresty/nginx/lualib/commmon.lua
在common.lua中封装http查询的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
local function read_http(path, params)
local resp = ngx.location.capture(path, {
method = ngx.HTTP_POST,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http查询失败,path:", path, "args:", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http
}
return _M

保证相同路径转发到服务器相同

在nginx代理的地方添加哈希算法,保证相同的路径每次转发到服务器相同

1
2
3
4
5
upstream nginx-cluster {
hash $request_uri; # 使用请求路径作为hash值
server 192.168.5.1:8081;
server 192.168.5.1:8082;
}

Redis缓存预热

  1. 建立一个RedisHandler组件类,实现InitializingBean接口
  2. 然后实现afterPropertiesSet方法,这个方法会在spring初始化完成后自动调用
  3. 在这个方法里面初始化缓存,就是查数据库,将数据保存到Redis中

查询Redis缓存

引入Redis模块之后,初始化Redis对象,之后就能用Redis对象执行Redis命令了
实现先查Redis缓存,未命中再去发http请求,tomcat再查数据库将结果缓存到Redis中

nginx本地缓存

  1. OpenResty为Nginx提供了shared dict功能,可以在多个worker之间共享数据,实现缓存功能
  2. 在nginx.conf中的在http下,就是和导入模块在同一级,那里用lua_shared_dict item_cache 150m设置shared dict名称和大小
  3. 然后还是在item.lua中,导入shared dict,然后就能使用他了

缓存同步策略

  • 设置有效期:给缓存设置有效期,过期就删除
  • 同步双写:在修改数据库的同时直接修改缓存,时效性强
  • 异步通知:修改数据库时发送事件通知,相关服务监听到后通知修改数据

Canal

基于Canal的异步通知,监控数据库的变更
Canal是基于MySQL的主从同步,每当数据库有修改,就可以同步修改缓存

  • Idea引入Canal依赖,然后yml文件中配置Canal
  • @CanalTable("tb_item")指定监听的表
  • implements EntryHandler<item>指定表关联的实体类
  • 实现三个方法:insert(item)update(item)delete(item)监听数据库的增删改
  • 每当数据库有变更,就会调用对应的方法,按里面的逻辑修改缓存,可以同时修改jvm和Redis缓存

key结构设计

  • 业务名称:数据名:id
  • 长度不超过44字节,跟string底层编码方式有关
  • 不包含特殊字符

Bigkey

大小过大的key

  • key的value小于10kb
  • 集合类型的key,建议元素数量小于1000

【扫描bigkey】

  • rredis-cli –bigkeys
  • scan扫描
  • 第三方工具
  • 网络监控

【删除bigkey】

  • 如果Bigkey占用过大,删除这样的key也会消耗很长时间,阻塞主线程
  • 可以异步的删除,redis有这样的命令例如unlink

选用恰当的数据结构

对象的存储方式

  • Json存储:简单粗暴,但是没法单独调整某个属性
  • 每个属性作为一个字段:方便调整,但是占用空间大
  • hash存储:适合存储对象,但是entry数量不要超过1000,这样才会使用ziplist优化空间
    超过了可以将键值对数量取模,分成很多小hash结构,也能节省很多空间

批处理

一次命令的响应时间 = 1次往返的网络传输耗时(多) + 1次Redis执行命令耗时(少)

  • 一次传输多条命令可以减少网络传输耗时
  • Redis提供了很多Mxxx这样的命令,可以实现批量插入,例如mset``hmset
  • 但一次也不要传输太多命令,否则单词命令占用带宽过多,导致网络阻塞

Pipeline

mset虽然可以批处理,但是却只能操作部分数据类型,pipeline可以传任意命令

1
2
3
4
5
6
7
8
9
10
// 创建管道
Pipeline pipeline = jedis.pipelined();
for (int i = 1; i <= 100000; i++) {
// 将命令放入管道
pipeline.set("test:key_" + i, "value_" + i);
if (i % 1000 == 0) {
// 每放入1000条命令,发出一次
pipeline.sync();
}
}

Pipeline多个命令之间并没有 原子性

集群下的批处理

如果Redis是一个集群,那批处理命令的多个key必须落在一个插槽中,否则会执行失败

解决方案

  • 串行slot:计算每个key的slot,每组都用Pipeline批处理,串行执行各组命令
  • 并行slot:和上面一样,就是并行执行
  • hash_tag:将所有key设置相同的有效部分(hash_tag)

Spring提供了实现集群批处理的工具类

1
2
List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));
strings.forEach(System.out::println);

持久化配置

  • 用来做缓存的Redis实例尽量不要开启持久化功能

  • 建议关闭RDB持久化,使用AOF持久化

  • 利用脚本定期在slave节点做RDB,实现数据备份

  • 设置合理的rewrite阈值(压缩命令),避免频繁的bgrewrite

  • 配置no-appendfsync-on-rewrite=yes,禁止在rewrite期间做aof

  • 单个Redis实例内存上限不要太大,例如4G、8G。可以加快fork的速度、减少主从同步,数据迁移压力

  • 不要与CPU密集型应用部署在一起

  • 不要与高硬盘负载应用一起部署。例如:数据库、消息队列

慢查询

在Redis执行时的耗时超过某个阈值的命令,称为慢查询

  • 配置中可以指定阈值:slowlog-log-slower-than单位微妙,默认为10000,建议1000
  • 慢查询日志:slowlog-max-len本质是一个队列,长度默认128,建议1000。可以用命令查看,也可以用图形化客户端查看

Redis配置

命令及安全配置

  • Redis一定要设置密码
  • 禁止线上使用flushall,config等危险命令,可以将这些命令rename成别的
  • bind:限制网卡,禁止外网网卡访问
  • 开启防火墙
  • 不使用默认端口

内存配置

当Redis内存不足时,可能导致key频繁被删除

数据内存:是Redis最主要的部分,存储Redis的键值信息。主要是Bigkey、内存碎片问题
进程内存:Redis主进程运行肯定需要占用内存,可以忽略

集群完整性问题

在Redis默认配置中,如果发现任意一个插槽不可用,则整个集群都会停止对外服务
为了保证高可用性,要将cluster-require-full-coverage设置为false

Redis底层数据结构

Redis的数据结构底层采用的结构就是下面的这些
比如key和string就是用的SDS结构,其他value用的其他结构

Redis的字符串结构

Redis自己构建了一种新的字符串结构,称为简单动态字符串(Simple Dynamic String)简称SDS
Redis所有的key都是string,都用的SDS结构

Redis是C语言实现的,其中SDS是一个结构体

1
2
3
4
5
6
struct __attribute__ ((__packed__)) sdshdr8 {
unsigned char len; // buf(字符串师叔祖)已保存的长度
unsigned char alloc; // buf申请内存长度
unsigned char flags; // 不用SDS头类型,用来控制SDS的头大小,就是不同buf的长度上限
char buf[]; // 存储字符串
}

例如,一个包含字符串name的sds结构如下
len:4 alloc:4 flags:1 {'n', 'a', 'm', 'e', '\0'}

之所以叫动态字符串,因为它具备动态扩容的能力
假如追加一段字符串,会主动申请内存空间:

  • 如果新字符串小于1M,则新空间为扩展后的字符串长度的两倍+1
  • 如果新字符串大于1M,则新空间为扩展后的字符串长度+1M+1。
  • 这称为内存预分配,可以减少内存分配的次数

IntSet

set集合的一种实现方式,基于整数数组来实现,具有长度可变,有序的特性

寻址公式:startPtr + (sizeof(int16) * index)
因为数组每个元素占用字节长度相同,所以只要知道起始地址和元素索引,就可以计算出来该元素地址,这也是为什么索引是从0开始的

如果新插入的字符超出了int16_t的范围,inset会自动升级编码方式,每个整数占用字节长度变长
因为是有序的,所以插入的位置查找采用二分查找

Dict

哈希表结构体:一个entry数组,哈希表大小(size),哈希表大小掩码(sizemask = size - 1),entry个数
哈希节点(单个键值对)结构体:键,值,指向下一个键结构体的指针(计算出相同位置的entry会以链表形式连接)
字典:存储两个哈希表(一个存当前数据,一个为空rehash扩容时使用),dict类型,私有数据,rehash的状态

当我们向Dict中添加键值对时,Redis首先根据key计算出hash值(h),然后利用 h & sizemask (对掩码进行和运算相当于效率更高的取余,& 运算是只有两个位都为1,结果才为1,和4进行和运算相当于舍弃了4以上位数的值,只保留余数)计算出元素应该存储到数组中的哪个索引位置

ZipList

ZipList是一种特殊的双端链表,内存连续,可以在任意一端进行压入、弹出操作,并且时间复杂度为O(1)

ZipList不像普通链表每个节点那样记录前后指针,而是每个元素记录前一个节点长度、当前内容编码属性、当前内容,节省内存
这样算前后节点地址只需要通过地址和长度来计算即可,他查找的速度和链表是差不多的

列表数据过多,导致链表过长,可能影响查询性能

QuickList

QuickList也是一个双端链表,但是每个节点都是一个ZipList
它解决了ZipList过大的和难以申请内存的问题
中间的节点可以压缩进一步节省内存

SkipList

SkipList(跳表)是一种链表,元素按升序排列,节点可能包含多个指针,指针跨度都不同
传统指针跨度只有1,所以遍历很慢,用跨度更大的指针就可以减少遍历的次数,提高效率
查找的时候就先按跨度最大的指针进行查找,如果score大于要查的元素的score,就缩小跨度,继续查找,直到找到

【特点】

  • 跳表是一个双向链表,每个节点都包含score和ele值
  • 节点按照score值排序,score值一样则按照ele字典排序
  • 每个节点都可以包含多层指针,层数是1到32的数
  • 不同层指针到下一个节点的跨度不同,层级越高跨度越大
  • 增删改查效率与红黑树基本一致,实现却更简单

ReidsObject

Redis中的任意数据类型的键和值都会被封装为一个RedisObject,也叫做Redis对象(robj)
包含:对象类型、底层编码方式(下面基本类型都会提到底层编码都是什么)、对象最后一次被访问的时间、对象引用计数器(为0说明对象无人引用可以被回收)、指向实际地址的指针

Redis会根据存储的数据类型不同,选择不同的编码方式,共包含11种不同类型编码方式

5种基本数据结构

String

  • Sting是Redis中最常见的数据存储类型
  • 基本编码方式是RAW,分别申请两个内存空间存储头部和SDS数据
  • 存储的SDS如果少于44字节,SDS就会和头部存储在连续空间中,只用申请一次内存(推荐少于44字节)
  • 如果存储的字符串是整型并且大小在LONG_MAX范围内,那就会直接用int编码,存储在原来指针的位置,不需要SDS了

List

  • Redis的List类型可以从首、尾操作列表中的元素
  • 3.2版本之后,Redis统一使用QucikList结构来存储List数据

Set

Set是Redis中的单列集合

  • 不保证有序性
  • 保证元素唯一(可以判断元素是否存在)
  • 求交集、并集、差集

【底层数据结构】
为了查询效率和唯一性,set采用HT编码(Dict)。Dict的key用来存储元素,value统一为null
当存储的所有数据都是整数,并且元素数量不超过set-max-intset-entries,Set会采用IntSet编码

【跳表对比B+树的优势】

  • 实现简单:无需复杂的树平衡操作
  • 动态性:插入和删除操作都无需复杂操作,树
  • 内存效率高
  • 范围查询优势

ZSet

ZSset也就是SortedSet,每个元素有一个score值和member值

  • 可以根据值排序
  • member必须唯一
  • 可以根据member查询分数

【底层数据结构】

  • 同时用下面这两种来分别实现排序和查找key的功能,也就是说一份数据会存储两次

  • SkipList:可以排序并且可以同时存储score和ele值(member)(找key慢)

  • HT(Dict):可以键值存储,并且可以根据key找value(不能排序)

  • 元素数量不多时,HT和SkipList优势都不明显,为了节省内存会采用ZipList结构,元素数量(128)和每个元素大小(64字节)都要小于一个值

  • ziplist本身没有排序功能,而且没有键值对的概念,因此需要自己通过编码实现

Hash

Hash结构与Redis中的ZSet类似

  • 都是键值存储
  • 都需要根据建获取值
  • 键必须唯一

区别如下:

  • zset的键是memberr,值是score;hash的键和值都是任意值
  • zset要根据score排序;hash则无需排序

因此,Hash底层采用的编码与ZSet基本一致,只需要去掉排序用的SkipList

  • 结构默认用ZipList编码,用来节省内存,相邻的两个元素分别存储field和value
  • 当数据量较大时,Hash结构会转为HT编码(Dict)

IO方案

用户态读取是从内核读取的,内核缓冲区得到数据然后拷贝到用户缓冲区,就是下面两个流程

  1. 用户缓冲区等待数据就绪
  2. 用户线程将内核缓冲区中的数据读取(拷贝)到用户缓冲区

阻塞IO

两个阶段用户线程都在等待,拷贝完成返回OK

非阻塞IO

非阻塞IO调用recvfrom没得到结果就立刻返回,然后一直重试,直到数据就绪用户线程开始读取
但用户线程重试期间也没干别的事,其实单独看来性能不如阻塞IO

IO多路复用

服务器处理客户端Socket请求时,在单线程下只能按顺序处理每一个socket,如果正在处理的socket恰好未就绪(数据不可读不可写),那其他的socket就只能等待,性能很差,这个有两种解决方案

  • 多线程,每个线程处理一个socket
  • 单线程但先处理数据就绪的socket

文件描述符(File Descriptor):简称FD,是一个从0开始递增的无符号整数,用来关联Linux中的一个文件。在Linux中一切皆文件,例如常规文件,视频、硬件设备等、也包括网络套接字(Socket)

IO多路复用:是利用单个线程来同事监听多个IO通道(例如多个socket FD),并在某个FD可读、可写时得到通知,从而避免无效的等待,这个线程就可以一直工作,充分利用CPU资源

监听FD的方式、通知的方式有多重实现:select、poll、epoll
select和poll只会通知用户进程有FD就绪,但不确定是哪个FD,用户进程要自己遍历
epoll则会在通知的FD就绪的同时,把已就绪的FD写入用户空间

  • select:有一个fd_set(BitMap)存储监听的fd,内核会根据这个fd_set去查看是否就绪写回fd_set,然后拷贝回用户线程(会覆盖原来监听的fd_set,所以每次监听都要重新设置)
  • poll:用一个pollfd数组(可自定义大小)存储监听的fd信息,poll函数会返回已就绪fd数量,如果大于0则遍历pollfd数组,找到就绪的fd
  • epoll:对select和poll的巨大改进,内核空间存是一个eventpoll,内部有一个红黑树和一个链表,红黑树用来存储监听的fd,链表用来存储已就绪的fd,然后链表可以直接拷贝回用户空间

epoll有三个函数:

  • epoll_create:创建epoll实例
  • epoll_ctl:添加、修改、删除监听的fd
  • epoll_wait:等待已就绪的fd

【总结】
select模式的三个问题:

  • 能监听的FD数量最大是1024
  • 每次select都要把要监听的FD拷贝到内核空间
  • 每次都要遍历所有的FD来判断就绪状态

poll模式的问题:

  • 利用链表解决了监听上限的问题,但依旧要遍历所有FD,如果监听较多,性能会下降

epoll模式问题:

  • 基于epoll实例中的红黑树保存要监听的FD,理论上无上限,而且增删改查效率都非常高,性能不会随监听的FD数量增多而下降
  • 每个FD都只需要执行一次epoll_ctl添加到红黑树,以后每次epol_wait无需传递任何参数,无需重复拷贝FD到内核空间
  • 内核会将准备就绪的FD直接拷贝到用户空间的指定位置,无需再遍历所有FD来直到就绪的FD是谁

IO多路复用-事件通知机制

当FD有数据可读时,我们调用epoll_wait就可以得到通知,通知有两种模式

  • LevelTriggered:简称LT。当FD有数据可读时,会重复通知多次,直到数据处理完成,这是默认模式
  • EdgeTriggered:简称ET。当FD有数据可读时,只会通知一次,不管数据是否处理完成

这个LT重复通知实际上就是读取完之后如果仍就绪,就将FD添加回内核的就绪链表
ET只通知一次,就是不添加回内核就绪链表,所以如果没读完数据,用epoll_ctl添加这个FD,之后再检查,内核就又会将所有就绪FD添加到就绪链表

LT可能会出现惊群问题:假设多个线程处理一个FD,前一两个进程已经处理完FD,后面的就白唤醒了
ET可以避免这个问题,它配合EPOLLEXCLUSIVE只给一个线程通知一次之后,不会通知后续的线

IO多路复用-web服务流程

  1. 服务端用epoll_create创建一个epoll实例
  2. 然后创建一个serverSockert,得到FD,记作ssfd
  3. 当有客户端连接上serverSocket时,他的ssfd就变成就绪
  4. 从就绪的ssfd中用accetp()接收客户端的socket得到这个连接上的socket的FD,加入到监听队列
  5. 当客户端从这个连接FD发参数时,也会触发FD就绪,服务端就可以读数据了

所以就绪队列里面有两种socket的FD,一种serverSocket,用来与客户端建立连接:一种clientSocket,是用户发过来的请求

【socket连接流程】
服务端会创建一个监听socket,这个ssfd永远处于监听状态
然后客户端尝试连接时会生成一个socket,当客户端的连接到达服务端时,服务端就会监听到
调用accept()返回一个全新的已连接socket,此时客户端的socket也会进入已连接状态
然后双方就能用这个已连接的socket进行通信

信号驱动IO

信号驱动IO是与内核建立SIGIO的信号关联并设置回调,当内核有FD就绪时,会发出SIGIO信号通知用户,期间用户应用可以执行其他业务,无需阻塞等待

缺点:
当有大量IO操作时,信号较多,SIGIO处理函数不能及时处理的话可能导致信号队列溢出
而且内核空间与用户空间频繁信号交互,性能也低

异步IO

异步IO的整个过程都是非阻塞的,用户进程调用完异步API后就可以去做其他事
用户线程不使用recvfrom,内核数据就绪时内核自动进行拷贝,拷贝完成后递交信号用户线程

缺点:
因为IO操作是慢的,如果并发请求过多,内核积累了大量的IO请求,占用内存资源过多,可能会崩溃
所以要加限流,代码实现就困难了,所以性能虽然也好,但更推荐IO多路复用

只有异步IO的从内核数据拷贝到用户空间是非阻塞的,所以只有异步IO是异步的,其他4个都是同步的

Redis网络模型

Redis是单行程还是多线程?

  • 如果仅仅聊Redis的核心业务部分(命令处理),那是单线程
  • 如果是聊整个Redis,那就是多线程

Redis版本迭代时引入了多线程支持

  • Redis 4.0: 引入多线程异步处理一些耗时任务,如删除Bigkey时用的异步删除命令unlink
  • Redis 6.0:在核心网络模型中引入了多线程,进一步提高对于多核CPU的利用率

为什么Redis要使用单线程?

  • 抛开持久化不谈,Redis是纯内存操作,执行速度非常快,它的性能瓶颈是网络延迟而不是执行速度,所以多线程并不会带来巨大的性能提升
  • 多线程会导致过多的上下文切换,带来不必要的开销
  • 引入多线程会面临线程安全问题,必然要加锁,复杂度高,性能还低

集群模式下,分片保证局部单线程,操作同一个key保持在一个分片内,而这个分片Redis实例仍是单线程的

对客户端发来的请求进行读写的IO操作很耗时,这是redis的性能瓶颈,所以redis可以在这两个阶段开启多线程,但核心的读取命令后的执行命令仍然是单线程

RESP协议

客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议
Redis中采用的就是RESP(Redis Serialization Protocol)协议

RESP协议-数据类型

  • 单行字符串:以+开头,以\r\n结尾

  • 错误(Errors):以-开头,其他与单行字符串格式一样,只是字符串是异常信息,以\r\n结尾

  • 数值:以:开头,后面跟上数字格式字符串,以\r\n结尾

  • 多行字符串:以$开头,表示二进制安全的字符串,最大支持512MB,按字节大小读,就不怕中间有特殊字符:例如 $5\r\nHello\r\n,0为空串$0\r\n\r\n,-1为不存在$-1\r\n

  • 数组:以*开头,表示数组,数组中可以包含任意数据类型,就先标记数组元素个数,后面这个个数的数据就都是这个数组的元素,举例子

    1
    2
    3
    4
    5
    set name 虎哥 的命令就会变成这样的数组
    *3\r\n
    $3\r\nset\r\n
    $4\r\nname\r\n
    $6\r\n虎哥\r\n

发送的请求一般就都是数组,响应结果这五种都有可能

Redis过期策略

Redis的DB结构

Redis的16个DB实际上都是结构体,内部有,dict(存储所有键值对),expires(存储所有key的TTL)

过期策略-惰性删除

Redis采用惰性删除:一个key不是在TTL到期后立刻删除,而是在他被访问的时候检查他的存活时间,如果已经过期才执行删除

但是如果一个key一直没有被访问,那就不会被删除,一直占着空间

过期策略-周期删除

周期删除:设置一个定时任务,周期性的抽样部分过期的key,然后执行删除,周期有两种:

  • Redis设置一个定时任务,按照serrver.hz频率来执行过期key的删除,模式为SLOW
  • Redis的每个事件循环前都会调用beforeSleep()函数,执行过期key的清理,模式为FAST

SLOW模式规则:

  • 执行频率受serrver.hz参数影响,默认每秒10次,每个执行周期100ms
  • 清理耗时耗时不超过一次执行周期的1/4
  • 逐个遍历db,逐个遍历db中的bucket,抽取20个key判断是否过期,删除其中过期的key
  • 如果没达到时间上限(25ms)并且过期key的比例大于10%,就再抽一次样,否则结束

FAST模式规则:

  • 执行频率受beforeSleep()调用频率影响,但两次FAST模式间隔不低于2ms
  • 执行清理耗时不超过1ms
  • 逐个遍历db,逐个遍历db中的bucket,抽取20个key判断是否过期,删除其中过期的key
  • 如果没达到时间上限(1ms)并且过期key的比例大于10%,就再抽一次样,否则结束

Redis内存淘汰策略

只删除过期的key仍不满足空间需求,这时候就该删除其他key了

内存淘汰:当Redis内存使用达到设置的阈值时,Redis主动挑选部分key删除以释放更多的内存
Redis会在处理客户端命令的方法processCommand()中尝试内存淘汰,这个函数的执行条件为达到阈值,且不在lua脚本中

Redis淘汰策略

Redis支持8种不同的内存淘汰策略:

  • noeviction:不淘汰
  • volatile-ttl: 淘汰设置了TTL的key,TTL越小越线被淘汰
  • allkeys-random:对全体key,随机进行淘汰,从db->dict(db结构体的成员)中随机挑选
  • volatile-random:对设置了TTL的key,随机进行淘汰,从db->expires中随机挑选
  • allkeys-lru:对全体key,按照LRU进行淘汰
  • volatile-lru:对设置了TTL的key,按照LRU进行淘汰
  • allkeys-lfu:对全体key,按照LFU进行淘汰
  • volatile-lfu:对设置了TTL的key,按照LFU进行淘汰

LRU(Least Recently Used):最少最近使用。用当前时间减去最后一次访问时间,这个值越大淘汰优先级越高
LFU(Least Frequently Used):最少使用。统计每个key到底访问频率,值越小淘汰优先级越高

Redis的数据都会被封装为RedisObject,爱中就有LRU或LFU的时间,和访问次数
LFU的访问次数是逻辑访问次数:通过概率运算来决定计数器是否加一

淘汰策略的流程

  1. 先判断内存是否充足,否就开始执行淘汰策略
  2. 判断是否是ALLKEYS策略,是就从dict中淘汰,否就从expires中淘汰
  3. 判断内存策略random策略就随机删除,LRU|LFU|TTL要继续往下走
  4. 准备一个淘汰池(升序),逐个DB随机挑选key,用对应策略来判断key是否存入淘汰池
  5. 每个DB都遍历完后就倒序从淘汰池中删除key,直到内存充足

黑马点评笔记
http://www.981928.xyz/2026/01/13/黑马点评笔记/
作者
981928
发布于
2026年1月13日
许可协议