前言

关于这篇文章起因,最近计划看一下《Effective Java》,然后我了解到此书的作者同样也是 Google Guava 库的作者,作为一个练习2年的 Java 的程序员,还没有研究过 Guava,属实有点说不过去,借此机会研究了一下 Google Guava。

先简单说一下 Google Guava 库,它不仅仅是 JDK 的升级库,诸如包含集合(collections)、缓存(caching)、并发库(concurrency libraries)、原生类型支持(primitives support)、字符串处理(string processing)、I/O 库等,还是《Effective Java》这本书中那些优秀经验的实践代表,两者应该结合起来阅读学习。在 Guava 中有两个实现我很感兴趣,一个是它实现的 Bloom Filter(布隆过滤器),另外一个就是本篇文章即将介绍的 RateLimiter(限流器)。

我们平时在开发高并发的分布式系统时,会借助许多手段来满足系统的并发要求,我总结下来大致分为下面几个方面:

  • 缓存:通过缓存系统可以减少对底层数据存储系统的频繁访问,从而有效地提高系统的访问能力。从前端的浏览器,到网络,在到后端的服务,底层的数据库、文件系统、硬盘和CPU,全都都有设计缓存,这是提高快速访问能力的有效手段。
  • 负载均衡:通过负载均衡可以将请求分摊到多个服务器上,避免单点故障和资源瓶颈,它是水平扩展的关键技术。
  • 异步调用:异步调用的一个主要的技术手段就是消息队列,通过消息队列实现异步任务处理、系统解耦、削峰填谷等。
  • 流量控制:通过限制系统等请求量和并发量,防止系统过载。常见的流量控制的手段包括限流、熔断和降级。

常见的限流实现方式

计数器算法

这是最基本、最粗暴的算法。一般来说,该算法会会维护一个计数器,当处理请求时,就对计数器做加一操作,当你请求处理完时,就对计数器做减一操作,如果计数器达到某个数量(预先设定的阀值),则按照限流的规则处理,防止系统过载。

采用队列的算法

这个算法有点像 FIFO,请求有快有慢,所有的请求都放入队列中,消费端以固定的速率从消息队列中取出请求消费,从而控制整个系统的处理流量,以下就是这个算法的示意图:

当然,我们可以在上面这个算法的基础上,延伸出其他更为高阶的用法,例如利用优先级队列实现一个具有优先级的流量控制算法。例如针对不同的请求,处理时先处理优先级高的队列,等优先级高的队列处理完了再处理优先级低的队列;再比如,还可以设置不同的权重,根据队列的权重处理来先后处理不同的请求。

漏桶(Leaky Bucket)算法

关于漏桶,wikipedia上有关于它的词条

漏桶算法,就像一个漏斗一样,如果把进入漏斗的水比做进入的流量,那么从漏斗中漏出的水就是系统处理的流量。进入漏斗中的流量可快可慢,但是经过漏斗,流量都会以一个固定的速率流出,如果遇到突发流量,超过了漏斗的最大容量,则会丢弃流量,通过这种方式,可以保护系统,防止系统被突发的大流量冲垮。

关于漏桶算法的实现,一个比较常见的实现方式就是利用 FIFO 队列。把数据包放入 FIFO 队列中,如果流量都以固定大小的数据包组成,则在该进程的时钟每次 tick 时,从队列中移除该固定长度的数据用于消费;如果流量是以可变长度大小的数据包组成,则每次对队列中的数据进行消费时依赖于字节或者比特的数量。

下面就是一个处理可变长度数据包流量的算法:

  1. 在时钟tick时初始化一个计数器n
  2. 重复下面的操作直到n小于队列头部数据包的数据大小
    1. 从队列头部弹出一个数据包,计做P
    2. 将数据包P发出
    3. 将计数器n减少数据包P的长度
  3. 重置计数器n,重复步骤1

