1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package http2
27
28 import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/rand"
33 "crypto/tls"
34 "errors"
35 "fmt"
36 "io"
37 "log"
38 "math"
39 "net"
40 "net/http/internal"
41 "net/http/internal/httpcommon"
42 "net/textproto"
43 "net/url"
44 "os"
45 "reflect"
46 "runtime"
47 "slices"
48 "strconv"
49 "strings"
50 "sync"
51 "time"
52
53 "golang.org/x/net/http/httpguts"
54 "golang.org/x/net/http2/hpack"
55 )
56
// Timeouts, sizes, and limits used by the server implementation.
const (
	// prefaceTimeout bounds how long readPreface waits for the client preface.
	prefaceTimeout = 10 * time.Second
	// firstSettingsTimeout is the delay of the timer armed by serve that
	// ends the connection unless a frame from the client is processed first.
	firstSettingsTimeout = 2 * time.Second
	// handlerChunkWriteSize is the size of the bufio.Writer created for each
	// pooled responseWriterState.
	handlerChunkWriteSize = 4 << 10
	// defaultMaxStreams is not referenced in this part of the file.
	// NOTE(review): presumably the default concurrent-stream limit — confirm.
	defaultMaxStreams = 250

	// maxQueuedControlFrames is the queued control frame count above which
	// the serve loop gives up on the connection.
	maxQueuedControlFrames = 10000
)
68
// Sentinel errors used for stream and connection teardown.
var (
	// errClientDisconnected is used to close all streams when serve exits and
	// is returned to handlers that try to write after serving has stopped.
	errClientDisconnected = errors.New("client disconnected")
	// errClosedBody is not referenced in this part of the file.
	errClosedBody = errors.New("body closed by handler")
	// errHandlerComplete closes a half-closed-remote stream once a frame that
	// ends the stream has been written.
	errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
	// errStreamClosed is returned by writeDataFromHandler when the stream is
	// closed and no write result is available.
	errStreamClosed = errors.New("http2: stream closed")
)
75
76 var responseWriterStatePool = sync.Pool{
77 New: func() any {
78 rws := &responseWriterState{}
79 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
80 return rws
81 },
82 }
83
84
// Test hooks. All are nil unless a test sets them.
var (
	// testHookOnConn is not referenced in this part of the file.
	testHookOnConn func()
	// testHookOnPanicMu, when non-nil, is held by notePanic while it runs.
	testHookOnPanicMu *sync.Mutex
	// testHookOnPanic, when non-nil, makes notePanic recover a panic and pass
	// it here; returning true re-panics with the same value.
	testHookOnPanic func(sc *serverConn, panicVal any) (rePanic bool)
)
90
91
// Server holds HTTP/2 server settings together with the internal state
// shared by the connections it serves.
//
// Most exported fields are not read directly in this part of the file:
// serveConn takes its effective configuration from configFromServer, which
// is defined elsewhere. Field notes that describe a setting's effect are
// therefore hedged unless the field is read directly here.
type Server struct {
	// MaxHandlers is not referenced in this part of the file.
	// NOTE(review): presumably limits concurrently running handlers — confirm.
	MaxHandlers int

	// MaxConcurrentStreams: NOTE(review): presumably feeds the concurrent
	// stream limit advertised in the initial SETTINGS frame — confirm
	// against configFromServer.
	MaxConcurrentStreams uint32

	// MaxDecoderHeaderTableSize: NOTE(review): presumably the HPACK decoder
	// table size limit — confirm against configFromServer.
	MaxDecoderHeaderTableSize uint32

	// MaxEncoderHeaderTableSize: NOTE(review): presumably the HPACK encoder
	// table size limit — confirm against configFromServer.
	MaxEncoderHeaderTableSize uint32

	// MaxReadFrameSize: NOTE(review): presumably the largest frame the
	// server will read — confirm against configFromServer.
	MaxReadFrameSize uint32

	// PermitProhibitedCipherSuites: NOTE(review): presumably disables the
	// bad-cipher rejection in serveConn — confirm against configFromServer.
	PermitProhibitedCipherSuites bool

	// IdleTimeout, when positive, arms an idle timer in serve; when the
	// timer fires the connection is sent a GOAWAY with no error code.
	IdleTimeout time.Duration

	// ReadIdleTimeout: NOTE(review): presumably the interval after which a
	// PING health check is sent — confirm against configFromServer.
	ReadIdleTimeout time.Duration

	// PingTimeout: NOTE(review): presumably how long to wait for a PING
	// response — confirm against configFromServer.
	PingTimeout time.Duration

	// WriteByteTimeout: NOTE(review): presumably a per-write timeout for
	// the connection's buffered writer — confirm against configFromServer.
	WriteByteTimeout time.Duration

	// MaxUploadBufferPerConnection: NOTE(review): presumably the
	// connection-level receive window — confirm against configFromServer.
	MaxUploadBufferPerConnection int32

	// MaxUploadBufferPerStream: NOTE(review): presumably the per-stream
	// receive window — confirm against configFromServer.
	MaxUploadBufferPerStream int32

	// NewWriteScheduler, when non-nil, is called by serveConn to create the
	// connection's write scheduler, taking precedence over the built-in
	// choices.
	NewWriteScheduler func() WriteScheduler

	// CountError is a callback taking an error type string.
	// NOTE(review): presumably invoked for each counted error — confirm.
	CountError func(errType string)

	// state is created by Configure and tracks active connections and an
	// error-channel pool. Methods on it tolerate a nil value.
	state *serverInternalState
}
180
// serverInternalState is the per-Server bookkeeping created by Configure.
// Its methods are safe to call on a nil receiver.
type serverInternalState struct {
	// mu guards activeConns.
	mu sync.Mutex
	// activeConns is the set of connections currently being served.
	activeConns map[*serverConn]struct{}

	// errChanPool supplies the chan error values handed out by getErrChan
	// and returned by putErrChan.
	errChanPool sync.Pool
}
189
190 func (s *serverInternalState) registerConn(sc *serverConn) {
191 if s == nil {
192 return
193 }
194 s.mu.Lock()
195 s.activeConns[sc] = struct{}{}
196 s.mu.Unlock()
197 }
198
199 func (s *serverInternalState) unregisterConn(sc *serverConn) {
200 if s == nil {
201 return
202 }
203 s.mu.Lock()
204 delete(s.activeConns, sc)
205 s.mu.Unlock()
206 }
207
208 func (s *serverInternalState) startGracefulShutdown() {
209 if s == nil {
210 return
211 }
212 s.mu.Lock()
213 for sc := range s.activeConns {
214 sc.startGracefulShutdown()
215 }
216 s.mu.Unlock()
217 }
218
219
220
// errChanPool is the package-level pool of one-slot error channels, used by
// getErrChan and putErrChan when there is no serverInternalState.
var errChanPool = sync.Pool{
	New: func() any {
		ch := make(chan error, 1)
		return ch
	},
}
224
225 func (s *serverInternalState) getErrChan() chan error {
226 if s == nil {
227 return errChanPool.Get().(chan error)
228 }
229 return s.errChanPool.Get().(chan error)
230 }
231
232 func (s *serverInternalState) putErrChan(ch chan error) {
233 if s == nil {
234 errChanPool.Put(ch)
235 return
236 }
237 s.errChanPool.Put(ch)
238 }
239
240 func (s *Server) Configure(conf ServerConfig, tcfg *tls.Config) error {
241 s.state = &serverInternalState{
242 activeConns: make(map[*serverConn]struct{}),
243 errChanPool: sync.Pool{New: func() any { return make(chan error, 1) }},
244 }
245
246 if tcfg.CipherSuites != nil && tcfg.MinVersion < tls.VersionTLS13 {
247
248
249
250 haveRequired := false
251 for _, cs := range tcfg.CipherSuites {
252 switch cs {
253 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
254
255
256 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
257 haveRequired = true
258 }
259 }
260 if !haveRequired {
261 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256)")
262 }
263 }
264
265
266
267
268
269
270
271
272 return nil
273 }
274
// GracefulShutdown asks every connection registered with the server to
// start a graceful shutdown. It is a no-op if Configure has not been called,
// because the state methods tolerate a nil receiver.
func (s *Server) GracefulShutdown() {
	s.state.startGracefulShutdown()
}
278
279
// ServeConnOpts are the options for Server.ServeConn.
type ServeConnOpts struct {
	// Context is the base context for the connection. When nil,
	// context.Background is used.
	Context context.Context

	// BaseConfig is stored on the connection and passed to configFromServer
	// to derive the connection's configuration.
	BaseConfig ServerConfig

	// Handler is stored as the connection's handler.
	Handler Handler

	// Settings, when non-nil, is treated as the payload of a SETTINGS frame
	// and applied before serving begins; invalid settings reject the
	// connection. The field is set to nil once processed.
	Settings []byte

	// SawClientPreface reports that the client preface has already been
	// consumed, so the server does not try to read it again.
	SawClientPreface bool
}
302
303 func (o *ServeConnOpts) context() context.Context {
304 if o != nil && o.Context != nil {
305 return o.Context
306 }
307 return context.Background()
308 }
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324 func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
325 if opts == nil {
326 opts = &ServeConnOpts{}
327 }
328
329 var newf func(*serverConn)
330 if inTests {
331
332 newf, _ = opts.Context.Value(NewConnContextKey).(func(*serverConn))
333 }
334
335 s.serveConn(c, opts, newf)
336 }
337
// contextKey is a string type; it is not referenced in this part of the file.
type contextKey string

