找回密码
 立即注册
首页 业界区 业界 新来的外包,在大群分享了它的限流算法的实现 ...

新来的外包,在大群分享了它的限流算法的实现

益竹月 2025-11-19 23:05:00
1.webp

1. 令牌桶按用户维度限流

前文golang/x/time/rate演示了基于整体请求速率的令牌桶限流;
那基于用户id、ip、apikey请求速率的限流(更贴近生产的需求), 阁下又该如何应对?
那这个问题就从全局速率变成了按照用户维度(group by userid)来做限流,那么

  • 早先的全局的rateLimiter就要变成 userid:rateLimiter的键值对, select count( * ) from table ---> select userid, count(*) from  table group by userid
  • 使用缓存组件来存储维度键值对: 缓存的剔除机制来清理不再访问的键值对 (30min过期,10min周期清理内存)。
  1. var userLimiters = cache.New(time.Minute*30, 10) // 10 items per minute
  2. func limiterForUser(userID string) *rate.Limiter {
  3.         if v, found := userLimiters.Get(userID); found {
  4.                 return v.(*rate.Limiter)
  5.         }
  6.         l := rate.NewLimiter(rate.Every(time.Minute/60), 10)
  7.         userLimiters.Set(userID, l, cache.DefaultExpiration)
  8.         return l
  9. }
  10. // 更细化的限流: 针对同一用户的请求次数限速, 增加了细粒度的用户维度,需要维护 用户与对应限速器的映射关系
  11. func userRatelimitMiddleware(c *gin.Context) {
  12.         userID := c.GetString("userID")  //  从每个请求context的key中取得信息, 这个key对于req context是排他性的
  13.         if userID == "" {
  14.                 c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Unauthorized"})
  15.                 return
  16.         }
  17.         if userID == "" {
  18.                 userID = c.GetString("x-api-key")
  19.         }
  20.         if userID == "" {
  21.                 userID = c.ClientIP()
  22.         }
  23.         limiter := limiterForUser(userID) // 通过userid维度找到对应的限速器
  24.         if !limiter.Allow() {
  25.                 c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "Too many requests"})
  26.                 return
  27.         }
  28.         c.Next()
  29. }
复制代码
2. redis 作为限流器第三方存储

这个思路也是极其常见的行为: redis可以成为用户令牌桶的全局中心存储: 当多个负载层需要读写用户限流器时,与redis交互。
本次通过golang的实战,深入理解基于redis的令牌桶限流器的算法实现。
①  请求到达负载层,被负载层识别为userid=junio
②  负载层请求redis获取该用户的token bucket的当前状态:
hget userbucket:junio tokens last_time
③  基于当前时间now和last_time,计算流逝的时间,再根据rate计算这一阶段下发了多少tokens:delta=(now-last_time) * r/1000,加上redis原始记录的token,就是本次请求时bucket中能用的tokens, 注意:令牌数量最多不能超过cap
④ 如果tokens>=1, 表示桶中有令牌,可放行请求,tokens数量减1
⑤ 最后将本次处理完后的 tokens和last_time=now写入原用户令牌桶
hset  userbucket:junio  tokens  20 last_time 990
2.png

使用redis 中的hashmap存储用户的tokenbucket状态,应用存在读取redis- 计算- 回写redis过程,使用redis lua的脚本执行三个动作,以保证线程安全。
为什么lua脚本能保证线程安全呢?
主要得益于 Redis 的单线程架构和原子性执行机制: 加载并执行lua脚本时所有的redis操作作为一个整体完成; 整个脚本执行期间没有其他命令可以插入。
  1. // 读取- 计算 - 重新赋值都在一个 lua 脚本里面
  2. var redisScript = `
  3.         local key = KEYS[1]
  4.         local capacity = tonumber(ARGV[1])
  5.         local rate = tonumber(ARGV[2])
  6.         local now = tonumber(ARGV[3])
  7.         local tokens =  tonumber(redis.call('hget', key, 'tokens') or '-1')
  8.         local last_time = tonumber(redis.call('hget', key, 'last_time') or  '-1')
  9.         if tokens  == -1 or last_time == -1 then
  10.                 tokens = capacity
  11.                 last_time = now
  12.         else
  13.                 local elapsed = now - last_time
  14.         if elapsed < 0
  15.                         then elapsed = 0
  16.                 end
  17.                 local delta  = elapsed * rate / 1000
  18.                 tokens = tokens + delta
  19.                 if tokens > capacity then
  20.                         tokens = capacity
  21.                 end
  22.         last_time = now
  23.         end
  24.         local allow = 0
  25.         if tokens >= 1 then
  26.                 allow = 1
  27.                 tokens= tokens - 1
  28.         else       
  29.                 allow = 0
  30.         end
  31.         redis.call('hset', key, 'tokens', tokens)
  32.         redis.call('hset', key, 'last_time', last_time)
  33.     redis.call('PEXPIRE', key,  math.max(1000, 2 * math.ceil((capacity / rate) * 5000)))
  34.         return allow
  35. `
复制代码
注意

  • 上面还使用的redis expire机制: redis expire不是滑动过期,但是每次被请求触发执行的时候就重新设置TTL, 表现为“滑动过期”。
  • 除了hset/hget ,还有hmget可用,另外这些操作还有配套的TTL指令,eg:hset key EXAT  1740470400 FIELDS 2 field1 "Hello" field2 "World"。
golang应用层的写法如下:
  1. func (r *RedisLimiter) Allow(c *gin.Context, userid string) bool {
  2.         key := r.keyprefix + userid // 定位这个用户的token bucket
  3.         now := time.Now().UnixMilli()
  4.         // Check if the key exists in Redis
  5.         rCmd := r.redis.Eval(redisScript, []string{key}, r.cap, r.rate, now)
  6.         res, err := rCmd.Result()
  7.         if err != nil {
  8.                 log.Printf("get from redis failure. ", err)
  9.                 return false
  10.         }
  11.         if allow, ok := res.(int64); ok { // 注意:lua返回的0,1 值对应golang的int64
  12.                 log.Printf("%v %v \n", allow, res)
  13.                 return allow == 1
  14.         } else {
  15.                 log.Printf("get from redis failure. ", err)
  16.                 return false
  17.         }
  18. }
复制代码
至此限流第二弹结束了,本文紧接掘金爆文
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2025-11-25 13:59:38

举报

2025-11-27 02:35:11

举报

您需要登录后才可以回帖 登录 | 立即注册