Source file src/cmd/go/internal/cache/prog.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package cache
     6  
     7  import (
     8  	"bufio"
     9  	"cmd/go/internal/base"
    10  	"cmd/internal/quoted"
    11  	"context"
    12  	"crypto/sha256"
    13  	"encoding/base64"
    14  	"encoding/json"
    15  	"errors"
    16  	"fmt"
    17  	"internal/goexperiment"
    18  	"io"
    19  	"log"
    20  	"os"
    21  	"os/exec"
    22  	"sync"
    23  	"sync/atomic"
    24  	"time"
    25  )
    26  
// ProgCache implements Cache via JSON messages over stdin/stdout to a child
// helper process which can then implement whatever caching policy/mechanism it
// wants.
//
// Requests are written to the child by writeToChild; responses are read back
// by readLoop, which hands each one to the channel registered in inFlight
// under the response's ID.
//
// See https://github.com/golang/go/issues/59719
type ProgCache struct {
	cmd    *exec.Cmd
	stdout io.ReadCloser  // from the child process
	stdin  io.WriteCloser // to the child process
	bw     *bufio.Writer  // to stdin
	jenc   *json.Encoder  // to bw

	// can are the commands that the child process declared that it supports.
	// This is effectively the versioning mechanism.
	can map[ProgCmd]bool

	// fuzzDirCache is another Cache implementation to use for the FuzzDir
	// method. In practice this is the default GOCACHE disk-based
	// implementation.
	//
	// TODO(bradfitz): maybe this isn't ideal. But we'd need to extend the Cache
	// interface and the fuzzing callers to be less disk-y to do more here.
	fuzzDirCache Cache

	closing      atomic.Bool        // set by Close; readLoop then returns quietly on a read error
	ctx          context.Context    // valid until Close via ctxCancel
	ctxCancel    context.CancelFunc // called on Close
	readLoopDone chan struct{}      // closed when readLoop returns

	mu         sync.Mutex // guards following fields
	nextID     int64
	inFlight   map[int64]chan<- *ProgResponse
	outputFile map[OutputID]string // object => abs path on disk

	// writeMu serializes writing to the child process.
	// It must never be held at the same time as mu.
	writeMu sync.Mutex
}
    65  
    66  // ProgCmd is a command that can be issued to a child process.
    67  //
    68  // If the interface needs to grow, we can add new commands or new versioned
    69  // commands like "get2".
    70  type ProgCmd string
    71  
    72  const (
    73  	cmdGet   = ProgCmd("get")
    74  	cmdPut   = ProgCmd("put")
    75  	cmdClose = ProgCmd("close")
    76  )
    77  
// ProgRequest is the JSON-encoded message that's sent from cmd/go to
// the GOCACHEPROG child process over stdin. Each JSON object is on its
// own line. A ProgRequest with Command "put" and BodySize > 0 will be followed
// by a line containing a base64-encoded JSON string literal of the body.
type ProgRequest struct {
	// ID is a unique number per process across all requests.
	// It must be echoed in the ProgResponse from the child.
	ID int64

	// Command is the type of request.
	// The cmd/go tool will only send commands that were declared
	// as supported by the child.
	Command ProgCmd

	// ActionID is non-nil for gets and puts.
	ActionID []byte `json:",omitempty"` // or nil if not used

	// OutputID is set for Command "put".
	//
	// Prior to Go 1.24, when GOCACHEPROG was still an experiment, this was
	// accidentally named ObjectID. It was renamed to OutputID in Go 1.24.
	OutputID []byte `json:",omitempty"` // or nil if not used

	// Body is the body for "put" requests. It's sent after the JSON object
	// as a base64-encoded JSON string when BodySize is non-zero.
	// It's sent as a separate JSON value instead of being a struct field
	// sent in this JSON object so large values can be streamed in both directions.
	// The base64 string body of a ProgRequest will always be written
	// immediately after the JSON object and a newline.
	Body io.Reader `json:"-"`

	// BodySize is the number of bytes of Body. If zero, the body isn't written.
	BodySize int64 `json:",omitempty"`

	// ObjectID is the accidental spelling of OutputID that was used prior to Go
	// 1.24.
	//
	// Deprecated: use OutputID. This field is only populated temporarily for
	// backwards compatibility with Go 1.23 and earlier when
	// GOEXPERIMENT=gocacheprog is set. It will be removed in Go 1.25.
	ObjectID []byte `json:",omitempty"`
}
   120  
