缓存的套路

本文探讨缓存设计的通用模式,涵盖从选型决策、更新策略到故障防护的完整体系。

mindmap
  root((缓存架构))
    何时使用
      读多写少
      热点集中
      可容忍最终一致性
    缓存层次
      近端缓存
        Guava
        Caffeine
        EhCache
      远端缓存
        Redis
        Memcached
    核心挑战
      更新策略
        Cache Aside
        Read Through
        Write Through
        Write Behind
      一致性保障
      故障防护
        击穿防护
        雪崩防护
        穿透防护

模式总览

# 模式名称 一句话口诀 适用场景
1 分层降级 本地兜底,远程扩展 多级缓存架构
2 惰性填充 触发加载,按需扩容 冷启动与预热
3 旁路同步 先库后删,读写互斥 Cache Aside 模式
4 空值防御 存空防击,短TTL控险 缓存穿透防护
5 随机散列 过期分散,渐进刷新 缓存雪崩防护

一、缓存的本质与适用场景

1.1 什么是缓存

缓存是一种以空间换时间的优化技术,通过在计算代价较高的数据源(数据库、外部服务等)和消费方之间插入高速存储层,减少重复计算的执行次数。

1.2 何时使用缓存

满足以下全部条件的场景适合引入缓存:

  • 读多写少:读操作占比显著高于写操作(典型比例 > 10:1)
  • 热点集中:小部分数据承担大部分访问流量(帕累托分布)
  • 可容忍最终一致性:业务逻辑能接受短暂的数据不一致窗口
  • 计算代价高:原始数据源响应慢或资源消耗大

1.3 不适用缓存的场景

场景 原因
实时性要求极高(如交易撮合) 毫秒级延迟无法接受任何缓存失效
写多读少 同步成本超过缓存收益
数据变化极快(如股票行情) 缓存命中率趋近于零
数据量极大但无热点 缓存无法有效缩减数据集

二、缓存的层次与选型

2.1 近端缓存 vs 远端缓存

缓存按部署位置分为两类:

维度 近端缓存 (In-Memory) 远端缓存 (Remote)
典型实现 Guava, Caffeine, EhCache Redis, Memcached
访问延迟 微秒级(~100μs) 毫秒级(~1-5ms)
容量限制 受限于单节点内存 水平扩展
数据共享 进程私有 多进程共享
运维复杂度 低(内置) 高(独立集群)

2.2 关于多级缓存

多级缓存(本地 + 远端)的设计需谨慎评估:

可行场景

  • L1:本地缓存(Caffeine)——存储极热数据
  • L2:远端缓存(Redis)——存储热数据
  • 差异化 TTL:L1 TTL << L2 TTL

风险点

  • 一致性问题:更新时需要同时失效多个层级
  • 复杂性倍增:每个层级都需要独立的容量规划、监控、故障预案

建议

  • 多数应用从单一远端缓存起步即可
  • 仅在 QPS > 10万且 p99 延迟 < 1ms 为硬性 SLA 时考虑多级缓存

三、缓存更新策略

缓存与数据源的一致性维护是核心挑战,业界形成了四种经典模式:

3.1 Cache Aside(旁路缓存)

最常用的模式,由应用程序显式控制缓存与数据库的交互。

1
2
3
4
5
读取流程:
Client -> Cache Miss? -> Load from DB -> Populate Cache -> Return Data

写入流程:
Client -> Update DB -> Delete Cache (非更新缓存)

为什么写入时是 Delete 而非 Update?

Update 模式在并发场景下存在竞态条件:

1
2
3
T1: 更新数据库 A=1
T2: 更新数据库 A=2, 更新缓存 A=2
T1: 更新缓存 A=1 (覆盖了T2的正确值,导致不一致)

Delete 模式下,下次读取会从数据库加载最新值,保证最终一致性。

3.2 Read Through / Write Through

缓存组件接管数据访问,对应用透明。

1
2
Read Through: Client -> Cache -> (Miss时Cache自动从DB加载)
Write Through: Client -> Cache -> Cache同步写入DB -> 返回成功

特点

  • 简化应用逻辑,无需处理缓存细节
  • 强耦合于特定缓存框架
  • Write Through 写延迟较高(需等待双写完成)

3.3 Write Behind(异步回写)

写入操作仅更新缓存,由后台线程异步批量刷盘到数据库。

1
2
Client -> Update Cache -> 立即返回成功
└-> 异步线程批量 flush 到 DB

优势

  • 写性能最优(纯内存操作)
  • 可合并多次更新,降低 DB 压力

风险

  • 数据丢失窗口:宕机时未 flush 的数据丢失
  • 实现复杂:需处理事务边界、失败重试、幂等问题

3.4 策略对比

策略 一致性 读性能 写性能 实现复杂度
Cache Aside 最终一致
Read Through 最终一致 -
Write Through 强一致 -
Write Behind 弱一致 - 最高

四、Java 进程内缓存详解

4.1 Guava Cache

Guava Cache 是 Google 提供的轻量级缓存库,设计上是对 ConcurrentHashMap 的增强。

核心特性

特性 说明
最大容量限制 基于数量或权重
过期策略 expireAfterWrite / expireAfterAccess
刷新机制 refreshAfterWrite(异步刷新,不阻塞读)
引用类型 支持 weakKeys / weakValues / softValues
统计信息 hit/miss/eviction 计数

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.CacheLoader;

LoadingCache<String, User> userCache = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.refreshAfterWrite(1, TimeUnit.MINUTES) // 异步刷新
.recordStats() // 开启统计
.build(new CacheLoader<String, User>() {
@Override
public User load(String key) throws Exception {
return loadUserFromDatabase(key); // 回源方法
}
});

// 获取(命中则返回,未命中则调用 load)
User user = userCache.get(userId);

⚠️ 重要限制:Guava Cache 不支持 null value

Guava Cache 的设计哲学将 null 视为"该 key 无对应数据",因此禁止存入 null。这导致一个实际问题:如何处理缓存穿透?

解决方案是使用空对象模式(Null Object Pattern):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final User EMPTY_USER = new User();  // 标记空用户

LoadingCache<String, Optional<User>> userCache = CacheBuilder.newBuilder()
.build(new CacheLoader<String, Optional<User>>() {
@Override
public Optional<User> load(String key) {
User user = loadUserFromDatabase(key);
return Optional.ofNullable(user != null ? user : EMPTY_USER);
}
});

Optional<User> result = userCache.get(userId);
if (result.isPresent() && result.get() != EMPTY_USER) {
return result.get();
}
return null;

4.2 Caffeine

Caffeine 是 Guava Cache 的高性能替代品,采用 Window-TinyLFU 淘汰算法,性能显著优于 Guava 的 LRU。

演进关系

1
2
3
4
5
concurrentlinkedhashmap (Ben Manes)

Guava Cache (Google)

Caffeine (Ben Manes, Java 8)

关键改进

维度 Guava Caffeine
淘汰算法 Segmented LRU W-TinyLFU
并发模型 分段锁 无锁 + RingBuffer
刷新机制 同步阻塞 全异步
统计精度 简单计数 频率素描

异步加载示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;

AsyncLoadingCache<String, User> asyncCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.executor(Executors.newFixedThreadPool(10)) // 自定义执行器
.buildAsync((key, executor) ->
CompletableFuture.supplyAsync(() -> loadUser(key), executor)
);

// 异步获取,不阻塞
CompletableFuture<User> future = asyncCache.get(userId);
future.thenAccept(user -> System.out.println(user.getName()));

4.3 EhCache 3

EhCache 是唯一支持持久化的本地缓存,适合大容量、可容忍磁盘访问延迟的场景。

堆内外分层配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<config xmlns="http://www.ehcache.org/v3">
<cache alias="largeDataCache">
<key-type>java.lang.Long</key-type>
<value-type>java.io.Serializable</value-type>

<resources>
<!-- 堆内缓存 -->
<heap unit="entries">1000</heap>
<!-- 堆外内存(避免 GC 压力) -->
<offheap unit="MB">128</offheap>
<!-- 磁盘持久化 -->
<disk unit="GB">10</disk>
</resources>

<expiry>
<ttl unit="minutes">60</ttl>
</expiry>
</cache>
</config>

与 Spring Boot 集成

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
@EnableCaching
public class CacheConfig {

@Bean
public CacheManager cacheManager() {
CachingProvider provider = Caching.getCachingProvider();
CacheManager manager = provider.getCacheManager();

// 或通过 ehcache.xml 配置
return new JCacheCacheManager(manager);
}
}

五、分布式缓存:Redis

Redis 是目前最流行的分布式缓存,本节聚焦 Java 客户端的最佳实践。

5.1 客户端选择

客户端 连接方式 特性 推荐场景
Jedis 直连 成熟稳定,API丰富 单机或简单分片
Lettuce Netty + 异步 响应式编程,集群友好 高并发、Reactive
Redisson 高级封装 分布式锁、对象映射 需要分布式协调

5.2 Spring Data Redis 配置

1
2
3
4
5
6
7
8
9
10
11
spring:
redis:
host: localhost
port: 6379
lettuce:
pool:
max-active: 50
max-idle: 20
min-idle: 5
max-wait: 3000ms
timeout: 2000ms

5.3 缓存序列化

默认 JDK 序列化存在性能和空间问题,推荐配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);

// Key 使用 String
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());

// Value 使用 JSON
Jackson2JsonRedisSerializer<Object> jsonSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule()); // JDK8日期支持
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
jsonSerializer.setObjectMapper(mapper);

template.setValueSerializer(jsonSerializer);
template.afterPropertiesSet();
return template;
}

六、缓存故障防护

缓存系统面临三类典型故障场景,每类都有对应的防护模式。

6.1 缓存击穿(Hot Key Expiration)

现象:热点 key 过期瞬间,大量请求同时穿透到数据库。

防护模式

方案 原理 实现
互斥重建 单线程回源,其他线程等待 分布式锁或本地 synchronized
逻辑过期 不设置物理 TTL,通过逻辑字段判断是否过期 延长实际 TTL,后台异步刷新
热点识别 预加载即将过期的热 key 访问统计 + 主动刷新

Guava 互斥重建示例

