Source file src/runtime/chan.go

     1  // Copyright 2014 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  // This file contains the implementation of Go channels.
     8  
     9  // Invariants:
    10  //  At least one of c.sendq and c.recvq is empty,
    11  //  except for the case of an unbuffered channel with a single goroutine
    12  //  blocked on it for both sending and receiving using a select statement,
    13  //  in which case the length of c.sendq and c.recvq is limited only by the
    14  //  size of the select statement.
    15  //
    16  // For buffered channels, also:
    17  //  c.qcount > 0 implies that c.recvq is empty.
    18  //  c.qcount < c.dataqsiz implies that c.sendq is empty.
    19  
    20  import (
    21  	"internal/abi"
    22  	"internal/runtime/atomic"
    23  	"internal/runtime/math"
    24  	"internal/runtime/sys"
    25  	"unsafe"
    26  )
    27  
// Channel layout constants.
//
// maxAlign is the largest element alignment makechan accepts; makechan
// throws if an element type needs more.
//
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
// maxAlign (the second term adds the padding, computed as -size mod
// maxAlign). makechan uses it as the offset of the element buffer when the
// hchan and its buffer share one allocation.
//
// debugChan guards the diagnostic print calls in makechan, chansend and
// chanrecv.
const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)
    33  
// hchan is the runtime representation of a channel. It is allocated and
// initialized by makechan.
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of one element, copied from elemtype.Size_ by makechan
	closed   uint32         // zero while the channel is open; closechan sets it to 1
	timer    *timer         // timer feeding this chan
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // list of recv waiters
	sendq    waitq          // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
    55  
// waitq is a FIFO queue of sudogs waiting on a channel. The sudogs are
// doubly linked through their next and prev fields; enqueue appends at
// last and dequeue removes from first.
type waitq struct {
	first *sudog // head of the queue; nil when the queue is empty
	last  *sudog // tail of the queue; nil when the queue is empty
}
    60  
    61  //go:linkname reflect_makechan reflect.makechan
    62  func reflect_makechan(t *chantype, size int) *hchan {
    63  	return makechan(t, size)
    64  }
    65  
    66  func makechan64(t *chantype, size int64) *hchan {
    67  	if int64(int(size)) != size {
    68  		panic(plainError("makechan: size out of range"))
    69  	}
    70  
    71  	return makechan(t, int(size))
    72  }
    73  
    74  func makechan(t *chantype, size int) *hchan {
    75  	elem := t.Elem
    76  
    77  	// compiler checks this but be safe.
    78  	if elem.Size_ >= 1<<16 {
    79  		throw("makechan: invalid channel element type")
    80  	}
    81  	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
    82  		throw("makechan: bad alignment")
    83  	}
    84  
    85  	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
    86  	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    87  		panic(plainError("makechan: size out of range"))
    88  	}
    89  
    90  	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    91  	// buf points into the same allocation, elemtype is persistent.
    92  	// SudoG's are referenced from their owning thread so they can't be collected.
    93  	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    94  	var c *hchan
    95  	switch {
    96  	case mem == 0:
    97  		// Queue or element size is zero.
    98  		c = (*hchan)(mallocgc(hchanSize, nil, true))
    99  		// Race detector uses this location for synchronization.
   100  		c.buf = c.raceaddr()
   101  	case !elem.Pointers():
   102  		// Elements do not contain pointers.
   103  		// Allocate hchan and buf in one call.
   104  		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
   105  		c.buf = add(unsafe.Pointer(c), hchanSize)
   106  	default:
   107  		// Elements contain pointers.
   108  		c = new(hchan)
   109  		c.buf = mallocgc(mem, elem, true)
   110  	}
   111  
   112  	c.elemsize = uint16(elem.Size_)
   113  	c.elemtype = elem
   114  	c.dataqsiz = uint(size)
   115  	lockInit(&c.lock, lockRankHchan)
   116  
   117  	if debugChan {
   118  		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
   119  	}
   120  	return c
   121  }
   122  
   123  // chanbuf(c, i) is pointer to the i'th slot in the buffer.
   124  //
   125  // chanbuf should be an internal detail,
   126  // but widely used packages access it using linkname.
   127  // Notable members of the hall of shame include:
   128  //   - github.com/fjl/memsize
   129  //
   130  // Do not remove or change the type signature.
   131  // See go.dev/issue/67401.
   132  //
   133  //go:linkname chanbuf
   134  func chanbuf(c *hchan, i uint) unsafe.Pointer {
   135  	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
   136  }
   137  
   138  // full reports whether a send on c would block (that is, the channel is full).
   139  // It uses a single word-sized read of mutable state, so although
   140  // the answer is instantaneously true, the correct answer may have changed
   141  // by the time the calling function receives the return value.
   142  func full(c *hchan) bool {
   143  	// c.dataqsiz is immutable (never written after the channel is created)
   144  	// so it is safe to read at any time during channel operation.
   145  	if c.dataqsiz == 0 {
   146  		// Assumes that a pointer read is relaxed-atomic.
   147  		return c.recvq.first == nil
   148  	}
   149  	// Assumes that a uint read is relaxed-atomic.
   150  	return c.qcount == c.dataqsiz
   151  }
   152  
   153  // entry point for c <- x from compiled code.
   154  //
   155  //go:nosplit
   156  func chansend1(c *hchan, elem unsafe.Pointer) {
   157  	chansend(c, elem, true, sys.GetCallerPC())
   158  }
   159  
