Advanced Go Concurrency Patterns

Sameer Ajmani
Google
http://profiles.google.com/ajmani
@Sajma
https://go.dev

* Video

This talk was presented at Google I/O in May 2013.

.link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube

* Get ready

.image advconc/gopherswim.jpg 400 400

* Go supports concurrency

In the language and runtime, not a library.

This changes how you structure your programs.

* Goroutines and Channels

Goroutines are independently executing functions in the same address space.

	go f()
	go g(1, 2)

Channels are typed values that allow goroutines to synchronize and exchange information.

	c := make(chan int)
	go func() { c <- 3 }()
	n := <-c

For more on the basics, watch [[/talks/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]].

* Example: ping-pong

.play advconc/pingpong/pingpong.go /STARTMAIN1/,/STOPMAIN1/

* Deadlock detection

.play advconc/pingpongdeadlock/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/

* Panic dumps the stacks

.play advconc/pingpongpanic/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/

* It's easy to go, but how to stop?

Long-lived programs need to clean up.

Let's look at how to write programs that handle communication, periodic events, and cancellation.

The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate.

	select {
	case xc <- x:
		// sent x on xc
	case y := <-yc:
		// received y from yc
	}
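
To see the choice being made, here is a small runnable sketch. The helper name `trySelect` and the channel setup are assumptions made for illustration; only the `select` shape comes from the slide.

```go
package main

import "fmt"

// trySelect is a hypothetical helper: it runs one select over a send on xc
// and a receive from yc, and reports which communication happened.
func trySelect(xc chan<- int, yc <-chan int, x int) string {
	select {
	case xc <- x:
		return "sent"
	case y := <-yc:
		return fmt.Sprintf("received %d", y)
	}
}

func main() {
	// Buffered xc has room, unbuffered yc has no sender: only the send can proceed.
	xc := make(chan int, 1)
	fmt.Println(trySelect(xc, make(chan int), 7)) // sent

	// Unbuffered channel with no receiver, yc holds a value: only the receive can proceed.
	yc := make(chan int, 1)
	yc <- 42
	fmt.Println(trySelect(make(chan int), yc, 7)) // received 42
}
```

If both cases were ready at once, `select` would pick one of them at random.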

* Example: feed reader

My favorite feed reader disappeared. I need a new one.

Why not write one?

Where do we start?

* Find an RSS client

Searching [[https://pkg.go.dev][pkg.go.dev]] for *"rss"* turns up several hits, including one that provides:

	// Fetch fetches Items for uri and returns the time when the next
	// fetch should be attempted. On failure, Fetch returns an error.
	func Fetch(uri string) (items []Item, next time.Time, err error)

	type Item struct{
		Title, Channel, GUID string // a subset of RSS fields
	}

But I want a stream:

	<-chan Item

And I want multiple subscriptions.

* Here's what we have

	type Fetcher interface {
		Fetch() (items []Item, next time.Time, err error)
	}

	func Fetch(domain string) Fetcher {...} // fetches Items from domain

* Here's what we want

	type Subscription interface {
		Updates() <-chan Item // stream of Items
		Close() error         // shuts down the stream
	}

	func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream

	func Merge(subs ...Subscription) Subscription {...} // merges several streams

* Example

.play advconc/fakemain/fakemain.go /func main/,/^}/

* Subscribe

`Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called.

	func Subscribe(fetcher Fetcher) Subscription {
		s := &sub{
			fetcher: fetcher,
			updates: make(chan Item), // for Updates
		}
		go s.loop()
		return s
	}

	// sub implements the Subscription interface.
	type sub struct {
		fetcher Fetcher   // fetches items
		updates chan Item // delivers items to the user
	}

	// loop fetches items using s.fetcher and sends them
	// on s.updates. loop exits when s.Close is called.
	func (s *sub) loop() {...}

* Implementing Subscription

To implement the `Subscription` interface, define `Updates` and `Close`.

.code advconc/fakemain/fakemain.go /func.* Updates/,/^}/

	func (s *sub) Close() error {
		// TODO: make loop exit
		// TODO: find out about any error
		return err
	}

* What does loop do?

- periodically call `Fetch`
- send fetched items on the `Updates` channel
- exit when `Close` is called, reporting any error

* Naive Implementation

# Not quite enough room for this; retry after format change:
# .play advconc/naivemain/naivemain.go /naiveSub\) loop/,/^}/
# also on subsequent slides.

.play advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE /
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/

* Bug 1: unsynchronized access to s.closed/s.err

.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsync
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync

* Race Detector

	go run -race naivemain.go

# original is 400x1500
.image advconc/race.png 150 562
.play advconc/naivemain/naivemain.go /STARTNAIVE /,/s.err/ HLsync
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync

* Bug 2: time.Sleep may keep loop running

.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsleep

* Bug 3: loop may block forever on s.updates

.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsend

* Solution

Change the body of `loop` to a `select` with three cases:

- `Close` was called
- it's time to call `Fetch`
- send an item on `s.updates`

* Structure: for-select loop

`loop` runs in its own goroutine.

`select` lets `loop` avoid blocking indefinitely in any one state.

	func (s *sub) loop() {
		... declare mutable state ...
		for {
			... set up channels for cases ...
			select {
			case <-c1:
				... read/write state ...
			case c2 <- x:
				... read/write state ...
			case y := <-c3:
				... read/write state ...
			}
		}
	}

The cases interact via local state in `loop`.

* Case 1: Close

`Close` communicates with `loop` via `s.closing`.

	type sub struct {
		closing chan chan error
	}

The service (`loop`) listens for requests on its channel (`s.closing`).

The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_

In this case, the only thing in the request is the _reply_channel_.

* Case 1: Close

`Close` asks loop to exit and waits for a response.

.code advconc/fakemain/fakemain.go /\*sub\) Close/,/^}/ HLchan

`loop` handles `Close` by replying with the `Fetch` error and exiting.

.code advconc/fakemain/fakemain.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan

* Case 2: Fetch

Schedule the next `Fetch` after some delay.

.code advconc/fakemain/fakemain.go /STARTFETCHONLY /,/STOPFETCHONLY /

* Case 3: Send

Send the fetched items, one at a time.

	var pending []Item // appended by fetch; consumed by send
	for {
		select {
		case s.updates <- pending[0]:
			pending = pending[1:]
		}
	}

Whoops. This crashes: `pending[0]` is evaluated on entering the `select`, so it panics when `pending` is empty.

.image advconc/gopherswrench.jpg 200 337

* Select and nil channels

Sends and receives on nil channels block.

Select never selects a blocking case.

.play advconc/nilselect/nilselect.go /func main/,/^}/
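
A related use of the same property, sketched with a hypothetical `drain` function: setting a channel variable to `nil` switches its `select` case off. A closed channel is always ready to receive, so without the `nil` assignments this loop would spin on the closed channels.

```go
package main

import "fmt"

// drain sums values from a and b until both are closed. Once a channel
// is closed its variable is set to nil so its case is never selected again.
func drain(a, b <-chan int) (sum int) {
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // disable this case
				continue
			}
			sum += v
		case v, ok := <-b:
			if !ok {
				b = nil // disable this case
				continue
			}
			sum += v
		}
	}
	return sum
}

func main() {
	a, b := make(chan int, 2), make(chan int, 1)
	a <- 1
	a <- 2
	b <- 10
	close(a)
	close(b)
	fmt.Println(drain(a, b)) // 13
}
```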

* Case 3: Send (fixed)

Enable send only when pending is non-empty.

.code advconc/fakemain/fakemain.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates
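
A minimal sketch of that idea, assuming a hypothetical `sendAll` function with `string` items and a `done` channel in place of the close request:

```go
package main

import "fmt"

// sendAll delivers pending on updates one item at a time. The send case
// is enabled only while pending is non-empty; otherwise out stays nil
// and the case can never be selected.
func sendAll(pending []string, updates chan<- string, done <-chan struct{}) {
	for {
		var first string
		var out chan<- string // nil: send case disabled
		if len(pending) > 0 {
			first = pending[0]
			out = updates // enable the send case
		}
		select {
		case out <- first:
			pending = pending[1:]
		case <-done:
			return
		}
	}
}

func main() {
	updates := make(chan string)
	done := make(chan struct{})
	go sendAll([]string{"a", "b"}, updates, done)
	fmt.Println(<-updates, <-updates) // a b
	close(done)
}
```

Reading `pending[0]` inside the `if` is what removes the crash: the index expression runs only when the slice has an element.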

* Select

Put the three cases together:

.code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT /

The cases interact via `err`, `next`, and `pending`.

No locks, no condition variables, no callbacks.
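
For readers without the included file at hand, here is a self-contained sketch of how the three cases might fit together. It is not the file referenced above: `fakeFetcher`, its one-hour interval, and the 10-second retry delay are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

type Item struct{ Title, Channel, GUID string }

type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

type Subscription interface {
	Updates() <-chan Item
	Close() error
}

type sub struct {
	fetcher Fetcher
	updates chan Item
	closing chan chan error
}

func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{fetcher: fetcher, updates: make(chan Item), closing: make(chan chan error)}
	go s.loop()
	return s
}

func (s *sub) Updates() <-chan Item { return s.updates }

func (s *sub) Close() error {
	errc := make(chan error)
	s.closing <- errc
	return <-errc
}

func (s *sub) loop() {
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // time of the next fetch; zero means now
	var err error      // last fetch error, reported by Close
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)

		var first Item
		var updates chan Item // nil unless there is something to send
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates
		}

		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				next = time.Now().Add(10 * time.Second) // assumed retry delay
				break
			}
			pending = append(pending, fetched...)
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// fakeFetcher is a hypothetical Fetcher that returns one numbered item
// per call and asks to be called again in an hour.
type fakeFetcher struct{ n int }

func (f *fakeFetcher) Fetch() ([]Item, time.Time, error) {
	f.n++
	return []Item{{Title: fmt.Sprintf("item %d", f.n)}}, time.Now().Add(time.Hour), nil
}

func main() {
	s := Subscribe(&fakeFetcher{})
	fmt.Println((<-s.Updates()).Title) // item 1
	fmt.Println(s.Close())             // <nil>
}
```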

* Bugs fixed

- Bug 1: unsynchronized access to `s.closed` and `s.err`
- Bug 2: `time.Sleep` may keep loop running
- Bug 3: `loop` may block forever sending on `s.updates`

.code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / HLcases

* We can improve loop further

* Issue: Fetch may return duplicates

.code advconc/fakemain/fakemain.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch

* Fix: Filter items before adding to pending

.code advconc/fakemain/fakemain.go /STARTSEEN /,/STOPSEEN / HLseen
.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
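
The filtering step can be sketched as a standalone function. The name `dedupe` and the choice of `GUID` as the key for the seen set are assumptions for this example.

```go
package main

import "fmt"

type Item struct{ Title, Channel, GUID string }

// dedupe appends to pending only those fetched items whose GUID has not
// been seen before, and records each new GUID in seen.
func dedupe(pending, fetched []Item, seen map[string]bool) []Item {
	for _, item := range fetched {
		if !seen[item.GUID] {
			pending = append(pending, item)
			seen[item.GUID] = true
		}
	}
	return pending
}

func main() {
	seen := make(map[string]bool)
	var pending []Item
	pending = dedupe(pending, []Item{{GUID: "1"}, {GUID: "2"}}, seen)
	pending = dedupe(pending, []Item{{GUID: "2"}, {GUID: "3"}}, seen)
	fmt.Println(len(pending)) // 3
}
```

Since `seen` would live as a local variable of `loop`, it needs no synchronization.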

* Issue: Pending queue grows without bound

.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe

* Fix: Disable fetch case when too much pending

	const maxPending = 10

.code advconc/fakemain/fakemain.go /STARTCAP /,/STOPCAP / HLcap

Could instead drop older items from the head of `pending`.
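
Both options can be sketched briefly. `fetchTimer` and `dropOldest` are hypothetical helpers with `string` items; the first returns a nil channel to switch the fetch case off, the second trims the queue instead.

```go
package main

import (
	"fmt"
	"time"
)

const maxPending = 10

// fetchTimer returns a timer channel for the fetch case, or nil when
// pending is full so that the case cannot be selected.
func fetchTimer(pending []string, delay time.Duration) <-chan time.Time {
	if len(pending) >= maxPending {
		return nil
	}
	return time.After(delay)
}

// dropOldest sketches the alternative: keep only the newest maxPending
// items by discarding from the head of the queue.
func dropOldest(pending []string) []string {
	if extra := len(pending) - maxPending; extra > 0 {
		pending = pending[extra:]
	}
	return pending
}

func main() {
	full := make([]string, maxPending)
	fmt.Println(fetchTimer(full, 0) == nil)          // true
	fmt.Println(fetchTimer(full[:3], 0) == nil)      // false
	fmt.Println(len(dropOldest(make([]string, 12)))) // 10
}
```

Disabling the fetch case applies backpressure to the feed; dropping old items favors freshness over completeness.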

* Issue: Loop blocks on Fetch

.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch

* Fix: Run Fetch asynchronously

Add a new `select` case for `fetchDone`.

	type fetchResult struct{ fetched []Item; next time.Time; err error }

.code advconc/fakemain/fakemain.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch

* Implemented Subscribe

Responsive. Cleans up. Easy to read and change.

Three techniques:

- `for-select` loop
- service channel, reply channels (`chan`chan`error`)
- `nil` channels in `select` cases

More details online, including `Merge`.

.image advconc/gopherhat.jpg 200 158

* Conclusion

Concurrent programming can be tricky.

Go makes it easier:

- channels convey data, timer events, cancellation signals
- goroutines serialize access to local mutable state
- stack traces & deadlock detector
- race detector

.image advconc/race.png 200 750

* Links

Go Concurrency Patterns (2012)

.link /talks/2012/concurrency.slide go.dev/talks/2012/concurrency.slide

Concurrency is not parallelism

.link /s/concurrency-is-not-parallelism go.dev/s/concurrency-is-not-parallelism

Share memory by communicating

.link /doc/codewalk/sharemem go.dev/doc/codewalk/sharemem

Go Tour (learn Go in your browser)

.link /tour/ go.dev/tour