diff options
author | n1t0r <git@n1t0r.com> | 2023-03-24 21:55:23 +1100 |
---|---|---|
committer | n1t0r <git@n1t0r.com> | 2023-04-01 19:18:26 +1100 |
commit | 30b41ef603d24b74d5fdc20532e7c73438de8155 (patch) | |
tree | 45432a40ddb27b8d84d0d9cf5e064f5acc829046 | |
parent | bd7fb7e33678f59c71cec00112bf3f03ea67f9e6 (diff) | |
download | steady-30b41ef603d24b74d5fdc20532e7c73438de8155.tar.gz steady-30b41ef603d24b74d5fdc20532e7c73438de8155.tar.bz2 steady-30b41ef603d24b74d5fdc20532e7c73438de8155.zip |
Move generic key to item interface.
Instead of passing it as an argument, ensure each item defines the Key()
functions with the type the queue is initialised with.
-rw-r--r-- | priority_queue.go | 12 | ||||
-rw-r--r-- | queue.go | 37 | ||||
-rw-r--r-- | queue_test.go | 41 |
3 files changed, 46 insertions, 44 deletions
diff --git a/priority_queue.go b/priority_queue.go index 941b0ac..87de7e0 100644 --- a/priority_queue.go +++ b/priority_queue.go @@ -4,8 +4,16 @@ import ( "container/heap" ) -func InMemPriorityQueue() PriorityQueue[Item] { - return &priorityQueue[Item]{} +type PriorityQueue[V any] interface { + Len() int + Push(V, int64) + Pop() (V, int64) + Peek() (V, int64) + Close() error +} + +func InMemPriorityQueue[K comparable]() PriorityQueue[Item[K]] { + return &priorityQueue[Item[K]]{} } type priorityQueue[T any] struct { @@ -7,23 +7,16 @@ import ( var DefaultInterval = time.Second * 4 -type PriorityQueue[V any] interface { - Len() int - Push(V, int64) - Pop() (V, int64) - Peek() (V, int64) - Close() error -} - -type Item interface { +type Item[K comparable] interface { Priority() int64 + Key() K } -type partition struct { +type partition[K comparable] struct { mu sync.Mutex notFull sync.Cond exit bool - pq PriorityQueue[Item] + pq PriorityQueue[Item[K]] } type Queue[K comparable] struct { @@ -36,7 +29,7 @@ type Queue[K comparable] struct { Interval time.Duration // PriorityQueueCreateFunc is the function used to create a priority queue. // Default is InMemPriorityQueue. - PriorityQueueCreateFunc func() PriorityQueue[Item] + PriorityQueueCreateFunc func() PriorityQueue[Item[K]] // Control flow blocking cancellation. exitTimer chan struct{} @@ -49,12 +42,12 @@ type Queue[K comparable] struct { snoozeQ priorityQueue[K] partitionsMu sync.Mutex - partitions map[K]*partition + partitions map[K]*partition[K] } -func (q *Queue[K]) priorityQueueCreate() PriorityQueue[Item] { +func (q *Queue[K]) priorityQueueCreate() PriorityQueue[Item[K]] { if q.PriorityQueueCreateFunc == nil { - return InMemPriorityQueue() + return InMemPriorityQueue[K]() } return q.PriorityQueueCreateFunc() } @@ -69,22 +62,22 @@ func (q *Queue[K]) interval() time.Duration { func (q *Queue[K]) init() { q.exitTimer = make(chan struct{}) q.snoozeQNotEmpty.L = &q.snoozeQMu - q.partitions = make(map[K]*partition) + q.partitions = make(map[K]*partition[K]) } // Push pushes an item into the queue with a specific key for rate limiting. // This is a blocking operation if MaxItemsPerKey is met for the specified key. // To cancel all blocking operations call Stop and check for ok = false on // return. -func (q *Queue[K]) Push(key K, item Item) (ok bool) { +func (q *Queue[K]) Push(item Item[K]) (ok bool) { q.once.Do(q.init) q.partitionsMu.Lock() - part := q.partitions[key] + part := q.partitions[item.Key()] if part == nil { // New partition. - part = &partition{pq: q.priorityQueueCreate()} + part = &partition[K]{pq: q.priorityQueueCreate()} part.notFull.L = &part.mu - q.partitions[key] = part + q.partitions[item.Key()] = part } q.partitionsMu.Unlock() // Got partition. @@ -106,7 +99,7 @@ func (q *Queue[K]) Push(key K, item Item) (ok bool) { } part.pq.Push(item, item.Priority()) if part.pq.Len()-1 == 0 { - q.snoozeQ.Push(key, time.Now().UnixMicro()) + q.snoozeQ.Push(item.Key(), time.Now().UnixMicro()) q.snoozeQNotEmpty.Signal() } q.snoozeQMu.Unlock() @@ -123,7 +116,7 @@ func (q *Queue[K]) Push(key K, item Item) (ok bool) { // // To cancel all blocking operations call Stop and check for ok = false on // return. -func (q *Queue[K]) Pop() (item Item, ok bool) { +func (q *Queue[K]) Pop() (item Item[K], ok bool) { q.once.Do(q.init) // In case it's called after stop. q.snoozeQMu.Lock() diff --git a/queue_test.go b/queue_test.go index 406cc4a..03602c7 100644 --- a/queue_test.go +++ b/queue_test.go @@ -8,6 +8,10 @@ import ( "time" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randStringRunes(n int) string { @@ -17,21 +21,6 @@ func randStringRunes(n int) string { } return string(b) } - -type testItem struct { - key string - value string - priority int64 -} - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -func (t *testItem) Priority() int64 { - return t.priority -} - func randItemWithKey(key string) *testItem { return &testItem{ key: key, @@ -39,7 +28,6 @@ func randItemWithKey(key string) *testItem { priority: int64(rand.Intn(100)), } } - func randItemsWithKey(key string, n int) []*testItem { var tti []*testItem for i := 0; i < n; i++ { @@ -52,15 +40,28 @@ func randItemsWithKey(key string, n int) []*testItem { return tti } +type testItem struct { + key string + value string + priority int64 +} + +func (t *testItem) Priority() int64 { + return t.priority +} +func (t *testItem) Key() string { + return t.key +} + func TestQueueGeneric(t *testing.T) { q := &Queue[string]{Interval: time.Microsecond * 2} itemsk1 := randItemsWithKey("k1", 100) itemsk2 := randItemsWithKey("k2", 100) for _, it := range itemsk1 { - q.Push(it.key, it) + q.Push(it) } for _, it := range itemsk2 { - q.Push(it.key, it) + q.Push(it) } if q.Len() != 200 { t.Errorf("want, got: 200, %d", q.Len()) @@ -88,7 +89,7 @@ func TestQueueOrder(t *testing.T) { order0 := []int{1, 2, 3, 0} order1 := []int{2, 1, 0, 3} for _, it := range itt { - q.Push(it.key, it) + q.Push(it) } var got []*testItem for q.Len() > 0 { @@ -134,7 +135,7 @@ func TestQueueConcurrentWithOverlap(t *testing.T) { return case <-rate.C: it := randItemWithKey(fmt.Sprintf("k%d", cntIn%keylimit)) - if ok := q.Push(it.key, it); !ok { + if ok := q.Push(it); !ok { wg.Done() return } |