// Context keys consulted only when inTests is set. Each is a distinct
// pointer, so it cannot collide with any other context key.
var (
	// NewConnContextKey may hold a func(*serverConn) that is called with
	// each newly created connection.
	NewConnContextKey = new("NewConnContextKey")
	// ConnectionStateContextKey may hold a func() tls.ConnectionState whose
	// result replaces the connection's real TLS state.
	ConnectionStateContextKey = new("ConnectionStateContextKey")
)
344
345 func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverConn)) {
346 baseCtx, cancel := serverConnBaseContext(c, opts)
347 defer cancel()
348
349 conf := configFromServer(opts.BaseConfig, s)
350 sc := &serverConn{
351 srv: s,
352 hs: opts.BaseConfig,
353 conn: c,
354 baseCtx: baseCtx,
355 remoteAddrStr: c.RemoteAddr().String(),
356 bw: newBufferedWriter(c, conf.WriteByteTimeout),
357 handler: opts.Handler,
358 streams: make(map[uint32]*stream),
359 readFrameCh: make(chan readFrameResult),
360 wantWriteFrameCh: make(chan FrameWriteRequest, 8),
361 serveMsgCh: make(chan any, 8),
362 wroteFrameCh: make(chan frameWriteResult, 1),
363 bodyReadCh: make(chan bodyReadMsg),
364 doneServing: make(chan struct{}),
365 clientMaxStreams: math.MaxUint32,
366 advMaxStreams: uint32(conf.MaxConcurrentStreams),
367 initialStreamSendWindowSize: initialWindowSize,
368 initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
369 maxFrameSize: initialMaxFrameSize,
370 pingTimeout: conf.PingTimeout,
371 countErrorFunc: conf.CountError,
372 serveG: newGoroutineLock(),
373 pushEnabled: true,
374 sawClientPreface: opts.SawClientPreface,
375 }
376 if newf != nil {
377 newf(sc)
378 }
379
380 s.state.registerConn(sc)
381 defer s.state.unregisterConn(sc)
382
383
384
385
386
387
388 if sc.hs.WriteTimeout() > 0 {
389 sc.conn.SetWriteDeadline(time.Time{})
390 }
391
392 switch {
393 case s.NewWriteScheduler != nil:
394 sc.writeSched = s.NewWriteScheduler()
395 case sc.hs.DisableClientPriority():
396 sc.writeSched = newRoundRobinWriteScheduler()
397 default:
398 sc.writeSched = newPriorityWriteSchedulerRFC9218()
399 }
400
401
402
403
404 sc.flow.add(initialWindowSize)
405 sc.inflow.init(initialWindowSize)
406 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
407 sc.hpackEncoder.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
408
409 fr := NewFramer(sc.bw, c)
410 if conf.CountError != nil {
411 fr.countError = conf.CountError
412 }
413 fr.ReadMetaHeaders = hpack.NewDecoder(uint32(conf.MaxDecoderHeaderTableSize), nil)
414 fr.MaxHeaderListSize = sc.maxHeaderListSize()
415 fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
416 sc.framer = fr
417
418 if tc, ok := c.(connectionStater); ok {
419 sc.tlsState = new(tls.ConnectionState)
420 *sc.tlsState = tc.ConnectionState()
421
422
423 if inTests {
424 f, ok := opts.Context.Value(ConnectionStateContextKey).(func() tls.ConnectionState)
425 if ok {
426 *sc.tlsState = f()
427 }
428 }
429
430
431
432
433
434
435
436
437
438
439
440 if sc.tlsState.Version < tls.VersionTLS12 {
441 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
442 return
443 }
444
445 if sc.tlsState.ServerName == "" {
446
447
448
449
450
451
452
453
454
455 }
456
457 if !conf.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
458
459
460
461
462
463
464
465
466
467
468 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
469 return
470 }
471 }
472
473 if opts.Settings != nil {
474 fr := &SettingsFrame{
475 FrameHeader: FrameHeader{valid: true},
476 p: opts.Settings,
477 }
478 if err := fr.ForeachSetting(sc.processSetting); err != nil {
479 sc.rejectConn(ErrCodeProtocol, "invalid settings")
480 return
481 }
482 opts.Settings = nil
483 }
484
485 sc.serve(conf)
486 }
487
// serverConnBaseContext returns a cancelable context derived from the
// context configured in opts (context.Background if none) together with its
// cancel function. The c parameter is not used.
func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
	return context.WithCancel(opts.context())
}
491
// rejectConn refuses the connection: it logs the reason (verbose logging
// only), writes a GOAWAY frame with last stream ID 0 carrying err and debug,
// flushes the writer, and closes the connection. Errors from the write,
// flush, and close are not checked.
func (sc *serverConn) rejectConn(err ErrCode, debug string) {
	sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)

	sc.framer.WriteGoAway(0, err, []byte(debug))
	sc.bw.Flush()
	sc.conn.Close()
}
499
// serverConn is the state of a single HTTP/2 connection being served.
//
// The first group of fields is assigned while the connection is set up in
// serveConn. Methods that touch the second group call sc.serveG.check(),
// which indicates those fields belong to the serve goroutine.
type serverConn struct {
	// Set during connection setup.
	srv              *Server
	hs               ServerConfig
	conn             net.Conn
	bw               *bufferedWriter // buffered writer in front of conn
	handler          Handler
	baseCtx          context.Context
	framer           *Framer
	doneServing      chan struct{}          // closed when serve returns
	readFrameCh      chan readFrameResult   // readFrames goroutine -> serve
	wantWriteFrameCh chan FrameWriteRequest // handlers -> serve
	wroteFrameCh     chan frameWriteResult  // writeFrameAsync -> serve
	bodyReadCh       chan bodyReadMsg       // body reads -> serve
	serveMsgCh       chan any               // timers and other messages -> serve
	flow             outflow                // connection-level send window
	inflow           inflow                 // connection-level receive window
	tlsState         *tls.ConnectionState   // nil when conn does not report TLS state
	remoteAddrStr    string
	writeSched       WriteScheduler
	countErrorFunc   func(errType string)

	// Used from the serve goroutine.
	serveG                      goroutineLock
	pushEnabled                 bool
	sawClientPreface            bool // preface already read; readPreface is skipped
	sawFirstSettings            bool // a SETTINGS frame has been received
	needToSendSettingsAck       bool
	unackedSettings             int
	queuedControlFrames         int // control frames pushed to writeSched and not yet popped
	clientMaxStreams            uint32
	advMaxStreams               uint32 // advertised in the initial SETTINGS frame
	curClientStreams            uint32
	curPushedStreams            uint32
	curHandlers                 uint32
	maxClientStreamID           uint32
	maxPushPromiseID            uint32
	streams                     map[uint32]*stream
	unstartedHandlers           []unstartedHandler
	initialStreamSendWindowSize int32
	initialStreamRecvWindowSize int32
	maxFrameSize                int32
	peerMaxHeaderListSize       uint32
	canonHeader                 map[string]string // lazily created cache used by canonicalHeader
	canonHeaderKeysSize         int               // estimated size of canonHeader
	writingFrame                bool              // a frame write has started and wroteFrame has not run yet
	writingFrameAsync           bool              // that write is running in its own goroutine
	needsFrameFlush             bool              // a flush is owed since the last write
	inGoAway                    bool              // goAway has been called
	inFrameScheduleLoop         bool              // scheduleFrameWrite is on the stack
	needToSendGoAway            bool              // a GOAWAY frame is still to be written
	pingSent                    bool
	sentPingData                [8]byte
	goAwayCode                  ErrCode
	shutdownTimer               *time.Timer // nil until shutDownIn is called
	idleTimer                   *time.Timer // nil unless an idle timeout is configured
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	readIdleTimer               *time.Timer // nil unless a ping timeout is configured

	// Header encoding state; hpackEncoder writes into headerWriteBuf.
	headerWriteBuf bytes.Buffer
	hpackEncoder   *hpack.Encoder

	// shutdownOnce ensures the graceful shutdown message is sent only once.
	shutdownOnce sync.Once

	// Not referenced in this part of the file.
	hasIntermediary bool
	priorityAware   bool
}
571
572 func (sc *serverConn) writeSchedIgnoresRFC7540() bool {
573 switch sc.writeSched.(type) {
574 case *priorityWriteSchedulerRFC9218:
575 return true
576 case *roundRobinWriteScheduler:
577 return true
578 default:
579 return false
580 }
581 }
582
// DefaultMaxHeaderBytes (1 MiB) is the header size limit used when the
// server configuration does not specify a positive value.
const DefaultMaxHeaderBytes = 1 << 20
584
585 func (sc *serverConn) maxHeaderListSize() uint32 {
586 n := sc.hs.MaxHeaderBytes()
587 if n <= 0 {
588 n = DefaultMaxHeaderBytes
589 }
590 return uint32(adjustHTTP1MaxHeaderSize(int64(n)))
591 }
592
593 func (sc *serverConn) curOpenStreams() uint32 {
594 sc.serveG.check()
595 return sc.curClientStreams + sc.curPushedStreams
596 }
597
598
599
600
601
602
603
604
// stream is the server's state for one HTTP/2 stream. Streams are kept in
// serverConn.streams, keyed by stream ID.
type stream struct {
	// Identity and plumbing.
	sc        *serverConn
	id        uint32
	body      *pipe
	cw        closeWaiter // writeDataFromHandler selects on this to detect stream closure
	ctx       context.Context
	cancelCtx func()

	// Mutable stream state.
	bodyBytes        int64
	declBodyBytes    int64
	flow             outflow // stream-level send window
	inflow           inflow  // stream-level receive window
	state            streamState
	resetQueued      bool // set by resetStream once an RST_STREAM has been queued
	gotTrailerHeader bool
	wroteHeaders     bool // set by writeFrame when response headers are queued
	readDeadline     *time.Timer
	writeDeadline    *time.Timer
	closeErr         error

	trailer    Header
	reqTrailer Header
}
630
// Framer returns the connection's Framer.
func (sc *serverConn) Framer() *Framer { return sc.framer }

// CloseConn closes the underlying network connection.
func (sc *serverConn) CloseConn() error { return sc.conn.Close() }

// Flush flushes the connection's buffered writer.
func (sc *serverConn) Flush() error { return sc.bw.Flush() }

