Source file src/runtime/chan.go

     1  // Copyright 2014 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  // This file contains the implementation of Go channels.
     8  
     9  // Invariants:
    10  //  At least one of c.sendq and c.recvq is empty,
    11  //  except for the case of an unbuffered channel with a single goroutine
    12  //  blocked on it for both sending and receiving using a select statement,
    13  //  in which case the length of c.sendq and c.recvq is limited only by the
    14  //  size of the select statement.
    15  //
    16  // For buffered channels, also:
    17  //  c.qcount > 0 implies that c.recvq is empty.
    18  //  c.qcount < c.dataqsiz implies that c.sendq is empty.
    19  
    20  import (
    21  	"internal/abi"
    22  	"internal/runtime/atomic"
    23  	"internal/runtime/math"
    24  	"internal/runtime/sys"
    25  	"unsafe"
    26  )
    27  
    28  const (
    29  	maxAlign  = 8
    30  	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    31  	debugChan = false
    32  )
    33  
    34  type hchan struct {
    35  	qcount   uint           // total data in the queue
    36  	dataqsiz uint           // size of the circular queue
    37  	buf      unsafe.Pointer // points to an array of dataqsiz elements
    38  	elemsize uint16
    39  	closed   uint32
    40  	timer    *timer // timer feeding this chan
    41  	elemtype *_type // element type
    42  	sendx    uint   // send index
    43  	recvx    uint   // receive index
    44  	recvq    waitq  // list of recv waiters
    45  	sendq    waitq  // list of send waiters
    46  	bubble   *synctestBubble
    47  
    48  	// lock protects all fields in hchan, as well as several
    49  	// fields in sudogs blocked on this channel.
    50  	//
    51  	// Do not change another G's status while holding this lock
    52  	// (in particular, do not ready a G), as this can deadlock
    53  	// with stack shrinking.
    54  	lock mutex
    55  }
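
         // Editorial sketch (not part of the original source): how the fields
         // above evolve for a small buffered channel, as observed from user
         // code, assuming no concurrent operations.
         //
         //	ch := make(chan int, 2) // dataqsiz=2, qcount=0, sendx=0, recvx=0
         //	ch <- 1                 // qcount=1, sendx=1
         //	ch <- 2                 // qcount=2, sendx wraps to 0 (buffer full)
         //	<-ch                    // receives 1: qcount=1, recvx=1
         //	<-ch                    // receives 2: qcount=0, recvx wraps to 0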
    56  
    57  type waitq struct {
    58  	first *sudog
    59  	last  *sudog
    60  }
    61  
    62  //go:linkname reflect_makechan reflect.makechan
    63  func reflect_makechan(t *chantype, size int) *hchan {
    64  	return makechan(t, size)
    65  }
    66  
    67  func makechan64(t *chantype, size int64) *hchan {
    68  	if int64(int(size)) != size {
    69  		panic(plainError("makechan: size out of range"))
    70  	}
    71  
    72  	return makechan(t, int(size))
    73  }
    74  
    75  func makechan(t *chantype, size int) *hchan {
    76  	elem := t.Elem
    77  
    78  	// compiler checks this but be safe.
    79  	if elem.Size_ >= 1<<16 {
    80  		throw("makechan: invalid channel element type")
    81  	}
    82  	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
    83  		throw("makechan: bad alignment")
    84  	}
    85  
    86  	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
    87  	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    88  		panic(plainError("makechan: size out of range"))
    89  	}
    90  
    91  	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    92  	// buf points into the same allocation, elemtype is persistent.
    93  	// SudoG's are referenced from their owning thread so they can't be collected.
    94  	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    95  	var c *hchan
    96  	switch {
    97  	case mem == 0:
    98  		// Queue or element size is zero.
    99  		c = (*hchan)(mallocgc(hchanSize, nil, true))
   100  		// Race detector uses this location for synchronization.
   101  		c.buf = c.raceaddr()
   102  	case !elem.Pointers():
   103  		// Elements do not contain pointers.
   104  		// Allocate hchan and buf in one call.
   105  		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
   106  		c.buf = add(unsafe.Pointer(c), hchanSize)
   107  	default:
   108  		// Elements contain pointers.
   109  		c = new(hchan)
   110  		c.buf = mallocgc(mem, elem, true)
   111  	}
   112  
   113  	c.elemsize = uint16(elem.Size_)
   114  	c.elemtype = elem
   115  	c.dataqsiz = uint(size)
   116  	if b := getg().bubble; b != nil {
   117  		c.bubble = b
   118  	}
   119  	lockInit(&c.lock, lockRankHchan)
   120  
   121  	if debugChan {
   122  		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
   123  	}
   124  	return c
   125  }
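
         // Minimal sketch (editorial) of which allocation case in the switch
         // above each of these user-level expressions selects:
         //
         //	make(chan struct{}, 8) // mem == 0: no buffer; c.buf = c.raceaddr()
         //	make(chan int, 8)      // elements without pointers: hchan and buf in one allocation
         //	make(chan *int, 8)     // elements with pointers: separate buf, typed for the GC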
   126  
    127  // chanbuf(c, i) is a pointer to the i'th slot in the buffer.
   128  //
   129  // chanbuf should be an internal detail,
   130  // but widely used packages access it using linkname.
   131  // Notable members of the hall of shame include:
   132  //   - github.com/fjl/memsize
   133  //
   134  // Do not remove or change the type signature.
   135  // See go.dev/issue/67401.
   136  //
   137  //go:linkname chanbuf
   138  func chanbuf(c *hchan, i uint) unsafe.Pointer {
   139  	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
   140  }
   141  
   142  // full reports whether a send on c would block (that is, the channel is full).
   143  // It uses a single word-sized read of mutable state, so although
   144  // the answer is instantaneously true, the correct answer may have changed
   145  // by the time the calling function receives the return value.
   146  func full(c *hchan) bool {
   147  	// c.dataqsiz is immutable (never written after the channel is created)
   148  	// so it is safe to read at any time during channel operation.
   149  	if c.dataqsiz == 0 {
   150  		// Assumes that a pointer read is relaxed-atomic.
   151  		return c.recvq.first == nil
   152  	}
   153  	// Assumes that a uint read is relaxed-atomic.
   154  	return c.qcount == c.dataqsiz
   155  }
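
         // Editorial sketch: the compiler lowers a non-blocking send (see
         // selectnbsend below) to chansend with block=false, whose fast path
         // consults full before taking the lock:
         //
         //	select {
         //	case ch <- v: // chansend(ch, &v, false, pc)
         //	default:      // taken when the non-blocking send returns false
         //	}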
   156  
   157  // entry point for c <- x from compiled code.
   158  //
   159  //go:nosplit
   160  func chansend1(c *hchan, elem unsafe.Pointer) {
   161  	chansend(c, elem, true, sys.GetCallerPC())
   162  }
   163  
   164  /*
   165   * generic single channel send/recv
    166   * If block is false,
   167   * then the protocol will not
   168   * sleep but return if it could
   169   * not complete.
   170   *
   171   * sleep can wake up with g.param == nil
   172   * when a channel involved in the sleep has
   173   * been closed.  it is easiest to loop and re-run
   174   * the operation; we'll see that it's now closed.
   175   */
   176  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   177  	if c == nil {
   178  		if !block {
   179  			return false
   180  		}
   181  		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
   182  		throw("unreachable")
   183  	}
   184  
   185  	if debugChan {
   186  		print("chansend: chan=", c, "\n")
   187  	}
   188  
   189  	if raceenabled {
   190  		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
   191  	}
   192  
   193  	if c.bubble != nil && getg().bubble != c.bubble {
   194  		panic(plainError("send on synctest channel from outside bubble"))
   195  	}
   196  
   197  	// Fast path: check for failed non-blocking operation without acquiring the lock.
   198  	//
   199  	// After observing that the channel is not closed, we observe that the channel is
   200  	// not ready for sending. Each of these observations is a single word-sized read
   201  	// (first c.closed and second full()).
   202  	// Because a closed channel cannot transition from 'ready for sending' to
   203  	// 'not ready for sending', even if the channel is closed between the two observations,
   204  	// they imply a moment between the two when the channel was both not yet closed
   205  	// and not ready for sending. We behave as if we observed the channel at that moment,
   206  	// and report that the send cannot proceed.
   207  	//
   208  	// It is okay if the reads are reordered here: if we observe that the channel is not
   209  	// ready for sending and then observe that it is not closed, that implies that the
   210  	// channel wasn't closed during the first observation. However, nothing here
   211  	// guarantees forward progress. We rely on the side effects of lock release in
   212  	// chanrecv() and closechan() to update this thread's view of c.closed and full().
   213  	if !block && c.closed == 0 && full(c) {
   214  		return false
   215  	}
   216  
   217  	var t0 int64
   218  	if blockprofilerate > 0 {
   219  		t0 = cputicks()
   220  	}
   221  
   222  	lock(&c.lock)
   223  
   224  	if c.closed != 0 {
   225  		unlock(&c.lock)
   226  		panic(plainError("send on closed channel"))
   227  	}
   228  
   229  	if sg := c.recvq.dequeue(); sg != nil {
   230  		// Found a waiting receiver. We pass the value we want to send
   231  		// directly to the receiver, bypassing the channel buffer (if any).
   232  		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
   233  		return true
   234  	}
   235  
   236  	if c.qcount < c.dataqsiz {
   237  		// Space is available in the channel buffer. Enqueue the element to send.
   238  		qp := chanbuf(c, c.sendx)
   239  		if raceenabled {
   240  			racenotify(c, c.sendx, nil)
   241  		}
   242  		typedmemmove(c.elemtype, qp, ep)
   243  		c.sendx++
   244  		if c.sendx == c.dataqsiz {
   245  			c.sendx = 0
   246  		}
   247  		c.qcount++
   248  		unlock(&c.lock)
   249  		return true
   250  	}
   251  
   252  	if !block {
   253  		unlock(&c.lock)
   254  		return false
   255  	}
   256  
   257  	// Block on the channel. Some receiver will complete our operation for us.
   258  	gp := getg()
   259  	mysg := acquireSudog()
   260  	mysg.releasetime = 0
   261  	if t0 != 0 {
   262  		mysg.releasetime = -1
   263  	}
   264  	// No stack splits between assigning elem and enqueuing mysg
   265  	// on gp.waiting where copystack can find it.
   266  	mysg.elem = ep
   267  	mysg.waitlink = nil
   268  	mysg.g = gp
   269  	mysg.isSelect = false
   270  	mysg.c = c
   271  	gp.waiting = mysg
   272  	gp.param = nil
   273  	c.sendq.enqueue(mysg)
   274  	// Signal to anyone trying to shrink our stack that we're about
   275  	// to park on a channel. The window between when this G's status
   276  	// changes and when we set gp.activeStackChans is not safe for
   277  	// stack shrinking.
   278  	gp.parkingOnChan.Store(true)
   279  	reason := waitReasonChanSend
   280  	if c.bubble != nil {
   281  		reason = waitReasonSynctestChanSend
   282  	}
   283  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
   284  	// Ensure the value being sent is kept alive until the
   285  	// receiver copies it out. The sudog has a pointer to the
   286  	// stack object, but sudogs aren't considered as roots of the
   287  	// stack tracer.
   288  	KeepAlive(ep)
   289  
   290  	// someone woke us up.
   291  	if mysg != gp.waiting {
   292  		throw("G waiting list is corrupted")
   293  	}
   294  	gp.waiting = nil
   295  	gp.activeStackChans = false
   296  	closed := !mysg.success
   297  	gp.param = nil
   298  	if mysg.releasetime > 0 {
   299  		blockevent(mysg.releasetime-t0, 2)
   300  	}
   301  	mysg.c = nil
   302  	releaseSudog(mysg)
   303  	if closed {
   304  		if c.closed == 0 {
   305  			throw("chansend: spurious wakeup")
   306  		}
   307  		panic(plainError("send on closed channel"))
   308  	}
   309  	return true
   310  }
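
         // Editorial sketch of the user-visible send behavior implemented above:
         //
         //	var nilCh chan int
         //	go func() { nilCh <- 1 }() // parks forever: send on nil channel
         //
         //	ch := make(chan int, 1)
         //	ch <- 1   // buffer has space: enqueue and return (qcount < dataqsiz path)
         //	close(ch)
         //	ch <- 2   // panics: "send on closed channel"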
   311  
   312  // send processes a send operation on an empty channel c.
   313  // The value ep sent by the sender is copied to the receiver sg.
   314  // The receiver is then woken up to go on its merry way.
   315  // Channel c must be empty and locked.  send unlocks c with unlockf.
   316  // sg must already be dequeued from c.
   317  // ep must be non-nil and point to the heap or the caller's stack.
   318  func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   319  	if c.bubble != nil && getg().bubble != c.bubble {
   320  		unlockf()
   321  		panic(plainError("send on synctest channel from outside bubble"))
   322  	}
   323  	if raceenabled {
   324  		if c.dataqsiz == 0 {
   325  			racesync(c, sg)
   326  		} else {
   327  			// Pretend we go through the buffer, even though
   328  			// we copy directly. Note that we need to increment
   329  			// the head/tail locations only when raceenabled.
   330  			racenotify(c, c.recvx, nil)
   331  			racenotify(c, c.recvx, sg)
   332  			c.recvx++
   333  			if c.recvx == c.dataqsiz {
   334  				c.recvx = 0
   335  			}
   336  			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   337  		}
   338  	}
   339  	if sg.elem != nil {
   340  		sendDirect(c.elemtype, sg, ep)
   341  		sg.elem = nil
   342  	}
   343  	gp := sg.g
   344  	unlockf()
   345  	gp.param = unsafe.Pointer(sg)
   346  	sg.success = true
   347  	if sg.releasetime != 0 {
   348  		sg.releasetime = cputicks()
   349  	}
   350  	goready(gp, skip+1)
   351  }
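
         // Editorial sketch of the direct handoff that send performs, assuming
         // the receiver has already parked on ch.recvq when the send executes:
         //
         //	ch := make(chan int) // unbuffered
         //	done := make(chan struct{})
         //	go func() { _ = <-ch; close(done) }() // receiver parks on ch.recvq
         //	ch <- 42 // send copies 42 straight to the receiver's stack and readies it
         //	<-done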
   352  
   353  // timerchandrain removes all elements in channel c's buffer.
   354  // It reports whether any elements were removed.
   355  // Because it is only intended for timers, it does not
   356  // handle waiting senders at all (all timer channels
   357  // use non-blocking sends to fill the buffer).
   358  func timerchandrain(c *hchan) bool {
   359  	// Note: Cannot use empty(c) because we are called
   360  	// while holding c.timer.sendLock, and empty(c) will
   361  	// call c.timer.maybeRunChan, which will deadlock.
   362  	// We are emptying the channel, so we only care about
   363  	// the count, not about potentially filling it up.
   364  	if atomic.Loaduint(&c.qcount) == 0 {
   365  		return false
   366  	}
   367  	lock(&c.lock)
   368  	any := false
   369  	for c.qcount > 0 {
   370  		any = true
   371  		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
   372  		c.recvx++
   373  		if c.recvx == c.dataqsiz {
   374  			c.recvx = 0
   375  		}
   376  		c.qcount--
   377  	}
   378  	unlock(&c.lock)
   379  	return any
   380  }
   381  
   382  // Sends and receives on unbuffered or empty-buffered channels are the
   383  // only operations where one running goroutine writes to the stack of
   384  // another running goroutine. The GC assumes that stack writes only
   385  // happen when the goroutine is running and are only done by that
   386  // goroutine. Using a write barrier is sufficient to make up for
   387  // violating that assumption, but the write barrier has to work.
   388  // typedmemmove will call bulkBarrierPreWrite, but the target bytes
   389  // are not in the heap, so that will not help. We arrange to call
   390  // memmove and typeBitsBulkBarrier instead.
   391  
   392  func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
   393  	// src is on our stack, dst is a slot on another stack.
   394  
   395  	// Once we read sg.elem out of sg, it will no longer
   396  	// be updated if the destination's stack gets copied (shrunk).
   397  	// So make sure that no preemption points can happen between read & use.
   398  	dst := sg.elem
   399  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
   400  	// No need for cgo write barrier checks because dst is always
   401  	// Go memory.
   402  	memmove(dst, src, t.Size_)
   403  }
   404  
   405  func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
   406  	// dst is on our stack or the heap, src is on another stack.
   407  	// The channel is locked, so src will not move during this
   408  	// operation.
   409  	src := sg.elem
   410  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
   411  	memmove(dst, src, t.Size_)
   412  }
   413  
   414  func closechan(c *hchan) {
   415  	if c == nil {
   416  		panic(plainError("close of nil channel"))
   417  	}
   418  	if c.bubble != nil && getg().bubble != c.bubble {
   419  		panic(plainError("close of synctest channel from outside bubble"))
   420  	}
   421  
   422  	lock(&c.lock)
   423  	if c.closed != 0 {
   424  		unlock(&c.lock)
   425  		panic(plainError("close of closed channel"))
   426  	}
   427  
   428  	if raceenabled {
   429  		callerpc := sys.GetCallerPC()
   430  		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
   431  		racerelease(c.raceaddr())
   432  	}
   433  
   434  	c.closed = 1
   435  
   436  	var glist gList
   437  
   438  	// release all readers
   439  	for {
   440  		sg := c.recvq.dequeue()
   441  		if sg == nil {
   442  			break
   443  		}
   444  		if sg.elem != nil {
   445  			typedmemclr(c.elemtype, sg.elem)
   446  			sg.elem = nil
   447  		}
   448  		if sg.releasetime != 0 {
   449  			sg.releasetime = cputicks()
   450  		}
   451  		gp := sg.g
   452  		gp.param = unsafe.Pointer(sg)
   453  		sg.success = false
   454  		if raceenabled {
   455  			raceacquireg(gp, c.raceaddr())
   456  		}
   457  		glist.push(gp)
   458  	}
   459  
   460  	// release all writers (they will panic)
   461  	for {
   462  		sg := c.sendq.dequeue()
   463  		if sg == nil {
   464  			break
   465  		}
   466  		sg.elem = nil
   467  		if sg.releasetime != 0 {
   468  			sg.releasetime = cputicks()
   469  		}
   470  		gp := sg.g
   471  		gp.param = unsafe.Pointer(sg)
   472  		sg.success = false
   473  		if raceenabled {
   474  			raceacquireg(gp, c.raceaddr())
   475  		}
   476  		glist.push(gp)
   477  	}
   478  	unlock(&c.lock)
   479  
   480  	// Ready all Gs now that we've dropped the channel lock.
   481  	for !glist.empty() {
   482  		gp := glist.pop()
   483  		gp.schedlink = 0
   484  		goready(gp, 3)
   485  	}
   486  }
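
         // Editorial sketch of the user-visible close semantics implemented above:
         //
         //	ch := make(chan int, 1)
         //	ch <- 7
         //	close(ch)
         //	v, ok := <-ch // 7, true: buffered data is still delivered after close
         //	v, ok = <-ch  // 0, false: closed and drained
         //	close(ch)     // panics: "close of closed channel"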
   487  
   488  // empty reports whether a read from c would block (that is, the channel is
   489  // empty).  It is atomically correct and sequentially consistent at the moment
   490  // it returns, but since the channel is unlocked, the channel may become
   491  // non-empty immediately afterward.
   492  func empty(c *hchan) bool {
   493  	// c.dataqsiz is immutable.
   494  	if c.dataqsiz == 0 {
   495  		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
   496  	}
   497  	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
   498  	// All timer channels have dataqsiz > 0.
   499  	if c.timer != nil {
   500  		c.timer.maybeRunChan(c)
   501  	}
   502  	return atomic.Loaduint(&c.qcount) == 0
   503  }
   504  
   505  // entry points for <- c from compiled code.
   506  //
   507  //go:nosplit
   508  func chanrecv1(c *hchan, elem unsafe.Pointer) {
   509  	chanrecv(c, elem, true)
   510  }
   511  
   512  //go:nosplit
   513  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   514  	_, received = chanrecv(c, elem, true)
   515  	return
   516  }
   517  
   518  // chanrecv receives on channel c and writes the received data to ep.
   519  // ep may be nil, in which case received data is ignored.
   520  // If block == false and no elements are available, returns (false, false).
   521  // Otherwise, if c is closed, zeros *ep and returns (true, false).
   522  // Otherwise, fills in *ep with an element and returns (true, true).
   523  // A non-nil ep must point to the heap or the caller's stack.
   524  func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   525  	// raceenabled: don't need to check ep, as it is always on the stack
   526  	// or is new memory allocated by reflect.
   527  
   528  	if debugChan {
   529  		print("chanrecv: chan=", c, "\n")
   530  	}
   531  
   532  	if c == nil {
   533  		if !block {
   534  			return
   535  		}
   536  		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
   537  		throw("unreachable")
   538  	}
   539  
   540  	if c.bubble != nil && getg().bubble != c.bubble {
   541  		panic(plainError("receive on synctest channel from outside bubble"))
   542  	}
   543  
   544  	if c.timer != nil {
   545  		c.timer.maybeRunChan(c)
   546  	}
   547  
   548  	// Fast path: check for failed non-blocking operation without acquiring the lock.
   549  	if !block && empty(c) {
   550  		// After observing that the channel is not ready for receiving, we observe whether the
   551  		// channel is closed.
   552  		//
   553  		// Reordering of these checks could lead to incorrect behavior when racing with a close.
   554  		// For example, if the channel was open and not empty, was closed, and then drained,
   555  		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
   556  		// we use atomic loads for both checks, and rely on emptying and closing to happen in
   557  		// separate critical sections under the same lock.  This assumption fails when closing
   558  		// an unbuffered channel with a blocked send, but that is an error condition anyway.
   559  		if atomic.Load(&c.closed) == 0 {
   560  			// Because a channel cannot be reopened, the later observation of the channel
   561  			// being not closed implies that it was also not closed at the moment of the
   562  			// first observation. We behave as if we observed the channel at that moment
   563  			// and report that the receive cannot proceed.
   564  			return
   565  		}
   566  		// The channel is irreversibly closed. Re-check whether the channel has any pending data
   567  		// to receive, which could have arrived between the empty and closed checks above.
   568  		// Sequential consistency is also required here, when racing with such a send.
   569  		if empty(c) {
   570  			// The channel is irreversibly closed and empty.
   571  			if raceenabled {
   572  				raceacquire(c.raceaddr())
   573  			}
   574  			if ep != nil {
   575  				typedmemclr(c.elemtype, ep)
   576  			}
   577  			return true, false
   578  		}
   579  	}
   580  
   581  	var t0 int64
   582  	if blockprofilerate > 0 {
   583  		t0 = cputicks()
   584  	}
   585  
   586  	lock(&c.lock)
   587  
   588  	if c.closed != 0 {
   589  		if c.qcount == 0 {
   590  			if raceenabled {
   591  				raceacquire(c.raceaddr())
   592  			}
   593  			unlock(&c.lock)
   594  			if ep != nil {
   595  				typedmemclr(c.elemtype, ep)
   596  			}
   597  			return true, false
   598  		}
    599  		// The channel has been closed, but the channel's buffer still has data.
   600  	} else {
    601  		// The channel is not closed, so check for a waiting sender.
   602  		if sg := c.sendq.dequeue(); sg != nil {
   603  			// Found a waiting sender. If buffer is size 0, receive value
   604  			// directly from sender. Otherwise, receive from head of queue
   605  			// and add sender's value to the tail of the queue (both map to
   606  			// the same buffer slot because the queue is full).
   607  			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
   608  			return true, true
   609  		}
   610  	}
   611  
   612  	if c.qcount > 0 {
   613  		// Receive directly from queue
   614  		qp := chanbuf(c, c.recvx)
   615  		if raceenabled {
   616  			racenotify(c, c.recvx, nil)
   617  		}
   618  		if ep != nil {
   619  			typedmemmove(c.elemtype, ep, qp)
   620  		}
   621  		typedmemclr(c.elemtype, qp)
   622  		c.recvx++
   623  		if c.recvx == c.dataqsiz {
   624  			c.recvx = 0
   625  		}
   626  		c.qcount--
   627  		unlock(&c.lock)
   628  		return true, true
   629  	}
   630  
   631  	if !block {
   632  		unlock(&c.lock)
   633  		return false, false
   634  	}
   635  
   636  	// no sender available: block on this channel.
   637  	gp := getg()
   638  	mysg := acquireSudog()
   639  	mysg.releasetime = 0
   640  	if t0 != 0 {
   641  		mysg.releasetime = -1
   642  	}
   643  	// No stack splits between assigning elem and enqueuing mysg
   644  	// on gp.waiting where copystack can find it.
   645  	mysg.elem = ep
   646  	mysg.waitlink = nil
   647  	gp.waiting = mysg
   648  
   649  	mysg.g = gp
   650  	mysg.isSelect = false
   651  	mysg.c = c
   652  	gp.param = nil
   653  	c.recvq.enqueue(mysg)
   654  	if c.timer != nil {
   655  		blockTimerChan(c)
   656  	}
   657  
   658  	// Signal to anyone trying to shrink our stack that we're about
   659  	// to park on a channel. The window between when this G's status
   660  	// changes and when we set gp.activeStackChans is not safe for
   661  	// stack shrinking.
   662  	gp.parkingOnChan.Store(true)
   663  	reason := waitReasonChanReceive
   664  	if c.bubble != nil {
   665  		reason = waitReasonSynctestChanReceive
   666  	}
   667  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)
   668  
   669  	// someone woke us up
   670  	if mysg != gp.waiting {
   671  		throw("G waiting list is corrupted")
   672  	}
   673  	if c.timer != nil {
   674  		unblockTimerChan(c)
   675  	}
   676  	gp.waiting = nil
   677  	gp.activeStackChans = false
   678  	if mysg.releasetime > 0 {
   679  		blockevent(mysg.releasetime-t0, 2)
   680  	}
   681  	success := mysg.success
   682  	gp.param = nil
   683  	mysg.c = nil
   684  	releaseSudog(mysg)
   685  	return true, success
   686  }
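
         // Editorial sketch: how chanrecv's (selected, received) results map
         // onto the forms the compiler generates calls for:
         //
         //	v := <-ch    // chanrecv1(ch, &v): blocking receive
         //	v, ok = <-ch // chanrecv2(ch, &v): ok is chanrecv's received result
         //
         //	select {
         //	case <-ch: // selectnbrecv: selected reports whether this case ran
         //	default:
         //	}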
   687  
   688  // recv processes a receive operation on a full channel c.
   689  // There are 2 parts:
   690  //  1. The value sent by the sender sg is put into the channel
   691  //     and the sender is woken up to go on its merry way.
   692  //  2. The value received by the receiver (the current G) is
   693  //     written to ep.
   694  //
   695  // For synchronous channels, both values are the same.
   696  // For asynchronous channels, the receiver gets its data from
   697  // the channel buffer and the sender's data is put in the
   698  // channel buffer.
   699  // Channel c must be full and locked. recv unlocks c with unlockf.
   700  // sg must already be dequeued from c.
   701  // A non-nil ep must point to the heap or the caller's stack.
   702  func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   703  	if c.bubble != nil && getg().bubble != c.bubble {
   704  		unlockf()
   705  		panic(plainError("receive on synctest channel from outside bubble"))
   706  	}
   707  	if c.dataqsiz == 0 {
   708  		if raceenabled {
   709  			racesync(c, sg)
   710  		}
   711  		if ep != nil {
   712  			// copy data from sender
   713  			recvDirect(c.elemtype, sg, ep)
   714  		}
   715  	} else {
   716  		// Queue is full. Take the item at the
   717  		// head of the queue. Make the sender enqueue
   718  		// its item at the tail of the queue. Since the
   719  		// queue is full, those are both the same slot.
   720  		qp := chanbuf(c, c.recvx)
   721  		if raceenabled {
   722  			racenotify(c, c.recvx, nil)
   723  			racenotify(c, c.recvx, sg)
   724  		}
   725  		// copy data from queue to receiver
   726  		if ep != nil {
   727  			typedmemmove(c.elemtype, ep, qp)
   728  		}
   729  		// copy data from sender to queue
   730  		typedmemmove(c.elemtype, qp, sg.elem)
   731  		c.recvx++
   732  		if c.recvx == c.dataqsiz {
   733  			c.recvx = 0
   734  		}
   735  		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   736  	}
   737  	sg.elem = nil
   738  	gp := sg.g
   739  	unlockf()
   740  	gp.param = unsafe.Pointer(sg)
   741  	sg.success = true
   742  	if sg.releasetime != 0 {
   743  		sg.releasetime = cputicks()
   744  	}
   745  	goready(gp, skip+1)
   746  }
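
         // Editorial sketch of the full-buffer rotation recv performs when a
         // sender is waiting, assuming the sender has parked before the receive
         // runs:
         //
         //	ch := make(chan int, 1)
         //	ch <- 1
         //	go func() { ch <- 2 }() // buffer full: sender parks on ch.sendq
         //	<-ch // returns 1 from the buffer head; 2 fills the freed slot; sender wakes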
   747  
   748  func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
   749  	// There are unlocked sudogs that point into gp's stack. Stack
   750  	// copying must lock the channels of those sudogs.
   751  	// Set activeStackChans here instead of before we try parking
   752  	// because we could self-deadlock in stack growth on the
   753  	// channel lock.
   754  	gp.activeStackChans = true
   755  	// Mark that it's safe for stack shrinking to occur now,
   756  	// because any thread acquiring this G's stack for shrinking
   757  	// is guaranteed to observe activeStackChans after this store.
   758  	gp.parkingOnChan.Store(false)
   759  	// Make sure we unlock after setting activeStackChans and
   760  	// unsetting parkingOnChan. The moment we unlock chanLock
   761  	// we risk gp getting readied by a channel operation and
   762  	// so gp could continue running before everything before
   763  	// the unlock is visible (even to gp itself).
   764  	unlock((*mutex)(chanLock))
   765  	return true
   766  }
   767  
   768  // compiler implements
   769  //
   770  //	select {
   771  //	case c <- v:
   772  //		... foo
   773  //	default:
   774  //		... bar
   775  //	}
   776  //
   777  // as
   778  //
   779  //	if selectnbsend(c, v) {
   780  //		... foo
   781  //	} else {
   782  //		... bar
   783  //	}
   784  func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
   785  	return chansend(c, elem, false, sys.GetCallerPC())
   786  }
   787  
   788  // compiler implements
   789  //
   790  //	select {
   791  //	case v, ok = <-c:
   792  //		... foo
   793  //	default:
   794  //		... bar
   795  //	}
   796  //
   797  // as
   798  //
   799  //	if selected, ok = selectnbrecv(&v, c); selected {
   800  //		... foo
   801  //	} else {
   802  //		... bar
   803  //	}
   804  func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
   805  	return chanrecv(c, elem, false)
   806  }
   807  
   808  //go:linkname reflect_chansend reflect.chansend0
   809  func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
   810  	return chansend(c, elem, !nb, sys.GetCallerPC())
   811  }
   812  
   813  //go:linkname reflect_chanrecv reflect.chanrecv
   814  func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
   815  	return chanrecv(c, elem, !nb)
   816  }
   817  
   818  func chanlen(c *hchan) int {
   819  	if c == nil {
   820  		return 0
   821  	}
   822  	async := debug.asynctimerchan.Load() != 0
   823  	if c.timer != nil && async {
   824  		c.timer.maybeRunChan(c)
   825  	}
   826  	if c.timer != nil && !async {
   827  		// timer channels have a buffered implementation
   828  		// but present to users as unbuffered, so that we can
   829  		// undo sends without users noticing.
   830  		return 0
   831  	}
   832  	return int(c.qcount)
   833  }
   834  
   835  func chancap(c *hchan) int {
   836  	if c == nil {
   837  		return 0
   838  	}
   839  	if c.timer != nil {
   840  		async := debug.asynctimerchan.Load() != 0
   841  		if async {
   842  			return int(c.dataqsiz)
   843  		}
   844  		// timer channels have a buffered implementation
   845  		// but present to users as unbuffered, so that we can
   846  		// undo sends without users noticing.
   847  		return 0
   848  	}
   849  	return int(c.dataqsiz)
   850  }
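
         // Editorial sketch (assumes the synchronous timer channel
         // implementation, i.e. GODEBUG asynctimerchan=0, the current default):
         //
         //	t := time.NewTimer(time.Hour)
         //	len(t.C) // 0: buffered internally, but presented as unbuffered
         //	cap(t.C) // 0: likewise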
   851  
   852  //go:linkname reflect_chanlen reflect.chanlen
   853  func reflect_chanlen(c *hchan) int {
   854  	return chanlen(c)
   855  }
   856  
   857  //go:linkname reflectlite_chanlen internal/reflectlite.chanlen
   858  func reflectlite_chanlen(c *hchan) int {
   859  	return chanlen(c)
   860  }
   861  
   862  //go:linkname reflect_chancap reflect.chancap
   863  func reflect_chancap(c *hchan) int {
   864  	return chancap(c)
   865  }
   866  
   867  //go:linkname reflect_chanclose reflect.chanclose
   868  func reflect_chanclose(c *hchan) {
   869  	closechan(c)
   870  }
   871  
   872  func (q *waitq) enqueue(sgp *sudog) {
   873  	sgp.next = nil
   874  	x := q.last
   875  	if x == nil {
   876  		sgp.prev = nil
   877  		q.first = sgp
   878  		q.last = sgp
   879  		return
   880  	}
   881  	sgp.prev = x
   882  	x.next = sgp
   883  	q.last = sgp
   884  }
   885  
   886  func (q *waitq) dequeue() *sudog {
   887  	for {
   888  		sgp := q.first
   889  		if sgp == nil {
   890  			return nil
   891  		}
   892  		y := sgp.next
   893  		if y == nil {
   894  			q.first = nil
   895  			q.last = nil
   896  		} else {
   897  			y.prev = nil
   898  			q.first = y
   899  			sgp.next = nil // mark as removed (see dequeueSudoG)
   900  		}
   901  
   902  		// if a goroutine was put on this queue because of a
   903  		// select, there is a small window between the goroutine
   904  		// being woken up by a different case and it grabbing the
   905  		// channel locks. Once it has the lock
   906  		// it removes itself from the queue, so we won't see it after that.
   907  		// We use a flag in the G struct to tell us when someone
   908  		// else has won the race to signal this goroutine but the goroutine
   909  		// hasn't removed itself from the queue yet.
   910  		if sgp.isSelect {
   911  			if !sgp.g.selectDone.CompareAndSwap(0, 1) {
   912  				// We lost the race to wake this goroutine.
   913  				continue
   914  			}
   915  		}
   916  
   917  		return sgp
   918  	}
   919  }
   920  
   921  func (c *hchan) raceaddr() unsafe.Pointer {
   922  	// Treat read-like and write-like operations on the channel to
   923  	// happen at this address. Avoid using the address of qcount
   924  	// or dataqsiz, because the len() and cap() builtins read
   925  	// those addresses, and we don't want them racing with
   926  	// operations like close().
   927  	return unsafe.Pointer(&c.buf)
   928  }
   929  
   930  func racesync(c *hchan, sg *sudog) {
   931  	racerelease(chanbuf(c, 0))
   932  	raceacquireg(sg.g, chanbuf(c, 0))
   933  	racereleaseg(sg.g, chanbuf(c, 0))
   934  	raceacquire(chanbuf(c, 0))
   935  }
   936  
   937  // Notify the race detector of a send or receive involving buffer entry idx
   938  // and a channel c or its communicating partner sg.
   939  // This function handles the special case of c.elemsize==0.
   940  func racenotify(c *hchan, idx uint, sg *sudog) {
   941  	// We could have passed the unsafe.Pointer corresponding to entry idx
   942  	// instead of idx itself.  However, in a future version of this function,
   943  	// we can use idx to better handle the case of elemsize==0.
   944  	// A future improvement to the detector is to call TSan with c and idx:
    945  	// this way, Go can continue to not allocate buffer entries for channels
    946  	// of elemsize==0, yet the race detector can be made to handle multiple
    947  	// sync objects underneath the hood (one sync object per idx).
   948  	qp := chanbuf(c, idx)
   949  	// When elemsize==0, we don't allocate a full buffer for the channel.
    950  	// Instead of individual buffer entries, the race detector uses
   951  	// c.buf as the only buffer entry.  This simplification prevents us from
   952  	// following the memory model's happens-before rules (rules that are
   953  	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
   954  	// information in the synchronization object associated with c.buf.
   955  	if c.elemsize == 0 {
   956  		if sg == nil {
   957  			raceacquire(qp)
   958  			racerelease(qp)
   959  		} else {
   960  			raceacquireg(sg.g, qp)
   961  			racereleaseg(sg.g, qp)
   962  		}
   963  	} else {
   964  		if sg == nil {
   965  			racereleaseacquire(qp)
   966  		} else {
   967  			racereleaseacquireg(sg.g, qp)
   968  		}
   969  	}
   970  }
   971  
