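The Caching Playbook
This article explores general-purpose patterns for cache design. It covers the whole picture, from choosing a cache and picking an update strategy to protecting against failures.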
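Caching architecture (mind map)
  When to use
    Read-heavy, write-light
    Concentrated hot spots
    Eventual consistency acceptable
  Cache tiers
    Near cache
      Guava
      Caffeine
      EhCache
    Remote cache
      Redis
      Memcached
  Core challenges
    Update strategies
      Cache Aside
      Read Through
      Write Through
      Write Behind
    Consistency guarantees
    Failure protection
      Breakdown protection
      Avalanche protection
      Penetration protection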
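Pattern Overview

| # | Pattern | Mnemonic | Applies to |
|---|---|---|---|
| 1 | Layered fallback | Local as the backstop, remote for scale | Multi-level cache architecture |
| 2 | Lazy fill | Load on trigger, grow on demand | Cold start and warm-up |
| 3 | Side-path sync | Update the DB, then delete the cache; keep reads and writes from racing | Cache Aside |
| 4 | Null-value defense | Cache empties to block penetration; a short TTL caps the risk | Cache penetration protection |
| 5 | Randomized spreading | Spread expirations, refresh gradually | Cache avalanche protection |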
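1. The Nature of Caching and When to Use It
1.1 What Is a Cache
Caching trades space for time. It inserts a fast storage layer between an expensive data source (a database, an external service, and so on) and its consumers. This reduces how often the expensive computation or lookup has to be repeated.
1.2 When to Use a Cache
A scenario is a good fit for caching when all of the following conditions hold:
Read-heavy, write-light: reads significantly outnumber writes (typically more than 10:1).
Concentrated hot spots: a small fraction of the data receives most of the traffic (a Pareto distribution).
Eventual consistency is acceptable: the business logic can tolerate a brief window of inconsistency.
Expensive computation: the original data source is slow or resource-hungry.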
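1.3 When Not to Use a Cache

| Scenario | Reason |
|---|---|
| Extremely strict real-time requirements (e.g. trade matching) | Cannot tolerate any stale data, even for milliseconds |
| Write-heavy, read-light | Keeping the cache in sync costs more than the cache saves |
| Data changes extremely fast (e.g. stock quotes) | The hit rate approaches zero |
| Huge data volume with no hot spots | The cache cannot meaningfully shrink the working set |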
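2. Cache Tiers and Selection
2.1 Near Cache vs Remote Cache
Caches fall into two categories by where they are deployed:

| Dimension | Near cache (in-memory) | Remote cache |
|---|---|---|
| Typical implementations | Guava, Caffeine, EhCache | Redis, Memcached |
| Access latency | Sub-microsecond to microseconds (no network hop) | About 0.1 to a few ms (dominated by the network round trip) |
| Capacity | Bounded by a single node's memory | Scales horizontally |
| Data sharing | Private to the process | Shared across processes |
| Operational complexity | Low (embedded) | High (separate cluster) |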
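2.2 On Multi-Level Caching
Evaluate a multi-level cache (local + remote) carefully before adopting it:
Viable setup
L1: a local cache (Caffeine) holds the very hottest data.
L2: a remote cache (Redis) holds hot data.
Differentiated TTLs: the L1 TTL is much shorter than the L2 TTL (L1 TTL << L2 TTL).
Risks
Consistency: every update must invalidate all levels.
Complexity multiplies: each level needs its own capacity planning, monitoring, and failure playbook.
Recommendations
Most applications can start with a single remote cache.
Consider multi-level caching only when QPS exceeds 100,000 and a p99 latency under 1 ms is a hard SLA.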
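3. Cache Update Strategies
Keeping the cache consistent with the data source is the core challenge. The industry has settled on four classic patterns:
3.1 Cache Aside
This is the most common pattern. The application explicitly coordinates the cache and the database.
```
Read:  Client -> Cache miss? -> Load from DB -> Populate cache -> Return data
Write: Client -> Update DB -> Delete cache (rather than updating it)
```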
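Why delete on write instead of update?
Updating the cache is subject to a race condition under concurrency:
```
T1: update DB,    A = 1
T2: update DB,    A = 2; update cache, A = 2
T1: update cache, A = 1   (overwrites T2's correct value; cache and DB now disagree)
```
With delete, the next read loads the latest value from the database, so the cache converges (eventual consistency). A much narrower race still remains. A read that loaded the old value before the write can repopulate the cache after the delete. Delayed double delete in section 10 targets this case.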
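The following minimal single-process sketch makes the read and write paths concrete. The class name `CacheAsideStore` is hypothetical, and so are the two `ConcurrentHashMap` stand-ins for the database and the cache. A real system would talk to MySQL and Redis, and the two steps of each path would not be atomic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal Cache Aside sketch. Both maps are hypothetical stand-ins:
// "db" plays the database and "cache" plays a remote cache such as Redis.
class CacheAsideStore {
    private final Map<String, String> db = new ConcurrentHashMap<>();
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger dbReads = new AtomicInteger(); // how often reads fall through to the DB

    // Read path: cache first; on a miss, load from the DB and populate the cache.
    String read(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        dbReads.incrementAndGet();
        String fromDb = db.get(key);
        if (fromDb != null) {
            cache.put(key, fromDb);
        }
        return fromDb;
    }

    // Write path: update the DB first, then delete (not update) the cached copy,
    // so the next read reloads the committed value.
    void write(String key, String value) {
        db.put(key, value);
        cache.remove(key);
    }

    int dbReads() {
        return dbReads.get();
    }
}
```

After each `write`, exactly one subsequent `read` goes to the "database" and refills the cache. Every later read is a cache hit until the next write.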
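3.2 Read Through / Write Through
The cache component takes over data access, so the application does not deal with the database directly.
```
Read Through:  Client -> Cache -> (on a miss, the cache loads from the DB itself)
Write Through: Client -> Cache -> cache writes to the DB synchronously -> returns success
```
Characteristics
Simpler application logic, with no need to handle cache details.
Tight coupling to a particular cache framework.
Write Through has higher write latency, because it waits for both writes to complete.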
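Here is a sketch of a cache component that owns both the loader and the writer, so callers only ever talk to the cache. `ThroughCache` and its constructor arguments are hypothetical illustrations, not a framework API. Real cache libraries offer their own loader/writer hooks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Read Through + Write Through sketch: the cache, not the caller, talks to the backing store.
class ThroughCache<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();
    private final Function<K, V> loader;    // read-through: invoked on a miss
    private final BiConsumer<K, V> writer;  // write-through: invoked synchronously on put

    ThroughCache(Function<K, V> loader, BiConsumer<K, V> writer) {
        this.loader = loader;
        this.writer = writer;
    }

    V get(K key) {
        // computeIfAbsent runs the loader only on a miss; a null result is not cached
        return store.computeIfAbsent(key, loader);
    }

    void put(K key, V value) {
        writer.accept(key, value); // write the backing store first; if it throws, the cache is untouched
        store.put(key, value);     // then the cache; put() returns only after both writes
    }
}
```

The extra write latency mentioned above is visible in `put`: it cannot return until `writer.accept` has finished.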
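3.3 Write Behind (Asynchronous Write-Back)
A write only updates the cache. A background thread later flushes the changes to the database asynchronously, in batches.
```
Client -> Update cache -> return success immediately
               └-> background thread flushes to the DB in batches
```
Advantages
Best write performance (pure in-memory operation).
Multiple updates can be merged, which lowers DB load.
Risks
Data-loss window: data not yet flushed is lost if the process crashes.
Complex to implement: transaction boundaries, failure retries, and idempotency all need handling.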
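The sketch below shows both the merging advantage and the data-loss window. `WriteBehindCache` and its `flush()` method are hypothetical names. A real implementation would call `flush()` from a scheduled thread and would add retries and idempotent writes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Write Behind sketch: writes land in the cache and a "dirty" buffer; flush() sends
// the merged updates to the database as one batch.
class WriteBehindCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Map<String, String> dirty = new LinkedHashMap<>(); // latest value per key, guarded by itself
    private final Consumer<Map<String, String>> batchWriter;         // e.g. one multi-row DB statement

    WriteBehindCache(Consumer<Map<String, String>> batchWriter) {
        this.batchWriter = batchWriter;
    }

    // Returns as soon as memory is updated; the DB write happens later.
    void put(String key, String value) {
        cache.put(key, value);
        synchronized (dirty) {
            dirty.put(key, value); // repeated writes to one key collapse into a single DB write
        }
    }

    String get(String key) {
        return cache.get(key);
    }

    // Normally called by a scheduled background thread. Anything still in "dirty"
    // is lost if the process dies first, and a failed batch here is dropped
    // (a real implementation needs retries and idempotent writes).
    void flush() {
        Map<String, String> batch;
        synchronized (dirty) {
            if (dirty.isEmpty()) {
                return;
            }
            batch = new LinkedHashMap<>(dirty);
            dirty.clear();
        }
        batchWriter.accept(batch);
    }
}
```

Two writes to key `a` followed by one write to `b` produce a single batch with two rows. This is the "merge multiple updates" advantage listed above.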
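3.4 Strategy Comparison

| Strategy | Consistency | Read performance | Write performance | Implementation complexity |
|---|---|---|---|---|
| Cache Aside | Eventual | High | High | Low |
| Read Through | Eventual | High | - | Medium |
| Write Through | Strong | - | Low | Medium |
| Write Behind | Weak | - | Highest | High |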
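4. Java In-Process Caches in Detail
4.1 Guava Cache
Guava Cache is Google's lightweight caching library. By design, it is an enhanced ConcurrentHashMap.
Core features

| Feature | Description |
|---|---|
| Maximum size | By entry count or by weight |
| Expiration | expireAfterWrite / expireAfterAccess |
| Refresh | refreshAfterWrite (reloads on the first access after the interval; other readers keep getting the old value in the meantime) |
| Reference types | weakKeys / weakValues / softValues |
| Statistics | hit/miss/eviction counts |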
Basic usage
```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;

LoadingCache<String, User> userCache = CacheBuilder.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .refreshAfterWrite(1, TimeUnit.MINUTES)   // reload on access once an entry is 1 minute old
        .recordStats()                            // enables userCache.stats()
        .build(new CacheLoader<String, User>() {
            @Override
            public User load(String key) throws Exception {
                return loadUserFromDatabase(key);  // must not return null (see below)
            }
        });

// get() throws the checked ExecutionException; getUnchecked() wraps it instead
User user = userCache.getUnchecked(userId);
```
⚠️ Important limitation: Guava Cache does not support null values
Guava Cache treats null as "this key has no data" and forbids storing it. A CacheLoader that returns null makes get() throw CacheLoader.InvalidCacheLoadException. This raises a practical question: how do you handle cache penetration?
The solution is a null object. java.util.Optional serves well as one:
```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Optional;

// Optional.empty() is a non-null value, so "user not found" can be cached safely
LoadingCache<String, Optional<User>> userCache = CacheBuilder.newBuilder()
        .build(new CacheLoader<String, Optional<User>>() {
            @Override
            public Optional<User> load(String key) {
                return Optional.ofNullable(loadUserFromDatabase(key));
            }
        });

Optional<User> result = userCache.getUnchecked(userId);
return result.orElse(null);   // null here means "known not to exist", served from the cache
```
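4.2 Caffeine
Caffeine is a high-performance replacement for Guava Cache. It uses the Window TinyLFU (W-TinyLFU) eviction policy and performs significantly better than Guava's LRU-based cache.
Lineage
```
ConcurrentLinkedHashMap (Ben Manes)
        ↓
Guava Cache (Google)
        ↓
Caffeine (Ben Manes, Java 8)
```
Key improvements

| Dimension | Guava | Caffeine |
|---|---|---|
| Eviction policy | Segmented LRU | W-TinyLFU |
| Concurrency model | Lock striping (segments) | Lock-free reads + ring buffers that record accesses |
| Refresh | Synchronous by default (the triggering thread performs the reload) | Fully asynchronous |
| Access-frequency tracking | Simple counts only | Frequency sketch |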
Asynchronous loading example
```java
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

AsyncLoadingCache<String, User> asyncCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .executor(Executors.newFixedThreadPool(10))   // pool used for async loads and maintenance
        .buildAsync((key, executor) ->
                // AsyncCacheLoader: return a future that completes with the loaded value
                CompletableFuture.supplyAsync(() -> loadUser(key), executor));

CompletableFuture<User> future = asyncCache.get(userId);
future.thenAccept(user -> System.out.println(user.getName()));
```
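4.3 EhCache 3
Of the libraries covered here, EhCache is the one that supports persistence (a disk tier). This makes it suitable for large data sets that can tolerate disk access latency.
Heap / off-heap / disk tier configuration
```xml
<config xmlns="http://www.ehcache.org/v3">

    <!-- A disk tier needs a persistence directory at the CacheManager level (path is an example) -->
    <persistence directory="/tmp/ehcache-data"/>

    <cache alias="largeDataCache">
        <key-type>java.lang.Long</key-type>
        <value-type>java.io.Serializable</value-type>

        <expiry>
            <ttl unit="minutes">60</ttl>
        </expiry>

        <!-- Tiers must grow in size: heap < offheap < disk -->
        <resources>
            <heap unit="entries">1000</heap>
            <offheap unit="MB">128</offheap>
            <disk unit="GB">10</disk>
        </resources>
    </cache>

</config>
```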
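Spring Boot integration
```java
import java.net.URISyntaxException;
import javax.cache.Caching;
import javax.cache.spi.CachingProvider;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.jcache.JCacheCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
public class CacheConfig {

    @Bean
    public CacheManager cacheManager() throws URISyntaxException {
        // JSR-107 provider; with only EhCache 3 on the classpath this resolves to EhCache
        CachingProvider provider = Caching.getCachingProvider();
        // Pass the XML location explicitly: the no-arg getCacheManager() uses the
        // provider's default configuration and would not pick up ehcache.xml.
        // (Alternatively, Spring Boot can do this via spring.cache.jcache.config.)
        javax.cache.CacheManager jcacheManager = provider.getCacheManager(
                getClass().getResource("/ehcache.xml").toURI(),
                getClass().getClassLoader());
        return new JCacheCacheManager(jcacheManager);
    }
}
```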
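5. Distributed Cache: Redis
Redis is one of the most popular distributed caches. This section focuses on best practices for Java clients.
5.1 Choosing a Client

| Client | Connection model | Features | Recommended for |
|---|---|---|---|
| Jedis | Direct (blocking) connections | Mature and stable, rich API | Standalone or simple sharding |
| Lettuce | Netty-based, asynchronous | Reactive programming, cluster-friendly | High concurrency, Reactive |
| Redisson | High-level abstractions | Distributed locks, object mapping | Distributed coordination |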
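5.2 Spring Data Redis Configuration
```yaml
spring:
  redis:                 # Spring Boot 2.x prefix; Spring Boot 3.x uses spring.data.redis
    host: localhost
    port: 6379
    timeout: 2000ms
    lettuce:
      pool:              # connection pooling requires commons-pool2 on the classpath
        max-active: 50
        max-idle: 20
        min-idle: 5
        max-wait: 3000ms
```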
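5.3 Cache Serialization
The default JDK serialization is slow and produces bulky values. A JSON-based configuration is recommended:
```java
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);

    // Plain-string keys stay readable in redis-cli
    template.setKeySerializer(new StringRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());

    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new JavaTimeModule());
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    // With Object as the target type, Jackson needs type info in the JSON; without it,
    // reads come back as LinkedHashMap. LaissezFaire is permissive: restrict it in production.
    mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
            ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);

    Jackson2JsonRedisSerializer<Object> jsonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    jsonSerializer.setObjectMapper(mapper);  // Spring Data Redis 2.x; 3.x prefers the (ObjectMapper, Class) constructor

    template.setValueSerializer(jsonSerializer);
    template.setHashValueSerializer(jsonSerializer);
    template.afterPropertiesSet();
    return template;
}
```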
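6. Cache Failure Protection
A cache faces three typical failure scenarios, and each has a corresponding protection pattern.
6.1 Cache Breakdown (Hot Key Expiration)
Symptom: the moment a hot key expires, a burst of requests goes straight through to the database.
Protection patterns

| Approach | Principle | Implementation |
|---|---|---|
| Mutex rebuild | One thread reloads from the source; the others wait | Distributed lock, or local synchronized |
| Logical expiration | No physical TTL; an expiry field stored with the value decides staleness | Long (or no) physical TTL plus asynchronous background refresh |
| Hot-key detection | Preload hot keys that are about to expire | Access statistics plus proactive refresh |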
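The "mutex rebuild" row can be sketched in plain Java as a per-key single flight. Concurrent misses on the same key share one future, so only one thread reaches the origin. `SingleFlightLoader` is a hypothetical name. This is an in-process mutex only; across several JVMs, the distributed-lock variant from the table would be needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Per-key "single flight": concurrent misses on the same key share one origin load.
// This is an in-process mutex only; across JVMs a distributed lock would be needed.
class SingleFlightLoader<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final Function<K, V> origin;

    SingleFlightLoader(Function<K, V> origin) {
        this.origin = origin;
    }

    V load(K key) {
        CompletableFuture<V> mine = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, mine);
        if (existing != null) {
            return existing.join();          // someone else is loading: wait for their result
        }
        try {
            V value = origin.apply(key);     // only the winning thread hits the origin
            mine.complete(value);
            return value;
        } catch (RuntimeException e) {
            mine.completeExceptionally(e);   // waiters see the failure too
            throw e;
        } finally {
            inFlight.remove(key, mine);      // the next miss triggers a fresh load
        }
    }
}
```

In real use, `load` would be called only after a cache miss, and the caller would write the result back to the cache.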
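Guava mutex rebuild example
```java
// Within one JVM, LoadingCache already loads each key at most once at a time:
// concurrent get() calls for the same missing key wait for the single in-flight load.
// refreshAfterWrite additionally keeps serving the old value while one thread reloads.
LoadingCache<String, User> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .refreshAfterWrite(1, TimeUnit.MINUTES)
        .build(CacheLoader.from(this::loadUser));
```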
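The "logical expiration" row from the table can be sketched as follows. Each entry carries its own expiry timestamp and is never physically removed. A stale hit returns the old value immediately and starts at most one background refresh per key, guarded by a try-lock. `LogicalExpiryCache` and its injectable clock and executor are hypothetical illustrations, not a library API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Logical expiration: each entry stores its own expireAt and is never physically removed.
// A stale hit returns the old value at once and starts at most one background refresh per key.
class LogicalExpiryCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long expireAtMillis;

        Entry(T value, long expireAtMillis) {
            this.value = value;
            this.expireAtMillis = expireAtMillis;
        }
    }

    private final Map<K, Entry<V>> store = new ConcurrentHashMap<>();
    private final Map<K, AtomicBoolean> refreshing = new ConcurrentHashMap<>();
    private final Function<K, V> origin;
    private final long logicalTtlMillis;
    private final LongSupplier clock;   // e.g. System::currentTimeMillis
    private final Executor refresher;   // a thread pool in production

    LogicalExpiryCache(Function<K, V> origin, long logicalTtlMillis,
                       LongSupplier clock, Executor refresher) {
        this.origin = origin;
        this.logicalTtlMillis = logicalTtlMillis;
        this.clock = clock;
        this.refresher = refresher;
    }

    // Used for warm-up, so hot keys never go through the synchronous miss path below.
    void put(K key, V value) {
        store.put(key, new Entry<>(value, clock.getAsLong() + logicalTtlMillis));
    }

    V get(K key) {
        Entry<V> e = store.get(key);
        if (e == null) {                          // never loaded: plain synchronous load
            V v = origin.apply(key);
            put(key, v);
            return v;
        }
        if (clock.getAsLong() >= e.expireAtMillis) {
            AtomicBoolean busy = refreshing.computeIfAbsent(key, k -> new AtomicBoolean());
            if (busy.compareAndSet(false, true)) { // try-lock: losers do not wait
                refresher.execute(() -> {
                    try {
                        put(key, origin.apply(key));
                    } finally {
                        busy.set(false);
                    }
                });
            }
        }
        return e.value;                           // fresh or stale, never blocks on the origin
    }
}
```

Passing `Runnable::run` as the executor makes the refresh run inline, which is handy in tests. In production, a thread pool keeps the refresh off the request path.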
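6.2 Cache Avalanche (Mass Expiration)
Symptom: a large number of keys expire at the same time, and the database receives a traffic surge.
Protection pattern: randomized spreading
Add a random offset to the base TTL:
```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Base TTL plus up to 10 minutes of random jitter, so keys written together expire at different times
int baseTtlSeconds = 3600;
int randomOffset = ThreadLocalRandom.current().nextInt(600);
redisTemplate.opsForValue().set(key, value, baseTtlSeconds + randomOffset, TimeUnit.SECONDS);
```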
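For data refreshed by scheduled jobs, use staggered (tiered) expiration: split the data into batches and refresh each batch at a different time.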
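The article does not specify how the batches are formed. The sketch below assumes each key is hashed into one of N buckets, and each bucket gets its own refresh offset. `StaggeredSchedule.plan` is a hypothetical helper that a scheduled job could consult.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Staggered expiry: split keys into N batches and give each batch its own refresh offset,
// so a scheduled job refreshes one slice of keys at a time instead of everything at once.
final class StaggeredSchedule {
    private StaggeredSchedule() {}

    // Returns: offset in seconds after the job starts -> keys to refresh at that offset.
    static Map<Long, List<String>> plan(List<String> keys, int batches, long stepSeconds) {
        Map<Long, List<String>> schedule = new TreeMap<>();
        for (String key : keys) {
            // floorMod keeps the batch index non-negative even for negative hash codes
            int batch = Math.floorMod(key.hashCode(), batches);
            schedule.computeIfAbsent(batch * stepSeconds, k -> new ArrayList<>()).add(key);
        }
        return schedule;
    }
}
```

Hashing keeps each key in the same batch across runs, so its refresh time stays stable from one cycle to the next.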
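6.3 Cache Penetration (Phantom Key)
Symptom: queries for data that does not exist go through to the database every time.
Protection pattern: null-value defense
```java
// A sentinel stored in Redis for "this user does not exist". It must survive
// (de)serialization, so compare it by value with equals(), never with ==.
private static final String NULL_MARKER = "__NULL__";

public User getUser(String userId) {
    String cacheKey = "user:" + userId;
    Object cached = redisTemplate.opsForValue().get(cacheKey);   // RedisTemplate<String, Object>
    if (cached != null) {
        if (NULL_MARKER.equals(cached)) {
            return null;            // known-missing: do not hit the database
        }
        return (User) cached;       // relies on the type-aware JSON serializer from 5.3
    }

    User user = userRepository.findById(userId);   // assumed to return null when not found
    if (user != null) {
        redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
    } else {
        // Short TTL so the marker disappears soon if the user is created later
        redisTemplate.opsForValue().set(cacheKey, NULL_MARKER, 5, TimeUnit.MINUTES);
    }
    return user;
}
```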
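Advanced: intercept with a Bloom filter
When the set of valid keys is known in advance, a Bloom filter can reject lookups up front:
```
Client -> Bloom filter check
    ├─ might exist       -> query cache -> (miss) -> query DB
    └─ definitely absent -> return null immediately
```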
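To show why a Bloom filter can only say "definitely absent" or "possibly present", here is a minimal BitSet-based sketch using double hashing. `SimpleBloomFilter` and the hash scheme (String.hashCode plus CRC32) are assumptions for illustration. In practice, a library implementation such as Guava's `BloomFilter` is the usual choice.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.zip.CRC32;

// Minimal Bloom filter: k bit positions per key via double hashing (h1 + i*h2).
// "false" from mightContain() is definite; "true" may be a false positive.
class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    SimpleBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    private int[] positions(String key) {
        int h1 = key.hashCode();
        CRC32 crc = new CRC32();
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        crc.update(bytes, 0, bytes.length);
        int h2 = (int) crc.getValue();
        int[] result = new int[hashCount];
        for (int i = 0; i < hashCount; i++) {
            result[i] = Math.floorMod(h1 + i * h2, size);
        }
        return result;
    }

    // Called for every key that really exists (e.g. all user IDs at startup).
    void add(String key) {
        for (int p : positions(key)) {
            bits.set(p);
        }
    }

    boolean mightContain(String key) {
        for (int p : positions(key)) {
            if (!bits.get(p)) {
                return false;  // definitely absent: skip both cache and DB
            }
        }
        return true;           // possibly present: continue to cache, then DB
    }
}
```

The filter must be updated whenever new valid keys are created. Otherwise it wrongly rejects them, which is why the text limits this to fixed query patterns.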
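7. 🔑 Distilled Patterns
Pattern 1: Layered Fallback
Formula: L1 (hot, small) → L2 (warm, large) → Origin
Transfer table

| Scenario | L1 | L2 | Origin |
|---|---|---|---|
| User profile | Caffeine (10s) | Redis (1h) | MySQL |
| Configuration items | Guava (1min) | Consul | Config Server |
| API rate-limit state | Local counter | Redis Cell | - |

Core insight: each layer's job is to absorb the misses that fall through the layer above it. TTLs must grow from layer to layer; otherwise the layering loses its point.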
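A sketch of the L1 → L2 → Origin lookup, with the "TTL must grow level by level" rule enforced in the constructor. Both levels are in-process maps in `TwoLevelCache` (a hypothetical class). In the user-profile row above, L1 would be Caffeine and L2 Redis.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Layered fallback: L1 (short TTL) -> L2 (longer TTL) -> origin.
// Both levels are in-process maps here; in practice L1 might be Caffeine and L2 Redis.
class TwoLevelCache<K, V> {
    private static final class Timed<T> {
        final T value;
        final long expireAt;

        Timed(T value, long expireAt) {
            this.value = value;
            this.expireAt = expireAt;
        }
    }

    private final Map<K, Timed<V>> l1 = new ConcurrentHashMap<>();
    private final Map<K, Timed<V>> l2 = new ConcurrentHashMap<>();
    private final Function<K, V> origin;
    private final long l1TtlMillis;
    private final long l2TtlMillis;
    private final LongSupplier clock;

    TwoLevelCache(Function<K, V> origin, long l1TtlMillis, long l2TtlMillis, LongSupplier clock) {
        if (l1TtlMillis >= l2TtlMillis) {
            // TTLs must grow level by level, otherwise L2 never absorbs L1's misses
            throw new IllegalArgumentException("L1 TTL must be shorter than L2 TTL");
        }
        this.origin = origin;
        this.l1TtlMillis = l1TtlMillis;
        this.l2TtlMillis = l2TtlMillis;
        this.clock = clock;
    }

    V get(K key) {
        long now = clock.getAsLong();
        V v = fresh(l1, key, now);
        if (v != null) {
            return v;                                  // L1 hit
        }
        v = fresh(l2, key, now);
        if (v == null) {
            v = origin.apply(key);                     // both levels missed
            if (v == null) {
                return null;                           // negative caching not shown here
            }
            l2.put(key, new Timed<>(v, now + l2TtlMillis));
        }
        l1.put(key, new Timed<>(v, now + l1TtlMillis)); // backfill L1 on an L2 hit or origin load
        return v;
    }

    // An update must invalidate every level; with several processes, each one's L1 also
    // needs a broadcast (or a TTL short enough to tolerate the staleness).
    void invalidate(K key) {
        l2.remove(key);
        l1.remove(key);
    }

    private V fresh(Map<K, Timed<V>> level, K key, long now) {
        Timed<V> t = level.get(key);
        if (t == null) {
            return null;
        }
        if (now >= t.expireAt) {
            level.remove(key, t);
            return null;
        }
        return t.value;
    }
}
```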
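Pattern 2: Lazy Fill
Formula: On-Demand = Trigger(Load(Key) -> Store) -> Return
Transfer table

| Scenario | Trigger | Load source | Example |
|---|---|---|---|
| Cache warm-up | System startup / scheduled job | Full database scan | Product categories |
| Lazy loading | First access misses | Single-row query | User profile |
| Async backfill | Message-queue notification | Kafka topic | Order status |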
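Pattern 3: Null-Value Defense
Formula: Key → {Value | EmptyMarker} with Short-TTL
Key parameters
Empty-marker TTL: typically 1/10 to 1/5 of the TTL for real data.
Must be paired with explicit invalidation, so that once the real data is created the cache stops returning the empty marker.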
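Both key parameters can be seen in a small sketch: a separate, shorter TTL for "not found" entries, and an `onCreated` hook that clears the marker. `NegativeCachingRepository` and its constructor arguments are hypothetical. The in-process map stands in for the Redis version shown in 6.3.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Null-value defense: "not found" is cached as an explicit empty entry with a much
// shorter TTL, and cleared as soon as the record is created.
class NegativeCachingRepository<K, V> {
    private static final class Entry<T> {
        final T value;        // null means "known not to exist"
        final long expireAt;

        Entry(T value, long expireAt) {
            this.value = value;
            this.expireAt = expireAt;
        }
    }

    private final Map<K, Entry<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, V> db;      // returns null when the row does not exist
    private final long ttlMillis;
    private final long emptyTtlMillis;    // e.g. 1/10 to 1/5 of ttlMillis
    private final LongSupplier clock;

    NegativeCachingRepository(Function<K, V> db, long ttlMillis, long emptyTtlMillis, LongSupplier clock) {
        this.db = db;
        this.ttlMillis = ttlMillis;
        this.emptyTtlMillis = emptyTtlMillis;
        this.clock = clock;
    }

    Optional<V> find(K key) {
        long now = clock.getAsLong();
        Entry<V> e = cache.get(key);
        if (e != null && now < e.expireAt) {
            return Optional.ofNullable(e.value);     // hit, possibly a cached "absent"
        }
        V v = db.apply(key);
        long ttl = (v != null) ? ttlMillis : emptyTtlMillis;
        cache.put(key, new Entry<>(v, now + ttl));
        return Optional.ofNullable(v);
    }

    // Call right after inserting the record; otherwise the empty entry would hide it until expiry.
    void onCreated(K key) {
        cache.remove(key);
    }
}
```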
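8. Production Practice Essentials
8.1 Monitoring Metrics

| Metric | Healthy | Alert when |
|---|---|---|
| Hit rate | > 90% | < 80% |
| Average load time | < 100 ms | > 500 ms |
| Eviction rate | Depends on capacity | Jumps by more than 200% |
| Error rate | 0% | > 0.1% |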
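8.2 Capacity Planning
Estimation formula. Peak QPS × mean re-access interval approximates the working set, i.e. the number of distinct keys requested within one interval. Assume access is uniform and the TTL is at least the re-access interval. Then serving a fraction h of requests from the cache requires keeping roughly that fraction of the working set resident. The result is an upper bound; skewed hot-spot traffic needs far fewer entries.
```
entries ≈ peak QPS × mean re-access interval × target hit rate
```
Example: QPS = 10,000, each user returns every 10 minutes on average, and the target hit rate is 95%:
```
entries ≈ 10000 × 600 s × 0.95 = 5,700,000
```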
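8.3 Failure-Drill Checklist
[ ] Simulate a Redis outage and verify the degradation logic
[ ] Simulate expiration of a big key and watch the database load
[ ] Simulate a network partition and measure how long eventual consistency takes to converge
[ ] Load-test eviction behavior when the cache is full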
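9. Pattern Quick Reference

| Requirement keywords you hear | Pattern | Recommended approach | Mnemonic |
|---|---|---|---|
| The local cache must be fast | Layered fallback | Caffeine + Redis | Local as the backstop, remote for scale |
| Slow loading after a cold start | Lazy fill | Preload in @PostConstruct | Load on trigger, grow on demand |
| Data inconsistent after updates | Side-path sync | Cache Aside + delayed double delete | Update the DB, then delete the cache; keep reads and writes from racing |
| Malicious requests crash the DB | Null-value defense | Empty object + Bloom filter | Cache empties to block penetration; a short TTL caps the risk |
| Many keys expire at once | Randomized spreading | TTL + random offset | Spread expirations, refresh gradually |
| A hot key keeps expiring | Mutex rebuild | Distributed lock / logical expiration | One thread goes to the source, the rest wait |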
References
Chen Hao (陈皓): 缓存更新的套路 (Cache Update Patterns)
Meituan Tech Team: 缓存那些事 (All About Caching)
Guava Cache official documentation
Caffeine Wiki
Guava
Spring's cache abstraction is configured through a CacheManager.
Guava's cache is configured through a CacheBuilder.
Typical Spring + Guava usage (the cache short-circuits repeated method calls):
```java
// Spring 4.x only: GuavaCacheManager and GuavaCache were deprecated in Spring 4.3 and
// removed in Spring 5 (use CaffeineCacheManager instead). Each class below goes in its own file.

// Option A: every cache the manager creates shares one builder configuration
@Configuration
@ComponentScan("com.concretepage")
@EnableCaching
public class AppConfigA {
    @Bean
    public CacheManager cacheManager() {
        GuavaCacheManager cacheManager = new GuavaCacheManager("mycache");
        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
                .maximumSize(100)
                .expireAfterWrite(10, TimeUnit.MINUTES);
        cacheManager.setCacheBuilder(cacheBuilder);
        return cacheManager;
    }
}

// Option B (use instead of A): a differently configured Guava cache per name
@Configuration
@ComponentScan("com.concretepage")
@EnableCaching
public class AppConfigB {
    @Bean
    public CacheManager cacheManager() {
        SimpleCacheManager cacheManager = new SimpleCacheManager();
        GuavaCache guavaCache1 = new GuavaCache("book",
                CacheBuilder.newBuilder().maximumSize(50).build());
        GuavaCache guavaCache2 = new GuavaCache("bookstore",
                CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(5, TimeUnit.MINUTES).build());
        cacheManager.setCaches(Arrays.asList(guavaCache1, guavaCache2));
        return cacheManager;
    }
}

@Service
public class BookAppA {
    Book book = new Book();

    // The first call runs the method; later calls return the cached Book without executing it
    @Cacheable(value = "mycache")
    public Book getBook() {
        System.out.println("Executing getBook method...");
        book.setBookName("Mahabharat");
        return book;
    }
}
```
Only the load method needs to be overridden.
Loading and eviction are decoupled: eviction is configured on the builder, and loading lives in the CacheLoader.
```java
@Test
public void whenCacheMiss_thenValueIsComputed() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder().build(loader);

    assertEquals(0, cache.size());
    // A miss invokes load() and stores the result
    assertEquals("HELLO", cache.getUnchecked("hello"));
    assertEquals(1, cache.size());
}

// The general shape, from the Guava documentation (Key, Graph and createExpensiveGraph are placeholders)
CacheLoader<Key, Graph> loader = new CacheLoader<Key, Graph>() {
    @Override
    public Graph load(Key key) throws Exception {
        return createExpensiveGraph(key);
    }
};
LoadingCache<Key, Graph> cache = CacheBuilder.newBuilder().build(loader);
```
Weight-based cache configuration
```java
@Test
public void whenCacheReachMaxWeight_thenEviction() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    // Each entry weighs as much as its value's length
    Weigher<String, String> weighByLength = new Weigher<String, String>() {
        @Override
        public int weigh(String key, String value) {
            return value.length();
        }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .maximumWeight(16)
            .weigher(weighByLength)
            .build(loader);

    cache.getUnchecked("first");   // weight 5
    cache.getUnchecked("second");  // weight 6, total 11
    cache.getUnchecked("third");   // weight 5, total 16
    cache.getUnchecked("last");    // weight 4, total 20 > 16: least recently used "first" is evicted

    assertEquals(3, cache.size());
    assertNull(cache.getIfPresent("first"));
    assertEquals("LAST", cache.getIfPresent("last"));
}
```
Specifying expiration
```java
@Test
public void whenEntryIdle_thenEviction() throws InterruptedException {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .expireAfterAccess(2, TimeUnit.MILLISECONDS)   // idle timeout: every read resets the clock
            .build(loader);

    cache.getUnchecked("hello");
    assertEquals(1, cache.size());

    cache.getUnchecked("hello");
    Thread.sleep(300);

    // Expired entries are cleaned up lazily during later cache operations
    cache.getUnchecked("test");
    assertEquals(1, cache.size());
    assertNull(cache.getIfPresent("hello"));
}

@Test
public void whenEntryLiveTimeExpire_thenEviction() throws InterruptedException {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .expireAfterWrite(2, TimeUnit.MILLISECONDS)    // fixed lifetime measured from the write
            .build(loader);

    cache.getUnchecked("hello");
    assertEquals(1, cache.size());
    Thread.sleep(300);
    cache.getUnchecked("test");
    assertEquals(1, cache.size());
    assertNull(cache.getIfPresent("hello"));
}
```
Weak keys and soft values
```java
@Test
public void whenWeakKeyHasNoRef_thenRemoveFromCache() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    // Keys are held via WeakReference and compared by identity (==), not equals()
    LoadingCache<String, String> cache = CacheBuilder.newBuilder().weakKeys().build(loader);
}

@Test
public void whenSoftValue_thenRemoveFromCache() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    // Values are held via SoftReference and reclaimed by the GC under memory pressure
    LoadingCache<String, String> cache = CacheBuilder.newBuilder().softValues().build(loader);
}
```
Triggering load periodically (refresh)
```java
@Test
public void whenLiveTimeEnd_thenRefresh() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    // An entry older than 1 minute is reloaded on its next access;
    // other readers keep getting the old value while the reload runs
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(loader);
}
```
Proactive warm-up
```java
@Test
public void whenPreloadCache_thenUsePutAll() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key.toUpperCase();
        }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder().build(loader);

    Map<String, String> map = new HashMap<>();
    map.put("first", "FIRST");
    map.put("second", "SECOND");
    cache.putAll(map);   // bulk insert without calling load()

    assertEquals(2, cache.size());
}
```
Optional must be used to represent null values
```java
// Optional here is Guava's com.google.common.base.Optional (fromNullable);
// with java.util.Optional use ofNullable instead
@Test
public void whenNullValue_thenOptional() {
    CacheLoader<String, Optional<String>> loader = new CacheLoader<String, Optional<String>>() {
        @Override
        public Optional<String> load(String key) {
            return Optional.fromNullable(getSuffix(key));
        }
    };
    LoadingCache<String, Optional<String>> cache = CacheBuilder.newBuilder().build(loader);

    assertEquals("txt", cache.getUnchecked("text.txt").get());
    assertFalse(cache.getUnchecked("hello").isPresent());   // "no suffix" is cached as Optional.absent()
}

private String getSuffix(final String str) {
    int lastIndex = str.lastIndexOf('.');
    if (lastIndex == -1) {
        return null;
    }
    return str.substring(lastIndex + 1);
}
```
Subscribing to removal events
```java
@Test
public void whenEntryRemovedFromCache_thenNotify() {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
        @Override
        public String load(final String key) {
            return key.toUpperCase();
        }
    };
    RemovalListener<String, String> listener = new RemovalListener<String, String>() {
        @Override
        public void onRemoval(RemovalNotification<String, String> n) {
            // wasEvicted() is true for automatic removals (size, expiry, GC),
            // false for explicit invalidation or replacement
            if (n.wasEvicted()) {
                String cause = n.getCause().name();
                assertEquals(RemovalCause.SIZE.toString(), cause);
            }
        }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(3)
            .removalListener(listener)   // invoked synchronously during cache maintenance by default
            .build(loader);

    cache.getUnchecked("first");
    cache.getUnchecked("second");
    cache.getUnchecked("third");
    cache.getUnchecked("last");          // exceeds maximumSize(3): "first" is evicted with cause SIZE
    assertEquals(3, cache.size());
}
```
Cache Statistics
Cache statistics can be logged periodically, for example:
```
Cache Stats= CacheStats{hitCount=3296628, missCount=1353372, loadSuccessCount=1353138, loadExceptionCount=0, totalLoadTime=2268064327604, evictionCount=1325410}
Cache Stats= CacheStats{hitCount=3334167, missCount=1365834, loadSuccessCount=1365597, loadExceptionCount=0, totalLoadTime=2287551024797, evictionCount=1337740}
Cache Stats= CacheStats{hitCount=3371463, missCount=1378536, loadSuccessCount=1378296, loadExceptionCount=0, totalLoadTime=2309012047459, evictionCount=1350990}
Cache Stats= CacheStats{hitCount=3407719, missCount=1392280, loadSuccessCount=1392039, loadExceptionCount=0, totalLoadTime=2331355983194, evictionCount=1364535}
Cache Stats= CacheStats{hitCount=3443848, missCount=1406152, loadSuccessCount=1405908, loadExceptionCount=0, totalLoadTime=2354162371299, evictionCount=1378654}
```
See: recordStats
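The counters in these logs are cumulative. For alerting, the hit rate over an interval (the difference between two snapshots) is usually more telling than the lifetime ratio. The sketch below computes it from raw numbers. `HitRate.between` is a hypothetical helper; Guava's `CacheStats` also offers `minus()` and `hitRate()` for the same purpose.

```java
// Hit rate between two CacheStats-style snapshots (the counters are cumulative).
final class HitRate {
    private HitRate() {}

    static double between(long hits0, long misses0, long hits1, long misses1) {
        long hits = hits1 - hits0;
        long misses = misses1 - misses0;
        long requests = hits + misses;
        return requests == 0 ? 1.0 : (double) hits / requests;
    }
}
```

For the first and last snapshots above, this gives 147,220 hits out of 200,000 requests, an interval hit rate of 73.6%. That is below the 80% alert line in the monitoring table of section 8.1.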
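EhCache
Spring 4 + EhCache 2
The description of the whole namespace is here.
The individual configuration options:
name: the cache name.
maxElementsInMemory: the maximum number of elements held in memory (replaced by maxEntriesLocalHeap in later 2.x versions, as used in the XML below).
eternal: whether cached elements never expire. If true, the timeout settings are ignored and elements never expire.
timeToIdleSeconds: how long an element may sit idle before it expires, in seconds. Used only when eternal=false. Optional; the default 0 means unlimited idle time.
timeToLiveSeconds: the element's time to live (TTL), i.e. the maximum interval from creation to expiry. Effective only when eternal=false; 0 means elements may live indefinitely.
maxEntriesLocalDisk: the maximum number of entries kept in the local disk tier.
overflowToDisk: whether to spill to the disk store when the memory store is full.
diskSpoolBufferSizeMB: the size of the DiskStore's spool buffer, 30 MB by default. Each cache gets its own buffer.
maxElementsOnDisk: the maximum number of elements on disk.
diskPersistent: whether the disk store survives a JVM restart. Defaults to false.
diskExpiryThreadIntervalSeconds: how often the disk expiry thread runs. Defaults to 120 seconds.
memoryStoreEvictionPolicy: the policy used to evict from memory once maxElementsInMemory is reached. The default is LRU (least recently used); FIFO (first in, first out) and LFU (least frequently used) are also available.
clearOnFlush: whether the memory store is cleared when flush() is called on the cache.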
```xml
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:noNamespaceSchemaLocation="ehcache.xsd"
         updateCheck="true"
         monitoring="autodetect"
         dynamicConfig="true">

    <diskStore path="java.io.tmpdir"/>

    <cache name="movieFindCache"
           maxEntriesLocalHeap="10000"
           maxEntriesLocalDisk="1000"
           eternal="false"
           diskSpoolBufferSizeMB="20"
           timeToIdleSeconds="300"
           timeToLiveSeconds="600"
           memoryStoreEvictionPolicy="LFU"
           transactionalMode="off">
        <persistence strategy="localTempSwap"/>
    </cache>

</ehcache>
```
The corresponding Java code:
```java
// AppConfig.java
package com.mkyong.test;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableCaching
@ComponentScan({ "com.mkyong.*" })
public class AppConfig {

    @Bean
    public CacheManager cacheManager() {
        return new EhCacheCacheManager(ehCacheCacheManager().getObject());
    }

    // Builds the underlying EhCache 2 CacheManager from ehcache.xml on the classpath
    @Bean
    public EhCacheManagerFactoryBean ehCacheCacheManager() {
        EhCacheManagerFactoryBean cmfb = new EhCacheManagerFactoryBean();
        cmfb.setConfigLocation(new ClassPathResource("ehcache.xml"));
        cmfb.setShared(true);
        return cmfb;
    }
}

// MovieDaoImpl.java
package com.mkyong.movie;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;

@Repository("movieDao")
public class MovieDaoImpl implements MovieDao {

    // Cached per director name: only the first call for a given name pays the 2-second delay
    @Cacheable(value = "movieFindCache", key = "#name")
    public Movie findByDirector(String name) {
        slowQuery(2000L);
        System.out.println("findByDirector is running...");
        return new Movie(1, "Forrest Gump", "Robert Zemeckis");
    }

    private void slowQuery(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```
Spring Boot 2 + EhCache 3
EhCache has long been the default choice of second-level cache for Hibernate.
The javax cache API (JSR-107) must be added as a dependency:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>javax.cache</groupId>
    <artifactId>cache-api</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.ehcache</groupId>
    <artifactId>ehcache</artifactId>
    <version>3.8.1</version>
</dependency>
```
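The corresponding cache annotations:
```java
@Service
public class NumberService {

    private static final Logger log = LoggerFactory.getLogger(NumberService.class);

    // Only numbers greater than 10 are cached, keyed by the number itself
    @Cacheable(value = "squareCache", key = "#number", condition = "#number > 10")
    public BigDecimal square(Long number) {
        BigDecimal square = BigDecimal.valueOf(number)
                .multiply(BigDecimal.valueOf(number));
        log.info("square of {} is {}", number, square);
        return square;
    }
}

// No beans needed: Spring Boot auto-configures a JCache CacheManager when a JSR-107
// provider is present; point it at the XML with spring.cache.jcache.config=classpath:ehcache.xml
@Configuration
@EnableCaching
public class CacheConfig {
}

// Registered in the <listeners> section of the XML below
public class CacheEventLogger implements CacheEventListener<Object, Object> {

    private static final Logger log = LoggerFactory.getLogger(CacheEventLogger.class);

    @Override
    public void onEvent(CacheEvent<? extends Object, ? extends Object> cacheEvent) {
        log.info("Cache event {} for key {}: old value = {}, new value = {}",
                cacheEvent.getType(), cacheEvent.getKey(),
                cacheEvent.getOldValue(), cacheEvent.getNewValue());
    }
}
```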
```xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://www.ehcache.org/v3"
        xmlns:jsr107="http://www.ehcache.org/v3/jsr107"
        xsi:schemaLocation="
            http://www.ehcache.org/v3 http://www.ehcache.org/schema/ehcache-core-3.0.xsd
            http://www.ehcache.org/v3/jsr107 http://www.ehcache.org/schema/ehcache-107-ext-3.0.xsd">

    <cache alias="squareCache">
        <key-type>java.lang.Long</key-type>
        <value-type>java.math.BigDecimal</value-type>
        <expiry>
            <ttl unit="seconds">30</ttl>
        </expiry>

        <listeners>
            <listener>
                <class>com.baeldung.cachetest.config.CacheEventLogger</class>
                <event-firing-mode>ASYNCHRONOUS</event-firing-mode>
                <event-ordering-mode>UNORDERED</event-ordering-mode>
                <events-to-fire-on>CREATED</events-to-fire-on>
                <events-to-fire-on>EXPIRED</events-to-fire-on>
            </listener>
        </listeners>

        <resources>
            <heap unit="entries">2</heap>
            <offheap unit="MB">10</offheap>
        </resources>
    </cache>

</config>
```
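Annotations for the other cache operations:
```java
@Service
public class UserService {

    // On a miss the method runs and the result is cached under the user id
    @Cacheable(value = "users", key = "#user.getId()")
    public User findUser(User user) {
        return findUserInDB(user.getId());
    }

    // Only users whose id is <= 2 are cached
    @Cacheable(value = "users", key = "#user.getId()", condition = "#user.getId() <= 2")
    public User findUserInLimit(User user) {
        return findUserInDB(user.getId());
    }

    // Always runs the method, then stores its RETURN VALUE in the cache,
    // so it must return the User (a void method would cache null)
    @CachePut(value = "users", key = "#user.getId()")
    public User updateUser(User user) {
        updateUserInDB(user);
        return user;
    }

    // Removes one entry; the key must match the one used by @Cacheable/@CachePut
    @CacheEvict(value = "users", key = "#user.getId()")
    public void removeUser(User user) {
        removeUserInDB(user.getId());
    }

    // Clears the whole "users" cache
    @CacheEvict(value = "users", allEntries = true)
    public void clear() {
        removeAllInDB();
    }
}

// Several cache operations on one method
@Caching(evict = { @CacheEvict("primary"), @CacheEvict(cacheNames = "secondary", key = "#p0") })
public Book importBooks(String deposit, Date date) { ... }
```
If all the operations in a class use the same cache settings, @CacheConfig on the class lets you declare them once and saves some configuration:
```java
@CacheConfig("books")
public class BookRepositoryImpl implements BookRepository {

    @Cacheable
    public Book findBook(ISBN isbn) { ... }
}
```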
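Consider defining your own KeyGenerator:
```java
@Component
public class MyKeyGenerator implements KeyGenerator {

    // Key = method name + arguments, e.g. "getUserById[42]". Same-named methods in
    // different classes would collide in a shared cache; prepend
    // target.getClass().getName() if that matters.
    @Override
    public Object generate(Object target, Method method, Object... params) {
        return method.getName() + Arrays.toString(params);
    }
}

// A cache name is still required unless a class-level @CacheConfig supplies one
@Cacheable(value = "users", keyGenerator = "myKeyGenerator")
public User getUserById(Long id) {
    User user = new User();
    user.setId(id);
    user.setUsername("lisi");
    System.out.println(user);
    return user;
}
```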
In addition, the cache-specific SpEL expressions available for keys are listed here.
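Caffeine
A few of its features are especially interesting: time-based eviction, size-based eviction, asynchronous loading, and weak-reference keys. Setting ReferenceQueue behavior aside, WeakReference is the reference type most useful to us.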
automatic loading of entries into the cache, optionally asynchronously
size-based eviction when a maximum is exceeded based on frequency and recency
time-based expiration of entries, measured since last access or last write
asynchronously refresh when the first stale request for an entry occurs
keys automatically wrapped in weak references
values automatically wrapped in weak or soft references
notification of evicted (or otherwise removed) entries
writes propagated to an external resource
accumulation of cache access statistics
Without Spring
```xml
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.5.5</version>
</dependency>
```
```java
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;

// Minimal value class used by the snippets below
class DataObject {
    private final String data;
    private DataObject(String data) { this.data = data; }
    public String getData() { return data; }
    public static DataObject get(String data) { return new DataObject(data); }
}

// --- Manual population ---
Cache<String, DataObject> cache = Caffeine.newBuilder()
        .expireAfterWrite(1, TimeUnit.MINUTES)
        .maximumSize(100)
        .build();

String key = "A";
DataObject dataObject = cache.getIfPresent(key);   // nothing cached yet
assertNull(dataObject);

dataObject = DataObject.get("Data for A");
cache.put(key, dataObject);                         // put() rejects null values
dataObject = cache.getIfPresent(key);
assertNotNull(dataObject);

// get(key, mappingFunction) computes and stores a value only if the key is absent
dataObject = cache.get(key, k -> DataObject.get("Data for A"));
assertNotNull(dataObject);
assertEquals("Data for A", dataObject.getData());

// --- Synchronous loading ---
LoadingCache<String, DataObject> loadingCache = Caffeine.newBuilder()
        .maximumSize(100)
        .expireAfterWrite(1, TimeUnit.MINUTES)
        .build(k -> DataObject.get("Data for " + k));

DataObject loaded = loadingCache.get(key);
assertNotNull(loaded);
assertEquals("Data for " + key, loaded.getData());

Map<String, DataObject> dataObjectMap = loadingCache.getAll(Arrays.asList("A", "B", "C"));
assertEquals(3, dataObjectMap.size());

// --- Asynchronous loading ---
AsyncLoadingCache<String, DataObject> asyncCache = Caffeine.newBuilder()
        .maximumSize(100)
        .expireAfterWrite(1, TimeUnit.MINUTES)
        .buildAsync(k -> DataObject.get("Data for " + k));

asyncCache.get(key).thenAccept(d -> {
    assertNotNull(d);
    assertEquals("Data for " + key, d.getData());
});
asyncCache.getAll(Arrays.asList("A", "B", "C"))
        .thenAccept(m -> assertEquals(3, m.size()));

// --- Size-based eviction ---
LoadingCache<String, DataObject> sizeCache = Caffeine.newBuilder()
        .maximumSize(1)
        .build(k -> DataObject.get("Data for " + k));
assertEquals(0, sizeCache.estimatedSize());
sizeCache.get("A");
assertEquals(1, sizeCache.estimatedSize());
sizeCache.get("B");
sizeCache.cleanUp();          // eviction runs asynchronously; cleanUp() performs pending maintenance now
assertEquals(1, sizeCache.estimatedSize());

// --- Weight-based eviction ---
LoadingCache<String, DataObject> weightCache = Caffeine.newBuilder()
        .maximumWeight(10)
        .weigher((k, v) -> 5)     // every entry weighs 5, so at most two fit
        .build(k -> DataObject.get("Data for " + k));
assertEquals(0, weightCache.estimatedSize());
weightCache.get("A");
assertEquals(1, weightCache.estimatedSize());
weightCache.get("B");
assertEquals(2, weightCache.estimatedSize());

// --- Time-based expiration ---
LoadingCache<String, DataObject> accessCache = Caffeine.newBuilder()
        .expireAfterAccess(5, TimeUnit.MINUTES)     // idle timeout
        .build(k -> DataObject.get("Data for " + k));

LoadingCache<String, DataObject> writeCache = Caffeine.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)     // fixed lifetime since the last write
        .weakKeys()                                 // reference-based: keys compared by identity
        .weakValues()
        .build(k -> DataObject.get("Data for " + k));

// Per-entry expiry; every Expiry method returns a duration in NANOSECONDS
LoadingCache<String, DataObject> customCache = Caffeine.newBuilder()
        .expireAfter(new Expiry<String, DataObject>() {
            @Override
            public long expireAfterCreate(String key, DataObject value, long currentTime) {
                // one second per character of data
                return TimeUnit.MILLISECONDS.toNanos(value.getData().length() * 1000L);
            }

            @Override
            public long expireAfterUpdate(String key, DataObject value,
                                          long currentTime, long currentDuration) {
                return currentDuration;   // keep the remaining lifetime
            }

            @Override
            public long expireAfterRead(String key, DataObject value,
                                        long currentTime, long currentDuration) {
                return currentDuration;
            }
        })
        .build(k -> DataObject.get("Data for " + k));

// --- Reference-based eviction (weakKeys/weakValues shown above) ---
LoadingCache<String, DataObject> softCache = Caffeine.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .softValues()                               // reclaimed by the GC under memory pressure
        .build(k -> DataObject.get("Data for " + k));

// --- Refresh ---
Caffeine.newBuilder()
        .refreshAfterWrite(1, TimeUnit.MINUTES)     // async reload on the first access after 1 minute
        .build(k -> DataObject.get("Data for " + k));

// --- Statistics ---
LoadingCache<String, DataObject> statsCache = Caffeine.newBuilder()
        .maximumSize(100)
        .recordStats()
        .build(k -> DataObject.get("Data for " + k));
statsCache.get("A");   // miss, then load
statsCache.get("A");   // hit
assertEquals(1, statsCache.stats().hitCount());
assertEquals(1, statsCache.stats().missCount());
```
The atomicity of the get-style operations is particularly important here:
method invocation is performed atomically, so the function is applied
at most once per key. Some attempted update operations on this cache
by other threads may be blocked while the computation is in progress,
so the computation should be short and simple, and must not attempt to
update any other mappings of this cache.
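Per key, this makes the computation linearizable, and its result becomes visible to other threads immediately (somewhat like a strong broadcast). The price is that the atomic computation inserted in the middle must be short, comparable to a Redis SET.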
With Spring
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.6.0</version>
</dependency>
```
Related configuration options:
initialCapacity: # initial capacity of the cache
maximumSize: # maximum number of entries
maximumWeight: # maximum total weight of the entries
expireAfterAccess: # expire a fixed time after the last read or write
expireAfterWrite: # expire a fixed time after the last write
refreshAfterWrite: # refresh a fixed interval after the entry was created or last updated
weakKeys: # hold keys by weak reference
weakValues: # hold values by weak reference
softValues: # hold values by soft reference
recordStats: # enable statistics
```yaml
spring:
  cache:
    type: caffeine
    caffeine:
      spec: maximumSize=1024
    cache-names: cache1,cache2
```
```java
@Configuration
@EnableCaching
public class CaffeineCacheConfig {

    @Bean   // without @Bean this CacheManager is never registered
    public CacheManager cacheManager() {
        CaffeineCacheManager cacheManager = new CaffeineCacheManager("customer");
        cacheManager.setCaffeine(caffeineCacheBuilder());
        return cacheManager;
    }

    Caffeine<Object, Object> caffeineCacheBuilder() {
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(500)
                .expireAfterAccess(10, TimeUnit.MINUTES)
                // No weakKeys(): weak keys are compared by identity (==), so the freshly
                // boxed Long keys produced by Spring would almost never hit
                .recordStats();
    }
}

public interface CustomerService {
    Customer getCustomer(final Long customerID);
}

@Service
@CacheConfig(cacheNames = {"customer"})
public class DefaultCustomerService implements CustomerService {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultCustomerService.class);

    // Cached in "customer" (from @CacheConfig), keyed by customerID
    @Cacheable
    @Override
    public Customer getCustomer(Long customerID) {
        LOG.info("Trying to get customer information for id {} ", customerID);
        return getCustomerData(customerID);
    }

    private Customer getCustomerData(final Long id) {
        Customer customer = new Customer(id, "testemail@test.com", "Test Customer");
        return customer;
    }
}
```
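How to Deal with Cache Misses
In Chinese-language discussions, cache misses are usually split into cache breakdown (击穿), cache avalanche (雪崩), and cache penetration (穿透). These three categories are neither mutually exclusive nor exhaustive, and they are very easy to confuse. For this summary, I will nonetheless discuss them one at a time.
How to Handle Cache Breakdown (breakthrough)
Cache breakdown means that a key which should be served from the cache is not found there. The cache then falls back to loading the content from colder storage further downstream, which puts read pressure on the downstream system.
The basic approaches are:
Before it happens:
Warm up potentially hot data before its traffic peak arrives.
Never let the cache expire. Once entries go stale, however, keeping them fresh becomes a big problem: you can only rely on a backend active-update mechanism to refresh them on a best-effort basis. This approach has the weakest consistency.
Refresh the cache proactively so that breakdown never happens, by combining expireAfterWrite(t1) with refreshAfterWrite(t2), where t2 < t1.
For extremely hot data, never let the cache expire passively; it must be updated actively.
Under concurrency, pay close attention to the order of cache updates! If updates arrive as messages, ordering matters even more!
Scheduled jobs and broadcast refreshes can complement each other: the scheduled job serves as a timeout-based backstop.
While it happens:
When a breakdown occurs, strictly limit the traffic that reads cold storage and warms the cache. This means limited degradation and lossy service.
If cache updates are synchronous reads/writes (Cache Aside or Read/Write Through), bring in rate-limiting tools: thread pools or semaphores that cap concurrency, SLA-based limits, Rhino, Redis counters, in-process counters, Hystrix, or the web container's limiter. These protect availability while keeping throughput up.
If the cache can be updated asynchronously and proactively, consider a single updater thread or a message queue that applies updates at a low rate. How well you can contain the problem while it happens depends on how the cache and the read/write access layer were architected in the first place.
For certain especially hot keys, a single expiry can trigger a flood of reads. Their update path also needs a distributed lock, and it must be a try-lock rather than a blocking lock. (Facebook's paper "Scaling Memcache at Facebook" tackles the same thundering-herd problem with leases, which behave much like such a try-lock.)
After it happens:
If the system cannot recover by itself, trip the circuit breaker to reject traffic, then warm the cache manually. Circuit breaking, degradation, and rate limiting therefore all need to be prepared in advance; a rate limit of 0 can double as a circuit breaker.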
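The "strictly limit traffic to cold storage" advice under "While it happens" can be sketched with a JDK `Semaphore`. At most N threads may load from the origin at once, and the rest are shed immediately instead of queueing on the database. `GuardedOrigin` is a hypothetical name, and returning `Optional.empty()` for shed requests is an assumed convention for "serve a degraded response".

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Bounds how many threads may hit cold storage at once; the rest degrade immediately
// (tryAcquire never blocks) instead of piling onto the database.
class GuardedOrigin<K, V> {
    private final Semaphore permits;
    private final Function<K, V> origin;

    GuardedOrigin(int maxConcurrentLoads, Function<K, V> origin) {
        this.permits = new Semaphore(maxConcurrentLoads);
        this.origin = origin;
    }

    // Empty result = request shed; the caller returns a default or an error (lossy service).
    Optional<V> load(K key) {
        if (!permits.tryAcquire()) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(origin.apply(key));
        } finally {
            permits.release();
        }
    }
}
```

Setting `maxConcurrentLoads` to 0 rejects every load. This matches the remark above that a rate limit of 0 can serve as a circuit breaker.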
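How to Handle Cache Avalanche
A cache avalanche is large-scale cache expiry combined with heavy traffic, producing a flood of reads and writes against cold storage that is not highly available (usually an RDBMS). This degrades the RDBMS and can even cascade into a system-wide collapse.
In a sense, the breakdown of a single key is not the real danger; an avalanche is.
The basic approach is to apply the breakdown countermeasures at scale while spreading out when cache entries are rewarmed.
The timeout-based approach gives different keys different expiry times so that they do not all expire at once. This alone does not fully solve the problem: an expired entry is not reloaded by itself unless the cache has built-in asynchronous self-loading (many in-memory caches do; Redis does not), so uneven traffic reaching the cache can still cause large-scale breakdown. A stronger version of the timeout approach adds an active cache-update mechanism.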
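A minimal way to give keys different expiry times is to add random jitter to a base TTL. The sketch below is illustrative only: the class name and the 0 to 20% spread are assumptions, not values from the original.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: spreads expirations by adding up to jitterRatio * baseSeconds to a base TTL. */
final class TtlJitter {
    private TtlJitter() {}

    static long withJitter(long baseSeconds, double jitterRatio) {
        if (baseSeconds <= 0 || jitterRatio < 0) {
            throw new IllegalArgumentException("baseSeconds must be > 0 and jitterRatio >= 0");
        }
        long maxExtra = (long) (baseSeconds * jitterRatio);
        // nextLong(bound) excludes the bound, so +1 makes maxExtra itself reachable
        return baseSeconds + ThreadLocalRandom.current().nextLong(maxExtra + 1);
    }
}
```

The result is then used as the TTL of the cache write (for example as the seconds argument of SETEX), so keys written in the same burst expire over a window instead of at one instant.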
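The warm-up-based approach partitions the cache into clusters from the start. The upstream of each cluster is prepared to trip its circuit breaker; once traffic to a cluster has been stopped, a script bulk-warms that entire cluster.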
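How to Handle Cache Penetration
Cache penetration is different from cache breakdown.
Cache penetration means requesting data that does not exist at all, so every such request misses the cache and goes on to cold storage.
An attacker (or a faulty client) can exploit this by sweeping nonexistent keys, making the whole cluster query cold storage constantly until it collapses.
Countermeasures:
Reject obviously invalid requests (e.g. malformed IDs) by returning null directly.
Use a large bitmap or a Bloom filter to intercept requests for keys that cannot exist, returning null directly. A Bloom filter has no false negatives, so it never blocks a key that does exist; it may let a few nonexistent keys through.
After a penetration, store a null marker in the cache: a cache that can hold null values naturally absorbs repeated penetration. This is exactly where Guava's weakness shows: Guava caches do not accept null values (a CacheLoader that returns null fails), so values must be wrapped, e.g. in Optional.
When these measures are combined, the cached nulls need care: they must have an expiry time, and there should be an active-update mechanism that replaces the null once the data appears; otherwise the null pollutes the cache.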
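The Bloom-filter and null-caching measures can be combined into one read path. This is a stdlib-only sketch under stated assumptions: `SimpleBloomFilter` and `PenetrationGuard` are hypothetical names, the filter derives its k bit positions from `String.hashCode` and FNV-1a by double hashing, an in-memory map stands in for the remote cache, and `Optional.empty()` marks a cached "row does not exist" (the same wrapping Guava users need).

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Minimal Bloom filter: k positions from two base hashes (double hashing). */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    SimpleBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    private static int fnv1a(String key) {
        int h = 0x811C9DC5;                       // FNV-1a 32-bit offset basis
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;                      // FNV-1a 32-bit prime
        }
        return h;
    }

    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = fnv1a(key) | 1;                  // odd step so positions keep moving
        return Math.floorMod(h1 + i * h2, size);
    }

    void put(String key) {
        for (int i = 0; i < hashCount; i++) bits.set(index(key, i));
    }

    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(index(key, i))) return false;   // definitely never added
        }
        return true;                                      // probably added
    }
}

/** Hypothetical read path: Bloom filter, then cache (including cached nulls), then the database. */
final class PenetrationGuard {
    private static final class Slot {
        final Optional<String> value;   // Optional.empty() = cached "row does not exist"
        final long expiresAt;
        Slot(Optional<String> value, long expiresAt) { this.value = value; this.expiresAt = expiresAt; }
    }

    private final SimpleBloomFilter filter;
    private final Function<String, String> database;   // returns null when the row is missing
    private final long nullTtlMs;                       // short TTL for cached misses
    private final long valueTtlMs;
    private final LongSupplier clock;
    private final Map<String, Slot> cache = new ConcurrentHashMap<>();

    PenetrationGuard(SimpleBloomFilter filter, Function<String, String> database,
                     long nullTtlMs, long valueTtlMs, LongSupplier clock) {
        this.filter = filter;
        this.database = database;
        this.nullTtlMs = nullTtlMs;
        this.valueTtlMs = valueTtlMs;
        this.clock = clock;
    }

    String get(String key) {
        if (!filter.mightContain(key)) {
            return null;                                  // definitely absent: skip cache and database
        }
        long now = clock.getAsLong();
        Slot slot = cache.get(key);
        if (slot != null && slot.expiresAt > now) {
            return slot.value.orElse(null);               // hit, possibly a cached "absent"
        }
        String value = database.apply(key);
        long ttl = (value == null) ? nullTtlMs : valueTtlMs;   // nulls linger only briefly
        cache.put(key, new Slot(Optional.ofNullable(value), now + ttl));
        return value;
    }
}
```

The filter must be populated with every existing key (and updated on inserts) before it can reject anything; a partially loaded filter would wrongly reject real keys.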
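Remote Cache vs. Near (Local) Cache
Whichever side hosts the cache can customize its behavior, but it must also supply the resources the cache consumes. Near caches are usually simple, which also means they offer few features.
Advantages of a remote cache
Built-in broadcast, synchronization and consensus features; it can be wired directly to the services that write data.
It runs as an independent cluster with dedicated operators, which suits storing massive amounts of data.
Disadvantages of a remote cache
It introduces complex dependencies: integration and workflows become more complicated.
Every service depends on one shared service, so configuration and workflows are hard to differentiate and conflicts become more frequent.
Advantages of a near cache
Easy to adopt.
Each service controls its own caching logic.
Disadvantages of a near cache
Keeping instances consistent through broadcast synchronization is hard and communication-heavy, and can easily trigger message storms.
Memory usage grows, and it cannot hold massive datasets.
10. Cache and Database Consistency in Depth
In distributed systems, keeping the cache consistent with the database is a perennial problem. This section examines several more advanced consistency schemes.
Delayed Double Delete
Problem: If the cache is deleted before the database is updated (a delete-first variant of Cache Aside), a read request that arrives before the update completes reads the old value and writes it back into the cache, leaving the cache inconsistent with the database.
Solution: Delayed double delete deletes the cache once more, after a delay, following the database update, which evicts any stale value written back in the meantime.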
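import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class DelayedDoubleDeleteCacheService {

    private final JedisPool jedisPool;
    // One shared scheduler instead of starting a new Thread for every update
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public DelayedDoubleDeleteCacheService(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void updateWithDelayedDoubleDelete(String key, String value, long delayMs) {
        deleteCache(key);                     // 1st delete: drop the current cached copy
        updateDatabase(key, value);           // write the source of truth
        scheduleDelayedDelete(key, delayMs);  // 2nd delete: evict a stale value a concurrent reader re-cached
    }

    private void deleteCache(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(key);
        }
    }

    private void updateDatabase(String key, String value) {
        // Database write omitted in the original
    }

    private void scheduleDelayedDelete(String key, long delayMs) {
        // In-memory scheduling is lost if this process restarts; a message queue avoids that
        scheduler.schedule(() -> deleteCache(key), delayMs, TimeUnit.MILLISECONDS);
    }
}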
Timing analysis:
sequenceDiagram
participant Client
participant Cache
participant DB
participant AsyncDelete
Client->>Cache: 1. Delete cache entry
Client->>DB: 2. Update database
par Concurrent read request
Client->>Cache: 3a. Read cache (miss)
Client->>DB: 3b. Read database (old value)
Client->>Cache: 3c. Write cache (old value)
end
Client->>AsyncDelete: 4. Delete cache after N seconds
AsyncDelete->>Cache: 5. Delete cache entry
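The race in the diagram can be replayed deterministically with two maps standing in for Redis and the database. This is a hypothetical simulation, not production code: it scripts the case where the reader's database read (3b) lands before the writer's update commits (a lagging read replica produces the same effect), then shows what the next cache-aside reader sees with and without the second delete.

```java
import java.util.HashMap;
import java.util.Map;

/** Deterministic replay of the diagram's interleaving; maps stand in for Redis and the database. */
final class DoubleDeleteReplay {
    private DoubleDeleteReplay() {}

    /** Returns what the next cache-aside reader sees after the scripted race. */
    static String valueSeenByNextReader(boolean withSecondDelete) {
        Map<String, String> cache = new HashMap<>();
        Map<String, String> db = new HashMap<>();
        db.put("k", "old");
        cache.put("k", "old");

        cache.remove("k");                    // writer, step 1: first delete
        String seen = cache.get("k");         // reader, 3a: miss
        if (seen == null) {
            seen = db.get("k");               // reader, 3b: reads "old" before the write commits
        }
        db.put("k", "new");                   // writer, step 2: database update commits
        cache.put("k", seen);                 // reader, 3c: back-fills the stale "old"
        if (withSecondDelete) {
            cache.remove("k");                // writer, steps 4-5: delayed second delete
        }
        // Next reader (Cache Aside): use the cache, or load from the database on a miss
        return cache.computeIfAbsent("k", db::get);
    }
}
```

Without the second delete the stale "old" stays cached until its TTL expires; with it, the next reader misses and loads "new". The delay only has to outlast step 3c, which is why it is sized from replication lag and read latency.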
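Best practices:
The delay should exceed the database primary-replica replication lag plus the average duration of a business read request.
A delay of roughly 500 ms to 2 s is a common starting point; tune it against the two measurements above.
Perform the delayed delete through a message queue so that it neither blocks threads nor is lost when the scheduling process restarts.
Canal/Binlog Subscription
Problem: Maintaining cache consistency in business code is complex, easy to get wrong, and intrusive.
Solution: Subscribe to the MySQL binlog and update the cache asynchronously, achieving eventual consistency.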
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class CanalCacheSyncService {

    private CanalConnector connector;
    private JedisPool jedisPool;

    public void start() throws InterruptedException {
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe(".*\\..*");   // every schema and table; narrow the filter in production
        connector.rollback();             // restart from the last acknowledged position
        while (true) {
            Message message = connector.getWithoutAck(100);   // fetch up to 100 entries
            long batchId = message.getId();
            int size = message.getEntries().size();
            try {
                if (batchId != -1 && size > 0) {
                    processEntries(message.getEntries());
                } else {
                    Thread.sleep(1000);   // nothing new: back off instead of busy-spinning
                }
                connector.ack(batchId);   // acknowledge only after the cache has been updated
            } catch (RuntimeException e) {
                connector.rollback(batchId);   // have Canal redeliver this batch
            }
        }
    }

    private void processEntries(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;   // skip transaction begin/end markers
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                continue;   // unparsable entry; log it in real code
            }
            String tableName = entry.getHeader().getTableName();
            CanalEntry.EventType eventType = rowChange.getEventType();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
                    handleInsertOrUpdate(tableName, rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    handleDelete(tableName, rowData.getBeforeColumnsList());
                }
            }
        }
    }

    private void handleInsertOrUpdate(String tableName, List<CanalEntry.Column> columns) {
        String cacheKey = buildCacheKey(tableName, columns);
        String cacheValue = buildCacheValue(columns);
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.setex(cacheKey, 3600, cacheValue);
        }
    }

    private void handleDelete(String tableName, List<CanalEntry.Column> columns) {
        String cacheKey = buildCacheKey(tableName, columns);
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(cacheKey);
        }
    }

    // Cache key "table:id"; assumes every table has an "id" primary-key column
    private String buildCacheKey(String tableName, List<CanalEntry.Column> columns) {
        return tableName + ":" + getColumnValue(columns, "id");
    }

    // Naive JSON encoding of the row (values are not escaped; use a JSON library in real code)
    private String buildCacheValue(List<CanalEntry.Column> columns) {
        StringBuilder sb = new StringBuilder("{");
        for (CanalEntry.Column column : columns) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(column.getName()).append("\":\"")
              .append(column.getValue()).append('"');
        }
        return sb.append('}').toString();
    }

    private String getColumnValue(List<CanalEntry.Column> columns, String columnName) {
        return columns.stream()
                .filter(c -> c.getName().equals(columnName))
                .findFirst()
                .map(CanalEntry.Column::getValue)
                .orElse("");
    }
}
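Architecture:
graph LR
A[Business application] -->|Writes| B[(MySQL)]
B -->|Binlog| C[Canal Server]
C -->|Subscribe| D[Canal Client]
D -->|Parse| E[Cache update service]
E -->|Update/Delete| F[(Redis)]
A -->|Reads| F
Best practices:
Buffer binlog events in a message queue (e.g. Kafka) to increase throughput.
Make updates idempotent so that replayed or duplicated events do not cause wrong updates.
Monitor binlog lag to catch synchronization problems early.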
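One way to make binlog-driven cache updates idempotent is to remember, per key, the position of the last event applied and ignore anything that is not newer. A stdlib-only sketch with stated assumptions: the class name is hypothetical, a map stands in for Redis, and `position` is assumed to be a per-row, monotonically increasing number (for example derived from the event header's binlog file name and offset).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical idempotent applier: skips events not newer than the last one applied for that key. */
final class IdempotentCacheApplier {
    private final Map<String, String> cache = new ConcurrentHashMap<>();   // stands in for Redis
    private final Map<String, Long> appliedPosition = new ConcurrentHashMap<>();

    /** value == null means a DELETE event. Returns true if the event changed the cache. */
    boolean apply(String key, String value, long position) {
        boolean[] applied = {false};
        // compute() runs atomically per key, so concurrent consumers cannot interleave here
        appliedPosition.compute(key, (k, last) -> {
            if (last != null && position <= last) {
                return last;                  // duplicate or out-of-order replay: ignore
            }
            if (value == null) {
                cache.remove(key);
            } else {
                cache.put(key, value);
            }
            applied[0] = true;
            return position;                  // kept even after a delete, acting as a tombstone
        });
        return applied[0];
    }

    String get(String key) {
        return cache.get(key);
    }
}
```

In production the applied-position map must itself be durable and shared by all consumers (for example stored alongside the cached value); keeping it in one JVM's memory only illustrates the rule.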
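Distributed Transactions and Caching
Problem: In distributed transactions it is hard to guarantee atomicity across database and cache operations.
Solution: Use the TCC (Try-Confirm-Cancel) or Saga pattern together with a cache compensation mechanism. The following code covers only the cache side of try/confirm/cancel; the transaction coordinator that drives these phases is not shown.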
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

// Cache-side compensation in TCC style: Try snapshots the old value, Confirm publishes the new
// one, Cancel restores the snapshot. Pipelines batch commands but are not atomic (use MULTI/EXEC
// or a Lua script if atomicity matters).
public class DistributedTransactionCacheService {

    private JedisPool jedisPool;

    public void updateWithTCC(String key, String value) {
        try {
            tryPhase(key, value);
            confirmPhase(key, value);
        } catch (RuntimeException e) {
            cancelPhase(key);
            throw e;
        }
    }

    // Try: mark the key as in progress and snapshot the current value (both expire after 30 s)
    private void tryPhase(String key, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.setex(key + ":try", 30, "processing");
            String oldValue = jedis.get(key);
            if (oldValue != null) {
                jedis.setex(key + ":old", 30, oldValue);
            }
        }
    }

    // Confirm: write the new value and clean up the bookkeeping keys in one round trip
    private void confirmPhase(String key, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            Pipeline pipeline = jedis.pipelined();
            pipeline.setex(key, 3600, value);
            pipeline.del(key + ":try");
            pipeline.del(key + ":old");
            pipeline.sync();
        }
    }

    // Cancel: restore the snapshot (or drop the key if there was none) and clean up
    private void cancelPhase(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            // Read BEFORE opening the pipeline: Jedis refuses direct commands on a connection
            // with an open pipeline, so calling jedis.get() inside the pipeline would fail
            String oldValue = jedis.get(key + ":old");
            Pipeline pipeline = jedis.pipelined();
            if (oldValue != null) {
                pipeline.setex(key, 3600, oldValue);
            } else {
                pipeline.del(key);
            }
            pipeline.del(key + ":try");
            pipeline.del(key + ":old");
            pipeline.sync();
        }
    }
}
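Version Number/Timestamp Scheme
Problem: Under concurrent updates, old data can overwrite new data and leave dirty data in the cache.
Solution: Use a version number or timestamp as an optimistic lock.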
import java.util.List;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class VersionedCacheService {

    // Compare-and-set in Lua, so the version check and the write happen atomically inside Redis.
    // Returns 1 when the stored version equals ARGV[1], otherwise 0. It also returns 0 when the
    // key does not exist yet, so creating the first version needs a separate path (e.g. HSETNX).
    private static final String CAS_SCRIPT =
            "local currentVersion = redis.call('HGET', KEYS[1], 'version') " +
            "if currentVersion == false then " +
            "  return 0 " +
            "end " +
            "if tonumber(currentVersion) == tonumber(ARGV[1]) then " +
            "  redis.call('HMSET', KEYS[1], 'value', ARGV[2], 'version', ARGV[3]) " +
            "  return 1 " +
            "else " +
            "  return 0 " +
            "end";

    private JedisPool jedisPool;

    public boolean updateWithVersion(String key, String value, long expectedVersion) {
        try (Jedis jedis = jedisPool.getResource()) {
            Long result = (Long) jedis.eval(
                    CAS_SCRIPT, 1, key + ":versioned",
                    String.valueOf(expectedVersion), value, String.valueOf(expectedVersion + 1));
            return result == 1;
        }
    }

    public VersionedValue getWithVersion(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            // HMGET reads both fields in one command, so value and version always belong together
            List<String> fields = jedis.hmget(key + ":versioned", "value", "version");
            if (fields.get(0) == null || fields.get(1) == null) {
                return null;
            }
            return new VersionedValue(fields.get(0), Long.parseLong(fields.get(1)));
        }
    }

    public static class VersionedValue {
        private final String value;
        private final long version;

        public VersionedValue(String value, long version) {
            this.value = value;
            this.version = version;
        }

        public String getValue() {
            return value;
        }

        public long getVersion() {
            return version;
        }
    }
}
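Key Points of the Facebook Memcache Paper
Lease mechanism: on a miss, memcache hands the client a lease token and rate-limits token issuance per key (the paper uses one token every 10 seconds); other clients that miss in the meantime are told to wait briefly and retry, so only one client loads from the database. A set is accepted only with a valid token, and a delete invalidates outstanding tokens, which also prevents stale sets.
sequenceDiagram
participant Client1
participant Client2
participant Cache
participant DB
Client1->>Cache: 1. Get(key) miss
Cache->>Client1: 2. Return lease token
Client1->>DB: 3. Load data from DB
Client2->>Cache: 4. Get(key) miss
Cache->>Client2: 5. No token: wait briefly and retry
Client1->>Cache: 6. Set(key, value) with lease token
Client2->>Cache: 7. Get(key) again after a short wait
Cache->>Client2: 8. Hit: return data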
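The lease rules can be modeled on a single node to make them concrete. This is a hypothetical sketch, not memcache's API: `LeaseCache`, `GetResult` and the explicit `nowMs` parameter are illustrative, and the per-key interval models the paper's one-token-per-10-seconds limit.

```java
import java.util.HashMap;
import java.util.Map;

/** Single-node model of memcache-style leases; names are illustrative, not a real client API. */
final class LeaseCache {
    /** Outcome of get(): a value, a lease token to load with, or a hint to wait and retry. */
    static final class GetResult {
        final String value;          // non-null on a hit
        final long leaseToken;       // non-zero when this caller should load from the database
        final boolean shouldRetry;   // true when another caller already holds the lease

        GetResult(String value, long leaseToken, boolean shouldRetry) {
            this.value = value;
            this.leaseToken = leaseToken;
            this.shouldRetry = shouldRetry;
        }
    }

    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> leaseToken = new HashMap<>();
    private final Map<String, Long> leaseIssuedAt = new HashMap<>();
    private final long leaseIntervalMs;   // at most one token per key per interval
    private long nextToken = 1;

    LeaseCache(long leaseIntervalMs) {
        this.leaseIntervalMs = leaseIntervalMs;
    }

    synchronized GetResult get(String key, long nowMs) {
        String v = values.get(key);
        if (v != null) {
            return new GetResult(v, 0, false);
        }
        Long issuedAt = leaseIssuedAt.get(key);
        if (issuedAt != null && nowMs - issuedAt < leaseIntervalMs) {
            return new GetResult(null, 0, true);   // someone is already loading this key
        }
        long token = nextToken++;
        leaseToken.put(key, token);
        leaseIssuedAt.put(key, nowMs);
        return new GetResult(null, token, false);
    }

    /** Accepts the value only if the caller's token is still the outstanding one. */
    synchronized boolean set(String key, String value, long token) {
        Long current = leaseToken.get(key);
        if (current == null || current != token) {
            return false;                          // token invalidated by a delete: stale set rejected
        }
        values.put(key, value);
        leaseToken.remove(key);
        leaseIssuedAt.remove(key);
        return true;
    }

    /** The write path deletes the key; this also invalidates any outstanding lease. */
    synchronized void delete(String key) {
        values.remove(key);
        leaseToken.remove(key);
        leaseIssuedAt.remove(key);
    }
}
```

Client code would loop: on shouldRetry, sleep briefly and call get again; on a lease token, load from the database and call set with that token.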
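11. Hot Spot Detection and Hot Key Governance
Definition and Impact of Hot Keys
Definition: a single key whose access frequency far exceeds the average, overloading the one cache node that owns it.
Impact:
CPU/memory exhaustion on a single node
Network bandwidth bottleneck
Degraded access performance for other keys
Possible unavailability of the entire cache cluster
Hot Spot Detection
Client-side counting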
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ClientSideHotKeyDetector {

    private final ConcurrentHashMap<String, AtomicLong> keyCounter = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final long hotThreshold = 1000;   // accesses per second that mark a key as hot

    public ClientSideHotKeyDetector() {
        // Evaluate once per second, so each key's count approximates its QPS
        scheduler.scheduleAtFixedRate(this::detectHotKeys, 1, 1, TimeUnit.SECONDS);
    }

    // Call on every cache access
    public void recordAccess(String key) {
        keyCounter.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
    }

    private void detectHotKeys() {
        keyCounter.forEach((key, count) -> {
            long qps = count.getAndSet(0);   // read and reset this second's count
            if (qps > hotThreshold) {
                System.out.println("Hot key detected: " + key + ", QPS: " + qps);
                handleHotKey(key);
            }
        });
    }

    private void handleHotKey(String key) {
        // Left empty in the original, e.g. promote the key into a local cache
    }
}
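Server-side statistics (redis-cli --hotkeys; works only when maxmemory-policy is an LFU policy such as allkeys-lfu)
redis-cli --hotkeys
JD.com HotKey framework
Architecture:
graph TB
A[Client application] -->|Report accesses| B[Worker node]
A -->|Look up hot keys| C[Local cache]
B -->|Aggregate statistics| D[Etcd cluster]
D -->|Push hot keys| B
B -->|Push hot keys| A
D -->|Persist| E[(Database)]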
Hot Key Mitigation
Local cache as a fallback (L1 cache)
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class TwoLevelCacheService {

    private final JedisPool redisPool;
    private final Cache<String, String> localCache;

    public TwoLevelCacheService(JedisPool redisPool) {
        this.redisPool = redisPool;
        // Small, short-lived L1: bounds memory and caps staleness at about 10 seconds
        this.localCache = Caffeine.newBuilder()
                .maximumSize(1000)
                .expireAfterWrite(10, TimeUnit.SECONDS)
                .build();
    }

    public String get(String key) {
        String value = localCache.getIfPresent(key);   // L1 hit: no network round trip
        if (value != null) {
            return value;
        }
        try (Jedis jedis = redisPool.getResource()) {
            value = jedis.get(key);                    // L2 (Redis)
            if (value != null) {
                localCache.put(key, value);            // back-fill L1
            }
        }
        return value;
    }

    public void set(String key, String value) {
        try (Jedis jedis = redisPool.getResource()) {
            jedis.setex(key, 3600, value);
        }
        // Only this instance's L1 is updated; other instances keep their copy until it expires
        localCache.put(key, value);
    }
}
Key sharding (spreading one hot key over several copies)
import java.util.concurrent.ThreadLocalRandom;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class KeyDistributeService {

    private static final int SHARD_COUNT = 10;
    private final JedisPool jedisPool;

    public KeyDistributeService(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    // Write the value to every copy baseKey:0 .. baseKey:9. (Writing to one random copy and
    // reading another random copy, as before, made most reads miss.) With JedisCluster the
    // suffixed keys hash to different slots, spreading reads of one hot key over several nodes.
    // The copies are written one after another, so readers may briefly see an older copy.
    public void writeDistributed(String baseKey, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            for (int i = 0; i < SHARD_COUNT; i++) {
                jedis.setex(baseKey + ":" + i, 3600, value);
            }
        }
    }

    // Each read picks one copy at random, splitting the load SHARD_COUNT ways
    public String readDistributed(String baseKey) {
        int shardId = ThreadLocalRandom.current().nextInt(SHARD_COUNT);
        String value;
        try (Jedis jedis = jedisPool.getResource()) {
            value = jedis.get(baseKey + ":" + shardId);
        }
        // Fall back to the other copies if this one has expired or was never written
        return value != null ? value : readAllShards(baseKey);
    }

    public String readAllShards(String baseKey) {
        try (Jedis jedis = jedisPool.getResource()) {
            for (int i = 0; i < SHARD_COUNT; i++) {
                String value = jedis.get(baseKey + ":" + i);
                if (value != null) {
                    return value;
                }
            }
        }
        return null;
    }
}
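Comparison:
| Approach | Pros | Cons | Use when |
| --- | --- | --- | --- |
| Local cache | Fast responses; offloads Redis | Inconsistent across instances; higher memory use | Read-heavy; low consistency requirements |
| Key sharding | Spreads load; no single hot node | More complex reads; may return a stale copy | Read-hot keys; eventual consistency acceptable |
| Read/write splitting | Separates read and write traffic; good performance | More complex architecture; replication lag | Read-heavy; lag acceptable |
12. The Big Key Problem and Its Mitigation
Definition of a Big Key
| Type | Common big-key threshold |
| --- | --- |
| String | Value larger than 10 KB |
| Hash | More than 5,000 fields |
| List | More than 5,000 elements |
| Set | More than 5,000 members |
| Sorted Set | More than 5,000 members |
Harms of Big Keys
Network bandwidth: transferring a big key consumes a lot of bandwidth and slows other requests.
Memory fragmentation: big keys allocate large memory blocks that are hard to reuse, causing fragmentation.
Slow queries: operations on a big key take a long time and, because Redis executes commands on a single thread, block other requests.
Replication lag: big keys replicate slowly, increasing primary-replica lag.
Blocking risk: commands such as DEL and HDEL on a big key can block Redis while it frees the memory.
Detection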
redis-cli --bigkeys
redis-cli --bigkeys
MEMORY USAGE (Redis 4.0+)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class BigKeyAnalyzer {

    private JedisPool jedisPool;

    public void analyzeKeySize(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            String type = jedis.type(key);
            Long memory = jedis.memoryUsage(key);   // MEMORY USAGE; null if the key does not exist
            if (memory == null) {
                System.out.println("Key not found: " + key);
                return;
            }
            System.out.println("Key: " + key);
            System.out.println("Type: " + type);
            System.out.println("Memory: " + memory + " bytes (" + memory / 1024 + " KB)");
            // Element count per type, to compare against the big-key thresholds
            switch (type) {
                case "string":
                    System.out.println("Length: " + jedis.strlen(key));
                    break;
                case "hash":
                    System.out.println("Hash fields: " + jedis.hlen(key));
                    break;
                case "list":
                    System.out.println("List elements: " + jedis.llen(key));
                    break;
                case "set":
                    System.out.println("Set members: " + jedis.scard(key));
                    break;
                case "zset":
                    System.out.println("Sorted set members: " + jedis.zcard(key));
                    break;
                default:
                    break;
            }
        }
    }
}
Mitigation
Split the big key
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class BigKeySplitService {

    private static final int MAX_HASH_SIZE = 1000;   // hashes at or below this size are left alone
    private static final int SHARD_COUNT = 10;       // fixed number of sub-hashes: baseKey:0 .. baseKey:9

    private JedisPool jedisPool;

    // One routing rule shared by writer and reader. Filling shards sequentially but reading
    // from hashCode() % 10 sent most lookups to the wrong shard. floorMod also avoids the
    // negative index that Math.abs(Integer.MIN_VALUE) % 10 would produce.
    static String shardKey(String baseKey, String field) {
        return baseKey + ":" + Math.floorMod(field.hashCode(), SHARD_COUNT);
    }

    public void splitBigHash(String bigKey) {
        try (Jedis jedis = jedisPool.getResource()) {
            if (jedis.hlen(bigKey) <= MAX_HASH_SIZE) {
                return;
            }
            String cursor = "0";
            do {
                // HSCAN copies the hash page by page instead of one blocking HGETALL
                var page = jedis.hscan(bigKey, cursor);
                for (Map.Entry<String, String> entry : page.getResult()) {
                    jedis.hset(shardKey(bigKey, entry.getKey()), entry.getKey(), entry.getValue());
                }
                cursor = page.getCursor();
            } while (!"0".equals(cursor));
            // From here on, readers must go through getFromSplitHash
            jedis.unlink(bigKey);   // reclaim the old key's memory in the background (Redis 4.0+)
        }
    }

    public String getFromSplitHash(String baseKey, String field) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.hget(shardKey(baseKey, field), field);
        }
    }
}
Asynchronous deletion (UNLINK)
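import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.ScanParams;   // Jedis 4.x package (Jedis 3.x: redis.clients.jedis.ScanParams)

public class AsyncDeleteService {

    private JedisPool jedisPool;

    // UNLINK (Redis 4.0+) removes the key from the keyspace immediately and frees its memory
    // in a background thread, so deleting a big key does not block the server
    public void asyncDelete(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.unlink(key);
        }
    }

    // Deletes only keys matching the pattern. The previous version ignored the pattern and
    // scanned, then unlinked, every key in the database.
    public void batchAsyncDelete(String pattern) {
        ScanParams params = new ScanParams().match(pattern).count(500);
        try (Jedis jedis = jedisPool.getResource()) {
            String cursor = ScanParams.SCAN_POINTER_START;
            do {
                var scanResult = jedis.scan(cursor, params);
                cursor = scanResult.getCursor();
                for (String key : scanResult.getResult()) {
                    jedis.unlink(key);
                }
            } while (!cursor.equals(ScanParams.SCAN_POINTER_START));
        }
    }
}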
Compressed storage
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class CompressedCacheService {

    private JedisPool jedisPool;

    // Values are Base64-encoded to fit a String API, which adds about a third on top of the
    // compressed size; Jedis's byte[] overloads would avoid that overhead.
    public void setCompressed(String key, String value) {
        String encoded = Base64.getEncoder().encodeToString(compress(value));
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.setex(key, 3600, encoded);
        }
    }

    public String getCompressed(String key) {
        String encoded;
        try (Jedis jedis = jedisPool.getResource()) {
            encoded = jedis.get(key);
        }
        if (encoded == null) {
            return null;
        }
        try {
            return decompress(Base64.getDecoder().decode(encoded));
        } catch (DataFormatException e) {
            throw new RuntimeException("Decompression failed", e);
        }
    }

    private byte[] compress(String data) {
        byte[] input = data.getBytes(StandardCharsets.UTF_8);   // explicit charset, not the platform default
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            // Loop until finished: incompressible input can deflate to more bytes than it started
            // with, which a buffer sized to the input length would silently truncate
            while (!deflater.finished()) {
                int n = deflater.deflate(buffer);
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end();   // release native zlib memory
        }
    }

    private String decompress(byte[] data) throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(data);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            // Loop instead of assuming a 10x ratio, which truncated highly compressible values
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new DataFormatException("Truncated or corrupt compressed data");
                }
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            inflater.end();
        }
    }
}
13. Multi-Level Caching in Practice
The Classic Three-Tier Architecture
graph TB
A[User request] --> B[Nginx Lua]
B -->|L1 Cache| C{Hit?}
C -->|Yes| D[Return data]
C -->|No| E[Local cache Caffeine]
E -->|L2 Cache| F{Hit?}
F -->|Yes| D
F -->|No| G[Redis]
G -->|L3 Cache| H{Hit?}
H -->|Yes| D
H -->|No| I[Database]
I -->|Back-fill| G
G -->|Back-fill| E
E -->|Back-fill| B
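The read path in the diagram (try each tier top-down, fall back to the database, back-fill every tier above the one that answered) can be sketched with plain maps. This is a stdlib-only illustration: `Tier` and `MultiLevelReader` are hypothetical names, and each map merely stands in for the Nginx shared dict, Caffeine or Redis.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** One cache tier; a plain map standing in for Nginx shared dict, Caffeine or Redis. */
final class Tier {
    final String name;
    final Map<String, String> store = new ConcurrentHashMap<>();

    Tier(String name) {
        this.name = name;
    }
}

/** Reads tiers top-down; on a hit or a database load, back-fills every tier above the source. */
final class MultiLevelReader {
    private final List<Tier> tiers;                     // ordered L1, L2, L3, ...
    private final Function<String, String> database;    // returns null when the row is missing

    MultiLevelReader(List<Tier> tiers, Function<String, String> database) {
        this.tiers = tiers;
        this.database = database;
    }

    String get(String key) {
        for (int i = 0; i < tiers.size(); i++) {
            String v = tiers.get(i).store.get(key);
            if (v != null) {
                backfill(key, v, i);                    // copy into the faster tiers above
                return v;
            }
        }
        String v = database.apply(key);
        if (v != null) {
            backfill(key, v, tiers.size());             // populate every tier
        }
        return v;
    }

    private void backfill(String key, String value, int upToExclusive) {
        for (int i = 0; i < upToExclusive; i++) {
            tiers.get(i).store.put(key, value);
        }
    }
}
```

Per-tier TTLs are omitted for brevity; in the architecture above each tier would keep entries for progressively longer (L1 TTL much shorter than L2, and so on).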
Access-layer caching with OpenResty + Lua
-- Requires a shared dict declared in nginx.conf, e.g. "lua_shared_dict my_cache 10m;"
local cache = ngx.shared.my_cache

local function get_from_cache(key)
    -- nil on a miss, or once the entry's TTL has passed
    return cache:get(key)
end

local function set_to_cache(key, value, ttl)
    local success, err = cache:set(key, value, ttl)
    if not success then
        ngx.log(ngx.ERR, "failed to set cache: ", err)
    end
end

local function fetch_from_upstream(key)
    local http = require "resty.http"
    local httpc = http.new()
    httpc:set_timeout(5000)   -- milliseconds
    local res, err = httpc:request_uri("http://backend/api/data/" .. ngx.escape_uri(key), {
        method = "GET",
    })
    if not res then
        return nil, err
    end
    if res.status ~= 200 then
        return nil, "upstream returned status " .. res.status   -- never cache error responses
    end
    return res.body
end

local key = ngx.var.arg_key or "default"
local cached_data = get_from_cache(key)
if cached_data then
    ngx.log(ngx.INFO, "cache hit for key: ", key)
    ngx.say(cached_data)
else
    local data, err = fetch_from_upstream(key)
    if data then
        set_to_cache(key, data, 300)   -- TTL in seconds
        ngx.log(ngx.INFO, "cache miss, fetched from upstream")
        ngx.say(data)
    else
        ngx.status = 500
        ngx.say("failed to fetch data")
        ngx.log(ngx.ERR, "failed to fetch from upstream: ", err)
    end
end
The JetCache framework
import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheInvalidate;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CacheUpdate;
import com.alicp.jetcache.anno.Cached;
import com.alicp.jetcache.anno.CreateCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Assumes JetCache is enabled on a configuration class (@EnableMethodCache, plus
// @EnableCreateCacheAnnotation for @CreateCache fields). User and UserRepository are
// application types not shown here.
@Service
public class JetCacheUserService {

    @Autowired
    private UserRepository userRepository;

    // Two-level cache (local + remote) usable programmatically; not used by the methods below
    @CreateCache(name = "userCache", expire = 3600, cacheType = CacheType.BOTH, localLimit = 1000)
    private Cache<Long, User> userCache;

    @Cached(name = "userCache:", key = "#userId", expire = 3600)
    public User getUserById(Long userId) {
        return userRepository.findById(userId);
    }

    // JetCache's counterpart to Spring's @CachePut is @CacheUpdate, which needs a value expression
    @CacheUpdate(name = "userCache:", key = "#user.id", value = "#user")
    public User updateUser(User user) {
        return userRepository.save(user);
    }

    @CacheInvalidate(name = "userCache:", key = "#userId")
    public void deleteUser(Long userId) {
        userRepository.deleteById(userId);
    }

    // Reload in the background every 60 s; stop refreshing 3600 s after the last access
    @Cached(name = "userCache:", key = "#userId", expire = 3600)
    @CacheRefresh(refresh = 60, stopRefreshAfterLastAccess = 3600)
    public User getHotUserById(Long userId) {
        return userRepository.findById(userId);
    }
}
Consistency Challenges of Multi-Level Caching
Broadcast invalidation:
import com.github.benmanes.caffeine.cache.Cache;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

public class CacheInvalidationService {

    private static final String CHANNEL = "cache:invalidation";

    // StringRedisTemplate publishes plain strings, matching the adapter's default String deserialization
    private final StringRedisTemplate redisTemplate;
    private final RedisMessageListenerContainer listenerContainer;

    public CacheInvalidationService(StringRedisTemplate redisTemplate,
                                    RedisMessageListenerContainer listenerContainer) {
        this.redisTemplate = redisTemplate;
        this.listenerContainer = listenerContainer;
    }

    // Writer side: after updating the DB and Redis, tell every instance to drop its local copy
    public void publishInvalidation(String cacheKey) {
        redisTemplate.convertAndSend(CHANNEL, cacheKey);
    }

    // Every instance: evict invalidated keys from its own local (L1) cache. Redis Pub/Sub is
    // fire-and-forget, so an instance that is disconnected misses messages; keep L1 TTLs short.
    public void subscribeInvalidation(Cache<String, ?> localCache) {
        MessageListenerAdapter listener =
                new MessageListenerAdapter(new CacheInvalidationListener(localCache), "handleMessage");
        listener.afterPropertiesSet();   // initializes the adapter when it is not a Spring-managed bean
        listenerContainer.addMessageListener(listener, new ChannelTopic(CHANNEL));
    }

    public static class CacheInvalidationListener {
        private final Cache<String, ?> localCache;

        public CacheInvalidationListener(Cache<String, ?> localCache) {
            this.localCache = localCache;
        }

        public void handleMessage(String message) {
            localCache.invalidate(message);
            System.out.println("Local cache invalidated: " + message);
        }
    }
}
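Cache Warm-Up Strategies
| Strategy | Pros | Cons | Use when |
| --- | --- | --- | --- |
| Full warm-up | Usable immediately after startup | Slow; resource-intensive | Small datasets; startup time not critical |
| Incremental warm-up | Fast startup | Keys not yet loaded fall through to the database | Large datasets; gradual loading acceptable |
| Lazy loading | No warm-up needed | First access is slow | Large datasets; unpredictable access patterns |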
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.scheduling.annotation.Scheduled;

public class CacheWarmupService {

    private final Cache<String, String> cache;

    public CacheWarmupService() {
        cache = Caffeine.newBuilder()
                .maximumSize(10000)
                .expireAfterWrite(1, TimeUnit.HOURS)
                .build();
    }

    // Full warm-up: load every known hot key in parallel, e.g. at startup
    public void fullWarmup() {
        List<String> hotKeys = getHotKeysFromDatabase();
        hotKeys.parallelStream().forEach(key -> cache.put(key, loadFromDatabase(key)));
        System.out.println("Full warmup completed, " + hotKeys.size() + " keys loaded");
    }

    // Incremental warm-up: every 5 minutes, load recently accessed keys that are not cached
    // (requires @EnableScheduling and this class registered as a Spring bean)
    @Scheduled(fixedDelay = 300000)
    public void incrementalWarmup() {
        getRecentlyAccessedKeys().forEach(key -> {
            if (cache.getIfPresent(key) == null) {
                cache.put(key, loadFromDatabase(key));
            }
        });
    }

    // Stubs standing in for real data sources
    private List<String> getHotKeysFromDatabase() {
        return List.of("key1", "key2", "key3");
    }

    private List<String> getRecentlyAccessedKeys() {
        return List.of("key1", "key4", "key5");
    }

    private String loadFromDatabase(String key) {
        return "value_for_" + key;
    }
}
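14. Cache Design on Redis Cluster
Data Sharding in Redis Cluster
Redis Cluster hashes each key with CRC16 and takes the result modulo 16384 to determine the key's slot. If the key contains a hash tag (a non-empty substring between the first { and the next }), only the tag is hashed.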
import redis.clients.jedis.util.JedisClusterCRC16;   // location of the CRC16 helper in Jedis 3.x/4.x

public class RedisClusterSlotCalculator {

    public static int calculateSlot(String key) {
        String hashTag = extractHashTag(key);
        String keyToHash = hashTag != null ? hashTag : key;
        // CRC16(keyToHash) mod 16384
        return JedisClusterCRC16.getSlot(keyToHash);
    }

    // Hash-tag rule: hash only the text between the first '{' and the next '}', provided it is
    // non-empty; an empty "{}" or a missing '}' means the whole key is hashed.
    private static String extractHashTag(String key) {
        int start = key.indexOf('{');
        if (start == -1) {
            return null;
        }
        int end = key.indexOf('}', start + 1);
        if (end == -1 || end == start + 1) {
            return null;
        }
        return key.substring(start + 1, end);
    }

    public static void main(String[] args) {
        String key1 = "user:1001:profile";
        String key2 = "{user:1001}:profile";
        String key3 = "{user:1001}:orders";
        // key2 and key3 share the hash tag "user:1001", so they print the same slot
        System.out.println("Slot for " + key1 + ": " + calculateSlot(key1));
        System.out.println("Slot for " + key2 + ": " + calculateSlot(key2));
        System.out.println("Slot for " + key3 + ": " + calculateSlot(key3));
    }
}
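To see what the hashing step does without a Redis client on the classpath, here is a stdlib-only sketch of the key-to-slot mapping as the Redis Cluster specification describes it: CRC16 in the XMODEM variant (polynomial 0x1021, initial value 0), the hash-tag rule, and the low 14 bits as the slot. `ClusterSlot` is a hypothetical name, not part of any client library.

```java
import java.nio.charset.StandardCharsets;

/** Stdlib-only key -> slot mapping: CRC16-XMODEM, hash tags, mod 16384. */
final class ClusterSlot {
    private ClusterSlot() {}

    /** CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no bit reflection. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    static int slot(String key) {
        int start = key.indexOf('{');
        if (start >= 0) {
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                key = key.substring(start + 1, end);   // non-empty tag: hash only the tag
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) & 16383;   // same as % 16384
    }
}
```

The results can be cross-checked against a live cluster with the CLUSTER KEYSLOT command.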
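Limits on Cross-Slot Operations
Redis Cluster does not support multi-key operations whose keys span several slots, such as MGET, MSET and SUNION; they fail with a CROSSSLOT error.
Solutions:
Use hash tags: make sure related keys land in the same slot.
Client-side grouping: group keys by slot on the client and execute each group as a batch.
Use a Hash structure: store related data in a single Hash.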
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import redis.clients.jedis.JedisCluster;

public class RedisClusterMultiKeyService {

    private JedisCluster jedisCluster;

    // Option 1: hash tags. Both keys hash only "user:<id>", share a slot, and can be read with one MGET
    public void setWithHashTag(String userId, String profile, String orders) {
        String profileKey = "{user:" + userId + "}:profile";
        String ordersKey = "{user:" + userId + "}:orders";
        jedisCluster.set(profileKey, profile);
        jedisCluster.set(ordersKey, orders);
        List<String> values = jedisCluster.mget(profileKey, ordersKey);
    }

    // Option 2: group keys by slot on the client and issue one MGET per slot
    public Map<String, String> smartMget(List<String> keys) {
        Map<Integer, List<String>> slotGroups = keys.stream()
                .collect(Collectors.groupingBy(RedisClusterSlotCalculator::calculateSlot));
        Map<String, String> result = new HashMap<>();
        for (List<String> slotKeys : slotGroups.values()) {
            List<String> values = jedisCluster.mget(slotKeys.toArray(new String[0]));
            for (int i = 0; i < slotKeys.size(); i++) {
                result.put(slotKeys.get(i), values.get(i));
            }
        }
        return result;
    }

    // Option 3: keep related fields in one Hash, which always lives in a single slot
    public void setAsHash(String userId, Map<String, String> fields) {
        String hashKey = "user:" + userId;
        Map<String, String> hashData = new HashMap<>();
        hashData.put("profile", fields.get("profile"));
        hashData.put("orders", fields.get("orders"));
        hashData.put("preferences", fields.get("preferences"));
        jedisCluster.hmset(hashKey, hashData);
    }

    public Map<String, String> getAsHash(String userId) {
        String hashKey = "user:" + userId;
        return jedisCluster.hgetAll(hashKey);
    }
}
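Using Hash Tags
Scenario: several related keys, such as a user's profile and orders, must be operated on atomically.
graph LR
A[User request] --> B{Hash tag?}
B -->|Yes| C[Take the hash tag]
B -->|No| D[Take the whole key]
C --> E[CRC16 hash]
D --> E
E --> F[Modulo 16384]
F --> G[Determine slot]
G --> H[Route to the owning node]
Best practices:
Use hash tags only when multi-key atomic operations are needed.
Hash tags undermine even data distribution; do not overuse them.
A hash tag should be a business identifier, such as a user ID or order ID.
Impact of Cluster Scaling on the Cache
Slot migration process: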
sequenceDiagram
participant Client
participant SourceNode
participant TargetNode
participant MigrateTool
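Client->>SourceNode: Read/write key (slot 1000)
MigrateTool->>SourceNode: Start migrating slot 1000
SourceNode->>TargetNode: Migrate data
SourceNode->>Client: Return ASK redirect
Client->>TargetNode: Send ASKING command
Client->>TargetNode: Read/write key
MigrateTool->>SourceNode: Migration complete
MigrateTool->>TargetNode: Update ownership of slot 1000
Client->>TargetNode: Normal reads/writes
Coping strategies:
During migration: the client must handle ASK redirects (for keys already moved, send ASKING and retry on the target node); once the slot's ownership changes, it must follow MOVED redirects and refresh its slot map.
Cache warm-up: warm up the new node after migration completes.
Monitoring: track migration progress and performance metrics.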
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisException;

public class RedisClusterMigrationAwareClient {

    private final JedisCluster jedisCluster;

    public RedisClusterMigrationAwareClient(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }

    // JedisCluster already follows MOVED (refreshing its slot map) and ASK (sending ASKING to
    // the target node) internally, up to its configured maxAttempts. An application-level retry
    // only covers what is left over, e.g. attempts running out during a long migration.
    public String getWithRetry(String key) throws InterruptedException {
        int maxRetries = 3;
        JedisException last = null;
        for (int i = 0; i < maxRetries; i++) {
            try {
                return jedisCluster.get(key);
            } catch (JedisException e) {
                last = e;
                Thread.sleep(50L * (i + 1));   // short linear backoff before the next attempt
            }
        }
        throw new RuntimeException("Failed to get key after " + maxRetries + " retries", last);
    }
}
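Choosing Between Redis Sentinel and Cluster
| Feature | Redis Sentinel | Redis Cluster |
| --- | --- | --- |
| Data sharding | No | Yes (16384 slots) |
| Horizontal scaling | Requires client-side sharding | Native |
| Maximum memory | Limited by a single node | Total memory of the cluster |
| Failover | Automatic | Automatic |
| Complexity | Lower | Higher |
| Multi-key operations | Supported | Restricted (needs hash tags) |
| Use when | Small to medium scale; simple architecture | Large scale; high performance requirements |
15. Cache Security and Operations
Cache Data Security
Encrypting sensitive data before caching: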
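import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SecureCacheService {

    // Plain "AES" defaults to ECB mode, which leaks patterns; GCM also authenticates the data
    private static final String TRANSFORMATION = "AES/GCM/NoPadding";
    private static final int IV_LENGTH = 12;          // bytes, the recommended nonce size for GCM
    private static final int TAG_LENGTH_BITS = 128;

    private final JedisPool jedisPool;
    private final SecretKeySpec keySpec;
    private final SecureRandom random = new SecureRandom();

    // keyBytes must be 16, 24 or 32 bytes (AES-128/192/256) and should come from a secret
    // manager, not source code; the previous hard-coded "my-secret-key-123" is 17 bytes,
    // which AES rejects.
    public SecureCacheService(JedisPool jedisPool, byte[] keyBytes) {
        this.jedisPool = jedisPool;
        this.keySpec = new SecretKeySpec(keyBytes, "AES");
    }

    public void setSecure(String key, String sensitiveValue) {
        try {
            String encrypted = encrypt(sensitiveValue);
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.setex(key, 3600, encrypted);
            }
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed", e);
        }
    }

    public String getSecure(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            String encrypted = jedis.get(key);
            if (encrypted == null) {
                return null;
            }
            return decrypt(encrypted);
        } catch (Exception e) {
            throw new RuntimeException("Decryption failed", e);
        }
    }

    private String encrypt(String data) throws Exception {
        byte[] iv = new byte[IV_LENGTH];
        random.nextBytes(iv);   // fresh IV per value; never reuse an IV with the same key
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.ENCRYPT_MODE, keySpec, new GCMParameterSpec(TAG_LENGTH_BITS, iv));
        byte[] ciphertext = cipher.doFinal(data.getBytes(StandardCharsets.UTF_8));
        // Store IV || ciphertext (with tag) so decrypt() can recover the IV
        byte[] out = ByteBuffer.allocate(iv.length + ciphertext.length).put(iv).put(ciphertext).array();
        return Base64.getEncoder().encodeToString(out);
    }

    private String decrypt(String encoded) throws Exception {
        byte[] in = Base64.getDecoder().decode(encoded);
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.DECRYPT_MODE, keySpec, new GCMParameterSpec(TAG_LENGTH_BITS, in, 0, IV_LENGTH));
        byte[] plain = cipher.doFinal(in, IV_LENGTH, in.length - IV_LENGTH);
        return new String(plain, StandardCharsets.UTF_8);
    }
}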
TTL control best practices:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class TTLManagementService {

    private JedisPool jedisPool;

    // Every write carries a TTL chosen from a small set of tiers, so no key lives forever by accident
    public void setWithTTLStrategy(String key, String value, TTLStrategy strategy) {
        try (Jedis jedis = jedisPool.getResource()) {
            int ttl = strategy.getTTL();
            jedis.setex(key, ttl, value);
            System.out.println("Set key " + key + " with TTL: " + ttl + "s, strategy: " + strategy);
        }
    }

    public enum TTLStrategy {
        SHORT(60),          // 1 minute
        MEDIUM(300),        // 5 minutes
        LONG(3600),         // 1 hour
        VERY_LONG(86400);   // 1 day

        private final int seconds;

        TTLStrategy(int seconds) {
            this.seconds = seconds;
        }

        public int getTTL() {
            return seconds;
        }
    }

    public void monitorExpiringKeys() {
        try (Jedis jedis = jedisPool.getResource()) {
            // Left empty in the original
        }
    }
}
Cache capacity alerts
import java.util.HashMap;
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class CacheMonitoringService {

    private JedisPool jedisPool;

    public void monitorCapacity() {
        try (Jedis jedis = jedisPool.getResource()) {
            Map<String, String> memoryInfo = parseInfo(jedis.info("memory"));
            long usedMemory = Long.parseLong(memoryInfo.get("used_memory"));
            long maxMemory = Long.parseLong(memoryInfo.getOrDefault("maxmemory", "0"));
            if (maxMemory > 0) {   // maxmemory 0 means "no limit", so a percentage is meaningless
                double usagePercent = (double) usedMemory / maxMemory * 100;
                System.out.println("Memory Usage: " + String.format("%.2f", usagePercent) + "%");
                if (usagePercent > 80) {
                    sendAlert("Memory usage too high: " + String.format("%.2f", usagePercent) + "%");
                }
            }

            Map<String, String> statsInfo = parseInfo(jedis.info("stats"));
            // evicted_keys is cumulative since startup; alerting on the delta between polls is more useful
            long evictedKeys = Long.parseLong(statsInfo.get("evicted_keys"));
            System.out.println("Evicted Keys: " + evictedKeys);
            if (evictedKeys > 1000) {
                sendAlert("Too many evictions: " + evictedKeys);
            }
        }
    }

    // INFO output consists of "field:value" lines plus "# Section" headers
    private Map<String, String> parseInfo(String info) {
        Map<String, String> result = new HashMap<>();
        for (String line : info.split("\\r?\\n")) {
            int idx = line.indexOf(':');
            if (idx > 0 && !line.startsWith("#")) {
                // split at the first ':' only; some values contain colons themselves
                result.put(line.substring(0, idx), line.substring(idx + 1).trim());
            }
        }
        return result;
    }

    private void sendAlert(String message) {
        System.out.println("ALERT: " + message);
    }
}
Slow query troubleshooting
```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.time.Instant;

public class SlowQueryAnalyzer {
    private final JedisPool jedisPool;

    public SlowQueryAnalyzer(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    // Print the 10 most recent SLOWLOG entries
    public void analyzeSlowQueries() {
        try (Jedis jedis = jedisPool.getResource()) {
            var slowLog = jedis.slowlogGet(10);
            for (var entry : slowLog) {
                long id = entry.getId();
                long timestamp = entry.getTimeStamp();    // Unix time in seconds
                long duration = entry.getExecutionTime(); // microseconds
                String command = String.join(" ", entry.getArgs());

                System.out.println("Slow Query ID: " + id);
                System.out.println("Time: " + Instant.ofEpochSecond(timestamp));
                System.out.println("Duration: " + duration + " μs");
                System.out.println("Command: " + command);
                System.out.println("---");
            }
        }
    }

    // Log commands slower than thresholdMicros and keep at most 128 entries (oldest dropped first).
    // CONFIG SET changes are not written back to redis.conf unless CONFIG REWRITE is run.
    public void configureSlowLog(long thresholdMicros) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.configSet("slowlog-log-slower-than", String.valueOf(thresholdMicros));
            jedis.configSet("slowlog-max-len", "128");
        }
    }
}
```
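Printing entries one at a time gets noisy once the slow log fills up; grouping them by command name shows which commands dominate. The sketch below is a hypothetical aggregation step: `SlowEntry` is a stand-in record for the two fields read from each Jedis slow-log entry (arguments and execution time in microseconds), so the logic runs without a Redis server. It uses records and therefore assumes Java 16 or later.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical summary of slow-log entries, keyed by upper-cased command name
final class SlowLogSummary {
    // Stand-in for a Jedis slow-log entry: command arguments and execution time (microseconds)
    record SlowEntry(List<String> args, long micros) {
    }

    record Stats(int count, long maxMicros, long totalMicros) {
    }

    static Map<String, Stats> byCommand(List<SlowEntry> entries) {
        Map<String, Stats> result = new TreeMap<>();
        for (SlowEntry e : entries) {
            String cmd = e.args().isEmpty() ? "?" : e.args().get(0).toUpperCase(Locale.ROOT);
            // Combine with any existing stats for the same command
            result.merge(cmd, new Stats(1, e.micros(), e.micros()),
                    (a, b) -> new Stats(a.count() + b.count(),
                            Math.max(a.maxMicros(), b.maxMicros()),
                            a.totalMicros() + b.totalMicros()));
        }
        return result;
    }
}
```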
Cache disaster recovery
Master-replica failover:
```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

// Client-side read failover between a master pool and a replica pool.
// In production, Redis Sentinel (JedisSentinelPool) or Redis Cluster usually handles promotion instead.
public class CacheFailoverService {
    private final JedisPool masterPool;
    private final JedisPool slavePool;
    private volatile JedisPool currentPool;

    public CacheFailoverService(JedisPool masterPool, JedisPool slavePool) {
        this.masterPool = masterPool;
        this.slavePool = slavePool;
        this.currentPool = masterPool;
    }

    public String getWithFailover(String key) {
        int maxRetries = 3;
        JedisConnectionException lastError = null;
        for (int i = 0; i < maxRetries; i++) {
            JedisPool pool = currentPool; // snapshot: the pool this attempt actually uses
            try (Jedis jedis = pool.getResource()) {
                return jedis.get(key);
            } catch (JedisConnectionException e) {
                lastError = e;
                System.out.println("Connection failed, switching pool...");
                switchPool(pool);
            }
        }
        // Do not return null here: callers would mistake an outage for a cache miss
        throw lastError;
    }

    // Switch only if no other thread has already switched away from the failed pool;
    // otherwise concurrent failures would flip the pool back and forth
    private synchronized void switchPool(JedisPool failedPool) {
        if (currentPool != failedPool) {
            return;
        }
        currentPool = (failedPool == masterPool) ? slavePool : masterPool;
        System.out.println("Switched to " + (currentPool == masterPool ? "master" : "slave") + " pool");
    }
}
```
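The service above switches pools on the first connection error, so a single transient timeout is enough to move all reads. One possible refinement, sketched here as a hypothetical `FailoverSelector`, is to switch only after a number of consecutive failures on the active node and to ignore failure reports for a node that is no longer active. The class name and the threshold are assumptions; in `CacheFailoverService`, `T` would be `JedisPool`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical selector: fail over only after `threshold` consecutive failures on the active node
final class FailoverSelector<T> {
    private final T primary;
    private final T secondary;
    private final int threshold;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private volatile T active;

    FailoverSelector(T primary, T secondary, int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.primary = primary;
        this.secondary = secondary;
        this.threshold = threshold;
        this.active = primary;
    }

    // The node the next request should use
    T active() {
        return active;
    }

    // A request against `node` succeeded: reset the failure streak if it is still the active node
    void recordSuccess(T node) {
        if (node.equals(active)) {
            consecutiveFailures.set(0);
        }
    }

    // A request against `node` hit a connection error
    synchronized void recordFailure(T node) {
        if (!node.equals(active)) {
            return; // stale report: another thread already switched away from this node
        }
        if (consecutiveFailures.incrementAndGet() >= threshold) {
            active = active.equals(primary) ? secondary : primary;
            consecutiveFailures.set(0);
        }
    }
}
```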
Cross-datacenter disaster recovery architecture:
graph TB
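A[User request] --> B[Load balancer]
B --> C[Datacenter A - Redis Master]
B --> D[Datacenter B - Redis Slave]
C -->|replication| D
C --> E[Datacenter A - App]
D --> F[Datacenter B - App]
C -->|async replication| G[Datacenter C - Cold standby]
D -->|async replication| G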
Cold-standby restore procedure:
```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class CacheBackupRestoreService {
    // The RDB location comes from the server's `dir` and `dbfilename` settings; this path is only
    // a common default. Copying it works only when this code runs on the Redis host itself.
    private static final Path RDB_FILE = Paths.get("/var/lib/redis/dump.rdb");

    private final JedisPool jedisPool;

    public CacheBackupRestoreService(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void createSnapshot(String backupPath) throws IOException, InterruptedException {
        try (Jedis jedis = jedisPool.getResource()) {
            // BGSAVE forks a child process and returns immediately
            String result = jedis.bgsave();
            System.out.println("BGSAVE command sent: " + result);

            waitForBackupComplete(jedis);

            Path backupFile = Paths.get(backupPath, "dump_" + System.currentTimeMillis() + ".rdb");
            Files.copy(RDB_FILE, backupFile, StandardCopyOption.REPLACE_EXISTING);
            System.out.println("Backup created: " + backupFile.toAbsolutePath());
        }
    }

    // Poll INFO persistence until the background save finishes, then make sure it succeeded
    private void waitForBackupComplete(Jedis jedis) throws InterruptedException {
        while (!jedis.info("persistence").contains("rdb_bgsave_in_progress:0")) {
            Thread.sleep(1000);
        }
        if (!jedis.info("persistence").contains("rdb_last_bgsave_status:ok")) {
            throw new IllegalStateException("BGSAVE failed; dump.rdb may be stale");
        }
    }

    // Restoring an RDB file needs a restart, so the steps are printed rather than automated
    public void restoreFromBackup(String backupPath) {
        System.out.println("Restore operation requires Redis restart");
        System.out.println("Steps:");
        System.out.println("1. Stop Redis server");
        System.out.println("2. Replace dump.rdb with backup file: " + backupPath);
        System.out.println("3. If appendonly is enabled, disable it in redis.conf: on startup Redis loads the AOF, not the RDB");
        System.out.println("4. Start Redis server");
    }
}
```
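Before a backup file replaces `dump.rdb`, it is worth checking that it is an RDB file at all. The sketch below (a hypothetical `RdbBackupCheck`) only inspects the header: RDB files begin with the ASCII magic `REDIS` followed by a four-digit format version. It does not validate the payload; the `redis-check-rdb` utility shipped with Redis performs a fuller check.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical pre-restore sanity check on an RDB backup file's header
final class RdbBackupCheck {
    private static final byte[] MAGIC = "REDIS".getBytes(StandardCharsets.US_ASCII);

    private RdbBackupCheck() {
    }

    // True if the file starts with "REDIS" followed by four ASCII digits (e.g. "REDIS0011")
    static boolean looksLikeRdb(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            byte[] header = in.readNBytes(9); // Java 11+
            if (header.length < 9 || !Arrays.equals(Arrays.copyOf(header, 5), MAGIC)) {
                return false;
            }
            for (int i = 5; i < 9; i++) {
                if (header[i] < '0' || header[i] > '9') {
                    return false;
                }
            }
            return true;
        }
    }
}
```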