Skip to content

大模型工程化之流量管控

背景: 在大模型工程化落地过程中, 由于算力资源有限, 模型服务处理能力有限, 会遇到突发流量, 请求失败, 响应延迟等场景, 需要进行限流, 负载均衡等流量管控

架构简介

  • LLM的工程化架构简化如下, 拿我司的架构举例
    • 前端分为dsweb端(还有很多渠道端略), 以及各个业务/技术应用场景服务方接入, 流量由网关转发至web后端服务以及大模型api服务, 再统一由LLM平台调度使用LLM集群资源
    • 网关前置, 既规范, 又方便后续做全链路灰度; Api调用一般流量比较大, 为了避免影响在线用户, 因此要跟web端做隔离, 同时api服务也根据业务特性进一步拆分通用业务侧, 通用研发侧, 以及若干重点场景, 也是为了避免相关影响
    • LLM集群有多台机器, 分别部署多种大模型, 资源来源也各异, 因此需要建立LLM平台统一调度处理, 提升资源利用率
    • 应用服务有appId, source等场景标识

1748342203640

限流设计

  • 大模型场景的限流比较特殊
    • 限制并发数, 指的是限制在途的请求数, 而不是实际的QPS
    • 请求有流式/非流式的调用, 流式一般采用SSE协议

限流方案

排队限流的方案很像医院取号排队就诊的场景, 不同诊室就是一个个资源池, 诊室里有多个医生, 还可以动态调整

  1. 首先将大模型算力集群的资源池化, 落库(并内存缓存), 然后针对每个资源维护一个队列
  2. 请求进来先去资源池获取并发资源(令牌), 获取成功则直接访问对应的资源, 并返回响应; 获取失败则进入内存队列, 请求信息也会缓存在服务内存(SseEmitter->BlockingQueue, hold住长连接)
  3. 资源池队列会限制最大长度, 超过长度, 则直接拒绝请求
  4. 当一个请求响应结束时, 会释放并发资源, 然后取出队列头部请求, 重新尝试获取并发资源
  5. 如果客户端超时主动断开连接等, SseEmitter也是可以感知到, 并且及时从内存中移除请求信息

补充:

  • 获取资源的时候, 会一并查出资源的最大并发数限制, 然后作为参数一并通过lua脚本在redis中执行计算的操作, 因此资源是支持动态调整的, redis只做并发计算; 释放资源也要通过lua脚本
  • lua脚本以及资源存储的复杂度较高, 优化方案, 可以在内存中缓存一份当前的资源分配情况, 定时拉redis同步, 然后在每次分配资源的时候, 直接基于当前的资源分配情况, 以及策略, 计算想要获取的资源, 然后去redis获取资源, 若获取失败, 则触发同步, 类似CAS的操作, 但在高并发的时候可能同步不及时, 导致资源一直获取失败
  • 获取资源的计算可以有多种策略, 比如一个资源池有多个资源, 可按配置顺位计算, 也可按资源并发利用率优先计算等, 可以支持多种负载均衡策略

FAQ

是否可以独立出单独的限流服务?

  • 首先上述方案, 为了保证客户端体验, 在获取令牌失败时, 并没有直接拒绝请求, 而是允许排队, 因此服务端需要缓存请求信息, 在内存中需要维护缓存的队列, 因此限流服务只是做令牌的获取与释放, 而这部分恰是需要考虑并发问题的点, 限流服务无法降低复杂度, 也无法分担并发压力, 因此没有必要

其他常见的限流方案

补充: 实际生产场景, 资源一般是可动态调节的, 限流的配置一般也都是要支持热刷新的, 因此以下方案也都要结合可热刷的配置来落地

固定窗口的计数器

  • 限制单位时间内的请求量
  • 缺点: 在窗口边界可能出现流量突增, 比如win=1s, limit=5, 在第0.99s接收5个请求, 但在1.01s又突发5个请求, 计数器就拦不住, 所以一般不用这个
  • 在实现上直接用一个局部变量lastResetTime来缓存窗口起始位置, 并使用了int类型的计数器
