diff options
author | n1t0r <git@n1t0r.com> | 2023-03-29 22:13:32 +1100 |
---|---|---|
committer | n1t0r <git@n1t0r.com> | 2023-04-01 19:18:26 +1100 |
commit | 5352244d0166d34620897eeae0efa71801f48fae (patch) | |
tree | 0f91bcf3dcb28b05d58cc0885dde7ac0feb8c3c2 | |
parent | 30b41ef603d24b74d5fdc20532e7c73438de8155 (diff) | |
download | steady-5352244d0166d34620897eeae0efa71801f48fae.tar.gz steady-5352244d0166d34620897eeae0efa71801f48fae.tar.bz2 steady-5352244d0166d34620897eeae0efa71801f48fae.zip |
Add tag support with a limit.
-rw-r--r-- | priority_queue.go | 4 | ||||
-rw-r--r-- | queue.go | 69 | ||||
-rw-r--r-- | queue_test.go | 85 |
3 files changed, 133 insertions, 25 deletions
diff --git a/priority_queue.go b/priority_queue.go index 87de7e0..f4b4363 100644 --- a/priority_queue.go +++ b/priority_queue.go @@ -12,10 +12,6 @@ type PriorityQueue[V any] interface { Close() error } -func InMemPriorityQueue[K comparable]() PriorityQueue[Item[K]] { - return &priorityQueue[Item[K]]{} -} - type priorityQueue[T any] struct { items repr[T] } @@ -7,9 +7,17 @@ import ( var DefaultInterval = time.Second * 4 +func InMemPriorityQueue[K comparable]() PriorityQueue[Item[K]] { + return &priorityQueue[Item[K]]{} +} + type Item[K comparable] interface { - Priority() int64 Key() K + Priority() int64 +} + +type Tagged interface { + Tag() string } type partition[K comparable] struct { @@ -19,6 +27,14 @@ type partition[K comparable] struct { pq PriorityQueue[Item[K]] } +type tagCount struct { + mu sync.Mutex + exit bool + limit int + count int + notFull sync.Cond +} + type Queue[K comparable] struct { // MaxItemsPerKey specifies the maximum number of items per key queue. If // exceeded Enqueue will block that gorouting until there is space. If 0 @@ -31,6 +47,7 @@ type Queue[K comparable] struct { // Default is InMemPriorityQueue. PriorityQueueCreateFunc func() PriorityQueue[Item[K]] + tagCount map[string]*tagCount // Control flow blocking cancellation. exitTimer chan struct{} @@ -45,6 +62,15 @@ type Queue[K comparable] struct { partitions map[K]*partition[K] } +func (q *Queue[K]) RegisterTag(tag string, limit int) { + if q.tagCount == nil { + q.tagCount = make(map[string]*tagCount) + } + tc := &tagCount{limit: limit} + tc.notFull.L = &tc.mu + q.tagCount[tag] = tc +} + func (q *Queue[K]) priorityQueueCreate() PriorityQueue[Item[K]] { if q.PriorityQueueCreateFunc == nil { return InMemPriorityQueue[K]() @@ -71,6 +97,28 @@ func (q *Queue[K]) init() { // return. func (q *Queue[K]) Push(item Item[K]) (ok bool) { q.once.Do(q.init) + if t, ok1 := item.(Tagged); ok1 { + tc := q.tagCount[t.Tag()] + if tc == nil { + // Item is tagged but the tag doesn't exist. Programmer error; this + // should panic. + panic("steady: item is tagged but the tag is not registered. Call RegisterTag first.") + } + tc.mu.Lock() + for tc.count >= tc.limit && !tc.exit { + tc.notFull.Wait() + } + if tc.exit { + tc.mu.Unlock() + return false + } + // The count is technically incremented before the item is pushed into + // the queue, however the only way this function exits is if the queue + // is stopped, otherwise it's guaranteed to be enqueued after the next + // condition is satisfied, therefore it makes no difference. + tc.count++ + tc.mu.Unlock() + } q.partitionsMu.Lock() part := q.partitions[item.Key()] if part == nil { @@ -139,7 +187,7 @@ func (q *Queue[K]) Pop() (item Item[K], ok bool) { q.partitionsMu.Lock() part := q.partitions[key] if part == nil { - panic("steady: dequeue: key doesn't exist") + panic("steady: pop: key doesn't exist") } q.partitionsMu.Unlock() // Got partition. @@ -152,6 +200,17 @@ func (q *Queue[K]) Pop() (item Item[K], ok bool) { panic("steady: dequeue: MaxItemsPerKey not respected") } part.notFull.Signal() + // Signal the tagCount if the item is tagged + if t, ok := i.(Tagged); ok { + tc := q.tagCount[t.Tag()] + if tc == nil { + panic("steady: pop: tag doesn't exist") + } + tc.mu.Lock() + tc.count-- + tc.notFull.Signal() + tc.mu.Unlock() + } // Add it back to the snooze queue with the new dequeue time if it has // any items left. if part.pq.Len() > 0 { @@ -199,6 +258,12 @@ func (q *Queue[K]) Stop() { part.mu.Unlock() } q.partitionsMu.Unlock() + for _, tc := range q.tagCount { + tc.mu.Lock() + tc.exit = true + tc.notFull.Broadcast() + tc.mu.Unlock() + } // Exit snoozeQ conditions. q.snoozeQMu.Lock() q.exit = true diff --git a/queue_test.go b/queue_test.go index 03602c7..ab5eac1 100644 --- a/queue_test.go +++ b/queue_test.go @@ -12,6 +12,36 @@ func init() { rand.Seed(time.Now().UnixNano()) } +type testItem struct { + key string + value string + priority int64 +} + +func (t *testItem) Priority() int64 { + return t.priority +} +func (t *testItem) Key() string { + return t.key +} + +type testItemTagged struct { + key string + value string + priority int64 + tag string +} + +func (t *testItemTagged) Priority() int64 { + return t.priority +} +func (t *testItemTagged) Key() string { + return t.key +} +func (t *testItemTagged) Tag() string { + return t.tag +} + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randStringRunes(n int) string { @@ -28,29 +58,27 @@ func randItemWithKey(key string) *testItem { priority: int64(rand.Intn(100)), } } +func randItemWithKeyTagged(key string, tag string) *testItemTagged { + return &testItemTagged{ + tag: tag, + key: key, + value: randStringRunes(10), + priority: int64(rand.Intn(100)), + } +} func randItemsWithKey(key string, n int) []*testItem { var tti []*testItem for i := 0; i < n; i++ { - tti = append(tti, &testItem{ - key: key, - value: randStringRunes(10), - priority: int64(rand.Intn(100)), - }) + tti = append(tti, randItemWithKey(key)) } return tti } - -type testItem struct { - key string - value string - priority int64 -} - -func (t *testItem) Priority() int64 { - return t.priority -} -func (t *testItem) Key() string { - return t.key +func randItemsWithKeyTagged(key string, tag string, n int) []*testItemTagged { + var tti []*testItemTagged + for i := 0; i < n; i++ { + tti = append(tti, randItemWithKeyTagged(key, tag)) + } + return tti } func TestQueueGeneric(t *testing.T) { @@ -76,7 +104,6 @@ func TestQueueGeneric(t *testing.T) { t.Errorf("want, got: 200, %d", len(got)) } } - func TestQueueOrder(t *testing.T) { q := &Queue[string]{Interval: time.Microsecond * 2} itt := []*testItem{ @@ -115,7 +142,6 @@ func TestQueueOrder(t *testing.T) { } } } - func TestQueueConcurrentWithOverlap(t *testing.T) { q := &Queue[string]{Interval: time.Microsecond * 2} var cntIn, cntOut int @@ -172,3 +198,24 @@ func TestQueueConcurrentWithOverlap(t *testing.T) { } } } +func TestQueuePushItemWithTagBlockingBehaviour(t *testing.T) { + q := &Queue[string]{Interval: time.Microsecond * 2} + tag := "t1" + q.RegisterTag(tag, 99) // Allow only 99. + itemskt := randItemsWithKeyTagged("k1", tag, 100) + done := make(chan struct{}) + go func() { + for _, it := range itemskt { + if ok := q.Push(it); !ok { + break + } + } + done <- struct{}{} + }() + <-time.After(time.Millisecond * 100) + if q.Len() != 99 { + t.Errorf("want, got: 99, %d", q.Len()) + } + q.Stop() + <-done +} |