1
2
3
4
5 package quic
6
7 import (
8 "context"
9 "log/slog"
10 "math"
11 "time"
12 )
13
14 type lossState struct {
15 side connSide
16
17
18
19 handshakeConfirmed bool
20
21
22
23 maxAckDelay time.Duration
24
25
26
27
28 timer time.Time
29
30
31 ptoTimerArmed bool
32
33
34 ptoExpired bool
35
36
37
38 ptoBackoffCount int
39
40
41
42
43
44
45
46
47
48
49
50
51 antiAmplificationLimit int
52
53
54 consecutiveNonAckElicitingPackets int
55
56 rtt rttState
57 pacer pacerState
58 cc *ccReno
59
60
61 spaces [numberSpaceCount]struct {
62 sentPacketList
63 maxAcked packetNumber
64 lastAckEliciting packetNumber
65 }
66
67
68 ackFrameRTT time.Duration
69 ackFrameContainsAckEliciting bool
70 }
71
72 const antiAmplificationUnlimited = math.MaxInt
73
74 func (c *lossState) init(side connSide, maxDatagramSize int, now time.Time) {
75 c.side = side
76 if side == clientSide {
77
78 c.antiAmplificationLimit = antiAmplificationUnlimited
79 }
80 c.rtt.init()
81 c.cc = newReno(maxDatagramSize)
82 c.pacer.init(now, c.cc.congestionWindow, timerGranularity)
83
84
85
86 c.maxAckDelay = 25 * time.Millisecond
87
88 for space := range c.spaces {
89 c.spaces[space].maxAcked = -1
90 c.spaces[space].lastAckEliciting = -1
91 }
92 }
93
94
95 func (c *lossState) setMaxAckDelay(d time.Duration) {
96 if d >= (1<<14)*time.Millisecond {
97
98
99 return
100 }
101 c.maxAckDelay = d
102 }
103
104
105 func (c *lossState) confirmHandshake() {
106 c.handshakeConfirmed = true
107 }
108
109
110
111 func (c *lossState) validateClientAddress() {
112 c.antiAmplificationLimit = antiAmplificationUnlimited
113 }
114
115
116
117
118
119
120
121 const minPacketSize = 128
122
123 type ccLimit int
124
125 const (
126 ccOK = ccLimit(iota)
127 ccBlocked
128 ccLimited
129 ccPaced
130 )
131
132
133
134 func (c *lossState) sendLimit(now time.Time) (limit ccLimit, next time.Time) {
135 if c.antiAmplificationLimit < minPacketSize {
136
137 return ccBlocked, time.Time{}
138 }
139 if c.ptoExpired {
140
141 return ccOK, time.Time{}
142 }
143 if !c.cc.canSend() {
144
145 return ccLimited, time.Time{}
146 }
147 if c.cc.bytesInFlight == 0 {
148
149 return ccOK, time.Time{}
150 }
151 canSend, next := c.pacer.canSend(now)
152 if !canSend {
153
154 return ccPaced, next
155 }
156 return ccOK, time.Time{}
157 }
158
159
160 func (c *lossState) maxSendSize() int {
161 return min(c.antiAmplificationLimit, c.cc.maxDatagramSize)
162 }
163
164
165
166 func (c *lossState) advance(now time.Time, lossf func(numberSpace, *sentPacket, packetFate)) {
167 c.pacer.advance(now, c.cc.congestionWindow, c.rtt.smoothedRTT)
168 if c.ptoTimerArmed && !c.timer.IsZero() && !c.timer.After(now) {
169 c.ptoExpired = true
170 c.timer = time.Time{}
171 c.ptoBackoffCount++
172 }
173 c.detectLoss(now, lossf)
174 }
175
176
177 func (c *lossState) nextNumber(space numberSpace) packetNumber {
178 return c.spaces[space].nextNum
179 }
180
181
182 func (c *lossState) skipNumber(now time.Time, space numberSpace) {
183 sent := newSentPacket()
184 sent.num = c.spaces[space].nextNum
185 sent.time = now
186 sent.state = sentPacketUnsent
187 c.spaces[space].add(sent)
188 }
189
190
191 func (c *lossState) packetSent(now time.Time, log *slog.Logger, space numberSpace, sent *sentPacket) {
192 sent.time = now
193 c.spaces[space].add(sent)
194 size := sent.size
195 if c.antiAmplificationLimit != antiAmplificationUnlimited {
196 c.antiAmplificationLimit = max(0, c.antiAmplificationLimit-size)
197 }
198 if sent.inFlight {
199 c.cc.packetSent(now, log, space, sent)
200 c.pacer.packetSent(now, size, c.cc.congestionWindow, c.rtt.smoothedRTT)
201 if sent.ackEliciting {
202 c.spaces[space].lastAckEliciting = sent.num
203 c.ptoExpired = false
204 }
205 c.scheduleTimer(now)
206 if logEnabled(log, QLogLevelPacket) {
207 logBytesInFlight(log, c.cc.bytesInFlight)
208 }
209 }
210 if sent.ackEliciting {
211 c.consecutiveNonAckElicitingPackets = 0
212 } else {
213 c.consecutiveNonAckElicitingPackets++
214 }
215 }
216
217
218 func (c *lossState) datagramReceived(now time.Time, size int) {
219 if c.antiAmplificationLimit != antiAmplificationUnlimited {
220 c.antiAmplificationLimit += 3 * size
221
222
223
224 c.scheduleTimer(now)
225 if c.ptoTimerArmed && !c.timer.IsZero() && !c.timer.After(now) {
226 c.ptoExpired = true
227 c.timer = time.Time{}
228 }
229 }
230 }
231
232
233
234
235 func (c *lossState) receiveAckStart() {
236 c.ackFrameContainsAckEliciting = false
237 c.ackFrameRTT = -1
238 }
239
240
241
242 func (c *lossState) receiveAckRange(now time.Time, space numberSpace, rangeIndex int, start, end packetNumber, ackf func(numberSpace, *sentPacket, packetFate)) error {
243
244
245 if s := c.spaces[space].start(); start < s {
246 start = s
247 }
248 if e := c.spaces[space].end(); end > e {
249 return localTransportError{
250 code: errProtocolViolation,
251 reason: "acknowledgement for unsent packet",
252 }
253 }
254 if start >= end {
255 return nil
256 }
257 if rangeIndex == 0 {
258
259
260 sent := c.spaces[space].num(end - 1)
261 if sent.state == sentPacketSent {
262 c.ackFrameRTT = max(0, now.Sub(sent.time))
263 }
264 }
265 for pnum := start; pnum < end; pnum++ {
266 sent := c.spaces[space].num(pnum)
267 if sent.state == sentPacketUnsent {
268 return localTransportError{
269 code: errProtocolViolation,
270 reason: "acknowledgement for unsent packet",
271 }
272 }
273 if sent.state != sentPacketSent {
274 continue
275 }
276
277 if pnum > c.spaces[space].maxAcked {
278 c.spaces[space].maxAcked = pnum
279 }
280 sent.state = sentPacketAcked
281 c.cc.packetAcked(now, sent)
282 ackf(space, sent, packetAcked)
283 if sent.ackEliciting {
284 c.ackFrameContainsAckEliciting = true
285 }
286 }
287 return nil
288 }
289
290
291
292 func (c *lossState) receiveAckEnd(now time.Time, log *slog.Logger, space numberSpace, ackDelay time.Duration, lossf func(numberSpace, *sentPacket, packetFate)) {
293 c.spaces[space].sentPacketList.clean()
294
295
296
297 if c.ackFrameRTT >= 0 && c.ackFrameContainsAckEliciting {
298 c.rtt.updateSample(now, c.handshakeConfirmed, space, c.ackFrameRTT, ackDelay, c.maxAckDelay)
299 }
300
301
302
303 if !(c.side == clientSide && space == initialSpace) {
304 c.ptoBackoffCount = 0
305 }
306
307
308
309 c.timer = time.Time{}
310 c.detectLoss(now, lossf)
311 c.cc.packetBatchEnd(now, log, space, &c.rtt, c.maxAckDelay)
312
313 if logEnabled(log, QLogLevelPacket) {
314 var ssthresh slog.Attr
315 if c.cc.slowStartThreshold != math.MaxInt {
316 ssthresh = slog.Int("ssthresh", c.cc.slowStartThreshold)
317 }
318 log.LogAttrs(context.Background(), QLogLevelPacket,
319 "recovery:metrics_updated",
320 slog.Duration("min_rtt", c.rtt.minRTT),
321 slog.Duration("smoothed_rtt", c.rtt.smoothedRTT),
322 slog.Duration("latest_rtt", c.rtt.latestRTT),
323 slog.Duration("rtt_variance", c.rtt.rttvar),
324 slog.Int("congestion_window", c.cc.congestionWindow),
325 slog.Int("bytes_in_flight", c.cc.bytesInFlight),
326 ssthresh,
327 )
328 }
329 }
330
331
332
333
334 func (c *lossState) discardPackets(space numberSpace, log *slog.Logger, lossf func(numberSpace, *sentPacket, packetFate)) {
335 for i := 0; i < c.spaces[space].size; i++ {
336 sent := c.spaces[space].nth(i)
337 if sent.state != sentPacketSent {
338
339
340 continue
341 }
342 sent.state = sentPacketLost
343 c.cc.packetDiscarded(sent)
344 lossf(numberSpace(space), sent, packetLost)
345 }
346 c.spaces[space].clean()
347 if logEnabled(log, QLogLevelPacket) {
348 logBytesInFlight(log, c.cc.bytesInFlight)
349 }
350 }
351
352
353 func (c *lossState) discardKeys(now time.Time, log *slog.Logger, space numberSpace) {
354
355 for i := 0; i < c.spaces[space].size; i++ {
356 sent := c.spaces[space].nth(i)
357 if sent.state != sentPacketSent {
358 continue
359 }
360 c.cc.packetDiscarded(sent)
361 }
362 c.spaces[space].discard()
363 c.spaces[space].maxAcked = -1
364 c.spaces[space].lastAckEliciting = -1
365 c.scheduleTimer(now)
366 if logEnabled(log, QLogLevelPacket) {
367 logBytesInFlight(log, c.cc.bytesInFlight)
368 }
369 }
370
371 func (c *lossState) lossDuration() time.Duration {
372
373 return max((9*max(c.rtt.smoothedRTT, c.rtt.latestRTT))/8, timerGranularity)
374 }
375
376 func (c *lossState) detectLoss(now time.Time, lossf func(numberSpace, *sentPacket, packetFate)) {
377
378 const lossThreshold = 3
379
380 lossTime := now.Add(-c.lossDuration())
381 for space := numberSpace(0); space < numberSpaceCount; space++ {
382 for i := 0; i < c.spaces[space].size; i++ {
383 sent := c.spaces[space].nth(i)
384 if sent.state != sentPacketSent {
385 continue
386 }
387
388
389
390
391
392 switch {
393 case c.spaces[space].maxAcked-sent.num >= lossThreshold:
394
395
396 fallthrough
397 case sent.num <= c.spaces[space].maxAcked && !sent.time.After(lossTime):
398
399
400 sent.state = sentPacketLost
401 lossf(space, sent, packetLost)
402 if sent.inFlight {
403 c.cc.packetLost(now, space, sent, &c.rtt)
404 }
405 }
406 if sent.state != sentPacketLost {
407 break
408 }
409 }
410 c.spaces[space].clean()
411 }
412 c.scheduleTimer(now)
413 }
414
415
416
417
418
419
420
421
422
423
424 func (c *lossState) scheduleTimer(now time.Time) {
425 c.ptoTimerArmed = false
426
427
428
429
430
431 var oldestPotentiallyLost time.Time
432 for space := numberSpace(0); space < numberSpaceCount; space++ {
433 if c.spaces[space].size > 0 && c.spaces[space].start() <= c.spaces[space].maxAcked {
434 firstTime := c.spaces[space].nth(0).time
435 if oldestPotentiallyLost.IsZero() || firstTime.Before(oldestPotentiallyLost) {
436 oldestPotentiallyLost = firstTime
437 }
438 }
439 }
440 if !oldestPotentiallyLost.IsZero() {
441 c.timer = oldestPotentiallyLost.Add(c.lossDuration())
442 return
443 }
444
445
446 if c.ptoExpired {
447
448 c.timer = time.Time{}
449 return
450 }
451 if c.antiAmplificationLimit >= 0 && c.antiAmplificationLimit < minPacketSize {
452
453
454 c.timer = time.Time{}
455 return
456 }
457
458
459
460 var last time.Time
461 if !c.handshakeConfirmed {
462 for space := initialSpace; space <= handshakeSpace; space++ {
463 sent := c.spaces[space].num(c.spaces[space].lastAckEliciting)
464 if sent == nil {
465 continue
466 }
467 if last.IsZero() || last.After(sent.time) {
468 last = sent.time
469 }
470 }
471 } else {
472 sent := c.spaces[appDataSpace].num(c.spaces[appDataSpace].lastAckEliciting)
473 if sent != nil {
474 last = sent.time
475 }
476 }
477 if last.IsZero() &&
478 c.side == clientSide &&
479 c.spaces[handshakeSpace].maxAcked < 0 &&
480 !c.handshakeConfirmed {
481
482
483
484 if !c.timer.IsZero() {
485
486
487 c.ptoTimerArmed = true
488 return
489 }
490 last = now
491 } else if last.IsZero() {
492 c.timer = time.Time{}
493 return
494 }
495 c.timer = last.Add(c.ptoPeriod())
496 c.ptoTimerArmed = true
497 }
498
499 func (c *lossState) ptoPeriod() time.Duration {
500
501 return c.ptoBasePeriod() << c.ptoBackoffCount
502 }
503
504 func (c *lossState) ptoBasePeriod() time.Duration {
505
506 pto := c.rtt.smoothedRTT + max(4*c.rtt.rttvar, timerGranularity)
507 if c.handshakeConfirmed {
508
509
510
511 pto += c.maxAckDelay
512 }
513 return pto
514 }
515
516 func logBytesInFlight(log *slog.Logger, bytesInFlight int) {
517 log.LogAttrs(context.Background(), QLogLevelPacket,
518 "recovery:metrics_updated",
519 slog.Int("bytes_in_flight", bytesInFlight),
520 )
521 }
522
View as plain text