代码大致逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* leaky bucket, variable-length packets
* @author cczyWyc
*/
public class VariableLenPacketDemo {
public static void main(String[] args) {
int storage = 0; // stored packet in bucket
int bucketSize = 10; // the capacity of the bucket
int inputPktSize = 4; // inputs packet into the bucket at a time
int outPktSize = 1; // outputs packet from the bucket at a time
int numberQueries = 4; // total number of times bucket content is checked
for (int i = 0; i < numberQueries; i++) {
int leftSize = bucketSize - storage;
if (inputPktSize <= leftSize) {
// add packet into the bucket
storage += inputPktSize;
} else {
System.out.println("packet loss = " + inputPktSize);
}
System.out.println("buff size = " + storage + " out of bucket size = " + bucketSize);
storage -= outPktSize;
}
}
}

关于漏桶算法,以下是一个完整的算法demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class LeakyBucket {
/** bucket size */
private int bucketSize;
/** leakRate */
private int leakRate;
/** current water in bucket */
private int currentPacket;
/** bucket, use queue */
private Queue<Packet> queue;

public LeakyBucket(int bucketSize, int leakRate) {
this.bucketSize = bucketSize;
this.leakRate = leakRate;
this.currentPacket = 0;
this.queue = new ArrayDeque<>();
}

/**
* add packet into the bucket
* @param packetSize packet size
* @return true/false
*/
public boolean addPacket(int packetSize) {
if (this.currentPacket + packetSize <= this.bucketSize) {
this.queue.add(new Packet(packetSize, System.currentTimeMillis()));
this.currentPacket += packetSize;
return true;
} else {
return false;
}
}

/**
* leak the packet
*/
public void leak() {
while (!this.queue.isEmpty()) {
Packet packet = this.queue.peek();
long timeElapsed = System.currentTimeMillis() - packet.getArrivalTime();
int leakPacket = (int) (timeElapsed * leakRate / 1000);
if (leakPacket >= packet.getPktSize()) {
this.queue.poll();
this.currentPacket -= packet.getPktSize();
} else {
break;
}
}
}
}

public class Packet {
/** packet size */
private int pktSize;
/** packet arrival time */
private long arrivalTime;

public Packet(int pktSize, long arrivalTime) {
this.pktSize = pktSize;
this.arrivalTime = arrivalTime;
}

public long getArrivalTime() {
return arrivalTime;
}

public int getPktSize() {
return pktSize;
}
}

public class Main {
public static void main(String[] args) throws InterruptedException {
int bucketSize = 100;
int leakRate = 10;
LeakyBucket leakyBucket = new LeakyBucket(bucketSize, leakRate);

int[] packets = {20, 30, 10, 50, 40};
for (int packetSize : packets) {
if (leakyBucket.addPacket(packetSize)) {
System.out.println("packet length " + packetSize + " has been add the leaky bucket");
} else {
System.out.println("leaky bucket full, packet length " + packetSize + " has been dropped");
}
leakyBucket.leak();
Thread.sleep(1000);
}
}
}

令牌桶(Token Bucket)算法

关于令牌桶算法,可以参考维基百科的词条:令牌桶。令牌桶算法是有一个中间代理人,以固定的速率往一个桶(Bucket)内放入令牌,只有拿到令牌的请求才能进行消费。和漏桶算法都以一个固定的速率消费不同,令牌桶算法在流量不是很大时放桶内放入令牌,在流量大时,可以以较快的速率消费(只要桶内的令牌足够)。

以下是一个令牌桶算法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type TokenBucket struct {
rate int64
maxTokens int64
currentTokens int64
lastRefillTmestamp time.Time
mutex sync.Mutex
}

// NewTokenBucket return a new token bucket
func NewTokenBucket(Rate int64, MaxTokens int64) *TokenBucket {
return &TokenBucket{
rate: Rate,
maxTokens: MaxTokens,
lastRefillTmestamp: time.Now(),
currentTokens: MaxTokens,
}
}

