Source file src/runtime/coro.go

// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import (
	"internal/runtime/sys"
	"unsafe"
)

// A coro represents extra concurrency without extra parallelism,
// as would be needed for a coroutine implementation.
// The coro does not represent a specific coroutine, only the ability
// to do coroutine-style control transfers.
// It can be thought of as like a special channel that always has
// a goroutine blocked on it. If another goroutine calls coroswitch(c),
// the caller becomes the goroutine blocked in c, and the goroutine
// formerly blocked in c starts running.
// These switches continue until a call to coroexit(c),
// which ends the use of the coro by releasing the blocked
// goroutine in c and exiting the current goroutine.
//
// Coros are heap allocated and garbage collected, so that user code
// can hold a pointer to a coro without causing potential dangling
// pointer errors.
type coro struct {
	gp guintptr
	f  func(*coro)

	// State for validating thread-lock interactions.
	mp        *m
	lockedExt uint32 // mp's external LockOSThread counter at coro creation time.
	lockedInt uint32 // mp's internal lockOSThread counter at coro creation time.
}
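
// The sketch below is hypothetical caller code, not part of this file:
// it only illustrates the control transfers described above. Outside the
// runtime these functions are reachable only via their //go:linkname
// directives (the iter package, e.g. iter.Pull, is the main client).
//
//	c := newcoro(func(c *coro) {
//		println("hello from the coroutine")
//		coroswitch(c) // block here; the creator resumes
//		println("coroutine resumed")
//		// Returning runs coroexit via corostart's deferred call.
//	})
//	coroswitch(c) // run the coroutine until it switches back
//	coroswitch(c) // resume it; f returns and the coro exits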

//go:linkname newcoro

// newcoro creates a new coro containing a
// goroutine blocked waiting to run f
// and returns that coro.
func newcoro(f func(*coro)) *coro {
	c := new(coro)
	c.f = f
	pc := sys.GetCallerPC()
	gp := getg()
	systemstack(func() {
		mp := gp.m
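		// Reinterpret the corostart func value as the *funcval header
		// that newproc1 expects. corostart has no free variables, so
		// this conversion involves no allocation.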
		start := corostart
		startfv := *(**funcval)(unsafe.Pointer(&start))
		gp = newproc1(startfv, gp, pc, true, waitReasonCoroutine)

		// Scribble down locked thread state if needed and/or donate
		// thread-lock state to the new goroutine.
		if mp.lockedExt+mp.lockedInt != 0 {
			c.mp = mp
			c.lockedExt = mp.lockedExt
			c.lockedInt = mp.lockedInt
		}
	})
	gp.coroarg = c
	c.gp.set(gp)
	return c
}
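
// For example (hypothetical caller code, illustrating the state recorded
// above): if the creating goroutine holds a thread lock when it calls
// newcoro, every later switch on the coro must happen on that same
// thread with the same lock counts, or coroswitch_m throws.
//
//	runtime.LockOSThread()
//	c := newcoro(f)  // records mp, lockedExt, lockedInt
//	coroswitch(c)    // must run on the same locked thread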

// corostart is the entry func for a new coroutine.
// It runs the coroutine user function f passed to newcoro
// and then calls coroexit to remove the extra concurrency.
func corostart() {
	gp := getg()
	c := gp.coroarg
	gp.coroarg = nil

	defer coroexit(c)
	c.f(c)
}

// coroexit is like coroswitch but closes the coro
// and exits the current goroutine.
func coroexit(c *coro) {
	gp := getg()
	gp.coroarg = c
	gp.coroexit = true
	mcall(coroswitch_m)
}

//go:linkname coroswitch

// coroswitch switches to the goroutine blocked on c
// and then blocks the current goroutine on c.
func coroswitch(c *coro) {
	gp := getg()
	gp.coroarg = c
	mcall(coroswitch_m)
}
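
// The pattern below sketches how a client can layer a pull-style
// iterator on top of coroswitch. The pull function and its details are
// hypothetical simplifications, though iter.Pull is built along these
// lines:
//
//	func pull[T any](seq func(yield func(T) bool)) (next func() (T, bool)) {
//		var (
//			v    T
//			done bool
//		)
//		c := newcoro(func(c *coro) {
//			seq(func(x T) bool {
//				v = x
//				coroswitch(c) // hand x to the consumer; block until next()
//				return true
//			})
//			done = true
//		})
//		return func() (T, bool) {
//			if done {
//				var zero T
//				return zero, false
//			}
//			coroswitch(c) // run seq until it yields a value or returns
//			if done {
//				var zero T
//				return zero, false
//			}
//			return v, true
//		}
//	}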

// coroswitch_m is the implementation of coroswitch
// that runs on the m stack.
//
// Note: Coroutine switches are expected to happen at
// an order of magnitude (or more) higher frequency
// than regular goroutine switches, so this path is heavily
// optimized to remove unnecessary work.
// The fast path here is three CAS operations: the one at the top on
// gp.atomicstatus, the one in the middle to choose the next g,
// and the one at the bottom on gnext.atomicstatus.
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m(gp *g) {
	c := gp.coroarg
	gp.coroarg = nil
	exit := gp.coroexit
	gp.coroexit = false
	mp := gp.m

	// Track and validate thread-lock interactions.
	//
	// The rules with thread-lock interactions are simple. When a coro goroutine is switched to,
	// the same thread must be used, and the locked state must match with the thread-lock state of
	// the goroutine which called newcoro. Thread-lock state consists of the thread and the number
	// of internal (cgo callback, etc.) and external (LockOSThread) thread locks.
	locked := gp.lockedm != 0
	if c.mp != nil || locked {
		if mp != c.mp || mp.lockedInt != c.lockedInt || mp.lockedExt != c.lockedExt {
			print("coro: got thread ", unsafe.Pointer(mp), ", want ", unsafe.Pointer(c.mp), "\n")
			print("coro: got lock internal ", mp.lockedInt, ", want ", c.lockedInt, "\n")
			print("coro: got lock external ", mp.lockedExt, ", want ", c.lockedExt, "\n")
			throw("coro: OS thread locking must match locking at coroutine creation")
		}
	}

	// Acquire tracer for writing for the duration of this call.
	//
	// There's a lot of state manipulation performed with shortcuts,
	// but we need to make sure the tracer can only observe the
	// start and end states to maintain a coherent model and avoid
	// emitting an event for every single transition.
	trace := traceAcquire()

	if locked {
		// Detach the goroutine from the thread; we'll attach to the goroutine we're
		// switching to before returning.
		gp.lockedm.set(nil)
	}

	if exit {
		// The M might have a non-zero OS thread lock count when we get here;
		// gdestroy will avoid destroying the M if the G isn't explicitly locked
		// to it via lockedm, which we cleared above. It's fine to gdestroy here
		// also, even when locked to the thread, because we'll be switching back
		// to another goroutine anyway, which will take back its thread-lock state
		// before returning.
		gdestroy(gp)
		gp = nil
	} else {
		// If we can CAS ourselves directly from running to waiting, do so,
		// keeping the control transfer as lightweight as possible.
		gp.waitreason = waitReasonCoroutine
		if !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) {
			// The CAS failed: use casgstatus, which will take care of
			// coordinating with the garbage collector about the state change.
			casgstatus(gp, _Grunning, _Gwaiting)
		}

		// Clear gp.m.
		setMNoWB(&gp.m, nil)
	}

	// The goroutine stored in c is the one to run next.
	// Swap it with ourselves.
	var gnext *g
	for {
		// Note: this is a racy load, but it will eventually
		// get the right value, and if it gets the wrong value,
		// the c.gp.cas will fail, so no harm done other than
		// a wasted loop iteration.
		// The cas will also sync c.gp's
		// memory enough that the next iteration of the racy load
		// should see the correct value.
		// We are avoiding the atomic load to keep this path
		// as lightweight as absolutely possible.
		// (The atomic load is free on x86 but not free elsewhere.)
		next := c.gp
		if next.ptr() == nil {
			throw("coroswitch on exited coro")
		}
		var self guintptr
		self.set(gp)
		if c.gp.cas(next, self) {
			gnext = next.ptr()
			break
		}
	}
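	// At this point c.gp names the goroutine that just parked here
	// (or nil if we are exiting, since gp was cleared above), restoring
	// the invariant that c.gp always holds the blocked side of the coro.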

	// Check if we're switching to ourselves. This case can break our
	// thread-lock invariants, and an unbuffered channel implementation of
	// coroswitch would deadlock here. It's clear that this case should
	// simply not work.
	if gnext == gp {
		throw("coroswitch of a goroutine to itself")
	}

	// Emit the trace event after getting gnext but before changing curg.
	// GoSwitch expects that the current G is running and that we haven't
	// switched yet for correct status emission.
	if trace.ok() {
		trace.GoSwitch(gnext, exit)
	}

	// Start running next, without heavy scheduling machinery.
	// Set mp.curg and gnext.m and then update scheduling state
	// directly if possible.
	setGNoWB(&mp.curg, gnext)
	setMNoWB(&gnext.m, mp)

	// Synchronize with any outstanding goroutine profile. We're about to start
	// executing, and an invariant of the profiler is that we tryRecordGoroutineProfile
	// whenever a goroutine is about to start running.
	//
	// N.B. We must do this before transitioning to _Grunning but after installing gnext
	// in curg, so that we have a valid curg for allocation (tryRecordGoroutineProfile
	// may allocate).
	if goroutineProfile.active {
		tryRecordGoroutineProfile(gnext, nil, osyield)
	}

	if !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) {
		// The CAS failed: use casgstatus, which will take care of
		// coordinating with the garbage collector about the state change.
		casgstatus(gnext, _Gwaiting, _Grunnable)
		casgstatus(gnext, _Grunnable, _Grunning)
	}

	// Donate locked state.
	if locked {
		mp.lockedg.set(gnext)
		gnext.lockedm.set(mp)
	}

	// Release the trace locker. We've completed all the necessary transitions.
	if trace.ok() {
		traceRelease(trace)
	}

	// Switch to gnext. Does not return.
	gogo(&gnext.sched)
}