限流是保证服务可用性和稳定的利器之一。go自带有限流器rate,我们来研读一下源码,源码较少,我们列出主要代码部分,注释说明:
type Limit float64
const Inf = Limit(math.MaxFloat64)
type Limiter struct { mu sync.Mutex limit Limit burst int tokens float64 last time.Time lastEvent time.Time }
func NewLimiter(r Limit, b int) *Limiter { return &Limiter{ limit: r, burst: b, } }
type Reservation struct { ok bool lim *Limiter tokens int timeToAct time.Time limit Limit }
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock()
if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, lim: lim, tokens: n, timeToAct: now, } }
now, last, tokens := lim.advance(now)
tokens -= float64(n)
var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) }
ok := n <= lim.burst && waitDuration <= maxFutureReserve
r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) }
if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last }
lim.mu.Unlock() return r }
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { last := lim.last if now.Before(last) { last = now }
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) elapsed := now.Sub(last) if elapsed > maxElapsed { elapsed = maxElapsed }
delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta if burst := float64(lim.burst); tokens > burst { tokens = burst }
return now, last, tokens }
func (limit Limit) tokensFromDuration(d time.Duration) float64 { sec := float64(d/time.Second) * float64(limit) nsec := float64(d%time.Second) * float64(limit) return sec + nsec/1e9 }
|
使用实例和demo可看:
golang 限流器