// refill add token into bucket
func (tb *TokenBucket) refill() {
now := time.Now()
end := time.Since(tb.lastRefillTmestamp)
tokensTobeAdded := (end.Nanoseconds() * tb.rate) / 1000000000
tb.currentTokens = int64(math.Min(float64(tb.currentTokens+tokensTobeAdded), float64(tb.maxTokens)))
tb.lastRefillTmestamp = now
}

// RequestAllowed if the requests are allowed. true:allowed, false:reject
func (tb *TokenBucket) RequestAllowed(tokens int64) bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
if tb.currentTokens >= tokens {
tb.currentTokens = tb.currentTokens - tokens
return true
}
return false
}

应用

在许多的开源框架和工具中,都实现了 Token Bucket(令牌桶)算法,例如一个比较常见的例子就是 Google Guava实现的 RateLimiter。RateLimiter 是一个限流工具,基于令牌桶算法实现,可以控制某个时间段内控制请求的数量。

从使用上来看,Google Guava 的限流器使用非常简单,只需要创建一个 RateLimiter 对象,并调用它的 acquire 方法即可。acquire 方法会阻塞调用线程,自到获取令牌为止。

1
2
3
4
5
6
7
8
// 每秒钟只允许通过10个请求
RateLimiter limiter = RateLimiter.create(10.0);
while (true) {
// 获取令牌
limiter.acquire();
// 处理请求
handleRequest();
}

在下面的例子中,我创建了一个流速为 2 个请求/秒的限速器,从直观上来看,这里的流速指的是每秒最多允许 2 个请求通过限流器,在 Guava 中,这里其实是一种匀速的概念,2 个请求/秒等价于1一个请求/500毫秒,在向线程池提交任务之前,调用 acquire() 方法就能起到限流的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

//限流器流速:2个请求/秒
RateLimiter limiter =
RateLimiter.create(2.0);
//执行任务的线程池
ExecutorService es = Executors
.newFixedThreadPool(1);
//记录上一次执行时间
prev = System.nanoTime();
//测试执行20次
for (int i=0; i<20; i++){
//限流器限流
limiter.acquire();
//提交任务异步执行
es.execute(()->{
long cur=System.nanoTime();
//打印时间间隔:毫秒
System.out.println(
(cur-prev)/1000_000);
prev = cur;
});
}

输出结果:
...
500
499
499
500
499

Google Guava RateLimiter实现原理分析

Guava RateLimiter 使用的是令牌桶算法,在分析原理之前,我们再来描述一下令牌桶算法:

  1. 令牌以固定的速率添加到令牌桶中,假设限流的速率是 r/ 秒,则令牌每 1/r 秒会添加一个;
  2. 假设令牌桶的容量是 b,如果令牌桶已满,则新的令牌会丢弃;
  3. 请求能够通过限流器的前提是令牌桶中有令牌。

上面我们解释了流速 r,它其实是一种匀速的概念,那么令牌桶的容量 b 该怎么理解呢?b 其实是 burst 的缩写,意义是限流器允许的最大突发流量。比如 b=10,并且桶中的令牌已满,此时限流器允许 10 个请求同时通过限流器,当然只是突发流量而已,这 10 个请求会带走 10 个令牌,所以后续的流量只能按照速率 r 通过限流器。

以下涉及到源码分析,本文使用的 Guava 版本是截止目前最新的31.1-jre版本。

前面讲到,RateLimiter的使用非常简单,它有两个静态方法用来实例化,实例化以后,我们只需要关心 acquire 就行了,甚至都没有 release 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 实例化的两种方式:
public static RateLimiter create(double permitsPerSecond){}
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {}

public double acquire() {}
public double acquire(int permits) {}

public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}

public final double getRate() {}
public final void setRate(double permitsPerSecond) {}

我们知道,Java 并发包中提供了 Semaphore,它也能够对资源访问进行控制,不同的是,Semaphore 控制的是并发的数量,而 RateLimiter 是用来控制资源访问的速率(rate)的,它强调的是控制速率。

