This commit is contained in:
Gregory 2021-10-31 17:55:40 +02:00
commit febcffda68
4 changed files with 196 additions and 0 deletions

4
README.md Normal file
View file

@ -0,0 +1,4 @@
This is queue data structure designed
by Bryan C. Mills (https://github.com/bcmills)
and showcased in talk "Rethinking Classical Concurrency Patterns"
(https://www.youtube.com/watch?v=5zXAHh5tJqQ)

3
go.mod Normal file
View file

@ -0,0 +1,3 @@
module github.com/fotonmoton/queue
go 1.16

85
queue.go Normal file
View file

@ -0,0 +1,85 @@
// This is queue data structure designed
// by Bryan C. Mills (https://github.com/bcmills)
// and showcased in talk "Rethinking Classical Concurrency Patterns"
// (https://www.youtube.com/watch?v=5zXAHh5tJqQ)
package queue
type Item struct {
val interface{}
}
// This type represents request for content from queue.
// When n number of items become avaliable in a queue
// they will be send to ch channel for caller to consume.
type waiter struct {
n int
ch chan []Item
}
// Holds all requests for items and all items
type state struct {
waiters []waiter
items []Item
}
// This is the interesting part. To make this queue goroutine-safe state of the
// queue should be owned by one goroutine at a time. One option is to use locks
// but here channel used as synchronization mechanism. Queue holds state chanel
// with capacity of 1. Because first read from state channel will be nonblocking
// and all subsequent calls will wait until state value become avaliable this
// prevents data races and makes this type safe to use concurrently.
// What an ingenious design!
type Queue struct {
state chan state
}
func NewQueue() *Queue {
queue := Queue{state: make(chan state, 1)}
// Shared state that will be passed
// betwee all goroutines to synchronize access
queue.state <- state{}
return &queue
}
func (q *Queue) Put(item Item) {
// This read from state channel works like mutex.Lock() call.
// No one can modify sate until we put it back to channel
state := <-q.state
state.items = append(state.items, item)
// If state has waiting requests and queue has enough items
// for first request we send it to waiter channel
for len(state.waiters) > 0 {
waiter := state.waiters[0]
if waiter.n < len(state.items) {
break
}
// This channel is blocking Get* calls until we put
// requested number of items to it
waiter.ch <- state.items[:waiter.n:waiter.n]
state.items = state.items[waiter.n:]
state.waiters = state.waiters[1:]
}
// Release state for another goroutines to use
q.state <- state
}
func (q *Queue) GetMany(n int) []Item {
// Acquire exclusive right to modify state
state := <-q.state
// We can return items right away without creating a waiter
if len(state.waiters) == 0 && len(state.items) >= n {
items := state.items[:n:n]
state.items = state.items[n:]
q.state <- state
return items
}
ch := make(chan []Item)
state.waiters = append(state.waiters, waiter{n, ch})
q.state <- state
// Wait for Put call to push items to ch channel
return <-ch
}
func (q *Queue) Get() Item {
return q.GetMany(1)[0]
}

104
queue_test.go Normal file
View file

@ -0,0 +1,104 @@
package queue
import (
"log"
"sync"
"testing"
"time"
)
func TestPut(t *testing.T) {
q := NewQueue()
q.Put(Item{1})
if q.Get().val.(int) != 1 {
t.Fatal("wrong item in queue")
}
}
func TestPutMultiple(t *testing.T) {
q := NewQueue()
q.Put(Item{1})
q.Put(Item{2})
first, second := q.Get().val.(int), q.Get().val.(int)
if first != 1 || second != 2 {
t.Fatal("wrong item in queue or wrong order")
}
}
func TestEmptyGet(t *testing.T) {
result := func() chan Item {
q := NewQueue()
c := make(chan Item)
go func() { c <- q.Get() }()
return c
}
select {
case <-result():
log.Fatal("empty queue should block")
case <-time.After(time.Millisecond):
}
}
func TestGetMany(t *testing.T) {
q := NewQueue()
result2 := func() chan []Item {
c := make(chan []Item)
go func() { c <- q.GetMany(2) }()
return c
}
q.Put(Item{1})
select {
case <-result2():
log.Fatal("GetMany should block if not enough items in queue")
case <-time.After(time.Millisecond):
}
// this call unblocks first GetMany call and empties queue
q.Put(Item{2})
// Put enough items in queue for result2 not to block
q.Put(Item{3})
q.Put(Item{4})
select {
case res := <-result2():
third, fourth := res[0].val.(int), res[1].val.(int)
if third != 3 || fourth != 4 {
t.Fatal("wrong item in queue or wrong order")
}
case <-time.After(time.Millisecond):
log.Fatal("GetMany shouldn't block when queue has enough items")
}
}
func TestConcurrent(t *testing.T) {
q := NewQueue()
wg := sync.WaitGroup{}
sum := 0
wg.Add(2000)
for i := 0; i < 1000; i++ {
go func(i int) {
defer wg.Done()
q.Put(Item{i})
}(i)
}
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
sum += q.Get().val.(int)
}()
}
wg.Wait()
if sum != 1000*(0+999)/2 {
log.Fatalf("data race. Sum: %v", sum)
}
}