Source file src/runtime/time.go
1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 // Time-related runtime and pieces of package time. 6 7 package runtime 8 9 import ( 10 "internal/abi" 11 "internal/runtime/atomic" 12 "internal/runtime/sys" 13 "unsafe" 14 ) 15 16 // A timer is a potentially repeating trigger for calling t.f(t.arg, t.seq). 17 // Timers are allocated by client code, often as part of other data structures. 18 // Each P has a heap of pointers to timers that it manages. 19 // 20 // A timer is expected to be used by only one client goroutine at a time, 21 // but there will be concurrent access by the P managing that timer. 22 // Timer accesses are protected by the lock t.mu, with a snapshot of 23 // t's state bits published in t.astate to enable certain fast paths to make 24 // decisions about a timer without acquiring the lock. 25 type timer struct { 26 // mu protects reads and writes to all fields, with exceptions noted below. 27 mu mutex 28 29 astate atomic.Uint8 // atomic copy of state bits at last unlock 30 state uint8 // state bits 31 isChan bool // timer has a channel; immutable; can be read without lock 32 33 blocked uint32 // number of goroutines blocked on timer's channel 34 35 // Timer wakes up at when, and then at when+period, ... (period > 0 only) 36 // each time calling f(arg, seq, delay) in the timer goroutine, so f must be 37 // a well-behaved function and not block. 38 // 39 // The arg and seq are client-specified opaque arguments passed back to f. 40 // When used from netpoll, arg and seq have meanings defined by netpoll 41 // and are completely opaque to this code; in that context, seq is a sequence 42 // number to recognize and squelch stale function invocations. 43 // When used from package time, arg is a channel (for After, NewTicker) 44 // or the function to call (for AfterFunc) and seq is unused (0). 45 // 46 // Package time does not know about seq, but if this is a channel timer (t.isChan == true), 47 // this file uses t.seq as a sequence number to recognize and squelch 48 // sends that correspond to an earlier (stale) timer configuration, 49 // similar to its use in netpoll. In this usage (that is, when t.isChan == true), 50 // writes to seq are protected by both t.mu and t.sendLock, 51 // so reads are allowed when holding either of the two mutexes. 52 // 53 // The delay argument is nanotime() - t.when, meaning the delay in ns between 54 // when the timer should have gone off and now. Normally that amount is 55 // small enough not to matter, but for channel timers that are fed lazily, 56 // the delay can be arbitrarily long; package time subtracts it out to make 57 // it look like the send happened earlier than it actually did. 58 // (No one looked at the channel since then, or the send would have 59 // not happened so late, so no one can tell the difference.) 60 when int64 61 period int64 62 f func(arg any, seq uintptr, delay int64) 63 arg any 64 seq uintptr 65 66 // If non-nil, the timers containing t. 67 ts *timers 68 69 // sendLock protects sends on the timer's channel. 70 // Not used for async (pre-Go 1.23) behavior when debug.asynctimerchan.Load() != 0. 71 sendLock mutex 72 73 // isSending is used to handle races between running a 74 // channel timer and stopping or resetting the timer. 75 // It is used only for channel timers (t.isChan == true). 76 // It is not used for tickers. 77 // The value is incremented when about to send a value on the channel, 78 // and decremented after sending the value. 79 // The stop/reset code uses this to detect whether it 80 // stopped the channel send. 81 // 82 // isSending is incremented only when t.mu is held. 83 // isSending is decremented only when t.sendLock is held. 84 // isSending is read only when both t.mu and t.sendLock are held. 85 isSending atomic.Int32 86 } 87 88 // init initializes a newly allocated timer t. 89 // Any code that allocates a timer must call t.init before using it. 90 // The arg and f can be set during init, or they can be nil in init 91 // and set by a future call to t.modify. 92 func (t *timer) init(f func(arg any, seq uintptr, delay int64), arg any) { 93 lockInit(&t.mu, lockRankTimer) 94 t.f = f 95 t.arg = arg 96 } 97 98 // A timers is a per-P set of timers. 99 type timers struct { 100 // mu protects timers; timers are per-P, but the scheduler can 101 // access the timers of another P, so we have to lock. 102 mu mutex 103 104 // heap is the set of timers, ordered by heap[i].when. 105 // Must hold lock to access. 106 heap []timerWhen 107 108 // len is an atomic copy of len(heap). 109 len atomic.Uint32 110 111 // zombies is the number of timers in the heap 112 // that are marked for removal. 113 zombies atomic.Int32 114 115 // raceCtx is the race context used while executing timer functions. 116 raceCtx uintptr 117 118 // minWhenHeap is the minimum heap[i].when value (= heap[0].when). 119 // The wakeTime method uses minWhenHeap and minWhenModified 120 // to determine the next wake time. 121 // If minWhenHeap = 0, it means there are no timers in the heap. 122 minWhenHeap atomic.Int64 123 124 // minWhenModified is a lower bound on the minimum 125 // heap[i].when over timers with the timerModified bit set. 126 // If minWhenModified = 0, it means there are no timerModified timers in the heap. 127 minWhenModified atomic.Int64 128 } 129 130 type timerWhen struct { 131 timer *timer 132 when int64 133 } 134 135 func (ts *timers) lock() { 136 lock(&ts.mu) 137 } 138 139 func (ts *timers) unlock() { 140 // Update atomic copy of len(ts.heap). 141 // We only update at unlock so that the len is always 142 // the most recent unlocked length, not an ephemeral length. 143 // This matters if we lock ts, delete the only timer from the heap, 144 // add it back, and unlock. We want ts.len.Load to return 1 the 145 // entire time, never 0. This is important for pidleput deciding 146 // whether ts is empty. 147 ts.len.Store(uint32(len(ts.heap))) 148 149 unlock(&ts.mu) 150 } 151 152 // Timer state field. 153 const ( 154 // timerHeaped is set when the timer is stored in some P's heap. 155 timerHeaped uint8 = 1 << iota 156 157 // timerModified is set when t.when has been modified 158 // but the heap's heap[i].when entry still needs to be updated. 159 // That change waits until the heap in which 160 // the timer appears can be locked and rearranged. 161 // timerModified is only set when timerHeaped is also set. 162 timerModified 163 164 // timerZombie is set when the timer has been stopped 165 // but is still present in some P's heap. 166 // Only set when timerHeaped is also set. 167 // It is possible for timerModified and timerZombie to both 168 // be set, meaning that the timer was modified and then stopped. 169 // A timer sending to a channel may be placed in timerZombie 170 // to take it out of the heap even though the timer is not stopped, 171 // as long as nothing is reading from the channel. 172 timerZombie 173 ) 174 175 // timerDebug enables printing a textual debug trace of all timer operations to stderr. 176 const timerDebug = false 177 178 func (t *timer) trace(op string) { 179 if timerDebug { 180 t.trace1(op) 181 } 182 } 183 184 func (t *timer) trace1(op string) { 185 if !timerDebug { 186 return 187 } 188 bits := [4]string{"h", "m", "z", "c"} 189 for i := range 3 { 190 if t.state&(1<<i) == 0 { 191 bits[i] = "-" 192 } 193 } 194 if !t.isChan { 195 bits[3] = "-" 196 } 197 print("T ", t, " ", bits[0], bits[1], bits[2], bits[3], " b=", t.blocked, " ", op, "\n") 198 } 199 200 func (ts *timers) trace(op string) { 201 if timerDebug { 202 println("TS", ts, op) 203 } 204 } 205 206 // lock locks the timer, allowing reading or writing any of the timer fields. 207 func (t *timer) lock() { 208 lock(&t.mu) 209 t.trace("lock") 210 } 211 212 // unlock updates t.astate and unlocks the timer. 213 func (t *timer) unlock() { 214 t.trace("unlock") 215 // Let heap fast paths know whether heap[i].when is accurate. 216 // Also let maybeRunChan know whether channel is in heap. 217 t.astate.Store(t.state) 218 unlock(&t.mu) 219 } 220 221 // hchan returns the channel in t.arg. 222 // t must be a timer with a channel. 223 func (t *timer) hchan() *hchan { 224 if !t.isChan { 225 badTimer() 226 } 227 // Note: t.arg is a chan time.Time, 228 // and runtime cannot refer to that type, 229 // so we cannot use a type assertion. 230 return (*hchan)(efaceOf(&t.arg).data) 231 } 232 233 // updateHeap updates t as directed by t.state, updating t.state 234 // and returning a bool indicating whether the state (and ts.heap[0].when) changed. 235 // The caller must hold t's lock, or the world can be stopped instead. 236 // The timer set t.ts must be non-nil and locked, t must be t.ts.heap[0], and updateHeap 237 // takes care of moving t within the timers heap to preserve the heap invariants. 238 // If ts == nil, then t must not be in a heap (or is in a heap that is 239 // temporarily not maintaining its invariant, such as during timers.adjust). 240 func (t *timer) updateHeap() (updated bool) { 241 assertWorldStoppedOrLockHeld(&t.mu) 242 t.trace("updateHeap") 243 ts := t.ts 244 if ts == nil || t != ts.heap[0].timer { 245 badTimer() 246 } 247 assertLockHeld(&ts.mu) 248 if t.state&timerZombie != 0 { 249 // Take timer out of heap. 250 t.state &^= timerHeaped | timerZombie | timerModified 251 ts.zombies.Add(-1) 252 ts.deleteMin() 253 return true 254 } 255 256 if t.state&timerModified != 0 { 257 // Update ts.heap[0].when and move within heap. 258 t.state &^= timerModified 259 ts.heap[0].when = t.when 260 ts.siftDown(0) 261 ts.updateMinWhenHeap() 262 return true 263 } 264 265 return false 266 } 267 268 // maxWhen is the maximum value for timer's when field. 269 const maxWhen = 1<<63 - 1 270 271 // verifyTimers can be set to true to add debugging checks that the 272 // timer heaps are valid. 273 const verifyTimers = false 274 275 // Package time APIs. 276 // Godoc uses the comments in package time, not these. 277 278 // time.now is implemented in assembly. 279 280 // timeSleep puts the current goroutine to sleep for at least ns nanoseconds. 281 // 282 //go:linkname timeSleep time.Sleep 283 func timeSleep(ns int64) { 284 if ns <= 0 { 285 return 286 } 287 288 gp := getg() 289 t := gp.timer 290 if t == nil { 291 t = new(timer) 292 t.init(goroutineReady, gp) 293 gp.timer = t 294 } 295 when := nanotime() + ns 296 if when < 0 { // check for overflow. 297 when = maxWhen 298 } 299 gp.sleepWhen = when 300 gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1) 301 } 302 303 // resetForSleep is called after the goroutine is parked for timeSleep. 304 // We can't call timer.reset in timeSleep itself because if this is a short 305 // sleep and there are many goroutines then the P can wind up running the 306 // timer function, goroutineReady, before the goroutine has been parked. 307 func resetForSleep(gp *g, _ unsafe.Pointer) bool { 308 gp.timer.reset(gp.sleepWhen, 0) 309 return true 310 } 311 312 // A timeTimer is a runtime-allocated time.Timer or time.Ticker 313 // with the additional runtime state following it. 314 // The runtime state is inaccessible to package time. 315 type timeTimer struct { 316 c unsafe.Pointer // <-chan time.Time 317 init bool 318 timer 319 } 320 321 // newTimer allocates and returns a new time.Timer or time.Ticker (same layout) 322 // with the given parameters. 323 // 324 //go:linkname newTimer time.newTimer 325 func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, c *hchan) *timeTimer { 326 t := new(timeTimer) 327 t.timer.init(nil, nil) 328 t.trace("new") 329 if raceenabled { 330 racerelease(unsafe.Pointer(&t.timer)) 331 } 332 if c != nil { 333 lockInit(&t.sendLock, lockRankTimerSend) 334 t.isChan = true 335 c.timer = &t.timer 336 if c.dataqsiz == 0 { 337 throw("invalid timer channel: no capacity") 338 } 339 } 340 t.modify(when, period, f, arg, 0) 341 t.init = true 342 return t 343 } 344 345 // stopTimer stops a timer. 346 // It reports whether t was stopped before being run. 347 // 348 //go:linkname stopTimer time.stopTimer 349 func stopTimer(t *timeTimer) bool { 350 return t.stop() 351 } 352 353 // resetTimer resets an inactive timer, adding it to the timer heap. 354 // 355 // Reports whether the timer was modified before it was run. 356 // 357 //go:linkname resetTimer time.resetTimer 358 func resetTimer(t *timeTimer, when, period int64) bool { 359 if raceenabled { 360 racerelease(unsafe.Pointer(&t.timer)) 361 } 362 return t.reset(when, period) 363 } 364 365 // Go runtime. 366 367 // Ready the goroutine arg. 368 func goroutineReady(arg any, _ uintptr, _ int64) { 369 goready(arg.(*g), 0) 370 } 371 372 // addHeap adds t to the timers heap. 373 // The caller must hold ts.lock or the world must be stopped. 374 // The caller must also have checked that t belongs in the heap. 375 // Callers that are not sure can call t.maybeAdd instead, 376 // but note that maybeAdd has different locking requirements. 377 func (ts *timers) addHeap(t *timer) { 378 assertWorldStoppedOrLockHeld(&ts.mu) 379 // Timers rely on the network poller, so make sure the poller 380 // has started. 381 if netpollInited.Load() == 0 { 382 netpollGenericInit() 383 } 384 385 if t.ts != nil { 386 throw("ts set in timer") 387 } 388 t.ts = ts 389 ts.heap = append(ts.heap, timerWhen{t, t.when}) 390 ts.siftUp(len(ts.heap) - 1) 391 if t == ts.heap[0].timer { 392 ts.updateMinWhenHeap() 393 } 394 } 395 396 // maybeRunAsync checks whether t needs to be triggered and runs it if so. 397 // The caller is responsible for locking the timer and for checking that we 398 // are running timers in async mode. If the timer needs to be run, 399 // maybeRunAsync will unlock and re-lock it. 400 // The timer is always locked on return. 401 func (t *timer) maybeRunAsync() { 402 assertLockHeld(&t.mu) 403 if t.state&timerHeaped == 0 && t.isChan && t.when > 0 { 404 // If timer should have triggered already (but nothing looked at it yet), 405 // trigger now, so that a receive after the stop sees the "old" value 406 // that should be there. 407 // (It is possible to have t.blocked > 0 if there is a racing receive 408 // in blockTimerChan, but timerHeaped not being set means 409 // it hasn't run t.maybeAdd yet; in that case, running the 410 // timer ourselves now is fine.) 411 if now := nanotime(); t.when <= now { 412 systemstack(func() { 413 t.unlockAndRun(now) // resets t.when 414 }) 415 t.lock() 416 } 417 } 418 } 419 420 // stop stops the timer t. It may be on some other P, so we can't 421 // actually remove it from the timers heap. We can only mark it as stopped. 422 // It will be removed in due course by the P whose heap it is on. 423 // Reports whether the timer was stopped before it was run. 424 func (t *timer) stop() bool { 425 async := debug.asynctimerchan.Load() != 0 426 if !async && t.isChan { 427 lock(&t.sendLock) 428 } 429 430 t.lock() 431 t.trace("stop") 432 if async { 433 t.maybeRunAsync() 434 } 435 if t.state&timerHeaped != 0 { 436 t.state |= timerModified 437 if t.state&timerZombie == 0 { 438 t.state |= timerZombie 439 t.ts.zombies.Add(1) 440 } 441 } 442 pending := t.when > 0 443 t.when = 0 444 445 if !async && t.isChan { 446 // Stop any future sends with stale values. 447 // See timer.unlockAndRun. 448 t.seq++ 449 450 // If there is currently a send in progress, 451 // incrementing seq is going to prevent that 452 // send from actually happening. That means 453 // that we should return true: the timer was 454 // stopped, even though t.when may be zero. 455 if t.period == 0 && t.isSending.Load() > 0 { 456 pending = true 457 } 458 } 459 t.unlock() 460 if !async && t.isChan { 461 unlock(&t.sendLock) 462 if timerchandrain(t.hchan()) { 463 pending = true 464 } 465 } 466 467 return pending 468 } 469 470 // deleteMin removes timer 0 from ts. 471 // ts must be locked. 472 func (ts *timers) deleteMin() { 473 assertLockHeld(&ts.mu) 474 t := ts.heap[0].timer 475 if t.ts != ts { 476 throw("wrong timers") 477 } 478 t.ts = nil 479 last := len(ts.heap) - 1 480 if last > 0 { 481 ts.heap[0] = ts.heap[last] 482 } 483 ts.heap[last] = timerWhen{} 484 ts.heap = ts.heap[:last] 485 if last > 0 { 486 ts.siftDown(0) 487 } 488 ts.updateMinWhenHeap() 489 if last == 0 { 490 // If there are no timers, then clearly there are no timerModified timers. 491 ts.minWhenModified.Store(0) 492 } 493 } 494 495 // modify modifies an existing timer. 496 // This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset. 497 // Reports whether the timer was modified before it was run. 498 // If f == nil, then t.f, t.arg, and t.seq are not modified. 499 func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, seq uintptr) bool { 500 if when <= 0 { 501 throw("timer when must be positive") 502 } 503 if period < 0 { 504 throw("timer period must be non-negative") 505 } 506 async := debug.asynctimerchan.Load() != 0 507 508 if !async && t.isChan { 509 lock(&t.sendLock) 510 } 511 512 t.lock() 513 if async { 514 t.maybeRunAsync() 515 } 516 t.trace("modify") 517 oldPeriod := t.period 518 t.period = period 519 if f != nil { 520 t.f = f 521 t.arg = arg 522 t.seq = seq 523 } 524 525 wake := false 526 pending := t.when > 0 527 t.when = when 528 if t.state&timerHeaped != 0 { 529 t.state |= timerModified 530 if t.state&timerZombie != 0 { 531 // In the heap but marked for removal (by a Stop). 532 // Unmark it, since it has been Reset and will be running again. 533 t.ts.zombies.Add(-1) 534 t.state &^= timerZombie 535 } 536 // The corresponding heap[i].when is updated later. 537 // See comment in type timer above and in timers.adjust below. 538 if min := t.ts.minWhenModified.Load(); min == 0 || when < min { 539 wake = true 540 // Force timerModified bit out to t.astate before updating t.minWhenModified, 541 // to synchronize with t.ts.adjust. See comment in adjust. 542 t.astate.Store(t.state) 543 t.ts.updateMinWhenModified(when) 544 } 545 } 546 547 add := t.needsAdd() 548 549 if !async && t.isChan { 550 // Stop any future sends with stale values. 551 // See timer.unlockAndRun. 552 t.seq++ 553 554 // If there is currently a send in progress, 555 // incrementing seq is going to prevent that 556 // send from actually happening. That means 557 // that we should return true: the timer was 558 // stopped, even though t.when may be zero. 559 if oldPeriod == 0 && t.isSending.Load() > 0 { 560 pending = true 561 } 562 } 563 t.unlock() 564 if !async && t.isChan { 565 if timerchandrain(t.hchan()) { 566 pending = true 567 } 568 unlock(&t.sendLock) 569 } 570 571 if add { 572 t.maybeAdd() 573 } 574 if wake { 575 wakeNetPoller(when) 576 } 577 578 return pending 579 } 580 581 // needsAdd reports whether t needs to be added to a timers heap. 582 // t must be locked. 583 func (t *timer) needsAdd() bool { 584 assertLockHeld(&t.mu) 585 need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) 586 if need { 587 t.trace("needsAdd+") 588 } else { 589 t.trace("needsAdd-") 590 } 591 return need 592 } 593 594 // maybeAdd adds t to the local timers heap if it needs to be in a heap. 595 // The caller must not hold t's lock nor any timers heap lock. 596 // The caller probably just unlocked t, but that lock must be dropped 597 // in order to acquire a ts.lock, to avoid lock inversions. 598 // (timers.adjust holds ts.lock while acquiring each t's lock, 599 // so we cannot hold any t's lock while acquiring ts.lock). 600 // 601 // Strictly speaking it *might* be okay to hold t.lock and 602 // acquire ts.lock at the same time, because we know that 603 // t is not in any ts.heap, so nothing holding a ts.lock would 604 // be acquiring the t.lock at the same time, meaning there 605 // isn't a possible deadlock. But it is easier and safer not to be 606 // too clever and respect the static ordering. 607 // (If we don't, we have to change the static lock checking of t and ts.) 608 // 609 // Concurrent calls to time.Timer.Reset or blockTimerChan 610 // may result in concurrent calls to t.maybeAdd, 611 // so we cannot assume that t is not in a heap on entry to t.maybeAdd. 612 func (t *timer) maybeAdd() { 613 // Note: Not holding any locks on entry to t.maybeAdd, 614 // so the current g can be rescheduled to a different M and P 615 // at any time, including between the ts := assignment and the 616 // call to ts.lock. If a reschedule happened then, we would be 617 // adding t to some other P's timers, perhaps even a P that the scheduler 618 // has marked as idle with no timers, in which case the timer could 619 // go unnoticed until long after t.when. 620 // Calling acquirem instead of using getg().m makes sure that 621 // we end up locking and inserting into the current P's timers. 622 mp := acquirem() 623 ts := &mp.p.ptr().timers 624 ts.lock() 625 ts.cleanHead() 626 t.lock() 627 t.trace("maybeAdd") 628 when := int64(0) 629 wake := false 630 if t.needsAdd() { 631 t.state |= timerHeaped 632 when = t.when 633 wakeTime := ts.wakeTime() 634 wake = wakeTime == 0 || when < wakeTime 635 ts.addHeap(t) 636 } 637 t.unlock() 638 ts.unlock() 639 releasem(mp) 640 if wake { 641 wakeNetPoller(when) 642 } 643 } 644 645 // reset resets the time when a timer should fire. 646 // If used for an inactive timer, the timer will become active. 647 // Reports whether the timer was active and was stopped. 648 func (t *timer) reset(when, period int64) bool { 649 return t.modify(when, period, nil, nil, 0) 650 } 651 652 // cleanHead cleans up the head of the timer queue. This speeds up 653 // programs that create and delete timers; leaving them in the heap 654 // slows down heap operations. 655 // The caller must have locked ts. 656 func (ts *timers) cleanHead() { 657 ts.trace("cleanHead") 658 assertLockHeld(&ts.mu) 659 gp := getg() 660 for { 661 if len(ts.heap) == 0 { 662 return 663 } 664 665 // This loop can theoretically run for a while, and because 666 // it is holding timersLock it cannot be preempted. 667 // If someone is trying to preempt us, just return. 668 // We can clean the timers later. 669 if gp.preemptStop { 670 return 671 } 672 673 // Delete zombies from tail of heap. It requires no heap adjustments at all, 674 // and doing so increases the chances that when we swap out a zombie 675 // in heap[0] for the tail of the heap, we'll get a non-zombie timer, 676 // shortening this loop. 677 n := len(ts.heap) 678 if t := ts.heap[n-1].timer; t.astate.Load()&timerZombie != 0 { 679 t.lock() 680 if t.state&timerZombie != 0 { 681 t.state &^= timerHeaped | timerZombie | timerModified 682 t.ts = nil 683 ts.zombies.Add(-1) 684 ts.heap[n-1] = timerWhen{} 685 ts.heap = ts.heap[:n-1] 686 } 687 t.unlock() 688 continue 689 } 690 691 t := ts.heap[0].timer 692 if t.ts != ts { 693 throw("bad ts") 694 } 695 696 if t.astate.Load()&(timerModified|timerZombie) == 0 { 697 // Fast path: head of timers does not need adjustment. 698 return 699 } 700 701 t.lock() 702 updated := t.updateHeap() 703 t.unlock() 704 if !updated { 705 // Head of timers does not need adjustment. 706 return 707 } 708 } 709 } 710 711 // take moves any timers from src into ts 712 // and then clears the timer state from src, 713 // because src is being destroyed. 714 // The caller must not have locked either timers. 715 // For now this is only called when the world is stopped. 716 func (ts *timers) take(src *timers) { 717 ts.trace("take") 718 assertWorldStopped() 719 if len(src.heap) > 0 { 720 // The world is stopped, so we ignore the locking of ts and src here. 721 // That would introduce a sched < timers lock ordering, 722 // which we'd rather avoid in the static ranking. 723 for _, tw := range src.heap { 724 t := tw.timer 725 t.ts = nil 726 if t.state&timerZombie != 0 { 727 t.state &^= timerHeaped | timerZombie | timerModified 728 } else { 729 t.state &^= timerModified 730 ts.addHeap(t) 731 } 732 } 733 src.heap = nil 734 src.zombies.Store(0) 735 src.minWhenHeap.Store(0) 736 src.minWhenModified.Store(0) 737 src.len.Store(0) 738 ts.len.Store(uint32(len(ts.heap))) 739 } 740 } 741 742 // adjust looks through the timers in ts.heap for 743 // any timers that have been modified to run earlier, and puts them in 744 // the correct place in the heap. While looking for those timers, 745 // it also moves timers that have been modified to run later, 746 // and removes deleted timers. The caller must have locked ts. 747 func (ts *timers) adjust(now int64, force bool) { 748 ts.trace("adjust") 749 assertLockHeld(&ts.mu) 750 // If we haven't yet reached the time of the earliest modified 751 // timer, don't do anything. This speeds up programs that adjust 752 // a lot of timers back and forth if the timers rarely expire. 753 // We'll postpone looking through all the adjusted timers until 754 // one would actually expire. 755 if !force { 756 first := ts.minWhenModified.Load() 757 if first == 0 || first > now { 758 if verifyTimers { 759 ts.verify() 760 } 761 return 762 } 763 } 764 765 // minWhenModified is a lower bound on the earliest t.when 766 // among the timerModified timers. We want to make it more precise: 767 // we are going to scan the heap and clean out all the timerModified bits, 768 // at which point minWhenModified can be set to 0 (indicating none at all). 769 // 770 // Other P's can be calling ts.wakeTime concurrently, and we'd like to 771 // keep ts.wakeTime returning an accurate value throughout this entire process. 772 // 773 // Setting minWhenModified = 0 *before* the scan could make wakeTime 774 // return an incorrect value: if minWhenModified < minWhenHeap, then clearing 775 // it to 0 will make wakeTime return minWhenHeap (too late) until the scan finishes. 776 // To avoid that, we want to set minWhenModified to 0 *after* the scan. 777 // 778 // Setting minWhenModified = 0 *after* the scan could result in missing 779 // concurrent timer modifications in other goroutines; those will lock 780 // the specific timer, set the timerModified bit, and set t.when. 781 // To avoid that, we want to set minWhenModified to 0 *before* the scan. 782 // 783 // The way out of this dilemma is to preserve wakeTime a different way. 784 // wakeTime is min(minWhenHeap, minWhenModified), and minWhenHeap 785 // is protected by ts.lock, which we hold, so we can modify it however we like 786 // in service of keeping wakeTime accurate. 787 // 788 // So we can: 789 // 790 // 1. Set minWhenHeap = min(minWhenHeap, minWhenModified) 791 // 2. Set minWhenModified = 0 792 // (Other goroutines may modify timers and update minWhenModified now.) 793 // 3. Scan timers 794 // 4. Set minWhenHeap = heap[0].when 795 // 796 // That order preserves a correct value of wakeTime throughout the entire 797 // operation: 798 // Step 1 “locks in” an accurate wakeTime even with minWhenModified cleared. 799 // Step 2 makes sure concurrent t.when updates are not lost during the scan. 800 // Step 3 processes all modified timer values, justifying minWhenModified = 0. 801 // Step 4 corrects minWhenHeap to a precise value. 802 // 803 // The wakeTime method implementation reads minWhenModified *before* minWhenHeap, 804 // so that if the minWhenModified is observed to be 0, that means the minWhenHeap that 805 // follows will include the information that was zeroed out of it. 806 // 807 // Originally Step 3 locked every timer, which made sure any timer update that was 808 // already in progress during Steps 1+2 completed and was observed by Step 3. 809 // All that locking was too expensive, so now we do an atomic load of t.astate to 810 // decide whether we need to do a full lock. To make sure that we still observe any 811 // timer update already in progress during Steps 1+2, t.modify sets timerModified 812 // in t.astate *before* calling t.updateMinWhenModified. That ensures that the 813 // overwrite in Step 2 cannot lose an update: if it does overwrite an update, Step 3 814 // will see the timerModified and do a full lock. 815 ts.minWhenHeap.Store(ts.wakeTime()) 816 ts.minWhenModified.Store(0) 817 818 changed := false 819 for i := 0; i < len(ts.heap); i++ { 820 tw := &ts.heap[i] 821 t := tw.timer 822 if t.ts != ts { 823 throw("bad ts") 824 } 825 826 if t.astate.Load()&(timerModified|timerZombie) == 0 { 827 // Does not need adjustment. 828 continue 829 } 830 831 t.lock() 832 switch { 833 case t.state&timerHeaped == 0: 834 badTimer() 835 836 case t.state&timerZombie != 0: 837 ts.zombies.Add(-1) 838 t.state &^= timerHeaped | timerZombie | timerModified 839 n := len(ts.heap) 840 ts.heap[i] = ts.heap[n-1] 841 ts.heap[n-1] = timerWhen{} 842 ts.heap = ts.heap[:n-1] 843 t.ts = nil 844 i-- 845 changed = true 846 847 case t.state&timerModified != 0: 848 tw.when = t.when 849 t.state &^= timerModified 850 changed = true 851 } 852 t.unlock() 853 } 854 855 if changed { 856 ts.initHeap() 857 } 858 ts.updateMinWhenHeap() 859 860 if verifyTimers { 861 ts.verify() 862 } 863 } 864 865 // wakeTime looks at ts's timers and returns the time when we 866 // should wake up the netpoller. It returns 0 if there are no timers. 867 // This function is invoked when dropping a P, so it must run without 868 // any write barriers. 869 // 870 //go:nowritebarrierrec 871 func (ts *timers) wakeTime() int64 { 872 // Note that the order of these two loads matters: 873 // adjust updates minWhen to make it safe to clear minNextWhen. 874 // We read minWhen after reading minNextWhen so that 875 // if we see a cleared minNextWhen, we are guaranteed to see 876 // the updated minWhen. 877 nextWhen := ts.minWhenModified.Load() 878 when := ts.minWhenHeap.Load() 879 if when == 0 || (nextWhen != 0 && nextWhen < when) { 880 when = nextWhen 881 } 882 return when 883 } 884 885 // check runs any timers in ts that are ready. 886 // If now is not 0 it is the current time. 887 // It returns the passed time or the current time if now was passed as 0. 888 // and the time when the next timer should run or 0 if there is no next timer, 889 // and reports whether it ran any timers. 890 // If the time when the next timer should run is not 0, 891 // it is always larger than the returned time. 892 // We pass now in and out to avoid extra calls of nanotime. 893 // 894 //go:yeswritebarrierrec 895 func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { 896 ts.trace("check") 897 // If it's not yet time for the first timer, or the first adjusted 898 // timer, then there is nothing to do. 899 next := ts.wakeTime() 900 if next == 0 { 901 // No timers to run or adjust. 902 return now, 0, false 903 } 904 905 if now == 0 { 906 now = nanotime() 907 } 908 909 // If this is the local P, and there are a lot of deleted timers, 910 // clear them out. We only do this for the local P to reduce 911 // lock contention on timersLock. 912 zombies := ts.zombies.Load() 913 if zombies < 0 { 914 badTimer() 915 } 916 force := ts == &getg().m.p.ptr().timers && int(zombies) > int(ts.len.Load())/4 917 918 if now < next && !force { 919 // Next timer is not ready to run, and we don't need to clear deleted timers. 920 return now, next, false 921 } 922 923 ts.lock() 924 if len(ts.heap) > 0 { 925 ts.adjust(now, false) 926 for len(ts.heap) > 0 { 927 // Note that runtimer may temporarily unlock ts. 928 if tw := ts.run(now); tw != 0 { 929 if tw > 0 { 930 pollUntil = tw 931 } 932 break 933 } 934 ran = true 935 } 936 937 // Note: Delaying the forced adjustment until after the ts.run 938 // (as opposed to calling ts.adjust(now, force) above) 939 // is significantly faster under contention, such as in 940 // package time's BenchmarkTimerAdjust10000, 941 // though we do not fully understand why. 942 force = ts == &getg().m.p.ptr().timers && int(ts.zombies.Load()) > int(ts.len.Load())/4 943 if force { 944 ts.adjust(now, true) 945 } 946 } 947 ts.unlock() 948 949 return now, pollUntil, ran 950 } 951 952 // run examines the first timer in ts. If it is ready based on now, 953 // it runs the timer and removes or updates it. 954 // Returns 0 if it ran a timer, -1 if there are no more timers, or the time 955 // when the first timer should run. 956 // The caller must have locked ts. 957 // If a timer is run, this will temporarily unlock ts. 958 // 959 //go:systemstack 960 func (ts *timers) run(now int64) int64 { 961 ts.trace("run") 962 assertLockHeld(&ts.mu) 963 Redo: 964 if len(ts.heap) == 0 { 965 return -1 966 } 967 tw := ts.heap[0] 968 t := tw.timer 969 if t.ts != ts { 970 throw("bad ts") 971 } 972 973 if t.astate.Load()&(timerModified|timerZombie) == 0 && tw.when > now { 974 // Fast path: not ready to run. 975 return tw.when 976 } 977 978 t.lock() 979 if t.updateHeap() { 980 t.unlock() 981 goto Redo 982 } 983 984 if t.state&timerHeaped == 0 || t.state&timerModified != 0 { 985 badTimer() 986 } 987 988 if t.when > now { 989 // Not ready to run. 990 t.unlock() 991 return t.when 992 } 993 994 t.unlockAndRun(now) 995 assertLockHeld(&ts.mu) // t is unlocked now, but not ts 996 return 0 997 } 998 999 // unlockAndRun unlocks and runs the timer t (which must be locked). 1000 // If t is in a timer set (t.ts != nil), the caller must also have locked the timer set, 1001 // and this call will temporarily unlock the timer set while running the timer function. 1002 // unlockAndRun returns with t unlocked and t.ts (re-)locked. 1003 // 1004 //go:systemstack 1005 func (t *timer) unlockAndRun(now int64) { 1006 t.trace("unlockAndRun") 1007 assertLockHeld(&t.mu) 1008 if t.ts != nil { 1009 assertLockHeld(&t.ts.mu) 1010 } 1011 if raceenabled { 1012 // Note that we are running on a system stack, 1013 // so there is no chance of getg().m being reassigned 1014 // out from under us while this function executes. 1015 tsLocal := &getg().m.p.ptr().timers 1016 if tsLocal.raceCtx == 0 { 1017 tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum) 1018 } 1019 raceacquirectx(tsLocal.raceCtx, unsafe.Pointer(t)) 1020 } 1021 1022 if t.state&(timerModified|timerZombie) != 0 { 1023 badTimer() 1024 } 1025 1026 f := t.f 1027 arg := t.arg 1028 seq := t.seq 1029 var next int64 1030 delay := now - t.when 1031 if t.period > 0 { 1032 // Leave in heap but adjust next time to fire. 1033 next = t.when + t.period*(1+delay/t.period) 1034 if next < 0 { // check for overflow. 1035 next = maxWhen 1036 } 1037 } else { 1038 next = 0 1039 } 1040 ts := t.ts 1041 t.when = next 1042 if t.state&timerHeaped != 0 { 1043 t.state |= timerModified 1044 if next == 0 { 1045 t.state |= timerZombie 1046 t.ts.zombies.Add(1) 1047 } 1048 t.updateHeap() 1049 } 1050 1051 async := debug.asynctimerchan.Load() != 0 1052 if !async && t.isChan && t.period == 0 { 1053 // Tell Stop/Reset that we are sending a value. 1054 if t.isSending.Add(1) < 0 { 1055 throw("too many concurrent timer firings") 1056 } 1057 } 1058 1059 t.unlock() 1060 1061 if raceenabled { 1062 // Temporarily use the current P's racectx for g0. 1063 gp := getg() 1064 if gp.racectx != 0 { 1065 throw("unexpected racectx") 1066 } 1067 gp.racectx = gp.m.p.ptr().timers.raceCtx 1068 } 1069 1070 if ts != nil { 1071 ts.unlock() 1072 } 1073 1074 if !async && t.isChan { 1075 // For a timer channel, we want to make sure that no stale sends 1076 // happen after a t.stop or t.modify, but we cannot hold t.mu 1077 // during the actual send (which f does) due to lock ordering. 1078 // It can happen that we are holding t's lock above, we decide 1079 // it's time to send a time value (by calling f), grab the parameters, 1080 // unlock above, and then a t.stop or t.modify changes the timer 1081 // and returns. At that point, the send needs not to happen after all. 1082 // The way we arrange for it not to happen is that t.stop and t.modify 1083 // both increment t.seq while holding both t.mu and t.sendLock. 1084 // We copied the seq value above while holding t.mu. 1085 // Now we can acquire t.sendLock (which will be held across the send) 1086 // and double-check that t.seq is still the seq value we saw above. 1087 // If not, the timer has been updated and we should skip the send. 1088 // We skip the send by reassigning f to a no-op function. 1089 // 1090 // The isSending field tells t.stop or t.modify that we have 1091 // started to send the value. That lets them correctly return 1092 // true meaning that no value was sent. 1093 lock(&t.sendLock) 1094 1095 if t.period == 0 { 1096 // We are committed to possibly sending a value 1097 // based on seq, so no need to keep telling 1098 // stop/modify that we are sending. 1099 if t.isSending.Add(-1) < 0 { 1100 throw("mismatched isSending updates") 1101 } 1102 } 1103 1104 if t.seq != seq { 1105 f = func(any, uintptr, int64) {} 1106 } 1107 } 1108 1109 f(arg, seq, delay) 1110 1111 if !async && t.isChan { 1112 unlock(&t.sendLock) 1113 } 1114 1115 if ts != nil { 1116 ts.lock() 1117 } 1118 1119 if raceenabled { 1120 gp := getg() 1121 gp.racectx = 0 1122 } 1123 } 1124 1125 // verifyTimerHeap verifies that the timers is in a valid state. 1126 // This is only for debugging, and is only called if verifyTimers is true. 1127 // The caller must have locked ts. 1128 func (ts *timers) verify() { 1129 assertLockHeld(&ts.mu) 1130 for i, tw := range ts.heap { 1131 if i == 0 { 1132 // First timer has no parent. 1133 continue 1134 } 1135 1136 // The heap is timerHeapN-ary. See siftupTimer and siftdownTimer. 1137 p := int(uint(i-1) / timerHeapN) 1138 if tw.when < ts.heap[p].when { 1139 print("bad timer heap at ", i, ": ", p, ": ", ts.heap[p].when, ", ", i, ": ", tw.when, "\n") 1140 throw("bad timer heap") 1141 } 1142 } 1143 if n := int(ts.len.Load()); len(ts.heap) != n { 1144 println("timer heap len", len(ts.heap), "!= atomic len", n) 1145 throw("bad timer heap len") 1146 } 1147 } 1148 1149 // updateMinWhenHeap sets ts.minWhenHeap to ts.heap[0].when. 1150 // The caller must have locked ts or the world must be stopped. 1151 func (ts *timers) updateMinWhenHeap() { 1152 assertWorldStoppedOrLockHeld(&ts.mu) 1153 if len(ts.heap) == 0 { 1154 ts.minWhenHeap.Store(0) 1155 } else { 1156 ts.minWhenHeap.Store(ts.heap[0].when) 1157 } 1158 } 1159 1160 // updateMinWhenModified updates ts.minWhenModified to be <= when. 1161 // ts need not be (and usually is not) locked. 1162 func (ts *timers) updateMinWhenModified(when int64) { 1163 for { 1164 old := ts.minWhenModified.Load() 1165 if old != 0 && old < when { 1166 return 1167 } 1168 if ts.minWhenModified.CompareAndSwap(old, when) { 1169 return 1170 } 1171 } 1172 } 1173 1174 // timeSleepUntil returns the time when the next timer should fire. Returns 1175 // maxWhen if there are no timers. 1176 // This is only called by sysmon and checkdead. 1177 func timeSleepUntil() int64 { 1178 next := int64(maxWhen) 1179 1180 // Prevent allp slice changes. This is like retake. 1181 lock(&allpLock) 1182 for _, pp := range allp { 1183 if pp == nil { 1184 // This can happen if procresize has grown 1185 // allp but not yet created new Ps. 1186 continue 1187 } 1188 1189 if w := pp.timers.wakeTime(); w != 0 { 1190 next = min(next, w) 1191 } 1192 } 1193 unlock(&allpLock) 1194 1195 return next 1196 } 1197 1198 const timerHeapN = 4 1199 1200 // Heap maintenance algorithms. 1201 // These algorithms check for slice index errors manually. 1202 // Slice index error can happen if the program is using racy 1203 // access to timers. We don't want to panic here, because 1204 // it will cause the program to crash with a mysterious 1205 // "panic holding locks" message. Instead, we panic while not 1206 // holding a lock. 1207 1208 // siftUp puts the timer at position i in the right place 1209 // in the heap by moving it up toward the top of the heap. 1210 func (ts *timers) siftUp(i int) { 1211 heap := ts.heap 1212 if i >= len(heap) { 1213 badTimer() 1214 } 1215 tw := heap[i] 1216 when := tw.when 1217 if when <= 0 { 1218 badTimer() 1219 } 1220 for i > 0 { 1221 p := int(uint(i-1) / timerHeapN) // parent 1222 if when >= heap[p].when { 1223 break 1224 } 1225 heap[i] = heap[p] 1226 i = p 1227 } 1228 if heap[i].timer != tw.timer { 1229 heap[i] = tw 1230 } 1231 } 1232 1233 // siftDown puts the timer at position i in the right place 1234 // in the heap by moving it down toward the bottom of the heap. 1235 func (ts *timers) siftDown(i int) { 1236 heap := ts.heap 1237 n := len(heap) 1238 if i >= n { 1239 badTimer() 1240 } 1241 if i*timerHeapN+1 >= n { 1242 return 1243 } 1244 tw := heap[i] 1245 when := tw.when 1246 if when <= 0 { 1247 badTimer() 1248 } 1249 for { 1250 leftChild := i*timerHeapN + 1 1251 if leftChild >= n { 1252 break 1253 } 1254 w := when 1255 c := -1 1256 for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] { 1257 if tw.when < w { 1258 w = tw.when 1259 c = leftChild + j 1260 } 1261 } 1262 if c < 0 { 1263 break 1264 } 1265 heap[i] = heap[c] 1266 i = c 1267 } 1268 if heap[i].timer != tw.timer { 1269 heap[i] = tw 1270 } 1271 } 1272 1273 // initHeap reestablishes the heap order in the slice ts.heap. 1274 // It takes O(n) time for n=len(ts.heap), not the O(n log n) of n repeated add operations. 1275 func (ts *timers) initHeap() { 1276 // Last possible element that needs sifting down is parent of last element; 1277 // last element is len(t)-1; parent of last element is (len(t)-1-1)/timerHeapN. 1278 if len(ts.heap) <= 1 { 1279 return 1280 } 1281 for i := int(uint(len(ts.heap)-1-1) / timerHeapN); i >= 0; i-- { 1282 ts.siftDown(i) 1283 } 1284 } 1285 1286 // badTimer is called if the timer data structures have been corrupted, 1287 // presumably due to racy use by the program. We panic here rather than 1288 // panicking due to invalid slice access while holding locks. 1289 // See issue #25686. 1290 func badTimer() { 1291 throw("timer data corruption") 1292 } 1293 1294 // Timer channels. 1295 1296 // maybeRunChan checks whether the timer needs to run 1297 // to send a value to its associated channel. If so, it does. 1298 // The timer must not be locked. 1299 func (t *timer) maybeRunChan() { 1300 if t.astate.Load()&timerHeaped != 0 { 1301 // If the timer is in the heap, the ordinary timer code 1302 // is in charge of sending when appropriate. 1303 return 1304 } 1305 1306 t.lock() 1307 now := nanotime() 1308 if t.state&timerHeaped != 0 || t.when == 0 || t.when > now { 1309 t.trace("maybeRunChan-") 1310 // Timer in the heap, or not running at all, or not triggered. 1311 t.unlock() 1312 return 1313 } 1314 t.trace("maybeRunChan+") 1315 systemstack(func() { 1316 t.unlockAndRun(now) 1317 }) 1318 } 1319 1320 // blockTimerChan is called when a channel op has decided to block on c. 1321 // The caller holds the channel lock for c and possibly other channels. 1322 // blockTimerChan makes sure that c is in a timer heap, 1323 // adding it if needed. 1324 func blockTimerChan(c *hchan) { 1325 t := c.timer 1326 t.lock() 1327 t.trace("blockTimerChan") 1328 if !t.isChan { 1329 badTimer() 1330 } 1331 1332 t.blocked++ 1333 1334 // If this is the first enqueue after a recent dequeue, 1335 // the timer may still be in the heap but marked as a zombie. 1336 // Unmark it in this case, if the timer is still pending. 1337 if t.state&timerHeaped != 0 && t.state&timerZombie != 0 && t.when > 0 { 1338 t.state &^= timerZombie 1339 t.ts.zombies.Add(-1) 1340 } 1341 1342 // t.maybeAdd must be called with t unlocked, 1343 // because it needs to lock t.ts before t. 1344 // Then it will do nothing if t.needsAdd(state) is false. 1345 // Check that now before the unlock, 1346 // avoiding the extra lock-lock-unlock-unlock 1347 // inside maybeAdd when t does not need to be added. 1348 add := t.needsAdd() 1349 t.unlock() 1350 if add { 1351 t.maybeAdd() 1352 } 1353 } 1354 1355 // unblockTimerChan is called when a channel op that was blocked on c 1356 // is no longer blocked. Every call to blockTimerChan must be paired with 1357 // a call to unblockTimerChan. 1358 // The caller holds the channel lock for c and possibly other channels. 1359 // unblockTimerChan removes c from the timer heap when nothing is 1360 // blocked on it anymore. 1361 func unblockTimerChan(c *hchan) { 1362 t := c.timer 1363 t.lock() 1364 t.trace("unblockTimerChan") 1365 if !t.isChan || t.blocked == 0 { 1366 badTimer() 1367 } 1368 t.blocked-- 1369 if t.blocked == 0 && t.state&timerHeaped != 0 && t.state&timerZombie == 0 { 1370 // Last goroutine that was blocked on this timer. 1371 // Mark for removal from heap but do not clear t.when, 1372 // so that we know what time it is still meant to trigger. 1373 t.state |= timerZombie 1374 t.ts.zombies.Add(1) 1375 } 1376 t.unlock() 1377 } 1378