1
2
3
4
5
6
LoadingCache<String, User> cache = CacheBuilder.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.refreshAfterWrite(1, TimeUnit.MINUTES) // 提前刷新,避免过期
.build(CacheLoader.from(this::loadUser));

// CacheLoader 的 load 方法是原子性的,天然防止并发击穿

6.2 缓存雪崩(Mass Expiration)

现象:大量 key 同时过期,引发数据库流量洪峰。

防护模式:随机散列

在基础 TTL 上增加随机偏移:

1
2
3
4
int baseTtlSeconds = 3600;  // 1小时
int randomOffset = ThreadLocalRandom.current().nextInt(600); // 0-10分钟
redisTemplate.opsForValue().set(key, value,
baseTtlSeconds + randomOffset, TimeUnit.SECONDS);

对于定时任务刷新的场景,采用阶梯过期:将数据分批,每批在不同时间点刷新。

6.3 缓存穿透(Phantom Key)

现象:查询不存在的数据,每次都穿透到数据库。

防护模式:空值防御

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public User getUser(String userId) {
String cacheKey = "user:" + userId;
User cached = redisTemplate.opsForValue().get(cacheKey);

if (cached != null) {
// 空对象标记,防止重复穿透
if (cached == EMPTY_USER) {
return null;
}
return cached;
}

User user = userRepository.findById(userId);
if (user != null) {
redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
} else {
// 缓存空对象,短 TTL
redisTemplate.opsForValue().set(cacheKey, EMPTY_USER, 5, TimeUnit.MINUTES);
}
return user;
}

进阶:布隆过滤器拦截

对于查询模式固定的场景,可用布隆过滤器前置过滤:

1
2
3
Client -> Bloom Filter Check? 
└─ 可能存在 -> Query Cache -> (Miss) -> Query DB
└─ 肯定不存在 -> 直接返回 Null

七、🔑 模式提炼

模式一:分层降级

公式L1(高频小容量) → L2(中频大容量) → Origin

迁移表

场景 L1 L2 Origin
用户信息 Caffeine (10s) Redis (1h) MySQL
配置项 Guava (1min) Consul Config Server
API 限流状态 Local Counter Redis Cell -

核心洞察:每一层的职责是"挡住上一层的穿透",TTL 必须逐层递增,否则失去分层意义。

模式二:惰性填充

公式On-Demand = Trigger(Load(Key) -> Store) -> Return

迁移表

场景 触发条件 加载来源 应用
缓存预热 系统启动/定时任务 数据库全量扫描 商品类目
懒加载 首次访问 miss 单条查询 用户资料
异步回填 消息队列通知 Kafka Topic 订单状态

模式三:空值防御

公式Key → {Value | EmptyMarker} with Short-TTL

关键参数

  • 空对象标记 TTL:通常为正常数据的 1/10 ~ 1/5
  • 必须配合手动清除逻辑,防止真实数据产生后仍返回空

八、生产环境实践要点

8.1 监控指标

指标 健康阈值 报警条件
命中率 > 90% < 80%
平均加载时间 < 100ms > 500ms
驱逐率 视容量而定 突增 > 200%
错误率 0% > 0.1%

8.2 容量规划

估算公式:

1
缓存条目数 ≈ 峰值 QPS × 平均访问间隔 / (命中率目标 / (1 - 命中率目标))

示例:QPS=10000,用户平均10分钟访问一次,目标命中率95%:

1
条目数 ≈ 10000 × 600s × (0.05/0.95) ≈ 315,789

8.3 故障演练清单

  • [ ] 模拟 Redis 宕机,验证降级逻辑
  • [ ] 模拟大 key 过期,观察数据库压力
  • [ ] 模拟网络分区,测试最终一致性收敛时间
  • [ ] 压测缓存满载时的驱逐行为

九、模式速查表

听到的需求关键词 对应模式 推荐方案 口诀
本地缓存要高性能 分层降级 Caffeine + Redis 本地兜底,远程扩展
冷启动加载慢 惰性填充 @PostConstruct 预加载 触发加载,按需扩容
数据更新后不一致 旁路同步 Cache Aside + 延迟双删 先库后删,读写互斥
恶意请求导致 DB 崩溃 空值防御 空对象 + 布隆过滤器 存空防击,短TTL控险
大量 key 同时过期 随机散列 TTL + Random Offset 过期分散,渐进刷新
热点 key 频繁失效 互斥重建 分布式锁 / 逻辑过期 单线回源,排队等待

参考文献

  1. 陈皓:缓存更新的套路
  2. 美团技术团队:缓存那些事
  3. Guava Cache 官方文档
  4. Caffeine Wiki

Guava

spring 的 cache 用的是 cachemanager。
guava 的 cache 用的是 cachebuilder。

Spring + Guava 一般的短路器式的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
@ComponentScan("com.concretepage")
@EnableCaching
public class AppConfigA {
@Bean
public CacheManager cacheManager() {
GuavaCacheManager cacheManager = new GuavaCacheManager("mycache");
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(10, TimeUnit.MINUTES);
cacheManager.setCacheBuilder(cacheBuilder);
return cacheManager;
}
}

@Configuration
@ComponentScan("com.concretepage")
@EnableCaching
public class AppConfigB {
@Bean
public CacheManager cacheManager() {
SimpleCacheManager cacheManager = new SimpleCacheManager();
GuavaCache guavaCache1 = new GuavaCache("book", CacheBuilder.newBuilder()
.maximumSize(50).build());
GuavaCache guavaCache2 = new GuavaCache("bookstore", CacheBuilder.newBuilder()
.maximumSize(100).expireAfterAccess(5, TimeUnit.MINUTES).build());
cacheManager.setCaches(Arrays.asList(guavaCache1, guavaCache2));
return cacheManager;
}
}

@Service
public class BookAppA {
Book book = new Book();
@Cacheable(value = "mycache")
public Book getBook() {
System.out.println("Executing getBook method...");
book.setBookName("Mahabharat");
return book;
}
}

只需要覆盖 load 方法

load 和 evict 逻辑是解耦的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void whenCacheMiss_thenValueIsComputed() {
    CacheLoader<String, String> loader;
    loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
 
    LoadingCache<String, String> cache;
    cache = CacheBuilder.newBuilder().build(loader);
 
    assertEquals(0, cache.size());
    assertEquals("HELLO", cache.getUnchecked("hello"));
    assertEquals(1, cache.size());
}

// load 的用法,下次不会再计算 createExpensiveGraph 了。
CacheLoader<Key, Graph> loader = new CacheLoader<Key, Graph>() {
public Graph load(Key key) throws AnyException {
return createExpensiveGraph(key);
}
};
LoadingCache<Key, Graph> cache = CacheBuilder.newBuilder().build(loader);}

