aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorn1t0r <git@n1t0r.com>2023-03-24 21:55:23 +1100
committern1t0r <git@n1t0r.com>2023-04-01 19:18:26 +1100
commit30b41ef603d24b74d5fdc20532e7c73438de8155 (patch)
tree45432a40ddb27b8d84d0d9cf5e064f5acc829046
parentbd7fb7e33678f59c71cec00112bf3f03ea67f9e6 (diff)
downloadsteady-30b41ef603d24b74d5fdc20532e7c73438de8155.tar.gz
steady-30b41ef603d24b74d5fdc20532e7c73438de8155.tar.bz2
steady-30b41ef603d24b74d5fdc20532e7c73438de8155.zip
Move generic key to item interface.
Instead of passing it as an argument, ensure each item defines the Key() functions with the type the queue is initialised with.
-rw-r--r--priority_queue.go12
-rw-r--r--queue.go37
-rw-r--r--queue_test.go41
3 files changed, 46 insertions, 44 deletions
diff --git a/priority_queue.go b/priority_queue.go
index 941b0ac..87de7e0 100644
--- a/priority_queue.go
+++ b/priority_queue.go
@@ -4,8 +4,16 @@ import (
"container/heap"
)
-func InMemPriorityQueue() PriorityQueue[Item] {
- return &priorityQueue[Item]{}
+type PriorityQueue[V any] interface {
+ Len() int
+ Push(V, int64)
+ Pop() (V, int64)
+ Peek() (V, int64)
+ Close() error
+}
+
+func InMemPriorityQueue[K comparable]() PriorityQueue[Item[K]] {
+ return &priorityQueue[Item[K]]{}
}
type priorityQueue[T any] struct {
diff --git a/queue.go b/queue.go
index 0b59e6e..ed1cd9f 100644
--- a/queue.go
+++ b/queue.go
@@ -7,23 +7,16 @@ import (
var DefaultInterval = time.Second * 4
-type PriorityQueue[V any] interface {
- Len() int
- Push(V, int64)
- Pop() (V, int64)
- Peek() (V, int64)
- Close() error
-}
-
-type Item interface {
+type Item[K comparable] interface {
Priority() int64
+ Key() K
}
-type partition struct {
+type partition[K comparable] struct {
mu sync.Mutex
notFull sync.Cond
exit bool
- pq PriorityQueue[Item]
+ pq PriorityQueue[Item[K]]
}
type Queue[K comparable] struct {
@@ -36,7 +29,7 @@ type Queue[K comparable] struct {
Interval time.Duration
// PriorityQueueCreateFunc is the function used to create a priority queue.
// Default is InMemPriorityQueue.
- PriorityQueueCreateFunc func() PriorityQueue[Item]
+ PriorityQueueCreateFunc func() PriorityQueue[Item[K]]
// Control flow blocking cancellation.
exitTimer chan struct{}
@@ -49,12 +42,12 @@ type Queue[K comparable] struct {
snoozeQ priorityQueue[K]
partitionsMu sync.Mutex
- partitions map[K]*partition
+ partitions map[K]*partition[K]
}
-func (q *Queue[K]) priorityQueueCreate() PriorityQueue[Item] {
+func (q *Queue[K]) priorityQueueCreate() PriorityQueue[Item[K]] {
if q.PriorityQueueCreateFunc == nil {
- return InMemPriorityQueue()
+ return InMemPriorityQueue[K]()
}
return q.PriorityQueueCreateFunc()
}
@@ -69,22 +62,22 @@ func (q *Queue[K]) interval() time.Duration {
func (q *Queue[K]) init() {
q.exitTimer = make(chan struct{})
q.snoozeQNotEmpty.L = &q.snoozeQMu
- q.partitions = make(map[K]*partition)
+ q.partitions = make(map[K]*partition[K])
}
// Push pushes an item into the queue with a specific key for rate limiting.
// This is a blocking operation if MaxItemsPerKey is met for the specified key.
// To cancel all blocking operations call Stop and check for ok = false on
// return.
-func (q *Queue[K]) Push(key K, item Item) (ok bool) {
+func (q *Queue[K]) Push(item Item[K]) (ok bool) {
q.once.Do(q.init)
q.partitionsMu.Lock()
- part := q.partitions[key]
+ part := q.partitions[item.Key()]
if part == nil {
// New partition.
- part = &partition{pq: q.priorityQueueCreate()}
+ part = &partition[K]{pq: q.priorityQueueCreate()}
part.notFull.L = &part.mu
- q.partitions[key] = part
+ q.partitions[item.Key()] = part
}
q.partitionsMu.Unlock()
// Got partition.
@@ -106,7 +99,7 @@ func (q *Queue[K]) Push(key K, item Item) (ok bool) {
}
part.pq.Push(item, item.Priority())
if part.pq.Len()-1 == 0 {
- q.snoozeQ.Push(key, time.Now().UnixMicro())
+ q.snoozeQ.Push(item.Key(), time.Now().UnixMicro())
q.snoozeQNotEmpty.Signal()
}
q.snoozeQMu.Unlock()
@@ -123,7 +116,7 @@ func (q *Queue[K]) Push(key K, item Item) (ok bool) {
//
// To cancel all blocking operations call Stop and check for ok = false on
// return.
-func (q *Queue[K]) Pop() (item Item, ok bool) {
+func (q *Queue[K]) Pop() (item Item[K], ok bool) {
q.once.Do(q.init)
// In case it's called after stop.
q.snoozeQMu.Lock()
diff --git a/queue_test.go b/queue_test.go
index 406cc4a..03602c7 100644
--- a/queue_test.go
+++ b/queue_test.go
@@ -8,6 +8,10 @@ import (
"time"
)
+func init() {
+ rand.Seed(time.Now().UnixNano())
+}
+
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randStringRunes(n int) string {
@@ -17,21 +21,6 @@ func randStringRunes(n int) string {
}
return string(b)
}
-
-type testItem struct {
- key string
- value string
- priority int64
-}
-
-func init() {
- rand.Seed(time.Now().UnixNano())
-}
-
-func (t *testItem) Priority() int64 {
- return t.priority
-}
-
func randItemWithKey(key string) *testItem {
return &testItem{
key: key,
@@ -39,7 +28,6 @@ func randItemWithKey(key string) *testItem {
priority: int64(rand.Intn(100)),
}
}
-
func randItemsWithKey(key string, n int) []*testItem {
var tti []*testItem
for i := 0; i < n; i++ {
@@ -52,15 +40,28 @@ func randItemsWithKey(key string, n int) []*testItem {
return tti
}
+type testItem struct {
+ key string
+ value string
+ priority int64
+}
+
+func (t *testItem) Priority() int64 {
+ return t.priority
+}
+func (t *testItem) Key() string {
+ return t.key
+}
+
func TestQueueGeneric(t *testing.T) {
q := &Queue[string]{Interval: time.Microsecond * 2}
itemsk1 := randItemsWithKey("k1", 100)
itemsk2 := randItemsWithKey("k2", 100)
for _, it := range itemsk1 {
- q.Push(it.key, it)
+ q.Push(it)
}
for _, it := range itemsk2 {
- q.Push(it.key, it)
+ q.Push(it)
}
if q.Len() != 200 {
t.Errorf("want, got: 200, %d", q.Len())
@@ -88,7 +89,7 @@ func TestQueueOrder(t *testing.T) {
order0 := []int{1, 2, 3, 0}
order1 := []int{2, 1, 0, 3}
for _, it := range itt {
- q.Push(it.key, it)
+ q.Push(it)
}
var got []*testItem
for q.Len() > 0 {
@@ -134,7 +135,7 @@ func TestQueueConcurrentWithOverlap(t *testing.T) {
return
case <-rate.C:
it := randItemWithKey(fmt.Sprintf("k%d", cntIn%keylimit))
- if ok := q.Push(it.key, it); !ok {
+ if ok := q.Push(it); !ok {
wg.Done()
return
}