Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions internal/strategy/git/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ import (
)

type gitMetrics struct {
operationDuration metric.Float64Histogram
operationTotal metric.Int64Counter
requestTotal metric.Int64Counter
operationDuration metric.Float64Histogram
operationTotal metric.Int64Counter
requestTotal metric.Int64Counter
snapshotServeTotal metric.Int64Counter
snapshotServeSize metric.Float64Histogram
}

func newGitMetrics() *gitMetrics {
meter := otel.Meter("cachew.git")
return &gitMetrics{
operationDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)"),
operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"),
requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"),
operationDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)"),
operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"),
requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"),
snapshotServeTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.snapshot_serves_total", "{serves}", "Snapshot serve events by source (cache, spool, cold_cache) and repository"),
snapshotServeSize: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.snapshot_serve_bytes", "By", "Size of served snapshots in bytes"),
}
}

Expand All @@ -39,3 +43,16 @@ func (m *gitMetrics) recordOperation(ctx context.Context, operation, status stri
func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) {
m.requestTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("type", requestType)))
}

// recordSnapshotServe records a snapshot serve event with its source and repository.
// Source is one of: "cache", "cold_cache", "spool", "generated".
func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo string, sizeBytes int64) {
attrs := metric.WithAttributes(
attribute.String("source", source),
attribute.String("repository", repo),
)
m.snapshotServeTotal.Add(ctx, 1, attrs)
if sizeBytes > 0 {
m.snapshotServeSize.Record(ctx, float64(sizeBytes), attrs)
}
}
27 changes: 19 additions & 8 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,

repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath
repoName := host + "/" + repoPath

repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
if repoErr != nil {
Expand Down Expand Up @@ -220,7 +221,9 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
}()
logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL)
w.Header().Set("Content-Type", "application/zstd")
if _, err := io.Copy(w, reader); err != nil {
n, err := io.Copy(w, reader)
s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n)
if err != nil {
logger.WarnContext(ctx, "Failed to stream locally cached snapshot", "upstream", upstreamURL, "error", err)
}
return
Expand All @@ -234,7 +237,9 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
if openErr == nil && reader != nil {
logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL)
w.Header().Set("Content-Type", "application/zstd")
if _, err := io.Copy(w, reader); err != nil {
n, err := io.Copy(w, reader)
s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n)
if err != nil {
logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err)
}
_ = reader.Close()
Expand Down Expand Up @@ -264,14 +269,14 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
}

if reader == nil {
if err := s.serveSnapshotWithSpool(w, r, repo, upstreamURL); err != nil {
if err := s.serveSnapshotWithSpool(w, r, repo, upstreamURL, repoName); err != nil {
logger.ErrorContext(ctx, "Failed to serve snapshot via spool", "upstream", upstreamURL, "error", err)
}
return
}
defer reader.Close()

if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL); err != nil {
if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL, repoName); err != nil {
logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err)
}
}
Expand Down Expand Up @@ -343,7 +348,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h
}
}

func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error {
func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string) error {
snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit")
mirrorHead := s.getMirrorHead(ctx, repo)

Expand Down Expand Up @@ -376,7 +381,8 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW
}

w.Header().Set("Content-Type", "application/zstd")
_, err := io.Copy(w, reader)
n, err := io.Copy(w, reader)
s.metrics.recordSnapshotServe(ctx, "cache", repoName, n)
return errors.Wrap(err, "stream snapshot")
}

Expand Down Expand Up @@ -454,7 +460,7 @@ func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository,
// mirror, streams tar+zstd to both the HTTP client and a spool file, then
// triggers a background cache backfill. Concurrent requests for the same URL
// become readers that follow the spool, avoiding redundant clone+tar work.
func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) error {
func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL, repoName string) error {
ctx := r.Context()
logger := logging.FromContext(ctx)

Expand All @@ -475,13 +481,18 @@ func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request
}
return errors.Wrap(err, "snapshot spool read")
}
s.metrics.recordSnapshotServe(ctx, "spool", repoName, spool.Written())
return nil
}
// Writer failed; fall through to generate independently.
return s.streamSnapshotDirect(w, r, repo)
}

return s.writeSnapshotSpool(w, r, repo, upstreamURL, entry)
err := s.writeSnapshotSpool(w, r, repo, upstreamURL, entry)
if err == nil {
s.metrics.recordSnapshotServe(ctx, "generated", repoName, entry.spool.Written())
}
return err
}

// streamSnapshotDirect streams a snapshot directly to the client without
Expand Down
7 changes: 7 additions & 0 deletions internal/strategy/git/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ func (rs *ResponseSpool) MarkError(err error) {
rs.cond.Broadcast()
}

// Written returns the total number of bytes written to the spool.
func (rs *ResponseSpool) Written() int64 {
rs.mu.Lock()
defer rs.mu.Unlock()
return rs.written
}

func (rs *ResponseSpool) Failed() bool {
rs.mu.Lock()
defer rs.mu.Unlock()
Expand Down