1
2
3
4
5 package quic
6
7 import (
8 "context"
9 "sync"
10 "sync/atomic"
11 "time"
12 )
13
14 type streamsState struct {
15 queue queue[*Stream]
16
17
18
19
20
21
22
23
24 streams map[streamID]maybeStream
25
26
27 localLimit [streamTypeCount]localStreamLimits
28 remoteLimit [streamTypeCount]remoteStreamLimits
29
30
31 peerInitialMaxStreamDataRemote [streamTypeCount]int64
32 peerInitialMaxStreamDataBidiLocal int64
33
34
35 inflow connInflow
36 outflow connOutflow
37
38
39
40 needSend atomic.Bool
41 sendMu sync.Mutex
42 queueMeta streamRing
43 queueData streamRing
44 }
45
46
47 type maybeStream struct {
48 s *Stream
49 }
50
51 func (c *Conn) streamsInit() {
52 c.streams.streams = make(map[streamID]maybeStream)
53 c.streams.queue = newQueue[*Stream]()
54 c.streams.localLimit[bidiStream].init()
55 c.streams.localLimit[uniStream].init()
56 c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
57 c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
58 c.inflowInit()
59 }
60
61 func (c *Conn) streamsCleanup() {
62 c.streams.queue.close(errConnClosed)
63 c.streams.localLimit[bidiStream].connHasClosed()
64 c.streams.localLimit[uniStream].connHasClosed()
65 for _, s := range c.streams.streams {
66 if s.s != nil {
67 s.s.connHasClosed()
68 }
69 }
70 }
71
72
73 func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) {
74 return c.streams.queue.get(ctx)
75 }
76
77
78
79
80
81 func (c *Conn) NewStream(ctx context.Context) (*Stream, error) {
82 return c.newLocalStream(ctx, bidiStream)
83 }
84
85
86
87
88
89 func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) {
90 return c.newLocalStream(ctx, uniStream)
91 }
92
93 func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
94 num, err := c.streams.localLimit[styp].open(ctx, c)
95 if err != nil {
96 return nil, err
97 }
98
99 s := newStream(c, newStreamID(c.side, styp, num))
100 s.outmaxbuf = c.config.maxStreamWriteBufferSize()
101 s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
102 if styp == bidiStream {
103 s.inmaxbuf = c.config.maxStreamReadBufferSize()
104 s.inwin = c.config.maxStreamReadBufferSize()
105 }
106 s.inUnlock()
107 s.outUnlock()
108
109
110 if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) {
111 c.streams.streams[s.id] = maybeStream{s}
112 }); err != nil {
113 return nil, err
114 }
115 return s, nil
116 }
117
118
119
120
121
122
123 type streamFrameType uint8
124
125 const (
126 sendStream = streamFrameType(iota)
127 recvStream
128 )
129
130
131
132 func (c *Conn) streamForID(id streamID) *Stream {
133 return c.streams.streams[id].s
134 }
135
136
137
138
139
140
141
142
143
144
145 func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream {
146 if id.streamType() == uniStream {
147 if (id.initiator() == c.side) != (ftype == sendStream) {
148
149
150 c.abort(now, localTransportError{
151 code: errStreamState,
152 reason: "invalid frame for unidirectional stream",
153 })
154 return nil
155 }
156 }
157
158 ms, isOpen := c.streams.streams[id]
159 if ms.s != nil {
160 return ms.s
161 }
162
163 num := id.num()
164 styp := id.streamType()
165 if id.initiator() == c.side {
166
167 if c.streams.localLimit[styp].wasOpened(num) {
168 return nil
169 }
170
171
172 c.abort(now, localTransportError{
173 code: errStreamState,
174 reason: "received frame for unknown stream",
175 })
176 return nil
177 } else {
178
179
180
181 if !isOpen && num < c.streams.remoteLimit[styp].opened {
182
183 return nil
184 }
185 }
186
187 prevOpened := c.streams.remoteLimit[styp].opened
188 if err := c.streams.remoteLimit[styp].open(id); err != nil {
189 c.abort(now, err)
190 return nil
191 }
192
193
194
195
196 for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
197 c.streams.streams[n] = maybeStream{}
198 }
199
200 s := newStream(c, id)
201 s.inmaxbuf = c.config.maxStreamReadBufferSize()
202 s.inwin = c.config.maxStreamReadBufferSize()
203 if id.streamType() == bidiStream {
204 s.outmaxbuf = c.config.maxStreamWriteBufferSize()
205 s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
206 }
207 s.inUnlock()
208 s.outUnlock()
209
210 c.streams.streams[id] = maybeStream{s}
211 c.streams.queue.put(s)
212 return s
213 }
214
215
216 func (c *Conn) maybeQueueStreamForSend(s *Stream, state streamState) {
217 if state.wantQueue() == state.inQueue() {
218 return
219 }
220 c.streams.sendMu.Lock()
221 defer c.streams.sendMu.Unlock()
222 state = s.state.load()
223 c.queueStreamForSendLocked(s, state)
224
225 c.streams.needSend.Store(true)
226 c.wake()
227 }
228
229
230
231
232
233 func (c *Conn) queueStreamForSendLocked(s *Stream, state streamState) {
234 for {
235 wantQueue := state.wantQueue()
236 inQueue := state.inQueue()
237 if inQueue == wantQueue {
238 return
239 }
240
241 switch inQueue {
242 case metaQueue:
243 c.streams.queueMeta.remove(s)
244 case dataQueue:
245 c.streams.queueData.remove(s)
246 }
247
248 switch wantQueue {
249 case metaQueue:
250 c.streams.queueMeta.append(s)
251 state = s.state.set(streamQueueMeta, streamQueueMeta|streamQueueData)
252 case dataQueue:
253 c.streams.queueData.append(s)
254 state = s.state.set(streamQueueData, streamQueueMeta|streamQueueData)
255 case noQueue:
256 state = s.state.set(0, streamQueueMeta|streamQueueData)
257 }
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273 }
274 }
275
276
277
278
279
280 func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
281
282 if !c.appendMaxDataFrame(w, pnum, pto) {
283 return false
284 }
285
286 if pto {
287 return c.appendStreamFramesPTO(w, pnum)
288 }
289 if !c.streams.needSend.Load() {
290
291
292
293 return c.appendMaxStreams(w, pnum, pto)
294 }
295 c.streams.sendMu.Lock()
296 defer c.streams.sendMu.Unlock()
297
298 for c.streams.queueMeta.head != nil {
299 s := c.streams.queueMeta.head
300 state := s.state.load()
301 if state&(streamQueueMeta|streamConnRemoved) != streamQueueMeta {
302 panic("BUG: queueMeta stream is not streamQueueMeta")
303 }
304 if state&streamInSendMeta != 0 {
305 s.ingate.lock()
306 ok := s.appendInFramesLocked(w, pnum, pto)
307 state = s.inUnlockNoQueue()
308 if !ok {
309 return false
310 }
311 if state&streamInSendMeta != 0 {
312 panic("BUG: streamInSendMeta set after successfully appending frames")
313 }
314 }
315 if state&streamOutSendMeta != 0 {
316 s.outgate.lock()
317
318
319 ok := s.appendOutFramesLocked(w, pnum, pto)
320 state = s.outUnlockNoQueue()
321
322
323
324 if !ok && state&streamOutSendMeta != 0 {
325 return false
326 }
327 if state&streamOutSendMeta != 0 {
328 panic("BUG: streamOutSendMeta set after successfully appending frames")
329 }
330 }
331
332 c.streams.queueMeta.remove(s)
333 if state&(streamInDone|streamOutDone) == streamInDone|streamOutDone {
334
335 state = s.state.set(streamConnRemoved, streamQueueMeta|streamConnRemoved)
336 delete(c.streams.streams, s.id)
337
338
339
340 if s.id.initiator() != c.side {
341 c.streams.remoteLimit[s.id.streamType()].close()
342 }
343 } else {
344 state = s.state.set(0, streamQueueMeta|streamConnRemoved)
345 }
346
347
348
349
350 c.queueStreamForSendLocked(s, state)
351 }
352
353
354 if !c.appendMaxStreams(w, pnum, pto) {
355 return false
356 }
357
358
359 for c.streams.queueData.head != nil {
360 avail := c.streams.outflow.avail()
361 if avail == 0 {
362 break
363 }
364 s := c.streams.queueData.head
365 s.outgate.lock()
366 ok := s.appendOutFramesLocked(w, pnum, pto)
367 state := s.outUnlockNoQueue()
368 if !ok {
369
370
371
372
373
374
375
376 if avail > 512 {
377 c.streams.queueData.head = s.next
378 }
379 return false
380 }
381 if state&streamQueueData == 0 {
382 panic("BUG: queueData stream is not streamQueueData")
383 }
384 if state&streamOutSendData != 0 {
385
386
387
388
389
390 if c.streams.outflow.avail() != 0 {
391 panic("BUG: streamOutSendData set and flow control available after send")
392 }
393 c.streams.queueData.head = s.next
394 return true
395 }
396 c.streams.queueData.remove(s)
397 state = s.state.set(0, streamQueueData)
398 c.queueStreamForSendLocked(s, state)
399 }
400 if c.streams.queueMeta.head == nil && c.streams.queueData.head == nil {
401 c.streams.needSend.Store(false)
402 }
403 return true
404 }
405
406
407
408
409
410
411 func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
412 const pto = true
413 if !c.appendMaxStreams(w, pnum, pto) {
414 return false
415 }
416 c.streams.sendMu.Lock()
417 defer c.streams.sendMu.Unlock()
418 for _, ms := range c.streams.streams {
419 s := ms.s
420 if s == nil {
421 continue
422 }
423 const pto = true
424 s.ingate.lock()
425 inOK := s.appendInFramesLocked(w, pnum, pto)
426 s.inUnlockNoQueue()
427 if !inOK {
428 return false
429 }
430
431 s.outgate.lock()
432 outOK := s.appendOutFramesLocked(w, pnum, pto)
433 s.outUnlockNoQueue()
434 if !outOK {
435 return false
436 }
437 }
438 return true
439 }
440
441 func (c *Conn) appendMaxStreams(w *packetWriter, pnum packetNumber, pto bool) bool {
442 if !c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto) {
443 return false
444 }
445 if !c.streams.remoteLimit[bidiStream].appendFrame(w, bidiStream, pnum, pto) {
446 return false
447 }
448 return true
449 }
450
451
452 type streamRing struct {
453 head *Stream
454 }
455
456
457
458 func (r *streamRing) remove(s *Stream) {
459 if s.next == s {
460 r.head = nil
461 } else {
462 s.prev.next = s.next
463 s.next.prev = s.prev
464 if r.head == s {
465 r.head = s.next
466 }
467 }
468 }
469
470
471
472 func (r *streamRing) append(s *Stream) {
473 if r.head == nil {
474 r.head = s
475 s.next = s
476 s.prev = s
477 } else {
478 s.prev = r.head.prev
479 s.next = r.head
480 s.prev.next = s
481 s.next.prev = s
482 }
483 }
484
View as plain text