diff --git a/logtail/buffer.go b/logtail/buffer.go index 82c9b4610..6efdbda63 100644 --- a/logtail/buffer.go +++ b/logtail/buffer.go @@ -8,8 +8,10 @@ package logtail import ( "bytes" "errors" + "expvar" "fmt" + "tailscale.com/metrics" "tailscale.com/syncs" ) @@ -39,12 +41,42 @@ type memBuffer struct { dropMu syncs.Mutex dropCount int + + // Metrics (see [memBuffer.ExpVar] for details). + writeCalls expvar.Int + readCalls expvar.Int + writeBytes expvar.Int + readBytes expvar.Int + droppedBytes expvar.Int + storedBytes expvar.Int +} + +// ExpVar returns a [metrics.Set] with metrics about the buffer. +// +// - counter_write_calls: Total number of write calls. +// - counter_read_calls: Total number of read calls. +// - counter_write_bytes: Total number of bytes written. +// - counter_read_bytes: Total number of bytes read. +// - counter_dropped_bytes: Total number of bytes dropped. +// - gauge_stored_bytes: Current number of bytes stored in memory. +func (b *memBuffer) ExpVar() expvar.Var { + m := new(metrics.Set) + m.Set("counter_write_calls", &b.writeCalls) + m.Set("counter_read_calls", &b.readCalls) + m.Set("counter_write_bytes", &b.writeBytes) + m.Set("counter_read_bytes", &b.readBytes) + m.Set("counter_dropped_bytes", &b.droppedBytes) + m.Set("gauge_stored_bytes", &b.storedBytes) + return m } func (m *memBuffer) TryReadLine() ([]byte, error) { + m.readCalls.Add(1) if m.next != nil { msg := m.next m.next = nil + m.readBytes.Add(int64(len(msg))) + m.storedBytes.Add(-int64(len(msg))) return msg, nil } @@ -52,8 +84,13 @@ func (m *memBuffer) TryReadLine() ([]byte, error) { case ent := <-m.pending: if ent.dropCount > 0 { m.next = ent.msg - return fmt.Appendf(nil, "----------- %d logs dropped ----------", ent.dropCount), nil + b := fmt.Appendf(nil, "----------- %d logs dropped ----------", ent.dropCount) + m.writeBytes.Add(int64(len(b))) // indicate pseudo-injected log message + m.readBytes.Add(int64(len(b))) + return b, nil } + m.readBytes.Add(int64(len(ent.msg))) + 
m.storedBytes.Add(-int64(len(ent.msg))) return ent.msg, nil default: return nil, nil @@ -61,6 +98,7 @@ func (m *memBuffer) TryReadLine() ([]byte, error) { } func (m *memBuffer) Write(b []byte) (int, error) { + m.writeCalls.Add(1) m.dropMu.Lock() defer m.dropMu.Unlock() @@ -70,10 +108,13 @@ func (m *memBuffer) Write(b []byte) (int, error) { } select { case m.pending <- ent: + m.writeBytes.Add(int64(len(b))) + m.storedBytes.Add(+int64(len(b))) m.dropCount = 0 return len(b), nil default: m.dropCount++ + m.droppedBytes.Add(int64(len(b))) return 0, errBufferFull } } diff --git a/logtail/filch/filch.go b/logtail/filch/filch.go index 12ac647c4..88c72f233 100644 --- a/logtail/filch/filch.go +++ b/logtail/filch/filch.go @@ -18,6 +18,7 @@ import ( "slices" "sync" + "tailscale.com/metrics" "tailscale.com/util/must" ) @@ -88,7 +89,7 @@ type Filch struct { storedBytes expvar.Int } -// ExpVar report metrics about the buffer. +// ExpVar returns a [metrics.Set] with metrics about the buffer. // // - counter_write_calls: Total number of calls to [Filch.Write] // (excludes calls when file is closed). @@ -114,7 +115,7 @@ type Filch struct { // // - gauge_stored_bytes: Current number of bytes stored on disk. 
func (f *Filch) ExpVar() expvar.Var { - m := new(expvar.Map) + m := new(metrics.Set) m.Set("counter_write_calls", &f.writeCalls) m.Set("counter_read_calls", &f.readCalls) m.Set("counter_rotate_calls", &f.rotateCalls) diff --git a/logtail/logtail.go b/logtail/logtail.go index 2879c6b0d..91bfed8b1 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -12,6 +12,7 @@ import ( "context" "crypto/rand" "encoding/binary" + "expvar" "fmt" "io" "log" @@ -28,6 +29,7 @@ import ( "github.com/creachadair/msync/trigger" "github.com/go-json-experiment/json/jsontext" "tailscale.com/envknob" + "tailscale.com/metrics" "tailscale.com/net/netmon" "tailscale.com/net/sockstats" "tailscale.com/tstime" @@ -180,6 +182,12 @@ type Logger struct { shutdownStartMu sync.Mutex // guards the closing of shutdownStart shutdownStart chan struct{} // closed when shutdown begins shutdownDone chan struct{} // closed when shutdown complete + + // Metrics (see [Logger.ExpVar] for details). + uploadCalls expvar.Int + failedCalls expvar.Int + uploadedBytes expvar.Int + uploadingTime expvar.Int } type atomicSocktatsLabel struct{ p atomic.Uint32 } @@ -477,6 +485,9 @@ func (lg *Logger) awaitInternetUp(ctx context.Context) { // origlen indicates the pre-compression body length. // origlen of -1 indicates that the body is not compressed. 
func (lg *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) { + lg.uploadCalls.Add(1) + startUpload := time.Now() + const maxUploadTime = 45 * time.Second ctx = sockstats.WithSockStats(ctx, lg.sockstatsLabel.Load(), lg.Logf) ctx, cancel := context.WithTimeout(ctx, maxUploadTime) @@ -516,15 +527,20 @@ func (lg *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAf lg.httpDoCalls.Add(1) resp, err := lg.httpc.Do(req) if err != nil { + lg.failedCalls.Add(1) return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + lg.failedCalls.Add(1) n, _ := strconv.Atoi(resp.Header.Get("Retry-After")) b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10)) return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b)) } + + lg.uploadedBytes.Add(int64(len(body))) + lg.uploadingTime.Add(int64(time.Since(startUpload))) return 0, nil } @@ -546,6 +562,30 @@ func (lg *Logger) StartFlush() { } } +// ExpVar returns a [metrics.Set] with metrics about the logger. +// +// - counter_upload_calls: Total number of upload attempts. +// +// - counter_upload_errors: Total number of upload attempts that failed. +// +// - counter_uploaded_bytes: Total number of bytes successfully uploaded +// (which is calculated after compression is applied). +// +// - counter_uploading_nsecs: Total number of nanoseconds spent uploading. +// +// - buffer: An optional [metrics.Set] with metrics for the [Buffer]. 
+func (lg *Logger) ExpVar() expvar.Var { + m := new(metrics.Set) + m.Set("counter_upload_calls", &lg.uploadCalls) + m.Set("counter_upload_errors", &lg.failedCalls) + m.Set("counter_uploaded_bytes", &lg.uploadedBytes) + m.Set("counter_uploading_nsecs", &lg.uploadingTime) + if v, ok := lg.buffer.(interface{ ExpVar() expvar.Var }); ok { + m.Set("buffer", v.ExpVar()) + } + return m +} + // logtailDisabled is whether logtail uploads to logcatcher are disabled. var logtailDisabled atomic.Bool