Source file
src/runtime/netpoll.go
1
2
3
4
5
6
7 package runtime
8
9 import (
10 "internal/runtime/atomic"
11 "internal/runtime/sys"
12 "unsafe"
13 )
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 const (
45 pollNoError = 0
46 pollErrClosing = 1
47 pollErrTimeout = 2
48 pollErrNotPollable = 3
49 )
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 const (
65 pdNil uintptr = 0
66 pdReady uintptr = 1
67 pdWait uintptr = 2
68 )
69
70 const pollBlockSize = 4 * 1024
71
72
73
74
75 type pollDesc struct {
76 _ sys.NotInHeap
77 link *pollDesc
78 fd uintptr
79 fdseq atomic.Uintptr
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 atomicInfo atomic.Uint32
97
98
99
100 rg atomic.Uintptr
101 wg atomic.Uintptr
102
103 lock mutex
104 closing bool
105 rrun bool
106 wrun bool
107 user uint32
108 rseq uintptr
109 rt timer
110 rd int64
111 wseq uintptr
112 wt timer
113 wd int64
114 self *pollDesc
115 }
116
117
118
119
120
121 type pollInfo uint32
122
123 const (
124 pollClosing = 1 << iota
125 pollEventErr
126 pollExpiredReadDeadline
127 pollExpiredWriteDeadline
128 pollFDSeq
129 )
130
131 const (
132 pollFDSeqBits = 20
133 pollFDSeqMask = 1<<pollFDSeqBits - 1
134 )
135
136 func (i pollInfo) closing() bool { return i&pollClosing != 0 }
137 func (i pollInfo) eventErr() bool { return i&pollEventErr != 0 }
138 func (i pollInfo) expiredReadDeadline() bool { return i&pollExpiredReadDeadline != 0 }
139 func (i pollInfo) expiredWriteDeadline() bool { return i&pollExpiredWriteDeadline != 0 }
140
141
142 func (pd *pollDesc) info() pollInfo {
143 return pollInfo(pd.atomicInfo.Load())
144 }
145
146
147
148
149
150
151
152
153 func (pd *pollDesc) publishInfo() {
154 var info uint32
155 if pd.closing {
156 info |= pollClosing
157 }
158 if pd.rd < 0 {
159 info |= pollExpiredReadDeadline
160 }
161 if pd.wd < 0 {
162 info |= pollExpiredWriteDeadline
163 }
164 info |= uint32(pd.fdseq.Load()&pollFDSeqMask) << pollFDSeq
165
166
167 x := pd.atomicInfo.Load()
168 for !pd.atomicInfo.CompareAndSwap(x, (x&pollEventErr)|info) {
169 x = pd.atomicInfo.Load()
170 }
171 }
172
173
174
175
176 func (pd *pollDesc) setEventErr(b bool, seq uintptr) {
177 mSeq := uint32(seq & pollFDSeqMask)
178 x := pd.atomicInfo.Load()
179 xSeq := (x >> pollFDSeq) & pollFDSeqMask
180 if seq != 0 && xSeq != mSeq {
181 return
182 }
183 for (x&pollEventErr != 0) != b && !pd.atomicInfo.CompareAndSwap(x, x^pollEventErr) {
184 x = pd.atomicInfo.Load()
185 xSeq := (x >> pollFDSeq) & pollFDSeqMask
186 if seq != 0 && xSeq != mSeq {
187 return
188 }
189 }
190 }
191
192 type pollCache struct {
193 lock mutex
194 first *pollDesc
195
196
197
198
199
200 }
201
202 var (
203 netpollInitLock mutex
204 netpollInited atomic.Uint32
205
206 pollcache pollCache
207 netpollWaiters atomic.Uint32
208 )
209
210
211
212
213
214 func poll_runtime_pollServerInit() {
215 netpollGenericInit()
216 }
217
218 func netpollGenericInit() {
219 if netpollInited.Load() == 0 {
220 lockInit(&netpollInitLock, lockRankNetpollInit)
221 lockInit(&pollcache.lock, lockRankPollCache)
222 lock(&netpollInitLock)
223 if netpollInited.Load() == 0 {
224 netpollinit()
225 netpollInited.Store(1)
226 }
227 unlock(&netpollInitLock)
228 }
229 }
230
231 func netpollinited() bool {
232 return netpollInited.Load() != 0
233 }
234
235
236
237
238
239 func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
240 return netpollIsPollDescriptor(fd)
241 }
242
243
244 func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
245 pd := pollcache.alloc()
246 lock(&pd.lock)
247 wg := pd.wg.Load()
248 if wg != pdNil && wg != pdReady {
249 throw("runtime: blocked write on free polldesc")
250 }
251 rg := pd.rg.Load()
252 if rg != pdNil && rg != pdReady {
253 throw("runtime: blocked read on free polldesc")
254 }
255 pd.fd = fd
256 if pd.fdseq.Load() == 0 {
257
258 pd.fdseq.Store(1)
259 }
260 pd.closing = false
261 pd.setEventErr(false, 0)
262 pd.rseq++
263 pd.rg.Store(pdNil)
264 pd.rd = 0
265 pd.wseq++
266 pd.wg.Store(pdNil)
267 pd.wd = 0
268 pd.self = pd
269 pd.publishInfo()
270 unlock(&pd.lock)
271
272 errno := netpollopen(fd, pd)
273 if errno != 0 {
274 pollcache.free(pd)
275 return nil, int(errno)
276 }
277 return pd, 0
278 }
279
280
281 func poll_runtime_pollClose(pd *pollDesc) {
282 if !pd.closing {
283 throw("runtime: close polldesc w/o unblock")
284 }
285 wg := pd.wg.Load()
286 if wg != pdNil && wg != pdReady {
287 throw("runtime: blocked write on closing polldesc")
288 }
289 rg := pd.rg.Load()
290 if rg != pdNil && rg != pdReady {
291 throw("runtime: blocked read on closing polldesc")
292 }
293 netpollclose(pd.fd)
294 pollcache.free(pd)
295 }
296
297 func (c *pollCache) free(pd *pollDesc) {
298
299
300 lock(&pd.lock)
301
302
303
304 fdseq := pd.fdseq.Load()
305 fdseq = (fdseq + 1) & (1<<taggedPointerBits - 1)
306 pd.fdseq.Store(fdseq)
307
308 pd.publishInfo()
309
310 unlock(&pd.lock)
311
312 lock(&c.lock)
313 pd.link = c.first
314 c.first = pd
315 unlock(&c.lock)
316 }
317
318
319
320
321
322
323 func poll_runtime_pollReset(pd *pollDesc, mode int) int {
324 errcode := netpollcheckerr(pd, int32(mode))
325 if errcode != pollNoError {
326 return errcode
327 }
328 if mode == 'r' {
329 pd.rg.Store(pdNil)
330 } else if mode == 'w' {
331 pd.wg.Store(pdNil)
332 }
333 return pollNoError
334 }
335
336
337
338
339
340
341
342 func poll_runtime_pollWait(pd *pollDesc, mode int) int {
343 errcode := netpollcheckerr(pd, int32(mode))
344 if errcode != pollNoError {
345 return errcode
346 }
347
348 if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
349 netpollarm(pd, mode)
350 }
351 for !netpollblock(pd, int32(mode), false) {
352 errcode = netpollcheckerr(pd, int32(mode))
353 if errcode != pollNoError {
354 return errcode
355 }
356
357
358
359 }
360 return pollNoError
361 }
362
363
364 func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
365
366
367 for !netpollblock(pd, int32(mode), true) {
368 }
369 }
370
371
372 func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
373 lock(&pd.lock)
374 if pd.closing {
375 unlock(&pd.lock)
376 return
377 }
378 rd0, wd0 := pd.rd, pd.wd
379 combo0 := rd0 > 0 && rd0 == wd0
380 if d > 0 {
381 d += nanotime()
382 if d <= 0 {
383
384
385 d = 1<<63 - 1
386 }
387 }
388 if mode == 'r' || mode == 'r'+'w' {
389 pd.rd = d
390 }
391 if mode == 'w' || mode == 'r'+'w' {
392 pd.wd = d
393 }
394 pd.publishInfo()
395 combo := pd.rd > 0 && pd.rd == pd.wd
396 rtf := netpollReadDeadline
397 if combo {
398 rtf = netpollDeadline
399 }
400 if !pd.rrun {
401 if pd.rd > 0 {
402
403
404
405 pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
406 pd.rrun = true
407 }
408 } else if pd.rd != rd0 || combo != combo0 {
409 pd.rseq++
410 if pd.rd > 0 {
411 pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
412 } else {
413 pd.rt.stop()
414 pd.rrun = false
415 }
416 }
417 if !pd.wrun {
418 if pd.wd > 0 && !combo {
419 pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
420 pd.wrun = true
421 }
422 } else if pd.wd != wd0 || combo != combo0 {
423 pd.wseq++
424 if pd.wd > 0 && !combo {
425 pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
426 } else {
427 pd.wt.stop()
428 pd.wrun = false
429 }
430 }
431
432
433 delta := int32(0)
434 var rg, wg *g
435 if pd.rd < 0 {
436 rg = netpollunblock(pd, 'r', false, &delta)
437 }
438 if pd.wd < 0 {
439 wg = netpollunblock(pd, 'w', false, &delta)
440 }
441 unlock(&pd.lock)
442 if rg != nil {
443 netpollgoready(rg, 3)
444 }
445 if wg != nil {
446 netpollgoready(wg, 3)
447 }
448 netpollAdjustWaiters(delta)
449 }
450
451
452 func poll_runtime_pollUnblock(pd *pollDesc) {
453 lock(&pd.lock)
454 if pd.closing {
455 throw("runtime: unblock on closing polldesc")
456 }
457 pd.closing = true
458 pd.rseq++
459 pd.wseq++
460 var rg, wg *g
461 pd.publishInfo()
462 delta := int32(0)
463 rg = netpollunblock(pd, 'r', false, &delta)
464 wg = netpollunblock(pd, 'w', false, &delta)
465 if pd.rrun {
466 pd.rt.stop()
467 pd.rrun = false
468 }
469 if pd.wrun {
470 pd.wt.stop()
471 pd.wrun = false
472 }
473 unlock(&pd.lock)
474 if rg != nil {
475 netpollgoready(rg, 3)
476 }
477 if wg != nil {
478 netpollgoready(wg, 3)
479 }
480 netpollAdjustWaiters(delta)
481 }
482
483
484
485
486
487
488
489
490
491
492
493
494 func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
495 delta := int32(0)
496 var rg, wg *g
497 if mode == 'r' || mode == 'r'+'w' {
498 rg = netpollunblock(pd, 'r', true, &delta)
499 }
500 if mode == 'w' || mode == 'r'+'w' {
501 wg = netpollunblock(pd, 'w', true, &delta)
502 }
503 if rg != nil {
504 toRun.push(rg)
505 }
506 if wg != nil {
507 toRun.push(wg)
508 }
509 return delta
510 }
511
512 func netpollcheckerr(pd *pollDesc, mode int32) int {
513 info := pd.info()
514 if info.closing() {
515 return pollErrClosing
516 }
517 if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
518 return pollErrTimeout
519 }
520
521
522
523 if mode == 'r' && info.eventErr() {
524 return pollErrNotPollable
525 }
526 return pollNoError
527 }
528
529 func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
530 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
531 if r {
532
533
534
535 netpollAdjustWaiters(1)
536 }
537 return r
538 }
539
540 func netpollgoready(gp *g, traceskip int) {
541 goready(gp, traceskip+1)
542 }
543
544
545
546
547
548 func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
549 gpp := &pd.rg
550 if mode == 'w' {
551 gpp = &pd.wg
552 }
553
554
555 for {
556
557 if gpp.CompareAndSwap(pdReady, pdNil) {
558 return true
559 }
560 if gpp.CompareAndSwap(pdNil, pdWait) {
561 break
562 }
563
564
565
566 if v := gpp.Load(); v != pdReady && v != pdNil {
567 throw("runtime: double wait")
568 }
569 }
570
571
572
573
574 if waitio || netpollcheckerr(pd, mode) == pollNoError {
575 gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
576 }
577
578 old := gpp.Swap(pdNil)
579 if old > pdWait {
580 throw("runtime: corrupted polldesc")
581 }
582 return old == pdReady
583 }
584
585
586
587
588
589
590
591 func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
592 gpp := &pd.rg
593 if mode == 'w' {
594 gpp = &pd.wg
595 }
596
597 for {
598 old := gpp.Load()
599 if old == pdReady {
600 return nil
601 }
602 if old == pdNil && !ioready {
603
604
605 return nil
606 }
607 new := pdNil
608 if ioready {
609 new = pdReady
610 }
611 if gpp.CompareAndSwap(old, new) {
612 if old == pdWait {
613 old = pdNil
614 } else if old != pdNil {
615 *delta -= 1
616 }
617 return (*g)(unsafe.Pointer(old))
618 }
619 }
620 }
621
622 func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
623 lock(&pd.lock)
624
625
626 currentSeq := pd.rseq
627 if !read {
628 currentSeq = pd.wseq
629 }
630 if seq != currentSeq {
631
632 unlock(&pd.lock)
633 return
634 }
635 delta := int32(0)
636 var rg *g
637 if read {
638 if pd.rd <= 0 || !pd.rrun {
639 throw("runtime: inconsistent read deadline")
640 }
641 pd.rd = -1
642 pd.publishInfo()
643 rg = netpollunblock(pd, 'r', false, &delta)
644 }
645 var wg *g
646 if write {
647 if pd.wd <= 0 || !pd.wrun && !read {
648 throw("runtime: inconsistent write deadline")
649 }
650 pd.wd = -1
651 pd.publishInfo()
652 wg = netpollunblock(pd, 'w', false, &delta)
653 }
654 unlock(&pd.lock)
655 if rg != nil {
656 netpollgoready(rg, 0)
657 }
658 if wg != nil {
659 netpollgoready(wg, 0)
660 }
661 netpollAdjustWaiters(delta)
662 }
663
664 func netpollDeadline(arg any, seq uintptr, delta int64) {
665 netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
666 }
667
668 func netpollReadDeadline(arg any, seq uintptr, delta int64) {
669 netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
670 }
671
672 func netpollWriteDeadline(arg any, seq uintptr, delta int64) {
673 netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
674 }
675
676
677 func netpollAnyWaiters() bool {
678 return netpollWaiters.Load() > 0
679 }
680
681
682 func netpollAdjustWaiters(delta int32) {
683 if delta != 0 {
684 netpollWaiters.Add(delta)
685 }
686 }
687
688 func (c *pollCache) alloc() *pollDesc {
689 lock(&c.lock)
690 if c.first == nil {
691 const pdSize = unsafe.Sizeof(pollDesc{})
692 n := pollBlockSize / pdSize
693 if n == 0 {
694 n = 1
695 }
696
697
698 mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
699 for i := uintptr(0); i < n; i++ {
700 pd := (*pollDesc)(add(mem, i*pdSize))
701 lockInit(&pd.lock, lockRankPollDesc)
702 pd.rt.init(nil, nil)
703 pd.wt.init(nil, nil)
704 pd.link = c.first
705 c.first = pd
706 }
707 }
708 pd := c.first
709 c.first = pd.link
710 unlock(&c.lock)
711 return pd
712 }
713
714
715
716
717
718
719 func (pd *pollDesc) makeArg() (i any) {
720 x := (*eface)(unsafe.Pointer(&i))
721 x._type = pdType
722 x.data = unsafe.Pointer(&pd.self)
723 return
724 }
725
726 var (
727 pdEface any = (*pollDesc)(nil)
728 pdType *_type = efaceOf(&pdEface)._type
729 )
730
View as plain text