将相似或重复请求在上游系统中合并后发往下游系统,可以大大降低下游系统的负载,提升系统整体吞吐率。文章介绍了 hystrix collapser、ConcurrentHashMultiset、自实现BatchCollapser 三种请求合并技术,并通过其具体实现对比各自适用的场景。
工作中,我们常见的请求模型都是”请求-应答”式,即一次请求中,服务给请求分配一个独立的线程,一块独立的内存空间,所有的操作都是独立的,包括资源和系统运算。我们也知道,在请求中处理一次系统 I/O 的消耗是非常大的,如果有非常多的请求都进行同一类 I/O 操作,那么是否可以将这些 I/O 操作都合并到一起,进行一次 I/O 操作,是否可以大大降低下游资源服务器的负担呢?
最近我工作之余的大部分时间都花在这个问题的探究上了,对比了几个现有类库,为了解决一个小问题把 hystrix javanica 的代码翻了一遍,也根据自己工作中遇到的业务需求实现了一个简单的合并类,收获还是挺大的。可能这个需求有点”偏门”,在网上搜索结果并不多,也没有综合一点的资料,索性自己总结分享一下,希望能帮到后来遇到这种问题的小伙伴。
开源的请求合并类库(知名的)好像也只有 Netflix 公司开源的 Hystrix 了, hystrix 专注于保持 WEB 服务器在高并发环境下的系统稳定,我们常用它的熔断器(Circuit Breaker) 来实现服务的服务隔离和灾时降级,有了它,可以使整个系统不至于被某一个接口的高并发洪流冲塌,即使接口挂了也可以将服务降级,返回一个人性化的响应。请求合并作为一个保障下游服务稳定的利器,在 hystrix 内实现也并不意外。
我们在使用 hystrix 时,常用它的 javanica 模块,以注解的方式编写 hystrix 代码,使代码更简洁而且对业务代码侵入更低。所以在项目中我们一般至少需要引用 hystrix-core 和 hystrix-javanica 两个包。
另外,hystrix 的实现都是通过 AOP,我们要还要在项目 xml 里显式配置 HystrixAspect 的 bean 来启用它。
<aop:aspectj-autoproxy/> <bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
hystrix collapser 是 hystrix 内的请求合并器,它有自定义 BatchMethod 和 注解两种实现方式,自定义 BatchMethod 网上有各种教程,实现起来很复杂,需要手写大量代码,而注解方式只需要添加两行注解即可,但配置方式我在官方文档上也没找见,中文方面本文应该是独一份儿了。
其实现需要注意的是:
下面是一个简单的示例:
public class HystrixCollapserSample { @HystrixCollapser(batchMethod = "batch") public Future<Boolean> single(String input) { return null; // single方法不会被执行到 } public List<Boolean> batch(List<String> inputs) { return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList()); } }
为了解决 hystrix collapser 的配置问题看了下 hystrix javanica 的源码,这里简单总结一下 hystrix 请求合并器的具体实现,源码的详细解析在我的笔记:Hystrix collasper 源码解析。
需要注意,由于需要等待 timer 执行真正的请求操作,collapser 会导致所有的请求的 cost 都会增加约 timerInterval/2 ms;
hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括两个部分,专有配置和 hystrixCommand 通用配置;
专有配置包括:
通用配置包括:
一个完整的配置如下:
@HystrixCollapser( batchMethod = "batch", collapserKey = "single", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties = { @HystrixProperty(name = "maxRequestsInBatch", value = "100"), @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"), @HystrixProperty(name = "requestCache.enabled", value = "true") })
由于业务需求,我们并不太关心被合并请求的返回值,而且觉得 hystrix 保持那么多的 Future 并没有必要,于是自己实现了一个简单的请求合并器,业务线程简单地将请求放到一个容器里,请求数累积到一定量或延迟了一定的时间,就取出容器内的数据统一发送给下游系统。
设计思想跟 hystrix 类似,合并器有一个字段作为存储请求的容器,且设置一个 timer 线程定时消费容器内的请求,业务线程将请求参数提交到合并 器的容器内。不同之处在于,业务线程将请求提交给容器后立即同步返回成功,不必管请求的消费结果,这样便实现了时间维度上的合并触发。
另外,我还添加了另外一个维度的触发条件,每次将请求参数添加到容器后都会检验一下容器内请求的数量,如果数量达到一定的阈值,将在业务线程内合并执行一次。
由于有两个维度会触发合并,就不可避免会遇到线程安全问题。为了保证容器内的请求不会被多个线程重复消费或都漏掉,我需要一个容器能满足以下条件:
java.util.concurrent 包内的 LinkedBlockingDeque 刚好符合要求,首先它实现了 BlockingDeque 接口,多线程环境下的存取操作是安全的;此外,它还提供 drainTo(Collection<? super E> c, int maxElements)方法,可以将容器内 maxElements 个元素安全地取出来,放到 Collection c 中。
以下是具体的代码实现:
public class BatchCollapser<E> implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class); private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap(); private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1); private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>(); private Handler<List<E>, Boolean> cleaner; private long interval; private int threshHold; private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) { this.cleaner = cleaner; this.threshHold = threshHold; this.interval = interval; } @Override public void afterPropertiesSet() throws Exception { SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> { try { this.clean(); } catch (Exception e) { logger.error("clean container exception", e); } }, 0, interval, TimeUnit.MILLISECONDS); } public void submit(E event) { batchContainer.add(event); if (batchContainer.size() >= threshHold) { clean(); } } private void clean() { List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold); batchContainer.drainTo(transferList, 100); if (CollectionUtils.isEmpty(transferList)) { return; } try { cleaner.handle(transferList); } catch (Exception e) { logger.error("batch execute error, transferList:{}", transferList, e); } } public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) { Class jobClass = cleaner.getClass(); if (instance.get(jobClass) == null) { synchronized (BatchCollapser.class) { if (instance.get(jobClass) == null) { instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval)); } } } return instance.get(jobClass); } }
以下代码内需要注意的点:
上面介绍的请求合并都是将多个请求一次发送,下游服务器处理时本质上还是多个请求,最好的请求合并是在内存中进行,将请求结果简单合并成一个发送给下游服务器。如我们经常会遇到的需求:元素分值累加或数据统计,就可以先在内存中将某一项的分值或数据累加起来,定时请求数据库保存。
Guava 内就提供了这么一种数据结构:ConcurrentHashMultiset,它不同于普通的 set 结构存储相同元素时直接覆盖原有元素,而是给每个元素保持一个计数 count, 插入重复时元素的 count 值加1。而且它在添加和删除时并不加锁也能保证线程安全,具体实现是通过一个 while(true) 循环尝试操作,直到操作够所需要的数量。
ConcurrentHashMultiset 这种排重计数的特性,非常适合数据统计这种元素在短时间内重复率很高的场景,经过排重后的数量计算,可以大大降低下游服务器的压力,即使重复率不高,能用少量的内存空间换取系统可用性的提高,也是很划算的。
使用 ConcurrentHashMultiset 进行请求合并与使用普通容器在整体结构上并无太大差异,具体类似于:
if (ConcurrentHashMultiset.isEmpty()) { return; } List<Request> transferList = Lists.newArrayList(); ConcurrentHashMultiset.elementSet().forEach(request -> { int count = ConcurrentHashMultiset.count(request); if (count <= 0) { return; } transferList.add(count == 1 ? request : new Request(request.getIncrement() * count)); ConcurrentHashMultiset.remove(request, count); });
最后总结一下各个技术适用的场景:
另外,如果选择自己来实现的话,完全可以将 BatchCollapser 和 ConcurrentHashMultiset 结合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作为容器,这样就可以结合两者的优势了。
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-89407-0.html请求合并的三种技巧,性能起飞!
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com