Source file src/runtime/chan.go
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

// This file contains the implementation of Go channels.

// Invariants:
//
//	At least one of c.sendq and c.recvq is empty,
//	except for the case of an unbuffered channel with a single goroutine
//	blocked on it for both sending and receiving using a select statement,
//	in which case the length of c.sendq and c.recvq is limited only by the
//	size of the select statement.
//
// For buffered channels, also:
//
//	c.qcount > 0 implies that c.recvq is empty.
//	c.qcount < c.dataqsiz implies that c.sendq is empty.

import (
	"internal/abi"
	"internal/runtime/atomic"
	"runtime/internal/math"
	"unsafe"
)

// maxAlign is the largest element alignment makechan accepts.
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to a multiple of maxAlign;
// makechan places the element buffer at this offset when the header and
// buffer share one allocation.
// debugChan enables the debug prints in makechan, chansend and chanrecv.
const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

// hchan is the runtime representation of a channel.
// It is created by makechan; all fields are protected by lock
// (see the comment on lock below).
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size in bytes of one element (elem.Size_, checked < 1<<16 in makechan)
	closed   uint32         // set to 1 by closechan; never reset
	timer    *timer         // timer feeding this chan
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // list of recv waiters
	sendq    waitq          // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

// waitq is a doubly linked FIFO list of sudogs waiting on a channel.
// first is the head (next to be dequeued) and last is the tail
// (where enqueue appends).
type waitq struct {
	first *sudog
	last  *sudog
}

// reflect_makechan is linknamed as reflect.makechan and simply
// forwards to makechan.
//
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

// makechan64 is the int64-sized variant of makechan.
// It panics with "makechan: size out of range" if size does not
// round-trip through int, and otherwise forwards to makechan.
func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

// makechan allocates and initializes a channel with element type t.Elem
// and a buffer of size elements.
//
// It throws if the element size is >= 1<<16 or if the alignment
// requirements cannot be met, and panics with
// "makechan: size out of range" if size is negative or the buffer size
// overflows or exceeds maxAlloc-hchanSize.
func makechan(t *chantype, size int) *hchan {
	elem := t.Elem

	// compiler checks this but be safe.
	if elem.Size_ >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
		throw("makechan: bad alignment")
	}

	// mem is the total buffer size in bytes. The size < 0 check comes last
	// but is still effective: all three conditions lead to the same panic.
	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case !elem.Pointers():
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		// Allocate the header and the typed buffer separately.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
	}
	return c
}

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
//
// chanbuf should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - github.com/fjl/memsize
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname chanbuf
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Unbuffered: a send can only proceed if a receiver is waiting.
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 *
 * NOTE(review): chansend and chanrecv below detect a close-induced
 * wakeup through mysg.success rather than g.param, so the paragraph
 * above may be stale -- confirm before relying on it.
 */

// chansend sends the value pointed to by ep on channel c.
//
// It returns true if the value was sent. It returns false only when
// block is false and the send cannot proceed immediately. With block
// true it parks the calling goroutine until a receiver takes the value.
//
// A send on a nil channel returns false when block is false and parks
// forever otherwise. A send on a closed channel panics with
// "send on closed channel". callerpc is passed to the race detector only.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// t0 stays 0 unless block profiling is enabled; a non-zero t0 is what
	// later arms mysg.releasetime.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send index, wrapping around the circular buffer.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 asks whoever wakes us to record the wakeup time
		// (send, recv and closechan fill it in when it is non-zero).
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	// chanparkcommit releases c.lock once the park is committed.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// success is set to true by recv (a receiver took the value) and to
	// false by closechan, so !success means the channel was closed.
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// A nil sg.elem means the receiver does not want the value.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	// Release the channel lock before readying gp (see the comment on
	// hchan.lock: do not ready a G while holding it).
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