带权重的缓存配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Test
public void whenCacheReachMaxWeight_thenEviction() {
    CacheLoader<String, String> loader;
    loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
 
    Weigher<String, String> weighByLength;
    weighByLength = new Weigher<String, String>() {
        @Override
        public int weigh(String key, String value) {
            return value.length();
        }
    };
 
    LoadingCache<String, String> cache;
    cache = CacheBuilder.newBuilder()
      .maximumWeight(16)
      .weigher(weighByLength)
      .build(loader);
 
    cache.getUnchecked("first");
    cache.getUnchecked("second");
    cache.getUnchecked("third");
    cache.getUnchecked("last");
    assertEquals(3, cache.size());
    assertNull(cache.getIfPresent("first"));
    assertEquals("LAST", cache.getIfPresent("last"));

指定过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

@Test
public void whenEntryIdle_thenEviction()
throws InterruptedException {
CacheLoader<String, String> loader;
loader = new CacheLoader<String, String>() {
@Override
public String load(String key) {
return key.toUpperCase();
}
};

LoadingCache<String, String> cache;
cache = CacheBuilder.newBuilder()
// remove records that have been idle for 2ms:
.expireAfterAccess(2,TimeUnit.MILLISECONDS)
.build(loader);

cache.getUnchecked("hello");
assertEquals(1, cache.size());

cache.getUnchecked("hello");
Thread.sleep(300);

cache.getUnchecked("test");
assertEquals(1, cache.size());
assertNull(cache.getIfPresent("hello"));
}

@Test
public void whenEntryLiveTimeExpire_thenEviction()
throws InterruptedException {
CacheLoader<String, String> loader;
loader = new CacheLoader<String, String>() {
@Override
public String load(String key) {
return key.toUpperCase();
}
};

LoadingCache<String, String> cache;
cache = CacheBuilder.newBuilder()
// 这个策略更保守
// evict records based on their total live time
.expireAfterWrite(2,TimeUnit.MILLISECONDS)
.build(loader);

cache.getUnchecked("hello");
assertEquals(1, cache.size());
Thread.sleep(300);
cache.getUnchecked("test");
assertEquals(1, cache.size());
assertNull(cache.getIfPresent("hello"));
}

弱引用和软引用 key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void whenWeakKeyHasNoRef_thenRemoveFromCache() {
    CacheLoader<String, String> loader;
    loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
 
    LoadingCache<String, String> cache;
    cache = CacheBuilder.newBuilder().weakKeys().build(loader);
}

@Test
public void whenSoftValue_thenRemoveFromCache() {
CacheLoader<String, String> loader;
loader = new CacheLoader<String, String>() {
@Override
public String load(String key) {
return key.toUpperCase();
}
};

LoadingCache<String, String> cache;
cache = CacheBuilder.newBuilder().softValues().build(loader);
}

定时触发 load 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void whenLiveTimeEnd_thenRefresh() {
CacheLoader<String, String> loader;
loader = new CacheLoader<String, String>() {
@Override
public String load(String key) {
return key.toUpperCase();
}
};

LoadingCache<String, String> cache;
cache = CacheBuilder.newBuilder()
.refreshAfterWrite(1,TimeUnit.MINUTES)
.build(loader);
}

主动预热

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void whenPreloadCache_thenUsePutAll() {
    CacheLoader<String, String> loader;
    loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
 
    LoadingCache<String, String> cache;
    cache = CacheBuilder.newBuilder().build(loader);
 
    Map<String, String> map = new HashMap<String, String>();
    map.put("first", "FIRST");
    map.put("second", "SECOND");
    cache.putAll(map);
 
    assertEquals(2, cache.size());
}

必须使用 optional 来应对 null 值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void whenNullValue_thenOptional() {
CacheLoader<String, Optional<String>> loader;
loader = new CacheLoader<String, Optional<String>>() {
@Override
public Optional<String> load(String key) {
return Optional.fromNullable(getSuffix(key));
}
};

LoadingCache<String, Optional<String>> cache;
cache = CacheBuilder.newBuilder().build(loader);

assertEquals("txt", cache.getUnchecked("text.txt").get());
assertFalse(cache.getUnchecked("hello").isPresent());
}
private String getSuffix(final String str) {
int lastIndex = str.lastIndexOf('.');
if (lastIndex == -1) {
return null;
}
return str.substring(lastIndex + 1);
}

订阅删除事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Test
public void whenEntryRemovedFromCache_thenNotify() {
CacheLoader<String, String> loader;
loader = new CacheLoader<String, String>() {
@Override
public String load(final String key) {
return key.toUpperCase();
}
};

RemovalListener<String, String> listener;
listener = new RemovalListener<String, String>() {
@Override
public void onRemoval(RemovalNotification<String, String> n){
if (n.wasEvicted()) {
String cause = n.getCause().name();
assertEquals(RemovalCause.SIZE.toString(),cause);
}
}
};

LoadingCache<String, String> cache;
cache = CacheBuilder.newBuilder()
.maximumSize(3)
.removalListener(listener)
.build(loader);

cache.getUnchecked("first");
cache.getUnchecked("second");
cache.getUnchecked("third");
cache.getUnchecked("last");
assertEquals(3, cache.size());
}

Cache Statistic

可以 logging cache statistic data

Cache Stats= CacheStats{hitCount=3296628, missCount=1353372,
loadSuccessCount=1353138, loadExceptionCount=0,
totalLoadTime=2268064327604, evictionCount=1325410} Cache Stats=
CacheStats{hitCount=3334167, missCount=1365834,
loadSuccessCount=1365597, loadExceptionCount=0,
totalLoadTime=2287551024797, evictionCount=1337740} Cache Stats=
CacheStats{hitCount=3371463, missCount=1378536,
loadSuccessCount=1378296, loadExceptionCount=0,
totalLoadTime=2309012047459, evictionCount=1350990} Cache Stats=
CacheStats{hitCount=3407719, missCount=1392280,
loadSuccessCount=1392039, loadExceptionCount=0,
totalLoadTime=2331355983194, evictionCount=1364535} Cache Stats=
CacheStats{hitCount=3443848, missCount=1406152,
loadSuccessCount=1405908, loadExceptionCount=0,
totalLoadTime=2354162371299, evictionCount=1378654}

参考:recordStats

ECache

Spring4 + ECache 2

整个 namespace 的说明见这里

具体的配置选项见:

  • name:缓存名称。
  • maxElementsInMemory:缓存最大个数。
  • eternal:缓存中对象是否为永久的,如果是,超时设置将被忽略,对象从不过期。
  • timeToIdleSeconds:置对象在失效前的允许闲置时间(单位:秒)。仅当eternal=false对象不是永久有效时使用,可选属性,默认值是0,也就是可闲置时间无穷大。
  • timeToLiveSeconds:缓存数据的生存时间(TTL),也就是一个元素从构建到消亡的最大时间间隔值,这只能在元素不是永久驻留时有效,如果该值是0就意味着元素可以停顿无穷长的时间。
  • maxEntriesLocalDisk:当内存中对象数量达到maxElementsInMemory时,Ehcache将会对象写到磁盘中。
  • overflowToDisk:内存不足时,是否启用磁盘缓存。
  • diskSpoolBufferSizeMB:这个参数设置DiskStore(磁盘缓存)的缓存区大小。默认是30MB。每个Cache都应该有自己的一个缓冲区。
  • maxElementsOnDisk:硬盘最大缓存个数。
  • diskPersistent:是否在VM重启时存储硬盘的缓存数据。默认值是false。
  • diskExpiryThreadIntervalSeconds:磁盘失效线程运行时间间隔,默认是120秒。
  • memoryStoreEvictionPolicy:当达到maxElementsInMemory限制时,Ehcache将会根据指定的策略去清理内存。默认策略是LRU(最近最少使用)。你可以设置为FIFO(先进先出)或是LFU(较少使用)。
  • clearOnFlush:内存数量最大时是否清除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="ehcache.xsd"
updateCheck="true"
monitoring="autodetect"
dynamicConfig="true">
<diskStore path="java.io.tmpdir"/>
<cache name="movieFindCache"
maxEntriesLocalHeap="10000"
maxEntriesLocalDisk="1000"
eternal="false"
diskSpoolBufferSizeMB="20"
timeToIdleSeconds="300" timeToLiveSeconds="600"
memoryStoreEvictionPolicy="LFU"
transactionalMode="off">
<persistence strategy="localTempSwap" />
</cache>
</ehcache>

对应的 java code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.mkyong.test;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableCaching
@ComponentScan({ "com.mkyong.*" })
public class AppConfig {

@Bean
public CacheManager cacheManager() {
return new EhCacheCacheManager(ehCacheCacheManager().getObject());
}

@Bean
public EhCacheManagerFactoryBean ehCacheCacheManager() {
EhCacheManagerFactoryBean cmfb = new EhCacheManagerFactoryBean();
cmfb.setConfigLocation(new ClassPathResource("ehcache.xml"));
cmfb.setShared(true);
return cmfb;
}
}


package com.mkyong.movie;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;

@Repository("movieDao")
public class MovieDaoImpl implements MovieDao{

//This "movieFindCache" is delcares in ehcache.xml
@Cacheable(value="movieFindCache", key="#name")
public Movie findByDirector(String name) {
slowQuery(2000L);
System.out.println("findByDirector is running...");
return new Movie(1,"Forrest Gump","Robert Zemeckis");
}

private void slowQuery(long seconds){
try {
Thread.sleep(seconds);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

}

Spring Boot 2 + ECache3

ECache 是 Hibernate 中的默认缓存框架。

要引入 javax 的 cache api(JSR-107):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
    <version>2.2.2.RELEASE</version></dependency>
<dependency>
    <groupId>javax.cache</groupId>
    <artifactId>cache-api</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.ehcache</groupId>
    <artifactId>ehcache</artifactId>
    <version>3.8.1</version>
</dependency>

对应的缓存注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Service
public class NumberService {

// 配置全都在 Cacheable 接口上
@Cacheable(
value = "squareCache",
key = "#number",
condition = "#number>10")
public BigDecimal square(Long number) {
BigDecimal square = BigDecimal.valueOf(number)
.multiply(BigDecimal.valueOf(number));
log.info("square of {} is {}", number, square);
return square;
}
}

@Configuration
// 注意,这里的配置,不需要自己再生成 cachemanager
@EnableCaching
public class CacheConfig {
}

public class CacheEventLogger
implements CacheEventListener<Object, Object> {

// ...

@Override
public void onEvent(
CacheEvent<? extends Object, ? extends Object> cacheEvent) {
log.info(/* message */,
cacheEvent.getKey(), cacheEvent.getOldValue(), cacheEvent.getNewValue());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.ehcache.org/v3"
xmlns:jsr107="http://www.ehcache.org/v3/jsr107"
xsi:schemaLocation="
http://www.ehcache.org/v3 http://www.ehcache.org/schema/ehcache-core-3.0.xsd
http://www.ehcache.org/v3/jsr107 http://www.ehcache.org/schema/ehcache-107-ext-3.0.xsd">

<cache alias="squareCache">
<key-type>java.lang.Long</key-type>
<value-type>java.math.BigDecimal</value-type>
<expiry>
<ttl unit="seconds">30</ttl>
</expiry>

<listeners>
<listener>
<class>com.baeldung.cachetest.config.CacheEventLogger</class>
<event-firing-mode>ASYNCHRONOUS</event-firing-mode>
<event-ordering-mode>UNORDERED</event-ordering-mode>
<events-to-fire-on>CREATED</events-to-fire-on>
<events-to-fire-on>EXPIRED</events-to-fire-on>
</listener>
</listeners>

<resources>
<heap unit="entries">2</heap>
<offheap unit="MB">10</offheap>
</resources>
</cache>

</config>

具体的其他 cache 操作的注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
public class UserService {
// @Cacheable可以设置多个缓存,形式如:@Cacheable({"books", "isbns"})
@Cacheable({"users"})
public User findUser(User user) {
return findUserInDB(user.getId());
}

@Cacheable(value = "users", condition = "#user.getId() <= 2")
public User findUserInLimit(User user) {
return findUserInDB(user.getId());
}

@CachePut(value = "users", key = "#user.getId()")
public void updateUser(User user) {
updateUserInDB(user);
}

@CacheEvict(value = "users")
public void removeUser(User user) {
removeUserInDB(user.getId());
}

@CacheEvict(value = "users", allEntries = true)
public void clear() {
removeAllInDB();
}
}

@Caching(evict = { @CacheEvict("primary"), @CacheEvict(cacheNames="secondary", key="#p0") })
public Book importBooks(String deposit, Date date)


// 与前面的缓存注解不同,这是一个类级别的注解。
如果类的所有操作都是缓存操作,你可以使用@CacheConfig来指定类,省去一些配置。
@CacheConfig("books")
public class BookRepositoryImpl implements BookRepository {
@Cacheable
public Book findBook(ISBN isbn) {...}
}

可以考虑,定义自己的 KeyGenerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class MyKeyGenerator implements KeyGenerator {
@Override
public Object generate(Object target, Method method, Object... params) {
return method.getName()+Arrays.toString(params);
}
}

@Cacheable(keyGenerator = "myKeyGenerator")
public User getUserById(Long id) {
User user = new User();
user.setId(id);
user.setUsername("lisi");
System.out.println(user);
return user;
}

另外,可以用的 key 的缓存专用 SPEL 表达式,在这里

Caffeine

它几个特别有意思的特性:time-based eviction、size-based eviction、异步加载、弱引用 key(不考虑 referenceQueue 的特性,WeakReference 是最适合我们用的)。

  • automatic loading of entries into the cache, optionally asynchronously
  • size-based eviction when a maximum is exceeded based on frequency and recency
  • time-based expiration of entries, measured since last access or last write
  • asynchronously refresh when the first stale request for an entry occurs
  • keys automatically wrapped in weak references
  • values automatically wrapped in weak or soft references
  • notification of evicted (or otherwise removed) entries
  • writes propagated to an external resource
  • accumulation of cache access statistics

不搭配 Spring

1
2
3
4
5
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.5.5</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
Cache<String, DataObject> cache = Caffeine.newBuilder()
  .expireAfterWrite(1, TimeUnit.MINUTES)
  .maximumSize(100)
  .build();
  
String key = "A";
DataObject dataObject = cache.getIfPresent(key);

assertNull(dataObject);

cache.put(key, dataObject);
dataObject = cache.getIfPresent(key);

assertNotNull(dataObject);

dataObject = cache
.get(key, k -> DataObject.get("Data for A"));

assertNotNull(dataObject);
assertEquals("Data for A", dataObject.getData());

// 同步加载
LoadingCache<String, DataObject> cache = Caffeine.newBuilder()
  .maximumSize(100)
  .expireAfterWrite(1, TimeUnit.MINUTES)
  .build(k -> DataObject.get("Data for " + k));

DataObject dataObject = cache.get(key);

assertNotNull(dataObject);
assertEquals("Data for " + key, dataObject.getData());

Map<String, DataObject> dataObjectMap
  = cache.getAll(Arrays.asList("A", "B", "C"));
 
assertEquals(3, dataObjectMap.size());
  
// 异步加载
AsyncLoadingCache<String, DataObject> cache = Caffeine.newBuilder()
.maximumSize(100)
.expireAfterWrite(1, TimeUnit.MINUTES)
.buildAsync(k -> DataObject.get("Data for " + k));

String key = "A";
cache.get(key).thenAccept(dataObject -> {
assertNotNull(dataObject);
assertEquals("Data for " + key, dataObject.getData());
});
cache.getAll(Arrays.asList("A", "B", "C"))
.thenAccept(dataObjectMap -> assertEquals(3, dataObjectMap.size()));

// 基于 size 的淘汰
LoadingCache<String, DataObject> cache = Caffeine.newBuilder()
  .maximumSize(1)
  .build(k -> DataObject.get("Data for " + k));
 
assertEquals(0, cache.estimatedSize());

// 等待异步淘汰完成才同步返回
cache.cleanUp();

// 基于权重的淘汰
LoadingCache<String, DataObject> cache = Caffeine.newBuilder()
  .maximumWeight(10)
  .weigher((k,v) -> 5)
  .build(k -> DataObject.get("Data for " + k));
 
assertEquals(0, cache.estimatedSize());
 
cache.get("A");
assertEquals(1, cache.estimatedSize());
 
cache.get("B");
assertEquals(2, cache.estimatedSize());

// 基于访问时间的淘汰
LoadingCache<String, DataObject> cache = Caffeine.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.build(k -> DataObject.get("Data for " + k));

// 基于写时间的淘汰
cache = Caffeine.newBuilder()
  .expireAfterWrite(10, TimeUnit.SECONDS)
  .weakKeys()
  .weakValues()
  .build(k -> DataObject.get("Data for " + k));

// 自定义基于时间的淘汰策略
cache = Caffeine.newBuilder().expireAfter(new Expiry<String, DataObject>() {
@Override
public long expireAfterCreate(
String key, DataObject value, long currentTime) {
return value.getData().length() * 1000;
}
@Override
public long expireAfterUpdate(
String key, DataObject value, long currentTime, long currentDuration) {
return currentDuration;
}
@Override
public long expireAfterRead(
String key, DataObject value, long currentTime, long currentDuration) {
return currentDuration;
}
}).build(k -> DataObject.get("Data for " + k));

// key 和 value 使用不同的引用。value 只能使用 softValues。
LoadingCache<String, DataObject> cache = Caffeine.newBuilder()
  .expireAfterWrite(10, TimeUnit.SECONDS)
  .weakKeys()
  .weakValues()
  .build(k -> DataObject.get("Data for " + k));
 
cache = Caffeine.newBuilder()
  .expireAfterWrite(10, TimeUnit.SECONDS)
  .softValues()
  .build(k -> DataObject.get("Data for " + k));

// 自动 populate
Caffeine.newBuilder()
  .refreshAfterWrite(1, TimeUnit.MINUTES)
  .build(k -> DataObject.get("Data for " + k));

// 获取统计数据
LoadingCache<String, DataObject> cache = Caffeine.newBuilder()
  .maximumSize(100)
  .recordStats()
  .build(k -> DataObject.get("Data for " + k));
cache.get("A");
cache.get("A");

assertEquals(1, cache.stats().hitCount());
assertEquals(1, cache.stats().missCount());

这里的 get 类操作的原子性特别重要:

method invocation is performed atomically, so the function is applied
at most once per key. Some attempted update operations on this cache
by other threads may be blocked while the computation is in progress,
so the computation should be short and simple, and must not attempt to
update any other mappings of this cache.

这样可以保证线性一致性,实现立即可见(类似强广播),但中间插入的这个原子操作必须短,类似 redis 的 set 才可以。

搭配 Spring

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.6.0</version>
</dependency>

相关的配置文件:

  • initialCapacity: # 初始的缓存空间大小
  • maximumSize: # 缓存的最大条数
  • maximumWeight: # 缓存的最大权重
  • expireAfterAccess: # 最后一次写入或访问后经过固定时间过期
  • expireAfterWrite: # 最后一次写入后经过固定时间过期
  • refreshAfterWrite: # 创建缓存或者最近一次更新缓存后经过固定的时间间隔,刷新缓存
  • weakKeys: # 打开 key 的弱引用
  • weakValues: # 打开 value 的弱引用
  • softValues: # 打开 value 的软引用
  • recordStats: # 开发统计功能
1
2
3
4
5
6
spring:
cache:
type: caffeine
caffeine:
spec: maximumSize=1024
cache-names: cache1,cache2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

@Configuration
public class CaffeineCacheConfig {

public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager("customer");
cacheManager.setCaffeine(caffeineCacheBuilder());
return cacheManager;
}

Caffeine < Object, Object > caffeineCacheBuilder() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(500)
.expireAfterAccess(10, TimeUnit.MINUTES)
.weakKeys()
.recordStats();
}
}

public interface CustomerService {
Customer getCustomer(final Long customerID);
}
// Implementation
@Service
@CacheConfig(cacheNames = {"customer"})
public class DefaultCustomerService implements CustomerService {

private static final Logger LOG = LoggerFactory.getLogger(DefaultCustomerService.class);

@Cacheable
@Override
public Customer getCustomer(Long customerID) {
LOG.info("Trying to get customer information for id {} ",customerID);
return getCustomerData(customerID);
}

private Customer getCustomerData(final Long id){
Customer customer = new Customer(id, "testemail@test.com", "Test Customer");
return customer;
}
}

如何应对缓存 miss 的问题

cache miss 在中文的语境里经常被人分为缓存击穿、缓存雪崩和缓存穿透,这三种类型并不完全互斥穷举,在概念上极其容易造成混淆。在这一段总结的时候姑且依照这三种类型分别加以论述。

如何应对缓存击穿(breakthrough)

缓存击穿,指的是某个 key 应该访问缓存,却没有访问到,导致缓存通过兜底的策略去更下游的冷存储加载内容,给下游的系统造成了读压力。

应对这个问题的基本思路有:

  1. 事前:

  2. 在可能热点数据的访问高峰到达以前,提前把数据预热。

  3. 永远不要让缓存失效-这样缓存 stale 以后,怎么保鲜是个巨大的问题,只能靠一个后端的主动更新机制来尽最大努力来更新缓存。这种方案是一致性最差的。

  4. 在缓存击穿以前,主动更新缓存,即不让缓存击穿发生,即同时才有 expireAfter(t1) + loadAfter(t2) 的策略。

  5. 极端热的数据,不允许缓存被动失效,必须使用主动更新的模式。

  6. 并发操作下更新缓存一定要注意顺序!如果有消息来更新更要注意顺序!

  7. 定时任务和广播刷新有时候可以互相补充-定时任务是超时的补充。

  8. 事中:

  9. 在缓存击穿的时候,严格限制读冷存储 + 预热缓存的流量,即有限降级,有损服务。

  10. 如果缓存更新是同步读写(Cache Aside 或者 Read/Write Through)的模式,则引入各种限流工具(限制线程数的线程池/信号量/SLA/Rhino/Redis 计数器/线程内计数器/Hystrix/Web 容器的限流器),保障可用性的同时保障吞吐量。

  11. 如果缓存更新可以异步主动更新,则考虑单线程执行或者使用消息队列进行低流量更新。能怎样在事中限制这个问题,取决于缓存和读写接入层之间本来的架构关系是如何设计的。

  12. 某类特别热的 key 可能一旦失效会导致大量的读,这种 key 的实际更新流程还要加上分布式锁-而且还要使用试锁而不能使用阻塞锁-facebook 的论文里没有提到这种策略,不知道是不是数据很均匀。

  13. 事后:

  14. 如果系统无法自愈,熔断拒绝服务以后(所以熔断、降级限流每一手准备都要准备好,可以用限流为 0 来制造熔断),手工预热缓存。

如何应对缓存雪崩

雪崩问题,指的是:大规模的缓存失效,再加上大规模的访问流量,造成对后端非高可用的冷存储(通常是 RDBMS)的大规模读写,导致 RDBMS 可用性下降,甚至整个系统级联崩溃。

从某种意义上,单一缓存的击穿并不可怕,缓存雪崩才是最可怕的。

应对缓存雪崩问题,基本思路是大规模使用应对缓存击穿的基础策略的基础上,把缓存预热的行为模式打散。

基于超时时间的思路是:不同的 key 设置不同的超时时间,让缓存失效不同时到来。但这样并不能完全解决问题,因为缓存并不是失效以后就直接可以被加载上,除非缓存自带异步自加载的机制(很多 in-memory cache 有,但 Redis 没有),否则不均匀的流量还是可能到达缓存后导致大规模击穿。对超时时间的方案的加强版是,采用一套主动更新缓存的机制。

基于预热的思路是:缓存一开始分好集群。允许某些集群的上游准备好熔断,然后集体停下流量以后,使用脚本批量预热整个集群数据。

如何应对缓存穿透(penetration)

缓存穿透不同于缓存击穿。

缓存穿透指的是试图查询不存在的缓存数据。

可以针对缓存穿透来刷冷数据,导致整个集群频繁查询冷存储而崩溃。

解决方案有:

  1. 对明显不符合要求的请求,直接返回 null。
  2. 使用一个大的 bitmap 或者布隆过滤器来拦截可能不存在的请求,直接返回 null。
  3. 缓存穿透一次,就在 cache 中存上 null - 允许使用 null 的缓存能够天然抵挡缓存穿透问题。Guava 的缺点就在这里被体现出来了

以上措施混合使用的话,必须考虑缓存里的 null。 必须有超时时间,而且应该有对应的无 null。 以后主动更新的机制,否则这个空值就被污染了。

远端缓存与近端缓存的辨析

缓存在哪端,哪端就能定制它的行为,但要供应它消耗的资源。近端缓存通常简单,但也就意味着没有什么功能。

远端缓存的好处

自带广播、同步和共识功能,能够对接写入服务。
自带独立的集群,有专业的运维人员,适合存储海量数据。

远端缓存的坏处

制造了复杂的依赖,比如接入变复杂、流程变复杂。
所有的服务都依赖于一个服务,配置和流程不易于差异化,冲突比例增多。

近端缓存的好处

接入简单。
自己可以把控自己的缓存使用逻辑。

近端缓存的坏处

相对于广播同步一致性难度大,通信成本高-易引起通信风暴。
占用内存变大,无法解决海量数据存储。

十、缓存与数据库一致性深度分析

在分布式系统中,缓存与数据库的数据一致性是永恒的难题。本节深入分析几种高级一致性方案。

延迟双删策略

问题描述:在 Cache Aside 模式下,先删除缓存再更新数据库,在更新完成之前,如果有读请求进来,会读取到旧数据并写入缓存,导致缓存与数据库不一致。

解决方案:延迟双删策略通过在数据库更新后再次删除缓存来解决这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class DelayedDoubleDeleteCacheService {
private JedisPool jedisPool;

/**
* 延迟双删策略
* @param key 缓存key
* @param value 新值
* @param delayMs 延迟删除时间(毫秒)
*/
public void updateWithDelayedDoubleDelete(String key, String value, long delayMs) {
// 第一次删除缓存
deleteCache(key);

// 更新数据库
updateDatabase(key, value);

// 延迟后再次删除缓存
scheduleDelayedDelete(key, delayMs);
}

private void deleteCache(String key) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.del(key);
}
}

private void updateDatabase(String key, String value) {
// 数据库更新逻辑
}

private void scheduleDelayedDelete(String key, long delayMs) {
new Thread(() -> {
try {
Thread.sleep(delayMs);
deleteCache(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}

时序分析

sequenceDiagram
    participant Client
    participant Cache
    participant DB
    participant AsyncDelete
    
    Client->>Cache: 1. 删除缓存
    Client->>DB: 2. 更新数据库
    par 并发读请求
        Client->>Cache: 3a. 读取缓存(miss)
        Client->>DB: 3b. 读取数据库(旧值)
        Client->>Cache: 3c. 写入缓存(旧值)
    end
    Client->>AsyncDelete: 4. 延迟N秒后删除缓存
    AsyncDelete->>Cache: 5. 删除缓存

最佳实践

  • 延迟时间应大于数据库主从同步延迟 + 业务读请求平均耗时
  • 建议延迟时间设置为 500ms-2s
  • 配合消息队列实现异步删除,避免线程阻塞

Canal/Binlog 订阅方案

问题描述:业务代码中维护缓存一致性逻辑复杂,容易遗漏,且侵入性强。

解决方案:通过监听 MySQL binlog,异步更新缓存,实现最终一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalCacheSyncService {
private CanalConnector connector;
private JedisPool jedisPool;

public void start() {
connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"",
""
);
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();

while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();

if (batchId != -1 && size > 0) {
processEntries(message.getEntries());
}

connector.ack(batchId);
}
}

private void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
continue;
}

String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.INSERT ||
eventType == CanalEntry.EventType.UPDATE) {
// 处理插入和更新
handleInsertOrUpdate(tableName, rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.DELETE) {
// 处理删除
handleDelete(tableName, rowData.getBeforeColumnsList());
}
}
}
}
}

