// Source file src/runtime/coro.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import "unsafe"

// A coro represents extra concurrency without extra parallelism,
// as would be needed for a coroutine implementation.
// The coro does not represent a specific coroutine, only the ability
// to do coroutine-style control transfers.
// It can be thought of as like a special channel that always has
// a goroutine blocked on it. If another goroutine calls coroswitch(c),
// the caller becomes the goroutine blocked in c, and the goroutine
// formerly blocked in c starts running.
// These switches continue until a call to coroexit(c),
// which ends the use of the coro by releasing the blocked
// goroutine in c and exiting the current goroutine.
//
// Coros are heap allocated and garbage collected, so that user code
// can hold a pointer to a coro without causing potential dangling
// pointer errors.
type coro struct {
	gp guintptr    // the goroutine currently blocked in this coro; swapped by coroswitch_m
	f  func(*coro) // user function run by the coro's goroutine (see corostart)

	// State for validating thread-lock interactions.
	mp        *m     // thread that called newcoro, recorded only if it held thread locks then
	lockedExt uint32 // mp's external LockOSThread counter at coro creation time.
	lockedInt uint32 // mp's internal lockOSThread counter at coro creation time.
}

//go:linkname newcoro

// newcoro creates a new coro containing a
// goroutine blocked waiting to run f
// and returns that coro.
func newcoro(f func(*coro)) *coro {
	c := new(coro)
	c.f = f
	pc := getcallerpc()
	gp := getg()
	systemstack(func() {
		mp := gp.m
		// Extract corostart's funcval so newproc1 can use it as the new
		// goroutine's entry point.
		start := corostart
		startfv := *(**funcval)(unsafe.Pointer(&start))
		gp = newproc1(startfv, gp, pc, true, waitReasonCoroutine)

		// Scribble down locked thread state if needed and/or donate
		// thread-lock state to the new goroutine.
		if mp.lockedExt+mp.lockedInt != 0 {
			c.mp = mp
			c.lockedExt = mp.lockedExt
			c.lockedInt = mp.lockedInt
		}
	})
	// Note: gp now refers to the freshly created (blocked) goroutine,
	// not the caller's goroutine.
	gp.coroarg = c
	c.gp.set(gp)
	return c
}

// corostart is the entry func for a new coroutine.
// It runs the coroutine user function f passed to corostart
// and then calls coroexit to remove the extra concurrency.
func corostart() {
	gp := getg()
	c := gp.coroarg
	gp.coroarg = nil

	// Deferred so the coro is closed and the peer goroutine released
	// even if f panics.
	defer coroexit(c)
	c.f(c)
}

// coroexit is like coroswitch but closes the coro
// and exits the current goroutine.
func coroexit(c *coro) {
	gp := getg()
	gp.coroarg = c
	gp.coroexit = true
	// Does not return: coroswitch_m destroys this goroutine and
	// gogo's directly to the goroutine blocked in c.
	mcall(coroswitch_m)
}

//go:linkname coroswitch

// coroswitch switches to the goroutine blocked on c
// and then blocks the current goroutine on c.
func coroswitch(c *coro) {
	gp := getg()
	gp.coroarg = c
	mcall(coroswitch_m)
}

