aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorn1t0r <git@n1t0r.com>2023-04-16 18:19:43 +1000
committern1t0r <git@n1t0r.com>2023-04-16 18:19:46 +1000
commitc1794294b4c7606036ea9f4e36a8a8440aa57f08 (patch)
tree1f75e16fac90e5ba9e2200c0a46ba3d269bbc613
parent169b58daedada9437e94309ede1d0de6a5f39194 (diff)
downloadsteady-c1794294b4c7606036ea9f4e36a8a8440aa57f08.tar.gz
steady-c1794294b4c7606036ea9f4e36a8a8440aa57f08.tar.bz2
steady-c1794294b4c7606036ea9f4e36a8a8440aa57f08.zip
Add keys with no items left to inactive keys until the interval expires.
This fixes bypassing ratelimiting by popping all items in a key and pushing new items in the same key, by keeping track of old keys until their interval expires.
-rw-r--r--queue.go34
1 files changed, 31 insertions, 3 deletions
diff --git a/queue.go b/queue.go
index daab68d..a1628a5 100644
--- a/queue.go
+++ b/queue.go
@@ -57,6 +57,7 @@ type Queue[K comparable] struct {
snoozeQNotEmpty sync.Cond // Exit condition + not empty.
snoozeQMu sync.Mutex
snoozeQ priorityQueue[K]
+ inactiveKeys map[K]time.Time
partitionsMu sync.Mutex
partitions map[K]*partition[K]
@@ -89,6 +90,26 @@ func (q *Queue[K]) init() {
q.exitTimer = make(chan struct{})
q.snoozeQNotEmpty.L = &q.snoozeQMu
q.partitions = make(map[K]*partition[K])
+ q.inactiveKeys = make(map[K]time.Time)
+ go q.inactiveCleaner()
+}
+
+func (q *Queue[K]) inactiveCleaner() {
+ t := time.NewTicker(time.Second * 10)
+ defer t.Stop()
+ for range t.C {
+ q.snoozeQMu.Lock()
+ if q.exit {
+ return
+ }
+ // Delete expired keys.
+ for k, v := range q.inactiveKeys {
+ if time.Now().After(v) {
+ delete(q.inactiveKeys, k)
+ }
+ }
+ q.snoozeQMu.Unlock()
+ }
}
// Push pushes an item into the queue with a specific key for rate limiting.
@@ -147,7 +168,12 @@ func (q *Queue[K]) Push(item Item[K]) (ok bool) {
}
part.pq.Push(item, item.Priority())
if part.pq.Len()-1 == 0 {
- q.snoozeQ.Push(item.Key(), time.Now().UnixMicro())
+ if expiration, ok := q.inactiveKeys[item.Key()]; ok {
+ q.snoozeQ.Push(item.Key(), expiration.UnixMicro())
+ delete(q.inactiveKeys, item.Key())
+ } else {
+ q.snoozeQ.Push(item.Key(), time.Now().UnixMicro())
+ }
q.snoozeQNotEmpty.Signal()
}
q.snoozeQMu.Unlock()
@@ -213,12 +239,14 @@ func (q *Queue[K]) Pop() (item Item[K], ok bool) {
}
// Add it back to the snooze queue with the new dequeue time if it has
// any items left.
+ q.snoozeQMu.Lock()
if part.pq.Len() > 0 {
- q.snoozeQMu.Lock()
q.snoozeQ.Push(key, time.Now().Add(q.interval()).UnixMicro())
q.snoozeQNotEmpty.Signal()
- q.snoozeQMu.Unlock()
+ } else {
+ q.inactiveKeys[key] = time.Now().Add(q.interval())
}
+ q.snoozeQMu.Unlock()
part.mu.Unlock()
return i, true
}