Source file
src/runtime/select.go
1
2
3
4
5 package runtime
6
7
8
9 import (
10 "internal/abi"
11 "internal/runtime/sys"
12 "unsafe"
13 )
14
15 const debugSelect = false
16
17
18
19
20 type scase struct {
21 c *hchan
22 elem unsafe.Pointer
23 }
24
25 var (
26 chansendpc = abi.FuncPCABIInternal(chansend)
27 chanrecvpc = abi.FuncPCABIInternal(chanrecv)
28 )
29
30 func selectsetpc(pc *uintptr) {
31 *pc = sys.GetCallerPC()
32 }
33
34 func sellock(scases []scase, lockorder []uint16) {
35 var c *hchan
36 for _, o := range lockorder {
37 c0 := scases[o].c
38 if c0 != c {
39 c = c0
40 lock(&c.lock)
41 }
42 }
43 }
44
45 func selunlock(scases []scase, lockorder []uint16) {
46
47
48
49
50
51
52
53
54 for i := len(lockorder) - 1; i >= 0; i-- {
55 c := scases[lockorder[i]].c
56 if i > 0 && c == scases[lockorder[i-1]].c {
57 continue
58 }
59 unlock(&c.lock)
60 }
61 }
62
63 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
64
65
66
67
68
69 gp.activeStackChans = true
70
71
72
73 gp.parkingOnChan.Store(false)
74
75
76
77
78
79
80
81
82
83
84 var lastc *hchan
85 for sg := gp.waiting; sg != nil; sg = sg.waitlink {
86 if sg.c != lastc && lastc != nil {
87
88
89
90
91
92
93 unlock(&lastc.lock)
94 }
95 lastc = sg.c
96 }
97 if lastc != nil {
98 unlock(&lastc.lock)
99 }
100 return true
101 }
102
103 func block() {
104 gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1)
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
123 if debugSelect {
124 print("select: cas0=", cas0, "\n")
125 }
126
127
128
129 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
130 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
131
132 ncases := nsends + nrecvs
133 scases := cas1[:ncases:ncases]
134 pollorder := order1[:ncases:ncases]
135 lockorder := order1[ncases:][:ncases:ncases]
136
137
138
139
140
141 var pcs []uintptr
142 if raceenabled && pc0 != nil {
143 pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
144 pcs = pc1[:ncases:ncases]
145 }
146 casePC := func(casi int) uintptr {
147 if pcs == nil {
148 return 0
149 }
150 return pcs[casi]
151 }
152
153 var t0 int64
154 if blockprofilerate > 0 {
155 t0 = cputicks()
156 }
157
158
159
160
161
162
163
164
165
166
167 norder := 0
168 for i := range scases {
169 cas := &scases[i]
170
171
172 if cas.c == nil {
173 cas.elem = nil
174 continue
175 }
176
177 if cas.c.timer != nil {
178 cas.c.timer.maybeRunChan()
179 }
180
181 j := cheaprandn(uint32(norder + 1))
182 pollorder[norder] = pollorder[j]
183 pollorder[j] = uint16(i)
184 norder++
185 }
186 pollorder = pollorder[:norder]
187 lockorder = lockorder[:norder]
188
189
190
191 for i := range lockorder {
192 j := i
193
194 c := scases[pollorder[i]].c
195 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
196 k := (j - 1) / 2
197 lockorder[j] = lockorder[k]
198 j = k
199 }
200 lockorder[j] = pollorder[i]
201 }
202 for i := len(lockorder) - 1; i >= 0; i-- {
203 o := lockorder[i]
204 c := scases[o].c
205 lockorder[i] = lockorder[0]
206 j := 0
207 for {
208 k := j*2 + 1
209 if k >= i {
210 break
211 }
212 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
213 k++
214 }
215 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
216 lockorder[j] = lockorder[k]
217 j = k
218 continue
219 }
220 break
221 }
222 lockorder[j] = o
223 }
224
225 if debugSelect {
226 for i := 0; i+1 < len(lockorder); i++ {
227 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
228 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
229 throw("select: broken sort")
230 }
231 }
232 }
233
234
235 sellock(scases, lockorder)
236
237 var (
238 gp *g
239 sg *sudog
240 c *hchan
241 k *scase
242 sglist *sudog
243 sgnext *sudog
244 qp unsafe.Pointer
245 nextp **sudog
246 )
247
248
249 var casi int
250 var cas *scase
251 var caseSuccess bool
252 var caseReleaseTime int64 = -1
253 var recvOK bool
254 for _, casei := range pollorder {
255 casi = int(casei)
256 cas = &scases[casi]
257 c = cas.c
258
259 if casi >= nsends {
260 sg = c.sendq.dequeue()
261 if sg != nil {
262 goto recv
263 }
264 if c.qcount > 0 {
265 goto bufrecv
266 }
267 if c.closed != 0 {
268 goto rclose
269 }
270 } else {
271 if raceenabled {
272 racereadpc(c.raceaddr(), casePC(casi), chansendpc)
273 }
274 if c.closed != 0 {
275 goto sclose
276 }
277 sg = c.recvq.dequeue()
278 if sg != nil {
279 goto send
280 }
281 if c.qcount < c.dataqsiz {
282 goto bufsend
283 }
284 }
285 }
286
287 if !block {
288 selunlock(scases, lockorder)
289 casi = -1
290 goto retc
291 }
292
293
294 gp = getg()
295 if gp.waiting != nil {
296 throw("gp.waiting != nil")
297 }
298 nextp = &gp.waiting
299 for _, casei := range lockorder {
300 casi = int(casei)
301 cas = &scases[casi]
302 c = cas.c
303 sg := acquireSudog()
304 sg.g = gp
305 sg.isSelect = true
306
307
308 sg.elem = cas.elem
309 sg.releasetime = 0
310 if t0 != 0 {
311 sg.releasetime = -1
312 }
313 sg.c = c
314
315 *nextp = sg
316 nextp = &sg.waitlink
317
318 if casi < nsends {
319 c.sendq.enqueue(sg)
320 } else {
321 c.recvq.enqueue(sg)
322 }
323
324 if c.timer != nil {
325 blockTimerChan(c)
326 }
327 }
328
329
330 gp.param = nil
331
332
333
334
335 gp.parkingOnChan.Store(true)
336 gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
337 gp.activeStackChans = false
338
339 sellock(scases, lockorder)
340
341 gp.selectDone.Store(0)
342 sg = (*sudog)(gp.param)
343 gp.param = nil
344
345
346
347
348
349 casi = -1
350 cas = nil
351 caseSuccess = false
352 sglist = gp.waiting
353
354 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
355 sg1.isSelect = false
356 sg1.elem = nil
357 sg1.c = nil
358 }
359 gp.waiting = nil
360
361 for _, casei := range lockorder {
362 k = &scases[casei]
363 if k.c.timer != nil {
364 unblockTimerChan(k.c)
365 }
366 if sg == sglist {
367
368 casi = int(casei)
369 cas = k
370 caseSuccess = sglist.success
371 if sglist.releasetime > 0 {
372 caseReleaseTime = sglist.releasetime
373 }
374 } else {
375 c = k.c
376 if int(casei) < nsends {
377 c.sendq.dequeueSudoG(sglist)
378 } else {
379 c.recvq.dequeueSudoG(sglist)
380 }
381 }
382 sgnext = sglist.waitlink
383 sglist.waitlink = nil
384 releaseSudog(sglist)
385 sglist = sgnext
386 }
387
388 if cas == nil {
389 throw("selectgo: bad wakeup")
390 }
391
392 c = cas.c
393
394 if debugSelect {
395 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
396 }
397
398 if casi < nsends {
399 if !caseSuccess {
400 goto sclose
401 }
402 } else {
403 recvOK = caseSuccess
404 }
405
406 if raceenabled {
407 if casi < nsends {
408 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
409 } else if cas.elem != nil {
410 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
411 }
412 }
413 if msanenabled {
414 if casi < nsends {
415 msanread(cas.elem, c.elemtype.Size_)
416 } else if cas.elem != nil {
417 msanwrite(cas.elem, c.elemtype.Size_)
418 }
419 }
420 if asanenabled {
421 if casi < nsends {
422 asanread(cas.elem, c.elemtype.Size_)
423 } else if cas.elem != nil {
424 asanwrite(cas.elem, c.elemtype.Size_)
425 }
426 }
427
428 selunlock(scases, lockorder)
429 goto retc
430
431 bufrecv:
432
433 if raceenabled {
434 if cas.elem != nil {
435 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
436 }
437 racenotify(c, c.recvx, nil)
438 }
439 if msanenabled && cas.elem != nil {
440 msanwrite(cas.elem, c.elemtype.Size_)
441 }
442 if asanenabled && cas.elem != nil {
443 asanwrite(cas.elem, c.elemtype.Size_)
444 }
445 recvOK = true
446 qp = chanbuf(c, c.recvx)
447 if cas.elem != nil {
448 typedmemmove(c.elemtype, cas.elem, qp)
449 }
450 typedmemclr(c.elemtype, qp)
451 c.recvx++
452 if c.recvx == c.dataqsiz {
453 c.recvx = 0
454 }
455 c.qcount--
456 selunlock(scases, lockorder)
457 goto retc
458
459 bufsend:
460
461 if raceenabled {
462 racenotify(c, c.sendx, nil)
463 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
464 }
465 if msanenabled {
466 msanread(cas.elem, c.elemtype.Size_)
467 }
468 if asanenabled {
469 asanread(cas.elem, c.elemtype.Size_)
470 }
471 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
472 c.sendx++
473 if c.sendx == c.dataqsiz {
474 c.sendx = 0
475 }
476 c.qcount++
477 selunlock(scases, lockorder)
478 goto retc
479
480 recv:
481
482 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
483 if debugSelect {
484 print("syncrecv: cas0=", cas0, " c=", c, "\n")
485 }
486 recvOK = true
487 goto retc
488
489 rclose:
490
491 selunlock(scases, lockorder)
492 recvOK = false
493 if cas.elem != nil {
494 typedmemclr(c.elemtype, cas.elem)
495 }
496 if raceenabled {
497 raceacquire(c.raceaddr())
498 }
499 goto retc
500
501 send:
502
503 if raceenabled {
504 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
505 }
506 if msanenabled {
507 msanread(cas.elem, c.elemtype.Size_)
508 }
509 if asanenabled {
510 asanread(cas.elem, c.elemtype.Size_)
511 }
512 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
513 if debugSelect {
514 print("syncsend: cas0=", cas0, " c=", c, "\n")
515 }
516 goto retc
517
518 retc:
519 if caseReleaseTime > 0 {
520 blockevent(caseReleaseTime-t0, 1)
521 }
522 return casi, recvOK
523
524 sclose:
525
526 selunlock(scases, lockorder)
527 panic(plainError("send on closed channel"))
528 }
529
530 func (c *hchan) sortkey() uintptr {
531 return uintptr(unsafe.Pointer(c))
532 }
533
534
535
536 type runtimeSelect struct {
537 dir selectDir
538 typ unsafe.Pointer
539 ch *hchan
540 val unsafe.Pointer
541 }
542
543
544 type selectDir int
545
546 const (
547 _ selectDir = iota
548 selectSend
549 selectRecv
550 selectDefault
551 )
552
553
554 func reflect_rselect(cases []runtimeSelect) (int, bool) {
555 if len(cases) == 0 {
556 block()
557 }
558 sel := make([]scase, len(cases))
559 orig := make([]int, len(cases))
560 nsends, nrecvs := 0, 0
561 dflt := -1
562 for i, rc := range cases {
563 var j int
564 switch rc.dir {
565 case selectDefault:
566 dflt = i
567 continue
568 case selectSend:
569 j = nsends
570 nsends++
571 case selectRecv:
572 nrecvs++
573 j = len(cases) - nrecvs
574 }
575
576 sel[j] = scase{c: rc.ch, elem: rc.val}
577 orig[j] = i
578 }
579
580
581 if nsends+nrecvs == 0 {
582 return dflt, false
583 }
584
585
586 if nsends+nrecvs < len(cases) {
587 copy(sel[nsends:], sel[len(cases)-nrecvs:])
588 copy(orig[nsends:], orig[len(cases)-nrecvs:])
589 }
590
591 order := make([]uint16, 2*(nsends+nrecvs))
592 var pc0 *uintptr
593 if raceenabled {
594 pcs := make([]uintptr, nsends+nrecvs)
595 for i := range pcs {
596 selectsetpc(&pcs[i])
597 }
598 pc0 = &pcs[0]
599 }
600
601 chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
602
603
604 if chosen < 0 {
605 chosen = dflt
606 } else {
607 chosen = orig[chosen]
608 }
609 return chosen, recvOK
610 }
611
612 func (q *waitq) dequeueSudoG(sgp *sudog) {
613 x := sgp.prev
614 y := sgp.next
615 if x != nil {
616 if y != nil {
617
618 x.next = y
619 y.prev = x
620 sgp.next = nil
621 sgp.prev = nil
622 return
623 }
624
625 x.next = nil
626 q.last = x
627 sgp.prev = nil
628 return
629 }
630 if y != nil {
631
632 y.prev = nil
633 q.first = y
634 sgp.next = nil
635 return
636 }
637
638
639
640 if q.first == sgp {
641 q.first = nil
642 q.last = nil
643 }
644 }
645
View as plain text