// coroswitch_m is the implementation of coroswitch
// that runs on the m stack.
//
// Note: Coroutine switches are expected to happen at
// an order of magnitude (or more) higher frequency
// than regular goroutine switches, so this path is heavily
// optimized to remove unnecessary work.
// The fast path here is three CAS: the one at the top on gp.atomicstatus,
// the one in the middle to choose the next g,
// and the one at the bottom on gnext.atomicstatus.
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m(gp *g) {
	// Consume the per-goroutine arguments stashed by coroswitch/coroexit
	// before any state changes, so they can't leak to a later switch.
	c := gp.coroarg
	gp.coroarg = nil
	exit := gp.coroexit
	gp.coroexit = false
	mp := gp.m

	// Track and validate thread-lock interactions.
	//
	// The rules with thread-lock interactions are simple. When a coro goroutine is switched to,
	// the same thread must be used, and the locked state must match with the thread-lock state of
	// the goroutine which called newcoro. Thread-lock state consists of the thread and the number
	// of internal (cgo callback, etc.) and external (LockOSThread) thread locks.
	locked := gp.lockedm != 0
	if c.mp != nil || locked {
		if mp != c.mp || mp.lockedInt != c.lockedInt || mp.lockedExt != c.lockedExt {
			print("coro: got thread ", unsafe.Pointer(mp), ", want ", unsafe.Pointer(c.mp), "\n")
			print("coro: got lock internal ", mp.lockedInt, ", want ", c.lockedInt, "\n")
			print("coro: got lock external ", mp.lockedExt, ", want ", c.lockedExt, "\n")
			throw("coro: OS thread locking must match locking at coroutine creation")
		}
	}

	// Acquire tracer for writing for the duration of this call.
	//
	// There's a lot of state manipulation performed with shortcuts
	// but we need to make sure the tracer can only observe the
	// start and end states to maintain a coherent model and avoid
	// emitting an event for every single transition.
	trace := traceAcquire()

	if locked {
		// Detach the goroutine from the thread; we'll attach to the goroutine we're
		// switching to before returning.
		gp.lockedm.set(nil)
	}

	if exit {
		// The M might have a non-zero OS thread lock count when we get here, gdestroy
		// will avoid destroying the M if the G isn't explicitly locked to it via lockedm,
		// which we cleared above. It's fine to gdestroy here also, even when locked to
		// the thread, because we'll be switching back to another goroutine anyway, which
		// will take back its thread-lock state before returning.
		gdestroy(gp)
		gp = nil
	} else {
		// If we can CAS ourselves directly from running to waiting, so do,
		// keeping the control transfer as lightweight as possible.
		gp.waitreason = waitReasonCoroutine
		if !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) {
			// The CAS failed: use casgstatus, which will take care of
			// coordinating with the garbage collector about the state change.
			casgstatus(gp, _Grunning, _Gwaiting)
		}

		// Clear gp.m.
		setMNoWB(&gp.m, nil)
	}

	// The goroutine stored in c is the one to run next.
	// Swap it with ourselves.
	var gnext *g
	for {
		// Note: this is a racy load, but it will eventually
		// get the right value, and if it gets the wrong value,
		// the c.gp.cas will fail, so no harm done other than
		// a wasted loop iteration.
		// The cas will also sync c.gp's
		// memory enough that the next iteration of the racy load
		// should see the correct value.
		// We are avoiding the atomic load to keep this path
		// as lightweight as absolutely possible.
		// (The atomic load is free on x86 but not free elsewhere.)
		next := c.gp
		if next.ptr() == nil {
			throw("coroswitch on exited coro")
		}
		var self guintptr
		self.set(gp)
		if c.gp.cas(next, self) {
			gnext = next.ptr()
			break
		}
	}

	// Check if we're switching to ourselves. This case is able to break our
	// thread-lock invariants and an unbuffered channel implementation of
	// coroswitch would deadlock. It's clear that this case should just not
	// work.
	if gnext == gp {
		throw("coroswitch of a goroutine to itself")
	}

	// Emit the trace event after getting gnext but before changing curg.
	// GoSwitch expects that the current G is running and that we haven't
	// switched yet for correct status emission.
	if trace.ok() {
		trace.GoSwitch(gnext, exit)
	}

	// Start running next, without heavy scheduling machinery.
	// Set mp.curg and gnext.m and then update scheduling state
	// directly if possible.
	setGNoWB(&mp.curg, gnext)
	setMNoWB(&gnext.m, mp)

	// Synchronize with any outstanding goroutine profile. We're about to start
	// executing, and an invariant of the profiler is that we tryRecordGoroutineProfile
	// whenever a goroutine is about to start running.
	//
	// N.B. We must do this before transitioning to _Grunning but after installing gnext
	// in curg, so that we have a valid curg for allocation (tryRecordGoroutineProfile
	// may allocate).
	if goroutineProfile.active {
		tryRecordGoroutineProfile(gnext, nil, osyield)
	}

	if !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) {
		// The CAS failed: use casgstatus, which will take care of
		// coordinating with the garbage collector about the state change.
		// Two hops because casgstatus has no direct waiting->running transition.
		casgstatus(gnext, _Gwaiting, _Grunnable)
		casgstatus(gnext, _Grunnable, _Grunning)
	}

	// Donate locked state.
	if locked {
		mp.lockedg.set(gnext)
		gnext.lockedm.set(mp)
	}

	// Release the trace locker. We've completed all the necessary transitions.
	if trace.ok() {
		traceRelease(trace)
	}

	// Switch to gnext. Does not return.
	gogo(&gnext.sched)
}