// chansend is the generic single-channel send. It is called by
// chansend1, selectnbsend and reflect_chansend.
//
// If block is false, chansend never sleeps: it returns false when the
// send cannot complete immediately. If block is true, it parks the
// calling goroutine on c.sendq until another goroutine completes or
// aborts the send.
//
// It returns true once the value at ep has been handed to a waiting
// receiver or copied into the channel buffer. Sending on a closed
// channel panics with "send on closed channel"; this includes a parked
// sender that wakes up with mysg.success == false, which is how
// closechan marks the sudogs it releases.
//
// For a nil channel, a non-blocking send returns false and a blocking
// send parks forever.
//
// callerpc is used only for the race detector report.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// t0 stays zero unless block profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send index, wrapping around the circular buffer.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// A non-zero releasetime makes whoever wakes us (recv or
		// closechan) overwrite it with cputicks().
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// success is set to true by recv and to false by closechan.
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
   299  
   300  // send processes a send operation on an empty channel c.
   301  // The value ep sent by the sender is copied to the receiver sg.
   302  // The receiver is then woken up to go on its merry way.
   303  // Channel c must be empty and locked.  send unlocks c with unlockf.
   304  // sg must already be dequeued from c.
   305  // ep must be non-nil and point to the heap or the caller's stack.
   306  func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   307  	if raceenabled {
   308  		if c.dataqsiz == 0 {
   309  			racesync(c, sg)
   310  		} else {
   311  			// Pretend we go through the buffer, even though
   312  			// we copy directly. Note that we need to increment
   313  			// the head/tail locations only when raceenabled.
   314  			racenotify(c, c.recvx, nil)
   315  			racenotify(c, c.recvx, sg)
   316  			c.recvx++
   317  			if c.recvx == c.dataqsiz {
   318  				c.recvx = 0
   319  			}
   320  			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   321  		}
   322  	}
   323  	if sg.elem != nil {
   324  		sendDirect(c.elemtype, sg, ep)
   325  		sg.elem = nil
   326  	}
   327  	gp := sg.g
   328  	unlockf()
   329  	gp.param = unsafe.Pointer(sg)
   330  	sg.success = true
   331  	if sg.releasetime != 0 {
   332  		sg.releasetime = cputicks()
   333  	}
   334  	goready(gp, skip+1)
   335  }
   336  
   337  // timerchandrain removes all elements in channel c's buffer.
   338  // It reports whether any elements were removed.
   339  // Because it is only intended for timers, it does not
   340  // handle waiting senders at all (all timer channels
   341  // use non-blocking sends to fill the buffer).
   342  func timerchandrain(c *hchan) bool {
   343  	// Note: Cannot use empty(c) because we are called
   344  	// while holding c.timer.sendLock, and empty(c) will
   345  	// call c.timer.maybeRunChan, which will deadlock.
   346  	// We are emptying the channel, so we only care about
   347  	// the count, not about potentially filling it up.
   348  	if atomic.Loaduint(&c.qcount) == 0 {
   349  		return false
   350  	}
   351  	lock(&c.lock)
   352  	any := false
   353  	for c.qcount > 0 {
   354  		any = true
   355  		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
   356  		c.recvx++
   357  		if c.recvx == c.dataqsiz {
   358  			c.recvx = 0
   359  		}
   360  		c.qcount--
   361  	}
   362  	unlock(&c.lock)
   363  	return any
   364  }
   365  
   366  // Sends and receives on unbuffered or empty-buffered channels are the
   367  // only operations where one running goroutine writes to the stack of
   368  // another running goroutine. The GC assumes that stack writes only
   369  // happen when the goroutine is running and are only done by that
   370  // goroutine. Using a write barrier is sufficient to make up for
   371  // violating that assumption, but the write barrier has to work.
   372  // typedmemmove will call bulkBarrierPreWrite, but the target bytes
   373  // are not in the heap, so that will not help. We arrange to call
   374  // memmove and typeBitsBulkBarrier instead.
   375  
