Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions internal/cli/coverage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
"bytes"
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/tracebloc/cli/internal/cluster"
"github.com/tracebloc/cli/internal/push"
"github.com/tracebloc/cli/internal/submit"
"github.com/tracebloc/cli/internal/ui"
)

Expand Down Expand Up @@ -61,6 +63,64 @@ func TestPrintPushPreflight_RendersKeyFacts(t *testing.T) {
}
}

// TestWritePushJSON checks the --output-json result serializes to
// valid JSON with the expected fields.
func TestWritePushJSON(t *testing.T) {
spec := map[string]any{"table": "reg_train", "category": "tabular_regression", "intent": "train"}
s := &submit.Summary{IngestorID: "run-1", TotalRecords: 240, InsertedRecords: 240, APISentRecords: 240}

var buf bytes.Buffer
writePushJSON(&buf, "succeeded", spec, s, "ns1", "ingest-job-x")

var got pushJSONResult
if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
t.Fatalf("output is not valid JSON: %v\n%s", err, buf.String())
}
if got.Status != "succeeded" || got.Table != "reg_train" || got.JobName != "ingest-job-x" {
t.Errorf("unexpected result: %+v", got)
}
if got.Summary == nil || got.Summary.InsertedRecords != 240 {
t.Errorf("summary missing/wrong: %+v", got.Summary)
}
}

// TestClassifyPushOutcome locks down the pairing between the
// --output-json status string and the process exit code (Bugbot #38).
// Every scenario asserts both halves: a run with failed rows must not
// be labelled "succeeded", and a watch error must still yield a status
// so a JSON result can be emitted. A wantExit of 0 means
// classifyPushOutcome returned a nil exitError (success).
func TestClassifyPushOutcome(t *testing.T) {
	accepted := &submit.SubmitResponse{Namespace: "ns1", JobName: "ingest-job-x"}

	type scenario struct {
		name       string
		result     *submit.Result
		runErr     error
		wantStatus string
		wantExit   int
	}
	scenarios := []scenario{
		{
			name: "clean",
			result: &submit.Result{
				Submit: accepted,
				Watch: &submit.WatchResult{
					Outcome: submit.JobOutcomeSucceeded,
					Summary: &submit.Summary{TotalRecords: 10, InsertedRecords: 10},
				},
			},
			wantStatus: "succeeded",
			wantExit:   0,
		},
		{
			name: "partial",
			result: &submit.Result{
				Submit: accepted,
				Watch: &submit.WatchResult{
					Outcome: submit.JobOutcomeSucceeded,
					Summary: &submit.Summary{TotalRecords: 10, InsertedRecords: 7, FailedRecords: 3},
				},
			},
			wantStatus: "completed_with_failures",
			wantExit:   9,
		},
		{
			name: "failed",
			result: &submit.Result{
				Submit: accepted,
				Watch:  &submit.WatchResult{Outcome: submit.JobOutcomeFailed},
			},
			wantStatus: "failed",
			wantExit:   9,
		},
		{
			name:       "detached",
			result:     &submit.Result{Submit: accepted},
			wantStatus: "detached",
			wantExit:   0,
		},
		{
			name:       "watch error",
			result:     &submit.Result{Submit: accepted},
			runErr:     &submit.WatchError{Err: errors.New("stream broke")},
			wantStatus: "watch_error",
			wantExit:   9,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			status, exitErr := classifyPushOutcome(sc.result, sc.runErr)
			if status != sc.wantStatus {
				t.Errorf("status = %q, want %q", status, sc.wantStatus)
			}

			// A nil exitError stands for exit code 0.
			exit := 0
			if exitErr != nil {
				exit = exitErr.Code()
			}
			if exit != sc.wantExit {
				t.Errorf("exit code = %d, want %d", exit, sc.wantExit)
			}
		})
	}
}

