1
2
3
4
5 package par
6
7 import (
8 "sync"
9 "testing"
10 )
11
12 func TestQueueIdle(t *testing.T) {
13 q := NewQueue(1)
14 select {
15 case <-q.Idle():
16 default:
17 t.Errorf("NewQueue(1) is not initially idle.")
18 }
19
20 started := make(chan struct{})
21 unblock := make(chan struct{})
22 q.Add(func() {
23 close(started)
24 <-unblock
25 })
26
27 <-started
28 idle := q.Idle()
29 select {
30 case <-idle:
31 t.Errorf("NewQueue(1) is marked idle while processing work.")
32 default:
33 }
34
35 close(unblock)
36 <-idle
37 }
38
39 func TestQueueBacklog(t *testing.T) {
40 const (
41 maxActive = 2
42 totalWork = 3 * maxActive
43 )
44
45 q := NewQueue(maxActive)
46 t.Logf("q = NewQueue(%d)", maxActive)
47
48 var wg sync.WaitGroup
49 wg.Add(totalWork)
50 started := make([]chan struct{}, totalWork)
51 unblock := make(chan struct{})
52 for i := range started {
53 started[i] = make(chan struct{})
54 i := i
55 q.Add(func() {
56 close(started[i])
57 <-unblock
58 wg.Done()
59 })
60 }
61
62 for i, c := range started {
63 if i < maxActive {
64 <-c
65 } else {
66 select {
67 case <-c:
68 t.Errorf("Work item %d started before previous items finished.", i)
69 default:
70 }
71 }
72 }
73
74 close(unblock)
75 for _, c := range started[maxActive:] {
76 <-c
77 }
78 wg.Wait()
79 }
80
View as plain text