Source file src/cmd/go/internal/cache/prog.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package cache
     6  
     7  import (
     8  	"bufio"
     9  	"cmd/go/internal/base"
    10  	"cmd/internal/quoted"
    11  	"context"
    12  	"crypto/sha256"
    13  	"encoding/base64"
    14  	"encoding/json"
    15  	"errors"
    16  	"fmt"
    17  	"io"
    18  	"log"
    19  	"os"
    20  	"os/exec"
    21  	"sync"
    22  	"sync/atomic"
    23  	"time"
    24  )
    25  
// ProgCache implements Cache via JSON messages over stdin/stdout to a child
// helper process which can then implement whatever caching policy/mechanism it
// wants.
//
// See https://github.com/golang/go/issues/59719
type ProgCache struct {
	cmd    *exec.Cmd      // the child process
	stdout io.ReadCloser  // from the child process
	stdin  io.WriteCloser // to the child process
	bw     *bufio.Writer  // to stdin
	jenc   *json.Encoder  // to bw

	// can are the commands that the child process declared that it supports.
	// This is effectively the versioning mechanism.
	can map[ProgCmd]bool

	// fuzzDirCache is another Cache implementation to use for the FuzzDir
	// method. In practice this is the default GOCACHE disk-based
	// implementation.
	//
	// TODO(bradfitz): maybe this isn't ideal. But we'd need to extend the Cache
	// interface and the fuzzing callers to be less disk-y to do more here.
	fuzzDirCache Cache

	closing      atomic.Bool        // set by Close; makes readLoop return quietly on a read error
	ctx          context.Context    // valid until Close via ctxCancel
	ctxCancel    context.CancelFunc // called on Close
	readLoopDone chan struct{}      // closed when readLoop returns

	mu         sync.Mutex                     // guards following fields
	nextID     int64                          // ID of the most recently issued request; incremented before use
	inFlight   map[int64]chan<- *ProgResponse // request ID => channel readLoop delivers the response on
	outputFile map[OutputID]string            // object => abs path on disk

	// writeMu serializes writing to the child process.
	// It must never be held at the same time as mu.
	writeMu sync.Mutex
}
    64  
    65  // ProgCmd is a command that can be issued to a child process.
    66  //
    67  // If the interface needs to grow, we can add new commands or new versioned
    68  // commands like "get2".
    69  type ProgCmd string
    70  
    71  const (
    72  	cmdGet   = ProgCmd("get")
    73  	cmdPut   = ProgCmd("put")
    74  	cmdClose = ProgCmd("close")
    75  )
    76  
// ProgRequest is the JSON-encoded message that's sent from cmd/go to
// the GOCACHEPROG child process over stdin. Each JSON object is on its
// own line. A ProgRequest with Command "put" and BodySize > 0 will be followed
// by a line containing a base64-encoded JSON string literal of the body.
type ProgRequest struct {
	// ID is a unique number per process across all requests.
	// It must be echoed in the ProgResponse from the child.
	ID int64

	// Command is the type of request.
	// The cmd/go tool will only send commands that were declared
	// as supported by the child.
	Command ProgCmd

	// ActionID is non-nil for get and puts.
	ActionID []byte `json:",omitempty"` // or nil if not used

	// ObjectID is set for "put" requests.
	ObjectID []byte `json:",omitempty"` // or nil if not used

	// Body is the body for "put" requests. It's sent after the JSON object
	// as a base64-encoded JSON string when BodySize is non-zero.
	// It's sent as a separate JSON value instead of being a struct field
	// sent in this JSON object so large values can be streamed in both directions.
	// The base64 string body of a ProgRequest will always be written
	// immediately after the JSON object and a newline.
	Body io.Reader `json:"-"`

	// BodySize is the number of bytes of Body. If zero, the body isn't written.
	BodySize int64 `json:",omitempty"`
}
   108  
