Source file src/net/sendfile_test.go

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package net
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"crypto/sha256"
    11  	"encoding/hex"
    12  	"errors"
    13  	"fmt"
    14  	"internal/poll"
    15  	"io"
    16  	"math/rand"
    17  	"os"
    18  	"runtime"
    19  	"strconv"
    20  	"sync"
    21  	"testing"
    22  	"time"
    23  )
    24  
    25  const (
    26  	newton       = "../testdata/Isaac.Newton-Opticks.txt"
    27  	newtonLen    = 567198
    28  	newtonSHA256 = "d4a9ac22462b35e7821a4f2706c211093da678620a8f9997989ee7cf8d507bbd"
    29  )
    30  
    31  // expectSendfile runs f, and verifies that internal/poll.SendFile successfully handles
    32  // a write to wantConn during f's execution.
    33  //
    34  // On platforms where supportsSendfile is false, expectSendfile runs f but does not
    35  // expect a call to SendFile.
    36  func expectSendfile(t *testing.T, wantConn Conn, f func()) {
    37  	t.Helper()
    38  	if !supportsSendfile {
    39  		f()
    40  		return
    41  	}
    42  	orig := poll.TestHookDidSendFile
    43  	defer func() {
    44  		poll.TestHookDidSendFile = orig
    45  	}()
    46  	var (
    47  		called     bool
    48  		gotHandled bool
    49  		gotFD      *poll.FD
    50  		gotErr     error
    51  	)
    52  	poll.TestHookDidSendFile = func(dstFD *poll.FD, src int, written int64, err error, handled bool) {
    53  		if called {
    54  			t.Error("internal/poll.SendFile called multiple times, want one call")
    55  		}
    56  		called = true
    57  		gotHandled = handled
    58  		gotFD = dstFD
    59  		gotErr = err
    60  	}
    61  	f()
    62  	if !called {
    63  		t.Error("internal/poll.SendFile was not called, want it to be")
    64  		return
    65  	}
    66  	if !gotHandled {
    67  		t.Error("internal/poll.SendFile did not handle the write, want it to, error:", gotErr)
    68  		return
    69  	}
    70  	if &wantConn.(*TCPConn).fd.pfd != gotFD {
    71  		t.Error("internal.poll.SendFile called with unexpected FD")
    72  	}
    73  }
    74  
    75  func TestSendfile(t *testing.T) { testSendfile(t, newton, newtonSHA256, newtonLen, 0) }
    76  func TestSendfileWithExactLimit(t *testing.T) {
    77  	testSendfile(t, newton, newtonSHA256, newtonLen, newtonLen)
    78  }
    79  func TestSendfileWithLimitLargerThanFile(t *testing.T) {
    80  	testSendfile(t, newton, newtonSHA256, newtonLen, newtonLen*2)
    81  }
    82  func TestSendfileWithLargeFile(t *testing.T) {
    83  	// Some platforms are not capable of handling large files with sendfile
    84  	// due to limited system resource, so we only run this test on amd64 and
    85  	// arm64 for the moment.
    86  	if runtime.GOARCH != "amd64" && runtime.GOARCH != "arm64" {
    87  		t.Skip("skipping on non-amd64 and non-arm64 platforms")
    88  	}
    89  	// Also skip it during short testing.
    90  	if testing.Short() {
    91  		t.Skip("Skip it during short testing")
    92  	}
    93  
    94  	// We're using 1<<31 - 1 as the chunk size for sendfile currently,
    95  	// make an edge case file that is 1 byte bigger than that.
    96  	f := createTempFile(t, 1<<31)
    97  	// For big file like this, only verify the transmission of the file,
    98  	// skip the content check.
    99  	testSendfile(t, f.Name(), "", 1<<31, 0)
   100  }
   101  func testSendfile(t *testing.T, filePath, fileHash string, size, limit int64) {
   102  	ln := newLocalListener(t, "tcp")
   103  	defer ln.Close()
   104  
   105  	errc := make(chan error, 1)
   106  	go func(ln Listener) {
   107  		// Wait for a connection.
   108  		conn, err := ln.Accept()
   109  		if err != nil {
   110  			errc <- err
   111  			close(errc)
   112  			return
   113  		}
   114  
   115  		go func() {
   116  			defer close(errc)
   117  			defer conn.Close()
   118  
   119  			f, err := os.Open(filePath)
   120  			if err != nil {
   121  				errc <- err
   122  				return
   123  			}
   124  			defer f.Close()
   125  
   126  			// Return file data using io.Copy, which should use
   127  			// sendFile if available.
   128  			var sbytes int64
   129  			switch runtime.GOOS {
   130  			case "windows":
   131  				// Windows is not using sendfile for some reason:
   132  				// https://go.dev/issue/67042
   133  				sbytes, err = io.Copy(conn, f)
   134  			default:
   135  				expectSendfile(t, conn, func() {
   136  					if limit > 0 {
   137  						sbytes, err = io.CopyN(conn, f, limit)
   138  						if err == io.EOF && limit > size {
   139  							err = nil
   140  						}
   141  					} else {
   142  						sbytes, err = io.Copy(conn, f)
   143  					}
   144  				})
   145  			}
   146  			if err != nil {
   147  				errc <- err
   148  				return
   149  			}
   150  
   151  			if sbytes != size {
   152  				errc <- fmt.Errorf("sent %d bytes; expected %d", sbytes, size)
   153  				return
   154  			}
   155  		}()
   156  	}(ln)
   157  
   158  	// Connect to listener to retrieve file and verify digest matches
   159  	// expected.
   160  	c, err := Dial("tcp", ln.Addr().String())
   161  	if err != nil {
   162  		t.Fatal(err)
   163  	}
   164  	defer c.Close()
   165  
   166  	h := sha256.New()
   167  	rbytes, err := io.Copy(h, c)
   168  	if err != nil {
   169  		t.Error(err)
   170  	}
   171  
   172  	if rbytes != size {
   173  		t.Errorf("received %d bytes; expected %d", rbytes, size)
   174  	}
   175  
   176  	if len(fileHash) > 0 && hex.EncodeToString(h.Sum(nil)) != newtonSHA256 {
   177  		t.Error("retrieved data hash did not match")
   178  	}
   179  
   180  	for err := range errc {
   181  		t.Error(err)
   182  	}
   183  }
   184  
   185  func TestSendfileParts(t *testing.T) {
   186  	ln := newLocalListener(t, "tcp")
   187  	defer ln.Close()
   188  
   189  	errc := make(chan error, 1)
   190  	go func(ln Listener) {
   191  		// Wait for a connection.
   192  		conn, err := ln.Accept()
   193  		if err != nil {
   194  			errc <- err
   195  			close(errc)
   196  			return
   197  		}
   198  
   199  		go func() {
   200  			defer close(errc)
   201  			defer conn.Close()
   202  
   203  			f, err := os.Open(newton)
   204  			if err != nil {
   205  				errc <- err
   206  				return
   207  			}
   208  			defer f.Close()
   209  
   210  			for i := 0; i < 3; i++ {
   211  				// Return file data using io.CopyN, which should use
   212  				// sendFile if available.
   213  				expectSendfile(t, conn, func() {
   214  					_, err = io.CopyN(conn, f, 3)
   215  				})
   216  				if err != nil {
   217  					errc <- err
   218  					return
   219  				}
   220  			}
   221  		}()
   222  	}(ln)
   223  
   224  	c, err := Dial("tcp", ln.Addr().String())
   225  	if err != nil {
   226  		t.Fatal(err)
   227  	}
   228  	defer c.Close()
   229  
   230  	buf := new(bytes.Buffer)
   231  	buf.ReadFrom(c)
   232  
   233  	if want, have := "Produced ", buf.String(); have != want {
   234  		t.Errorf("unexpected server reply %q, want %q", have, want)
   235  	}
   236  
   237  	for err := range errc {
   238  		t.Error(err)
   239  	}
   240  }
   241  
   242  func TestSendfileSeeked(t *testing.T) {
   243  	ln := newLocalListener(t, "tcp")
   244  	defer ln.Close()
   245  
   246  	const seekTo = 65 << 10
   247  	const sendSize = 10 << 10
   248  
   249  	errc := make(chan error, 1)
   250  	go func(ln Listener) {
   251  		// Wait for a connection.
   252  		conn, err := ln.Accept()
   253  		if err != nil {
   254  			errc <- err
   255  			close(errc)
   256  			return
   257  		}
   258  
   259  		go func() {
   260  			defer close(errc)
   261  			defer conn.Close()
   262  
   263  			f, err := os.Open(newton)
   264  			if err != nil {
   265  				errc <- err
   266  				return
   267  			}
   268  			defer f.Close()
   269  			if _, err := f.Seek(seekTo, io.SeekStart); err != nil {
   270  				errc <- err
   271  				return
   272  			}
   273  
   274  			expectSendfile(t, conn, func() {
   275  				_, err = io.CopyN(conn, f, sendSize)
   276  			})
   277  			if err != nil {
   278  				errc <- err
   279  				return
   280  			}
   281  		}()
   282  	}(ln)
   283  
   284  	c, err := Dial("tcp", ln.Addr().String())
   285  	if err != nil {
   286  		t.Fatal(err)
   287  	}
   288  	defer c.Close()
   289  
   290  	buf := new(bytes.Buffer)
   291  	buf.ReadFrom(c)
   292  
   293  	if buf.Len() != sendSize {
   294  		t.Errorf("Got %d bytes; want %d", buf.Len(), sendSize)
   295  	}
   296  
   297  	for err := range errc {
   298  		t.Error(err)
   299  	}
   300  }
   301  
   302  // Test that sendfile doesn't put a pipe into blocking mode.
   303  func TestSendfilePipe(t *testing.T) {
   304  	switch runtime.GOOS {
   305  	case "plan9", "windows", "js", "wasip1":
   306  		// These systems don't support deadlines on pipes.
   307  		t.Skipf("skipping on %s", runtime.GOOS)
   308  	}
   309  
   310  	t.Parallel()
   311  
   312  	ln := newLocalListener(t, "tcp")
   313  	defer ln.Close()
   314  
   315  	r, w, err := os.Pipe()
   316  	if err != nil {
   317  		t.Fatal(err)
   318  	}
   319  	defer w.Close()
   320  	defer r.Close()
   321  
   322  	copied := make(chan bool)
   323  
   324  	var wg sync.WaitGroup
   325  	wg.Add(1)
   326  	go func() {
   327  		// Accept a connection and copy 1 byte from the read end of
   328  		// the pipe to the connection. This will call into sendfile.
   329  		defer wg.Done()
   330  		conn, err := ln.Accept()
   331  		if err != nil {
   332  			t.Error(err)
   333  			return
   334  		}
   335  		defer conn.Close()
   336  		// The comment above states that this should call into sendfile,
   337  		// but empirically it doesn't seem to do so at this time.
   338  		// If it does, or does on some platforms, this CopyN should be wrapped
   339  		// in expectSendfile.
   340  		_, err = io.CopyN(conn, r, 1)
   341  		if err != nil {
   342  			t.Error(err)
   343  			return
   344  		}
   345  		// Signal the main goroutine that we've copied the byte.
   346  		close(copied)
   347  	}()
   348  
   349  	wg.Add(1)
   350  	go func() {
   351  		// Write 1 byte to the write end of the pipe.
   352  		defer wg.Done()
   353  		_, err := w.Write([]byte{'a'})
   354  		if err != nil {
   355  			t.Error(err)
   356  		}
   357  	}()
   358  
   359  	wg.Add(1)
   360  	go func() {
   361  		// Connect to the server started two goroutines up and
   362  		// discard any data that it writes.
   363  		defer wg.Done()
   364  		conn, err := Dial("tcp", ln.Addr().String())
   365  		if err != nil {
   366  			t.Error(err)
   367  			return
   368  		}
   369  		defer conn.Close()
   370  		io.Copy(io.Discard, conn)
   371  	}()
   372  
   373  	// Wait for the byte to be copied, meaning that sendfile has
   374  	// been called on the pipe.
   375  	<-copied
   376  
   377  	// Set a very short deadline on the read end of the pipe.
   378  	if err := r.SetDeadline(time.Now().Add(time.Microsecond)); err != nil {
   379  		t.Fatal(err)
   380  	}
   381  
   382  	wg.Add(1)
   383  	go func() {
   384  		// Wait for much longer than the deadline and write a byte
   385  		// to the pipe.
   386  		defer wg.Done()
   387  		time.Sleep(50 * time.Millisecond)
   388  		w.Write([]byte{'b'})
   389  	}()
   390  
   391  	// If this read does not time out, the pipe was incorrectly
   392  	// put into blocking mode.
   393  	_, err = r.Read(make([]byte, 1))
   394  	if err == nil {
   395  		t.Error("Read did not time out")
   396  	} else if !os.IsTimeout(err) {
   397  		t.Errorf("got error %v, expected a time out", err)
   398  	}
   399  
   400  	wg.Wait()
   401  }
   402  
   403  // Issue 43822: tests that returns EOF when conn write timeout.
   404  func TestSendfileOnWriteTimeoutExceeded(t *testing.T) {
   405  	ln := newLocalListener(t, "tcp")
   406  	defer ln.Close()
   407  
   408  	errc := make(chan error, 1)
   409  	go func(ln Listener) (retErr error) {
   410  		defer func() {
   411  			errc <- retErr
   412  			close(errc)
   413  		}()
   414  
   415  		conn, err := ln.Accept()
   416  		if err != nil {
   417  			return err
   418  		}
   419  		defer conn.Close()
   420  
   421  		// Set the write deadline in the past(1h ago). It makes
   422  		// sure that it is always write timeout.
   423  		if err := conn.SetWriteDeadline(time.Now().Add(-1 * time.Hour)); err != nil {
   424  			return err
   425  		}
   426  
   427  		f, err := os.Open(newton)
   428  		if err != nil {
   429  			return err
   430  		}
   431  		defer f.Close()
   432  
   433  		// We expect this to use sendfile, but as of the time this comment was written
   434  		// poll.SendFile on an FD past its timeout can return an error indicating that
   435  		// it didn't handle the operation, resulting in a non-sendfile retry.
   436  		// So don't use expectSendfile here.
   437  		_, err = io.Copy(conn, f)
   438  		if errors.Is(err, os.ErrDeadlineExceeded) {
   439  			return nil
   440  		}
   441  
   442  		if err == nil {
   443  			err = fmt.Errorf("expected ErrDeadlineExceeded, but got nil")
   444  		}
   445  		return err
   446  	}(ln)
   447  
   448  	conn, err := Dial("tcp", ln.Addr().String())
   449  	if err != nil {
   450  		t.Fatal(err)
   451  	}
   452  	defer conn.Close()
   453  
   454  	n, err := io.Copy(io.Discard, conn)
   455  	if err != nil {
   456  		t.Fatalf("expected nil error, but got %v", err)
   457  	}
   458  	if n != 0 {
   459  		t.Fatalf("expected receive zero, but got %d byte(s)", n)
   460  	}
   461  
   462  	if err := <-errc; err != nil {
   463  		t.Fatal(err)
   464  	}
   465  }
   466  
   467  func BenchmarkSendfileZeroBytes(b *testing.B) {
   468  	var (
   469  		wg          sync.WaitGroup
   470  		ctx, cancel = context.WithCancel(context.Background())
   471  	)
   472  
   473  	defer wg.Wait()
   474  
   475  	ln := newLocalListener(b, "tcp")
   476  	defer ln.Close()
   477  
   478  	tempFile, err := os.CreateTemp(b.TempDir(), "test.txt")
   479  	if err != nil {
   480  		b.Fatalf("failed to create temp file: %v", err)
   481  	}
   482  	defer tempFile.Close()
   483  
   484  	fileName := tempFile.Name()
   485  
   486  	dataSize := b.N
   487  	wg.Add(1)
   488  	go func(f *os.File) {
   489  		defer wg.Done()
   490  
   491  		for i := 0; i < dataSize; i++ {
   492  			if _, err := f.Write([]byte{1}); err != nil {
   493  				b.Errorf("failed to write: %v", err)
   494  				return
   495  			}
   496  			if i%1000 == 0 {
   497  				f.Sync()
   498  			}
   499  		}
   500  	}(tempFile)
   501  
   502  	b.ResetTimer()
   503  	b.ReportAllocs()
   504  
   505  	wg.Add(1)
   506  	go func(ln Listener, fileName string) {
   507  		defer wg.Done()
   508  
   509  		conn, err := ln.Accept()
   510  		if err != nil {
   511  			b.Errorf("failed to accept: %v", err)
   512  			return
   513  		}
   514  		defer conn.Close()
   515  
   516  		f, err := os.OpenFile(fileName, os.O_RDONLY, 0660)
   517  		if err != nil {
   518  			b.Errorf("failed to open file: %v", err)
   519  			return
   520  		}
   521  		defer f.Close()
   522  
   523  		for {
   524  			if ctx.Err() != nil {
   525  				return
   526  			}
   527  
   528  			if _, err := io.Copy(conn, f); err != nil {
   529  				b.Errorf("failed to copy: %v", err)
   530  				return
   531  			}
   532  		}
   533  	}(ln, fileName)
   534  
   535  	conn, err := Dial("tcp", ln.Addr().String())
   536  	if err != nil {
   537  		b.Fatalf("failed to dial: %v", err)
   538  	}
   539  	defer conn.Close()
   540  
   541  	n, err := io.CopyN(io.Discard, conn, int64(dataSize))
   542  	if err != nil {
   543  		b.Fatalf("failed to copy: %v", err)
   544  	}
   545  	if n != int64(dataSize) {
   546  		b.Fatalf("expected %d copied bytes, but got %d", dataSize, n)
   547  	}
   548  
   549  	cancel()
   550  }
   551  
   552  func BenchmarkSendFile(b *testing.B) {
   553  	if runtime.GOOS == "windows" {
   554  		// TODO(panjf2000): Windows has not yet implemented FileConn,
   555  		//		remove this when it's implemented in https://go.dev/issues/9503.
   556  		b.Skipf("skipping on %s", runtime.GOOS)
   557  	}
   558  
   559  	b.Run("file-to-tcp", func(b *testing.B) { benchmarkSendFile(b, "tcp") })
   560  	b.Run("file-to-unix", func(b *testing.B) { benchmarkSendFile(b, "unix") })
   561  }
   562  
   563  func benchmarkSendFile(b *testing.B, proto string) {
   564  	for i := 0; i <= 10; i++ {
   565  		size := 1 << (i + 10)
   566  		bench := sendFileBench{
   567  			proto:     proto,
   568  			chunkSize: size,
   569  		}
   570  		b.Run(strconv.Itoa(size), bench.benchSendFile)
   571  	}
   572  }
   573  
   574  type sendFileBench struct {
   575  	proto     string
   576  	chunkSize int
   577  }
   578  
   579  func (bench sendFileBench) benchSendFile(b *testing.B) {
   580  	fileSize := b.N * bench.chunkSize
   581  	f := createTempFile(b, int64(fileSize))
   582  
   583  	client, server := spawnTestSocketPair(b, bench.proto)
   584  	defer server.Close()
   585  
   586  	cleanUp, err := startTestSocketPeer(b, client, "r", bench.chunkSize, fileSize)
   587  	if err != nil {
   588  		client.Close()
   589  		b.Fatal(err)
   590  	}
   591  	defer cleanUp(b)
   592  
   593  	b.ReportAllocs()
   594  	b.SetBytes(int64(bench.chunkSize))
   595  	b.ResetTimer()
   596  
   597  	// Data go from file to socket via sendfile(2).
   598  	sent, err := io.Copy(server, f)
   599  	if err != nil {
   600  		b.Fatalf("failed to copy data with sendfile, error: %v", err)
   601  	}
   602  	if sent != int64(fileSize) {
   603  		b.Fatalf("bytes sent mismatch, got: %d, want: %d", sent, fileSize)
   604  	}
   605  }
   606  
   607  func createTempFile(tb testing.TB, size int64) *os.File {
   608  	f, err := os.CreateTemp(tb.TempDir(), "sendfile-bench")
   609  	if err != nil {
   610  		tb.Fatalf("failed to create temporary file: %v", err)
   611  	}
   612  	tb.Cleanup(func() {
   613  		f.Close()
   614  	})
   615  
   616  	if _, err := io.CopyN(f, newRandReader(tb), size); err != nil {
   617  		tb.Fatalf("failed to fill the file with random data: %v", err)
   618  	}
   619  	if _, err := f.Seek(0, io.SeekStart); err != nil {
   620  		tb.Fatalf("failed to rewind the file: %v", err)
   621  	}
   622  
   623  	return f
   624  }
   625  
   626  func newRandReader(tb testing.TB) io.Reader {
   627  	seed := time.Now().UnixNano()
   628  	tb.Logf("Deterministic RNG seed based on timestamp: 0x%x", seed)
   629  	return rand.New(rand.NewSource(seed))
   630  }
   631  

View as plain text