private void handleInsertOrUpdate(String tableName, List<CanalEntry.Column> columns) {
String cacheKey = buildCacheKey(tableName, columns);
String cacheValue = buildCacheValue(columns);

try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(cacheKey, 3600, cacheValue);
}
}

private void handleDelete(String tableName, List<CanalEntry.Column> columns) {
String cacheKey = buildCacheKey(tableName, columns);

try (Jedis jedis = jedisPool.getResource()) {
jedis.del(cacheKey);
}
}

private String buildCacheKey(String tableName, List<CanalEntry.Column> columns) {
// 根据表名和主键构建缓存key
return tableName + ":" + getColumnValue(columns, "id");
}

private String buildCacheValue(List<CanalEntry.Column> columns) {
StringBuilder sb = new StringBuilder("{");
for (CanalEntry.Column column : columns) {
sb.append("\"").append(column.getName()).append("\":\"")
.append(column.getValue()).append("\",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}

private String getColumnValue(List<CanalEntry.Column> columns, String columnName) {
return columns.stream()
.filter(c -> c.getName().equals(columnName))
.findFirst()
.map(CanalEntry.Column::getValue)
.orElse("");
}
}

架构图

graph LR
    A[业务应用] -->|写操作| B[(MySQL)]
    B -->|Binlog| C[Canal Server]
    C -->|订阅| D[Canal Client]
    D -->|解析| E[缓存更新服务]
    E -->|更新/删除| F[(Redis)]
    A -->|读操作| F

最佳实践

  • 使用消息队列(如 Kafka)缓冲 binlog 事件,提高吞吐量
  • 实现幂等性,避免重复更新
  • 监控 binlog 延迟,及时发现同步问题

分布式事务与缓存

问题描述:在分布式事务场景下,数据库和缓存的操作原子性难以保证。

解决方案:使用 TCC(Try-Confirm-Cancel)模式或 Saga 模式,结合缓存补偿机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

public class DistributedTransactionCacheService {
private JedisPool jedisPool;

/**
* TCC 模式下的缓存操作
*/
public void updateWithTCC(String key, String value) {
// Try 阶段:预留资源
try {
tryPhase(key, value);

// 确认阶段:提交事务
confirmPhase(key, value);

} catch (Exception e) {
// 取消阶段:回滚事务
cancelPhase(key);
throw e;
}
}

private void tryPhase(String key, String value) {
try (Jedis jedis = jedisPool.getResource()) {
// 设置临时标记
jedis.setex(key + ":try", 30, "processing");

// 记录原始值(用于回滚)
String oldValue = jedis.get(key);
if (oldValue != null) {
jedis.setex(key + ":old", 30, oldValue);
}
}
}

private void confirmPhase(String key, String value) {
try (Jedis jedis = jedisPool.getResource()) {
Pipeline pipeline = jedis.pipelined();

// 更新缓存
pipeline.setex(key, 3600, value);

// 清理临时标记
pipeline.del(key + ":try");
pipeline.del(key + ":old");

pipeline.sync();
}
}

private void cancelPhase(String key) {
try (Jedis jedis = jedisPool.getResource()) {
Pipeline pipeline = jedis.pipelined();

// 恢复原始值
String oldValue = jedis.get(key + ":old");
if (oldValue != null) {
pipeline.setex(key, 3600, oldValue);
} else {
pipeline.del(key);
}

// 清理临时标记
pipeline.del(key + ":try");
pipeline.del(key + ":old");

pipeline.sync();
}
}
}

版本号/时间戳方案

问题描述:并发更新时,旧数据可能覆盖新数据,导致缓存脏数据。

解决方案:使用版本号或时间戳实现乐观锁机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class VersionedCacheService {
private JedisPool jedisPool;

/**
* 使用版本号的缓存更新
*/
public boolean updateWithVersion(String key, String value, long expectedVersion) {
try (Jedis jedis = jedisPool.getResource()) {
// 使用 Lua 脚本保证原子性
String luaScript =
"local currentVersion = redis.call('HGET', KEYS[1], 'version') " +
"if currentVersion == false then " +
" return 0 " +
"end " +
"if tonumber(currentVersion) == tonumber(ARGV[1]) then " +
" redis.call('HMSET', KEYS[1], 'value', ARGV[2], 'version', ARGV[3]) " +
" return 1 " +
"else " +
" return 0 " +
"end";

Long result = (Long) jedis.eval(
luaScript,
1,
key + ":versioned",
String.valueOf(expectedVersion),
value,
String.valueOf(expectedVersion + 1)
);

return result == 1;
}
}

/**
* 获取带版本号的数据
*/
public VersionedValue getWithVersion(String key) {
try (Jedis jedis = jedisPool.getResource()) {
String value = jedis.hget(key + ":versioned", "value");
String version = jedis.hget(key + ":versioned", "version");

if (value == null) {
return null;
}

return new VersionedValue(value, Long.parseLong(version));
}
}

public static class VersionedValue {
private String value;
private long version;

public VersionedValue(String value, long version) {
this.value = value;
this.version = version;
}

public String getValue() {
return value;
}

public long getVersion() {
return version;
}
}
}

Facebook Memcache 论文要点

Lease 机制:防止多个客户端同时缓存未命中数据,导致数据库压力。

sequenceDiagram
    participant Client1
    participant Client2
    participant Cache
    participant DB
    
    Client1->>Cache: 1. Get(key) miss
    Cache->>Client1: 2. 返回 lease (1秒有效)
    Client1->>DB: 3. 从DB加载数据
    Client2->>Cache: 4. Get(key) miss
    Cache->>Client2: 5. 返回 lease (等待中)
    Client1->>Cache: 6. Set(key, value) with lease
    Cache->>Client2: 7. 通知数据已更新
    Client2->>Cache: 8. Get(key) hit
    Cache->>Client2: 9. 返回数据

十一、热点探测与热 Key 治理

热点 Key 的定义和危害

定义:单个 key 的访问频率远超平均水平,导致单个缓存节点压力过大。

危害

  • 单节点 CPU/内存资源耗尽
  • 网络带宽瓶颈
  • 影响其他 key 的访问性能
  • 可能导致整个缓存集群不可用

热点探测方案

客户端统计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ClientSideHotKeyDetector {
private ConcurrentHashMap<String, AtomicLong> keyCounter = new ConcurrentHashMap<>();
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private long hotThreshold = 1000; // QPS 阈值

public ClientSideHotKeyDetector() {
// 每秒统计一次
scheduler.scheduleAtFixedRate(this::detectHotKeys, 1, 1, TimeUnit.SECONDS);
}

public void recordAccess(String key) {
keyCounter.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
}

private void detectHotKeys() {
long currentTime = System.currentTimeMillis();
keyCounter.forEach((key, count) -> {
long qps = count.getAndSet(0);
if (qps > hotThreshold) {
System.out.println("检测到热点 Key: " + key + ", QPS: " + qps);
handleHotKey(key);
}
});
}

private void handleHotKey(String key) {
// 通知治理服务
// 可以通过消息队列发送热点事件
}
}
服务端统计(Redis hotkeys)
1
2
3
4
5
6
7
8
9
# Redis 4.0+ 提供 hotkeys 命令
redis-cli --hotkeys

# 输出示例:
# # Hotkeys
#
# 1) "user:profile:1001"
# 2) (integer) 1234567
# 3) (integer) 1234567890.1234567890
京东 HotKey 框架

架构图

graph TB
    A[客户端应用] -->|上报访问| B[Worker节点]
    A -->|查询热点| C[本地缓存]
    B -->|聚合统计| D[Etcd集群]
    D -->|推送热点| B
    B -->|推送热点| A
    D -->|持久化| E[(数据库)]

热 Key 治理方案

本地缓存兜底(L1 Cache)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.concurrent.TimeUnit;

public class TwoLevelCacheService {
private JedisPool redisPool;
private Cache<String, String> localCache;

public TwoLevelCacheService() {
localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.SECONDS)
.build();
}

public String get(String key) {
// L1: 本地缓存
String value = localCache.getIfPresent(key);
if (value != null) {
return value;
}

// L2: Redis 缓存
try (Jedis jedis = redisPool.getResource()) {
value = jedis.get(key);

if (value != null) {
// 写入本地缓存
localCache.put(key, value);
}
}

return value;
}

public void set(String key, String value) {
try (Jedis jedis = redisPool.getResource()) {
jedis.setex(key, 3600, value);
}

// 同时更新本地缓存
localCache.put(key, value);
}
}
Key 分散/打散
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Random;

public class KeyDistributeService {
private JedisPool jedisPool;
private static final int SHARD_COUNT = 10;
private Random random = new Random();

/**
* 写入:分片写入
*/
public void writeDistributed(String baseKey, String value) {
int shardId = random.nextInt(SHARD_COUNT);
String distributedKey = baseKey + ":" + shardId;

try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(distributedKey, 3600, value);
}
}

/**
* 读取:随机读取一个分片
*/
public String readDistributed(String baseKey) {
int shardId = random.nextInt(SHARD_COUNT);
String distributedKey = baseKey + ":" + shardId;

try (Jedis jedis = jedisPool.getResource()) {
return jedis.get(distributedKey);
}
}

/**
* 读取:尝试读取所有分片(最新数据)
*/
public String readAllShards(String baseKey) {
for (int i = 0; i < SHARD_COUNT; i++) {
String distributedKey = baseKey + ":" + i;
try (Jedis jedis = jedisPool.getResource()) {
String value = jedis.get(distributedKey);
if (value != null) {
return value;
}
}
}
return null;
}
}

