Source file
src/runtime/chan.go
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "internal/abi"
22 "internal/runtime/atomic"
23 "internal/runtime/math"
24 "internal/runtime/sys"
25 "unsafe"
26 )
27
// Channel layout constants.
//
// maxAlign is the largest element alignment makechan accepts and the
// boundary hchanSize is rounded to.
//
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to a multiple of maxAlign;
// the second term of the expression is the padding, (-size) mod maxAlign.
//
// debugChan gates the diagnostic print calls in makechan, chansend and
// chanrecv.
const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)
33
// hchan is the runtime representation of a channel, as allocated by makechan.
type hchan struct {
	qcount   uint            // number of elements currently stored in buf
	dataqsiz uint            // capacity of buf in elements; set by makechan
	buf      unsafe.Pointer  // start of the element storage addressed by chanbuf
	elemsize uint16          // size of one element in bytes (makechan rejects sizes >= 1<<16)
	closed   uint32          // set to 1 by closechan
	timer    *timer          // timer attached to this channel, or nil
	elemtype *_type          // element type, used for typed copies and clears
	sendx    uint            // buffer index the next buffered send writes to
	recvx    uint            // buffer index the next buffered receive reads from
	recvq    waitq           // receivers parked in chanrecv
	sendq    waitq           // senders parked in chansend
	bubble   *synctestBubble // bubble of the goroutine that created the channel, if any

	// lock is taken by chansend, chanrecv, closechan and timerchandrain
	// around their updates to the fields above and to the wait queues.
	//
	// NOTE(review): closechan readies goroutines only after releasing this
	// lock; presumably readying a G while holding it is unsafe — confirm
	// before changing that ordering.
	lock mutex
}
56
// waitq is a FIFO queue of sudogs, doubly linked through their next and prev
// fields. enqueue appends at last; dequeue removes from first.
type waitq struct {
	first *sudog // head of the queue; nil when the queue is empty
	last  *sudog // tail of the queue; nil when the queue is empty
}
61
62
63 func reflect_makechan(t *chantype, size int) *hchan {
64 return makechan(t, size)
65 }
66
67 func makechan64(t *chantype, size int64) *hchan {
68 if int64(int(size)) != size {
69 panic(plainError("makechan: size out of range"))
70 }
71
72 return makechan(t, int(size))
73 }
74
75 func makechan(t *chantype, size int) *hchan {
76 elem := t.Elem
77
78
79 if elem.Size_ >= 1<<16 {
80 throw("makechan: invalid channel element type")
81 }
82 if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
83 throw("makechan: bad alignment")
84 }
85
86 mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
87 if overflow || mem > maxAlloc-hchanSize || size < 0 {
88 panic(plainError("makechan: size out of range"))
89 }
90
91
92
93
94
95 var c *hchan
96 switch {
97 case mem == 0:
98
99 c = (*hchan)(mallocgc(hchanSize, nil, true))
100
101 c.buf = c.raceaddr()
102 case !elem.Pointers():
103
104
105 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
106 c.buf = add(unsafe.Pointer(c), hchanSize)
107 default:
108
109 c = new(hchan)
110 c.buf = mallocgc(mem, elem, true)
111 }
112
113 c.elemsize = uint16(elem.Size_)
114 c.elemtype = elem
115 c.dataqsiz = uint(size)
116 if b := getg().bubble; b != nil {
117 c.bubble = b
118 }
119 lockInit(&c.lock, lockRankHchan)
120
121 if debugChan {
122 print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
123 }
124 return c
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138 func chanbuf(c *hchan, i uint) unsafe.Pointer {
139 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
140 }
141
142
143
144
145
146 func full(c *hchan) bool {
147
148
149 if c.dataqsiz == 0 {
150
151 return c.recvq.first == nil
152 }
153
154 return c.qcount == c.dataqsiz
155 }
156
157
158
159
160 func chansend1(c *hchan, elem unsafe.Pointer) {
161 chansend(c, elem, true, sys.GetCallerPC())
162 }
163
164
// chansend sends the value pointed to by ep on channel c.
//
// When block is false the call never parks: it returns false if the value
// can neither be handed to a waiting receiver nor stored in the buffer right
// away. When block is true the calling goroutine parks on c.sendq until it
// is woken by a receiver or by closechan. callerpc is only passed to the
// race detector.
//
// It returns true once the value has been delivered. It panics with "send on
// closed channel" if c is closed before or while the send is pending, and
// with "send on synctest channel from outside bubble" if c belongs to a
// bubble other than the current goroutine's. A blocking send on a nil
// channel parks forever.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		// gopark is called without an unlock function here; the throw
		// below documents that control is not expected to return.
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	if c.bubble != nil && getg().bubble != c.bubble {
		panic(plainError("send on synctest channel from outside bubble"))
	}

	// Fast path for non-blocking sends: give up without taking the lock
	// when the channel looks open and full. c.closed and the fields read
	// by full are read unlocked here, so this is only a snapshot; every
	// other outcome is decided again under the lock below.
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// t0 stays zero unless block profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// A receiver is already parked: hand the value to it directly.
	// send releases the lock through the supplied callback.
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// There is room in the buffer: copy the value into slot sendx and
	// advance sendx, wrapping at dataqsiz.
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Slow path: queue a sudog describing this send and park.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// A non-zero releasetime makes the waker (recv or closechan)
		// overwrite it with cputicks() at wake-up.
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// parkingOnChan is set before parking and cleared again by
	// chanparkcommit just before it releases c.lock.
	gp.parkingOnChan.Store(true)
	reason := waitReasonChanSend
	if c.bubble != nil {
		reason = waitReasonSynctestChanSend
	}
	// chanparkcommit unlocks c.lock on our behalf as part of parking.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
	// NOTE(review): presumably keeps the sent value alive until the waker
	// has copied it out of mysg.elem — confirm.
	KeepAlive(ep)

	// Woken up, either by a receiver (mysg.success set by recv) or by
	// closechan (mysg.success cleared).
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		// An unsuccessful wake-up is only legitimate on a closed channel.
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
311
312
313
314
315
316
317
// send completes a send on c by handing the value at ep directly to the
// parked receiver sg, then makes sg's goroutine runnable.
//
// chansend calls it with c.lock held and with sg already dequeued from
// c.recvq; unlockf is invoked exactly once to release that lock. skip is
// forwarded to goready as skip+1.
//
// It panics with "send on synctest channel from outside bubble" (after
// calling unlockf) if c belongs to a bubble other than the current
// goroutine's.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.bubble != nil && getg().bubble != c.bubble {
		unlockf()
		panic(plainError("send on synctest channel from outside bubble"))
	}
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Report the hand-off to the race detector against buffer
			// slot recvx for both goroutines, then advance recvx and
			// bring sendx along with it. No data is written to the
			// buffer here; the copy below goes straight to sg.elem.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx
		}
	}
	// A nil sg.elem means the receiver does not want the value.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	// Release the channel lock before readying the receiver.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