// TestExitError_Methods pins the exit-code carrier: Error() surfaces
// the wrapped message (or a fallback when nil), and Code() returns the
// process exit code main() propagates.
Expand Down
169 changes: 137 additions & 32 deletions internal/cli/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -83,8 +84,9 @@ func newDatasetPushCmd() *cobra.Command {
numberOfKeypoints int

// Operations flags.
dryRun bool
noInput bool
dryRun bool
noInput bool
outputJSON bool

// Ingestor SA name override. Used as the ServiceAccountName
// of the ephemeral stage Pod, so the Pod inherits whatever
Expand Down Expand Up @@ -157,12 +159,22 @@ Exit codes:
// for whatever's still missing. Off a TTY / with --no-input,
// prompter stays nil and runDatasetPush keeps flag-only
// behavior.
interactive := !noInput && isInteractiveTTY()
interactive := !noInput && !outputJSON && isInteractiveTTY()
var pr prompter
if interactive {
pr = surveyPrompter{}
}
return runDatasetPush(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr(),
// In --output-json mode, human output goes to stderr so
// stdout carries only the JSON result.
humanOut := cmd.OutOrStdout()
printer := printerFor(cmd)
var jsonOut io.Writer
if outputJSON {
humanOut = cmd.ErrOrStderr()
printer = printerForWriter(cmd, cmd.ErrOrStderr())
jsonOut = cmd.OutOrStdout()
}
return runDatasetPush(cmd.Context(), humanOut, cmd.ErrOrStderr(),
runDatasetPushArgs{
LocalPath: localPath,
Kubeconfig: kubeconfigPath,
Expand All @@ -181,10 +193,12 @@ Exit codes:
Detach: detach,
IdempotencyKey: idempotencyKey,
ImageDigest: imageDigest,
Printer: printerFor(cmd),
Printer: printer,
Interactive: interactive,
Prompter: pr,
CategorySet: cmd.Flags().Changed("category"),
OutputJSON: outputJSON,
JSONOut: jsonOut,
})
},
}
Expand Down Expand Up @@ -228,6 +242,8 @@ Exit codes:
"validate + discover + walk, but don't create any cluster resources")
cmd.Flags().BoolVar(&noInput, "no-input", false,
"disable interactive prompts; fail on missing required values (for CI/scripts)")
cmd.Flags().BoolVar(&outputJSON, "output-json", false,
"emit a machine-readable JSON result on stdout (human output → stderr; implies --no-input)")
cmd.Flags().StringVar(&ingestorSAName, "ingestor-sa", "",
"override the ingestor ServiceAccount name (default: \"ingestor\"); "+
"set this if you customized ingestionAuthz.serviceAccountName in the parent client chart")
Expand Down Expand Up @@ -278,6 +294,12 @@ type runDatasetPushArgs struct {
Prompter prompter
CategorySet bool

// OutputJSON routes human output to stderr and emits a JSON result
// to JSONOut (stdout); set together by the RunE in --output-json
// mode (which also forces non-interactive).
OutputJSON bool
JSONOut io.Writer

// Phase 4 (#152) fields. See the flag declarations for the
// per-knob rationale; all three are optional.
Detach bool
Expand Down Expand Up @@ -528,6 +550,9 @@ func runDatasetPush(ctx context.Context, out, errOut io.Writer, a runDatasetPush
// 8. Dry-run stop. Acknowledged success.
if a.DryRun {
_, _ = fmt.Fprintln(out, "Dry-run complete — no cluster resources were created.")
if a.OutputJSON {
writePushJSON(a.JSONOut, "dry-run", spec, nil, "", "")
}
return nil
}

Expand Down Expand Up @@ -625,48 +650,73 @@ func runDatasetPush(ctx context.Context, out, errOut io.Writer, a runDatasetPush
Out: out,
Printer: a.Printer,
})
// Classify once: a machine-readable status + the process exit error
// in lockstep, so --output-json emits exactly one result object on
// EVERY path (success / partial / failure / submit-or-watch error)
// whose status matches the exit code. (Bugbot #38.)
status, exitErr := classifyPushOutcome(submitRes, err)

if a.OutputJSON {
Comment thread
cursor[bot] marked this conversation as resolved.
var summary *submit.Summary
var ns, jobName string
if submitRes != nil {
if submitRes.Watch != nil {
summary = submitRes.Watch.Summary
}
if submitRes.Submit != nil {
ns, jobName = submitRes.Submit.Namespace, submitRes.Submit.JobName
}
}
writePushJSON(a.JSONOut, status, spec, summary, ns, jobName)
}

Comment thread
cursor[bot] marked this conversation as resolved.
if exitErr != nil {
return exitErr
}
return nil
}