对比表

方案 优点 缺点 适用场景
本地缓存 响应快,减轻Redis压力 数据不一致,内存占用大 读多写少,一致性要求不高
Key分散 负载均衡,无单点 读取复杂,可能读到旧数据 写多读少,可接受最终一致
读写分离 读写分离,性能好 架构复杂,主从延迟 读多写少,可接受延迟

十二、大 Key 问题与治理

大 Key 的定义

类型 大 Key 定义
String 值大小 > 10KB
Hash 字段数 > 5000
List 元素数 > 5000
Set 元素数 > 5000
Sorted Set 元素数 > 5000

大 Key 的危害

  • 网络带宽:大 Key 传输占用大量带宽,影响其他请求
  • 内存碎片:大 Key 申请大块内存,难以重用,导致碎片
  • 慢查询:大 Key 操作耗时,阻塞其他请求
  • 主从同步延迟:大 Key 同步到从节点慢,导致主从延迟
  • 阻塞风险:DEL、HDEL 等命令可能阻塞 Redis

发现手段

redis-cli --bigkeys
1
2
3
4
5
6
7
8
9
10
11
12
13
14
redis-cli --bigkeys

# 输出示例:
# -------- summary -------
# Sampled 1024 keys in the keyspace!
# Total key length in bytes is 123456 (avg len 120.56)
#
# Biggest string found 'big_key' has 102400 bytes
#
# 1024 strings with 123456 bytes (100.00% of keys, avg size 120.56)
# 0 lists with 0 bytes (00.00% of keys, avg size 0.00)
# 0 sets with 0 bytes (00.00% of keys, avg size 0.00)
# 0 hashs with 0 bytes (00.00% of keys, avg size 0.00)
# 0 zsets with 0 bytes (00.00% of keys, avg size 0.00)
memory usage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class BigKeyAnalyzer {
private JedisPool jedisPool;

public void analyzeKeySize(String key) {
try (Jedis jedis = jedisPool.getResource()) {
String type = jedis.type(key);
long memory = jedis.memoryUsage(key);

System.out.println("Key: " + key);
System.out.println("Type: " + type);
System.out.println("Memory: " + memory + " bytes (" + memory / 1024 + " KB)");

switch (type) {
case "string":
long len = jedis.strlen(key);
System.out.println("Length: " + len);
break;
case "hash":
long hlen = jedis.hlen(key);
System.out.println("Hash fields: " + hlen);
break;
case "list":
long llen = jedis.llen(key);
System.out.println("List elements: " + llen);
break;
case "set":
long scard = jedis.scard(key);
System.out.println("Set members: " + scard);
break;
case "zset":
long zcard = jedis.zcard(key);
System.out.println("Sorted set members: " + zcard);
break;
}
}
}
}