// HeaderEncoder returns the connection's HPACK encoder and the buffer it
// writes into.
func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
	return sc.hpackEncoder, &sc.headerWriteBuf
}
637
638 func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
639 sc.serveG.check()
640
641 if st, ok := sc.streams[streamID]; ok {
642 return st.state, st
643 }
644
645
646
647
648
649
650 if streamID%2 == 1 {
651 if streamID <= sc.maxClientStreamID {
652 return stateClosed, nil
653 }
654 } else {
655 if streamID <= sc.maxPushPromiseID {
656 return stateClosed, nil
657 }
658 }
659 return stateIdle, nil
660 }
661
662
663
664
// setConnState reports a connection state change for sc.conn through the
// base server configuration's ConnState method.
func (sc *serverConn) setConnState(state ConnState) {
	sc.hs.ConnState(sc.conn, state)
}
668
669 func (sc *serverConn) vlogf(format string, args ...any) {
670 if VerboseLogs {
671 sc.logf(format, args...)
672 }
673 }
674
675 func (sc *serverConn) logf(format string, args ...any) {
676 if lg := sc.hs.ErrorLog(); lg != nil {
677 lg.Printf(format, args...)
678 } else {
679 log.Printf(format, args...)
680 }
681 }
682
683
684
685
686
// errno returns v's numeric value when v's underlying kind is uintptr, and
// 0 otherwise (including for a nil error). Reflection is used so that no
// platform-specific error type has to be named.
func errno(v error) uintptr {
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Uintptr {
		return 0
	}
	return uintptr(rv.Uint())
}
693
694
695
696 func isClosedConnError(err error) bool {
697 if err == nil {
698 return false
699 }
700
701 if errors.Is(err, net.ErrClosed) {
702 return true
703 }
704
705
706
707
708
709 if runtime.GOOS == "windows" {
710 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
711 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
712 const WSAECONNABORTED = 10053
713 const WSAECONNRESET = 10054
714 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
715 return true
716 }
717 }
718 }
719 }
720 return false
721 }
722
723 func (sc *serverConn) condlogf(err error, format string, args ...any) {
724 if err == nil {
725 return
726 }
727 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
728
729 sc.vlogf(format, args...)
730 } else {
731 sc.logf(format, args...)
732 }
733 }
734
735
736
737
738
739
// maxCachedCanonicalHeadersKeysSize caps the estimated size of the
// per-connection canonical header cache; canonicalHeader stops adding
// entries once the estimate would exceed it.
const maxCachedCanonicalHeadersKeysSize = 2048
741
742 func (sc *serverConn) canonicalHeader(v string) string {
743 sc.serveG.check()
744 cv, ok := httpcommon.CachedCanonicalHeader(v)
745 if ok {
746 return cv
747 }
748 cv, ok = sc.canonHeader[v]
749 if ok {
750 return cv
751 }
752 if sc.canonHeader == nil {
753 sc.canonHeader = make(map[string]string)
754 }
755 cv = textproto.CanonicalMIMEHeaderKey(v)
756 size := 100 + len(v)*2
757 if sc.canonHeaderKeysSize+size <= maxCachedCanonicalHeadersKeysSize {
758 sc.canonHeader[v] = cv
759 sc.canonHeaderKeysSize += size
760 }
761 return cv
762 }
763
// readFrameResult is what the readFrames goroutine hands to the serve loop
// for each call to Framer.ReadFrame.
type readFrameResult struct {
	f   Frame // the frame read, as returned by ReadFrame
	err error // the error returned by ReadFrame

	// readMore is called by the serve loop once it has finished processing
	// the result; readFrames does not read another frame until then.
	readMore func()
}
773
774
775
776
777
// readFrames runs in its own goroutine and feeds frames to the serve loop.
//
// Each result is sent on readFrameCh together with a readMore callback; the
// loop then blocks until that callback is invoked before reading the next
// frame. The goroutine exits when doneServing is closed or after delivering
// a result whose error is terminal according to terminalReadFrameError.
func (sc *serverConn) readFrames() {
	gate := make(chan struct{})
	gateDone := func() { gate <- struct{}{} }
	for {
		f, err := sc.framer.ReadFrame()
		// Hand the result over, unless serving has already stopped.
		select {
		case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
		case <-sc.doneServing:
			return
		}
		// Wait for the serve loop to finish with the frame.
		select {
		case <-gate:
		case <-sc.doneServing:
			return
		}
		if terminalReadFrameError(err) {
			return
		}
	}
}
798
799
// frameWriteResult is the outcome of a frame write, delivered to wroteFrame
// either directly or through wroteFrameCh.
type frameWriteResult struct {
	_   incomparable
	wr  FrameWriteRequest // the request that was written
	err error             // the result of the write
}
805
806
807
808
809
810 func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
811 var err error
812 if wd == nil {
813 err = wr.write.writeFrame(sc)
814 } else {
815 err = sc.framer.endWrite()
816 }
817 sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
818 }
819
// closeAllStreamsOnConnClose closes every tracked stream with
// errClientDisconnected. It runs as a deferred call when serve returns and
// must be called on the serve goroutine.
func (sc *serverConn) closeAllStreamsOnConnClose() {
	sc.serveG.check()
	for _, st := range sc.streams {
		sc.closeStream(st, errClientDisconnected)
	}
}
826
827 func (sc *serverConn) stopShutdownTimer() {
828 sc.serveG.check()
829 if t := sc.shutdownTimer; t != nil {
830 t.Stop()
831 }
832 }
833
// notePanic is deferred by serve so that tests can observe a panic in the
// serve loop.
//
// The panic is recovered only when testHookOnPanic is set; otherwise it
// propagates untouched. When the hook returns true the panic is raised again
// with the same value. testHookOnPanicMu, when set, is held for the duration.
func (sc *serverConn) notePanic() {
	if testHookOnPanicMu != nil {
		testHookOnPanicMu.Lock()
		defer testHookOnPanicMu.Unlock()
	}
	if testHookOnPanic != nil {
		// recover must be called here, directly in the deferred function.
		if e := recover(); e != nil {
			if testHookOnPanic(sc, e) {
				panic(e)
			}
		}
	}
}
848
// serve is the connection's main loop. It queues the initial SETTINGS frame,
// reads the client preface, starts the frame reader, and then multiplexes
// write requests, write completions, incoming frames, body-read
// notifications, and timer/control messages until the connection ends.
//
// On return the deferred calls close doneServing, stop the shutdown timer,
// close all streams, and close the network connection, in that order.
func (sc *serverConn) serve(conf Config) {
	sc.serveG.check()
	defer sc.notePanic()
	defer sc.conn.Close()
	defer sc.closeAllStreamsOnConnClose()
	defer sc.stopShutdownTimer()
	defer close(sc.doneServing) // unblocks goroutines selecting on doneServing

	if VerboseLogs {
		sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
	}

	// Advertise the server's settings.
	settings := writeSettings{
		{SettingMaxFrameSize, uint32(conf.MaxReadFrameSize)},
		{SettingMaxConcurrentStreams, sc.advMaxStreams},
		{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
		{SettingHeaderTableSize, uint32(conf.MaxDecoderHeaderTableSize)},
		{SettingInitialWindowSize, uint32(sc.initialStreamRecvWindowSize)},
	}
	if !disableExtendedConnectProtocol {
		settings = append(settings, Setting{SettingEnableConnectProtocol, 1})
	}
	if sc.writeSchedIgnoresRFC7540() {
		settings = append(settings, Setting{SettingNoRFC7540Priorities, 1})
	}
	sc.writeFrame(FrameWriteRequest{
		write: settings,
	})
	sc.unackedSettings++

	// The connection window starts at initialWindowSize; grant the
	// difference when a larger receive buffer is configured.
	if diff := conf.MaxReceiveBufferPerConnection - initialWindowSize; diff > 0 {
		sc.sendWindowUpdate(nil, int(diff))
	}

	if err := sc.readPreface(); err != nil {
		sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
		return
	}

	// Report the connection as active and then immediately as idle.
	sc.setConnState(ConnStateActive)
	sc.setConnState(ConnStateIdle)

	if sc.srv.IdleTimeout > 0 {
		sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
		defer sc.idleTimer.Stop()
	}

	if conf.SendPingTimeout > 0 {
		sc.readIdleTimeout = conf.SendPingTimeout
		sc.readIdleTimer = time.AfterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
		defer sc.readIdleTimer.Stop()
	}

	go sc.readFrames() // exits once doneServing is closed or reading fails terminally

	settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
	defer settingsTimer.Stop()

	lastFrameTime := time.Now()
	loopNum := 0
	for {
		loopNum++
		select {
		case wr := <-sc.wantWriteFrameCh:
			// A handler-requested stream reset goes through resetStream so
			// the stream is also marked as having a reset queued.
			if se, ok := wr.write.(StreamError); ok {
				sc.resetStream(se)
				break
			}
			sc.writeFrame(wr)
		case res := <-sc.wroteFrameCh:
			sc.wroteFrame(res)
		case res := <-sc.readFrameCh:
			lastFrameTime = time.Now()
			// Account for a completed asynchronous write, if one is ready,
			// before processing the newly read frame.
			if sc.writingFrameAsync {
				select {
				case wroteRes := <-sc.wroteFrameCh:
					sc.wroteFrame(wroteRes)
				default:
				}
			}
			if !sc.processFrameFromReader(res) {
				return
			}
			res.readMore()
			// A frame has been processed, so the first-settings timer is
			// no longer needed.
			if settingsTimer != nil {
				settingsTimer.Stop()
				settingsTimer = nil
			}
		case m := <-sc.bodyReadCh:
			sc.noteBodyRead(m.st, m.n)
		case msg := <-sc.serveMsgCh:
			switch v := msg.(type) {
			case func(int):
				v(loopNum)
			case *serverMessage:
				switch v {
				case settingsTimerMsg:
					sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
					return
				case idleTimerMsg:
					sc.vlogf("connection is idle")
					sc.goAway(ErrCodeNo)
				case readIdleTimerMsg:
					sc.handlePingTimer(lastFrameTime)
				case shutdownTimerMsg:
					sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
					return
				case gracefulShutdownMsg:
					sc.startGracefulShutdownInternal()
				case handlerDoneMsg:
					sc.handlerDone()
				default:
					panic("unknown timer")
				}
			case *startPushRequest:
				sc.startPush(v)
			case func(*serverConn):
				v(sc)
			default:
				panic(fmt.Sprintf("unexpected type %T", v))
			}
		}

		// Give up on the connection when too many control frames are
		// waiting to be written.
		if sc.queuedControlFrames > maxQueuedControlFrames {
			sc.vlogf("http2: too many control frames in send queue, closing connection")
			return
		}

		// Once the GOAWAY has been written, start the shutdown timer. For a
		// GOAWAY without an error code, wait until no streams remain open.
		sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
		gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
		if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
			sc.shutDownIn(goAwayTimeout)
		}
	}
}
997
// handlePingTimer runs when the read-idle timer fires. lastFrameReadTime is
// when the serve loop last received a frame from the reader.
//
// If a PING is already outstanding, the peer is considered lost and the
// connection is closed. If a frame arrived recently enough, the timer is
// re-armed for the remaining idle time. Otherwise a PING with random data is
// queued and the timer is re-armed with the ping timeout.
func (sc *serverConn) handlePingTimer(lastFrameReadTime time.Time) {
	if sc.pingSent {
		sc.logf("timeout waiting for PING response")
		if f := sc.countErrorFunc; f != nil {
			f("conn_close_lost_ping")
		}
		sc.conn.Close()
		return
	}

	pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
	now := time.Now()
	if pingAt.After(now) {
		// Frames were read since the timer was armed; wait out the rest.
		sc.readIdleTimer.Reset(pingAt.Sub(now))
		return
	}

	sc.pingSent = true
	// The error from rand.Read is deliberately ignored; if it fails, the
	// PING carries whatever sentPingData already holds.
	_, _ = rand.Read(sc.sentPingData[:])
	sc.writeFrame(FrameWriteRequest{
		write: &writePing{data: sc.sentPingData},
	})
	sc.readIdleTimer.Reset(sc.pingTimeout)
}
1026
// serverMessage is the type of the sentinel messages sent to the serve loop
// on serveMsgCh. Messages are compared by pointer identity.
type serverMessage int

