限流想必大家都不陌生,它是一种控制资源访问速率的策略,用于保护系统免受过载和崩溃的风险。限流可以控制某个服务、接口或系统在一段时间内能够处理的请求或数据量,以防止系统资源耗尽、性能下降或服务不可用。
常见的限流策略有以下几种:
通过合理的限流策略,可以保护系统免受恶意攻击、突发流量和资源滥用的影响,确保系统稳定和可靠运行。在实际应用中,限流常用于接口防刷、防止 DDoS 攻击、保护关键服务等场景。
在 Java 中,限流的实现方式有很多种,例如以下这些:
所谓的自适应限流是结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
类似的实现思路还有很多,如,自适应自旋锁、还有 K8S 中根据负载进行动态扩容等。
以 Sentinel 中的自适应限流来说,它的实现思路是用负载(load1)作为启动控制流量的值,而允许通过的流量由处理请求的能力,即请求的响应时间以及当前系统正在处理的请求速率来决定。
为什么要这样设计?
长期以来,系统自适应保护的思路是根据硬指标,即系统的负载 (load1) 来做系统过载保护。当系统负载高于某个阈值,就禁止或者减少流量的进入;当 load 开始好转,则恢复流量的进入。这个思路给我们带来了不可避免的两个问题:
TCP BBR 的思想给了我们一个很大的启发。我们应该根据系统能够处理的请求,和允许进来的请求,来做平衡,而不是根据一个间接的指标(系统 load)来做限流。最终我们追求的目标是 在系统不被拖垮的情况下,提高系统的吞吐率,而不是 load 一定要到低于某个阈值。如果我们还是按照固有的思维,超过特定的 load 就禁止流量进入,系统 load 恢复就放开流量,这样做的结果是无论我们怎么调参数,调比例,都是按照果来调节因,都无法取得良好的效果。 所以,Sentinel 在系统自适应限流的做法是,用 load1 作为启动控制流量的值,而允许通过的流量由处理请求的能力,即请求的响应时间以及当前系统正在处理的请求速率来决定。
Sentinel 是从单台机器的总体 Load、RT、入口 QPS 和线程数四个维度监控应用数据,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量(EntryType.IN),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。
注意:系统规则只对入口流量起作用(调用类型为 EntryType.IN),对出口流量无效。可通过 SphU.entry(res, entryType) 指定调用类型,如果不指定,默认是 EntryType.OUT。
Sentinel 支持以下的阈值规则:
在 Sentinel 中,可以通过系统规则 -> 新增系统规则,设置阈值以实现自适应限流功能,如下图所示:
先用经典图来镇楼:
我们把系统处理请求的过程想象为一个水管,到来的请求是往这个水管灌水,当系统处理顺畅的时候,请求不需要排队,直接从水管中穿过,这个请求的RT是最短的;反之,当请求堆积的时候,那么处理请求的时间则会变为:排队时间 + 最短处理时间。
推论一:如果我们能够保证水管里的水量,能够让水顺畅的流动,则不会增加排队的请求;也就是说,这个时候的系统负载不会进一步恶化。
我们用 T 来表示(水管内部的水量),用 RT 来表示请求的处理时间,用P来表示进来的请求数,那么一个请求从进入水管道到从水管出来,这个水管会存在 P * RT 个请求。换一句话来说,当 T ≈ QPS * Avg(RT) 的时候,我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,系统的负载不会进一步恶化。
接下来的问题是,水管的水位是可以达到了一个平衡点,但是这个平衡点只能保证水管的水位不再继续增高,但是还面临一个问题,就是在达到平衡点之前,这个水管里已经堆积了多少水。如果之前水管的水已经在一个量级了,那么这个时候系统允许通过的水量可能只能缓慢通过,RT 会大,之前堆积在水管里的水会滞留;反之,如果之前的水管水位偏低,那么又会浪费了系统的处理能力。
推论二:当保持入口的流量使水管出来的流量达到最大值的时候,可以最大利用水管的处理能力。
然而,和 TCP BBR 的不一样的地方在于,还需要用一个系统负载的值(load1)来激发这套机制启动。
注:这种系统自适应算法对于低 load 的请求,它的效果是一个“兜底”的角色。对于不是应用本身造成的 load 高的情况(如其它进程导致的不稳定的情况),效果不明显。
以 Sentinel 官方提供的自适应限流代码为例,我们可以再来了解一下它的具体使用:
/* * Copyright 1999-2018 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.alibaba.csp.sentinel.demo.system;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import com.alibaba.csp.sentinel.util.TimeUtil;import com.alibaba.csp.sentinel.Entry;import com.alibaba.csp.sentinel.EntryType;import com.alibaba.csp.sentinel.SphU;import com.alibaba.csp.sentinel.slots.block.BlockException;import com.alibaba.csp.sentinel.slots.system.SystemRule;import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;/** * @author jialiang.linjl */public class SystemGuardDemo { private static AtomicInteger pass = new AtomicInteger(); private static AtomicInteger block = new AtomicInteger(); private static AtomicInteger total = new AtomicInteger(); private static volatile boolean stop = false; private static final int threadCount = 100; private static int seconds = 60 + 40; public static void main(String[] args) throws Exception { tick(); initSystemRule(); for (int i = 0; i < threadCount; i++) { Thread entryThread = new Thread(new Runnable() { @Override public void run() { while (true) { Entry entry = null; try { entry = SphU.entry("methodA", EntryType.IN); pass.incrementAndGet(); try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { // ignore } } catch (BlockException e1) { block.incrementAndGet(); try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { // ignore } } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (entry != null) { entry.exit(); } } } } }); entryThread.setName("working-thread"); entryThread.start(); } } private static void initSystemRule() { SystemRule rule = new SystemRule(); // max load is 3 rule.setHighestSystemLoad(3.0); // max cpu usage is 60% rule.setHighestCpuUsage(0.6); // max avg rt of all request is 10 ms rule.setAvgRt(10); // max total qps is 20 rule.setQps(20); // max parallel working thread is 10 rule.setMaxThread(10); SystemRuleManager.loadRules(Collections.singletonList(rule)); } private static void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } static class TimerTask implements Runnable { @Override public void run() { System.out.println("begin to statistic!!!"); long oldTotal = 0; long oldPass = 0; long oldBlock = 0; while (!stop) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } long globalTotal = total.get(); long oneSecondTotal = globalTotal - oldTotal; oldTotal = globalTotal; long globalPass = pass.get(); long oneSecondPass = globalPass - oldPass; oldPass = globalPass; long globalBlock = block.get(); long oneSecondBlock = globalBlock - oldBlock; oldBlock = globalBlock; System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock); if (seconds-- <= 0) { stop = true; } } System.exit(0); } }}
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-90038-0.html阿里面试:说说自适应限流?
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com