Source file
src/runtime/netpoll_kqueue.go
1
2
3
4
5
6
7 package runtime
8
9
10
11 import (
12 "internal/goarch"
13 "internal/runtime/atomic"
14 "unsafe"
15 )
16
17 var (
18 kq int32 = -1
19 netpollWakeSig atomic.Uint32
20 )
21
22 func netpollinit() {
23 kq = kqueue()
24 if kq < 0 {
25 println("runtime: kqueue failed with", -kq)
26 throw("runtime: netpollinit failed")
27 }
28 closeonexec(kq)
29 addWakeupEvent(kq)
30 }
31
32 func netpollopen(fd uintptr, pd *pollDesc) int32 {
33
34
35
36 var ev [2]keventt
37 *(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
38 ev[0].filter = _EVFILT_READ
39 ev[0].flags = _EV_ADD | _EV_CLEAR
40 ev[0].fflags = 0
41 ev[0].data = 0
42
43 if goarch.PtrSize == 4 {
44
45
46
47
48 ev[0].udata = (*byte)(unsafe.Pointer(pd))
49 } else {
50 tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
51 ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
52 }
53 ev[1] = ev[0]
54 ev[1].filter = _EVFILT_WRITE
55 n := kevent(kq, &ev[0], 2, nil, 0, nil)
56 if n < 0 {
57 return -n
58 }
59 return 0
60 }
61
// netpollclose is a no-op for the kqueue poller: it issues no kevent call
// for fd and always reports success (0).
func netpollclose(fd uintptr) int32 {
	// No explicit deregistration is performed here. Per kqueue(2), closing
	// a file descriptor removes any kevents that reference it.
	const ok int32 = 0
	return ok
}
67
68 func netpollarm(pd *pollDesc, mode int) {
69 throw("runtime: unused")
70 }
71
72
73 func netpollBreak() {
74
75 if !netpollWakeSig.CompareAndSwap(0, 1) {
76 return
77 }
78
79 wakeNetpoll(kq)
80 }
81
82
83
84
85
86
87
88
89
90 func netpoll(delay int64) (gList, int32) {
91 if kq == -1 {
92 return gList{}, 0
93 }
94 var tp *timespec
95 var ts timespec
96 if delay < 0 {
97 tp = nil
98 } else if delay == 0 {
99 tp = &ts
100 } else {
101 ts.setNsec(delay)
102 if ts.tv_sec > 1e6 {
103
104 ts.tv_sec = 1e6
105 }
106 tp = &ts
107 }
108 var events [64]keventt
109 retry:
110 n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
111 if n < 0 {
112
113
114
115 if n != -_EINTR && n != -_ETIMEDOUT {
116 println("runtime: kevent on fd", kq, "failed with", -n)
117 throw("runtime: netpoll failed")
118 }
119
120
121 if delay > 0 {
122 return gList{}, 0
123 }
124 goto retry
125 }
126 var toRun gList
127 delta := int32(0)
128 for i := 0; i < int(n); i++ {
129 ev := &events[i]
130
131 if isWakeup(ev) {
132 if delay != 0 {
133
134
135 drainWakeupEvent(kq)
136 netpollWakeSig.Store(0)
137 }
138 continue
139 }
140
141 var mode int32
142 switch ev.filter {
143 case _EVFILT_READ:
144 mode += 'r'
145
146
147
148
149
150
151
152
153
154
155 if ev.flags&_EV_EOF != 0 {
156 mode += 'w'
157 }
158 case _EVFILT_WRITE:
159 mode += 'w'
160 }
161 if mode != 0 {
162 var pd *pollDesc
163 var tag uintptr
164 if goarch.PtrSize == 4 {
165
166
167 pd = (*pollDesc)(unsafe.Pointer(ev.udata))
168 tag = 0
169 } else {
170 tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
171 pd = (*pollDesc)(tp.pointer())
172 tag = tp.tag()
173 if pd.fdseq.Load() != tag {
174 continue
175 }
176 }
177 pd.setEventErr(ev.flags == _EV_ERROR, tag)
178 delta += netpollready(&toRun, pd, mode)
179 }
180 }
181 return toRun, delta
182 }
183
View as plain text