大模型工程化之流量管控
背景: 在大模型工程化落地过程中, 由于算力资源有限, 模型服务处理能力有限, 会遇到突发流量, 请求失败, 响应延迟等场景, 需要进行限流, 负载均衡等流量管控
架构简介
- LLM的工程化架构简化如下, 拿我司的架构举例
- 前端分为dsweb端(还有很多渠道端略), 以及各个业务/技术应用场景服务方接入, 流量由网关转发至web后端服务以及大模型api服务, 再统一由LLM平台调度使用LLM集群资源
- 网关前置, 既规范, 又方便后续做全链路灰度; Api调用一般流量比较大, 为了避免影响在线用户, 因此要跟web端做隔离, 同时api服务也根据业务特性进一步拆分通用业务侧, 通用研发侧, 以及若干重点场景, 也是为了避免相关影响
- LLM集群有多台机器, 分别部署多种大模型, 资源来源也各异, 因此需要建立LLM平台统一调度处理, 提升资源利用率
- 应用服务有appId, source等场景标识
限流设计
- 大模型场景的限流比较特殊
- 限制并发数, 指的是限制在途的请求数, 而不是实际的QPS
- 请求有流式/非流式的调用, 流式一般采用SSE协议
限流方案
排队限流的方案很像医院取号排队就诊的场景, 不同诊室就是一个个资源池, 诊室里有多个医生, 还可以动态调整
- 首先将大模型算力集群的资源池化, 落库(并内存缓存), 然后针对每个资源维护一个队列
- 请求进来先去资源池获取并发资源(令牌), 获取成功则直接访问对应的资源, 并返回响应; 获取失败则进入内存队列, 请求信息也会缓存在服务内存(SseEmitter->BlockingQueue, hold住长连接)
- 资源池队列会限制最大长度, 超过长度, 则直接拒绝请求
- 当一个请求响应结束时, 会释放并发资源, 然后取出队列头部请求, 重新尝试获取并发资源
- 如果客户端超时主动断开连接等, SseEmitter也是可以感知到, 并且及时从内存中移除请求信息
补充:
- 获取资源的时候, 会一并查出资源的最大并发数限制, 然后作为参数一并通过lua脚本在redis中执行计算的操作, 因此资源是支持动态调整的, redis只做并发计算; 释放资源也要通过lua脚本
- lua脚本以及资源存储的复杂度较高, 优化方案, 可以在内存中缓存一份当前的资源分配情况, 定时拉redis同步, 然后在每次分配资源的时候, 直接基于当前的资源分配情况, 以及策略, 计算想要获取的资源, 然后去redis获取资源, 若获取失败, 则触发同步, 类似CAS的操作, 但在高并发的时候可能同步不及时, 导致资源一直获取失败
- 获取资源的计算可以有多种策略, 比如一个资源池有多个资源, 可按配置顺位计算, 也可按资源并发利用率优先计算等, 可以支持多种负载均衡策略
FAQ
是否可以独立出单独的限流服务?
- 首先上述方案, 为了保证客户端体验, 在获取令牌失败时, 并没有直接拒绝请求, 而是允许排队, 因此服务端需要缓存请求信息, 在内存中需要维护缓存的队列, 因此限流服务只是做令牌的获取与释放, 而这部分恰是需要考虑并发问题的点, 限流服务无法降低复杂度, 也无法分担并发压力, 因此没有必要
其他常见的限流方案
补充: 实际生产场景, 资源一般是可动态调节的, 限流的配置一般也都是要支持热刷新的, 因此以下方案也都要结合可热刷的配置来落地
固定窗口的计数器
- 限制单位时间内的请求量
- 缺点: 在窗口边界可能出现流量突增, 比如win=1s, limit=5, 在第0.99s接收5个请求, 但在1.01s又突发5个请求, 计数器就拦不住, 所以一般不用这个
- 在实现上直接用一个局部变量lastResetTime来缓存窗口起始位置, 并使用了int类型的计数器
public class CounterLimiter {
private final int limit;
private final long windowMillis;
private long lastResetTime;
private int counter;
public CounterLimiter(int limit, long windowMillis) {
this.limit = limit;
this.windowMillis = windowMillis;
this.lastResetTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
if (now - lastResetTime > windowMillis) {
counter = 0;
lastResetTime = now;
}
if (counter >= limit) {
return false;
}
counter++;
return true;
}
}
滑动窗口的计数器
- 解决固定窗口的边界突增问题
- 在实现上, 使用一个队列来计数, 队列保存多个时间戳, 每次取出头部元素与当前时间对比, 若大于时间窗口, 则把当前时间置于队列, 若在时间窗口之内, 则向队列尾部添加元素, 当队列元素大于等于限流阈值时, 则执行限流
public class SlidingWindowLimiter {
private final int limit;
private final long windowMillis;
private final LinkedList<Long> timestamps = new LinkedList<>();
public SlidingWindowLimiter(int limit, long windowMillis) {
this.limit = limit;
this.windowMillis = windowMillis;
}
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
// 移除过期的时间戳
while (!timestamps.isEmpty() && now - timestamps.peekFirst() > windowMillis) {
timestamps.pollFirst();
}
if (timestamps.size() >= limit) {
return false;
}
timestamps.addLast(now);
return true;
}
}
漏桶算法
- 平滑突发流量, 控制请求以恒定速率处理
- 适用于下游系统处理能力固定, 需要严格控制处理速率的场景, 比令牌桶更加严格
public class LeakyBucketLimiter {
private final int capacity;
private final long leakRateMillis;
private int water;
private long lastLeakTime;
public LeakyBucketLimiter(int capacity, long leakRateMillis) {
this.capacity = capacity;
this.leakRateMillis = leakRateMillis;
this.lastLeakTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
// 计算漏出的水量
long elapsed = now - lastLeakTime;
int leaked = (int)(elapsed / leakRateMillis);
if (leaked > 0) {
water = Math.max(0, water - leaked);
lastLeakTime = now;
}
if (water >= capacity) {
return false;
}
water++;
return true;
}
}
令牌桶算法
- 以固定速率往桶中添加令牌, 相比于漏洞算法, 允许一定程度的突发流量, 同时限制长期平均速率
- 应用场景比如电商秒杀, 第三方API调用限额, 资源下载限速等
public class TokenBucketLimiter {
private final int capacity;
private final long refillRateMillis;
private int tokens;
private long lastRefillTime;
public TokenBucketLimiter(int capacity, long refillRateMillis) {
this.capacity = capacity;
this.refillRateMillis = refillRateMillis;
this.tokens = capacity;
this.lastRefillTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
// 计算补充的令牌数
long elapsed = now - lastRefillTime;
int refill = (int)(elapsed / refillRateMillis);
if (refill > 0) {
tokens = Math.min(capacity, tokens + refill);
lastRefillTime = now;
}
if (tokens <= 0) {
return false;
}
tokens--;
return true;
}
}
分布式计数器
- Semaphere是本地计数, 分布式计数器只是将计数逻辑放到redis中
public class RedisRateLimiter {
private final Jedis jedis;
private final String key;
private final int limit;
private final long windowMillis;
public RedisRateLimiter(Jedis jedis, String key, int limit, long windowMillis) {
this.jedis = jedis;
this.key = key;
this.limit = limit;
this.windowMillis = windowMillis;
}
public boolean tryAcquire() {
String luaScript = "local current = redis.call('get', KEYS[1])\n" +
"if current and tonumber(current) > tonumber(ARGV[1]) then\n" +
" return 0\n" +
"else\n" +
" redis.call('incr', KEYS[1])\n" +
" if tonumber(current) == 0 then\n" +
" redis.call('pexpire', KEYS[1], ARGV[2])\n" +
" end\n" +
" return 1\n" +
"end";
Long result = (Long) jedis.eval(luaScript, 1, key, String.valueOf(limit - 1), String.valueOf(windowMillis));
return result == 1;
}
}