// Sentinel messages understood by the serve loop.
var (
	settingsTimerMsg    = new(serverMessage) // first-settings timer fired
	idleTimerMsg        = new(serverMessage) // idle timer fired
	readIdleTimerMsg    = new(serverMessage) // read-idle timer fired
	shutdownTimerMsg    = new(serverMessage) // shutdown timer fired
	gracefulShutdownMsg = new(serverMessage) // graceful shutdown requested
	handlerDoneMsg      = new(serverMessage) // makes the serve loop call handlerDone
)
1038
// Timer callbacks. Each forwards its sentinel message to the serve loop.
func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
func (sc *serverConn) onIdleTimer()     { sc.sendServeMsg(idleTimerMsg) }
func (sc *serverConn) onReadIdleTimer() { sc.sendServeMsg(readIdleTimerMsg) }
func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
1043
// sendServeMsg delivers msg to the serve loop, giving up if serving has
// already ended. It must not be called from the serve goroutine.
func (sc *serverConn) sendServeMsg(msg any) {
	sc.serveG.checkNotOn()
	select {
	case sc.serveMsgCh <- msg:
	case <-sc.doneServing:
	}
}
1051
// errPrefaceTimeout is returned by readPreface when the client preface does
// not arrive within prefaceTimeout.
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
1053
1054
1055
1056
1057 func (sc *serverConn) readPreface() error {
1058 if sc.sawClientPreface {
1059 return nil
1060 }
1061 errc := make(chan error, 1)
1062 go func() {
1063
1064 buf := make([]byte, len(ClientPreface))
1065 if _, err := io.ReadFull(sc.conn, buf); err != nil {
1066 errc <- err
1067 } else if !bytes.Equal(buf, clientPreface) {
1068 errc <- fmt.Errorf("bogus greeting %q", buf)
1069 } else {
1070 errc <- nil
1071 }
1072 }()
1073 timer := time.NewTimer(prefaceTimeout)
1074 defer timer.Stop()
1075 select {
1076 case <-timer.C:
1077 return errPrefaceTimeout
1078 case err := <-errc:
1079 if err == nil {
1080 if VerboseLogs {
1081 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
1082 }
1083 }
1084 return err
1085 }
1086 }
1087
// writeDataPool recycles the *writeData values used by writeDataFromHandler.
var writeDataPool = sync.Pool{
	New: func() any { return new(writeData) },
}
1091
1092
1093
// writeDataFromHandler queues a DATA frame for stream and waits for it to be
// written, for the connection to stop serving, or for the stream to close.
//
// It returns the write result, errClientDisconnected if serving stopped, or
// errStreamClosed if the stream closed without a write result being ready.
// The pooled error channel and writeData value are recycled only after a
// write result has been received.
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
	ch := sc.srv.state.getErrChan()
	writeArg := writeDataPool.Get().(*writeData)
	*writeArg = writeData{stream.id, data, endStream}
	err := sc.writeFrameFromHandler(FrameWriteRequest{
		write:  writeArg,
		stream: stream,
		done:   ch,
	})
	if err != nil {
		return err
	}
	var frameWriteDone bool // a result for the frame write was received
	select {
	case err = <-ch:
		frameWriteDone = true
	case <-sc.doneServing:
		return errClientDisconnected
	case <-stream.cw:
		// The stream closed. A write result may be ready as well; prefer it
		// when it is, otherwise report the stream as closed.
		select {
		case err = <-ch:
			frameWriteDone = true
		default:
			return errStreamClosed
		}
	}
	sc.srv.state.putErrChan(ch)
	if frameWriteDone {
		writeDataPool.Put(writeArg)
	}
	return err
}
1133
1134
1135
1136
1137
1138
1139
1140
// writeFrameFromHandler hands wr to the serve loop for writing. It returns
// nil once the request has been accepted, or errClientDisconnected if the
// connection has stopped serving. It must not be called from the serve
// goroutine.
func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
	sc.serveG.checkNotOn()
	select {
	case sc.wantWriteFrameCh <- wr:
		return nil
	case <-sc.doneServing:
		// Serving has ended, so nothing will ever pick the request up.
		return errClientDisconnected
	}
}
1152
1153
1154
1155
1156
1157
1158
1159
1160
// writeFrame queues wr on the write scheduler and then schedules frame
// writes. It must be called on the serve goroutine.
//
// Two kinds of request are silently dropped instead of queued: a non-reset
// frame for a stream that is already closed, and a 100-continue header frame
// for a stream whose response headers have already been queued.
func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
	sc.serveG.check()

	// ignoreWrite is set when the frame must not be queued.
	var ignoreWrite bool

	// Drop writes for closed streams, except stream resets.
	if wr.StreamID() != 0 {
		_, isReset := wr.write.(StreamError)
		if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
			ignoreWrite = true
		}
	}

	// Track whether response headers were queued, and suppress a
	// 100-continue that would come after them.
	switch wr.write.(type) {
	case *writeResHeaders:
		wr.stream.wroteHeaders = true
	case write100ContinueHeadersFrame:
		if wr.stream.wroteHeaders {
			// Nobody is notified about the dropped frame, so it must not
			// carry a done channel.
			if wr.done != nil {
				panic("wr.done != nil for write100ContinueHeadersFrame")
			}
			ignoreWrite = true
		}
	}

	if !ignoreWrite {
		if wr.isControl() {
			sc.queuedControlFrames++
			// A negative count means the counter wrapped around; close the
			// connection.
			if sc.queuedControlFrames < 0 {
				sc.conn.Close()
			}
		}
		sc.writeSched.Push(wr)
	}
	sc.scheduleFrameWrite()
}
1221
1222
1223
1224
// startFrameWrite begins writing wr. At most one frame may be in flight, so
// it panics if a write is already in progress. It must be called on the
// serve goroutine.
//
// A frame that fits in the write buffer is written synchronously and
// wroteFrame is called right away. Anything else is written by a separate
// goroutine that reports back on wroteFrameCh.
func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
	sc.serveG.check()
	if sc.writingFrame {
		panic("internal error: can only be writing one frame at a time")
	}

	st := wr.stream
	if st != nil {
		switch st.state {
		case stateHalfClosedLocal:
			switch wr.write.(type) {
			case StreamError, handlerPanicRST, writeWindowUpdate:
				// Stream resets and window updates are still permitted on a
				// half-closed-local stream.
			default:
				panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
			}
		case stateClosed:
			panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
		}
	}
	// A push promise gets its promised stream ID just before being written;
	// on failure the requester is notified and nothing is written.
	if wpp, ok := wr.write.(*writePushPromise); ok {
		var err error
		wpp.promisedID, err = wpp.allocatePromisedID()
		if err != nil {
			sc.writingFrameAsync = false
			wr.replyToWriter(err)
			return
		}
	}

	sc.writingFrame = true
	sc.needsFrameFlush = true
	if wr.write.staysWithinBuffer(sc.bw.Available()) {
		sc.writingFrameAsync = false
		err := wr.write.writeFrame(sc)
		sc.wroteFrame(frameWriteResult{wr: wr, err: err})
	} else if wd, ok := wr.write.(*writeData); ok {
		// Start the DATA frame on the serve goroutine; the asynchronous
		// goroutine only has to finish the write.
		sc.framer.startWriteDataPadded(wd.streamID, wd.endStream, wd.p, nil)
		sc.writingFrameAsync = true
		go sc.writeFrameAsync(wr, wd)
	} else {
		sc.writingFrameAsync = true
		go sc.writeFrameAsync(wr, nil)
	}
}
1274
1275
1276
1277
// errHandlerPanicked is the error a stream is closed with after a
// handlerPanicRST frame has been written.
var errHandlerPanicked = errors.New("http2: handler panicked")
1279
1280
1281
// wroteFrame records the completion of the in-flight frame write, updates
// stream state accordingly, notifies the requester, and schedules the next
// write. It panics if no write was in progress and must be called on the
// serve goroutine.
func (sc *serverConn) wroteFrame(res frameWriteResult) {
	sc.serveG.check()
	if !sc.writingFrame {
		panic("internal error: expected to be already writing a frame")
	}
	sc.writingFrame = false
	sc.writingFrameAsync = false

	// A failed write closes the connection.
	if res.err != nil {
		sc.conn.Close()
	}

	wr := res.wr

	if writeEndsStream(wr.write) {
		st := wr.stream
		if st == nil {
			panic("internal error: expecting non-nil stream")
		}
		switch st.state {
		case stateOpen:
			// The server has finished sending while the stream was still
			// open: mark it half-closed (local) and queue an RST_STREAM
			// with no error code.
			st.state = stateHalfClosedLocal
			sc.resetStream(streamError(st.id, ErrCodeNo))
		case stateHalfClosedRemote:
			sc.closeStream(st, errHandlerComplete)
		}
	} else {
		switch v := wr.write.(type) {
		case StreamError:
			// The stream might not be tracked; close it only if it is.
			if st, ok := sc.streams[v.StreamID]; ok {
				sc.closeStream(st, v)
			}
		case handlerPanicRST:
			sc.closeStream(wr.stream, errHandlerPanicked)
		}
	}

	// Report the result to whoever requested the write.
	wr.replyToWriter(res.err)

	sc.scheduleFrameWrite()
}
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
// scheduleFrameWrite starts frame writes until one of them goes
// asynchronous or nothing is left to write. It returns immediately when a
// write is in flight or when it is already running further up the stack. It
// must be called on the serve goroutine.
//
// Writes are chosen in this order: a pending GOAWAY, a pending SETTINGS
// acknowledgement, the next frame from the write scheduler (skipped once a
// GOAWAY with an error code is in effect), and finally a flush.
func (sc *serverConn) scheduleFrameWrite() {
	sc.serveG.check()
	if sc.writingFrame || sc.inFrameScheduleLoop {
		return
	}
	sc.inFrameScheduleLoop = true
	for !sc.writingFrameAsync {
		if sc.needToSendGoAway {
			sc.needToSendGoAway = false
			sc.startFrameWrite(FrameWriteRequest{
				write: &writeGoAway{
					maxStreamID: sc.maxClientStreamID,
					code:        sc.goAwayCode,
				},
			})
			continue
		}
		if sc.needToSendSettingsAck {
			sc.needToSendSettingsAck = false
			sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
			continue
		}
		if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
			if wr, ok := sc.writeSched.Pop(); ok {
				if wr.isControl() {
					sc.queuedControlFrames--
				}
				sc.startFrameWrite(wr)
				continue
			}
		}
		if sc.needsFrameFlush {
			sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
			sc.needsFrameFlush = false // cleared afterwards because startFrameWrite sets it
			continue
		}
		break
	}
	sc.inFrameScheduleLoop = false
}
1390
1391
1392
1393
1394
1395
1396
1397
// startGracefulShutdown asks the serve loop to shut the connection down
// gracefully. Only the first call sends the request; later calls do
// nothing. It must not be called from the serve goroutine.
func (sc *serverConn) startGracefulShutdown() {
	sc.serveG.checkNotOn()
	sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
}
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
// goAwayTimeout is the delay used for the shutdown timer that the serve loop
// starts after a GOAWAY frame has been sent.
var goAwayTimeout = 1 * time.Second
1420
// startGracefulShutdownInternal begins a graceful shutdown by sending a
// GOAWAY with no error code. The serve loop calls it when it receives
// gracefulShutdownMsg.
func (sc *serverConn) startGracefulShutdownInternal() {
	sc.goAway(ErrCodeNo)
}
1424
1425 func (sc *serverConn) goAway(code ErrCode) {
1426 sc.serveG.check()
1427 if sc.inGoAway {
1428 if sc.goAwayCode == ErrCodeNo {
1429 sc.goAwayCode = code
1430 }
1431 return
1432 }
1433 sc.inGoAway = true
1434 sc.needToSendGoAway = true
1435 sc.goAwayCode = code
1436 sc.scheduleFrameWrite()
1437 }
1438
// shutDownIn starts the shutdown timer; after d it sends shutdownTimerMsg to
// the serve loop, which then returns. It must be called on the serve
// goroutine.
func (sc *serverConn) shutDownIn(d time.Duration) {
	sc.serveG.check()
	sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
}
1443
// resetStream queues se for writing and, when the stream is tracked, marks
// it as having a reset queued. It must be called on the serve goroutine.
func (sc *serverConn) resetStream(se StreamError) {
	sc.serveG.check()
	sc.writeFrame(FrameWriteRequest{write: se})
	if st, ok := sc.streams[se.StreamID]; ok {
		st.resetQueued = true
	}
}
1451
1452
1453
1454
// processFrameFromReader handles one result from the readFrames goroutine.
// It reports whether the serve loop should keep running; false makes serve
// return. It must be called on the serve goroutine.
//
// Read errors and frame-processing errors are mapped as follows: an
// oversized frame, a flow-control violation, and a ConnectionError each
// trigger a GOAWAY; a StreamError resets the stream; a vanished client or
// any other error ends serving.
func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
	sc.serveG.check()
	err := res.err
	if err != nil {
		if err == ErrFrameTooLarge {
			sc.goAway(ErrCodeFrameSize)
			return true // keep serving while the GOAWAY is handled
		}
		clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
		if clientGone {
			// The client is no longer there; stop serving.
			return false
		}
	} else {
		f := res.f
		if VerboseLogs {
			sc.vlogf("http2: server read frame %v", summarizeFrame(f))
		}
		err = sc.processFrame(f)
		if err == nil {
			return true
		}
	}

	switch ev := err.(type) {
	case StreamError:
		sc.resetStream(ev)
		return true
	case goAwayFlowError:
		sc.goAway(ErrCodeFlowControl)
		return true
	case ConnectionError:
		// Raise the recorded highest client stream ID to cover the
		// offending frame before the GOAWAY is sent.
		if res.f != nil {
			if id := res.f.Header().StreamID; id > sc.maxClientStreamID {
				sc.maxClientStreamID = id
			}
		}
		sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
		sc.goAway(ErrCode(ev))
		return true // keep serving while the GOAWAY is handled
	default:
		if res.err != nil {
			sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
		} else {
			sc.logf("http2: server closing client connection: %v", err)
		}
		return false
	}
}
1511
// processFrame dispatches a received frame to its type-specific handler and
// returns that handler's error. It must be called on the serve goroutine.
//
// The first frame on a connection must be SETTINGS. Once a GOAWAY is in
// effect, frames are discarded when the GOAWAY carries an error code or the
// frame's stream ID is above maxClientStreamID. Unknown frame types are
// ignored.
func (sc *serverConn) processFrame(f Frame) error {
	sc.serveG.check()

	// The first frame received must be a SETTINGS frame.
	if !sc.sawFirstSettings {
		if _, ok := f.(*SettingsFrame); !ok {
			return sc.countError("first_settings", ConnectionError(ErrCodeProtocol))
		}
		sc.sawFirstSettings = true
	}

	// Discard frames that arrive after GOAWAY, as described above. DATA
	// frames still consume connection-level flow control, which is handed
	// straight back.
	if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {

		if f, ok := f.(*DataFrame); ok {
			if !sc.inflow.take(f.Length) {
				return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
			}
			sc.sendWindowUpdate(nil, int(f.Length)) // nil stream: connection-level update
		}
		return nil
	}

	switch f := f.(type) {
	case *SettingsFrame:
		return sc.processSettings(f)
	case *MetaHeadersFrame:
		return sc.processHeaders(f)
	case *WindowUpdateFrame:
		return sc.processWindowUpdate(f)
	case *PingFrame:
		return sc.processPing(f)
	case *DataFrame:
		return sc.processData(f)
	case *RSTStreamFrame:
		return sc.processResetStream(f)
	case *PriorityFrame:
		return sc.processPriority(f)
	case *GoAwayFrame:
		return sc.processGoAway(f)
	case *PushPromiseFrame:
		// A PUSH_PROMISE received by the server is a protocol error.
		return sc.countError("push_promise", ConnectionError(ErrCodeProtocol))
	case *PriorityUpdateFrame:
		return sc.processPriorityUpdate(f)
	default:
		sc.vlogf("http2: server ignoring frame: %v", f.Header())
		return nil
	}
}
1566
1567 func (sc *serverConn) processPing(f *PingFrame) error {
1568 sc.serveG.check()
1569 if f.IsAck() {
1570 if sc.pingSent && sc.sentPingData == f.Data {
1571
1572 sc.pingSent = false
1573 sc.readIdleTimer.Reset(sc.readIdleTimeout)
1574 }
1575
1576
1577 return nil
1578 }
1579 if f.StreamID != 0 {
1580
1581
1582
1583
1584
1585 return sc.countError("ping_on_stream", ConnectionError(ErrCodeProtocol))
1586 }
1587 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1588 return nil
1589 }
1590
1591 func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1592 sc.serveG.check()
1593 switch {
1594 case f.StreamID != 0:
1595 state, st := sc.state(f.StreamID)
1596 if state == stateIdle {
1597
1598
1599
1600
1601 return sc.countError("stream_idle", ConnectionError(ErrCodeProtocol))
1602 }
1603 if st == nil {
1604
1605
1606
1607
1608
1609 return nil
1610 }
1611 if !st.flow.add(int32(f.Increment)) {
1612 return sc.countError("bad_flow", streamError(f.StreamID, ErrCodeFlowControl))
1613 }
1614 default:
1615 if !sc.flow.add(int32(f.Increment)) {
1616 return goAwayFlowError{}
1617 }
1618 }
1619 sc.scheduleFrameWrite()
1620 return nil
1621 }
1622
1623 func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1624 sc.serveG.check()
1625
1626 state, st := sc.state(f.StreamID)
1627 if state == stateIdle {
1628
1629
1630
1631
1632
1633 return sc.countError("reset_idle_stream", ConnectionError(ErrCodeProtocol))
1634 }
1635 if st != nil {
1636 st.cancelCtx()
1637 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1638 }
1639 return nil
1640 }
1641
// closeStream moves st to stateClosed and releases everything the
// connection tracks for it: deadline timers are stopped, the open-stream
// counters are adjusted, the stream is removed from sc.streams and from the
// write scheduler, the request body pipe (if any) is closed with err, the
// stream context is canceled and st.cw is closed.
//
// err is recorded in st.closeErr; a StreamError is first reduced to its
// Cause, or to errStreamClosed when it has none.
//
// It panics if st is already idle or closed.
func (sc *serverConn) closeStream(st *stream, err error) {
	sc.serveG.check()
	if st.state == stateIdle || st.state == stateClosed {
		panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
	}
	st.state = stateClosed
	if st.readDeadline != nil {
		st.readDeadline.Stop()
	}
	if st.writeDeadline != nil {
		st.writeDeadline.Stop()
	}
	if st.isPushed() {
		sc.curPushedStreams--
	} else {
		sc.curClientStreams--
	}
	delete(sc.streams, st.id)
	if len(sc.streams) == 0 {
		// Last stream gone: the connection is idle again.
		sc.setConnState(ConnStateIdle)
		if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil {
			sc.idleTimer.Reset(sc.srv.IdleTimeout)
		}
		if h1ServerKeepAlivesDisabled(sc.hs) {
			sc.startGracefulShutdownInternal()
		}
	}
	if p := st.body; p != nil {
		// Credit p.Len() bytes back to the connection-level receive window
		// before the pipe is closed (presumably the bytes still buffered
		// and unread by the handler; confirm against pipe.Len).
		sc.sendWindowUpdate(nil, p.Len())

		p.CloseWithError(err)
	}
	// Handlers see the underlying cause rather than the wire-level
	// StreamError.
	if e, ok := err.(StreamError); ok {
		if e.Cause != nil {
			err = e.Cause
		} else {
			err = errStreamClosed
		}
	}
	st.closeErr = err
	st.cancelCtx()
	// Closing cw wakes goroutines waiting on the stream (see writeHeaders,
	// FlushError and CloseNotify).
	st.cw.Close()
	sc.writeSched.CloseStream(st.id)
}
1688
1689 func (sc *serverConn) processSettings(f *SettingsFrame) error {
1690 sc.serveG.check()
1691 if f.IsAck() {
1692 sc.unackedSettings--
1693 if sc.unackedSettings < 0 {
1694
1695
1696
1697 return sc.countError("ack_mystery", ConnectionError(ErrCodeProtocol))
1698 }
1699 return nil
1700 }
1701 if f.NumSettings() > 100 || f.HasDuplicates() {
1702
1703
1704
1705 return sc.countError("settings_big_or_dups", ConnectionError(ErrCodeProtocol))
1706 }
1707 if err := f.ForeachSetting(sc.processSetting); err != nil {
1708 return err
1709 }
1710
1711
1712 sc.needToSendSettingsAck = true
1713 sc.scheduleFrameWrite()
1714 return nil
1715 }
1716
1717 func (sc *serverConn) processSetting(s Setting) error {
1718 sc.serveG.check()
1719 if err := s.Valid(); err != nil {
1720 return err
1721 }
1722 if VerboseLogs {
1723 sc.vlogf("http2: server processing setting %v", s)
1724 }
1725 switch s.ID {
1726 case SettingHeaderTableSize:
1727 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1728 case SettingEnablePush:
1729 sc.pushEnabled = s.Val != 0
1730 case SettingMaxConcurrentStreams:
1731 sc.clientMaxStreams = s.Val
1732 case SettingInitialWindowSize:
1733 return sc.processSettingInitialWindowSize(s.Val)
1734 case SettingMaxFrameSize:
1735 sc.maxFrameSize = int32(s.Val)
1736 case SettingMaxHeaderListSize:
1737 sc.peerMaxHeaderListSize = s.Val
1738 case SettingEnableConnectProtocol:
1739
1740
1741 case SettingNoRFC7540Priorities:
1742 if s.Val > 1 {
1743 return ConnectionError(ErrCodeProtocol)
1744 }
1745 default:
1746
1747
1748
1749 if VerboseLogs {
1750 sc.vlogf("http2: server ignoring unknown setting %v", s)
1751 }
1752 }
1753 return nil
1754 }
1755
1756 func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1757 sc.serveG.check()
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767 old := sc.initialStreamSendWindowSize
1768 sc.initialStreamSendWindowSize = int32(val)
1769 growth := int32(val) - old
1770 for _, st := range sc.streams {
1771 if !st.flow.add(growth) {
1772
1773
1774
1775
1776
1777
1778 return sc.countError("setting_win_size", ConnectionError(ErrCodeFlowControl))
1779 }
1780 }
1781 return nil
1782 }
1783
// processData handles a DATA frame from the client: it enforces stream state
// and the declared Content-Length, performs receive-side flow control
// accounting, and hands the payload to the stream's body pipe.
//
// The whole frame length (f.Length, payload plus padding) is charged against
// flow control. Padding is credited back immediately; payload bytes are
// credited back elsewhere once the handler reads them (see noteBodyRead).
func (sc *serverConn) processData(f *DataFrame) error {
	sc.serveG.check()
	id := f.Header().StreamID

	data := f.Data()
	state, st := sc.state(id)
	if id == 0 || state == stateIdle {
		// DATA on stream 0 or on a stream that was never opened is a
		// connection-level protocol error.
		return sc.countError("data_on_idle", ConnectionError(ErrCodeProtocol))
	}

	// DATA is only acceptable on an open stream that has not yet received
	// trailers and has no reset queued.
	if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
		// The frame is discarded, but it still consumed connection-level
		// flow control: charge it, then hand the credit straight back.
		if !sc.inflow.take(f.Length) {
			return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
		}
		sc.sendWindowUpdate(nil, int(f.Length))

		if st != nil && st.resetQueued {
			// A reset for this stream is already queued; do not report
			// another error.
			return nil
		}
		return sc.countError("closed", streamError(id, ErrCodeStreamClosed))
	}
	if st.body == nil {
		panic("internal error: should have a body in this state")
	}

	// Reject a sender that exceeds its declared Content-Length
	// (declBodyBytes == -1 means no length was declared).
	if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
		if !sc.inflow.take(f.Length) {
			return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
		}
		sc.sendWindowUpdate(nil, int(f.Length))

		st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
		// Report a stream error to the peer in addition to failing the
		// handler's body reads.
		return sc.countError("send_too_much", streamError(id, ErrCodeProtocol))
	}
	if f.Length > 0 {
		// Charge both the connection and the stream receive windows.
		if !takeInflows(&sc.inflow, &st.inflow, f.Length) {
			return sc.countError("flow_on_data_length", streamError(id, ErrCodeFlowControl))
		}

		if len(data) > 0 {
			st.bodyBytes += int64(len(data))
			wrote, err := st.body.Write(data)
			if err != nil {
				// The pipe rejected the write. Return only the
				// connection-level credit for the bytes not written; the
				// stream-level window is left as is.
				sc.sendWindowUpdate(nil, int(f.Length)-wrote)
				return nil
			}
			if wrote != len(data) {
				panic("internal error: bad Writer")
			}
		}

		// Padding never reaches the handler, so return its flow control
		// credit now at both levels.
		pad := int32(f.Length) - int32(len(data))
		sc.sendWindowUpdate32(nil, pad)
		sc.sendWindowUpdate32(st, pad)
	}
	if f.StreamEnded() {
		st.endStream()
	}
	return nil
}
1880
1881 func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1882 sc.serveG.check()
1883 if f.ErrCode != ErrCodeNo {
1884 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1885 } else {
1886 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1887 }
1888 sc.startGracefulShutdownInternal()
1889
1890
1891 sc.pushEnabled = false
1892 return nil
1893 }
1894
1895
1896 func (st *stream) isPushed() bool {
1897 return st.id%2 == 0
1898 }
1899
1900
1901
1902 func (st *stream) endStream() {
1903 sc := st.sc
1904 sc.serveG.check()
1905
1906 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1907 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1908 st.declBodyBytes, st.bodyBytes))
1909 } else {
1910 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1911 st.body.CloseWithError(io.EOF)
1912 }
1913 st.state = stateHalfClosedRemote
1914 }
1915
1916
1917
1918 func (st *stream) copyTrailersToHandlerRequest() {
1919 for k, vv := range st.trailer {
1920 if _, ok := st.reqTrailer[k]; ok {
1921
1922 st.reqTrailer[k] = vv
1923 }
1924 }
1925 }
1926
1927
1928
1929 func (st *stream) onReadTimeout() {
1930 if st.body != nil {
1931
1932
1933 st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
1934 }
1935 }
1936
1937
1938
1939 func (st *stream) onWriteTimeout() {
1940 st.sc.writeFrameFromHandler(FrameWriteRequest{write: StreamError{
1941 StreamID: st.id,
1942 Code: ErrCodeInternal,
1943 Cause: os.ErrDeadlineExceeded,
1944 }})
1945 }
1946
// processHeaders handles a decoded HEADERS frame from the client.
//
// If the stream is already tracked in sc.streams, the frame is treated as
// request trailers. Otherwise it opens a new client stream: the stream ID
// and concurrency limits are validated, the stream is created, the request
// and response writer are built, and the handler is scheduled.
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
	sc.serveG.check()
	id := f.StreamID

	// Client-initiated streams use odd IDs; an even ID here is a
	// connection-level protocol error.
	if id%2 != 1 {
		return sc.countError("headers_even", ConnectionError(ErrCodeProtocol))
	}

	// HEADERS on a stream we already track: either stale (a reset is queued),
	// invalid (the client already half-closed), or trailers.
	if st := sc.streams[f.StreamID]; st != nil {
		if st.resetQueued {
			// A reset is already on its way to the peer; drop the frame.
			return nil
		}

		// The client already ended its side of the stream, so more headers
		// are a stream error.
		if st.state == stateHalfClosedRemote {
			return sc.countError("headers_half_closed", streamError(id, ErrCodeStreamClosed))
		}
		return st.processTrailerHeaders(f)
	}

	// New streams must use an ID strictly greater than any seen so far.
	if id <= sc.maxClientStreamID {
		return sc.countError("stream_went_down", ConnectionError(ErrCodeProtocol))
	}
	sc.maxClientStreamID = id

	if sc.idleTimer != nil {
		sc.idleTimer.Stop()
	}

	// Enforce the advertised concurrent-stream limit. If one of our SETTINGS
	// frames is still unacknowledged, the stream is refused with
	// ErrCodeRefusedStream instead of being treated as a protocol violation.
	if sc.curClientStreams+1 > sc.advMaxStreams {
		if sc.unackedSettings == 0 {
			// All our SETTINGS were acknowledged, so the limit was known.
			return sc.countError("over_max_streams", streamError(id, ErrCodeProtocol))
		}

		return sc.countError("over_max_streams_race", streamError(id, ErrCodeRefusedStream))
	}

	initialState := stateOpen
	if f.StreamEnded() {
		initialState = stateHalfClosedRemote
	}

	// Start from the default RFC 9218 priority. The request's own priority
	// is consulted only when the RFC 9218 write scheduler is in use and no
	// intermediary has been detected on this connection. priorityAware is
	// only ever switched on here, never back off.
	initialPriority := defaultRFC9218Priority(sc.priorityAware && !sc.hasIntermediary)
	if _, ok := sc.writeSched.(*priorityWriteSchedulerRFC9218); ok && !sc.hasIntermediary {
		headerPriority, priorityAware, hasIntermediary := f.rfc9218Priority(sc.priorityAware)
		initialPriority = headerPriority
		sc.hasIntermediary = hasIntermediary
		if priorityAware {
			sc.priorityAware = true
		}
	}
	st := sc.newStream(id, 0, initialState, initialPriority)

	// Priority fields carried in the HEADERS frame itself.
	if f.HasPriority() {
		if err := sc.checkPriority(f.StreamID, f.Priority); err != nil {
			return err
		}
		if !sc.writeSchedIgnoresRFC7540() {
			sc.writeSched.AdjustStream(st.id, f.Priority)
		}
	}

	rw, req, err := sc.newWriterAndRequest(st, f)
	if err != nil {
		return err
	}
	st.reqTrailer = req.Trailer
	if st.reqTrailer != nil {
		st.trailer = make(Header)
	}
	// pipe is nil when the request has no body (see newWriterAndRequest).
	st.body = req.Body.(*requestBody).pipe
	st.declBodyBytes = req.ContentLength

	// Pick the handler: a canned 431 response for a truncated header list,
	// a 400 handler for invalid request headers, else the user's handler.
	handler := sc.handler.ServeHTTP
	if f.Truncated {

		handler = handleHeaderListTooLong
	} else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
		handler = serve400Handler{err}.ServeHTTP
	}

	// With a read timeout configured, clear the connection-wide read
	// deadline and arm a per-stream timer instead.
	if sc.hs.ReadTimeout() > 0 {
		sc.conn.SetReadDeadline(time.Time{})
		st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout(), st.onReadTimeout)
	}

	return sc.scheduleHandler(id, rw, req, handler)
}
2077
2078 func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
2079 sc := st.sc
2080 sc.serveG.check()
2081 if st.gotTrailerHeader {
2082 return sc.countError("dup_trailers", ConnectionError(ErrCodeProtocol))
2083 }
2084 st.gotTrailerHeader = true
2085 if !f.StreamEnded() {
2086 return sc.countError("trailers_not_ended", streamError(st.id, ErrCodeProtocol))
2087 }
2088
2089 if len(f.PseudoFields()) > 0 {
2090 return sc.countError("trailers_pseudo", streamError(st.id, ErrCodeProtocol))
2091 }
2092 if st.trailer != nil {
2093 for _, hf := range f.RegularFields() {
2094 key := sc.canonicalHeader(hf.Name)
2095 if !httpguts.ValidTrailerHeader(key) {
2096
2097
2098
2099 return sc.countError("trailers_bogus", streamError(st.id, ErrCodeProtocol))
2100 }
2101 st.trailer[key] = append(st.trailer[key], hf.Value)
2102 }
2103 }
2104 st.endStream()
2105 return nil
2106 }
2107
2108 func (sc *serverConn) checkPriority(streamID uint32, p PriorityParam) error {
2109 if streamID == p.StreamDep {
2110
2111
2112
2113
2114 return sc.countError("priority", streamError(streamID, ErrCodeProtocol))
2115 }
2116 return nil
2117 }
2118
2119 func (sc *serverConn) processPriority(f *PriorityFrame) error {
2120 if err := sc.checkPriority(f.StreamID, f.PriorityParam); err != nil {
2121 return err
2122 }
2123
2124
2125
2126
2127
2128 if sc.writeSchedIgnoresRFC7540() {
2129 return nil
2130 }
2131 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
2132 return nil
2133 }
2134
2135 func (sc *serverConn) processPriorityUpdate(f *PriorityUpdateFrame) error {
2136 sc.priorityAware = true
2137 if _, ok := sc.writeSched.(*priorityWriteSchedulerRFC9218); !ok {
2138 return nil
2139 }
2140 p, ok := parseRFC9218Priority(f.Priority, sc.priorityAware)
2141 if !ok {
2142 return sc.countError("unparsable_priority_update", streamError(f.PrioritizedStreamID, ErrCodeProtocol))
2143 }
2144 sc.writeSched.AdjustStream(f.PrioritizedStreamID, p)
2145 return nil
2146 }
2147
2148 func (sc *serverConn) newStream(id, pusherID uint32, state streamState, priority PriorityParam) *stream {
2149 sc.serveG.check()
2150 if id == 0 {
2151 panic("internal error: cannot create stream with id 0")
2152 }
2153
2154 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
2155 st := &stream{
2156 sc: sc,
2157 id: id,
2158 state: state,
2159 ctx: ctx,
2160 cancelCtx: cancelCtx,
2161 }
2162 st.cw.Init()
2163 st.flow.conn = &sc.flow
2164 st.flow.add(sc.initialStreamSendWindowSize)
2165 st.inflow.init(sc.initialStreamRecvWindowSize)
2166 if writeTimeout := sc.hs.WriteTimeout(); writeTimeout > 0 {
2167 st.writeDeadline = time.AfterFunc(writeTimeout, st.onWriteTimeout)
2168 }
2169
2170 sc.streams[id] = st
2171 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID, priority: priority})
2172 if st.isPushed() {
2173 sc.curPushedStreams++
2174 } else {
2175 sc.curClientStreams++
2176 }
2177 if sc.curOpenStreams() == 1 {
2178 sc.setConnState(ConnStateActive)
2179 }
2180
2181 return st
2182 }
2183
2184 func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *ServerRequest, error) {
2185 sc.serveG.check()
2186
2187 rp := httpcommon.ServerRequestParam{
2188 Method: f.PseudoValue("method"),
2189 Scheme: f.PseudoValue("scheme"),
2190 Authority: f.PseudoValue("authority"),
2191 Path: f.PseudoValue("path"),
2192 Protocol: f.PseudoValue("protocol"),
2193 }
2194
2195
2196 if disableExtendedConnectProtocol && rp.Protocol != "" {
2197 return nil, nil, sc.countError("bad_connect", streamError(f.StreamID, ErrCodeProtocol))
2198 }
2199
2200 isConnect := rp.Method == "CONNECT"
2201 if isConnect {
2202 if rp.Protocol == "" && (rp.Path != "" || rp.Scheme != "" || rp.Authority == "") {
2203 return nil, nil, sc.countError("bad_connect", streamError(f.StreamID, ErrCodeProtocol))
2204 }
2205 } else if rp.Method == "" || rp.Path == "" || (rp.Scheme != "https" && rp.Scheme != "http") {
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216 return nil, nil, sc.countError("bad_path_method", streamError(f.StreamID, ErrCodeProtocol))
2217 }
2218
2219 header := make(Header)
2220 rp.Header = header
2221 for _, hf := range f.RegularFields() {
2222 header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2223 }
2224 if rp.Authority == "" {
2225 rp.Authority = header.Get("Host")
2226 }
2227 if rp.Protocol != "" {
2228 header.Set(":protocol", rp.Protocol)
2229 }
2230
2231 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2232 if err != nil {
2233 return nil, nil, err
2234 }
2235 bodyOpen := !f.StreamEnded()
2236 if bodyOpen {
2237 if vv, ok := rp.Header["Content-Length"]; ok {
2238 if cl, err := strconv.ParseUint(vv[0], 10, 63); err == nil {
2239 req.ContentLength = int64(cl)
2240 } else {
2241 req.ContentLength = 0
2242 }
2243 } else {
2244 req.ContentLength = -1
2245 }
2246 req.Body.(*requestBody).pipe = &pipe{
2247 b: &dataBuffer{expected: req.ContentLength},
2248 }
2249 }
2250 return rw, req, nil
2251 }
2252
2253 func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp httpcommon.ServerRequestParam) (*responseWriter, *ServerRequest, error) {
2254 sc.serveG.check()
2255
2256 var tlsState *tls.ConnectionState
2257 if rp.Scheme == "https" {
2258 tlsState = sc.tlsState
2259 }
2260
2261 res := httpcommon.NewServerRequest(rp)
2262 if res.InvalidReason != "" {
2263 return nil, nil, sc.countError(res.InvalidReason, streamError(st.id, ErrCodeProtocol))
2264 }
2265
2266 body := &requestBody{
2267 conn: sc,
2268 stream: st,
2269 needsContinue: res.NeedsContinue,
2270 }
2271 rw := sc.newResponseWriter(st)
2272 rw.rws.req = ServerRequest{
2273 Context: st.ctx,
2274 Method: rp.Method,
2275 URL: res.URL,
2276 RemoteAddr: sc.remoteAddrStr,
2277 Header: rp.Header,
2278 RequestURI: res.RequestURI,
2279 Proto: "HTTP/2.0",
2280 ProtoMajor: 2,
2281 ProtoMinor: 0,
2282 TLS: tlsState,
2283 Host: rp.Authority,
2284 Body: body,
2285 Trailer: res.Trailer,
2286 }
2287 return rw, &rw.rws.req, nil
2288 }
2289
2290 func (sc *serverConn) newResponseWriter(st *stream) *responseWriter {
2291 rws := responseWriterStatePool.Get().(*responseWriterState)
2292 bwSave := rws.bw
2293 *rws = responseWriterState{}
2294 rws.conn = sc
2295 rws.bw = bwSave
2296 rws.bw.Reset(chunkWriter{rws})
2297 rws.stream = st
2298 return &responseWriter{rws: rws}
2299 }
2300
// unstartedHandler describes a request whose handler could not be started
// immediately because the handler limit was reached (see scheduleHandler).
// Entries are queued in serverConn.unstartedHandlers and started by
// handlerDone as running handlers finish.
type unstartedHandler struct {
	streamID uint32 // stream the request arrived on; used to skip streams that are gone
	rw       *responseWriter
	req      *ServerRequest
	handler  func(*ResponseWriter, *ServerRequest)
}
2307
2308
2309
2310 func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *ServerRequest, handler func(*ResponseWriter, *ServerRequest)) error {
2311 sc.serveG.check()
2312 maxHandlers := sc.advMaxStreams
2313 if sc.curHandlers < maxHandlers {
2314 sc.curHandlers++
2315 go sc.runHandler(rw, req, handler)
2316 return nil
2317 }
2318 if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
2319 return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
2320 }
2321 sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
2322 streamID: streamID,
2323 rw: rw,
2324 req: req,
2325 handler: handler,
2326 })
2327 return nil
2328 }
2329
2330 func (sc *serverConn) handlerDone() {
2331 sc.serveG.check()
2332 sc.curHandlers--
2333 i := 0
2334 maxHandlers := sc.advMaxStreams
2335 for ; i < len(sc.unstartedHandlers); i++ {
2336 u := sc.unstartedHandlers[i]
2337 if sc.streams[u.streamID] == nil {
2338
2339 continue
2340 }
2341 if sc.curHandlers >= maxHandlers {
2342 break
2343 }
2344 sc.curHandlers++
2345 go sc.runHandler(u.rw, u.req, u.handler)
2346 sc.unstartedHandlers[i] = unstartedHandler{}
2347 }
2348 sc.unstartedHandlers = sc.unstartedHandlers[i:]
2349 if len(sc.unstartedHandlers) == 0 {
2350 sc.unstartedHandlers = nil
2351 }
2352 }
2353
2354
// runHandler invokes handler(rw, req) and performs per-request cleanup when
// it returns or panics. It is started in its own goroutine by
// scheduleHandler and handlerDone.
//
// On a normal return the response is finalized via rw.handlerDone. On a
// panic the panic is recovered (not re-raised), a handlerPanicRST write is
// queued for the stream, and the panic value with a stack trace is logged
// unless the value is nil or ErrAbortHandler.
func (sc *serverConn) runHandler(rw *responseWriter, req *ServerRequest, handler func(*ResponseWriter, *ServerRequest)) {
	// Deferred first, so it runs last: notify the serve loop that this
	// handler has finished.
	defer sc.sendServeMsg(handlerDoneMsg)
	// didPanic stays true unless handler returns normally.
	didPanic := true
	defer func() {
		rw.rws.stream.cancelCtx()
		if req.MultipartForm != nil {
			req.MultipartForm.RemoveAll()
		}
		if didPanic {
			e := recover()
			sc.writeFrameFromHandler(FrameWriteRequest{
				write:  handlerPanicRST{rw.rws.stream.id},
				stream: rw.rws.stream,
			})

			// ErrAbortHandler and nil panic values are not logged.
			if e != nil && e != ErrAbortHandler {
				const size = 64 << 10
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
			}
			return
		}
		rw.handlerDone()
	}()
	handler(rw, req)
	didPanic = false
}
2383
2384 func handleHeaderListTooLong(w *ResponseWriter, r *ServerRequest) {
2385
2386
2387
2388
2389 const statusRequestHeaderFieldsTooLarge = 431
2390 w.WriteHeader(statusRequestHeaderFieldsTooLarge)
2391 io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
2392 }
2393
2394
2395
2396 func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2397 sc.serveG.checkNotOn()
2398 var errc chan error
2399 if headerData.h != nil {
2400
2401
2402
2403
2404 errc = sc.srv.state.getErrChan()
2405 }
2406 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2407 write: headerData,
2408 stream: st,
2409 done: errc,
2410 }); err != nil {
2411 return err
2412 }
2413 if errc != nil {
2414 select {
2415 case err := <-errc:
2416 sc.srv.state.putErrChan(errc)
2417 return err
2418 case <-sc.doneServing:
2419 return errClientDisconnected
2420 case <-st.cw:
2421 return errStreamClosed
2422 }
2423 }
2424 return nil
2425 }
2426
2427
2428 func (sc *serverConn) write100ContinueHeaders(st *stream) {
2429 sc.writeFrameFromHandler(FrameWriteRequest{
2430 write: write100ContinueHeadersFrame{st.id},
2431 stream: st,
2432 })
2433 }
2434
2435
2436
// bodyReadMsg is the message sent on serverConn.bodyReadCh by
// noteBodyReadFromHandler to report that a handler consumed request body
// bytes.
type bodyReadMsg struct {
	st *stream // stream whose body was read
	n  int     // number of bytes read
}
2441
2442
2443
2444
2445 func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2446 sc.serveG.checkNotOn()
2447 if n > 0 {
2448 select {
2449 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2450 case <-sc.doneServing:
2451 }
2452 }
2453 }
2454
2455 func (sc *serverConn) noteBodyRead(st *stream, n int) {
2456 sc.serveG.check()
2457 sc.sendWindowUpdate(nil, n)
2458 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2459
2460
2461 sc.sendWindowUpdate(st, n)
2462 }
2463 }
2464
2465
2466 func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2467 sc.sendWindowUpdate(st, int(n))
2468 }
2469
2470
2471 func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2472 sc.serveG.check()
2473 var streamID uint32
2474 var send int32
2475 if st == nil {
2476 send = sc.inflow.add(n)
2477 } else {
2478 streamID = st.id
2479 send = st.inflow.add(n)
2480 }
2481 if send == 0 {
2482 return
2483 }
2484 sc.writeFrame(FrameWriteRequest{
2485 write: writeWindowUpdate{streamID: streamID, n: uint32(send)},
2486 stream: st,
2487 })
2488 }
2489
2490
2491
// requestBody is the request body handed to the handler (installed as the
// request's Body in newWriterAndRequestNoBody). Reads are served from pipe,
// which is filled by processData.
type requestBody struct {
	_             incomparable
	stream        *stream
	conn          *serverConn
	closeOnce     sync.Once // ensures Close breaks the pipe only once
	sawEOF        bool      // set once pipe.Read has returned io.EOF
	pipe          *pipe     // nil when the request has no body
	needsContinue bool      // a 100-continue response is sent on the first Read
}
2501
2502 func (b *requestBody) Close() error {
2503 b.closeOnce.Do(func() {
2504 if b.pipe != nil {
2505 b.pipe.BreakWithError(errClosedBody)
2506 }
2507 })
2508 return nil
2509 }
2510
2511 func (b *requestBody) Read(p []byte) (n int, err error) {
2512 if b.needsContinue {
2513 b.needsContinue = false
2514 b.conn.write100ContinueHeaders(b.stream)
2515 }
2516 if b.pipe == nil || b.sawEOF {
2517 return 0, io.EOF
2518 }
2519 n, err = b.pipe.Read(p)
2520 if err == io.EOF {
2521 b.sawEOF = true
2522 }
2523 if b.conn == nil {
2524 return
2525 }
2526 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2527 return
2528 }
2529
2530
2531
2532
2533
2534
2535
// responseWriter is the handle given to a handler for writing its response.
// It is a thin wrapper around a pooled *responseWriterState; rws is set to
// nil by handlerDone once the handler has finished, and the methods that
// check for it panic if called after that point.
type responseWriter struct {
	rws *responseWriterState
}
2539
// responseWriterState holds the per-response state behind a responseWriter.
// Instances are recycled through responseWriterStatePool; newResponseWriter
// zeroes every field except bw before reuse.
type responseWriterState struct {
	// Set when the state is attached to a stream.
	stream *stream
	req    ServerRequest
	conn   *serverConn

	// bw buffers handler writes and flushes them through
	// chunkWriter{rws}, i.e. into writeChunk.
	bw *bufio.Writer

	// Mutated from the handler goroutine.
	handlerHeader Header   // map returned by Header(); nil until first requested
	snapHeader    Header   // copy of handlerHeader taken by writeHeader
	trailers      []string // canonical names of declared trailers
	status        int      // status code recorded by writeHeader
	wroteHeader   bool     // writeHeader has recorded a final (non-1xx) status
	sentHeader    bool     // writeChunk has handed the response headers to writeHeaders
	handlerDone   bool     // the handler has returned

	sentContentLen int64 // Content-Length parsed from snapHeader; 0 if none
	wroteBytes     int64 // total bytes passed to write so far

	closeNotifierMu sync.Mutex // guards closeNotifierCh
	closeNotifierCh chan bool  // created lazily by CloseNotify
}
2564
// chunkWriter is the io.Writer placed underneath responseWriterState.bw; its
// Write forwards to rws.writeChunk.
type chunkWriter struct{ rws *responseWriterState }
2566
2567 func (cw chunkWriter) Write(p []byte) (n int, err error) {
2568 n, err = cw.rws.writeChunk(p)
2569 if err == errStreamClosed {
2570
2571
2572 err = cw.rws.stream.closeErr
2573 }
2574 return n, err
2575 }
2576
2577 func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2578
2579 func (rws *responseWriterState) hasNonemptyTrailers() bool {
2580 for _, trailer := range rws.trailers {
2581 if _, ok := rws.handlerHeader[trailer]; ok {
2582 return true
2583 }
2584 }
2585 return false
2586 }
2587
2588
2589
2590
2591 func (rws *responseWriterState) declareTrailer(k string) {
2592 k = textproto.CanonicalMIMEHeaderKey(k)
2593 if !httpguts.ValidTrailerHeader(k) {
2594
2595 rws.conn.logf("ignoring invalid trailer %q", k)
2596 return
2597 }
2598 if !slices.Contains(rws.trailers, k) {
2599 rws.trailers = append(rws.trailers, k)
2600 }
2601 }
2602
// TimeFormat is the time layout used for the Date response header generated
// in writeChunk, where it is applied to a UTC time.
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
2604
2605
2606
2607
2608
2609
2610
// writeChunk writes p as the next piece of the response. It is the sink
// behind rws.bw (through chunkWriter) and is also called with a nil slice by
// FlushError.
//
// On its first call it sends the response headers, filling in
// Content-Length, Content-Type and Date where the handler did not set them.
// Subsequent data is sent as DATA frames. Once rws.handlerDone is set it
// also ends the stream, sending trailers if any declared trailer has a value.
//
// It returns the number of bytes of p accepted and any write error.
func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
	// An implicit 200 if the handler never called WriteHeader.
	if !rws.wroteHeader {
		rws.writeHeader(200)
	}

	// Once the handler is done, pick up trailers it set via TrailerPrefix.
	if rws.handlerDone {
		rws.promoteUndeclaredTrailers()
	}

	isHeadResp := rws.req.Method == "HEAD"
	if !rws.sentHeader {
		rws.sentHeader = true
		var ctype, clen string
		// A handler-supplied Content-Length is taken out of the snapshot and
		// passed separately; an unparsable value is dropped.
		if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
			rws.snapHeader.Del("Content-Length")
			if cl, err := strconv.ParseUint(clen, 10, 63); err == nil {
				rws.sentContentLen = int64(cl)
			} else {
				clen = ""
			}
		}
		// If the handler has already finished, p is the entire body, so its
		// length can be declared.
		_, hasContentLength := rws.snapHeader["Content-Length"]
		if !hasContentLength && clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
			clen = strconv.Itoa(len(p))
		}
		_, hasContentType := rws.snapHeader["Content-Type"]

		// Sniff the content type only when the handler set neither
		// Content-Type nor a non-empty Content-Encoding.
		ce := rws.snapHeader.Get("Content-Encoding")
		hasCE := len(ce) > 0
		if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
			ctype = internal.DetectContentType(p)
		}
		var date string
		if _, ok := rws.snapHeader["Date"]; !ok {
			// Supply a Date header when the handler did not.
			date = time.Now().UTC().Format(TimeFormat)
		}

		// Register trailers announced through the "Trailer" header.
		for _, v := range rws.snapHeader["Trailer"] {
			foreachHeaderElement(v, rws.declareTrailer)
		}

		// The Connection header is never forwarded; "close" is translated
		// into a graceful shutdown of the connection.
		if _, ok := rws.snapHeader["Connection"]; ok {
			v := rws.snapHeader.Get("Connection")
			delete(rws.snapHeader, "Connection")
			if v == "close" {
				rws.conn.startGracefulShutdown()
			}
		}

		// End the stream with the headers when nothing else will follow.
		endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
		err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
			streamID:      rws.stream.id,
			httpResCode:   rws.status,
			h:             rws.snapHeader,
			endStream:     endStream,
			contentType:   ctype,
			contentLength: clen,
			date:          date,
		})
		if err != nil {
			return 0, err
		}
		if endStream {
			return 0, nil
		}
	}
	// HEAD responses carry no body: report p as written but send nothing.
	if isHeadResp {
		return len(p), nil
	}
	// Nothing to write and the handler is still running.
	if len(p) == 0 && !rws.handlerDone {
		return 0, nil
	}

	// Trailers are sent only if the handler actually gave at least one
	// declared trailer a value.
	hasNonemptyTrailers := rws.hasNonemptyTrailers()
	endStream := rws.handlerDone && !hasNonemptyTrailers
	if len(p) > 0 || endStream {
		// An empty DATA frame is sent only to end the stream.
		if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
			return 0, err
		}
	}

	if rws.handlerDone && hasNonemptyTrailers {
		err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
			streamID:  rws.stream.id,
			h:         rws.handlerHeader,
			trailers:  rws.trailers,
			endStream: true,
		})
		return len(p), err
	}
	return len(p), nil
}
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
// TrailerPrefix is a prefix for keys in the handler's header map. When the
// handler finishes, promoteUndeclaredTrailers treats every key that starts
// with it as a trailer: the rest of the key is declared as a trailer name
// and the values are copied under that name in canonical form.
const TrailerPrefix = "Trailer:"
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750 func (rws *responseWriterState) promoteUndeclaredTrailers() {
2751 for k, vv := range rws.handlerHeader {
2752 if !strings.HasPrefix(k, TrailerPrefix) {
2753 continue
2754 }
2755 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2756 rws.declareTrailer(trailerKey)
2757 rws.handlerHeader[textproto.CanonicalMIMEHeaderKey(trailerKey)] = vv
2758 }
2759
2760 if len(rws.trailers) > 1 {
2761 slices.Sort(rws.trailers)
2762 }
2763 }
2764
2765 func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
2766 st := w.rws.stream
2767 if !deadline.IsZero() && deadline.Before(time.Now()) {
2768
2769
2770 st.onReadTimeout()
2771 return nil
2772 }
2773 w.rws.conn.sendServeMsg(func(sc *serverConn) {
2774 if st.readDeadline != nil {
2775 if !st.readDeadline.Stop() {
2776
2777 return
2778 }
2779 }
2780 if deadline.IsZero() {
2781 st.readDeadline = nil
2782 } else if st.readDeadline == nil {
2783 st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
2784 } else {
2785 st.readDeadline.Reset(deadline.Sub(time.Now()))
2786 }
2787 })
2788 return nil
2789 }
2790
2791 func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
2792 st := w.rws.stream
2793 if !deadline.IsZero() && deadline.Before(time.Now()) {
2794
2795
2796 st.onWriteTimeout()
2797 return nil
2798 }
2799 w.rws.conn.sendServeMsg(func(sc *serverConn) {
2800 if st.writeDeadline != nil {
2801 if !st.writeDeadline.Stop() {
2802
2803 return
2804 }
2805 }
2806 if deadline.IsZero() {
2807 st.writeDeadline = nil
2808 } else if st.writeDeadline == nil {
2809 st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
2810 } else {
2811 st.writeDeadline.Reset(deadline.Sub(time.Now()))
2812 }
2813 })
2814 return nil
2815 }
2816
// EnableFullDuplex is a no-op that always returns nil.
// NOTE(review): presumably nothing needs enabling because reading the
// request body and writing the response are already independent on an
// HTTP/2 stream; confirm.
func (w *responseWriter) EnableFullDuplex() error {

	return nil
}
2821
2822 func (w *responseWriter) Flush() {
2823 w.FlushError()
2824 }
2825
2826 func (w *responseWriter) FlushError() error {
2827 rws := w.rws
2828 if rws == nil {
2829 panic("Header called after Handler finished")
2830 }
2831 var err error
2832 if rws.bw.Buffered() > 0 {
2833 err = rws.bw.Flush()
2834 } else {
2835
2836
2837
2838
2839 _, err = chunkWriter{rws}.Write(nil)
2840 if err == nil {
2841 select {
2842 case <-rws.stream.cw:
2843 err = rws.stream.closeErr
2844 default:
2845 }
2846 }
2847 }
2848 return err
2849 }
2850
2851 func (w *responseWriter) CloseNotify() <-chan bool {
2852 rws := w.rws
2853 if rws == nil {
2854 panic("CloseNotify called after Handler finished")
2855 }
2856 rws.closeNotifierMu.Lock()
2857 ch := rws.closeNotifierCh
2858 if ch == nil {
2859 ch = make(chan bool, 1)
2860 rws.closeNotifierCh = ch
2861 cw := rws.stream.cw
2862 go func() {
2863 cw.Wait()
2864 ch <- true
2865 }()
2866 }
2867 rws.closeNotifierMu.Unlock()
2868 return ch
2869 }
2870
2871 func (w *responseWriter) Header() Header {
2872 rws := w.rws
2873 if rws == nil {
2874 panic("Header called after Handler finished")
2875 }
2876 if rws.handlerHeader == nil {
2877 rws.handlerHeader = make(Header)
2878 }
2879 return rws.handlerHeader
2880 }
2881
2882
// checkWriteHeaderCode panics unless code is in the range 100 to 999
// inclusive. Only that range is enforced here; no check is made that the
// code is a defined HTTP status.
func checkWriteHeaderCode(code int) {
	if code >= 100 && code <= 999 {
		return
	}
	panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
2898
2899 func (w *responseWriter) WriteHeader(code int) {
2900 rws := w.rws
2901 if rws == nil {
2902 panic("WriteHeader called after Handler finished")
2903 }
2904 rws.writeHeader(code)
2905 }
2906
2907 func (rws *responseWriterState) writeHeader(code int) {
2908 if rws.wroteHeader {
2909 return
2910 }
2911
2912 checkWriteHeaderCode(code)
2913
2914
2915 if code >= 100 && code <= 199 {
2916
2917 h := rws.handlerHeader
2918
2919 _, cl := h["Content-Length"]
2920 _, te := h["Transfer-Encoding"]
2921 if cl || te {
2922 h = cloneHeader(h)
2923 h.Del("Content-Length")
2924 h.Del("Transfer-Encoding")
2925 }
2926
2927 rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2928 streamID: rws.stream.id,
2929 httpResCode: code,
2930 h: h,
2931 endStream: rws.handlerDone && !rws.hasTrailers(),
2932 })
2933
2934 return
2935 }
2936
2937 rws.wroteHeader = true
2938 rws.status = code
2939 if len(rws.handlerHeader) > 0 {
2940 rws.snapHeader = cloneHeader(rws.handlerHeader)
2941 }
2942 }
2943
2944 func cloneHeader(h Header) Header {
2945 h2 := make(Header, len(h))
2946 for k, vv := range h {
2947 vv2 := make([]string, len(vv))
2948 copy(vv2, vv)
2949 h2[k] = vv2
2950 }
2951 return h2
2952 }
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962 func (w *responseWriter) Write(p []byte) (n int, err error) {
2963 return w.write(len(p), p, "")
2964 }
2965
2966 func (w *responseWriter) WriteString(s string) (n int, err error) {
2967 return w.write(len(s), nil, s)
2968 }
2969
2970
2971 func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2972 rws := w.rws
2973 if rws == nil {
2974 panic("Write called after Handler finished")
2975 }
2976 if !rws.wroteHeader {
2977 w.WriteHeader(200)
2978 }
2979 if !bodyAllowedForStatus(rws.status) {
2980 return 0, ErrBodyNotAllowed
2981 }
2982 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS))
2983 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2984
2985 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2986 }
2987
2988 if dataB != nil {
2989 return rws.bw.Write(dataB)
2990 } else {
2991 return rws.bw.WriteString(dataS)
2992 }
2993 }
2994
2995 func (w *responseWriter) handlerDone() {
2996 rws := w.rws
2997 rws.handlerDone = true
2998 w.Flush()
2999 w.rws = nil
3000 responseWriterStatePool.Put(rws)
3001 }
3002
3003
// ErrRecursivePush is returned by Push when it is called on a stream that
// was itself created by a server push.
var ErrRecursivePush = errors.New("http2: recursive push not allowed")