// ProgResponse is the JSON response from the child process to cmd/go.
//
// With the exception of the first protocol message that the child writes to its
// stdout with ID==0 and KnownCommands populated, these are only sent in
// response to a ProgRequest from cmd/go.
//
// ProgResponses can be sent in any order. The ID must match the request they're
// replying to.
type ProgResponse struct {
	ID  int64  // that corresponds to ProgRequest; they can be answered out of order
	Err string `json:",omitempty"` // if non-empty, the error

	// KnownCommands is included in the first message that the cache helper
	// program writes to stdout on startup (with ID==0). It includes the
	// ProgRequest.Command types that are supported by the program.
	//
	// This lets us extend the protocol gracefully over time (adding "get2",
	// etc), or fail gracefully when needed. It also lets us verify the program
	// wants to be a cache helper.
	KnownCommands []ProgCmd `json:",omitempty"`

	// For Get requests.

	Miss     bool       `json:",omitempty"` // cache miss
	OutputID []byte     `json:",omitempty"`
	Size     int64      `json:",omitempty"` // in bytes
	Time     *time.Time `json:",omitempty"` // an Entry.Time; presumably when the object was added to the cache

	// DiskPath is the absolute path on disk of the OutputID corresponding
	// to a "get" request's ActionID (on cache hit) or a "put" request's
	// provided OutputID.
	DiskPath string `json:",omitempty"`
}
   154  
   155  // startCacheProg starts the prog binary (with optional space-separated flags)
   156  // and returns a Cache implementation that talks to it.
   157  //
   158  // It blocks a few seconds to wait for the child process to successfully start
   159  // and advertise its capabilities.
   160  func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
   161  	if fuzzDirCache == nil {
   162  		panic("missing fuzzDirCache")
   163  	}
   164  	args, err := quoted.Split(progAndArgs)
   165  	if err != nil {
   166  		base.Fatalf("GOCACHEPROG args: %v", err)
   167  	}
   168  	var prog string
   169  	if len(args) > 0 {
   170  		prog = args[0]
   171  		args = args[1:]
   172  	}
   173  
   174  	ctx, ctxCancel := context.WithCancel(context.Background())
   175  
   176  	cmd := exec.CommandContext(ctx, prog, args...)
   177  	out, err := cmd.StdoutPipe()
   178  	if err != nil {
   179  		base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
   180  	}
   181  	in, err := cmd.StdinPipe()
   182  	if err != nil {
   183  		base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
   184  	}
   185  	cmd.Stderr = os.Stderr
   186  	cmd.Cancel = in.Close
   187  
   188  	if err := cmd.Start(); err != nil {
   189  		base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
   190  	}
   191  
   192  	pc := &ProgCache{
   193  		ctx:          ctx,
   194  		ctxCancel:    ctxCancel,
   195  		fuzzDirCache: fuzzDirCache,
   196  		cmd:          cmd,
   197  		stdout:       out,
   198  		stdin:        in,
   199  		bw:           bufio.NewWriter(in),
   200  		inFlight:     make(map[int64]chan<- *ProgResponse),
   201  		outputFile:   make(map[OutputID]string),
   202  		readLoopDone: make(chan struct{}),
   203  	}
   204  
   205  	// Register our interest in the initial protocol message from the child to
   206  	// us, saying what it can do.
   207  	capResc := make(chan *ProgResponse, 1)
   208  	pc.inFlight[0] = capResc
   209  
   210  	pc.jenc = json.NewEncoder(pc.bw)
   211  	go pc.readLoop(pc.readLoopDone)
   212  
   213  	// Give the child process a few seconds to report its capabilities. This
   214  	// should be instant and not require any slow work by the program.
   215  	timer := time.NewTicker(5 * time.Second)
   216  	defer timer.Stop()
   217  	for {
   218  		select {
   219  		case <-timer.C:
   220  			log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
   221  		case capRes := <-capResc:
   222  			can := map[ProgCmd]bool{}
   223  			for _, cmd := range capRes.KnownCommands {
   224  				can[cmd] = true
   225  			}
   226  			if len(can) == 0 {
   227  				base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
   228  			}
   229  			pc.can = can
   230  			return pc
   231  		}
   232  	}
   233  }
   234  
   235  func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
   236  	defer close(readLoopDone)
   237  	jd := json.NewDecoder(c.stdout)
   238  	for {
   239  		res := new(ProgResponse)
   240  		if err := jd.Decode(res); err != nil {
   241  			if c.closing.Load() {
   242  				return // quietly
   243  			}
   244  			if err == io.EOF {
   245  				c.mu.Lock()
   246  				inFlight := len(c.inFlight)
   247  				c.mu.Unlock()
   248  				base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
   249  			}
   250  			base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
   251  		}
   252  		c.mu.Lock()
   253  		ch, ok := c.inFlight[res.ID]
   254  		delete(c.inFlight, res.ID)
   255  		c.mu.Unlock()
   256  		if ok {
   257  			ch <- res
   258  		} else {
   259  			base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
   260  		}
   261  	}
   262  }
   263  
   264  func (c *ProgCache) send(ctx context.Context, req *ProgRequest) (*ProgResponse, error) {
   265  	resc := make(chan *ProgResponse, 1)
   266  	if err := c.writeToChild(req, resc); err != nil {
   267  		return nil, err
   268  	}
   269  	select {
   270  	case res := <-resc:
   271  		if res.Err != "" {
   272  			return nil, errors.New(res.Err)
   273  		}
   274  		return res, nil
   275  	case <-ctx.Done():
   276  		return nil, ctx.Err()
   277  	}
   278  }
   279  
   280  func (c *ProgCache) writeToChild(req *ProgRequest, resc chan<- *ProgResponse) (err error) {
   281  	c.mu.Lock()
   282  	c.nextID++
   283  	req.ID = c.nextID
   284  	c.inFlight[req.ID] = resc
   285  	c.mu.Unlock()
   286  
   287  	defer func() {
   288  		if err != nil {
   289  			c.mu.Lock()
   290  			delete(c.inFlight, req.ID)
   291  			c.mu.Unlock()
   292  		}
   293  	}()
   294  
   295  	c.writeMu.Lock()
   296  	defer c.writeMu.Unlock()
   297  
   298  	if err := c.jenc.Encode(req); err != nil {
   299  		return err
   300  	}
   301  	if err := c.bw.WriteByte('\n'); err != nil {
   302  		return err
   303  	}
   304  	if req.Body != nil && req.BodySize > 0 {
   305  		if err := c.bw.WriteByte('"'); err != nil {
   306  			return err
   307  		}
   308  		e := base64.NewEncoder(base64.StdEncoding, c.bw)
   309  		wrote, err := io.Copy(e, req.Body)
   310  		if err != nil {
   311  			return err
   312  		}
   313  		if err := e.Close(); err != nil {
   314  			return nil
   315  		}
   316  		if wrote != req.BodySize {
   317  			return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, output %x: wrote %v; expected %v",
   318  				req.ActionID, req.OutputID, wrote, req.BodySize)
   319  		}
   320  		if _, err := c.bw.WriteString("\"\n"); err != nil {
   321  			return err
   322  		}
   323  	}
   324  	if err := c.bw.Flush(); err != nil {
   325  		return err
   326  	}
   327  	return nil
   328  }
   329  
   330  func (c *ProgCache) Get(a ActionID) (Entry, error) {
   331  	if !c.can[cmdGet] {
   332  		// They can't do a "get". Maybe they're a write-only cache.
   333  		//
   334  		// TODO(bradfitz,bcmills): figure out the proper error type here. Maybe
   335  		// errors.ErrUnsupported? Is entryNotFoundError even appropriate? There
   336  		// might be places where we rely on the fact that a recent Put can be
   337  		// read through a corresponding Get. Audit callers and check, and document
   338  		// error types on the Cache interface.
   339  		return Entry{}, &entryNotFoundError{}
   340  	}
   341  	res, err := c.send(c.ctx, &ProgRequest{
   342  		Command:  cmdGet,
   343  		ActionID: a[:],
   344  	})
   345  	if err != nil {
   346  		return Entry{}, err // TODO(bradfitz): or entryNotFoundError? Audit callers.
   347  	}
   348  	if res.Miss {
   349  		return Entry{}, &entryNotFoundError{}
   350  	}
   351  	e := Entry{
   352  		Size: res.Size,
   353  	}
   354  	if res.Time != nil {
   355  		e.Time = *res.Time
   356  	} else {
   357  		e.Time = time.Now()
   358  	}
   359  	if res.DiskPath == "" {
   360  		return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
   361  	}
   362  	if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
   363  		return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
   364  	}
   365  	c.noteOutputFile(e.OutputID, res.DiskPath)
   366  	return e, nil
   367  }
   368  
   369  func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
   370  	c.mu.Lock()
   371  	defer c.mu.Unlock()
   372  	c.outputFile[o] = diskPath
   373  }
   374  
   375  func (c *ProgCache) OutputFile(o OutputID) string {
   376  	c.mu.Lock()
   377  	defer c.mu.Unlock()
   378  	return c.outputFile[o]
   379  }
   380  
   381  func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
   382  	// Compute output ID.
   383  	h := sha256.New()
   384  	if _, err := file.Seek(0, 0); err != nil {
   385  		return OutputID{}, 0, err
   386  	}
   387  	size, err := io.Copy(h, file)
   388  	if err != nil {
   389  		return OutputID{}, 0, err
   390  	}
   391  	var out OutputID
   392  	h.Sum(out[:0])
   393  
   394  	if _, err := file.Seek(0, 0); err != nil {
   395  		return OutputID{}, 0, err
   396  	}
   397  
   398  	if !c.can[cmdPut] {
   399  		// Child is a read-only cache. Do nothing.
   400  		return out, size, nil
   401  	}
   402  
   403  	// For compatibility with Go 1.23/1.24 GOEXPERIMENT=gocacheprog users, also
   404  	// populate the deprecated ObjectID field. This will be removed in Go 1.25.
   405  	var deprecatedValue []byte
   406  	if goexperiment.CacheProg {
   407  		deprecatedValue = out[:]
   408  	}
   409  
   410  	res, err := c.send(c.ctx, &ProgRequest{
   411  		Command:  cmdPut,
   412  		ActionID: a[:],
   413  		OutputID: out[:],
   414  		ObjectID: deprecatedValue, // TODO(bradfitz): remove in Go 1.25
   415  		Body:     file,
   416  		BodySize: size,
   417  	})
   418  	if err != nil {
   419  		return OutputID{}, 0, err
   420  	}
   421  	if res.DiskPath == "" {
   422  		return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
   423  	}
   424  	c.noteOutputFile(out, res.DiskPath)
   425  	return out, size, err
   426  }
   427  
   428  func (c *ProgCache) Close() error {
   429  	c.closing.Store(true)
   430  	var err error
   431  
   432  	// First write a "close" message to the child so it can exit nicely
   433  	// and clean up if it wants. Only after that exchange do we cancel
   434  	// the context that kills the process.
   435  	if c.can[cmdClose] {
   436  		_, err = c.send(c.ctx, &ProgRequest{Command: cmdClose})
   437  	}
   438  	c.ctxCancel()
   439  	<-c.readLoopDone
   440  	return err
   441  }
   442  
   443  func (c *ProgCache) FuzzDir() string {
   444  	// TODO(bradfitz): figure out what to do here. For now just use the
   445  	// disk-based default.
   446  	return c.fuzzDirCache.FuzzDir()
   447  }
   448  

View as plain text