Source file src/runtime/coro.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import (
	"internal/runtime/sys"
	"unsafe"
)

// A coro represents extra concurrency without extra parallelism,
// as would be needed for a coroutine implementation.
// The coro does not represent a specific coroutine, only the ability
// to do coroutine-style control transfers.
// It can be thought of as like a special channel that always has
// a goroutine blocked on it. If another goroutine calls coroswitch(c),
// the caller becomes the goroutine blocked in c, and the goroutine
// formerly blocked in c starts running.
// These switches continue until a call to coroexit(c),
// which ends the use of the coro by releasing the blocked
// goroutine in c and exiting the current goroutine.
//
// Coros are heap allocated and garbage collected, so that user code
// can hold a pointer to a coro without causing potential dangling
// pointer errors.
type coro struct {
	gp guintptr    // the goroutine currently blocked in (i.e. waiting on) this coro
	f  func(*coro) // user function the coro's goroutine runs (see corostart)

	// State for validating thread-lock interactions.
	mp        *m
	lockedExt uint32 // mp's external LockOSThread counter at coro creation time.
	lockedInt uint32 // mp's internal lockOSThread counter at coro creation time.
}

//go:linkname newcoro

// newcoro creates a new coro containing a
// goroutine blocked waiting to run f
// and returns that coro.
func newcoro(f func(*coro)) *coro {
	c := new(coro)
	c.f = f
	pc := sys.GetCallerPC()
	gp := getg()
	systemstack(func() {
		mp := gp.m
		start := corostart
		// Reinterpret the func value as a *funcval so it can be handed
		// to newproc1, which takes the raw representation.
		startfv := *(**funcval)(unsafe.Pointer(&start))
		gp = newproc1(startfv, gp, pc, true, waitReasonCoroutine)

		// Scribble down locked thread state if needed and/or donate
		// thread-lock state to the new goroutine.
		if mp.lockedExt+mp.lockedInt != 0 {
			c.mp = mp
			c.lockedExt = mp.lockedExt
			c.lockedInt = mp.lockedInt
		}
	})
	// The new goroutine finds its coro through gp.coroarg (read by corostart),
	// and the coro records the new goroutine as the one blocked in it.
	gp.coroarg = c
	c.gp.set(gp)
	return c
}

// corostart is the entry func for a new coroutine.
// It runs the coroutine user function f passed to corostart
// and then calls coroexit to remove the extra concurrency.
func corostart() {
	gp := getg()
	c := gp.coroarg
	gp.coroarg = nil

	// Deferred so that coroexit runs even if c.f panics.
	defer coroexit(c)
	c.f(c)
}

// coroexit is like coroswitch but closes the coro
// and exits the current goroutine
func coroexit(c *coro) {
	gp := getg()
	gp.coroarg = c
	gp.coroexit = true // tells coroswitch_m to destroy this goroutine rather than park it
	mcall(coroswitch_m)
}

//go:linkname coroswitch

// coroswitch switches to the goroutine blocked on c
// and then blocks the current goroutine on c.
func coroswitch(c *coro) {
	gp := getg()
	gp.coroarg = c
	mcall(coroswitch_m)
}

