Skip to content

令牌桶

以固定速率向桶中添加令牌,每个请求需要从桶中获取令牌,只有拿到令牌的请求才能被处理。

核心参数

  • 容量(Capacity):桶的最大容量
  • 速率(Rate):每秒添加的令牌数量
  • 当前令牌数:桶中当前的令牌数量

令牌桶算法的实现代码如下:

java

import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class TokenBucketRateLimiter {
    private final int capacity;
    private final int inRate; // 每秒能够添加的令牌数

    private long lastInTime; // 上次添加令牌的时间
    private long remainTokens; // 剩余的令牌数

    public TokenBucketRateLimiter(int capacity, int inRate) {
        if (capacity <= 0 || inRate <= 0) {
            throw new IllegalArgumentException("容量和速率必须大于0");
        }
        this.capacity = capacity;
        this.inRate = inRate;
        this.remainTokens = capacity;
        this.lastInTime = System.currentTimeMillis();
    }

    public void acquire(int tokens) throws InterruptedException {
        if (tokens <= 0) {
            throw new IllegalArgumentException("令牌数必须大于0");
        }
        if (tokens > capacity) {
            throw new IllegalArgumentException("请求令牌数超过桶容量");
        }
        synchronized (this) {
            while (!tryAcquire(tokens)) {
                Thread.sleep(1000);
            }
        }
    }

    public void acquire() throws InterruptedException {
        acquire(1);
    }

    public boolean tryAcquire(int tokens) {
        if (tokens <= 0) {
            throw new IllegalArgumentException("令牌数必须大于0");
        }
        if (tokens > capacity) {
            return false;
        }
        return tryAcquireInternal(tokens);
    }

    public boolean tryAcquire() {
        return tryAcquire(1);
    }

    /**
     * 内部获取方法,必须在同步块中调用
     */
    private synchronized boolean tryAcquireInternal(int tokens) {
        long now = System.currentTimeMillis();
        // 计算应该添加的令牌数
        long timePassed = now - lastInTime;
        if (timePassed > 0) {
            int tokensToAdd = (int) (inRate * (timePassed / 1000.0));
            remainTokens = Math.min(capacity, (remainTokens + tokensToAdd));
            lastInTime = now;
        }

        if (remainTokens >= tokens) {
            remainTokens -= tokens;
            return true;
        }
        return false;
    }
}

测试代码如下:

java
public static void main(String[] args) throws InterruptedException {
    TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(10, 1);
    CountDownLatch latch = new CountDownLatch(100);

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            try {
                while (latch.getCount() > 0) {
                    limiter.acquire();
                    System.out.printf("%s - %s => 线程通过限流开始执行。\n",
                            DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"),
                            Thread.currentThread().getName());
                    latch.countDown();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Worker-" + i).start();
    }

    latch.await();
    System.out.println("所有线程执行完成");
}

漏斗

漏桶算法以固定速率处理请求,可以平滑突发流量

核心参数:

  • 桶的容量(capacity): 最大容量
  • 处理速率(Rate): 每秒流出的请求数
  • 当前存储的请求数量

算法实现:

java
public class LeakyBucketRateLimiter {
    private final int capacity; // 桶的容量
    private final int outRate; // 每秒流出的请求数(处理速率)

    private long lastOutTime; // 上次处理请求的时间
    private long storedRequests; // 当前存储的请求数量

    public LeakyBucketRateLimiter(int capacity, int outRate) {
        if (capacity <= 0 || outRate <= 0) {
            throw new IllegalArgumentException("容量和速率必须大于0");
        }
        this.capacity = capacity;
        this.outRate = outRate;
        this.storedRequests = 0;
        this.lastOutTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        return tryAcquire(1);
    }

    public synchronized boolean tryAcquire(int requests) {
        if (requests <= 0) {
            throw new IllegalArgumentException("请求数必须大于0");
        }

        refreshStoredRequests(); // 更新当前存储的请求数量(模拟漏水过程)

        if (storedRequests + requests <= capacity) {     // 如果请求能放入桶中,则允许通过
            storedRequests += requests;
            return true;
        }
        return false;
    }

    public void acquire() throws InterruptedException {
        acquire(1);
    }

    public void acquire(int requests) throws InterruptedException {
        if (requests <= 0) {
            throw new IllegalArgumentException("请求数必须大于0");
        }
        if (requests > capacity) {
            throw new IllegalArgumentException("请求超过桶容量");
        }

        while (!tryAcquire(requests)) {
            long sleepTime =  1000 / outRate; // 计算需要等待的时间(基于漏桶的处理速率)
            Thread.sleep(sleepTime);
        }
    }

    private void refreshStoredRequests() {
        long now = System.currentTimeMillis();
        long timePassed = now - lastOutTime;

        if (timePassed > 0) {
            long processedRequests = (long) (outRate * (timePassed / 1000.0));// 计算这段时间内处理的请求数量
            storedRequests = Math.max(0, storedRequests - processedRequests);// 减少存储的请求数量(但不能小于0)
            lastOutTime = now;  // 更新上次处理时间
        }
    }

}

原文链接: http://herman7z.site