治理方案

拆分大 Key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.HashMap;
import java.util.Map;

public class BigKeySplitService {
private JedisPool jedisPool;
private static final int MAX_HASH_SIZE = 1000;

/**
* 拆分大 Hash
*/
public void splitBigHash(String bigKey) {
try (Jedis jedis = jedisPool.getResource()) {
Map<String, String> bigHash = jedis.hgetAll(bigKey);

if (bigHash.size() <= MAX_HASH_SIZE) {
return; // 不需要拆分
}

// 拆分成多个小 Hash
int shardIndex = 0;
Map<String, String> shard = new HashMap<>();

for (Map.Entry<String, String> entry : bigHash.entrySet()) {
shard.put(entry.getKey(), entry.getValue());

if (shard.size() >= MAX_HASH_SIZE) {
String shardKey = bigKey + ":" + shardIndex;
jedis.hmset(shardKey, shard);
shard.clear();
shardIndex++;
}
}

// 处理剩余数据
if (!shard.isEmpty()) {
String shardKey = bigKey + ":" + shardIndex;
jedis.hmset(shardKey, shard);
}

// 删除原始大 Key
jedis.del(bigKey);
}
}

/**
* 从拆分后的 Hash 中读取
*/
public Map<String, String> getFromSplitHash(String baseKey, String field) {
try (Jedis jedis = jedisPool.getResource()) {
// 计算字段所在的分片
int shardIndex = Math.abs(field.hashCode()) % 10;
String shardKey = baseKey + ":" + shardIndex;

String value = jedis.hget(shardKey, field);

Map<String, String> result = new HashMap<>();
result.put(field, value);
return result;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class AsyncDeleteService {
private JedisPool jedisPool;

/**
* 使用 UNLINK 异步删除大 Key
*/
public void asyncDelete(String key) {
try (Jedis jedis = jedisPool.getResource()) {
// UNLINK 是 DEL 的异步版本,不会阻塞 Redis
jedis.unlink(key);
}
}

/**
* 批量异步删除
*/
public void batchAsyncDelete(String pattern) {
try (Jedis jedis = jedisPool.getResource()) {
// 使用 SCAN 遍历匹配的 key
String cursor = "0";

do {
var scanResult = jedis.scan(cursor);
cursor = scanResult.getCursor();

for (String key : scanResult.getResult()) {
jedis.unlink(key);
}

} while (!cursor.equals("0"));
}
}
}
压缩存储
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Base64;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class CompressedCacheService {
private JedisPool jedisPool;

/**
* 压缩存储
*/
public void setCompressed(String key, String value) {
try {
byte[] compressed = compress(value);
String encoded = Base64.getEncoder().encodeToString(compressed);

try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(key, 3600, encoded);
}
} catch (Exception e) {
throw new RuntimeException("压缩失败", e);
}
}

/**
* 解压缩读取
*/
public String getCompressed(String key) {
try (Jedis jedis = jedisPool.getResource()) {
String encoded = jedis.get(key);
if (encoded == null) {
return null;
}

byte[] compressed = Base64.getDecoder().decode(encoded);
return decompress(compressed);
} catch (Exception e) {
throw new RuntimeException("解压缩失败", e);
}
}

private byte[] compress(String data) throws Exception {
Deflater deflater = new Deflater();
deflater.setInput(data.getBytes());
deflater.finish();

byte[] buffer = new byte[data.length()];
int compressedSize = deflater.deflate(buffer);
deflater.end();

byte[] result = new byte[compressedSize];
System.arraycopy(buffer, 0, result, 0, compressedSize);
return result;
}

private String decompress(byte[] data) throws Exception {
Inflater inflater = new Inflater();
inflater.setInput(data);

byte[] buffer = new byte[data.length * 10];
int resultSize = inflater.inflate(buffer);
inflater.end();

return new String(buffer, 0, resultSize);
}
}

十三、多级缓存架构实战

经典三级架构

graph TB
    A[用户请求] --> B[Nginx Lua]
    B -->|L1 Cache| C{命中?}
    C -->|是| D[返回数据]
    C -->|否| E[本地缓存 Caffeine]
    E -->|L2 Cache| F{命中?}
    F -->|是| D
    F -->|否| G[Redis]
    G -->|L3 Cache| H{命中?}
    H -->|是| D
    H -->|否| I[数据库]
    I -->|回源| G
    G -->|回源| E
    E -->|回源| B

OpenResty + Lua 实现接入层缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
-- ngx_shared_dict 定义在 nginx.conf 中
local cache = ngx.shared.my_cache

local function get_from_cache(key)
local cached_data = cache:get(key)
if cached_data then
return cached_data
end
return nil
end

local function set_to_cache(key, value, ttl)
local success, err = cache:set(key, value, ttl)
if not success then
ngx.log(ngx.ERR, "failed to set cache: ", err)
end
end

local function fetch_from_upstream(key)
local http = require "resty.http"
local httpc = http.new()

local res, err = httpc:request_uri("http://backend/api/data/" .. key, {
method = "GET",
timeout = 5000
})

if not res then
return nil, err
end

return res.body
end

local key = ngx.var.arg_key or "default"
local cached_data = get_from_cache(key)

if cached_data then
ngx.say(cached_data)
ngx.log(ngx.INFO, "cache hit for key: ", key)
else
local data, err = fetch_from_upstream(key)
if data then
set_to_cache(key, data, 300) -- 缓存5分钟
ngx.say(data)
ngx.log(ngx.INFO, "cache miss, fetched from upstream")
else
ngx.status = 500
ngx.say("failed to fetch data")
ngx.log(ngx.ERR, "failed to fetch from upstream: ", err)
end
end

JetCache 框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CacheInvalidate;
import com.alicp.jetcache.anno.CachePut;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.Cached;
import com.alicp.jetcache.anno.CreateCache;
import org.springframework.stereotype.Service;

@Service
public class JetCacheUserService {

// 创建两级缓存
@CreateCache(name = "userCache", expire = 3600, cacheType = CacheType.BOTH, localLimit = 1000)
private Cache<Long, User> userCache;

@Cached(name = "userCache:", key = "#userId", expire = 3600)
public User getUserById(Long userId) {
// 从数据库加载
return userRepository.findById(userId);
}

@CachePut(name = "userCache:", key = "#user.id")
public User updateUser(User user) {
return userRepository.save(user);
}

@CacheInvalidate(name = "userCache:", key = "#userId")
public void deleteUser(Long userId) {
userRepository.deleteById(userId);
}

// 定时刷新热点数据
@Cached(name = "userCache:", key = "#userId", expire = 3600, cacheRefreshInterval = 60)
@CacheRefresh(refreshInterval = 60, stopRefreshAfterLastAccess = 3600)
public User getHotUserById(Long userId) {
return userRepository.findById(userId);
}
}

多级缓存的一致性挑战

广播失效方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.util.List;

public class CacheInvalidationService {
private RedisTemplate<String, String> redisTemplate;
private RedisMessageListenerContainer listenerContainer;

/**
* 发布缓存失效消息
*/
public void publishInvalidation(String cacheKey) {
redisTemplate.convertAndSend("cache:invalidation", cacheKey);
}

/**
* 订阅缓存失效消息
*/
public void subscribeInvalidation(List<String> localCacheKeys) {
MessageListenerAdapter listener = new MessageListenerAdapter(
new CacheInvalidationListener(localCacheKeys)
);

listenerContainer.addMessageListener(listener, new ChannelTopic("cache:invalidation"));
}

public static class CacheInvalidationListener {
private List<String> localCacheKeys;

public CacheInvalidationListener(List<String> localCacheKeys) {
this.localCacheKeys = localCacheKeys;
}

public void handleMessage(String message) {
// 删除本地缓存
localCacheKeys.remove(message);
System.out.println("Local cache invalidated: " + message);
}
}
}

缓存预热策略

策略 优点 缺点 适用场景
全量预热 启动后立即可用 耗时长,占用资源 数据量小,启动时间不敏感
增量预热 快速启动 可能缓存穿透 数据量大,可接受渐进式加载
懒加载 无需预热 首次访问慢 数据量大,访问模式不确定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class CacheWarmupService {
private Cache<String, String> cache;

public CacheWarmupService() {
cache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(1, TimeUnit.HOURS)
.build();
}

/**
* 全量预热
*/
public void fullWarmup() {
List<String> hotKeys = getHotKeysFromDatabase();

hotKeys.parallelStream().forEach(key -> {
String value = loadFromDatabase(key);
cache.put(key, value);
});

System.out.println("Full warmup completed, " + hotKeys.size() + " keys loaded");
}

/**
* 增量预热(定时任务)
*/
@Scheduled(fixedDelay = 300000) // 每5分钟
public void incrementalWarmup() {
List<String> recentlyAccessedKeys = getRecentlyAccessedKeys();

recentlyAccessedKeys.forEach(key -> {
if (cache.getIfPresent(key) == null) {
String value = loadFromDatabase(key);
cache.put(key, value);
}
});
}

private List<String> getHotKeysFromDatabase() {
// 从数据库获取热点 key
return List.of("key1", "key2", "key3");
}

private List<String> getRecentlyAccessedKeys() {
// 获取最近访问的 key
return List.of("key1", "key4", "key5");
}

private String loadFromDatabase(String key) {
// 从数据库加载数据
return "value_for_" + key;
}
}

十四、Redis 集群模式下的缓存设计

Redis Cluster 的数据分片

Redis Cluster 使用 CRC16 算法对 key 进行哈希,计算结果对 16384 取模,确定 key 所在的 slot。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import redis.clients.jedis.ClusterCRC16;

public class RedisClusterSlotCalculator {

/**
* 计算 key 所在的 slot
*/
public static int calculateSlot(String key) {
// 提取 hash tag(如果有)
String hashTag = extractHashTag(key);
String keyToHash = hashTag != null ? hashTag : key;

// 使用 CRC16 计算
int slot = ClusterCRC16.getSlot(keyToHash);
return slot;
}

/**
* 提取 hash tag
* 例如:{user}:profile -> user
*/
private static String extractHashTag(String key) {
int start = key.indexOf('{');
if (start == -1) {
return null;
}

int end = key.indexOf('}', start);
if (end == -1) {
return null;
}

return key.substring(start + 1, end);
}

public static void main(String[] args) {
String key1 = "user:1001:profile";
String key2 = "{user:1001}:profile";
String key3 = "{user:1001}:orders";

System.out.println("Slot for " + key1 + ": " + calculateSlot(key1));
System.out.println("Slot for " + key2 + ": " + calculateSlot(key2));
System.out.println("Slot for " + key3 + ": " + calculateSlot(key3));

// key2 和 key3 使用相同的 hash tag,会在同一个 slot
}
}

跨 slot 操作的限制

在 Redis Cluster 中,不支持跨 slot 的多 key 操作,如 MGETMSETSUNION 等。

解决方案

  1. 使用 HashTag:确保相关 key 在同一个 slot
  2. 客户端分片:在客户端按 slot 分组,批量执行
  3. 使用 Hash 结构:将相关数据存储在一个 Hash 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import redis.clients.jedis.JedisCluster;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RedisClusterMultiKeyService {
private JedisCluster jedisCluster;

/**
* 方案1:使用 HashTag
*/
public void setWithHashTag(String userId, String profile, String orders) {
String profileKey = "{user:" + userId + "}:profile";
String ordersKey = "{user:" + userId + "}:orders";

jedisCluster.set(profileKey, profile);
jedisCluster.set(ordersKey, orders);

// 这两个 key 在同一个 slot,可以使用 MGET
List<String> values = jedisCluster.mget(profileKey, ordersKey);
}

/**
* 方案2:客户端分片执行 MGET
*/
public Map<String, String> smartMget(List<String> keys) {
// 按 slot 分组
Map<Integer, List<String>> slotGroups = keys.stream()
.collect(Collectors.groupingBy(RedisClusterSlotCalculator::calculateSlot));

Map<String, String> result = new HashMap<>();

// 每个 slot 单独执行 MGET
for (Map.Entry<Integer, List<String>> entry : slotGroups.entrySet()) {
List<String> slotKeys = entry.getValue();
List<String> values = jedisCluster.mget(slotKeys.toArray(new String[0]));

for (int i = 0; i < slotKeys.size(); i++) {
result.put(slotKeys.get(i), values.get(i));
}
}

return result;
}

/**
* 方案3:使用 Hash 结构
*/
public void setAsHash(String userId, Map<String, String> fields) {
String hashKey = "user:" + userId;

Map<String, String> hashData = new HashMap<>();
hashData.put("profile", fields.get("profile"));
hashData.put("orders", fields.get("orders"));
hashData.put("preferences", fields.get("preferences"));

jedisCluster.hmset(hashKey, hashData);
}

public Map<String, String> getAsHash(String userId) {
String hashKey = "user:" + userId;
return jedisCluster.hgetAll(hashKey);
}
}

HashTag 的使用

场景:需要原子操作多个相关的 key,如用户资料和订单。

graph LR
    A[用户请求] --> B{HashTag?}
    B -->|是| C[计算 hash tag 值]
    B -->|否| D[计算 key 值]
    C --> E[CRC16 哈希]
    D --> E
    E --> F[取模 16384]
    F --> G[确定 slot]
    G --> H[路由到对应节点]

最佳实践

  • 只在需要多 key 原子操作时使用 HashTag
  • HashTag 会破坏数据分布均匀性,不要滥用
  • HashTag 应该是业务相关的标识,如用户 ID、订单 ID

集群扩缩容对缓存的影响

Slot 迁移过程

sequenceDiagram
    participant Client
    participant SourceNode
    participant TargetNode
    participant MigrateTool
    
    Client->>SourceNode: 读写 key(slot 1000)
    MigrateTool->>SourceNode: 开始迁移 slot 1000
    SourceNode->>TargetNode: 迁移数据
    SourceNode->>Client: 返回 ASK 重定向
    Client->>TargetNode: 发送 ASKING 命令
    Client->>TargetNode: 读写 key
    MigrateTool->>SourceNode: 迁移完成
    MigrateTool->>TargetNode: slot 1000 归属更新
    Client->>TargetNode: 正常读写

应对策略

  1. 迁移期间:客户端支持 ASK 重定向
  2. 缓存预热:迁移完成后预热新节点
  3. 监控:监控迁移进度和性能指标
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.JedisCluster;

public class RedisClusterMigrationAwareClient {
private JedisCluster jedisCluster;

public String getWithRetry(String key) {
int maxRetries = 3;

for (int i = 0; i < maxRetries; i++) {
try {
return jedisCluster.get(key);
} catch (JedisMovedDataException e) {
// 槽已永久迁移,更新集群拓扑
System.out.println("Slot moved, retrying...");
jedisCluster.renewSlotCache();
} catch (JedisAskDataException e) {
// 槽正在迁移,尝试从目标节点获取
System.out.println("Slot migrating, asking target node...");
// JedisCluster 会自动处理 ASK 重定向
}
}

throw new RuntimeException("Failed to get key after " + maxRetries + " retries");
}
}

Redis Sentinel vs Cluster 的选型

特性 Redis Sentinel Redis Cluster
数据分片 是(16384 slots)
水平扩展 需要客户端分片 原生支持
最大内存 单节点限制 集群总内存
故障转移 自动 自动
复杂度 较低 较高
多 key 操作 支持 受限(需 HashTag)
适用场景 中小规模,简单架构 大规模,高性能需求

十五、缓存的安全与运维

缓存数据安全

敏感数据加密存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;

public class SecureCacheService {
private JedisPool jedisPool;
private static final String SECRET_KEY = "my-secret-key-123"; // 16/24/32 bytes
private static final String ALGORITHM = "AES";

/**
* 加密存储
*/
public void setSecure(String key, String sensitiveValue) {
try {
String encrypted = encrypt(sensitiveValue);

try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(key, 3600, encrypted);
}
} catch (Exception e) {
throw new RuntimeException("加密失败", e);
}
}

/**
* 解密读取
*/
public String getSecure(String key) {
try (Jedis jedis = jedisPool.getResource()) {
String encrypted = jedis.get(key);
if (encrypted == null) {
return null;
}

return decrypt(encrypted);
} catch (Exception e) {
throw new RuntimeException("解密失败", e);
}
}

private String encrypt(String data) throws Exception {
SecretKeySpec keySpec = new SecretKeySpec(SECRET_KEY.getBytes(), ALGORITHM);
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.ENCRYPT_MODE, keySpec);

byte[] encrypted = cipher.doFinal(data.getBytes());
return Base64.getEncoder().encodeToString(encrypted);
}

private String decrypt(String encrypted) throws Exception {
SecretKeySpec keySpec = new SecretKeySpec(SECRET_KEY.getBytes(), ALGORITHM);
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.DECRYPT_MODE, keySpec);

byte[] decoded = Base64.getDecoder().decode(encrypted);
byte[] decrypted = cipher.doFinal(decoded);
return new String(decrypted);
}
}

