1
2
3
4
5 package poll
6
7 import "sync/atomic"
8
9
10
11
// fdMutex is a combined reference counter and pair of read/write locks
// packed into a single atomically-updated word.
//
// state holds the closed flag, the read and write lock bits, the reference
// count and the counts of blocked readers and writers (see the mutex*
// constants for the bit layout). rsema and wsema are the semaphores on which
// blocked readers and writers, respectively, sleep and are woken.
type fdMutex struct {
	state        uint64
	rsema, wsema uint32
}
17
18
19
20
21
22
23
24
// Layout of fdMutex.state:
//
//	bit  0      - the mutex is closed
//	bit  1      - the read lock is held
//	bit  2      - the write lock is held
//	bits 3-22   - number of references (20 bits)
//	bits 23-42  - number of goroutines waiting for the read lock (20 bits)
//	bits 43-62  - number of goroutines waiting for the write lock (20 bits)
//
// For each 20-bit counter there is a "unit" constant (the value of one
// increment) and a mask covering the whole field.
const (
	// Single-bit flags.
	mutexClosed = 1 << 0
	mutexRLock  = 1 << 1
	mutexWLock  = 1 << 2

	// One increment of each counter field.
	mutexRef   = 1 << 3
	mutexRWait = 1 << 23
	mutexWWait = 1 << 43

	// Masks of the counter fields: a 20-bit run of ones placed at the
	// position of the corresponding unit.
	mutexRefMask = (1<<20 - 1) * mutexRef
	mutexRMask   = (1<<20 - 1) * mutexRWait
	mutexWMask   = (1<<20 - 1) * mutexWWait
)

// overflowMsg is the panic message used when one of the 20-bit counters
// would wrap around.
const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 func (mu *fdMutex) incref() bool {
54 for {
55 old := atomic.LoadUint64(&mu.state)
56 if old&mutexClosed != 0 {
57 return false
58 }
59 new := old + mutexRef
60 if new&mutexRefMask == 0 {
61 panic(overflowMsg)
62 }
63 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
64 return true
65 }
66 }
67 }
68
69
70
71 func (mu *fdMutex) increfAndClose() bool {
72 for {
73 old := atomic.LoadUint64(&mu.state)
74 if old&mutexClosed != 0 {
75 return false
76 }
77
78 new := (old | mutexClosed) + mutexRef
79 if new&mutexRefMask == 0 {
80 panic(overflowMsg)
81 }
82
83 new &^= mutexRMask | mutexWMask
84 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
85
86
87 for old&mutexRMask != 0 {
88 old -= mutexRWait
89 runtime_Semrelease(&mu.rsema)
90 }
91 for old&mutexWMask != 0 {
92 old -= mutexWWait
93 runtime_Semrelease(&mu.wsema)
94 }
95 return true
96 }
97 }
98 }
99
100
101
102 func (mu *fdMutex) decref() bool {
103 for {
104 old := atomic.LoadUint64(&mu.state)
105 if old&mutexRefMask == 0 {
106 panic("inconsistent poll.fdMutex")
107 }
108 new := old - mutexRef
109 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
110 return new&(mutexClosed|mutexRefMask) == mutexClosed
111 }
112 }
113 }
114
115
116
117 func (mu *fdMutex) rwlock(read bool) bool {
118 var mutexBit, mutexWait, mutexMask uint64
119 var mutexSema *uint32
120 if read {
121 mutexBit = mutexRLock
122 mutexWait = mutexRWait
123 mutexMask = mutexRMask
124 mutexSema = &mu.rsema
125 } else {
126 mutexBit = mutexWLock
127 mutexWait = mutexWWait
128 mutexMask = mutexWMask
129 mutexSema = &mu.wsema
130 }
131 for {
132 old := atomic.LoadUint64(&mu.state)
133 if old&mutexClosed != 0 {
134 return false
135 }
136 var new uint64
137 if old&mutexBit == 0 {
138
139 new = (old | mutexBit) + mutexRef
140 if new&mutexRefMask == 0 {
141 panic(overflowMsg)
142 }
143 } else {
144
145 new = old + mutexWait
146 if new&mutexMask == 0 {
147 panic(overflowMsg)
148 }
149 }
150 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
151 if old&mutexBit == 0 {
152 return true
153 }
154 runtime_Semacquire(mutexSema)
155
156 }
157 }
158 }
159
160
161
162 func (mu *fdMutex) rwunlock(read bool) bool {
163 var mutexBit, mutexWait, mutexMask uint64
164 var mutexSema *uint32
165 if read {
166 mutexBit = mutexRLock
167 mutexWait = mutexRWait
168 mutexMask = mutexRMask
169 mutexSema = &mu.rsema
170 } else {
171 mutexBit = mutexWLock
172 mutexWait = mutexWWait
173 mutexMask = mutexWMask
174 mutexSema = &mu.wsema
175 }
176 for {
177 old := atomic.LoadUint64(&mu.state)
178 if old&mutexBit == 0 || old&mutexRefMask == 0 {
179 panic("inconsistent poll.fdMutex")
180 }
181
182 new := (old &^ mutexBit) - mutexRef
183 if old&mutexMask != 0 {
184 new -= mutexWait
185 }
186 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
187 if old&mutexMask != 0 {
188 runtime_Semrelease(mutexSema)
189 }
190 return new&(mutexClosed|mutexRefMask) == mutexClosed
191 }
192 }
193 }
194
195
// Semaphore primitives used by fdMutex. Both are declared without a body, so
// their implementation lives outside this file (presumably supplied by the
// runtime package - confirm against the runtime sources).
//
// runtime_Semacquire is what rwlock calls to block a waiter on a semaphore;
// runtime_Semrelease is what rwunlock and increfAndClose call to wake one.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
198
199
200
201 func (fd *FD) incref() error {
202 if !fd.fdmu.incref() {
203 return errClosing(fd.isFile)
204 }
205 return nil
206 }
207
208
209
210
211 func (fd *FD) decref() error {
212 if fd.fdmu.decref() {
213 return fd.destroy()
214 }
215 return nil
216 }
217
218
219
220 func (fd *FD) readLock() error {
221 if !fd.fdmu.rwlock(true) {
222 return errClosing(fd.isFile)
223 }
224 return nil
225 }
226
227
228
229
230 func (fd *FD) readUnlock() {
231 if fd.fdmu.rwunlock(true) {
232 fd.destroy()
233 }
234 }
235
236
237
238 func (fd *FD) writeLock() error {
239 if !fd.fdmu.rwlock(false) {
240 return errClosing(fd.isFile)
241 }
242 return nil
243 }
244
245
246
247
248 func (fd *FD) writeUnlock() {
249 if fd.fdmu.rwunlock(false) {
250 fd.destroy()
251 }
252 }
253
View as plain text