// timerchandrain removes all elements in channel c's buffer.
// It reports whether any elements were removed.
// Because it is only intended for timers, it does not
// handle waiting senders at all (all timer channels
// use non-blocking sends to fill the buffer).
func timerchandrain(c *hchan) bool {
	// Note: Cannot use empty(c) because we are called
	// while holding c.timer.sendLock, and empty(c) will
	// call c.timer.maybeRunChan, which will deadlock.
	// We are emptying the channel, so we only care about
	// the count, not about potentially filling it up.
	if atomic.Loaduint(&c.qcount) == 0 {
		return false
	}
	lock(&c.lock)
	any := false
	for c.qcount > 0 {
		any = true
		// Clear the slot and advance the receive index, wrapping around.
		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
	}
	unlock(&c.lock)
	return any
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

// sendDirect copies one value of type t from src into the waiting
// receiver's slot sg.elem.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}

// recvDirect copies one value of type t from the waiting sender's
// slot sg.elem into dst.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}

// closechan closes channel c.
//
// It panics with "close of nil channel" if c is nil and with
// "close of closed channel" if c is already closed. Otherwise it marks
// the channel closed and, under the channel lock, dequeues every waiting
// receiver (zeroing its destination) and every waiting sender, marking
// each as unsuccessful. The collected goroutines are readied only after
// the lock has been released.
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	// Goroutines to wake once the lock is dropped.
	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			// The receiver gets the zero value.
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

// empty reports whether a read from c would block (that is, the channel is
// empty).  It is atomically correct and sequentially consistent at the moment
// it returns, but since the channel is unlocked, the channel may become
// non-empty immediately afterward.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		// Unbuffered: a receive can only proceed if a sender is waiting.
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
	// All timer channels have dataqsiz > 0.
	if c.timer != nil {
		c.timer.maybeRunChan()
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

// chanrecv2 is like chanrecv1 but also returns chanrecv's received
// result, i.e. whether a value was actually received.
//
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// t0 stays 0 unless block profiling is enabled; a non-zero t0 is what
	// later arms mysg.releasetime.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but its buffer still has data;
		// fall through to the buffered receive below.
	} else {
		// The channel is not closed; look for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Clear the slot, then advance the receive index, wrapping around.
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 asks whoever wakes us to record the wakeup time
		// (send, recv and closechan fill it in when it is non-zero).
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	// chanparkcommit releases c.lock once the park is committed.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// success is set to true by send (a sender delivered a value) and to
	// false by closechan.
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	// Release the channel lock before readying gp (see the comment on
	// hchan.lock: do not ready a G while holding it).
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

// chanparkcommit is the callback that chansend and chanrecv pass to
// gopark, together with a pointer to the channel's lock as chanLock.
// It marks gp as having sudogs that point into its stack, clears
// parkingOnChan, and only then releases the channel lock.
// It always returns true.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

// reflect_chansend is linknamed as reflect.chansend0.
// nb selects a non-blocking send: chansend is called with block = !nb.
//
//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, getcallerpc())
}

// reflect_chanrecv is linknamed as reflect.chanrecv.
// nb selects a non-blocking receive: chanrecv is called with block = !nb.
//
//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}

// chanlen returns the number of elements currently queued in c,
// or 0 for a nil channel.
//
// Timer channels are special-cased on debug.asynctimerchan: when it is
// non-zero the timer is given a chance to run first and the real count
// is reported; when it is zero the channel always reports length 0.
func chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	async := debug.asynctimerchan.Load() != 0
	if c.timer != nil && async {
		c.timer.maybeRunChan()
	}
	if c.timer != nil && !async {
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.qcount)
}

// chancap returns the buffer capacity of c, or 0 for a nil channel.
//
// Timer channels report their real capacity only when
// debug.asynctimerchan is non-zero; otherwise they report 0.
func chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	if c.timer != nil {
		async := debug.asynctimerchan.Load() != 0
		if async {
			return int(c.dataqsiz)
		}
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.dataqsiz)
}

// reflect_chanlen is linknamed as reflect.chanlen and forwards to chanlen.
//
//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	return chanlen(c)
}

// reflectlite_chanlen is linknamed as internal/reflectlite.chanlen
// and forwards to chanlen.
//
//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	return chanlen(c)
}

// reflect_chancap is linknamed as reflect.chancap and forwards to chancap.
//
//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	return chancap(c)
}

// reflect_chanclose is linknamed as reflect.chanclose and forwards
// to closechan.
//
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

// enqueue appends sgp to the tail of q.
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		// The queue was empty: sgp becomes both head and tail.
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

// dequeue removes and returns the sudog at the head of q, or nil if
// no eligible waiter remains. Waiters enqueued by a select whose
// goroutine has already been claimed by another case are unlinked
// and skipped.
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}

// raceaddr returns the address that channel-wide race detector
// operations on c are reported against: the address of the buf field.
func (c *hchan) raceaddr() unsafe.Pointer {
	// Treat read-like and write-like operations on the channel to
	// happen at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&c.buf)
}

// racesync informs the race detector of a two-way synchronization
// between the current goroutine and sg.g, using the address of
// buffer slot 0 as the sync object: each side releases and then
// acquires the other's release.
func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}

// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify(c *hchan, idx uint, sg *sudog) {
	// We could have passed the unsafe.Pointer corresponding to entry idx
	// instead of idx itself.  However, in a future version of this function,
	// we can use idx to better handle the case of elemsize==0.
	// A future improvement to the detector is to call TSan with c and idx:
	// this way, Go will continue to not allocate buffer entries for channels
	// of elemsize==0, yet the race detector can be made to handle multiple
	// sync objects underneath the hood (one sync object per idx).
	qp := chanbuf(c, idx)
	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses the
	// c.buf as the only buffer entry.  This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
	// information in the synchronization object associated with c.buf.
	if c.elemsize == 0 {
		if sg == nil {
			raceacquire(qp)
			racerelease(qp)
		} else {
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
	} else {
		if sg == nil {
			racereleaseacquire(qp)
		} else {
			racereleaseacquireg(sg.g, qp)
		}
	}
}