TTL 控制最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class TTLManagementService {
private JedisPool jedisPool;

/**
* 设置不同的 TTL 策略
*/
public void setWithTTLStrategy(String key, String value, TTLStrategy strategy) {
try (Jedis jedis = jedisPool.getResource()) {
int ttl = strategy.getTTL();
jedis.setex(key, ttl, value);

// 记录 TTL 设置日志
System.out.println("Set key " + key + " with TTL: " + ttl + "s, strategy: " + strategy);
}
}

public enum TTLStrategy {
SHORT(60), // 短期缓存:1分钟
MEDIUM(300), // 中期缓存:5分钟
LONG(3600), // 长期缓存:1小时
VERY_LONG(86400); // 超长期缓存:1天

private final int seconds;

TTLStrategy(int seconds) {
this.seconds = seconds;
}

public int getTTL() {
return seconds;
}
}

/**
* 监控即将过期的 key
*/
public void monitorExpiringKeys() {
try (Jedis jedis = jedisPool.getResource()) {
// 查找即将在 10 分钟内过期的 key
// 注意:Redis 没有直接按 TTL 查找的命令,需要结合业务设计

// 方案1:使用有序集合记录过期时间
// 方案2:定期扫描并检查 TTL
}
}
}

缓存容量告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.info.Info;
import redis.clients.jedis.info.ServerInfo;

