目的
提升高频随机请求的吞吐量,减轻被依赖方(如db、下游微服务等)压力,且不产生数据一致性问题
思路
按请求特征+时间桶+请求数上限归类,同类请求合并为一个批处理执行,执行完逐一告知各请求结果
优点
- 充分利用特定场景的特性:批量吞吐量 > 随机请求吞吐量,减少额外开销
如:mysql insert-values 批量插入、同分片批量读写等
- 不侵入调用方
缺点
- 线程上下文无法有效传递,导致部分功能无法生效,如本地事务、调用链等
- 单次请求耗时有所增加:时间桶+批处理执行耗时
假代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| Object key = "请求特征";
var map = new ConcurrentHashMap<Object, MpscArrayQueue>(); Queue queue; int afterMargin; var future = new CompletableFuture(); while(true){ if((queue=map.get(key)) == null) { queue = new MpscArrayQueue(队列容量); Queue oldQueue = map.putIfAbsent(key, queue); if(oldQueue !=null){ queue = oldQueue; } } if((idx=queue.offer(请求入参+future)) > 0) { break; } map.remove(key, queue); Thread.onSpinWait(); } if(afterMargin == 队列容量-1) { 创建延时任务(时间桶刻度) { if(禁止队列生产 == false) { return; } map.remove(key, queue); 遍历队列; 执行批处理逻辑; 结果逐一返回给对应future; } }else if(afterMargin == 0 && 禁止队列生产 == true) { map.remove(key, queue); 尝试取消延时任务 遍历队列; 执行批处理逻辑; 结果逐一返回给对应future; } return future;
|
上述假代码的线程安全,由7个cas+1个自旋解决:
- cas1: 将
队列 初始化放到 请求特征集合
- cas2:
请求参数加入 队列
- cas3: 加入
队列失败(满/禁止生产),将队列从请求特征集合中移除
- cas4: 禁止队列生产
- cas5: 禁止队列生产后,将
队列从请求特征集合中移除
- cas6: 尝试取消延时任务
- cas7: 批处理逻辑执行后,将结果一一返回给队列的每个请求future
- 自旋1: MpscArrayQueue#offer()不提供返回余量,自行实现,则要自旋遍历加入队列
真实代码
待上传maven中央仓库