// coroswitch_m is the implementation of coroswitch
// that runs on the m stack.
//
// Note: Coroutine switches are expected to happen at
// an order of magnitude (or more) higher frequency
// than regular goroutine switches, so this path is heavily
// optimized to remove unnecessary work.
// The fast path here is three CAS: the one at the top on gp.atomicstatus,
// the one in the middle to choose the next g,
// and the one at the bottom on gnext.atomicstatus.
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m(gp *g) {
	// Pull the arguments stashed on the g by coroswitch/coroexit and
	// clear them immediately so stale values can't leak into later calls.
	c := gp.coroarg
	gp.coroarg = nil
	exit := gp.coroexit
	gp.coroexit = false
	mp := gp.m

	// Track and validate thread-lock interactions.
	//
	// The rules with thread-lock interactions are simple. When a coro goroutine is switched to,
	// the same thread must be used, and the locked state must match with the thread-lock state of
	// the goroutine which called newcoro. Thread-lock state consists of the thread and the number
	// of internal (cgo callback, etc.) and external (LockOSThread) thread locks.
	locked := gp.lockedm != 0
	if c.mp != nil || locked {
		if mp != c.mp || mp.lockedInt != c.lockedInt || mp.lockedExt != c.lockedExt {
			print("coro: got thread ", unsafe.Pointer(mp), ", want ", unsafe.Pointer(c.mp), "\n")
			print("coro: got lock internal ", mp.lockedInt, ", want ", c.lockedInt, "\n")
			print("coro: got lock external ", mp.lockedExt, ", want ", c.lockedExt, "\n")
			throw("coro: OS thread locking must match locking at coroutine creation")
		}
	}

	// Acquire tracer for writing for the duration of this call.
	//
	// There's a lot of state manipulation performed with shortcuts
	// but we need to make sure the tracer can only observe the
	// start and end states to maintain a coherent model and avoid
	// emitting an event for every single transition.
	trace := traceAcquire()

	if locked {
		// Detach the goroutine from the thread; we'll attach to the goroutine we're
		// switching to before returning.
		gp.lockedm.set(nil)
	}

	if exit {
		// The M might have a non-zero OS thread lock count when we get here, gdestroy
		// will avoid destroying the M if the G isn't explicitly locked to it via lockedm,
		// which we cleared above. It's fine to gdestroy here also, even when locked to
		// the thread, because we'll be switching back to another goroutine anyway, which
		// will take back its thread-lock state before returning.
		gdestroy(gp)
		gp = nil
	} else {
		// If we can CAS ourselves directly from running to waiting, so do,
		// keeping the control transfer as lightweight as possible.
		gp.waitreason = waitReasonCoroutine
		if !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) {
			// The CAS failed: use casgstatus, which will take care of
			// coordinating with the garbage collector about the state change.
			casgstatus(gp, _Grunning, _Gwaiting)
		}

		// Clear gp.m. (No write barrier: this runs without a P-safe context.)
		setMNoWB(&gp.m, nil)
	}

	// The goroutine stored in c is the one to run next.
	// Swap it with ourselves.
	var gnext *g
	for {
		// Note: this is a racy load, but it will eventually
		// get the right value, and if it gets the wrong value,
		// the c.gp.cas will fail, so no harm done other than
		// a wasted loop iteration.
		// The cas will also sync c.gp's
		// memory enough that the next iteration of the racy load
		// should see the correct value.
		// We are avoiding the atomic load to keep this path
		// as lightweight as absolutely possible.
		// (The atomic load is free on x86 but not free elsewhere.)
		next := c.gp
		if next.ptr() == nil {
			throw("coroswitch on exited coro")
		}
		var self guintptr
		self.set(gp)
		if c.gp.cas(next, self) {
			gnext = next.ptr()
			break
		}
	}

	// Check if we're switching to ourselves. This case is able to break our
	// thread-lock invariants and an unbuffered channel implementation of
	// coroswitch would deadlock. It's clear that this case should just not
	// work.
	if gnext == gp {
		throw("coroswitch of a goroutine to itself")
	}

	// Emit the trace event after getting gnext but before changing curg.
	// GoSwitch expects that the current G is running and that we haven't
	// switched yet for correct status emission.
	if trace.ok() {
		trace.GoSwitch(gnext, exit)
	}

	// Start running next, without heavy scheduling machinery.
	// Set mp.curg and gnext.m and then update scheduling state
	// directly if possible.
	setGNoWB(&mp.curg, gnext)
	setMNoWB(&gnext.m, mp)

	// Synchronize with any out-standing goroutine profile. We're about to start
	// executing, and an invariant of the profiler is that we tryRecordGoroutineProfile
	// whenever a goroutine is about to start running.
	//
	// N.B. We must do this before transitioning to _Grunning but after installing gnext
	// in curg, so that we have a valid curg for allocation (tryRecordGoroutineProfile
	// may allocate).
	if goroutineProfile.active {
		tryRecordGoroutineProfile(gnext, nil, osyield)
	}

	if !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) {
		// The CAS failed: use casgstatus, which will take care of
		// coordinating with the garbage collector about the state change.
		// Go through _Grunnable because casgstatus does not allow a direct
		// _Gwaiting -> _Grunning transition on the slow path.
		casgstatus(gnext, _Gwaiting, _Grunnable)
		casgstatus(gnext, _Grunnable, _Grunning)
	}

	// Donate locked state.
	if locked {
		mp.lockedg.set(gnext)
		gnext.lockedm.set(mp)
	}

	// Release the trace locker. We've completed all the necessary transitions..
	if trace.ok() {
		traceRelease(trace)
	}

	// Switch to gnext. Does not return.
	gogo(&gnext.sched)
}