// sendDirect copies a value of type t from src into sg.elem, the slot
// of the waiting receiver sg. It calls typeBitsBulkBarrier for the
// destination before doing the copy with memmove.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}
   388  
// recvDirect copies a value of type t from sg.elem, the slot of the
// waiting sender sg, into dst. It calls typeBitsBulkBarrier for the
// destination before doing the copy with memmove.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}
   397  
// closechan closes channel c.
//
// It panics with "close of nil channel" if c is nil and with
// "close of closed channel" if c was already closed. Otherwise it marks
// the channel closed and, under the channel lock, removes every sudog
// from c.recvq and c.sendq, marking each with success == false. The
// goroutines collected this way are made runnable only after the lock
// has been released.
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := sys.GetCallerPC()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	// Goroutines to wake up once the lock has been dropped.
	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			// Zero the receiver's destination slot.
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
   468  
   469  // empty reports whether a read from c would block (that is, the channel is
   470  // empty).  It is atomically correct and sequentially consistent at the moment
   471  // it returns, but since the channel is unlocked, the channel may become
   472  // non-empty immediately afterward.
   473  func empty(c *hchan) bool {
   474  	// c.dataqsiz is immutable.
   475  	if c.dataqsiz == 0 {
   476  		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
   477  	}
   478  	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
   479  	// All timer channels have dataqsiz > 0.
   480  	if c.timer != nil {
   481  		c.timer.maybeRunChan()
   482  	}
   483  	return atomic.Loaduint(&c.qcount) == 0
   484  }
   485  
   486  // entry points for <- c from compiled code.
   487  //
   488  //go:nosplit
   489  func chanrecv1(c *hchan, elem unsafe.Pointer) {
   490  	chanrecv(c, elem, true)
   491  }
   492  
   493  //go:nosplit
   494  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   495  	_, received = chanrecv(c, elem, true)
   496  	return
   497  }
   498  
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
//
// For a nil channel, a non-blocking receive returns (false, false) and a
// blocking receive parks forever.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// t0 stays zero unless block profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but its buffer still has data;
		// fall through to the buffered receive below.
	} else {
		// The channel is not closed; check for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Clear the slot that was just consumed.
		typedmemclr(c.elemtype, qp)
		// Advance the receive index, wrapping around the circular buffer.
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// A non-zero releasetime makes whoever wakes us (send or
		// closechan) overwrite it with cputicks().
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// success is set to true by send and to false by closechan.
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
   660  
   661  // recv processes a receive operation on a full channel c.
   662  // There are 2 parts:
   663  //  1. The value sent by the sender sg is put into the channel
   664  //     and the sender is woken up to go on its merry way.
   665  //  2. The value received by the receiver (the current G) is
   666  //     written to ep.
   667  //
   668  // For synchronous channels, both values are the same.
   669  // For asynchronous channels, the receiver gets its data from
   670  // the channel buffer and the sender's data is put in the
   671  // channel buffer.
   672  // Channel c must be full and locked. recv unlocks c with unlockf.
   673  // sg must already be dequeued from c.
   674  // A non-nil ep must point to the heap or the caller's stack.
   675  func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   676  	if c.dataqsiz == 0 {
   677  		if raceenabled {
   678  			racesync(c, sg)
   679  		}
   680  		if ep != nil {
   681  			// copy data from sender
   682  			recvDirect(c.elemtype, sg, ep)
   683  		}
   684  	} else {
   685  		// Queue is full. Take the item at the
   686  		// head of the queue. Make the sender enqueue
   687  		// its item at the tail of the queue. Since the
   688  		// queue is full, those are both the same slot.
   689  		qp := chanbuf(c, c.recvx)
   690  		if raceenabled {
   691  			racenotify(c, c.recvx, nil)
   692  			racenotify(c, c.recvx, sg)
   693  		}
   694  		// copy data from queue to receiver
   695  		if ep != nil {
   696  			typedmemmove(c.elemtype, ep, qp)
   697  		}
   698  		// copy data from sender to queue
   699  		typedmemmove(c.elemtype, qp, sg.elem)
   700  		c.recvx++
   701  		if c.recvx == c.dataqsiz {
   702  			c.recvx = 0
   703  		}
   704  		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   705  	}
   706  	sg.elem = nil
   707  	gp := sg.g
   708  	unlockf()
   709  	gp.param = unsafe.Pointer(sg)
   710  	sg.success = true
   711  	if sg.releasetime != 0 {
   712  		sg.releasetime = cputicks()
   713  	}
   714  	goready(gp, skip+1)
   715  }
   716  
