Source file src/net/sendfile_test.go

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package net
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"crypto/sha256"
    11  	"encoding/hex"
    12  	"errors"
    13  	"fmt"
    14  	"internal/poll"
    15  	"io"
    16  	"os"
    17  	"runtime"
    18  	"strconv"
    19  	"sync"
    20  	"testing"
    21  	"time"
    22  )
    23  
    24  const (
    25  	newton       = "../testdata/Isaac.Newton-Opticks.txt"
    26  	newtonLen    = 567198
    27  	newtonSHA256 = "d4a9ac22462b35e7821a4f2706c211093da678620a8f9997989ee7cf8d507bbd"
    28  )
    29  
    30  // expectSendfile runs f, and verifies that internal/poll.SendFile successfully handles
    31  // a write to wantConn during f's execution.
    32  //
    33  // On platforms where supportsSendfile is false, expectSendfile runs f but does not
    34  // expect a call to SendFile.
    35  func expectSendfile(t *testing.T, wantConn Conn, f func()) {
    36  	t.Helper()
    37  	if !supportsSendfile {
    38  		f()
    39  		return
    40  	}
    41  	orig := poll.TestHookDidSendFile
    42  	defer func() {
    43  		poll.TestHookDidSendFile = orig
    44  	}()
    45  	var (
    46  		called     bool
    47  		gotHandled bool
    48  		gotFD      *poll.FD
    49  	)
    50  	poll.TestHookDidSendFile = func(dstFD *poll.FD, src int, written int64, err error, handled bool) {
    51  		if called {
    52  			t.Error("internal/poll.SendFile called multiple times, want one call")
    53  		}
    54  		called = true
    55  		gotHandled = handled
    56  		gotFD = dstFD
    57  	}
    58  	f()
    59  	if !called {
    60  		t.Error("internal/poll.SendFile was not called, want it to be")
    61  		return
    62  	}
    63  	if !gotHandled {
    64  		t.Error("internal/poll.SendFile did not handle the write, want it to")
    65  		return
    66  	}
    67  	if &wantConn.(*TCPConn).fd.pfd != gotFD {
    68  		t.Error("internal.poll.SendFile called with unexpected FD")
    69  	}
    70  }
    71  
    72  func TestSendfile(t *testing.T) {
    73  	ln := newLocalListener(t, "tcp")
    74  	defer ln.Close()
    75  
    76  	errc := make(chan error, 1)
    77  	go func(ln Listener) {
    78  		// Wait for a connection.
    79  		conn, err := ln.Accept()
    80  		if err != nil {
    81  			errc <- err
    82  			close(errc)
    83  			return
    84  		}
    85  
    86  		go func() {
    87  			defer close(errc)
    88  			defer conn.Close()
    89  
    90  			f, err := os.Open(newton)
    91  			if err != nil {
    92  				errc <- err
    93  				return
    94  			}
    95  			defer f.Close()
    96  
    97  			// Return file data using io.Copy, which should use
    98  			// sendFile if available.
    99  			var sbytes int64
   100  			switch runtime.GOOS {
   101  			case "windows":
   102  				// Windows is not using sendfile for some reason:
   103  				// https://go.dev/issue/67042
   104  				sbytes, err = io.Copy(conn, f)
   105  			default:
   106  				expectSendfile(t, conn, func() {
   107  					sbytes, err = io.Copy(conn, f)
   108  				})
   109  			}
   110  			if err != nil {
   111  				errc <- err
   112  				return
   113  			}
   114  
   115  			if sbytes != newtonLen {
   116  				errc <- fmt.Errorf("sent %d bytes; expected %d", sbytes, newtonLen)
   117  				return
   118  			}
   119  		}()
   120  	}(ln)
   121  
   122  	// Connect to listener to retrieve file and verify digest matches
   123  	// expected.
   124  	c, err := Dial("tcp", ln.Addr().String())
   125  	if err != nil {
   126  		t.Fatal(err)
   127  	}
   128  	defer c.Close()
   129  
   130  	h := sha256.New()
   131  	rbytes, err := io.Copy(h, c)
   132  	if err != nil {
   133  		t.Error(err)
   134  	}
   135  
   136  	if rbytes != newtonLen {
   137  		t.Errorf("received %d bytes; expected %d", rbytes, newtonLen)
   138  	}
   139  
   140  	if res := hex.EncodeToString(h.Sum(nil)); res != newtonSHA256 {
   141  		t.Error("retrieved data hash did not match")
   142  	}
   143  
   144  	for err := range errc {
   145  		t.Error(err)
   146  	}
   147  }
   148  
   149  func TestSendfileParts(t *testing.T) {
   150  	ln := newLocalListener(t, "tcp")
   151  	defer ln.Close()
   152  
   153  	errc := make(chan error, 1)
   154  	go func(ln Listener) {
   155  		// Wait for a connection.
   156  		conn, err := ln.Accept()
   157  		if err != nil {
   158  			errc <- err
   159  			close(errc)
   160  			return
   161  		}
   162  
   163  		go func() {
   164  			defer close(errc)
   165  			defer conn.Close()
   166  
   167  			f, err := os.Open(newton)
   168  			if err != nil {
   169  				errc <- err
   170  				return
   171  			}
   172  			defer f.Close()
   173  
   174  			for i := 0; i < 3; i++ {
   175  				// Return file data using io.CopyN, which should use
   176  				// sendFile if available.
   177  				expectSendfile(t, conn, func() {
   178  					_, err = io.CopyN(conn, f, 3)
   179  				})
   180  				if err != nil {
   181  					errc <- err
   182  					return
   183  				}
   184  			}
   185  		}()
   186  	}(ln)
   187  
   188  	c, err := Dial("tcp", ln.Addr().String())
   189  	if err != nil {
   190  		t.Fatal(err)
   191  	}
   192  	defer c.Close()
   193  
   194  	buf := new(bytes.Buffer)
   195  	buf.ReadFrom(c)
   196  
   197  	if want, have := "Produced ", buf.String(); have != want {
   198  		t.Errorf("unexpected server reply %q, want %q", have, want)
   199  	}
   200  
   201  	for err := range errc {
   202  		t.Error(err)
   203  	}
   204  }
   205  
   206  func TestSendfileSeeked(t *testing.T) {
   207  	ln := newLocalListener(t, "tcp")
   208  	defer ln.Close()
   209  
   210  	const seekTo = 65 << 10
   211  	const sendSize = 10 << 10
   212  
   213  	errc := make(chan error, 1)
   214  	go func(ln Listener) {
   215  		// Wait for a connection.
   216  		conn, err := ln.Accept()
   217  		if err != nil {
   218  			errc <- err
   219  			close(errc)
   220  			return
   221  		}
   222  
   223  		go func() {
   224  			defer close(errc)
   225  			defer conn.Close()
   226  
   227  			f, err := os.Open(newton)
   228  			if err != nil {
   229  				errc <- err
   230  				return
   231  			}
   232  			defer f.Close()
   233  			if _, err := f.Seek(seekTo, io.SeekStart); err != nil {
   234  				errc <- err
   235  				return
   236  			}
   237  
   238  			expectSendfile(t, conn, func() {
   239  				_, err = io.CopyN(conn, f, sendSize)
   240  			})
   241  			if err != nil {
   242  				errc <- err
   243  				return
   244  			}
   245  		}()
   246  	}(ln)
   247  
   248  	c, err := Dial("tcp", ln.Addr().String())
   249  	if err != nil {
   250  		t.Fatal(err)
   251  	}
   252  	defer c.Close()
   253  
   254  	buf := new(bytes.Buffer)
   255  	buf.ReadFrom(c)
   256  
   257  	if buf.Len() != sendSize {
   258  		t.Errorf("Got %d bytes; want %d", buf.Len(), sendSize)
   259  	}
   260  
   261  	for err := range errc {
   262  		t.Error(err)
   263  	}
   264  }
   265  
   266  // Test that sendfile doesn't put a pipe into blocking mode.
   267  func TestSendfilePipe(t *testing.T) {
   268  	switch runtime.GOOS {
   269  	case "plan9", "windows", "js", "wasip1":
   270  		// These systems don't support deadlines on pipes.
   271  		t.Skipf("skipping on %s", runtime.GOOS)
   272  	}
   273  
   274  	t.Parallel()
   275  
   276  	ln := newLocalListener(t, "tcp")
   277  	defer ln.Close()
   278  
   279  	r, w, err := os.Pipe()
   280  	if err != nil {
   281  		t.Fatal(err)
   282  	}
   283  	defer w.Close()
   284  	defer r.Close()
   285  
   286  	copied := make(chan bool)
   287  
   288  	var wg sync.WaitGroup
   289  	wg.Add(1)
   290  	go func() {
   291  		// Accept a connection and copy 1 byte from the read end of
   292  		// the pipe to the connection. This will call into sendfile.
   293  		defer wg.Done()
   294  		conn, err := ln.Accept()
   295  		if err != nil {
   296  			t.Error(err)
   297  			return
   298  		}
   299  		defer conn.Close()
   300  		// The comment above states that this should call into sendfile,
   301  		// but empirically it doesn't seem to do so at this time.
   302  		// If it does, or does on some platforms, this CopyN should be wrapped
   303  		// in expectSendfile.
   304  		_, err = io.CopyN(conn, r, 1)
   305  		if err != nil {
   306  			t.Error(err)
   307  			return
   308  		}
   309  		// Signal the main goroutine that we've copied the byte.
   310  		close(copied)
   311  	}()
   312  
   313  	wg.Add(1)
   314  	go func() {
   315  		// Write 1 byte to the write end of the pipe.
   316  		defer wg.Done()
   317  		_, err := w.Write([]byte{'a'})
   318  		if err != nil {
   319  			t.Error(err)
   320  		}
   321  	}()
   322  
   323  	wg.Add(1)
   324  	go func() {
   325  		// Connect to the server started two goroutines up and
   326  		// discard any data that it writes.
   327  		defer wg.Done()
   328  		conn, err := Dial("tcp", ln.Addr().String())
   329  		if err != nil {
   330  			t.Error(err)
   331  			return
   332  		}
   333  		defer conn.Close()
   334  		io.Copy(io.Discard, conn)
   335  	}()
   336  
   337  	// Wait for the byte to be copied, meaning that sendfile has
   338  	// been called on the pipe.
   339  	<-copied
   340  
   341  	// Set a very short deadline on the read end of the pipe.
   342  	if err := r.SetDeadline(time.Now().Add(time.Microsecond)); err != nil {
   343  		t.Fatal(err)
   344  	}
   345  
   346  	wg.Add(1)
   347  	go func() {
   348  		// Wait for much longer than the deadline and write a byte
   349  		// to the pipe.
   350  		defer wg.Done()
   351  		time.Sleep(50 * time.Millisecond)
   352  		w.Write([]byte{'b'})
   353  	}()
   354  
   355  	// If this read does not time out, the pipe was incorrectly
   356  	// put into blocking mode.
   357  	_, err = r.Read(make([]byte, 1))
   358  	if err == nil {
   359  		t.Error("Read did not time out")
   360  	} else if !os.IsTimeout(err) {
   361  		t.Errorf("got error %v, expected a time out", err)
   362  	}
   363  
   364  	wg.Wait()
   365  }
   366  
   367  // Issue 43822: tests that returns EOF when conn write timeout.
   368  func TestSendfileOnWriteTimeoutExceeded(t *testing.T) {
   369  	ln := newLocalListener(t, "tcp")
   370  	defer ln.Close()
   371  
   372  	errc := make(chan error, 1)
   373  	go func(ln Listener) (retErr error) {
   374  		defer func() {
   375  			errc <- retErr
   376  			close(errc)
   377  		}()
   378  
   379  		conn, err := ln.Accept()
   380  		if err != nil {
   381  			return err
   382  		}
   383  		defer conn.Close()
   384  
   385  		// Set the write deadline in the past(1h ago). It makes
   386  		// sure that it is always write timeout.
   387  		if err := conn.SetWriteDeadline(time.Now().Add(-1 * time.Hour)); err != nil {
   388  			return err
   389  		}
   390  
   391  		f, err := os.Open(newton)
   392  		if err != nil {
   393  			return err
   394  		}
   395  		defer f.Close()
   396  
   397  		// We expect this to use sendfile, but as of the time this comment was written
   398  		// poll.SendFile on an FD past its timeout can return an error indicating that
   399  		// it didn't handle the operation, resulting in a non-sendfile retry.
   400  		// So don't use expectSendfile here.
   401  		_, err = io.Copy(conn, f)
   402  		if errors.Is(err, os.ErrDeadlineExceeded) {
   403  			return nil
   404  		}
   405  
   406  		if err == nil {
   407  			err = fmt.Errorf("expected ErrDeadlineExceeded, but got nil")
   408  		}
   409  		return err
   410  	}(ln)
   411  
   412  	conn, err := Dial("tcp", ln.Addr().String())
   413  	if err != nil {
   414  		t.Fatal(err)
   415  	}
   416  	defer conn.Close()
   417  
   418  	n, err := io.Copy(io.Discard, conn)
   419  	if err != nil {
   420  		t.Fatalf("expected nil error, but got %v", err)
   421  	}
   422  	if n != 0 {
   423  		t.Fatalf("expected receive zero, but got %d byte(s)", n)
   424  	}
   425  
   426  	if err := <-errc; err != nil {
   427  		t.Fatal(err)
   428  	}
   429  }
   430  
   431  func BenchmarkSendfileZeroBytes(b *testing.B) {
   432  	var (
   433  		wg          sync.WaitGroup
   434  		ctx, cancel = context.WithCancel(context.Background())
   435  	)
   436  
   437  	defer wg.Wait()
   438  
   439  	ln := newLocalListener(b, "tcp")
   440  	defer ln.Close()
   441  
   442  	tempFile, err := os.CreateTemp(b.TempDir(), "test.txt")
   443  	if err != nil {
   444  		b.Fatalf("failed to create temp file: %v", err)
   445  	}
   446  	defer tempFile.Close()
   447  
   448  	fileName := tempFile.Name()
   449  
   450  	dataSize := b.N
   451  	wg.Add(1)
   452  	go func(f *os.File) {
   453  		defer wg.Done()
   454  
   455  		for i := 0; i < dataSize; i++ {
   456  			if _, err := f.Write([]byte{1}); err != nil {
   457  				b.Errorf("failed to write: %v", err)
   458  				return
   459  			}
   460  			if i%1000 == 0 {
   461  				f.Sync()
   462  			}
   463  		}
   464  	}(tempFile)
   465  
   466  	b.ResetTimer()
   467  	b.ReportAllocs()
   468  
   469  	wg.Add(1)
   470  	go func(ln Listener, fileName string) {
   471  		defer wg.Done()
   472  
   473  		conn, err := ln.Accept()
   474  		if err != nil {
   475  			b.Errorf("failed to accept: %v", err)
   476  			return
   477  		}
   478  		defer conn.Close()
   479  
   480  		f, err := os.OpenFile(fileName, os.O_RDONLY, 0660)
   481  		if err != nil {
   482  			b.Errorf("failed to open file: %v", err)
   483  			return
   484  		}
   485  		defer f.Close()
   486  
   487  		for {
   488  			if ctx.Err() != nil {
   489  				return
   490  			}
   491  
   492  			if _, err := io.Copy(conn, f); err != nil {
   493  				b.Errorf("failed to copy: %v", err)
   494  				return
   495  			}
   496  		}
   497  	}(ln, fileName)
   498  
   499  	conn, err := Dial("tcp", ln.Addr().String())
   500  	if err != nil {
   501  		b.Fatalf("failed to dial: %v", err)
   502  	}
   503  	defer conn.Close()
   504  
   505  	n, err := io.CopyN(io.Discard, conn, int64(dataSize))
   506  	if err != nil {
   507  		b.Fatalf("failed to copy: %v", err)
   508  	}
   509  	if n != int64(dataSize) {
   510  		b.Fatalf("expected %d copied bytes, but got %d", dataSize, n)
   511  	}
   512  
   513  	cancel()
   514  }
   515  
   516  func BenchmarkSendFile(b *testing.B) {
   517  	if runtime.GOOS == "windows" {
   518  		// TODO(panjf2000): Windows has not yet implemented FileConn,
   519  		//		remove this when it's implemented in https://go.dev/issues/9503.
   520  		b.Skipf("skipping on %s", runtime.GOOS)
   521  	}
   522  
   523  	b.Run("file-to-tcp", func(b *testing.B) { benchmarkSendFile(b, "tcp") })
   524  	b.Run("file-to-unix", func(b *testing.B) { benchmarkSendFile(b, "unix") })
   525  }
   526  
   527  func benchmarkSendFile(b *testing.B, proto string) {
   528  	for i := 0; i <= 10; i++ {
   529  		size := 1 << (i + 10)
   530  		bench := sendFileBench{
   531  			proto:     proto,
   532  			chunkSize: size,
   533  		}
   534  		b.Run(strconv.Itoa(size), bench.benchSendFile)
   535  	}
   536  }
   537  
   538  type sendFileBench struct {
   539  	proto     string
   540  	chunkSize int
   541  }
   542  
   543  func (bench sendFileBench) benchSendFile(b *testing.B) {
   544  	fileSize := b.N * bench.chunkSize
   545  	f := createTempFile(b, fileSize)
   546  
   547  	client, server := spawnTestSocketPair(b, bench.proto)
   548  	defer server.Close()
   549  
   550  	cleanUp, err := startTestSocketPeer(b, client, "r", bench.chunkSize, fileSize)
   551  	if err != nil {
   552  		client.Close()
   553  		b.Fatal(err)
   554  	}
   555  	defer cleanUp(b)
   556  
   557  	b.ReportAllocs()
   558  	b.SetBytes(int64(bench.chunkSize))
   559  	b.ResetTimer()
   560  
   561  	// Data go from file to socket via sendfile(2).
   562  	sent, err := io.Copy(server, f)
   563  	if err != nil {
   564  		b.Fatalf("failed to copy data with sendfile, error: %v", err)
   565  	}
   566  	if sent != int64(fileSize) {
   567  		b.Fatalf("bytes sent mismatch, got: %d, want: %d", sent, fileSize)
   568  	}
   569  }
   570  
   571  func createTempFile(b *testing.B, size int) *os.File {
   572  	f, err := os.CreateTemp(b.TempDir(), "sendfile-bench")
   573  	if err != nil {
   574  		b.Fatalf("failed to create temporary file: %v", err)
   575  	}
   576  	b.Cleanup(func() {
   577  		f.Close()
   578  	})
   579  
   580  	data := make([]byte, size)
   581  	if _, err := f.Write(data); err != nil {
   582  		b.Fatalf("failed to create and feed the file: %v", err)
   583  	}
   584  	if err := f.Sync(); err != nil {
   585  		b.Fatalf("failed to save the file: %v", err)
   586  	}
   587  	if _, err := f.Seek(0, io.SeekStart); err != nil {
   588  		b.Fatalf("failed to rewind the file: %v", err)
   589  	}
   590  
   591  	return f
   592  }
   593  

View as plain text