1
2
3
4
5 package quic
6
7 import (
8 "context"
9 "errors"
10 "fmt"
11 "io"
12 "math"
13 "sync"
14
15 "golang.org/x/net/internal/quic/quicwire"
16 )
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 type Stream struct {
34 id streamID
35 conn *Conn
36
37
38
39 inctx context.Context
40 outctx context.Context
41
42
43
44
45
46 ingate gate
47 in pipe
48 inwin int64
49 insendmax sentVal
50 inmaxbuf int64
51 insize int64
52 inset rangeset[int64]
53 inclosed sentVal
54 inresetcode int64
55
56
57
58
59
60
61 outgate gate
62 out pipe
63 outflushed int64
64 outwin int64
65 outmaxsent int64
66 outmaxbuf int64
67 outunsent rangeset[int64]
68 outacked rangeset[int64]
69 outopened sentVal
70 outclosed sentVal
71 outblocked sentVal
72 outreset sentVal
73 outresetcode uint64
74 outdone chan struct{}
75
76
77 inbufmu sync.Mutex
78 inbuf []byte
79 inbufoff int
80
81 outbufmu sync.Mutex
82 outbuf []byte
83 outbufoff int
84
85
86
87
88
89
90
91
92
93
94 state atomicBits[streamState]
95
96 prev, next *Stream
97 }
98
99 type streamState uint32
100
101 const (
102
103
104
105 streamInSendMeta = streamState(1 << iota)
106
107
108
109
110
111
112
113 streamOutSendMeta
114 streamOutSendData
115
116
117
118
119 streamInDone
120 streamOutDone
121
122
123 streamConnRemoved
124
125
126
127 streamQueueMeta
128 streamQueueData
129 )
130
131 type streamQueue int
132
133 const (
134 noQueue = streamQueue(iota)
135 metaQueue
136 dataQueue
137 )
138
139
140
141
142 const streamResetByConnClose = math.MaxInt64
143
144
145 func (s streamState) wantQueue() streamQueue {
146 switch {
147 case s&(streamInSendMeta|streamOutSendMeta) != 0:
148 return metaQueue
149 case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
150 return metaQueue
151 case s&streamOutSendData != 0:
152
153
154
155 return dataQueue
156 }
157 return noQueue
158 }
159
160
161 func (s streamState) inQueue() streamQueue {
162 switch {
163 case s&streamQueueMeta != 0:
164 return metaQueue
165 case s&streamQueueData != 0:
166 return dataQueue
167 }
168 return noQueue
169 }
170
171
172
173
174
175
176
177 func newStream(c *Conn, id streamID) *Stream {
178 s := &Stream{
179 conn: c,
180 id: id,
181 insize: -1,
182 inresetcode: -1,
183 ingate: newLockedGate(),
184 outgate: newLockedGate(),
185 inctx: context.Background(),
186 outctx: context.Background(),
187 }
188 if !s.IsReadOnly() {
189 s.outdone = make(chan struct{})
190 }
191 return s
192 }
193
194
195
196
197
198
199 func (s *Stream) ID() int64 {
200 return int64(s.id)
201 }
202
203
204
205
206 func (s *Stream) SetReadContext(ctx context.Context) {
207 s.inctx = ctx
208 }
209
210
211
212
213
214
215 func (s *Stream) SetWriteContext(ctx context.Context) {
216 s.outctx = ctx
217 }
218
219
220
221 func (s *Stream) IsReadOnly() bool {
222 return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side
223 }
224
225
226
227 func (s *Stream) IsWriteOnly() bool {
228 return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side
229 }
230
231
232
233
234
235
236
237
238
239
240
241 func (s *Stream) Read(b []byte) (n int, err error) {
242 if s.IsWriteOnly() {
243 return 0, errors.New("read from write-only stream")
244 }
245
246 fastPath := false
247 s.inbufmu.Lock()
248 if len(s.inbuf) > s.inbufoff {
249
250
251 n = copy(b, s.inbuf[s.inbufoff:])
252 s.inbufoff += n
253 fastPath = true
254 }
255 s.inbufmu.Unlock()
256 if fastPath {
257 return n, nil
258 }
259
260 if err := s.ingate.waitAndLock(s.inctx); err != nil {
261 return 0, err
262 }
263
264 if s.inbufoff > 0 {
265
266 s.in.discardBefore(s.in.start + int64(s.inbufoff))
267 s.inbufmu.Lock()
268 s.inbufoff = 0
269 s.inbuf = nil
270 s.inbufmu.Unlock()
271 }
272
273
274
275
276 var bytesRead int64
277 defer func() {
278 s.inUnlock()
279 s.conn.handleStreamBytesReadOffLoop(bytesRead)
280 }()
281 if s.inresetcode != -1 {
282 if s.inresetcode == streamResetByConnClose {
283 if err := s.conn.finalError(); err != nil {
284 return 0, err
285 }
286 }
287 return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
288 }
289 if s.inclosed.isSet() {
290 return 0, errors.New("read from closed stream")
291 }
292 if s.insize == s.in.start {
293 return 0, io.EOF
294 }
295
296 if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
297 panic("BUG: inconsistent input stream state")
298 }
299 if size := int(s.inset[0].end - s.in.start); size < len(b) {
300 b = b[:size]
301 }
302 bytesRead = int64(len(b))
303 start := s.in.start
304 end := start + int64(len(b))
305 s.in.copy(start, b)
306 s.in.discardBefore(end)
307 if end == s.insize {
308
309
310 return len(b), io.EOF
311 }
312
313 if len(s.inset) > 0 && s.inset[0].start <= s.in.start && s.inset[0].end > s.in.start {
314
315
316 s.inbufmu.Lock()
317 s.inbuf = s.in.peek(s.inset[0].end - s.in.start)
318 s.inbufmu.Unlock()
319 bytesRead += int64(len(s.inbuf))
320 }
321 if s.insize == -1 || s.insize > s.inwin {
322 newWindow := s.in.start + int64(len(s.inbuf)) + s.inmaxbuf
323 addedWindow := newWindow - s.inwin
324 if shouldUpdateFlowControl(s.inmaxbuf, addedWindow) {
325
326 s.insendmax.setUnsent()
327 }
328 }
329
330 return len(b), nil
331 }
332
333
334
335
336 func (s *Stream) ReadByte() (byte, error) {
337 fastPath := false
338 s.inbufmu.Lock()
339 var readByte byte
340 if len(s.inbuf) > s.inbufoff {
341 readByte = s.inbuf[s.inbufoff]
342 s.inbufoff++
343 fastPath = true
344 }
345 s.inbufmu.Unlock()
346 if fastPath {
347 return readByte, nil
348 }
349
350 var b [1]byte
351 n, err := s.Read(b[:])
352 if n > 0 {
353 return b[0], nil
354 }
355 return 0, err
356 }
357
358
359
360
361
362 func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
363 return addedWindow >= maxWindow/8
364 }
365
366
367
368
369
370
371 func (s *Stream) Write(b []byte) (n int, err error) {
372 if s.IsReadOnly() {
373 return 0, errors.New("write to read-only stream")
374 }
375
376 fastPath := false
377 s.outbufmu.Lock()
378 if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
379
380 copy(s.outbuf[s.outbufoff:], b)
381 s.outbufoff += len(b)
382 fastPath = true
383 }
384 s.outbufmu.Unlock()
385 if fastPath {
386 return len(b), nil
387 }
388
389 canWrite := s.outgate.lock()
390 s.flushFastOutputBuffer()
391 for {
392
393
394
395 if len(b) > 0 && !canWrite {
396
397 s.outUnlock()
398 if err := s.outgate.waitAndLock(s.outctx); err != nil {
399 return n, err
400 }
401
402
403
404 }
405 if err := s.writeErrorLocked(); err != nil {
406 s.outUnlock()
407 return n, err
408 }
409 if len(b) == 0 {
410 break
411 }
412
413
414 lim := s.out.start + s.outmaxbuf
415
416
417 nn := min(int64(len(b)), lim-s.out.end)
418
419 s.out.writeAt(b[:nn], s.out.end)
420 b = b[nn:]
421 n += int(nn)
422
423
424
425
426
427
428
429
430
431 const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
432 shouldFlush := s.out.end >= s.outwin ||
433 s.out.end >= lim ||
434 (s.out.end-s.outflushed) >= autoFlushSize
435 if shouldFlush {
436 s.flushLocked()
437 }
438 if s.out.end > s.outwin {
439
440
441 s.outblocked.set()
442 }
443
444 canWrite = false
445 }
446 if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
447
448
449
450
451
452
453
454
455
456
457
458 s.outbufmu.Lock()
459 s.outbuf = s.out.availableBuffer()
460 if int64(len(s.outbuf)) > lim {
461 s.outbuf = s.outbuf[:lim]
462 }
463 s.outbufmu.Unlock()
464 }
465 s.outUnlock()
466 return n, nil
467 }
468
469
470 func (s *Stream) WriteByte(c byte) error {
471 fastPath := false
472 s.outbufmu.Lock()
473 if s.outbufoff < len(s.outbuf) {
474 s.outbuf[s.outbufoff] = c
475 s.outbufoff++
476 fastPath = true
477 }
478 s.outbufmu.Unlock()
479 if fastPath {
480 return nil
481 }
482
483 b := [1]byte{c}
484 _, err := s.Write(b[:])
485 return err
486 }
487
488 func (s *Stream) flushFastOutputBuffer() {
489 s.outbufmu.Lock()
490 defer s.outbufmu.Unlock()
491 if s.outbuf == nil {
492 return
493 }
494
495
496
497 s.out.end += int64(s.outbufoff)
498 s.outbuf = nil
499 s.outbufoff = 0
500 }
501
502
503
504
505 func (s *Stream) Flush() error {
506 if s.IsReadOnly() {
507 return errors.New("flush of read-only stream")
508 }
509 s.outgate.lock()
510 defer s.outUnlock()
511 if err := s.writeErrorLocked(); err != nil {
512 return err
513 }
514 s.flushLocked()
515 return nil
516 }
517
518
519
520 func (s *Stream) writeErrorLocked() error {
521 if s.outreset.isSet() {
522 if s.outresetcode == streamResetByConnClose {
523 if err := s.conn.finalError(); err != nil {
524 return err
525 }
526 }
527 return errors.New("write to reset stream")
528 }
529 if s.outclosed.isSet() {
530 return errors.New("write to closed stream")
531 }
532 return nil
533 }
534
535 func (s *Stream) flushLocked() {
536 s.flushFastOutputBuffer()
537 s.outopened.set()
538 if s.outflushed < s.outwin {
539 s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
540 }
541 s.outflushed = s.out.end
542 }
543
544
545
546
547
548
549
550
551
552 func (s *Stream) Close() error {
553 s.CloseRead()
554 if s.IsReadOnly() {
555 return nil
556 }
557 s.CloseWrite()
558
559 if err := s.conn.waitOnDone(s.outctx, s.outdone); err != nil {
560 return err
561 }
562 s.outgate.lock()
563 defer s.outUnlock()
564 if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
565 return nil
566 }
567 return errors.New("stream reset")
568 }
569
570
571
572
573
574
575
576 func (s *Stream) CloseRead() {
577 if s.IsWriteOnly() {
578 return
579 }
580 s.ingate.lock()
581 if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
582
583
584
585 s.inclosed.setReceived()
586 } else {
587 s.inclosed.set()
588 }
589 discarded := s.in.end - s.in.start
590 s.in.discardBefore(s.in.end)
591 s.inUnlock()
592 s.conn.handleStreamBytesReadOffLoop(discarded)
593 }
594
595
596
597
598
599
600
601 func (s *Stream) CloseWrite() {
602 if s.IsReadOnly() {
603 return
604 }
605 s.outgate.lock()
606 defer s.outUnlock()
607 s.outclosed.set()
608 s.flushLocked()
609 }
610
611
612
613
614
615
616
617
618
619
620
621
622 func (s *Stream) Reset(code uint64) {
623 const userClosed = true
624 s.resetInternal(code, userClosed)
625 }
626
627
628
629
630
631 func (s *Stream) resetInternal(code uint64, userClosed bool) {
632 s.outgate.lock()
633 defer s.outUnlock()
634 if s.IsReadOnly() {
635 return
636 }
637 if userClosed {
638
639 s.outclosed.set()
640 }
641 if s.outreset.isSet() {
642 return
643 }
644 if code > quicwire.MaxVarint {
645 code = quicwire.MaxVarint
646 }
647
648
649
650 s.outreset.set()
651 s.outresetcode = code
652 s.outbufmu.Lock()
653 s.outbuf = nil
654 s.outbufoff = 0
655 s.outbufmu.Unlock()
656 s.out.discardBefore(s.out.end)
657 s.outunsent = rangeset[int64]{}
658 s.outblocked.clear()
659 }
660
661
662 func (s *Stream) connHasClosed() {
663
664
665
666 localClose := s.conn.lifetime.state == connStateClosing
667
668 s.ingate.lock()
669 if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
670 if localClose {
671 s.inclosed.set()
672 } else {
673 s.inresetcode = streamResetByConnClose
674 }
675 }
676 s.inUnlock()
677
678 s.outgate.lock()
679 if localClose {
680 s.outclosed.set()
681 s.outreset.set()
682 } else {
683 s.outresetcode = streamResetByConnClose
684 s.outreset.setReceived()
685 }
686 s.outUnlock()
687 }
688
689
690
691
692
693 func (s *Stream) inUnlock() {
694 state := s.inUnlockNoQueue()
695 s.conn.maybeQueueStreamForSend(s, state)
696 }
697
698
699
700 func (s *Stream) inUnlockNoQueue() streamState {
701 nextByte := s.in.start + int64(len(s.inbuf))
702 canRead := s.inset.contains(nextByte) ||
703 s.insize == s.in.start+int64(len(s.inbuf)) ||
704 s.inresetcode != -1 ||
705 s.inclosed.isSet()
706 defer s.ingate.unlock(canRead)
707 var state streamState
708 switch {
709 case s.IsWriteOnly():
710 state = streamInDone
711 case s.inresetcode != -1:
712 fallthrough
713 case s.in.start == s.insize:
714
715
716 if s.inclosed.isSet() {
717 state = streamInDone
718 }
719 case s.insendmax.shouldSend():
720 state = streamInSendMeta
721 case s.inclosed.shouldSend():
722 state = streamInSendMeta
723 }
724 const mask = streamInDone | streamInSendMeta
725 return s.state.set(state, mask)
726 }
727
728
729
730
731
732 func (s *Stream) outUnlock() {
733 state := s.outUnlockNoQueue()
734 s.conn.maybeQueueStreamForSend(s, state)
735 }
736
737
738
739 func (s *Stream) outUnlockNoQueue() streamState {
740 isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) ||
741 s.outreset.isSet()
742 if isDone {
743 select {
744 case <-s.outdone:
745 default:
746 if !s.IsReadOnly() {
747 close(s.outdone)
748 }
749 }
750 }
751 lim := s.out.start + s.outmaxbuf
752 canWrite := lim > s.out.end ||
753 s.outclosed.isSet() ||
754 s.outreset.isSet()
755 defer s.outgate.unlock(canWrite)
756 var state streamState
757 switch {
758 case s.IsReadOnly():
759 state = streamOutDone
760 case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end):
761 fallthrough
762 case s.outreset.isReceived():
763
764
765 if s.outclosed.isSet() {
766 state = streamOutDone
767 }
768 case s.outreset.shouldSend():
769 state = streamOutSendMeta
770 case s.outreset.isSet():
771 case s.outblocked.shouldSend():
772 state = streamOutSendMeta
773 case len(s.outunsent) > 0:
774 if s.outunsent.min() < s.outmaxsent {
775 state = streamOutSendMeta
776 } else {
777 state = streamOutSendData
778 }
779 case s.outclosed.shouldSend() && s.out.end == s.outmaxsent:
780 state = streamOutSendMeta
781 case s.outopened.shouldSend():
782 state = streamOutSendMeta
783 }
784 const mask = streamOutDone | streamOutSendMeta | streamOutSendData
785 return s.state.set(state, mask)
786 }
787
788
789 func (s *Stream) handleData(off int64, b []byte, fin bool) error {
790 s.ingate.lock()
791 defer s.inUnlock()
792 end := off + int64(len(b))
793 if err := s.checkStreamBounds(end, fin); err != nil {
794 return err
795 }
796 if s.inclosed.isSet() || s.inresetcode != -1 {
797
798
799 return nil
800 }
801 if s.insize == -1 && end > s.in.end {
802 added := end - s.in.end
803 if err := s.conn.handleStreamBytesReceived(added); err != nil {
804 return err
805 }
806 }
807 if len(s.inset) > 0 && s.inset[0].contains(off) {
808
809
810
811
812
813
814
815
816
817 newOff := min(end, s.inset[0].end)
818 b = b[newOff-off:]
819 off = newOff
820 }
821 s.in.writeAt(b, off)
822 s.inset.add(off, end)
823 if fin {
824 s.insize = end
825
826 s.insendmax.clear()
827 }
828 return nil
829 }
830
831
832 func (s *Stream) handleReset(code uint64, finalSize int64) error {
833 s.ingate.lock()
834 defer s.inUnlock()
835 const fin = true
836 if err := s.checkStreamBounds(finalSize, fin); err != nil {
837 return err
838 }
839 if s.inresetcode != -1 {
840
841 return nil
842 }
843 if s.insize == -1 {
844 added := finalSize - s.in.end
845 if err := s.conn.handleStreamBytesReceived(added); err != nil {
846 return err
847 }
848 }
849 s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
850 s.in.discardBefore(s.in.end)
851 s.inresetcode = int64(code)
852 s.insize = finalSize
853 return nil
854 }
855
856
857 func (s *Stream) checkStreamBounds(end int64, fin bool) error {
858 if end > s.inwin {
859
860 return localTransportError{
861 code: errFlowControl,
862 reason: "stream flow control window exceeded",
863 }
864 }
865 if s.insize != -1 && end > s.insize {
866
867 return localTransportError{
868 code: errFinalSize,
869 reason: "data received past end of stream",
870 }
871 }
872 if fin && s.insize != -1 && end != s.insize {
873
874 return localTransportError{
875 code: errFinalSize,
876 reason: "final size of stream changed",
877 }
878 }
879 if fin && end < s.in.end {
880
881 return localTransportError{
882 code: errFinalSize,
883 reason: "end of stream occurs before prior data",
884 }
885 }
886 return nil
887 }
888
889
890 func (s *Stream) handleStopSending(code uint64) error {
891
892
893 const userReset = false
894 s.resetInternal(code, userReset)
895 return nil
896 }
897
898
899 func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
900 s.outgate.lock()
901 defer s.outUnlock()
902 if maxStreamData <= s.outwin {
903 return nil
904 }
905 if s.outflushed > s.outwin {
906 s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
907 }
908 s.outwin = maxStreamData
909 if s.out.end > s.outwin {
910
911 s.outblocked.setUnsent()
912 } else {
913 s.outblocked.clear()
914 }
915 return nil
916 }
917
918
919 func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
920
921
922
923
924
925
926 switch ftype {
927 case frameTypeResetStream:
928 s.outgate.lock()
929 s.outreset.ackOrLoss(pnum, fate)
930 s.outUnlock()
931 case frameTypeStopSending:
932 s.ingate.lock()
933 s.inclosed.ackOrLoss(pnum, fate)
934 s.inUnlock()
935 case frameTypeMaxStreamData:
936 s.ingate.lock()
937 s.insendmax.ackLatestOrLoss(pnum, fate)
938 s.inUnlock()
939 case frameTypeStreamDataBlocked:
940 s.outgate.lock()
941 s.outblocked.ackLatestOrLoss(pnum, fate)
942 s.outUnlock()
943 default:
944 panic("unhandled frame type")
945 }
946 }
947
948
949 func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
950 s.outgate.lock()
951 defer s.outUnlock()
952 s.outopened.ackOrLoss(pnum, fate)
953 if fin {
954 s.outclosed.ackOrLoss(pnum, fate)
955 }
956 if s.outreset.isSet() {
957
958 return
959 }
960 switch fate {
961 case packetAcked:
962 s.outacked.add(start, end)
963 s.outunsent.sub(start, end)
964
965 if s.outacked.contains(s.out.start) {
966 s.out.discardBefore(s.outacked[0].end)
967 }
968 case packetLost:
969
970
971
972 s.outunsent.add(start, end)
973 for _, a := range s.outacked {
974 s.outunsent.sub(a.start, a.end)
975 }
976 }
977 }
978
979
980
981
982
983
984 func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
985 if s.inclosed.shouldSendPTO(pto) {
986
987
988 code := uint64(0)
989 if !w.appendStopSendingFrame(s.id, code) {
990 return false
991 }
992 s.inclosed.setSent(pnum)
993 }
994
995 if s.insendmax.shouldSendPTO(pto) {
996
997 maxStreamData := s.in.start + s.inmaxbuf
998 if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
999 return false
1000 }
1001 s.inwin = maxStreamData
1002 s.insendmax.setSent(pnum)
1003 }
1004 return true
1005 }
1006
1007
1008
1009
1010
1011
1012 func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
1013 if s.outreset.isSet() {
1014
1015 if s.outreset.shouldSendPTO(pto) {
1016 if !w.appendResetStreamFrame(s.id, s.outresetcode, s.outmaxsent) {
1017 return false
1018 }
1019 s.outreset.setSent(pnum)
1020 s.frameOpensStream(pnum)
1021 }
1022 return true
1023 }
1024 if s.outblocked.shouldSendPTO(pto) {
1025
1026 if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
1027 return false
1028 }
1029 s.outblocked.setSent(pnum)
1030 s.frameOpensStream(pnum)
1031 }
1032 for {
1033
1034 off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
1035 if end := off + size; end > s.outmaxsent {
1036
1037 end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
1038 end = max(end, off)
1039 size = end - off
1040 }
1041 fin := s.outclosed.isSet() && off+size == s.out.end
1042 shouldSend := size > 0 ||
1043 s.outopened.shouldSendPTO(pto) ||
1044 (fin && s.outclosed.shouldSendPTO(pto))
1045 if !shouldSend {
1046 return true
1047 }
1048 b, added := w.appendStreamFrame(s.id, off, int(size), fin)
1049 if !added {
1050 return false
1051 }
1052 s.out.copy(off, b)
1053 end := off + int64(len(b))
1054 if end > s.outmaxsent {
1055 s.conn.streams.outflow.consume(end - s.outmaxsent)
1056 s.outmaxsent = end
1057 }
1058 s.outunsent.sub(off, end)
1059 s.frameOpensStream(pnum)
1060 if fin {
1061 s.outclosed.setSent(pnum)
1062 }
1063 if pto {
1064 return true
1065 }
1066 if int64(len(b)) < size {
1067 return false
1068 }
1069 }
1070 }
1071
1072
1073
1074
1075
1076 func (s *Stream) frameOpensStream(pnum packetNumber) {
1077 if !s.outopened.isReceived() {
1078 s.outopened.setSent(pnum)
1079 }
1080 }
1081
1082
1083 func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
1084 switch {
1085 case pto:
1086
1087
1088
1089
1090
1091
1092
1093 for _, r := range outacked {
1094 if r.start > start {
1095 return start, r.start - start
1096 }
1097 }
1098 return start, end - start
1099 case outunsent.numRanges() > 0:
1100 return outunsent.min(), outunsent[0].size()
1101 default:
1102 return end, 0
1103 }
1104 }
1105
View as plain text