diff options
author | n1t0r <git@n1t0r.com> | 2023-04-16 18:19:43 +1000 |
---|---|---|
committer | n1t0r <git@n1t0r.com> | 2023-04-16 18:19:46 +1000 |
commit | c1794294b4c7606036ea9f4e36a8a8440aa57f08 (patch) | |
tree | 1f75e16fac90e5ba9e2200c0a46ba3d269bbc613 | |
parent | 169b58daedada9437e94309ede1d0de6a5f39194 (diff) | |
download | steady-c1794294b4c7606036ea9f4e36a8a8440aa57f08.tar.gz steady-c1794294b4c7606036ea9f4e36a8a8440aa57f08.tar.bz2 steady-c1794294b4c7606036ea9f4e36a8a8440aa57f08.zip |
Add keys with no items left to inactive keys until the interval expires.
This fixes bypassing ratelimiting by popping all items in a key and
pushing new items in the same key, by keeping track of old keys until
their interval expires.
-rw-r--r-- | queue.go | 34 |
1 files changed, 31 insertions, 3 deletions
@@ -57,6 +57,7 @@ type Queue[K comparable] struct { snoozeQNotEmpty sync.Cond // Exit condition + not empty. snoozeQMu sync.Mutex snoozeQ priorityQueue[K] + inactiveKeys map[K]time.Time partitionsMu sync.Mutex partitions map[K]*partition[K] @@ -89,6 +90,26 @@ func (q *Queue[K]) init() { q.exitTimer = make(chan struct{}) q.snoozeQNotEmpty.L = &q.snoozeQMu q.partitions = make(map[K]*partition[K]) + q.inactiveKeys = make(map[K]time.Time) + go q.inactiveCleaner() +} + +func (q *Queue[K]) inactiveCleaner() { + t := time.NewTicker(time.Second * 10) + defer t.Stop() + for range t.C { + q.snoozeQMu.Lock() + if q.exit { + return + } + // Delete expired keys. + for k, v := range q.inactiveKeys { + if time.Now().After(v) { + delete(q.inactiveKeys, k) + } + } + q.snoozeQMu.Unlock() + } } // Push pushes an item into the queue with a specific key for rate limiting. @@ -147,7 +168,12 @@ func (q *Queue[K]) Push(item Item[K]) (ok bool) { } part.pq.Push(item, item.Priority()) if part.pq.Len()-1 == 0 { - q.snoozeQ.Push(item.Key(), time.Now().UnixMicro()) + if expiration, ok := q.inactiveKeys[item.Key()]; ok { + q.snoozeQ.Push(item.Key(), expiration.UnixMicro()) + delete(q.inactiveKeys, item.Key()) + } else { + q.snoozeQ.Push(item.Key(), time.Now().UnixMicro()) + } q.snoozeQNotEmpty.Signal() } q.snoozeQMu.Unlock() @@ -213,12 +239,14 @@ func (q *Queue[K]) Pop() (item Item[K], ok bool) { } // Add it back to the snooze queue with the new dequeue time if it has // any items left. + q.snoozeQMu.Lock() if part.pq.Len() > 0 { - q.snoozeQMu.Lock() q.snoozeQ.Push(key, time.Now().Add(q.interval()).UnixMicro()) q.snoozeQNotEmpty.Signal() - q.snoozeQMu.Unlock() + } else { + q.inactiveKeys[key] = time.Now().Add(q.interval()) } + q.snoozeQMu.Unlock() part.mu.Unlock() return i, true } |