// ErrPushLimitReached is returned when starting another pushed stream would
// exceed the peer's concurrent-stream limit, or when no further promised
// stream ID can be allocated.
var ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
3008
3009 func (w *responseWriter) Push(target, method string, header Header) error {
3010 st := w.rws.stream
3011 sc := st.sc
3012 sc.serveG.checkNotOn()
3013
3014
3015
3016 if st.isPushed() {
3017 return ErrRecursivePush
3018 }
3019
3020
3021 if method == "" {
3022 method = "GET"
3023 }
3024 if header == nil {
3025 header = Header{}
3026 }
3027 wantScheme := "http"
3028 if w.rws.req.TLS != nil {
3029 wantScheme = "https"
3030 }
3031
3032
3033 u, err := url.Parse(target)
3034 if err != nil {
3035 return err
3036 }
3037 if u.Scheme == "" {
3038 if !strings.HasPrefix(target, "/") {
3039 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
3040 }
3041 u.Scheme = wantScheme
3042 u.Host = w.rws.req.Host
3043 } else {
3044 if u.Scheme != wantScheme {
3045 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
3046 }
3047 if u.Host == "" {
3048 return errors.New("URL must have a host")
3049 }
3050 }
3051 for k := range header {
3052 if strings.HasPrefix(k, ":") {
3053 return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
3054 }
3055
3056
3057
3058
3059 if asciiEqualFold(k, "content-length") ||
3060 asciiEqualFold(k, "content-encoding") ||
3061 asciiEqualFold(k, "trailer") ||
3062 asciiEqualFold(k, "te") ||
3063 asciiEqualFold(k, "expect") ||
3064 asciiEqualFold(k, "host") {
3065 return fmt.Errorf("promised request headers cannot include %q", k)
3066 }
3067 }
3068 if err := checkValidHTTP2RequestHeaders(header); err != nil {
3069 return err
3070 }
3071
3072
3073
3074
3075 if method != "GET" && method != "HEAD" {
3076 return fmt.Errorf("method %q must be GET or HEAD", method)
3077 }
3078
3079 msg := &startPushRequest{
3080 parent: st,
3081 method: method,
3082 url: u,
3083 header: cloneHeader(header),
3084 done: sc.srv.state.getErrChan(),
3085 }
3086
3087 select {
3088 case <-sc.doneServing:
3089 return errClientDisconnected
3090 case <-st.cw:
3091 return errStreamClosed
3092 case sc.serveMsgCh <- msg:
3093 }
3094
3095 select {
3096 case <-sc.doneServing:
3097 return errClientDisconnected
3098 case <-st.cw:
3099 return errStreamClosed
3100 case err := <-msg.done:
3101 sc.srv.state.putErrChan(msg.done)
3102 return err
3103 }
3104 }
3105
// startPushRequest is the message Push sends to the serve loop (over
// serveMsgCh) to request a server push; startPush consumes it.
type startPushRequest struct {
	parent *stream  // stream on which Push was called
	method string   // request method for the promised request
	url    *url.URL // absolute URL of the promised request
	header Header   // copy of the caller-supplied promised request headers
	done   chan error // receives the outcome of the push request
}
3113
// startPush processes a push request from Push. It validates that a push is
// currently possible and, if so, queues a PUSH_PROMISE frame write on the
// parent stream. Failures detected here are reported on msg.done; otherwise
// msg.done is handed to the frame write request as its completion channel.
func (sc *serverConn) startPush(msg *startPushRequest) {
	// NOTE(review): presumably asserts we are on the serve goroutine — confirm
	// against serveG.check.
	sc.serveG.check()

	// A push may only be started while the parent stream is open or
	// half-closed (remote) (RFC 7540 section 6.6); otherwise report the
	// stream as closed.
	if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
		// The parent stream is no longer in a pushable state.
		msg.done <- errStreamClosed
		return
	}

	// Refuse when push is not enabled on this connection.
	if !sc.pushEnabled {
		msg.done <- ErrNotSupported
		return
	}

	// allocatePromisedID reserves the promised stream ID, creates the
	// promised stream and starts its handler. It is passed to the
	// writePushPromise below rather than called here; presumably it runs when
	// the frame is actually written so IDs are assigned in write order —
	// confirm against writePushPromise.
	allocatePromisedID := func() (uint32, error) {
		sc.serveG.check()

		// Re-check: pushEnabled may have changed between queueing the frame
		// and this callback running.
		if !sc.pushEnabled {
			return 0, ErrNotSupported
		}
		// Do not exceed the number of streams the client allows.
		if sc.curPushedStreams+1 > sc.clientMaxStreams {
			return 0, ErrPushLimitReached
		}

		// Promised IDs advance in steps of two and must stay below 2^31
		// (stream identifiers are 31 bits, RFC 7540 section 5.1.1). When the
		// ID space is exhausted, begin a graceful shutdown and refuse the
		// push.
		if sc.maxPushPromiseID+2 >= 1<<31 {
			sc.startGracefulShutdownInternal()
			return 0, ErrPushLimitReached
		}
		sc.maxPushPromiseID += 2
		promisedID := sc.maxPushPromiseID

		// Create the promised stream, tied to the parent's ID and already
		// half-closed (remote), then build a body-less request for it from
		// the push parameters. The headers are cloned again so the handler
		// gets its own copy.
		promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote, defaultRFC9218Priority(sc.priorityAware && !sc.hasIntermediary))
		rw, req, err := sc.newWriterAndRequestNoBody(promised, httpcommon.ServerRequestParam{
			Method:    msg.method,
			Scheme:    msg.url.Scheme,
			Authority: msg.url.Host,
			Path:      msg.url.RequestURI(),
			Header:    cloneHeader(msg.header),
		})
		if err != nil {
			// Treated as an internal invariant violation; presumably
			// unreachable because Push validated the URL and headers —
			// confirm.
			panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
		}

		// Account for the new handler and run it on its own goroutine.
		sc.curHandlers++
		go sc.runHandler(rw, req, sc.handler.ServeHTTP)
		return promisedID, nil
	}

	// Queue the PUSH_PROMISE on the parent stream; the write's outcome is
	// delivered on msg.done.
	sc.writeFrame(FrameWriteRequest{
		write: &writePushPromise{
			streamID:           msg.parent.id,
			method:             msg.method,
			url:                msg.url,
			h:                  msg.header,
			allocatePromisedID: allocatePromisedID,
		},
		stream: msg.parent,
		done:   msg.done,
	})
}
3194
3195
3196
// foreachHeaderElement treats v as a comma-separated list and calls fn once
// for each element, in order, after trimming surrounding whitespace. Empty
// elements are skipped, so fn is never called with an empty string.
func foreachHeaderElement(v string, fn func(string)) {
	rest := textproto.TrimString(v)
	if rest == "" {
		return
	}
	for {
		elem, tail, more := strings.Cut(rest, ",")
		if elem = textproto.TrimString(elem); elem != "" {
			fn(elem)
		}
		if !more {
			return
		}
		rest = tail
	}
}
3212
3213
// connHeaders lists connection-specific header fields (see RFC 7540 section
// 8.1.2.2). checkValidHTTP2RequestHeaders rejects any request header map that
// contains one of these keys, matched exactly as spelled here.
var connHeaders = []string{
	"Connection",
	"Keep-Alive",
	"Proxy-Connection",
	"Transfer-Encoding",
	"Upgrade",
}
3221
3222
3223
3224
3225 func checkValidHTTP2RequestHeaders(h Header) error {
3226 for _, k := range connHeaders {
3227 if _, ok := h[k]; ok {
3228 return fmt.Errorf("request header %q is not valid in HTTP/2", k)
3229 }
3230 }
3231 te := h["Te"]
3232 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
3233 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
3234 }
3235 return nil
3236 }
3237
// serve400Handler is a handler that answers every request with a 400 status
// and the text of err as the body.
type serve400Handler struct {
	err error // error whose message is written as the response body
}
3241
3242 func (handler serve400Handler) ServeHTTP(w *ResponseWriter, r *ServerRequest) {
3243 const statusBadRequest = 400
3244
3245
3246 h := w.Header()
3247 h.Del("Content-Length")
3248 h.Set("Content-Type", "text/plain; charset=utf-8")
3249 h.Set("X-Content-Type-Options", "nosniff")
3250 w.WriteHeader(statusBadRequest)
3251 fmt.Fprintln(w, handler.err.Error())
3252 }
3253
3254
3255
3256
3257 func h1ServerKeepAlivesDisabled(hs ServerConfig) bool {
3258 return !hs.DoKeepAlives()
3259 }
3260
3261 func (sc *serverConn) countError(name string, err error) error {
3262 if sc == nil || sc.srv == nil {
3263 return err
3264 }
3265 f := sc.countErrorFunc
3266 if f == nil {
3267 return err
3268 }
3269 var typ string
3270 var code ErrCode
3271 switch e := err.(type) {
3272 case ConnectionError:
3273 typ = "conn"
3274 code = ErrCode(e)
3275 case StreamError:
3276 typ = "stream"
3277 code = ErrCode(e.Code)
3278 default:
3279 return err
3280 }
3281 codeStr := errCodeName[code]
3282 if codeStr == "" {
3283 codeStr = strconv.Itoa(int(code))
3284 }
3285 f(fmt.Sprintf("%s_%s_%s", typ, codeStr, name))
3286 return err
3287 }
3288
View as plain text