可以看到,create 方法指定一个 permitsPerSecond 参数,代表每秒钟产生多少个 permits,这就是速率。RateLimiter允许预占未来的令牌,比如,每秒钟产生5个 permits,我们可以单次请求 100 个 permits,这样,紧接着的下一个请求需要等待大概 20 秒才能获取到 permits。

SmoothRateLimiter介绍

通过源码可以看到,RateLimiter只有一个子类,就是抽象类 SmoothRateLimiter,SmoothRateLimiter 有两个实现类,其实就是对应了限流器的两种模式,这里先介绍一下中间的抽象类 SmoothRateLimiter,然后后面分别介绍它的两个实现类(对应的两种模式)。

RateLimiter 作为抽象类,它只有两个属性

1
2
3
private final SleepingStopwatch stopwatch;

private volatile Object mutexDoNotUseDirectly;

其中,stopwatch非常重要,它是一个底层计时器,用它来“计时”,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。

mutexDoNotUseDirectly 用来做锁,RateLimiter 依赖于 synchronized 来控制并发,所以我们其实可以看到,它的各个属性都没没有用 volatile 修饰。

具体看抽象类 SmoothRateLimiter 源码,可以看到它有如下属性:

1
2
3
4
5
6
7
8
9
10
11
12
// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;

// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;

// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000
double stableIntervalMicros;

// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,可以理解为时间戳
private long nextFreeTicketMicros = 0L;

nextFreeTicketMicros 是一个很重要的属性。我们每一次获取 permits 的时候,先拿 storedPermits 的值,因为它是当前存下来的 permits,如果当前存的 permits 够,storedPermits 减去请求消耗的 permits 就可以了,如果不够,那么就需要将这个 nextFreeTicketMicros 的时间往前推,表示预占了接下来多少时间的 permits。那么如果在下一个请求到来的时候,如果还没有到 nextFreeTicketMicros 这个时间点,需要 sleep 到这个时间点再返回,当然也需要再将这个时间往前推。

这里可能有个疑问:因为时间是一直在往前走的,应该要一直往池中添加 permits,所以 storedPermits 的值需要不断的往上添加,难道需要另外开启一个线程来单独完成这个事情吗?其实不是的,只需要在关键的操作中同步一下,然后重新计算 permits 就好了。

SmoothBursty 分析

查看源码,RateLimiter 有一个公有的静态 create 方法:

1
2
3
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

参与 permitsPerSecond 表示每秒钟可以产生多少个 permits,其中调用了如下方法

1
2
3
4
5
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

这里实例化的是 SmoothBursty 的实例,顺着代码点进去,发现它只有一个属性 maxBurstSeconds,在实例化的时候,指定了 maxBurstSeconds 为 1.0,也就是说,最多会缓存 1 秒钟,也就是 (1.0 * permitsPerSecond)这么多个 permits 到池中。

这个 1.0 秒,还关系到 storedPermits 和 maxPermits:

0 <= storedPermits <= maxPermits = permitsPerSecond

继续往下看 setRate 方法:

1
2
3
4
5
6
7
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}

setRate 是一个 public 方法,可以用它来调整速率。继续看初始化的过程,这里使用了 synchronized 来控制并发。往下跟踪可以看到子类抽象类 SmoothRateLimiter 中重写了这个方法:

1
2
3
4
5
6
7
8
9
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步
resync(nowMicros);
// 计算属性 stableIntervalMicros
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

Resend 方法用来调整 storesPermits 和 nextFreeTicketMicros。这就是我在上面说的,不需要单独开启一个线程增加 storedPermits 的值,在关键的步骤节点,需要先更新一下 storedPermits 到正确的值。

