Source file src/internal/trace/generation.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"bufio"
     9  	"bytes"
    10  	"cmp"
    11  	"encoding/binary"
    12  	"fmt"
    13  	"io"
    14  	"slices"
    15  	"strings"
    16  
    17  	"internal/trace/event"
    18  	"internal/trace/event/go122"
    19  )
    20  
// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	// gen is the generation number. readGeneration treats 0 as
	// "not yet known" and rejects batches that carry generation 0.
	gen uint64
	// batches holds the ordinary event batches (those that are not
	// strings, stacks, CPU samples, frequency, or experimental
	// batches), keyed by the batch's thread ID and kept in the order
	// in which they were processed.
	batches map[ThreadID][]batch
	// batchMs lists the keys of batches in the order in which each
	// thread ID was first seen.
	batchMs []ThreadID
	// cpuSamples accumulates the samples decoded from CPU sample
	// batches. Once readGeneration returns successfully, their times
	// have been scaled by the generation's frequency and the slice is
	// sorted by time.
	cpuSamples []cpuSample
	// evTable is embedded; the frequency, string table, stack table,
	// pc-to-frame map and experimental data used in this file are all
	// reached through it.
	*evTable
}
    32  
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
type spilledBatch struct {
	// gen is the generation number that the batch was read with; the next
	// readGeneration call adopts it as its own generation number.
	gen uint64
	// batch is the already-read batch itself.
	*batch
}
    40  
    41  // readGeneration buffers and decodes the structural elements of a trace generation
    42  // out of r. spill is the first batch of the new generation (already buffered and
    43  // parsed from reading the last generation). Returns the generation and the first
    44  // batch read of the next generation, if any.
    45  //
    46  // If gen is non-nil, it is valid and must be processed before handling the returned
    47  // error.
    48  func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
    49  	g := &generation{
    50  		evTable: &evTable{
    51  			pcs: make(map[uint64]frame),
    52  		},
    53  		batches: make(map[ThreadID][]batch),
    54  	}
    55  	// Process the spilled batch.
    56  	if spill != nil {
    57  		g.gen = spill.gen
    58  		if err := processBatch(g, *spill.batch); err != nil {
    59  			return nil, nil, err
    60  		}
    61  		spill = nil
    62  	}
    63  	// Read batches one at a time until we either hit EOF or
    64  	// the next generation.
    65  	var spillErr error
    66  	for {
    67  		b, gen, err := readBatch(r)
    68  		if err == io.EOF {
    69  			break
    70  		}
    71  		if err != nil {
    72  			if g.gen != 0 {
    73  				// This is an error reading the first batch of the next generation.
    74  				// This is fine. Let's forge ahead assuming that what we've got so
    75  				// far is fine.
    76  				spillErr = err
    77  				break
    78  			}
    79  			return nil, nil, err
    80  		}
    81  		if gen == 0 {
    82  			// 0 is a sentinel used by the runtime, so we'll never see it.
    83  			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
    84  		}
    85  		if g.gen == 0 {
    86  			// Initialize gen.
    87  			g.gen = gen
    88  		}
    89  		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
    90  			spill = &spilledBatch{gen: gen, batch: &b}
    91  			break
    92  		}
    93  		if gen != g.gen {
    94  			// N.B. Fail as fast as possible if we see this. At first it
    95  			// may seem prudent to be fault-tolerant and assume we have a
    96  			// complete generation, parsing and returning that first. However,
    97  			// if the batches are mixed across generations then it's likely
    98  			// we won't be able to parse this generation correctly at all.
    99  			// Rather than return a cryptic error in that case, indicate the
   100  			// problem as soon as we see it.
   101  			return nil, nil, fmt.Errorf("generations out of order")
   102  		}
   103  		if err := processBatch(g, b); err != nil {
   104  			return nil, nil, err
   105  		}
   106  	}
   107  
   108  	// Check some invariants.
   109  	if g.freq == 0 {
   110  		return nil, nil, fmt.Errorf("no frequency event found")
   111  	}
   112  	// N.B. Trust that the batch order is correct. We can't validate the batch order
   113  	// by timestamp because the timestamps could just be plain wrong. The source of
   114  	// truth is the order things appear in the trace and the partial order sequence
   115  	// numbers on certain events. If it turns out the batch order is actually incorrect
   116  	// we'll very likely fail to advance a partial order from the frontier.
   117  
   118  	// Compactify stacks and strings for better lookup performance later.
   119  	g.stacks.compactify()
   120  	g.strings.compactify()
   121  
   122  	// Validate stacks.
   123  	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
   124  		return nil, nil, err
   125  	}
   126  
   127  	// Fix up the CPU sample timestamps, now that we have freq.
   128  	for i := range g.cpuSamples {
   129  		s := &g.cpuSamples[i]
   130  		s.time = g.freq.mul(timestamp(s.time))
   131  	}
   132  	// Sort the CPU samples.
   133  	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
   134  		return cmp.Compare(a.time, b.time)
   135  	})
   136  	return g, spill, spillErr
   137  }
   138  
   139  // processBatch adds the batch to the generation.
   140  func processBatch(g *generation, b batch) error {
   141  	switch {
   142  	case b.isStringsBatch():
   143  		if err := addStrings(&g.strings, b); err != nil {
   144  			return err
   145  		}
   146  	case b.isStacksBatch():
   147  		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
   148  			return err
   149  		}
   150  	case b.isCPUSamplesBatch():
   151  		samples, err := addCPUSamples(g.cpuSamples, b)
   152  		if err != nil {
   153  			return err
   154  		}
   155  		g.cpuSamples = samples
   156  	case b.isFreqBatch():
   157  		freq, err := parseFreq(b)
   158  		if err != nil {
   159  			return err
   160  		}
   161  		if g.freq != 0 {
   162  			return fmt.Errorf("found multiple frequency events")
   163  		}
   164  		g.freq = freq
   165  	case b.exp != event.NoExperiment:
   166  		if g.expData == nil {
   167  			g.expData = make(map[event.Experiment]*ExperimentalData)
   168  		}
   169  		if err := addExperimentalData(g.expData, b); err != nil {
   170  			return err
   171  		}
   172  	default:
   173  		if _, ok := g.batches[b.m]; !ok {
   174  			g.batchMs = append(g.batchMs, b.m)
   175  		}
   176  		g.batches[b.m] = append(g.batches[b.m], b)
   177  	}
   178  	return nil
   179  }
   180  
   181  // validateStackStrings makes sure all the string references in
   182  // the stack table are present in the string table.
   183  func validateStackStrings(
   184  	stacks *dataTable[stackID, stack],
   185  	strings *dataTable[stringID, string],
   186  	frames map[uint64]frame,
   187  ) error {
   188  	var err error
   189  	stacks.forEach(func(id stackID, stk stack) bool {
   190  		for _, pc := range stk.pcs {
   191  			frame, ok := frames[pc]
   192  			if !ok {
   193  				err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
   194  				return false
   195  			}
   196  			_, ok = strings.get(frame.funcID)
   197  			if !ok {
   198  				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
   199  				return false
   200  			}
   201  			_, ok = strings.get(frame.fileID)
   202  			if !ok {
   203  				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
   204  				return false
   205  			}
   206  		}
   207  		return true
   208  	})
   209  	return err
   210  }
   211  
   212  // addStrings takes a batch whose first byte is an EvStrings event
   213  // (indicating that the batch contains only strings) and adds each
   214  // string contained therein to the provided strings map.
   215  func addStrings(stringTable *dataTable[stringID, string], b batch) error {
   216  	if !b.isStringsBatch() {
   217  		return fmt.Errorf("internal error: addStrings called on non-string batch")
   218  	}
   219  	r := bytes.NewReader(b.data)
   220  	hdr, err := r.ReadByte() // Consume the EvStrings byte.
   221  	if err != nil || event.Type(hdr) != go122.EvStrings {
   222  		return fmt.Errorf("missing strings batch header")
   223  	}
   224  
   225  	var sb strings.Builder
   226  	for r.Len() != 0 {
   227  		// Read the header.
   228  		ev, err := r.ReadByte()
   229  		if err != nil {
   230  			return err
   231  		}
   232  		if event.Type(ev) != go122.EvString {
   233  			return fmt.Errorf("expected string event, got %d", ev)
   234  		}
   235  
   236  		// Read the string's ID.
   237  		id, err := binary.ReadUvarint(r)
   238  		if err != nil {
   239  			return err
   240  		}
   241  
   242  		// Read the string's length.
   243  		len, err := binary.ReadUvarint(r)
   244  		if err != nil {
   245  			return err
   246  		}
   247  		if len > go122.MaxStringSize {
   248  			return fmt.Errorf("invalid string size %d, maximum is %d", len, go122.MaxStringSize)
   249  		}
   250  
   251  		// Copy out the string.
   252  		n, err := io.CopyN(&sb, r, int64(len))
   253  		if n != int64(len) {
   254  			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
   255  		}
   256  		if err != nil {
   257  			return fmt.Errorf("copying string data: %w", err)
   258  		}
   259  
   260  		// Add the string to the map.
   261  		s := sb.String()
   262  		sb.Reset()
   263  		if err := stringTable.insert(stringID(id), s); err != nil {
   264  			return err
   265  		}
   266  	}
   267  	return nil
   268  }
   269  
   270  // addStacks takes a batch whose first byte is an EvStacks event
   271  // (indicating that the batch contains only stacks) and adds each
   272  // string contained therein to the provided stacks map.
   273  func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
   274  	if !b.isStacksBatch() {
   275  		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
   276  	}
   277  	r := bytes.NewReader(b.data)
   278  	hdr, err := r.ReadByte() // Consume the EvStacks byte.
   279  	if err != nil || event.Type(hdr) != go122.EvStacks {
   280  		return fmt.Errorf("missing stacks batch header")
   281  	}
   282  
   283  	for r.Len() != 0 {
   284  		// Read the header.
   285  		ev, err := r.ReadByte()
   286  		if err != nil {
   287  			return err
   288  		}
   289  		if event.Type(ev) != go122.EvStack {
   290  			return fmt.Errorf("expected stack event, got %d", ev)
   291  		}
   292  
   293  		// Read the stack's ID.
   294  		id, err := binary.ReadUvarint(r)
   295  		if err != nil {
   296  			return err
   297  		}
   298  
   299  		// Read how many frames are in each stack.
   300  		nFrames, err := binary.ReadUvarint(r)
   301  		if err != nil {
   302  			return err
   303  		}
   304  		if nFrames > go122.MaxFramesPerStack {
   305  			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, go122.MaxFramesPerStack)
   306  		}
   307  
   308  		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
   309  		frames := make([]uint64, 0, nFrames)
   310  		for i := uint64(0); i < nFrames; i++ {
   311  			// Read the frame data.
   312  			pc, err := binary.ReadUvarint(r)
   313  			if err != nil {
   314  				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
   315  			}
   316  			funcID, err := binary.ReadUvarint(r)
   317  			if err != nil {
   318  				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
   319  			}
   320  			fileID, err := binary.ReadUvarint(r)
   321  			if err != nil {
   322  				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
   323  			}
   324  			line, err := binary.ReadUvarint(r)
   325  			if err != nil {
   326  				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
   327  			}
   328  			frames = append(frames, pc)
   329  
   330  			if _, ok := pcs[pc]; !ok {
   331  				pcs[pc] = frame{
   332  					pc:     pc,
   333  					funcID: stringID(funcID),
   334  					fileID: stringID(fileID),
   335  					line:   line,
   336  				}
   337  			}
   338  		}
   339  
   340  		// Add the stack to the map.
   341  		if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
   342  			return err
   343  		}
   344  	}
   345  	return nil
   346  }
   347  
   348  // addCPUSamples takes a batch whose first byte is an EvCPUSamples event
   349  // (indicating that the batch contains only CPU samples) and adds each
   350  // sample contained therein to the provided samples list.
   351  func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
   352  	if !b.isCPUSamplesBatch() {
   353  		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
   354  	}
   355  	r := bytes.NewReader(b.data)
   356  	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
   357  	if err != nil || event.Type(hdr) != go122.EvCPUSamples {
   358  		return nil, fmt.Errorf("missing CPU samples batch header")
   359  	}
   360  
   361  	for r.Len() != 0 {
   362  		// Read the header.
   363  		ev, err := r.ReadByte()
   364  		if err != nil {
   365  			return nil, err
   366  		}
   367  		if event.Type(ev) != go122.EvCPUSample {
   368  			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
   369  		}
   370  
   371  		// Read the sample's timestamp.
   372  		ts, err := binary.ReadUvarint(r)
   373  		if err != nil {
   374  			return nil, err
   375  		}
   376  
   377  		// Read the sample's M.
   378  		m, err := binary.ReadUvarint(r)
   379  		if err != nil {
   380  			return nil, err
   381  		}
   382  		mid := ThreadID(m)
   383  
   384  		// Read the sample's P.
   385  		p, err := binary.ReadUvarint(r)
   386  		if err != nil {
   387  			return nil, err
   388  		}
   389  		pid := ProcID(p)
   390  
   391  		// Read the sample's G.
   392  		g, err := binary.ReadUvarint(r)
   393  		if err != nil {
   394  			return nil, err
   395  		}
   396  		goid := GoID(g)
   397  		if g == 0 {
   398  			goid = NoGoroutine
   399  		}
   400  
   401  		// Read the sample's stack.
   402  		s, err := binary.ReadUvarint(r)
   403  		if err != nil {
   404  			return nil, err
   405  		}
   406  
   407  		// Add the sample to the slice.
   408  		samples = append(samples, cpuSample{
   409  			schedCtx: schedCtx{
   410  				M: mid,
   411  				P: pid,
   412  				G: goid,
   413  			},
   414  			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
   415  			stack: stackID(s),
   416  		})
   417  	}
   418  	return samples, nil
   419  }
   420  
   421  // parseFreq parses out a lone EvFrequency from a batch.
   422  func parseFreq(b batch) (frequency, error) {
   423  	if !b.isFreqBatch() {
   424  		return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
   425  	}
   426  	r := bytes.NewReader(b.data)
   427  	r.ReadByte() // Consume the EvFrequency byte.
   428  
   429  	// Read the frequency. It'll come out as timestamp units per second.
   430  	f, err := binary.ReadUvarint(r)
   431  	if err != nil {
   432  		return 0, err
   433  	}
   434  	// Convert to nanoseconds per timestamp unit.
   435  	return frequency(1.0 / (float64(f) / 1e9)), nil
   436  }
   437  
   438  // addExperimentalData takes an experimental batch and adds it to the ExperimentalData
   439  // for the experiment its a part of.
   440  func addExperimentalData(expData map[event.Experiment]*ExperimentalData, b batch) error {
   441  	if b.exp == event.NoExperiment {
   442  		return fmt.Errorf("internal error: addExperimentalData called on non-experimental batch")
   443  	}
   444  	ed, ok := expData[b.exp]
   445  	if !ok {
   446  		ed = new(ExperimentalData)
   447  		expData[b.exp] = ed
   448  	}
   449  	ed.Batches = append(ed.Batches, ExperimentalBatch{
   450  		Thread: b.m,
   451  		Data:   b.data,
   452  	})
   453  	return nil
   454  }
   455  

View as plain text