1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "fmt"
10 "io"
11 "slices"
12 "strings"
13
14 "internal/trace/event/go122"
15 "internal/trace/internal/oldtrace"
16 "internal/trace/version"
17 )
18
19
// Reader produces trace events from a byte stream. Construct one with
// NewReader and pull events with ReadEvent.
type Reader struct {
	// r is the buffered input. NewReader sets it only for Go 1.22/1.23
	// traces; ReadEvent passes it to readGeneration.
	r *bufio.Reader
	// lastTs is the timestamp of the most recent event returned on the
	// Go 1.22+ path. ReadEvent uses it to keep emitted timestamps strictly
	// increasing.
	lastTs Time
	// gen is the generation currently being consumed. It is nil until the
	// first successful readGeneration call.
	gen *generation
	// spill is returned by readGeneration and handed back to it on the next
	// call. ReadEvent treats a nil spill after a generation has been read as
	// the end of the trace.
	spill *spilledBatch
	// spillErr holds an error that readGeneration returned together with a
	// non-nil generation. It is reported once that generation is drained.
	spillErr error
	// frontier holds one cursor per M that still has events pending in the
	// current generation. It is maintained with heapInsert, heapUpdate and
	// heapRemove.
	frontier []*batchCursor
	// cpuSamples is the not-yet-emitted tail of the current generation's
	// CPU samples; ReadEvent consumes it from the front.
	cpuSamples []cpuSample
	// order is the ordering state. Events handed to order.Advance become
	// available to ReadEvent through order.Next.
	order ordering
	// emittedSync records whether the sync event for the current generation
	// has been returned. NewReader starts it as true, so no sync event is
	// emitted before the first generation is read.
	emittedSync bool

	// go121Events is non-nil for Go 1.11, 1.19 and 1.21 traces. When set,
	// ReadEvent delegates to it and none of the fields above are used.
	go121Events *oldTraceConverter
}
33
34
35 func NewReader(r io.Reader) (*Reader, error) {
36 br := bufio.NewReader(r)
37 v, err := version.ReadHeader(br)
38 if err != nil {
39 return nil, err
40 }
41 switch v {
42 case version.Go111, version.Go119, version.Go121:
43 tr, err := oldtrace.Parse(br, v)
44 if err != nil {
45 return nil, err
46 }
47 return &Reader{
48 go121Events: convertOldFormat(tr),
49 }, nil
50 case version.Go122, version.Go123:
51 return &Reader{
52 r: br,
53 order: ordering{
54 mStates: make(map[ThreadID]*mState),
55 pStates: make(map[ProcID]*pState),
56 gStates: make(map[GoID]*gState),
57 activeTasks: make(map[TaskID]taskState),
58 },
59
60 emittedSync: true,
61 }, nil
62 default:
63 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
64 }
65 }
66
67
68
69
70
// ReadEvent returns the next event from the trace.
//
// For Go 1.22+ traces, exhaustion of the stream is reported as a zero Event
// together with io.EOF. For older traces the call is forwarded to the
// converter created by NewReader, and whatever error its next method returns
// is passed through with a zero Event.
//
// The named results are significant: a deferred function inspects and
// rewrites them on the Go 1.22+ path.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Legacy traces: delegate entirely to the converter. This returns before
	// the deferred fix-up below is registered, so it does not apply here.
	if r.go121Events != nil {
		ev, err := r.go121Events.next()
		if err != nil {
			return Event{}, err
		}
		return ev, nil
	}

	// Go 1.22+ path, in outline:
	//
	//  (1) Drain any events the ordering already has queued.
	//  (2) When both the frontier and the CPU samples are empty, emit a sync
	//      event for the finished generation, then read the next generation
	//      and build a frontier with one cursor per M.
	//  (3) Emit a CPU sample if it is earlier than the head of the frontier.
	//  (4) Try to advance the head of the frontier; if the ordering refuses
	//      it, sort the frontier and try the remaining cursors in order.
	//  (5) Return the event the ordering queued as a result.

	// On every successful return below, validate the event and fix up its
	// timestamp so that emitted events have strictly increasing times.
	defer func() {
		if err != nil {
			return
		}
		// A validation failure replaces the nil error; e is left as is.
		if err = e.validateTableIDs(); err != nil {
			return
		}
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Consume anything the ordering already has queued first.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}

	// Nothing left in the current generation (or none read yet): refresh.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		// emittedSync is only false after a generation has been loaded
		// (see the end of this block), so r.gen is non-nil here.
		if !r.emittedSync {
			r.emittedSync = true
			return syncEvent(r.gen.evTable, r.lastTs), nil
		}
		// The previous readGeneration call returned a generation and an
		// error; the generation is now drained, so report the error.
		if r.spillErr != nil {
			return Event{}, r.spillErr
		}
		if r.gen != nil && r.spill == nil {
			// A generation was read earlier, it is fully drained, and
			// readGeneration left no spilled batch behind: end of trace.
			// NOTE(review): presumably spill is the first batch of the
			// following generation — confirm against readGeneration.
			return Event{}, io.EOF
		}

		// Read the next generation. This err deliberately shadows the named
		// result; it is either returned explicitly or stashed in spillErr.
		var err error
		r.gen, r.spill, err = readGeneration(r.r, r.spill)
		if r.gen == nil {
			return Event{}, err
		}
		r.spillErr = err

		// Restart the CPU sample cursor at the new generation's samples.
		r.cpuSamples = r.gen.cpuSamples

		// Rebuild the frontier: one cursor per M, positioned on that M's
		// first event.
		for m, batches := range r.gen.batches {
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// This M's batches contain no events; leave it out.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}

		// A sync event is now owed once this generation has been drained.
		r.emittedSync = false
	}
	// tryAdvance offers the event under frontier[i] to the ordering. It
	// returns (false, nil) if the ordering declines the event, leaving the
	// cursor and frontier untouched. On success it moves the cursor to the
	// next event for its M and repairs the frontier.
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]

		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}

		// Refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// The cursor has a new event; restore the heap around index i.
			heapUpdate(r.frontier, i)
		} else {
			// This M has no more events; drop its cursor from the frontier.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}

	// Emit the next CPU sample if the frontier is empty or the sample is
	// strictly earlier than the event at the head of the frontier.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}

	// Try the head of the frontier first. An empty frontier at this point
	// means the new generation yielded no events and no CPU samples.
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// The head could not be advanced. Sort the frontier and try the
		// remaining cursors in sorted order, stopping at the first success.
		// NOTE(review): starting at index 1 assumes the cursor just tried is
		// still at index 0 after sorting, and that a slice sorted by
		// batchCursor.compare still satisfies the invariant heapUpdate and
		// heapRemove rely on — confirm against compare and the heap helpers
		// (slices.SortFunc is not guaranteed to be stable).
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// An advance succeeded, so the ordering must have an event queued. An
	// empty queue here is a bug in this package, not a malformed trace.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}
225
226 func dumpFrontier(frontier []*batchCursor) string {
227 var sb strings.Builder
228 for _, bc := range frontier {
229 spec := go122.Specs()[bc.ev.typ]
230 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
231 for i, arg := range spec.Args[1:] {
232 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
233 }
234 fmt.Fprintf(&sb, "]\n")
235 }
236 return sb.String()
237 }
238
View as plain text