// ProgResponse is the JSON response from the child process to cmd/go.
//
// With the exception of the first protocol message that the child writes to its
// stdout with ID==0 and KnownCommands populated, these are only sent in
// response to a ProgRequest from cmd/go.
//
// ProgResponses can be sent in any order. The ID must match the request they're
// replying to.
type ProgResponse struct {
	ID  int64  // that corresponds to ProgRequest; they can be answered out of order
	Err string `json:",omitempty"` // if non-empty, the error

	// KnownCommands is included in the first message that the cache helper
	// program writes to stdout on startup (with ID==0). It includes the
	// ProgRequest.Command types that are supported by the program.
	//
	// This lets us extend the protocol gracefully over time (adding "get2",
	// etc), or fail gracefully when needed. It also lets us verify the program
	// wants to be a cache helper.
	KnownCommands []ProgCmd `json:",omitempty"`

	// For Get requests.

	Miss     bool       `json:",omitempty"` // cache miss
	OutputID []byte     `json:",omitempty"` // copied into Entry.OutputID by Get
	Size     int64      `json:",omitempty"` // in bytes
	Time     *time.Time `json:",omitempty"` // an Entry.Time; presumably when the object was added to the cache (if nil, Get uses the current time)

	// DiskPath is the absolute path on disk of the ObjectID corresponding
	// to a "get" request's ActionID (on cache hit) or a "put" request's
	// provided ObjectID.
	DiskPath string `json:",omitempty"`
}
   142  
   143  // startCacheProg starts the prog binary (with optional space-separated flags)
   144  // and returns a Cache implementation that talks to it.
   145  //
   146  // It blocks a few seconds to wait for the child process to successfully start
   147  // and advertise its capabilities.
   148  func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
   149  	if fuzzDirCache == nil {
   150  		panic("missing fuzzDirCache")
   151  	}
   152  	args, err := quoted.Split(progAndArgs)
   153  	if err != nil {
   154  		base.Fatalf("GOCACHEPROG args: %v", err)
   155  	}
   156  	var prog string
   157  	if len(args) > 0 {
   158  		prog = args[0]
   159  		args = args[1:]
   160  	}
   161  
   162  	ctx, ctxCancel := context.WithCancel(context.Background())
   163  
   164  	cmd := exec.CommandContext(ctx, prog, args...)
   165  	out, err := cmd.StdoutPipe()
   166  	if err != nil {
   167  		base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
   168  	}
   169  	in, err := cmd.StdinPipe()
   170  	if err != nil {
   171  		base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
   172  	}
   173  	cmd.Stderr = os.Stderr
   174  	cmd.Cancel = in.Close
   175  
   176  	if err := cmd.Start(); err != nil {
   177  		base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
   178  	}
   179  
   180  	pc := &ProgCache{
   181  		ctx:          ctx,
   182  		ctxCancel:    ctxCancel,
   183  		fuzzDirCache: fuzzDirCache,
   184  		cmd:          cmd,
   185  		stdout:       out,
   186  		stdin:        in,
   187  		bw:           bufio.NewWriter(in),
   188  		inFlight:     make(map[int64]chan<- *ProgResponse),
   189  		outputFile:   make(map[OutputID]string),
   190  		readLoopDone: make(chan struct{}),
   191  	}
   192  
   193  	// Register our interest in the initial protocol message from the child to
   194  	// us, saying what it can do.
   195  	capResc := make(chan *ProgResponse, 1)
   196  	pc.inFlight[0] = capResc
   197  
   198  	pc.jenc = json.NewEncoder(pc.bw)
   199  	go pc.readLoop(pc.readLoopDone)
   200  
   201  	// Give the child process a few seconds to report its capabilities. This
   202  	// should be instant and not require any slow work by the program.
   203  	timer := time.NewTicker(5 * time.Second)
   204  	defer timer.Stop()
   205  	for {
   206  		select {
   207  		case <-timer.C:
   208  			log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
   209  		case capRes := <-capResc:
   210  			can := map[ProgCmd]bool{}
   211  			for _, cmd := range capRes.KnownCommands {
   212  				can[cmd] = true
   213  			}
   214  			if len(can) == 0 {
   215  				base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
   216  			}
   217  			pc.can = can
   218  			return pc
   219  		}
   220  	}
   221  }
   222  
   223  func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
   224  	defer close(readLoopDone)
   225  	jd := json.NewDecoder(c.stdout)
   226  	for {
   227  		res := new(ProgResponse)
   228  		if err := jd.Decode(res); err != nil {
   229  			if c.closing.Load() {
   230  				return // quietly
   231  			}
   232  			if err == io.EOF {
   233  				c.mu.Lock()
   234  				inFlight := len(c.inFlight)
   235  				c.mu.Unlock()
   236  				base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
   237  			}
   238  			base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
   239  		}
   240  		c.mu.Lock()
   241  		ch, ok := c.inFlight[res.ID]
   242  		delete(c.inFlight, res.ID)
   243  		c.mu.Unlock()
   244  		if ok {
   245  			ch <- res
   246  		} else {
   247  			base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
   248  		}
   249  	}
   250  }
   251  
   252  func (c *ProgCache) send(ctx context.Context, req *ProgRequest) (*ProgResponse, error) {
   253  	resc := make(chan *ProgResponse, 1)
   254  	if err := c.writeToChild(req, resc); err != nil {
   255  		return nil, err
   256  	}
   257  	select {
   258  	case res := <-resc:
   259  		if res.Err != "" {
   260  			return nil, errors.New(res.Err)
   261  		}
   262  		return res, nil
   263  	case <-ctx.Done():
   264  		return nil, ctx.Err()
   265  	}
   266  }
   267  
   268  func (c *ProgCache) writeToChild(req *ProgRequest, resc chan<- *ProgResponse) (err error) {
   269  	c.mu.Lock()
   270  	c.nextID++
   271  	req.ID = c.nextID
   272  	c.inFlight[req.ID] = resc
   273  	c.mu.Unlock()
   274  
   275  	defer func() {
   276  		if err != nil {
   277  			c.mu.Lock()
   278  			delete(c.inFlight, req.ID)
   279  			c.mu.Unlock()
   280  		}
   281  	}()
   282  
   283  	c.writeMu.Lock()
   284  	defer c.writeMu.Unlock()
   285  
   286  	if err := c.jenc.Encode(req); err != nil {
   287  		return err
   288  	}
   289  	if err := c.bw.WriteByte('\n'); err != nil {
   290  		return err
   291  	}
   292  	if req.Body != nil && req.BodySize > 0 {
   293  		if err := c.bw.WriteByte('"'); err != nil {
   294  			return err
   295  		}
   296  		e := base64.NewEncoder(base64.StdEncoding, c.bw)
   297  		wrote, err := io.Copy(e, req.Body)
   298  		if err != nil {
   299  			return err
   300  		}
   301  		if err := e.Close(); err != nil {
   302  			return nil
   303  		}
   304  		if wrote != req.BodySize {
   305  			return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, object %x: wrote %v; expected %v",
   306  				req.ActionID, req.ObjectID, wrote, req.BodySize)
   307  		}
   308  		if _, err := c.bw.WriteString("\"\n"); err != nil {
   309  			return err
   310  		}
   311  	}
   312  	if err := c.bw.Flush(); err != nil {
   313  		return err
   314  	}
   315  	return nil
   316  }
   317  
   318  func (c *ProgCache) Get(a ActionID) (Entry, error) {
   319  	if !c.can[cmdGet] {
   320  		// They can't do a "get". Maybe they're a write-only cache.
   321  		//
   322  		// TODO(bradfitz,bcmills): figure out the proper error type here. Maybe
   323  		// errors.ErrUnsupported? Is entryNotFoundError even appropriate? There
   324  		// might be places where we rely on the fact that a recent Put can be
   325  		// read through a corresponding Get. Audit callers and check, and document
   326  		// error types on the Cache interface.
   327  		return Entry{}, &entryNotFoundError{}
   328  	}
   329  	res, err := c.send(c.ctx, &ProgRequest{
   330  		Command:  cmdGet,
   331  		ActionID: a[:],
   332  	})
   333  	if err != nil {
   334  		return Entry{}, err // TODO(bradfitz): or entryNotFoundError? Audit callers.
   335  	}
   336  	if res.Miss {
   337  		return Entry{}, &entryNotFoundError{}
   338  	}
   339  	e := Entry{
   340  		Size: res.Size,
   341  	}
   342  	if res.Time != nil {
   343  		e.Time = *res.Time
   344  	} else {
   345  		e.Time = time.Now()
   346  	}
   347  	if res.DiskPath == "" {
   348  		return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
   349  	}
   350  	if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
   351  		return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
   352  	}
   353  	c.noteOutputFile(e.OutputID, res.DiskPath)
   354  	return e, nil
   355  }
   356  
   357  func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
   358  	c.mu.Lock()
   359  	defer c.mu.Unlock()
   360  	c.outputFile[o] = diskPath
   361  }
   362  
   363  func (c *ProgCache) OutputFile(o OutputID) string {
   364  	c.mu.Lock()
   365  	defer c.mu.Unlock()
   366  	return c.outputFile[o]
   367  }
   368  
   369  func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
   370  	// Compute output ID.
   371  	h := sha256.New()
   372  	if _, err := file.Seek(0, 0); err != nil {
   373  		return OutputID{}, 0, err
   374  	}
   375  	size, err := io.Copy(h, file)
   376  	if err != nil {
   377  		return OutputID{}, 0, err
   378  	}
   379  	var out OutputID
   380  	h.Sum(out[:0])
   381  
   382  	if _, err := file.Seek(0, 0); err != nil {
   383  		return OutputID{}, 0, err
   384  	}
   385  
   386  	if !c.can[cmdPut] {
   387  		// Child is a read-only cache. Do nothing.
   388  		return out, size, nil
   389  	}
   390  
   391  	res, err := c.send(c.ctx, &ProgRequest{
   392  		Command:  cmdPut,
   393  		ActionID: a[:],
   394  		ObjectID: out[:],
   395  		Body:     file,
   396  		BodySize: size,
   397  	})
   398  	if err != nil {
   399  		return OutputID{}, 0, err
   400  	}
   401  	if res.DiskPath == "" {
   402  		return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
   403  	}
   404  	c.noteOutputFile(out, res.DiskPath)
   405  	return out, size, err
   406  }
   407  
   408  func (c *ProgCache) Close() error {
   409  	c.closing.Store(true)
   410  	var err error
   411  
   412  	// First write a "close" message to the child so it can exit nicely
   413  	// and clean up if it wants. Only after that exchange do we cancel
   414  	// the context that kills the process.
   415  	if c.can[cmdClose] {
   416  		_, err = c.send(c.ctx, &ProgRequest{Command: cmdClose})
   417  	}
   418  	c.ctxCancel()
   419  	<-c.readLoopDone
   420  	return err
   421  }
   422  
   423  func (c *ProgCache) FuzzDir() string {
   424  	// TODO(bradfitz): figure out what to do here. For now just use the
   425  	// disk-based default.
   426  	return c.fuzzDirCache.FuzzDir()
   427  }
   428  

View as plain text