// chanparkcommit is the function chansend and chanrecv pass to gopark
// when parking on a channel; chanLock is the address of that channel's
// lock. It sets gp.activeStackChans, clears gp.parkingOnChan, releases
// the channel lock, and always returns true.
// NOTE(review): the meaning of the return value is defined by gopark,
// which is not visible here; presumably true lets the park proceed.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}
   736  
   737  // compiler implements
   738  //
   739  //	select {
   740  //	case c <- v:
   741  //		... foo
   742  //	default:
   743  //		... bar
   744  //	}
   745  //
   746  // as
   747  //
   748  //	if selectnbsend(c, v) {
   749  //		... foo
   750  //	} else {
   751  //		... bar
   752  //	}
   753  func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
   754  	return chansend(c, elem, false, sys.GetCallerPC())
   755  }
   756  
   757  // compiler implements
   758  //
   759  //	select {
   760  //	case v, ok = <-c:
   761  //		... foo
   762  //	default:
   763  //		... bar
   764  //	}
   765  //
   766  // as
   767  //
   768  //	if selected, ok = selectnbrecv(&v, c); selected {
   769  //		... foo
   770  //	} else {
   771  //		... bar
   772  //	}
   773  func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
   774  	return chanrecv(c, elem, false)
   775  }
   776  
   777  //go:linkname reflect_chansend reflect.chansend0
   778  func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
   779  	return chansend(c, elem, !nb, sys.GetCallerPC())
   780  }
   781  
   782  //go:linkname reflect_chanrecv reflect.chanrecv
   783  func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
   784  	return chanrecv(c, elem, !nb)
   785  }
   786  
   787  func chanlen(c *hchan) int {
   788  	if c == nil {
   789  		return 0
   790  	}
   791  	async := debug.asynctimerchan.Load() != 0
   792  	if c.timer != nil && async {
   793  		c.timer.maybeRunChan()
   794  	}
   795  	if c.timer != nil && !async {
   796  		// timer channels have a buffered implementation
   797  		// but present to users as unbuffered, so that we can
   798  		// undo sends without users noticing.
   799  		return 0
   800  	}
   801  	return int(c.qcount)
   802  }
   803  
   804  func chancap(c *hchan) int {
   805  	if c == nil {
   806  		return 0
   807  	}
   808  	if c.timer != nil {
   809  		async := debug.asynctimerchan.Load() != 0
   810  		if async {
   811  			return int(c.dataqsiz)
   812  		}
   813  		// timer channels have a buffered implementation
   814  		// but present to users as unbuffered, so that we can
   815  		// undo sends without users noticing.
   816  		return 0
   817  	}
   818  	return int(c.dataqsiz)
   819  }
   820  
   821  //go:linkname reflect_chanlen reflect.chanlen
   822  func reflect_chanlen(c *hchan) int {
   823  	return chanlen(c)
   824  }
   825  
   826  //go:linkname reflectlite_chanlen internal/reflectlite.chanlen
   827  func reflectlite_chanlen(c *hchan) int {
   828  	return chanlen(c)
   829  }
   830  
   831  //go:linkname reflect_chancap reflect.chancap
   832  func reflect_chancap(c *hchan) int {
   833  	return chancap(c)
   834  }
   835  