1
2
3
4
5
6
7
8
9
void resync(long nowMicros) {
// 如果 nextFreeTicket 已经过掉了,想象一下很长时间都没有再次调用 limiter.acquire() 的场景
// 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

上述的源码中,coolDownIntervalMicros() 这个方法可以先不用关注,可以看到,在子类 SmoothBursty 类中的实现是直接返回了 stableIntervalMicros 的值,也就是我们说的,每产生一个 Permits 的时间长度。

看到这里不难发现,在 resync 的时候,此时的 stableIntervalMicros 其实并没有设置,是在下面的 doSetRate 的实现中设置的,也就是说这里发生了一次除 0 的操作,得到的结果其实是无穷大。而此时 maxPermits 还是 0,不过这里没有多大关系。

再回到前面的 doSetRate 方法,可以看到 resync 以后,会走到下面的 doSetRate逻辑,这里子类 SmoothBursty 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// 这里计算了,maxPermits 为 1 秒产生的 permits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 因为 storedPermits 的值域变化了,需要等比例缩放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

上面就是实例化 SmoothBursty 的一个过程,接下来再来分析 acquire 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
// 预约,如果当前不能直接获取到 permits,需要等待
// 返回值代表需要 sleep 多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

继续看 reserve 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 计算时长
return max(momentAvailable - nowMicros, 0);
}

顺着代码继续往里看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可以使用多少个 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不够的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 为了这个不够的部分,需要等待多久时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 将 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 减去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

可以看到获取 permits 的时候,其实是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另一部分来自于预占未来的 freshPermits。这里有一个关键点,可以看到上面方法的返回值是 nextFreeTicketMicros 的旧值,因为只要到这个时间节点,就说明在当次 acquire 可以成功返回了,而不管 sroredPermits 够不够,如果不够,会将 nextFreeTicketMicros 往前推一定的时间,预占了一定的量。

SmoothWarmingUp 分析

下面再来看 Guava 限流器的另外一种模式。SmoothWarmingUp 适用于资源需要预热的场景,比如某个接口的业务需要使用到数据库连接,由于连接需要预热才能进入到最佳状态,如果我们的系统长时间处于低负载或零负载状态,连接池中的连接慢慢释放掉了,此时我们认为连接池是冷的。假设业务在稳定状态下,正常可以提供最大 1000 QPS 的访问,但是如果连接池是冷的,我们就不能让系统达到 1000 的QPS,要限制住突发流量,因为这会拖垮后端数据库,从而拖垮整个系统,应该有一个预热的过程。这里我先贴一个 Guava 源码中 SmoothWarmingUp 的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* This implements the following function where coldInterval = coldFactor * stableInterval.
*
* <pre>
* ^ throttling
* |
* cold + /
* interval | /.
* | / .
* | / . ← "warmup period" is the area of the trapezoid between
* | / . thresholdPermits and maxPermits
* | / .
* | / .
* | / .
* stable +----------/ WARM .
* interval | . UP .
* | . PERIOD.
* | . .
* 0 +----------+-------+--------------→ storedPermits
* 0 thresholdPermits maxPermits
* </pre>
*/

可以看到 X 轴代表 storedPermits 的数量,Y 轴代表获取一个 permits需要的时间。如果系统处于低负载的状态,storedPermits 会一直增加,当请求来的时候,我们要从 storedPermits 中取 permits,在这种模式下,如果 storedPermits越大,代表系统越冷,则获取 permits 需要的时间就越多。这里回顾一下上面介绍的 SmoothBursty 模式,它从 storedPermtis 中获取 permits 是不需要等待时间的,而 SmoothWarmingUp 恰恰相反。这里大致总结一下 SmoothBursty 和 SmoothWarmingUp 的区别:

  1. SmoothBursty 初始化的时候令牌池中的令牌数量为 0,而 SmoothWarmingUp 初始化的时候令牌池数量为 maxPermits。
  2. SmoothBursty 从令牌池中获取令牌不需要等待,而 SmoothWarmingUp 从令牌池中获取令牌需要等待一段时间,该时间长短和令牌池中的令牌数量有关系

这里我在网上找到了一个更详细的图对此预热过程进行说明:

上图中 slope 表示绿色实线的斜率,其计算方式如下:

1
slope = (stableIntervalMicros * coldFactor - stableIntervalMicros) / (maxPermits - thresholdPermits)