352
353
354
355
356
357
358 func timerchandrain(c *hchan) bool {
359
360
361
362
363
364 if atomic.Loaduint(&c.qcount) == 0 {
365 return false
366 }
367 lock(&c.lock)
368 any := false
369 for c.qcount > 0 {
370 any = true
371 typedmemclr(c.elemtype, chanbuf(c, c.recvx))
372 c.recvx++
373 if c.recvx == c.dataqsiz {
374 c.recvx = 0
375 }
376 c.qcount--
377 }
378 unlock(&c.lock)
379 return any
380 }
381
382
383
384
385
386
387
388
389
390
391
392 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
393
394
395
396
397
398 dst := sg.elem
399 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
400
401
402 memmove(dst, src, t.Size_)
403 }
404
405 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
406
407
408
409 src := sg.elem
410 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
411 memmove(dst, src, t.Size_)
412 }
413
// closechan closes channel c and wakes every goroutine parked on it.
//
// It panics with "close of nil channel" for a nil c, with "close of closed
// channel" if c is already closed, and with "close of synctest channel from
// outside bubble" if c belongs to a bubble other than the current
// goroutine's.
//
// All parked receivers and senders are dequeued under c.lock with their
// sudog's success flag cleared; they are made runnable only after the lock
// has been released.
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	if c.bubble != nil && getg().bubble != c.bubble {
		panic(plainError("close of synctest channel from outside bubble"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := sys.GetCallerPC()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	// Goroutines to wake are collected here and readied after unlock.
	var glist gList

	// Release all parked receivers. Each one that supplied a destination
	// gets it cleared to the zero value.
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// Release all parked senders. chansend sees success == false after
	// waking and panics with "send on closed channel".
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready the collected goroutines now that the channel lock is dropped.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
487
488
489
490
491
492 func empty(c *hchan) bool {
493
494 if c.dataqsiz == 0 {
495 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
496 }
497
498
499 if c.timer != nil {
500 c.timer.maybeRunChan(c)
501 }
502 return atomic.Loaduint(&c.qcount) == 0
503 }
504
505
506
507
508 func chanrecv1(c *hchan, elem unsafe.Pointer) {
509 chanrecv(c, elem, true)
510 }
511
512
513 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
514 _, received = chanrecv(c, elem, true)
515 return
516 }
517
518
519
520
521
522
523
// chanrecv receives a value from channel c and stores it at ep. ep may be
// nil, in which case the received value is discarded.
//
// When block is false the call never parks and returns (false, false) if
// nothing can be received right away. Otherwise selected is true and
// received tells whether a real value was delivered: a receive that
// completes because the channel is closed and drained zeroes *ep and
// returns (true, false).
//
// A blocking receive on a nil channel parks forever. The call panics with
// "receive on synctest channel from outside bubble" if c belongs to a bubble
// other than the current goroutine's.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		// gopark is called without an unlock function here; the throw
		// below documents that control is not expected to return.
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.bubble != nil && getg().bubble != c.bubble {
		panic(plainError("receive on synctest channel from outside bubble"))
	}

	if c.timer != nil {
		c.timer.maybeRunChan(c)
	}

	// Fast path for non-blocking receives, evaluated without the lock.
	if !block && empty(c) {
		// The channel looked empty. If it is not closed, report failure.
		// Emptiness is observed first and closedness second.
		if atomic.Load(&c.closed) == 0 {
			return
		}
		// The channel is closed. Re-check emptiness: a value may have
		// arrived between the two observations above, in which case we
		// fall through to the locked path to receive it.
		if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// t0 stays zero unless block profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		// Closed and drained: deliver the zero value.
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// Closed but values remain buffered: fall through and take one.
	} else {
		// Open channel with a parked sender: take its value via recv,
		// which releases the lock through the supplied callback.
		if sg := c.sendq.dequeue(); sg != nil {
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	// Take the value at buffer slot recvx, clear the slot and advance
	// recvx, wrapping at dataqsiz.
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// Slow path: queue a sudog describing this receive and park.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// A non-zero releasetime makes the waker (send or closechan)
		// overwrite it with cputicks() at wake-up.
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// parkingOnChan is set before parking and cleared again by
	// chanparkcommit just before it releases c.lock.
	gp.parkingOnChan.Store(true)
	reason := waitReasonChanReceive
	if c.bubble != nil {
		reason = waitReasonSynctestChanReceive
	}
	// chanparkcommit unlocks c.lock on our behalf as part of parking.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)

	// Woken up, either by a sender (mysg.success set by send) or by
	// closechan (mysg.success cleared).
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
// recv completes a receive on c by taking a value from the parked sender sg,
// then makes sg's goroutine runnable.
//
// chanrecv calls it with c.lock held and with sg already dequeued from
// c.sendq; unlockf is invoked exactly once to release that lock. ep is the
// destination and may be nil, in which case the received value is dropped.
// skip is forwarded to goready as skip+1.
//
// It panics with "receive on synctest channel from outside bubble" (after
// calling unlockf) if c belongs to a bubble other than the current
// goroutine's.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.bubble != nil && getg().bubble != c.bubble {
		unlockf()
		panic(plainError("receive on synctest channel from outside bubble"))
	}
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// Unbuffered channel: copy straight from the sender.
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Buffered channel with a parked sender (so the buffer is
		// presumably full — NOTE(review): confirm). The receiver takes
		// the value at slot recvx and the sender's value is stored into
		// that same slot, which keeps the queue in FIFO order.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// Copy the buffered value out to the receiver.
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Copy the sender's value into the slot just read.
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx
	}
	sg.elem = nil
	gp := sg.g
	// Release the channel lock before readying the sender.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
747
// chanparkcommit is the commit function chansend and chanrecv hand to
// gopark, together with a pointer to the channel's lock in chanLock.
//
// It flags gp as having active stack channels, clears gp.parkingOnChan
// (set by the caller just before gopark), releases the channel lock and
// returns true.
//
// NOTE(review): the order of the three steps looks deliberate — the flags
// are updated while the channel lock is still held and the unlock comes
// last. Presumably this coordinates with code that inspects a parked
// goroutine's stack; confirm before reordering.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// chansend and chanrecv reset this flag after they are woken.
	gp.activeStackChans = true
	gp.parkingOnChan.Store(false)
	// Only now may other goroutines operate on the channel and find gp's
	// sudog in its wait queue.
	unlock((*mutex)(chanLock))
	return true
}
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
785 return chansend(c, elem, false, sys.GetCallerPC())
786 }
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
805 return chanrecv(c, elem, false)
806 }
807
808
809 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
810 return chansend(c, elem, !nb, sys.GetCallerPC())
811 }
812
813
814 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
815 return chanrecv(c, elem, !nb)
816 }
817
818 func chanlen(c *hchan) int {
819 if c == nil {
820 return 0
821 }
822 async := debug.asynctimerchan.Load() != 0
823 if c.timer != nil && async {
824 c.timer.maybeRunChan(c)
825 }
826 if c.timer != nil && !async {
827
828
829
830 return 0
831 }
832 return int(c.qcount)
833 }
834
835 func chancap(c *hchan) int {
836 if c == nil {
837 return 0
838 }
839 if c.timer != nil {
840 async := debug.asynctimerchan.Load() != 0
841 if async {
842 return int(c.dataqsiz)
843 }
844
845
846
847 return 0
848 }
849 return int(c.dataqsiz)
850 }
851
852
853 func reflect_chanlen(c *hchan) int {
854 return chanlen(c)
855 }
856
857
858 func reflectlite_chanlen(c *hchan) int {
859 return chanlen(c)
860 }
861
862
863 func reflect_chancap(c *hchan) int {
864 return chancap(c)
865 }
866
867
// reflect_chanclose closes c by delegating to closechan, and therefore
// panics in the same situations closechan does (nil channel, already closed
// channel, synctest channel closed from outside its bubble).
func reflect_chanclose(c *hchan) {
	closechan(c)
}
871
872 func (q *waitq) enqueue(sgp *sudog) {
873 sgp.next = nil
874 x := q.last
875 if x == nil {
876 sgp.prev = nil
877 q.first = sgp
878 q.last = sgp
879 return
880 }
881 sgp.prev = x
882 x.next = sgp
883 q.last = sgp
884 }
885
886 func (q *waitq) dequeue() *sudog {
887 for {
888 sgp := q.first
889 if sgp == nil {
890 return nil
891 }
892 y := sgp.next
893 if y == nil {
894 q.first = nil
895 q.last = nil
896 } else {
897 y.prev = nil
898 q.first = y
899 sgp.next = nil
900 }
901
902
903
904
905
906
907
908
909
910 if sgp.isSelect {
911 if !sgp.g.selectDone.CompareAndSwap(0, 1) {
912
913 continue
914 }
915 }
916
917 return sgp
918 }
919 }
920
921 func (c *hchan) raceaddr() unsafe.Pointer {
922
923
924
925
926
927 return unsafe.Pointer(&c.buf)
928 }
929
930 func racesync(c *hchan, sg *sudog) {
931 racerelease(chanbuf(c, 0))
932 raceacquireg(sg.g, chanbuf(c, 0))
933 racereleaseg(sg.g, chanbuf(c, 0))
934 raceacquire(chanbuf(c, 0))
935 }
936
937
938
939
940 func racenotify(c *hchan, idx uint, sg *sudog) {
941
942
943
944
945
946
947
948 qp := chanbuf(c, idx)
949
950
951
952
953
954
955 if c.elemsize == 0 {
956 if sg == nil {
957 raceacquire(qp)
958 racerelease(qp)
959 } else {
960 raceacquireg(sg.g, qp)
961 racereleaseg(sg.g, qp)
962 }
963 } else {
964 if sg == nil {
965 racereleaseacquire(qp)
966 } else {
967 racereleaseacquireg(sg.g, qp)
968 }
969 }
970 }
971
View as plain text