Source file src/runtime/chan.go
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

// This file contains the implementation of Go channels.

// Invariants:
//
//	At least one of c.sendq and c.recvq is empty,
//	except for the case of an unbuffered channel with a single goroutine
//	blocked on it for both sending and receiving using a select statement,
//	in which case the length of c.sendq and c.recvq is limited only by the
//	size of the select statement.
//
// For buffered channels, also:
//
//	c.qcount > 0 implies that c.recvq is empty.
//	c.qcount < c.dataqsiz implies that c.sendq is empty.

import (
	"internal/abi"
	"internal/runtime/atomic"
	"internal/runtime/math"
	"internal/runtime/sys"
	"unsafe"
)

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
	elem := t.Elem

	// compiler checks this but be safe.
	if elem.Size_ >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case !elem.Pointers():
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
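		// Allocating buf separately, with elem as the allocation's
		// type, gives the GC precise pointer information for the
		// buffered elements.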
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
	}
	return c
}

// chanbuf(c, i) is a pointer to the i'th slot in the buffer.
//
// chanbuf should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - github.com/fjl/memsize
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname chanbuf
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, sys.GetCallerPC())
}

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed. it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
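	//
	// (This fast path is what a send with a default case in a select
	// reaches: selectnbsend below calls chansend with block == false.)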
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
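			// Notify the race detector for both this (sending)
			// goroutine and the receiver sg.g, as if the value
			// had passed through buffer slot c.recvx.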
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

// timerchandrain removes all elements in channel c's buffer.
// It reports whether any elements were removed.
// Because it is only intended for timers, it does not
// handle waiting senders at all (all timer channels
// use non-blocking sends to fill the buffer).
func timerchandrain(c *hchan) bool {
	// Note: Cannot use empty(c) because we are called
	// while holding c.timer.sendLock, and empty(c) will
	// call c.timer.maybeRunChan, which will deadlock.
	// We are emptying the channel, so we only care about
	// the count, not about potentially filling it up.
	if atomic.Loaduint(&c.qcount) == 0 {
		return false
	}
	lock(&c.lock)
	any := false
	for c.qcount > 0 {
		any = true
		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
	}
	unlock(&c.lock)
	return any
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
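	// See the block comment above sendDirect for why this uses an
	// explicit typeBitsBulkBarrier and memmove instead of typedmemmove.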
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := sys.GetCallerPC()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

// empty reports whether a read from c would block (that is, the channel is
// empty). It is atomically correct and sequentially consistent at the moment
// it returns, but since the channel is unlocked, the channel may become
// non-empty immediately afterward.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
	// All timer channels have dataqsiz > 0.
	if c.timer != nil {
		c.timer.maybeRunChan()
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
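	// (As with chansend, this is the path a receive with a default case
	// in a select reaches: selectnbrecv below calls chanrecv with
	// block == false.)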
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock. This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but the channel's buffer still has data.
	} else {
		// The channel is not closed; check for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
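	// chanparkcommit, passed to gopark below, sets activeStackChans
	// and clears parkingOnChan again before releasing the channel lock.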
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

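// chanparkcommit is the unlock function that chansend and chanrecv pass
// to gopark. The scheduler calls it after this G's status has changed,
// so it, rather than the channel operation itself, releases the lock.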
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, sys.GetCallerPC())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, sys.GetCallerPC())
}

//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}

func chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	async := debug.asynctimerchan.Load() != 0
	if c.timer != nil && async {
		c.timer.maybeRunChan()
	}
	if c.timer != nil && !async {
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.qcount)
}

func chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	if c.timer != nil {
		async := debug.asynctimerchan.Load() != 0
		if async {
			return int(c.dataqsiz)
		}
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.dataqsiz)
}

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	return chanlen(c)
}

//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	return chanlen(c)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	return chancap(c)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}

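// raceaddr returns the single address at which the race detector models
// all read-like and write-like operations on c.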
func (c *hchan) raceaddr() unsafe.Pointer {
	// Treat read-like and write-like operations on the channel
	// as happening at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&c.buf)
}

func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}

// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify(c *hchan, idx uint, sg *sudog) {
	// We could have passed the unsafe.Pointer corresponding to entry idx
	// instead of idx itself. However, in a future version of this function,
	// we can use idx to better handle the case of elemsize==0.
	// A future improvement to the detector is to call TSan with c and idx:
	// this way, Go will continue to not allocate buffer entries for channels
	// of elemsize==0, yet the race detector can be made to handle multiple
	// sync objects underneath the hood (one sync object per idx).
	qp := chanbuf(c, idx)
	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses the
	// c.buf as the only buffer entry. This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire). Instead, we accumulate happens-before
	// information in the synchronization object associated with c.buf.
	if c.elemsize == 0 {
		if sg == nil {
			raceacquire(qp)
			racerelease(qp)
		} else {
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
	} else {
		if sg == nil {
			racereleaseacquire(qp)
		} else {
			racereleaseacquireg(sg.g, qp)
		}
	}
}