commit febcffda68f981f09e784ce3616c455e8ba3d2f0 Author: Gregory Date: Sun Oct 31 17:55:40 2021 +0200 init diff --git a/README.md b/README.md new file mode 100644 index 0000000..bce0433 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +This is queue data structure designed +by Bryan C. Mills (https://github.com/bcmills) +and showcased in talk "Rethinking Classical Concurrency Patterns" +(https://www.youtube.com/watch?v=5zXAHh5tJqQ) \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d2ab22b --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/fotonmoton/queue + +go 1.16 diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..76f4d90 --- /dev/null +++ b/queue.go @@ -0,0 +1,85 @@ +// This is queue data structure designed +// by Bryan C. Mills (https://github.com/bcmills) +// and showcased in talk "Rethinking Classical Concurrency Patterns" +// (https://www.youtube.com/watch?v=5zXAHh5tJqQ) +package queue + +type Item struct { + val interface{} +} + +// This type represents request for content from queue. +// When n number of items become avaliable in a queue +// they will be send to ch channel for caller to consume. +type waiter struct { + n int + ch chan []Item +} + +// Holds all requests for items and all items +type state struct { + waiters []waiter + items []Item +} + +// This is the interesting part. To make this queue goroutine-safe state of the +// queue should be owned by one goroutine at a time. One option is to use locks +// but here channel used as synchronization mechanism. Queue holds state chanel +// with capacity of 1. Because first read from state channel will be nonblocking +// and all subsequent calls will wait until state value become avaliable this +// prevents data races and makes this type safe to use concurrently. +// What an ingenious design! +type Queue struct { + state chan state +} + +func NewQueue() *Queue { + queue := Queue{state: make(chan state, 1)} + // Shared state that will be passed + // betwee all goroutines to synchronize access + queue.state <- state{} + return &queue +} + +func (q *Queue) Put(item Item) { + // This read from state channel works like mutex.Lock() call. + // No one can modify sate until we put it back to channel + state := <-q.state + state.items = append(state.items, item) + // If state has waiting requests and queue has enough items + // for first request we send it to waiter channel + for len(state.waiters) > 0 { + waiter := state.waiters[0] + if waiter.n < len(state.items) { + break + } + // This channel is blocking Get* calls until we put + // requested number of items to it + waiter.ch <- state.items[:waiter.n:waiter.n] + state.items = state.items[waiter.n:] + state.waiters = state.waiters[1:] + } + // Release state for another goroutines to use + q.state <- state +} + +func (q *Queue) GetMany(n int) []Item { + // Acquire exclusive right to modify state + state := <-q.state + // We can return items right away without creating a waiter + if len(state.waiters) == 0 && len(state.items) >= n { + items := state.items[:n:n] + state.items = state.items[n:] + q.state <- state + return items + } + ch := make(chan []Item) + state.waiters = append(state.waiters, waiter{n, ch}) + q.state <- state + // Wait for Put call to push items to ch channel + return <-ch +} + +func (q *Queue) Get() Item { + return q.GetMany(1)[0] +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 0000000..4fa0441 --- /dev/null +++ b/queue_test.go @@ -0,0 +1,104 @@ +package queue + +import ( + "log" + "sync" + "testing" + "time" +) + +func TestPut(t *testing.T) { + q := NewQueue() + q.Put(Item{1}) + if q.Get().val.(int) != 1 { + t.Fatal("wrong item in queue") + } +} + +func TestPutMultiple(t *testing.T) { + q := NewQueue() + q.Put(Item{1}) + q.Put(Item{2}) + first, second := q.Get().val.(int), q.Get().val.(int) + if first != 1 || second != 2 { + t.Fatal("wrong item in queue or wrong order") + } +} + +func TestEmptyGet(t *testing.T) { + + result := func() chan Item { + q := NewQueue() + c := make(chan Item) + go func() { c <- q.Get() }() + return c + } + + select { + case <-result(): + log.Fatal("empty queue should block") + case <-time.After(time.Millisecond): + } +} + +func TestGetMany(t *testing.T) { + q := NewQueue() + + result2 := func() chan []Item { + c := make(chan []Item) + go func() { c <- q.GetMany(2) }() + return c + } + + q.Put(Item{1}) + + select { + case <-result2(): + log.Fatal("GetMany should block if not enough items in queue") + case <-time.After(time.Millisecond): + } + + // this call unblocks first GetMany call and empties queue + q.Put(Item{2}) + // Put enough items in queue for result2 not to block + q.Put(Item{3}) + q.Put(Item{4}) + + select { + case res := <-result2(): + third, fourth := res[0].val.(int), res[1].val.(int) + if third != 3 || fourth != 4 { + t.Fatal("wrong item in queue or wrong order") + } + case <-time.After(time.Millisecond): + log.Fatal("GetMany shouldn't block when queue has enough items") + } +} + +func TestConcurrent(t *testing.T) { + q := NewQueue() + wg := sync.WaitGroup{} + sum := 0 + + wg.Add(2000) + + for i := 0; i < 1000; i++ { + go func(i int) { + defer wg.Done() + q.Put(Item{i}) + }(i) + } + + for i := 0; i < 1000; i++ { + go func() { + defer wg.Done() + sum += q.Get().val.(int) + }() + } + + wg.Wait() + + if sum != 1000*(0+999)/2 { + log.Fatalf("data race. Sum: %v", sum) + } +}