import java.util.Map;

public class CacheMonitoringService {
private JedisPool jedisPool;

/**
* 监控缓存容量
*/
public void monitorCapacity() {
try (Jedis jedis = jedisPool.getResource()) {
// 获取内存使用信息
String info = jedis.info("memory");
Map<String, String> memoryInfo = parseInfo(info);

long usedMemory = Long.parseLong(memoryInfo.get("used_memory"));
long maxMemory = Long.parseLong(memoryInfo.get("maxmemory"));
double usagePercent = (double) usedMemory / maxMemory * 100;

System.out.println("Memory Usage: " + String.format("%.2f", usagePercent) + "%");

// 获取驱逐统计
String stats = jedis.info("stats");
Map<String, String> statsInfo = parseInfo(stats);

long evictedKeys = Long.parseLong(statsInfo.get("evicted_keys"));
System.out.println("Evicted Keys: " + evictedKeys);

// 告警判断
if (usagePercent > 80) {
sendAlert("内存使用率过高: " + String.format("%.2f", usagePercent) + "%");
}

if (evictedKeys > 1000) {
sendAlert("驱逐次数过多: " + evictedKeys);
}
}
}

private Map<String, String> parseInfo(String info) {
Map<String, String> result = new HashMap<>();
String[] lines = info.split("\r?\n");

for (String line : lines) {
if (line.contains(":") && !line.startsWith("#")) {
String[] parts = line.split(":");
result.put(parts[0], parts[1]);
}
}

return result;
}

private void sendAlert(String message) {
// 发送告警通知(邮件、钉钉、短信等)
System.out.println("ALERT: " + message);
}
}

慢查询排查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SlowQueryAnalyzer {
private JedisPool jedisPool;

/**
* 获取慢查询日志
*/
public void analyzeSlowQueries() {
try (Jedis jedis = jedisPool.getResource()) {
// 获取慢查询列表(最多 10 条)
var slowLog = jedis.slowlogGet(10);

for (var entry : slowLog) {
long id = entry.getId();
long timestamp = entry.getTimeStamp();
long duration = entry.getDuration(); // 微秒
String command = entry.getArguments().toString();

System.out.println("Slow Query ID: " + id);
System.out.println("Time: " + timestamp);
System.out.println("Duration: " + duration + " μs");
System.out.println("Command: " + command);
System.out.println("---");
}
}
}

/**
* 配置慢查询阈值
*/
public void configureSlowLog(long thresholdMicros) {
try (Jedis jedis = jedisPool.getResource()) {
// 设置慢查询阈值(单位:微秒)
jedis.configSet("slowlog-log-slower-than", String.valueOf(thresholdMicros));

// 设置慢查询日志最大长度
jedis.configSet("slowlog-max-len", "128");
}
}
}

缓存灾备

主从切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class CacheFailoverService {
private JedisPool masterPool;
private JedisPool slavePool;
private volatile JedisPool currentPool;

public CacheFailoverService(JedisPool masterPool, JedisPool slavePool) {
this.masterPool = masterPool;
this.slavePool = slavePool;
this.currentPool = masterPool;
}

public String getWithFailover(String key) {
int maxRetries = 3;

for (int i = 0; i < maxRetries; i++) {
try {
return getFromCurrentPool(key);
} catch (JedisConnectionException e) {
System.out.println("Connection failed, switching pool...");
switchPool();
} catch (Exception e) {
System.out.println("Error getting key: " + e.getMessage());
if (i == maxRetries - 1) {
throw e;
}
}
}

return null;
}

private String getFromCurrentPool(String key) {
try (Jedis jedis = currentPool.getResource()) {
return jedis.get(key);
}
}

private synchronized void switchPool() {
if (currentPool == masterPool) {
currentPool = slavePool;
System.out.println("Switched to slave pool");
} else {
currentPool = masterPool;
System.out.println("Switched to master pool");
}
}
}

跨机房容灾架构

graph TB
    A[用户请求] --> B[负载均衡]
    B --> C[机房A - Redis Master]
    B --> D[机房B - Redis Slave]
    C -->|同步| D
    C --> E[机房A - 应用]
    D --> F[机房B - 应用]
    C -->|异步复制| G[机房C - 冷备]
    D -->|异步复制| G

冷备恢复方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;

public class CacheBackupRestoreService {
private JedisPool jedisPool;

/**
* 创建 RDB 快照备份
*/
public void createSnapshot(String backupPath) {
try (Jedis jedis = jedisPool.getResource()) {
// 触发 BGSAVE
String result = jedis.bgsave();
System.out.println("BGSAVE command sent: " + result);

// 等待备份完成
waitForBackupComplete(jedis);

// 复制 RDB 文件到备份目录
File rdbFile = new File("/var/lib/redis/dump.rdb");
File backupFile = new File(backupPath + "/dump_" + System.currentTimeMillis() + ".rdb");

copyFile(rdbFile, backupFile);
System.out.println("Backup created: " + backupFile.getAbsolutePath());
}
}

private void waitForBackupComplete(Jedis jedis) {
while (true) {
String info = jedis.info("persistence");
if (info.contains("rdb_bgsave_in_progress:0")) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

private void copyFile(File source, File dest) throws Exception {
try (FileInputStream fis = new FileInputStream(source);
FileOutputStream fos = new FileOutputStream(dest)) {
byte[] buffer = new byte[1024];
int length;
while ((length = fis.read(buffer)) > 0) {
fos.write(buffer, 0, length);
}
}
}

/**
* 从备份恢复
*/
public void restoreFromBackup(String backupPath) {
// 注意:恢复需要停机,在生产环境谨慎操作
System.out.println("Restore operation requires Redis restart");
System.out.println("Steps:");
System.out.println("1. Stop Redis server");
System.out.println("2. Replace dump.rdb with backup file");
System.out.println("3. Start Redis server");
}
}

参考文献:

  1. 《Guava Cache》
  2. 美团技术团队的《缓存那些事》
  3. 例子很多:《caffeine vs ehcache》