Fixes #28998: refactor(ingestion): migrate pipeline connectors (batch B) to BaseConnection #28999
Merged
…nection Migrate flink, gluepipeline, kafkaconnect, nifi, openlineage and spline onto the BaseConnection[Config, Client] pattern and wire connection_class into each service spec. The pipeline base source already builds its client and runs test_connection through the generic resolver (source.connections), so the new classes are exercised end-to-end. flink keeps get_connection module-level because an existing test patches it; openlineage keeps it module-level too (its get_connection has a multi-line Union[KafkaConsumer, BaseClient] signature plus a Kafka/Kinesis dispatch helper) — both classes delegate. gluepipeline, kafkaconnect, nifi and spline move the build into _get_client; gluepipeline's boto3 glue client is typed Any (matching glue/sagemaker/kinesis). Add colocated connection unit tests for all six.
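The delegation described in this commit can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's code: the BaseConnection stand-in, its cached client property, and every Fake* name are hypothetical, and only the overall shape (a generic base, a _get_client hook, test_connection, and a module-level get_connection that the class delegates to) comes from the description above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

Config = TypeVar("Config")
Client = TypeVar("Client")


class BaseConnection(ABC, Generic[Config, Client]):
    """Hypothetical stand-in for the real base class; only the shape matters."""

    def __init__(self, service_connection: Config) -> None:
        self.service_connection = service_connection
        self._client: Optional[Client] = None

    @property
    def client(self) -> Client:
        # Assumption: the client is built lazily on first access and cached.
        if self._client is None:
            self._client = self._get_client()
        return self._client

    @abstractmethod
    def _get_client(self) -> Client:
        """Build the connector-specific client."""

    @abstractmethod
    def test_connection(self) -> bool:
        """Check that the client can reach the service."""


@dataclass
class FakeFlinkConfig:
    """Hypothetical config model with a single field."""

    host_port: str


class FakeFlinkClient:
    """Hypothetical client that only normalizes the base URL."""

    def __init__(self, config: FakeFlinkConfig) -> None:
        self.base_url = config.host_port.rstrip("/")

    def ping(self) -> bool:
        return bool(self.base_url)


def get_connection(connection: FakeFlinkConfig) -> FakeFlinkClient:
    """Module-level builder, kept so existing tests can still patch it."""
    return FakeFlinkClient(connection)


class FakeFlinkConnection(BaseConnection[FakeFlinkConfig, FakeFlinkClient]):
    def _get_client(self) -> FakeFlinkClient:
        # The name is resolved from module globals at call time, so a test
        # that patches the module attribute also affects this call.
        return get_connection(self.service_connection)

    def test_connection(self) -> bool:
        return self.client.ping()


conn = FakeFlinkConnection(FakeFlinkConfig(host_port="http://localhost:8081/"))
print(conn.client.base_url, conn.test_connection())
```

Connectors that do not need a patchable module-level function would build the client directly inside _get_client instead of delegating.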
…gistry pollution TestYieldPipelineStatus built its fake DagRun with MagicMock(spec=DagRun). Building a spec'd mock makes unittest.mock walk dir(DagRun); DagRun's association proxies resolve through orm.class_mapper(), which forces SQLAlchemy to configure the entire shared mapper registry (cascade=True). If any other test on the same xdist worker has left a mapper in a failed state (e.g. airflow's DagWarning mapper failing to initialize), that cascade re-raises InvalidRequestError and these two date-fallback tests fail for reasons unrelated to what they verify. Because xdist work distribution shifts with the test corpus, the failure surfaces on some PRs and not others, which reads as flakiness. Use a SimpleNamespace carrying exactly the attributes yield_pipeline_status reads. It never introspects DagRun, so it cannot trigger mapper configuration, and it still raises AttributeError on unexpected access.
Contributor
✅ PR checks passed
The linked issue has a description and all required Shipping project fields set. Thanks!
Contributor
🟡 Playwright Results: all passed (12 flaky)
✅ 4292 passed · ❌ 0 failed · 🟡 12 flaky · ⏭️ 88 skipped
🟡 12 flaky test(s) (passed on retry)
How to debug locally:
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace
pmbrull
approved these changes
Jun 12, 2026
Code Review ✅ Approved
Completes the pipeline connector migration to BaseConnection and isolates airflow status tests to prevent SQLAlchemy registry pollution. No issues found.

Fixes #28998
What
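Migrates the second batch of pipeline connectors onto BaseConnection[Config, Client], completing the pipeline vertical.

| Connector | Client | Client built in |
|---|---|---|
| flink | FlinkClient | module-level get_connection (class delegates) |
| gluepipeline | Any (boto3 glue) | _get_client |
| kafkaconnect | KafkaConnectClient | _get_client |
| nifi | NifiClient | _get_client |
| openlineage | KafkaConsumer \| BaseClient | module-level get_connection (class delegates) |
| spline | SplineClient | _get_client |

How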
- XConnection(BaseConnection[...]) with _get_client + test_connection; connection_class wired into each service_spec.py. The pipeline base source already resolves client + test_connection through metadata.ingestion.source.connections, so no source-side changes.
- flink has an existing test that patches connection.get_connection, and openlineage's get_connection has a multi-line Union[KafkaConsumer, BaseClient] signature with a Kafka/Kinesis dispatch helper. Both stay module-level and the class delegates.
- gluepipeline's boto3 glue client is typed Any, matching the merged glue/sagemaker/kinesis connectors.

Tests
- tests/unit/source/pipeline/<connector>/test_connection.py for all six (18 tests).
- basedpyright baseline check clean; ruff clean.

Note: bundled test-isolation fix
Includes the one-commit fix from #28995 (test(airflow): isolate yield_pipeline_status tests from SQLAlchemy registry pollution). This PR adds colocated test files that shift the pytest -n auto distribution; the SimpleNamespace change keeps TestYieldPipelineStatus from flaking on a poisoned shared SQLAlchemy mapper registry. Identical to #28995, so it merges cleanly whichever lands first.
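The SimpleNamespace approach can be sketched roughly as below. The function pick_run_timestamp and the attribute names start_date, execution_date and state are hypothetical placeholders; the real yield_pipeline_status may read different attributes. The point is only that the fake object carries exactly the attributes the code under test reads, is never built from the DagRun class, and still fails loudly on anything unexpected.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def pick_run_timestamp(dag_run) -> datetime:
    """Hypothetical stand-in for a date fallback in the code under test."""
    # Prefer start_date; fall back to execution_date when it is missing.
    return dag_run.start_date or dag_run.execution_date


def reads_cleanly(obj, name: str) -> bool:
    """Return False if reading the attribute raises AttributeError."""
    try:
        getattr(obj, name)
    except AttributeError:
        return False
    return True


fallback = datetime(2024, 1, 1, tzinfo=timezone.utc)

# The fake run is a plain namespace: no spec, no introspection of DagRun,
# so nothing here can touch the SQLAlchemy mapper registry.
fake_run = SimpleNamespace(
    start_date=None,
    execution_date=fallback,
    state="success",
)

print(pick_run_timestamp(fake_run))
print(reads_cleanly(fake_run, "state"), reads_cleanly(fake_run, "queued_at"))
```

Compared with a spec'd mock, the trade-off is that the attribute list has to be maintained by hand when the code under test starts reading a new field; the AttributeError on unexpected access is what surfaces that drift.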