Source file src/runtime/chan.go

// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

// This file contains the implementation of Go channels.

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.

import (
	"internal/abi"
	"internal/runtime/atomic"
	"runtime/internal/math"
	"unsafe"
)

const (
	maxAlign  = 8
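	// hchanSize is the size of the hchan struct rounded up to the next
	// multiple of maxAlign, so that a channel buffer allocated
	// immediately after the hchan header is suitably aligned.
	// x + (-x & (maxAlign-1)) rounds x up to a multiple of maxAlign
	// (a power of two).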
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

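// waitq is a doubly linked list of sudogs, the goroutines parked on a
// channel operation. hchan's recvq and sendq fields are waitqs.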
type waitq struct {
	first *sudog
	last  *sudog
}

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
	elem := t.Elem

	// compiler checks this but be safe.
	if elem.Size_ >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case !elem.Pointers():
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
	}
	return c
}
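
// For illustration: make(chan int, 8) is lowered by the compiler to a
// call of makechan (or makechan64 for sizes that need 64 bits) with
// the chan int type descriptor and size 8. Since int has no pointers,
// the hchan header and its 8-element buffer come from one allocation
// (the second case in the switch above).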

// chanbuf(c, i) is a pointer to the i'th slot in the buffer.
//
// chanbuf should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - github.com/fjl/memsize
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname chanbuf
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
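// full is used by the chansend fast path to fail non-blocking sends
// without acquiring the channel lock.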
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
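
// In user code, chansend with block=true is the statement c <- v: it
// blocks until a receiver or a free buffer slot is available, and
// panics if c is closed. With block=false it implements the send case
// of a select statement that has a default clause (see selectnbsend
// below).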

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

// timerchandrain removes all elements in channel c's buffer.
// It reports whether any elements were removed.
// Because it is only intended for timers, it does not
// handle waiting senders at all (all timer channels
// use non-blocking sends to fill the buffer).
func timerchandrain(c *hchan) bool {
	// Note: Cannot use empty(c) because we are called
	// while holding c.timer.sendLock, and empty(c) will
	// call c.timer.maybeRunChan, which will deadlock.
	// We are emptying the channel, so we only care about
	// the count, not about potentially filling it up.
	if atomic.Loaduint(&c.qcount) == 0 {
		return false
	}
	lock(&c.lock)
	any := false
	for c.qcount > 0 {
		any = true
		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
	}
	unlock(&c.lock)
	return any
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
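	// (Readying a G while still holding the channel lock could
	// deadlock with stack shrinking; see the comment on hchan.lock.)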
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

// empty reports whether a read from c would block (that is, the channel is
// empty). It uses atomic loads of mutable state, so the answer is correct
// only at the instant the state is read; since the channel is unlocked, it
// may become non-empty immediately afterward.
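// empty is used by the chanrecv fast path to fail non-blocking
// receives without acquiring the channel lock.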
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
	// All timer channels have dataqsiz > 0.
	if c.timer != nil {
		c.timer.maybeRunChan()
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but its buffer still has data.
	} else {
		// The channel is not closed; check for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
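
// In user code, chanrecv with block=true implements v := <-c and
// v, ok := <-c (via chanrecv1 and chanrecv2 above). With block=false
// it implements the receive case of a select statement that has a
// default clause (see selectnbrecv below).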

// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, getcallerpc())
}

//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}

func chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	async := debug.asynctimerchan.Load() != 0
	if c.timer != nil && async {
		c.timer.maybeRunChan()
	}
	if c.timer != nil && !async {
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.qcount)
}

func chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	if c.timer != nil {
		async := debug.asynctimerchan.Load() != 0
		if async {
			return int(c.dataqsiz)
		}
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.dataqsiz)
}
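
// For example, when the asynctimerchan debug setting is off, len(t.C)
// and cap(t.C) for a time.Timer t report 0 even though the underlying
// hchan was allocated with a buffer, preserving the appearance of an
// unbuffered channel.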

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	return chanlen(c)
}

//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	return chanlen(c)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	return chancap(c)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}

func (c *hchan) raceaddr() unsafe.Pointer {
	// Treat read-like and write-like operations on the channel as
	// happening at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&c.buf)
}

func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}

// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify(c *hchan, idx uint, sg *sudog) {
	// We could have passed the unsafe.Pointer corresponding to entry idx
	// instead of idx itself.  However, in a future version of this function,
	// we can use idx to better handle the case of elemsize==0.
	// A future improvement to the detector is to call TSan with c and idx:
	// this way, Go will continue to not allocate buffer entries for channels
	// of elemsize==0, yet the race detector can be made to handle multiple
	// sync objects underneath the hood (one sync object per idx).
	qp := chanbuf(c, idx)
	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses the
	// c.buf as the only buffer entry.  This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
	// information in the synchronization object associated with c.buf.
	if c.elemsize == 0 {
		if sg == nil {
			raceacquire(qp)
			racerelease(qp)
		} else {
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
	} else {
		if sg == nil {
			racereleaseacquire(qp)
		} else {
			racereleaseacquireg(sg.g, qp)
		}
	}
}