// classifyPushOutcome maps the result of submit.Run to a machine-
// readable status string + the process exit error, kept in lockstep so
// --output-json's status always agrees with the exit code (a nil
// *exitError = success, exit 0). It also covers the error paths
// (auth/submit/watch) so --output-json can still emit a result object
// when submit.Run returns an error. (Bugbot #38.)
//
// Status / exit-code pairs produced by the two-value returns below:
// auth_error→5, watch_error→9, submit_error→8, detached→0, failed→9,
// unknown→9, completed_with_failures→9, succeeded→0.
//
// err is inspected first; res is only consulted when err is nil.
func classifyPushOutcome(res *submit.Result, err error) (string, *exitError) {
if err != nil {
switch {
case submit.IsAuthError(err):
return &exitError{code: 5, err: err}
return "auth_error", &exitError{code: 5, err: err}
case submit.IsWatchError(err):
// Watch-phase failure: jobs-manager already accepted
// the run, the cluster is doing the work, the CLI
// just couldn't follow along. Exit 9 (ingest-side)
// not 8 (submit-side). Bugbot flagged the
// previously-undifferentiated mapping on PR #10.
return &exitError{code: 9, err: err}
// jobs-manager accepted the run; the cluster is doing the
// work, the CLI just couldn't follow along — ingest-side
// (exit 9), not submit-side (8).
return "watch_error", &exitError{code: 9, err: err}
default:
// Any error that is neither an auth error nor a watch error is
// reported as a submit-side failure.
return &exitError{code: 8, err: err}
return "submit_error", &exitError{code: 8, err: err}
}
}

// Detach paths (--detach flag OR SIGINT-mid-watch) are
// success — cluster keeps running; the orchestrator already
// printed the reconnect hint.
if submitRes.Watch == nil || submitRes.Watch.Outcome == submit.JobOutcomeDetached {
return nil
// --detach (no watch) or SIGINT-mid-watch: success; cluster runs on.
// A nil res with a nil err is also classified as detached.
if res == nil || res.Watch == nil || res.Watch.Outcome == submit.JobOutcomeDetached {
return "detached", nil
}

// Watch outcomes. Both Failed and Unknown route to exit 9
// (Unknown = finalJobStatus timed out without seeing a
// terminal condition, which we can't claim as success).
// Bugbot flagged the prior switch's missing Unknown branch
// on PR #10.
switch submitRes.Watch.Outcome {
switch res.Watch.Outcome {
case submit.JobOutcomeFailed:
return &exitError{code: 9, err: errors.New("ingestion Job exited non-zero — see logs above")}
return "failed", &exitError{code: 9, err: errors.New("ingestion Job exited non-zero — see logs above")}
// NOTE(review): this branch dereferences res.Submit without a nil
// check; the guard above only covers res and res.Watch — confirm
// Submit is always populated whenever Watch is.
case submit.JobOutcomeUnknown:
return &exitError{code: 9, err: errors.New(
return "unknown", &exitError{code: 9, err: errors.New(
"ingestion Job's final status couldn't be determined within the watch window — " +
"check `kubectl get job -n " + submitRes.Submit.Namespace + " " + submitRes.Submit.JobName + "` for the outcome")}
"check `kubectl get job -n " + res.Submit.Namespace + " " + res.Submit.JobName + "` for the outcome")}
case submit.JobOutcomeSucceeded:
if submitRes.Watch.Summary != nil && submitRes.Watch.Summary.HasFailures() {
return &exitError{code: 9, err: errors.New(
// Job exited 0, but rows can still have failed — exit 9, and the
// JSON status must say so, NOT "succeeded". (Bugbot #38.)
if res.Watch.Summary != nil && res.Watch.Summary.HasFailures() {
return "completed_with_failures", &exitError{code: 9, err: errors.New(
"ingestion Job completed but the summary reports failures — see panel above")}
}
// A nil Summary, or one without failures, counts as a clean success.
return "succeeded", nil
}
return nil
// NOTE(review): an outcome not matched by the switch lands here with
// status "unknown" and a nil exit error (exit 0), whereas
// JobOutcomeUnknown reports "unknown" with exit 9 — confirm that the
// same status mapping to two exit codes is intended.
return "unknown", nil
}

// printPushPreflight is the customer-facing summary. Mirrors
Expand Down Expand Up @@ -738,3 +788,58 @@ func printPushPreflight(
layout.FileCount(), push.HumanBytes(layout.TotalBytes), spec["table"])
}
}

// pushJSONResult is the machine-readable shape emitted by --output-json.
// It's a presentation type owned by the CLI layer, so submit.Summary
// stays json-tag-free and this wire format can evolve independently.
//
// status, table, category, and intent are always present in the
// encoded object; namespace, job_name, and summary carry omitempty and
// are left out when empty or nil.
type pushJSONResult struct {
// Status is the outcome label. Values written by this package:
// dry-run, succeeded, completed_with_failures, failed, detached,
// unknown, auth_error, submit_error, watch_error.
Status string `json:"status"`
// Table, Category, and Intent are rendered from the spec map's
// "table", "category", and "intent" entries by writePushJSON.
Table string `json:"table"`
Category string `json:"category"`
Intent string `json:"intent"`
// Namespace and JobName are passed through by the caller of
// writePushJSON; the dry-run path passes empty strings for both.
Namespace string `json:"namespace,omitempty"`
JobName string `json:"job_name,omitempty"`
// Summary is nil when no submit.Summary was supplied.
Summary *pushJSONSummary `json:"summary,omitempty"`
}

// pushJSONSummary is the "summary" object nested inside pushJSONResult.
// writePushJSON fills it field-by-field from a *submit.Summary; the
// JSON names differ from the source field names in two places, noted
// below.
type pushJSONSummary struct {
// IngestorID is omitted from the output when empty.
IngestorID string `json:"ingestor_id,omitempty"`
TotalRecords int64 `json:"total_records"`
InsertedRecords int64 `json:"inserted_records"`
// SentToAPI is copied from submit.Summary.APISentRecords.
SentToAPI int64 `json:"sent_to_api"`
SkippedRecords int64 `json:"skipped_records"`
FileTransferFailures int64 `json:"file_transfer_failures"`
// DBInsertFailures is copied from submit.Summary.FailedRecords.
DBInsertFailures int64 `json:"db_insert_failures"`
// SuccessRate is the value returned by submit.Summary.SuccessRate().
// NOTE(review): scale (0–1 fraction vs. 0–100 percent) is not visible
// here — confirm against that method before documenting it for users.
SuccessRate float64 `json:"success_rate"`
}

// writePushJSON serializes the push result to w (stdout in
// --output-json mode) as a single indented JSON object followed by a
// newline.
//
// Parameters:
//   - status: the machine-readable outcome label, written verbatim.
//   - spec: source of the "table", "category", and "intent" fields.
//   - s: optional run summary; when nil the "summary" key is omitted.
//   - ns, jobName: written as "namespace" / "job_name"; omitted when empty.
//
// A spec entry that is missing or nil is emitted as the empty string.
// Formatting a nil value with fmt's %v verb yields the literal text
// "<nil>", which must never leak into output meant for machines.
//
// Errors are dropped: marshaling our own struct can't fail in practice,
// and the exit code remains the contract.
func writePushJSON(w io.Writer, status string, spec map[string]any, s *submit.Summary, ns, jobName string) {
	// specField renders spec[key] for the wire format. Indexing a nil
	// map is safe in Go and yields nil, so a nil spec is handled too.
	specField := func(key string) string {
		v := spec[key]
		if v == nil {
			return ""
		}
		return fmt.Sprintf("%v", v)
	}

	res := pushJSONResult{
		Status:    status,
		Table:     specField("table"),
		Category:  specField("category"),
		Intent:    specField("intent"),
		Namespace: ns,
		JobName:   jobName,
	}
	if s != nil {
		res.Summary = &pushJSONSummary{
			IngestorID:           s.IngestorID,
			TotalRecords:         s.TotalRecords,
			InsertedRecords:      s.InsertedRecords,
			SentToAPI:            s.APISentRecords,
			SkippedRecords:       s.SkippedRecords,
			FileTransferFailures: s.FileTransferFailures,
			DBInsertFailures:     s.FailedRecords,
			SuccessRate:          s.SuccessRate(),
		}
	}

	b, err := json.MarshalIndent(res, "", " ")
	if err != nil {
		// Deliberately best-effort: see the function comment.
		return
	}
	// Write errors are ignored for the same reason.
	_, _ = fmt.Fprintln(w, string(b))
}
13 changes: 11 additions & 2 deletions internal/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package cli

import (
"io"

"github.com/spf13/cobra"

"github.com/tracebloc/cli/internal/ui"
Expand Down Expand Up @@ -105,8 +107,15 @@ what's planned next.`,
// in ui.New; --plain just forces it off. Commands call this at the top
// of their RunE.
func printerFor(cmd *cobra.Command) *ui.Printer {
	// Default destination is the command's stdout; all flag handling
	// lives in printerForWriter.
	stdout := cmd.OutOrStdout()
	return printerForWriter(cmd, stdout)
}

// printerForWriter is printerFor for an explicit writer — used by
// dataset push's --output-json mode, which routes human output to
// stderr so stdout carries only the JSON result.
//
// When the command's "plain" flag is set, the printer is built with
// color disabled; otherwise ui.New is called with no options.
func printerForWriter(cmd *cobra.Command, w io.Writer) *ui.Printer {
// The error from GetBool is discarded, so only a successfully read
// true value takes the no-color branch.
if plain, _ := cmd.Flags().GetBool("plain"); plain {
return ui.New(cmd.OutOrStdout(), ui.WithColor(false))
return ui.New(w, ui.WithColor(false))
}
return ui.New(cmd.OutOrStdout())
return ui.New(w)
}
Loading