public class CounterLimiter {
    private final int limit;
    private final long windowMillis;
    private long lastResetTime;
    private int counter;

    public CounterLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.lastResetTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - lastResetTime > windowMillis) {
            counter = 0;
            lastResetTime = now;
        }
        if (counter >= limit) {
            return false;
        }
        counter++;
        return true;
    }
}

滑动窗口的计数器

  • 解决固定窗口的边界突增问题
  • 在实现上, 使用一个队列来计数, 队列保存多个时间戳, 每次取出头部元素与当前时间对比, 若大于时间窗口, 则把当前时间置于队列, 若在时间窗口之内, 则向队列尾部添加元素, 当队列元素大于等于限流阈值时, 则执行限流
public class SlidingWindowLimiter {
    private final int limit;
    private final long windowMillis;
    private final LinkedList<Long> timestamps = new LinkedList<>();

    public SlidingWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 移除过期的时间戳
        while (!timestamps.isEmpty() && now - timestamps.peekFirst() > windowMillis) {
            timestamps.pollFirst();
        }
        if (timestamps.size() >= limit) {
            return false;
        }
        timestamps.addLast(now);
        return true;
    }
}

漏桶算法

  • 平滑突发流量, 控制请求以恒定速率处理
  • 适用于下游系统处理能力固定, 需要严格控制处理速率的场景, 比令牌桶更加严格
public class LeakyBucketLimiter {
    private final int capacity;
    private final long leakRateMillis;
    private int water;
    private long lastLeakTime;

    public LeakyBucketLimiter(int capacity, long leakRateMillis) {
        this.capacity = capacity;
        this.leakRateMillis = leakRateMillis;
        this.lastLeakTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 计算漏出的水量
        long elapsed = now - lastLeakTime;
        int leaked = (int)(elapsed / leakRateMillis);
        if (leaked > 0) {
            water = Math.max(0, water - leaked);
            lastLeakTime = now;
        }
        if (water >= capacity) {
            return false;
        }
        water++;
        return true;
    }
}

令牌桶算法

  • 以固定速率往桶中添加令牌, 相比于漏洞算法, 允许一定程度的突发流量, 同时限制长期平均速率
  • 应用场景比如电商秒杀, 第三方API调用限额, 资源下载限速等
public class TokenBucketLimiter {
    private final int capacity;
    private final long refillRateMillis;
    private int tokens;
    private long lastRefillTime;

    public TokenBucketLimiter(int capacity, long refillRateMillis) {
        this.capacity = capacity;
        this.refillRateMillis = refillRateMillis;
        this.tokens = capacity;
        this.lastRefillTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 计算补充的令牌数
        long elapsed = now - lastRefillTime;
        int refill = (int)(elapsed / refillRateMillis);
        if (refill > 0) {
            tokens = Math.min(capacity, tokens + refill);
            lastRefillTime = now;
        }
        if (tokens <= 0) {
            return false;
        }
        tokens--;
        return true;
    }
}

分布式计数器

  • Semaphere是本地计数, 分布式计数器只是将计数逻辑放到redis中
public class RedisRateLimiter {
    private final Jedis jedis;
    private final String key;
    private final int limit;
    private final long windowMillis;

    public RedisRateLimiter(Jedis jedis, String key, int limit, long windowMillis) {
        this.jedis = jedis;
        this.key = key;
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    public boolean tryAcquire() {
        String luaScript = "local current = redis.call('get', KEYS[1])\n" +
                "if current and tonumber(current) > tonumber(ARGV[1]) then\n" +
                "    return 0\n" +
                "else\n" +
                "    redis.call('incr', KEYS[1])\n" +
                "    if tonumber(current) == 0 then\n" +
                "        redis.call('pexpire', KEYS[1], ARGV[2])\n" +
                "    end\n" +
                "    return 1\n" +
                "end";
        Long result = (Long) jedis.eval(luaScript, 1, key, String.valueOf(limit - 1), String.valueOf(windowMillis));
        return result == 1;
    }
}