由于横坐标表示令牌桶中令牌使用,纵坐标表示从令牌桶中获取一个令牌需要的时间,则红色实线对应的矩形面积、绿色实线对应的梯形面积都代表时间,因此预热时间 warmupPeriodMicros的定义如下(梯形面积):从满状态的令牌桶中取出(maxPermits - thresholdPermits)个令牌所需花费的时间。预热时间是我们在构造的时候指定的,完成预热后,我们能进入到一个稳定的速率中(stableInterval)。有一个关键的点是从 thresholdPermits 到 0 的时间,是从 maxPermits 到 thresholdPermits 时间到一半,也就是梯形面积是长方形面积的 2 倍,至于具体原因,可以自行论证。下面就是根据构造参数计算出 thresholdPermits 和 maxPermits 的值。

梯形面积为 warmupPeriod,而长方形面积为 stableInterval * thresholdPermits,即:

1
warmupPeriod = 2 * stableInterval * thresholdPermits

由此可以得出 thresholdPermits 的值:

1
thresholdPermits = 0.5 * warmupPeriod / stableInterval

然后根据梯形面积公式:

1
warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

可以得出 maxPermits 为:

1
maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

这样我们就算出了 thresholdPermits 和 maxPermits 的值。

下面再来看一下冷却时间间隔,它指的是 sroredPermits 中每个 permits 的增长速度,为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间,在源码中将其定义为:

1
2
3
4
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}

它在 resync 中有使用:

1
2
3
4
5
6
7
8
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 在这里使用
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

接着来看它的 doSetRate 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 是固定的 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// 这个公式上面已经解释了
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// 这个公式上面也已经解释了
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 计算那条斜线的斜率。上面解释了
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}

然后再来回顾一下下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

这段代码上面解释 acquire 的时候已经解释过了,它是 acquire 的核心,waitMicros 由两部分组成,一部分是从 storedPermits 中获取花费的时间,一部分是等待 freshPermits 产生花费的时间。在 SmoothBursty 的实现中,从 storedPermits 中获取 permits 直接返回 0,不需要等待。而在 SmoothWarmingUp 的实现中,由于需要预热,所以从 storedPermits 中取 permits 需要花费一定的时间,其实就是要计算下图中,阴影部分的面积。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 如果右边梯形部分有 permits,那么先从右边部分获取permits,计算梯形部分的阴影部分的面积
if (availablePermitsAboveThreshold > 0.0) {
// 从右边部分获取的 permits 数量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// 梯形面积公式:(上底+下底)*高/2
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 长方形部分的阴影面积
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}

// 对于给定的 x 值,计算 y 值
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}

到这里,SmoothWarmingUp 基本上说完了。

小结

按照我写文章的惯例,对这篇文章做一个小结。本文首先从限流算法说起,介绍了两种常见的限流算法,分别是 Leaky Bucket Algorithm(漏桶算法) 和 Token Bucket Algorithm(令牌桶算法);接着重点以 Token Bucket Algorithm 为例,讲了 Token Bucket Algorithm 的具体实现,即 Google Guava RateLimiter。Guava RateLimiter 有两种模式,应对突发流量的 SmoothBursty 和 可以预热的 SmoothWarmingUp。SmoothBursty 获取 permits 不需要等待,可以现在令牌桶中存入 permits,需要注意的是,这里 permits 的增长是一种匀速的状态,当有突发流量时,一次性可以从令牌桶获取多个 permits;而 SmoothWarmingUp 获取 permits需要一个预热的状态,这样设计的作用是,如果系统可以承受最大的 QPS 是1000,如果系统是冷的,让系统立即达到 1000 QPS 会拖垮系统,因此这个预热的过程可以有效的保护系统,具体表现为从令牌桶中获取 permits 等待的时间会随着令牌被消耗逐渐缩短,直至一个稳定的时间。

注:本篇是流量控制的第一篇,后面可能会单独写一篇文章聊一下 TCP 中滑动窗口实现的流量控制。

(全文完)