Add aircraft fusion engine + fused live state (M3.1)#13
Conversation
The keystone of M3 (Network fusion): same-identity local-RF and network
observations now collapse into one track instead of overwriting each other.
Backend
- New src/aether/fusion/: pure, deterministic core (now injected, FUSION-FR-007).
- freshness.py — per-source windows (PRD §15.4: local 5/30/60s, network
15/60/120s) measured on observed_at; future-skew cap so a far-future
timestamp can't pin a group in memory.
- precedence.py — PRD §15.2 ladder (fresh local > fresh network > stale local
> stale network > expired), deterministic tie-breaks; network fills fields
local lacks; geometry chosen whole, never averaged (PRD §15.5).
- engine.py — FusionEngine/FusionGroup: one group per correlation_key, merged
provenance, fusion metadata in attributes["fusion"] (no schema_version bump).
- state/live.py fuses correlation-keyed tracks (fused id == correlation_key);
None-key tracks stay keyed by id (FUSION-FR-006, no proximity merge). expire()
removes a fused track only when all contributors expire and re-upserts on a
LOCAL->NET handoff (FUSION-FR-004). One source record -> one StateChange.
- hub.py reads the clock once at the edge; main.py runs a 1s background expiry
sweep, exception-isolated.
- demo_publisher.py gains a demo-net leg + 4 scenario aircraft (fused / handoff /
local-only / network-only) so the no-hardware path demonstrates fusion.
Frontend
- Fused provenance surfaced in TrackList (contributing sources, fused count).
- Display-only All / Local-RF / Network filter (store + pure visibleTracks
selector); never affects ingestion.
Failure isolation: a fused-record validation error degrades to the raw record
(rekeyed to its correlation_key, no client ghost track); a poison group is
dropped rather than aborting the whole expiry sweep.
Tests: fusion freshness/precedence/engine units, live-state fusion + expiry,
demo-ws integration, frontend selector + TrackList. scripts/check.sh green
(160 passed); frontend lint/typecheck/test/build green.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Code Review
This pull request implements in-process aircraft fusion (M3.1), which merges same-identity local-RF and network observations into a single track based on a shared correlation key. It introduces a stateful backend fusion engine, integrates it into the live state and periodic expiry tasks, and updates the frontend to support provenance filtering and display fusion metadata badges. The review feedback highlights several opportunities to improve robustness, specifically recommending timezone normalization of incoming timestamps to prevent timezone-naive/aware comparison crashes, and suggesting defensive checks for potentially missing provenance and contributor arrays to avoid runtime TypeErrors.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def update(self, record: TrackRecord) -> bool: | ||
| """Add/replace this source's contribution. Returns False if discarded (out-of-order). | ||
|
|
||
| An older observation from a source we already have a newer one for is | ||
| dropped for fusion (the track must not jump backwards); an equal or newer | ||
| ``observed_at`` replaces it. Duplicates are therefore idempotent. | ||
| """ | ||
| local_rf = _is_local_rf(record) | ||
| existing = self.contributors.get(record.source) | ||
| if existing is not None and record.observed_at < existing.observed_at: | ||
| return False | ||
| self.contributors[record.source] = Contributor( | ||
| source=record.source, | ||
| record=record, | ||
| local_rf=local_rf, | ||
| observed_at=record.observed_at, | ||
| received_at=record.received_at, | ||
| ) | ||
| if local_rf and ( | ||
| self.last_local_rf_at is None or record.observed_at > self.last_local_rf_at | ||
| ): | ||
| self.last_local_rf_at = record.observed_at | ||
| self._dirty = True | ||
| return True |
There was a problem hiding this comment.
Normalize observed_at and received_at to UTC if they are timezone-naive. This prevents potential TypeError crashes when comparing naive and aware datetimes (e.g., if existing.observed_at is aware but record.observed_at is naive), avoids timezone-shifted comparisons in _sort_key using .timestamp(), and ensures consistent sorting in _merge_attributes.
def update(self, record: TrackRecord) -> bool:
"""Add/replace this source's contribution. Returns False if discarded (out-of-order).
An older observation from a source we already have a newer one for is
dropped for fusion (the track must not jump backwards); an equal or newer
``observed_at`` replaces it. Duplicates are therefore idempotent.
"""
local_rf = _is_local_rf(record)
observed_at = record.observed_at
if observed_at.tzinfo is None:
observed_at = observed_at.replace(tzinfo=UTC)
received_at = record.received_at
if received_at.tzinfo is None:
received_at = received_at.replace(tzinfo=UTC)
existing = self.contributors.get(record.source)
if existing is not None and observed_at < existing.observed_at:
return False
self.contributors[record.source] = Contributor(
source=record.source,
record=record,
local_rf=local_rf,
observed_at=observed_at,
received_at=received_at,
)
if local_rf and (
self.last_local_rf_at is None or observed_at > self.last_local_rf_at
):
self.last_local_rf_at = observed_at
self._dirty = True
return True| import logging | ||
| from dataclasses import dataclass, field | ||
| from datetime import datetime, timedelta | ||
| from typing import Any |
There was a problem hiding this comment.
Import UTC from datetime to allow timezone normalization of incoming track record timestamps.
| import logging | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timedelta | |
| from typing import Any | |
| import logging | |
| from dataclasses import dataclass, field | |
| from datetime import UTC, datetime, timedelta | |
| from typing import Any |
| entries: list[Provenance] = [] | ||
| for source in sorted(self.contributors): | ||
| contrib = self.contributors[source] | ||
| own = next((p for p in contrib.record.provenance if p.source == source), None) |
There was a problem hiding this comment.
Add a defensive check for contrib.record.provenance being None or empty. If provenance is missing or set to None on the incoming record, iterating over it directly will raise a TypeError and crash the fusion process.
| entries: list[Provenance] = [] | |
| for source in sorted(self.contributors): | |
| contrib = self.contributors[source] | |
| own = next((p for p in contrib.record.provenance if p.source == source), None) | |
| entries: list[Provenance] = [] | |
| for source in sorted(self.contributors): | |
| contrib = self.contributors[source] | |
| own = next((p for p in (contrib.record.provenance or []) if p.source == source), None) |
| function fusionTooltip(track: TrackRecord): string | undefined { | ||
| const meta = fusionMeta(track); | ||
| if (!meta) return undefined; | ||
| const sources = meta.contributors.map((c) => c.source).join(", "); | ||
| const lastLocal = meta.last_local_rf_at ?? "never"; | ||
| return `Sources: ${sources}\nActive: ${meta.active_source}\nLast local RF: ${lastLocal}`; | ||
| } |
There was a problem hiding this comment.
Add a defensive check for meta.contributors being null or undefined. If a malformed track record is received where contributors is missing, calling .map() directly will throw a TypeError and crash the React render tree.
| function fusionTooltip(track: TrackRecord): string | undefined { | |
| const meta = fusionMeta(track); | |
| if (!meta) return undefined; | |
| const sources = meta.contributors.map((c) => c.source).join(", "); | |
| const lastLocal = meta.last_local_rf_at ?? "never"; | |
| return `Sources: ${sources}\nActive: ${meta.active_source}\nLast local RF: ${lastLocal}`; | |
| } | |
| function fusionTooltip(track: TrackRecord): string | undefined { | |
| const meta = fusionMeta(track); | |
| if (!meta) return undefined; | |
| const sources = (meta.contributors ?? []).map((c) => c.source).join(", "); | |
| const lastLocal = meta.last_local_rf_at ?? "never"; | |
| return `Sources: ${sources}\nActive: ${meta.active_source}\nLast local RF: ${lastLocal}`; | |
| } |
|
Thanks for the review. Declining all four — each is already defended at the schema boundary:
CI green, no functional changes needed. |
What & why
First slice of M3 (Network fusion) — the fusion engine, the keystone the rest of M3 (network ADS-B adapter, APRS-IS/APRS fusion, AIS, filters, TOI) builds on. Until now
LiveStatekeyed tracks purely byid, and adapters setid == correlation_key, so a network observation of the same airframe would overwrite the local one. This makes same-identity local-RF + network observations fuse into one track with full provenance (FUSION-FR-001…007, PRD §11.4/§15).Scope is deliberately fusion-only: the real network ADS-B adapter is the next slice (M3.2). The demo source simulates the network contributor so the whole path is exercised no-hardware.
How it works
src/aether/fusion/(new), pure & deterministic —nowis always injected (FUSION-FR-007), no hidden clock, no per-sourceif/elif:freshness.py— per-source windows (PRD §15.4: local5/30/60s, network15/60/120s) measured onobserved_at(§8.4); future-skew cap so a far-future timestamp can't pin a group in memory.precedence.py— PRD §15.2 ladder (fresh local > fresh network > stale local > stale network > expired) with deterministic tie-breaks; network fills fields local lacks (FR-003); geometry chosen whole, never averaged (§15.5).engine.py—FusionEngine/FusionGroup: one group percorrelation_key, merged provenance, fusion metadata inattributes["fusion"](active source, per-field source, last-local-RF, contributors, conflicts).state/live.py— correlation-keyed tracks fuse (fusedid==correlation_key);None-key tracks stay keyed byid(FR-006, no proximity merge).expire()removes a fused track only when all contributors expire and re-upserts on a LOCAL→NET handoff (FR-004). One source record → oneStateChange(websocket contract preserved).hub.py/main.py— clock read once at the edge; a 1s background expiry sweep, exception-isolated.demo_publisher.py— adds ademo-netleg + 4 scenario aircraft (fused / handoff / local-only / network-only) so the no-hardware demo shows fusion.TrackList; display-only All / Local-RF / Network filter (store + purevisibleTracksselector) that never affects ingestion.schema_versionbump — all fusion metadata lives inattributes["fusion"].Failure isolation
correlation_key(no stranded client ghost track).None-key tracks/features still age out).These two + a memory-pin via future timestamps were found by the review pass and fixed before this PR.
Tests / verification
scripts/check.shgreen: ruff + ruff-format + mypy strict + 160 pytest + receive-only Dire Wolf tripwire.tsc -b+ 31 vitest +vite build.selectors+TrackList.Reviewer notes / known limitations (carried as follow-ups, not blockers)
How this was built
Authored via an ultracode workflow: 3-lens design panel → synthesized spec → implementation → 5-dimension adversarial review → fix pass. Both gates re-run from ground truth before this PR.
🤖 Generated with Claude Code