令牌桶
以固定速率向桶中添加令牌,每个请求需要从桶中获取令牌,只有拿到令牌的请求才能被处理。
核心参数
- 容量(Capacity):桶的最大容量
- 速率(Rate):每秒添加的令牌数量
- 当前令牌数:桶中当前的令牌数量
令牌桶算法的实现代码如下:
java
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class TokenBucketRateLimiter {
private final int capacity;
private final int inRate; // 每秒能够添加的令牌数
private long lastInTime; // 上次添加令牌的时间
private long remainTokens; // 剩余的令牌数
public TokenBucketRateLimiter(int capacity, int inRate) {
if (capacity <= 0 || inRate <= 0) {
throw new IllegalArgumentException("容量和速率必须大于0");
}
this.capacity = capacity;
this.inRate = inRate;
this.remainTokens = capacity;
this.lastInTime = System.currentTimeMillis();
}
public void acquire(int tokens) throws InterruptedException {
if (tokens <= 0) {
throw new IllegalArgumentException("令牌数必须大于0");
}
if (tokens > capacity) {
throw new IllegalArgumentException("请求令牌数超过桶容量");
}
synchronized (this) {
while (!tryAcquire(tokens)) {
Thread.sleep(1000);
}
}
}
public void acquire() throws InterruptedException {
acquire(1);
}
public boolean tryAcquire(int tokens) {
if (tokens <= 0) {
throw new IllegalArgumentException("令牌数必须大于0");
}
if (tokens > capacity) {
return false;
}
return tryAcquireInternal(tokens);
}
public boolean tryAcquire() {
return tryAcquire(1);
}
/**
* 内部获取方法,必须在同步块中调用
*/
private synchronized boolean tryAcquireInternal(int tokens) {
long now = System.currentTimeMillis();
// 计算应该添加的令牌数
long timePassed = now - lastInTime;
if (timePassed > 0) {
int tokensToAdd = (int) (inRate * (timePassed / 1000.0));
remainTokens = Math.min(capacity, (remainTokens + tokensToAdd));
lastInTime = now;
}
if (remainTokens >= tokens) {
remainTokens -= tokens;
return true;
}
return false;
}
}测试代码如下:
java
public static void main(String[] args) throws InterruptedException {
TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(10, 1);
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
while (latch.getCount() > 0) {
limiter.acquire();
System.out.printf("%s - %s => 线程通过限流开始执行。\n",
DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"),
Thread.currentThread().getName());
latch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Worker-" + i).start();
}
latch.await();
System.out.println("所有线程执行完成");
}漏斗
漏桶算法以固定速率处理请求,可以平滑突发流量
核心参数:
- 桶的容量(capacity): 最大容量
- 处理速率(Rate): 每秒流出的请求数
- 当前存储的请求数量
算法实现:
java
public class LeakyBucketRateLimiter {
private final int capacity; // 桶的容量
private final int outRate; // 每秒流出的请求数(处理速率)
private long lastOutTime; // 上次处理请求的时间
private long storedRequests; // 当前存储的请求数量
public LeakyBucketRateLimiter(int capacity, int outRate) {
if (capacity <= 0 || outRate <= 0) {
throw new IllegalArgumentException("容量和速率必须大于0");
}
this.capacity = capacity;
this.outRate = outRate;
this.storedRequests = 0;
this.lastOutTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
return tryAcquire(1);
}
public synchronized boolean tryAcquire(int requests) {
if (requests <= 0) {
throw new IllegalArgumentException("请求数必须大于0");
}
refreshStoredRequests(); // 更新当前存储的请求数量(模拟漏水过程)
if (storedRequests + requests <= capacity) { // 如果请求能放入桶中,则允许通过
storedRequests += requests;
return true;
}
return false;
}
public void acquire() throws InterruptedException {
acquire(1);
}
public void acquire(int requests) throws InterruptedException {
if (requests <= 0) {
throw new IllegalArgumentException("请求数必须大于0");
}
if (requests > capacity) {
throw new IllegalArgumentException("请求超过桶容量");
}
while (!tryAcquire(requests)) {
long sleepTime = 1000 / outRate; // 计算需要等待的时间(基于漏桶的处理速率)
Thread.sleep(sleepTime);
}
}
private void refreshStoredRequests() {
long now = System.currentTimeMillis();
long timePassed = now - lastOutTime;
if (timePassed > 0) {
long processedRequests = (long) (outRate * (timePassed / 1000.0));// 计算这段时间内处理的请求数量
storedRequests = Math.max(0, storedRequests - processedRequests);// 减少存储的请求数量(但不能小于0)
lastOutTime = now; // 更新上次处理时间
}
}
}原文链接: http://herman7z.site