// reflect_chanclose is the implementation bound to reflect.chanclose by
// the linkname directive below. It closes c by calling closechan.
//
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}
   840  
   841  func (q *waitq) enqueue(sgp *sudog) {
   842  	sgp.next = nil
   843  	x := q.last
   844  	if x == nil {
   845  		sgp.prev = nil
   846  		q.first = sgp
   847  		q.last = sgp
   848  		return
   849  	}
   850  	sgp.prev = x
   851  	x.next = sgp
   852  	q.last = sgp
   853  }
   854  
// dequeue removes and returns the sudog at the head of q, or returns
// nil if the queue is empty.
//
// A sudog that was enqueued by a select (isSelect) is returned only if
// this call wins the CompareAndSwap(0, 1) on its goroutine's selectDone;
// otherwise it is dropped from the queue and the next one is tried.
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			// sgp was the only element; the queue is now empty.
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}
   886  
   887  func (c *hchan) raceaddr() unsafe.Pointer {
   888  	// Treat read-like and write-like operations on the channel to
   889  	// happen at this address. Avoid using the address of qcount
   890  	// or dataqsiz, because the len() and cap() builtins read
   891  	// those addresses, and we don't want them racing with
   892  	// operations like close().
   893  	return unsafe.Pointer(&c.buf)
   894  }
   895  
   896  func racesync(c *hchan, sg *sudog) {
   897  	racerelease(chanbuf(c, 0))
   898  	raceacquireg(sg.g, chanbuf(c, 0))
   899  	racereleaseg(sg.g, chanbuf(c, 0))
   900  	raceacquire(chanbuf(c, 0))
   901  }
   902  
   903  // Notify the race detector of a send or receive involving buffer entry idx
   904  // and a channel c or its communicating partner sg.
   905  // This function handles the special case of c.elemsize==0.
   906  func racenotify(c *hchan, idx uint, sg *sudog) {
   907  	// We could have passed the unsafe.Pointer corresponding to entry idx
   908  	// instead of idx itself.  However, in a future version of this function,
   909  	// we can use idx to better handle the case of elemsize==0.
   910  	// A future improvement to the detector is to call TSan with c and idx:
   911  	// this way, Go will continue to not allocating buffer entries for channels
   912  	// of elemsize==0, yet the race detector can be made to handle multiple
   913  	// sync objects underneath the hood (one sync object per idx)
   914  	qp := chanbuf(c, idx)
   915  	// When elemsize==0, we don't allocate a full buffer for the channel.
   916  	// Instead of individual buffer entries, the race detector uses the
   917  	// c.buf as the only buffer entry.  This simplification prevents us from
   918  	// following the memory model's happens-before rules (rules that are
   919  	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
   920  	// information in the synchronization object associated with c.buf.
   921  	if c.elemsize == 0 {
   922  		if sg == nil {
   923  			raceacquire(qp)
   924  			racerelease(qp)
   925  		} else {
   926  			raceacquireg(sg.g, qp)
   927  			racereleaseg(sg.g, qp)
   928  		}
   929  	} else {
   930  		if sg == nil {
   931  			racereleaseacquire(qp)
   932  		} else {
   933  			racereleaseacquireg(sg.g, qp)
   934  		}
   935  	}
   936  }
   937  

View as plain text