aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorn1t0r <git@n1t0r.com>2023-03-29 22:13:32 +1100
committern1t0r <git@n1t0r.com>2023-04-01 19:18:26 +1100
commit5352244d0166d34620897eeae0efa71801f48fae (patch)
tree0f91bcf3dcb28b05d58cc0885dde7ac0feb8c3c2
parent30b41ef603d24b74d5fdc20532e7c73438de8155 (diff)
downloadsteady-5352244d0166d34620897eeae0efa71801f48fae.tar.gz
steady-5352244d0166d34620897eeae0efa71801f48fae.tar.bz2
steady-5352244d0166d34620897eeae0efa71801f48fae.zip
Add tag support with a limit.
-rw-r--r--priority_queue.go4
-rw-r--r--queue.go69
-rw-r--r--queue_test.go85
3 files changed, 133 insertions, 25 deletions
diff --git a/priority_queue.go b/priority_queue.go
index 87de7e0..f4b4363 100644
--- a/priority_queue.go
+++ b/priority_queue.go
@@ -12,10 +12,6 @@ type PriorityQueue[V any] interface {
Close() error
}
-func InMemPriorityQueue[K comparable]() PriorityQueue[Item[K]] {
- return &priorityQueue[Item[K]]{}
-}
-
type priorityQueue[T any] struct {
items repr[T]
}
diff --git a/queue.go b/queue.go
index ed1cd9f..88dccba 100644
--- a/queue.go
+++ b/queue.go
@@ -7,9 +7,17 @@ import (
var DefaultInterval = time.Second * 4
+func InMemPriorityQueue[K comparable]() PriorityQueue[Item[K]] {
+ return &priorityQueue[Item[K]]{}
+}
+
type Item[K comparable] interface {
- Priority() int64
Key() K
+ Priority() int64
+}
+
+type Tagged interface {
+ Tag() string
}
type partition[K comparable] struct {
@@ -19,6 +27,14 @@ type partition[K comparable] struct {
pq PriorityQueue[Item[K]]
}
+type tagCount struct {
+ mu sync.Mutex
+ exit bool
+ limit int
+ count int
+ notFull sync.Cond
+}
+
type Queue[K comparable] struct {
// MaxItemsPerKey specifies the maximum number of items per key queue. If
// exceeded Enqueue will block that gorouting until there is space. If 0
@@ -31,6 +47,7 @@ type Queue[K comparable] struct {
// Default is InMemPriorityQueue.
PriorityQueueCreateFunc func() PriorityQueue[Item[K]]
+ tagCount map[string]*tagCount
// Control flow blocking cancellation.
exitTimer chan struct{}
@@ -45,6 +62,15 @@ type Queue[K comparable] struct {
partitions map[K]*partition[K]
}
+func (q *Queue[K]) RegisterTag(tag string, limit int) {
+ if q.tagCount == nil {
+ q.tagCount = make(map[string]*tagCount)
+ }
+ tc := &tagCount{limit: limit}
+ tc.notFull.L = &tc.mu
+ q.tagCount[tag] = tc
+}
+
func (q *Queue[K]) priorityQueueCreate() PriorityQueue[Item[K]] {
if q.PriorityQueueCreateFunc == nil {
return InMemPriorityQueue[K]()
@@ -71,6 +97,28 @@ func (q *Queue[K]) init() {
// return.
func (q *Queue[K]) Push(item Item[K]) (ok bool) {
q.once.Do(q.init)
+ if t, ok1 := item.(Tagged); ok1 {
+ tc := q.tagCount[t.Tag()]
+ if tc == nil {
+ // Item is tagged but the tag doesn't exist. Programmer error; this
+ // should panic.
+ panic("steady: item is tagged but the tag is not registered. Call RegisterTag first.")
+ }
+ tc.mu.Lock()
+ for tc.count >= tc.limit && !tc.exit {
+ tc.notFull.Wait()
+ }
+ if tc.exit {
+ tc.mu.Unlock()
+ return false
+ }
+ // The count is technically incremented before the item is pushed into
+ // the queue, however the only way this function exits is if the queue
+ // is stopped, otherwise it's guaranteed to be enqueued after the next
+ // condition is satisfied, therefore it makes no difference.
+ tc.count++
+ tc.mu.Unlock()
+ }
q.partitionsMu.Lock()
part := q.partitions[item.Key()]
if part == nil {
@@ -139,7 +187,7 @@ func (q *Queue[K]) Pop() (item Item[K], ok bool) {
q.partitionsMu.Lock()
part := q.partitions[key]
if part == nil {
- panic("steady: dequeue: key doesn't exist")
+ panic("steady: pop: key doesn't exist")
}
q.partitionsMu.Unlock()
// Got partition.
@@ -152,6 +200,17 @@ func (q *Queue[K]) Pop() (item Item[K], ok bool) {
panic("steady: dequeue: MaxItemsPerKey not respected")
}
part.notFull.Signal()
+ // Signal the tagCount if the item is tagged
+ if t, ok := i.(Tagged); ok {
+ tc := q.tagCount[t.Tag()]
+ if tc == nil {
+ panic("steady: pop: tag doesn't exist")
+ }
+ tc.mu.Lock()
+ tc.count--
+ tc.notFull.Signal()
+ tc.mu.Unlock()
+ }
// Add it back to the snooze queue with the new dequeue time if it has
// any items left.
if part.pq.Len() > 0 {
@@ -199,6 +258,12 @@ func (q *Queue[K]) Stop() {
part.mu.Unlock()
}
q.partitionsMu.Unlock()
+ for _, tc := range q.tagCount {
+ tc.mu.Lock()
+ tc.exit = true
+ tc.notFull.Broadcast()
+ tc.mu.Unlock()
+ }
// Exit snoozeQ conditions.
q.snoozeQMu.Lock()
q.exit = true
diff --git a/queue_test.go b/queue_test.go
index 03602c7..ab5eac1 100644
--- a/queue_test.go
+++ b/queue_test.go
@@ -12,6 +12,36 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
+type testItem struct {
+ key string
+ value string
+ priority int64
+}
+
+func (t *testItem) Priority() int64 {
+ return t.priority
+}
+func (t *testItem) Key() string {
+ return t.key
+}
+
+type testItemTagged struct {
+ key string
+ value string
+ priority int64
+ tag string
+}
+
+func (t *testItemTagged) Priority() int64 {
+ return t.priority
+}
+func (t *testItemTagged) Key() string {
+ return t.key
+}
+func (t *testItemTagged) Tag() string {
+ return t.tag
+}
+
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randStringRunes(n int) string {
@@ -28,29 +58,27 @@ func randItemWithKey(key string) *testItem {
priority: int64(rand.Intn(100)),
}
}
+func randItemWithKeyTagged(key string, tag string) *testItemTagged {
+ return &testItemTagged{
+ tag: tag,
+ key: key,
+ value: randStringRunes(10),
+ priority: int64(rand.Intn(100)),
+ }
+}
func randItemsWithKey(key string, n int) []*testItem {
var tti []*testItem
for i := 0; i < n; i++ {
- tti = append(tti, &testItem{
- key: key,
- value: randStringRunes(10),
- priority: int64(rand.Intn(100)),
- })
+ tti = append(tti, randItemWithKey(key))
}
return tti
}
-
-type testItem struct {
- key string
- value string
- priority int64
-}
-
-func (t *testItem) Priority() int64 {
- return t.priority
-}
-func (t *testItem) Key() string {
- return t.key
+func randItemsWithKeyTagged(key string, tag string, n int) []*testItemTagged {
+ var tti []*testItemTagged
+ for i := 0; i < n; i++ {
+ tti = append(tti, randItemWithKeyTagged(key, tag))
+ }
+ return tti
}
func TestQueueGeneric(t *testing.T) {
@@ -76,7 +104,6 @@ func TestQueueGeneric(t *testing.T) {
t.Errorf("want, got: 200, %d", len(got))
}
}
-
func TestQueueOrder(t *testing.T) {
q := &Queue[string]{Interval: time.Microsecond * 2}
itt := []*testItem{
@@ -115,7 +142,6 @@ func TestQueueOrder(t *testing.T) {
}
}
}
-
func TestQueueConcurrentWithOverlap(t *testing.T) {
q := &Queue[string]{Interval: time.Microsecond * 2}
var cntIn, cntOut int
@@ -172,3 +198,24 @@ func TestQueueConcurrentWithOverlap(t *testing.T) {
}
}
}
+func TestQueuePushItemWithTagBlockingBehaviour(t *testing.T) {
+ q := &Queue[string]{Interval: time.Microsecond * 2}
+ tag := "t1"
+ q.RegisterTag(tag, 99) // Allow only 99.
+ itemskt := randItemsWithKeyTagged("k1", tag, 100)
+ done := make(chan struct{})
+ go func() {
+ for _, it := range itemskt {
+ if ok := q.Push(it); !ok {
+ break
+ }
+ }
+ done <- struct{}{}
+ }()
+ <-time.After(time.Millisecond * 100)
+ if q.Len() != 99 {
+ t.Errorf("want, got: 99, %d", q.Len())
+ }
+ q.Stop()
+ <-done
+}