diff --git a/internal/cli/cluster.go b/internal/cli/cluster.go index 021327b..cea1b27 100644 --- a/internal/cli/cluster.go +++ b/internal/cli/cluster.go @@ -193,6 +193,7 @@ func runClusterInfo( p.Field("expires in", "never (static-secret fallback)") } + p.Newline() p.Successf("Ready for `tracebloc dataset push`.") return nil } diff --git a/internal/cli/dataset.go b/internal/cli/dataset.go index 4b562be..80f7eba 100644 --- a/internal/cli/dataset.go +++ b/internal/cli/dataset.go @@ -26,7 +26,7 @@ import ( // discovery) and PR-b (this one: ephemeral stage Pod + tar-over- // exec stream + progress bar + SIGINT-safe cleanup). `dataset rm` // (#30) removes a pushed dataset's in-cluster artifacts; `dataset -// list` hangs off this parent later. +// list` lists the ingested datasets. func newDatasetCmd() *cobra.Command { cmd := &cobra.Command{ Use: "dataset", @@ -42,6 +42,7 @@ ingestor Job to completion (streaming logs + the final summary). before the first push.`, } cmd.AddCommand(newDatasetPushCmd()) + cmd.AddCommand(newDatasetListCmd()) cmd.AddCommand(newDatasetRmCmd()) return cmd } @@ -614,6 +615,7 @@ contributors train against it without ever seeing the raw files.`)) // 8. Dry-run stop. Acknowledged success, plus a reminder of the // live-only steps (stage + ingest) the customer just skipped. if a.DryRun { + a.Printer.Newline() a.Printer.Successf("Dry-run complete — your dataset and cluster check out; nothing was created.") a.Printer.Hintf("A real run continues with step 3 (stage your files) and step 4 (run the ingestion).") if a.OutputJSON { diff --git a/internal/cli/dataset_list.go b/internal/cli/dataset_list.go new file mode 100644 index 0000000..a0fb36f --- /dev/null +++ b/internal/cli/dataset_list.go @@ -0,0 +1,197 @@ +package cli + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + + "github.com/spf13/cobra" + + "github.com/tracebloc/cli/internal/cluster" + "github.com/tracebloc/cli/internal/push" + "github.com/tracebloc/cli/internal/ui" +) + +// runDatasetListArgs is the resolved input to runDatasetList — same +// shape convention as the other dataset verbs, keeping the RunE a thin +// flag-to-struct adapter. +type runDatasetListArgs struct { + Kubeconfig string + Context string + Namespace string + OutputJSON bool + Printer *ui.Printer + JSONOut io.Writer +} + +// newDatasetListCmd implements `tracebloc dataset list` — a read-only +// listing of the datasets ingested into the cluster. The kubeconfig +// flags are all zero-value-safe, so the minimal `tracebloc dataset list` +// runs against the current context + its namespace; the flags only +// override that (same convention as `cluster info`). +func newDatasetListCmd() *cobra.Command { + var ( + kubeconfigPath string + contextOverride string + nsOverride string + outputJSON bool + ) + + cmd := &cobra.Command{ + Use: "list", + Short: "List datasets ingested in the cluster", + Long: `Lists the datasets pushed + ingested into the parent client release — +the tables in ` + push.IngestionDatabase + ` on the cluster. + +With no flags it uses your current kubeconfig context and its namespace; +the flags below override that, same as ` + "`cluster info`" + ` and ` + "`dataset push`" + `. +For the full catalog (with metadata), see the dashboard at +https://ai.tracebloc.io/metadata. + +Exit codes: + 0 listed successfully (including an empty list) + 3 kubeconfig error + 4 cluster reachable but no parent release in the namespace + 7 couldn't query the cluster for datasets`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + // In --output-json mode, human output (the banner) goes to + // stderr so stdout carries only the JSON — same split as push. + printer := printerFor(cmd) + var jsonOut io.Writer + if outputJSON { + printer = printerForWriter(cmd, cmd.ErrOrStderr()) + jsonOut = cmd.OutOrStdout() + } + return runDatasetList(cmd.Context(), runDatasetListArgs{ + Kubeconfig: kubeconfigPath, + Context: contextOverride, + Namespace: nsOverride, + OutputJSON: outputJSON, + Printer: printer, + JSONOut: jsonOut, + }) + }, + } + + cmd.Flags().StringVar(&kubeconfigPath, "kubeconfig", "", + "path to the kubeconfig file (default: $KUBECONFIG, then ~/.kube/config)") + cmd.Flags().StringVar(&contextOverride, "context", "", + "name of the kubeconfig context to use (default: kubeconfig's current-context)") + cmd.Flags().StringVarP(&nsOverride, "namespace", "n", "", + "namespace where the parent tracebloc/client release is installed") + cmd.Flags().BoolVar(&outputJSON, "output-json", false, + "emit the dataset list as JSON on stdout (human output → stderr)") + + return cmd +} + +// runDatasetList discovers the cluster, enumerates the ingested tables, +// and renders them. Mirrors the other dataset verbs' discovery so the +// exit-code contract is consistent. +func runDatasetList(ctx context.Context, a runDatasetListArgs) (err error) { + // In --output-json mode, guarantee stdout always carries JSON: the + // success path emits the listing and sets jsonEmitted; this defer + // covers the early-failure returns (kubeconfig, no release, query) + // with a JSON error object, mirroring dataset push. (Bugbot #53) + jsonEmitted := false + defer func() { + if a.OutputJSON && err != nil && !jsonEmitted { + code := 1 + var ee *exitError + if errors.As(err, &ee) { + code = ee.Code() + } + writeDatasetListErrorJSON(a.JSONOut, err, code) + } + }() + + p := a.Printer + p.Banner("tracebloc", "datasets in the cluster") + + resolved, err := cluster.Load(cluster.KubeconfigOptions{ + Path: a.Kubeconfig, + Context: a.Context, + Namespace: a.Namespace, + }) + if err != nil { + return &exitError{code: 3, err: fmt.Errorf("loading kubeconfig: %w", err)} + } + cs, err := cluster.NewClientset(resolved) + if err != nil { + return &exitError{code: 3, err: err} + } + release, err := cluster.DiscoverParentRelease(ctx, cs, resolved.Namespace) + if err != nil { + return &exitError{code: 4, err: err} + } + + tables, err := push.ListDatasets(ctx, cs, resolved.RestConfig, resolved.Namespace) + if err != nil { + return &exitError{code: 7, err: err} + } + + if a.OutputJSON { + writeDatasetListJSON(a.JSONOut, resolved.Namespace, release.ReleaseName, tables) + jsonEmitted = true + return nil + } + renderDatasetList(p, resolved.Namespace, tables) + return nil +} + +// renderDatasetList prints the human-facing listing. Split out so it's +// unit-testable with a buffer-backed Printer. +func renderDatasetList(p *ui.Printer, namespace string, tables []string) { + p.Section(fmt.Sprintf("Datasets in %s (%d)", namespace, len(tables))) + if len(tables) == 0 { + p.Infof("No datasets yet — push one with `tracebloc dataset push`.") + return + } + for _, t := range tables { + p.Infof("%s", t) + } +} + +// datasetListJSON is the --output-json shape (owned by the CLI layer). +type datasetListJSON struct { + Namespace string `json:"namespace"` + Release string `json:"release"` + Count int `json:"count"` + Datasets []string `json:"datasets"` +} + +func writeDatasetListJSON(w io.Writer, namespace, release string, tables []string) { + if tables == nil { + tables = []string{} // emit [] not null + } + res := datasetListJSON{ + Namespace: namespace, + Release: release, + Count: len(tables), + Datasets: tables, + } + b, err := json.MarshalIndent(res, "", " ") + if err != nil { + return + } + _, _ = fmt.Fprintln(w, string(b)) +} + +// writeDatasetListErrorJSON emits a minimal JSON error object for +// --output-json runs that fail before the listing is produced, so +// stdout is never empty on failure (parallels dataset push). (Bugbot #53) +func writeDatasetListErrorJSON(w io.Writer, e error, code int) { + res := struct { + Status string `json:"status"` + Error string `json:"error"` + ExitCode int `json:"exit_code"` + }{Status: "error", Error: e.Error(), ExitCode: code} + b, err := json.MarshalIndent(res, "", " ") + if err != nil { + return + } + _, _ = fmt.Fprintln(w, string(b)) +} diff --git a/internal/cli/dataset_list_test.go b/internal/cli/dataset_list_test.go new file mode 100644 index 0000000..7e5d246 --- /dev/null +++ b/internal/cli/dataset_list_test.go @@ -0,0 +1,95 @@ +package cli + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/tracebloc/cli/internal/ui" +) + +// TestRunDatasetList_OutputJSONEarlyFailureEmitsJSON: with --output-json, +// a failure before the listing (here a broken kubeconfig, exit 3) still +// writes a JSON error object to stdout — the stdout-always-JSON contract +// that #49 established for dataset push. (Bugbot #53) +func TestRunDatasetList_OutputJSONEarlyFailureEmitsJSON(t *testing.T) { + bad := filepath.Join(t.TempDir(), "broken.yaml") + if err := os.WriteFile(bad, []byte("}{ not valid kubeconfig"), 0o644); err != nil { + t.Fatal(err) + } + var jsonBuf, human bytes.Buffer + err := runDatasetList(context.Background(), runDatasetListArgs{ + Kubeconfig: bad, + OutputJSON: true, + Printer: ui.New(&human, ui.WithColor(false)), + JSONOut: &jsonBuf, + }) + + var ee *exitError + if !errors.As(err, &ee) || ee.Code() != 3 { + t.Fatalf("err = %v, want *exitError code 3", err) + } + var got map[string]any + if e := json.Unmarshal(jsonBuf.Bytes(), &got); e != nil { + t.Fatalf("stdout is not JSON: %v\n%s", e, jsonBuf.String()) + } + if got["status"] != "error" || got["exit_code"] != float64(3) { + t.Errorf("got %+v, want status=error exit_code=3", got) + } +} + +// TestRenderDatasetList_Empty: the empty listing shows the count and +// points the user at `dataset push`. +func TestRenderDatasetList_Empty(t *testing.T) { + var buf bytes.Buffer + renderDatasetList(ui.New(&buf, ui.WithColor(false)), "ap-workspace", nil) + out := buf.String() + if !strings.Contains(out, "Datasets in ap-workspace (0)") { + t.Errorf("missing header/count:\n%s", out) + } + if !strings.Contains(out, "dataset push") { + t.Errorf("empty state should point at `dataset push`:\n%s", out) + } +} + +// TestRenderDatasetList_Items: a populated listing shows the count and +// every table name. +func TestRenderDatasetList_Items(t *testing.T) { + var buf bytes.Buffer + renderDatasetList(ui.New(&buf, ui.WithColor(false)), "tracebloc-templates", []string{"reg_train", "churn_test"}) + out := buf.String() + for _, want := range []string{"Datasets in tracebloc-templates (2)", "reg_train", "churn_test"} { + if !strings.Contains(out, want) { + t.Errorf("missing %q:\n%s", want, out) + } + } +} + +// TestWriteDatasetListJSON: valid JSON with the expected fields, and a +// nil dataset slice marshals as [] (not null) so scripts get an array. +func TestWriteDatasetListJSON(t *testing.T) { + var buf bytes.Buffer + writeDatasetListJSON(&buf, "ns1", "tracebloc", []string{"a", "b"}) + + var got datasetListJSON + if err := json.Unmarshal(buf.Bytes(), &got); err != nil { + t.Fatalf("not JSON: %v\n%s", err, buf.String()) + } + if got.Namespace != "ns1" || got.Release != "tracebloc" || got.Count != 2 { + t.Errorf("unexpected: %+v", got) + } + if len(got.Datasets) != 2 || got.Datasets[0] != "a" { + t.Errorf("datasets wrong: %+v", got.Datasets) + } + + buf.Reset() + writeDatasetListJSON(&buf, "ns1", "tracebloc", nil) + if !strings.Contains(buf.String(), `"datasets": []`) { + t.Errorf("nil datasets should marshal as []:\n%s", buf.String()) + } +} diff --git a/internal/cli/dataset_rm.go b/internal/cli/dataset_rm.go index 761750c..a7c398f 100644 --- a/internal/cli/dataset_rm.go +++ b/internal/cli/dataset_rm.go @@ -46,9 +46,8 @@ func newDatasetRmCmd() *cobra.Command { for a table: the MySQL table in ` + push.IngestionDatabase + ` and the dataset's directories on the shared PVC. Destructive and not undoable. -NOTE: the central tracebloc backend catalog entry is NOT removed — the -CLI has no direct line to that backend. Full cleanup of a successfully -ingested dataset needs the server-side delete path (tracebloc/cli#39). +The dataset's catalog metadata on the tracebloc backend is removed +automatically after deletion — no manual step required. Exit codes: 0 artifacts removed (or --dry-run, or the user declined) @@ -152,6 +151,7 @@ undone — re-pushing the data is the only way back.`) // 5. Dry-run stop. if a.DryRun { + p.Newline() p.Successf("Dry-run — nothing was deleted.") return nil } @@ -194,6 +194,8 @@ undone — re-pushing the data is the only way back.`) return &exitError{code: 7, err: fmt.Errorf("teardown failed: %w", err)} } + p.Newline() p.Successf("Deleted %s.%s and %d PVC path(s).", plan.Database, plan.Table, len(res.RemovedPaths)) + p.Infof("The dataset's catalog metadata will be removed automatically — no further action needed.") return nil } diff --git a/internal/cli/root.go b/internal/cli/root.go index 311c826..f1047d6 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -34,7 +34,7 @@ type BuildInfo struct { func NewRootCmd(info BuildInfo) *cobra.Command { root := &cobra.Command{ Use: "tracebloc", - Short: "tracebloc — declarative data ingestion for your cluster", + Short: "tracebloc — interactive data ingestion for your cluster", Long: `tracebloc is the customer-facing CLI for the tracebloc declarative ingestion path. It wraps the same POST /internal/submit-ingestion-run protocol the tracebloc/ingestor Helm chart uses, so any cluster running @@ -90,12 +90,14 @@ what's planned next.`, return cmd.Help() // an arg that wasn't a known subcommand } p := printerFor(cmd) - p.Banner("tracebloc", "declarative data ingestion for your cluster") + p.Banner("tracebloc", "interactive data ingestion for your cluster") p.Section("Get started") - p.Infof("tracebloc dataset push ./data — stage + ingest a dataset (guided if you omit flags)") + p.Infof("tracebloc dataset push — stage + ingest a dataset interactively (or use --help to see flags)") + p.Infof("tracebloc dataset list — list datasets ingested in the cluster") p.Infof("tracebloc dataset rm — delete a pushed dataset (its table + files)") p.Infof("tracebloc cluster info — check the CLI can reach your cluster") p.Infof("tracebloc ingest validate f.yaml — validate an ingest.yaml locally") + p.Newline() p.Hintf("Add --help to any command for the full flag list.") return nil } diff --git a/internal/cli/root_test.go b/internal/cli/root_test.go index ee84096..6ac7f02 100644 --- a/internal/cli/root_test.go +++ b/internal/cli/root_test.go @@ -41,7 +41,7 @@ func TestRootCmd_HomeScreen(t *testing.T) { if err := root.Execute(); err != nil { t.Fatalf("bare root failed: %v\n%s", err, out.String()) } - for _, want := range []string{"tracebloc", "dataset push", "dataset rm", "cluster info"} { + for _, want := range []string{"tracebloc", "dataset push", "dataset list", "dataset rm", "cluster info"} { if !strings.Contains(out.String(), want) { t.Errorf("home screen missing %q:\n%s", want, out.String()) } diff --git a/internal/push/list.go b/internal/push/list.go new file mode 100644 index 0000000..89c55fa --- /dev/null +++ b/internal/push/list.go @@ -0,0 +1,56 @@ +package push + +import ( + "bytes" + "context" + "fmt" + "strings" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +// ListDatasets returns the names of the datasets ingested into the +// cluster — the tables in IngestionDatabase — by querying the mysql pod. +// It reuses the same exec seam + pod discovery as Teardown. +// +// The query goes through information_schema (not SHOW TABLES) so a +// cluster where nothing has been pushed yet — the database doesn't even +// exist — returns an empty list rather than an error. +func ListDatasets(ctx context.Context, cs kubernetes.Interface, cfg *rest.Config, namespace string) ([]string, error) { + exec := &SPDYExecutor{Config: cfg, Client: cs} + + mysqlPod, mysqlContainer, err := findRunningPod(ctx, cs, namespace, "mysql") + if err != nil { + return nil, fmt.Errorf("locating mysql pod: %w", err) + } + + // -N drops the column header so stdout is one bare table name per + // line. IngestionDatabase is a compile-time constant, so the + // interpolation carries no injection risk. + query := fmt.Sprintf( + "SELECT table_name FROM information_schema.tables WHERE table_schema='%s' ORDER BY table_name", + IngestionDatabase) + script := fmt.Sprintf(`mysql -uroot -p"$MYSQL_ROOT_PASSWORD" -N -e "%s"`, query) + + var stdout, stderr bytes.Buffer + if err := exec.Exec(ctx, namespace, mysqlPod, mysqlContainer, + []string{"sh", "-c", script}, nil, &stdout, &stderr); err != nil { + return nil, fmt.Errorf("querying datasets: %w%s", err, stderrSuffix(&stderr)) + } + return parseDatasetList(stdout.String()), nil +} + +// parseDatasetList turns the raw `mysql -N` output (one table name per +// line) into a cleaned slice, dropping blank lines and surrounding +// whitespace. Kept separate from the exec so it's unit-testable without +// a cluster. +func parseDatasetList(raw string) []string { + var names []string + for _, line := range strings.Split(raw, "\n") { + if t := strings.TrimSpace(line); t != "" { + names = append(names, t) + } + } + return names +} diff --git a/internal/push/list_test.go b/internal/push/list_test.go new file mode 100644 index 0000000..330ac82 --- /dev/null +++ b/internal/push/list_test.go @@ -0,0 +1,30 @@ +package push + +import ( + "reflect" + "testing" +) + +// TestParseDatasetList pins the raw `mysql -N` output → []string +// parsing: real names kept in order, blank/whitespace lines and a +// trailing newline dropped, and empty input → nil (no datasets). +func TestParseDatasetList(t *testing.T) { + cases := []struct { + name string + raw string + want []string + }{ + {"empty", "", nil}, + {"whitespace only", "\n \n\t\n", nil}, + {"one", "cats_dogs_train\n", []string{"cats_dogs_train"}}, + {"several with trailing newline", "a\nb\nc\n", []string{"a", "b", "c"}}, + {"surrounding whitespace", " reg_train \n\tchurn_test\t\n", []string{"reg_train", "churn_test"}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := parseDatasetList(c.raw); !reflect.DeepEqual(got, c.want) { + t.Errorf("parseDatasetList(%q) = %#v, want %#v", c.raw, got, c.want) + } + }) + } +} diff --git a/internal/push/teardown.go b/internal/push/teardown.go index 6f3a9c9..5d30ccf 100644 --- a/internal/push/teardown.go +++ b/internal/push/teardown.go @@ -23,10 +23,9 @@ const IngestionDatabase = "training_test_datasets" // // It deliberately does NOT include the central tracebloc backend // catalog entry: the CLI has no direct line to that backend (only the -// in-cluster ingestor does, with its own creds), so removing it is the -// cross-repo follow-up (tracebloc/cli#39). A successfully-ingested -// dataset torn down this way leaves a stale catalog entry until #39 -// lands. +// in-cluster ingestor does, with its own creds). The backend removes +// the dataset's catalog metadata automatically once these in-cluster +// artifacts are gone, so there's no CLI-side catalog teardown to do. type TeardownPlan struct { Database string // MySQL schema (IngestionDatabase) Table string // table name — MUST have passed ValidateTableName @@ -59,11 +58,10 @@ type TeardownResult struct { // - rm -rf the PVC dirs by exec-ing inside the jobs-manager pod, // which mounts the shared PVC at SharedRoot. // -// DESIGN NOTE (under review): this exec-into-existing-pods approach is -// the "CLI-direct teardown". The alternative under discussion is a -// server-side jobs-manager delete endpoint that could also remove the -// backend catalog entry (#39) in one place. It assumes (a) a pod whose -// name contains "mysql" exposes $MYSQL_ROOT_PASSWORD, and (b) the +// DESIGN NOTE: this exec-into-existing-pods approach is the +// "CLI-direct teardown" (the alternative considered was a server-side +// jobs-manager delete endpoint). It assumes (a) a pod whose name +// contains "mysql" exposes $MYSQL_ROOT_PASSWORD, and (b) the // jobs-manager pod mounts the shared PVC at SharedRoot — both true for // the current parent chart, but worth confirming before this ships. func Teardown(ctx context.Context, cs kubernetes.Interface, cfg *rest.Config, namespace string, plan TeardownPlan) (TeardownResult, error) { diff --git a/internal/submit/summary.go b/internal/submit/summary.go index 32d2adb..2be7772 100644 --- a/internal/submit/summary.go +++ b/internal/submit/summary.go @@ -358,7 +358,7 @@ func RenderSummary(p *ui.Printer, s *Summary) { p.Field("success rate", fmt.Sprintf("%.1f%%", s.SuccessRate())) p.Section("What's next") - p.Infof("View it in the dashboard: https://ai.tracebloc.io") + p.Infof("View it in the dashboard: https://ai.tracebloc.io/metadata") p.Hintf("The table is staged and ready for training jobs.") } diff --git a/internal/ui/ui.go b/internal/ui/ui.go index 68b7219..e87122f 100644 --- a/internal/ui/ui.go +++ b/internal/ui/ui.go @@ -163,6 +163,13 @@ func (p *Printer) PromptHint(format string, a ...any) { p.out("\n %s\n", p.paint(fmt.Sprintf(format, a...), color.FgCyan)) } +// Newline emits a single blank line. Used to detach a closing line or +// call-to-action (e.g. cluster info's "Ready" line, a dry-run / deletion +// result) from the field block above it, so it doesn't get lost. +func (p *Printer) Newline() { + p.out("\n") +} + // PromptHeader prints a bold-white label before a user-input prompt. func (p *Printer) PromptHeader(label string) { p.out("\n %s\n", p.paint(label, color.Bold, color.FgWhite))