Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
KafkaConnection as KafkaConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
RedpandaConnection,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN
Expand Down Expand Up @@ -65,7 +66,7 @@
self.consumer_client = consumer_client


def get_connection(connection: Union[KafkaConnection, RedpandaConnection]) -> KafkaClient: # noqa: UP007
def get_connection(connection: Union[KafkaConnectionConfig, RedpandaConnection]) -> KafkaClient: # noqa: UP007

Check failure on line 69 in ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ67KCvfgNoDi7s0L3jt&open=AZ67KCvfgNoDi7s0L3jt&pullRequest=28994

Check warning on line 69 in ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use a union type expression for this type hint.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ67KCvfgNoDi7s0L3ju&open=AZ67KCvfgNoDi7s0L3ju&pullRequest=28994
"""
Create connection
"""
Expand Down Expand Up @@ -119,7 +120,7 @@
def test_connection(
metadata: OpenMetadata,
client: KafkaClient,
service_connection: Union[KafkaConnection, RedpandaConnection], # noqa: UP007
service_connection: Union[KafkaConnectionConfig, RedpandaConnection], # noqa: UP007

Check warning on line 123 in ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use a union type expression for this type hint.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ67KCvfgNoDi7s0L3jv&open=AZ67KCvfgNoDi7s0L3jv&pullRequest=28994
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
Expand Down Expand Up @@ -158,3 +159,22 @@
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)


class KafkaConnection(BaseConnection[KafkaConnectionConfig, KafkaClient]):
def _get_client(self) -> KafkaClient:
return get_connection(self.service_connection)

def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
return test_connection(
metadata,
self.client,
self.service_connection,
automation_workflow,
timeout_seconds,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from metadata.ingestion.source.messaging.kafka.connection import KafkaConnection
from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=KafkaSource)
ServiceSpec = BaseSpec(metadata_source_class=KafkaSource, connection_class=KafkaConnection) # pyright: ignore[reportArgumentType]
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
Source connection handler
"""

from typing import Optional
from typing import Any, Optional

from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.messaging.kinesisConnection import (
KinesisConnection,
KinesisConnection as KinesisConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN
Expand All @@ -33,31 +34,30 @@
logger = ingestion_logger()


def get_connection(connection: KinesisConnection):
"""
Create connection
"""
return AWSClient(connection.awsConfig).get_kinesis_client()


def test_connection(
metadata: OpenMetadata,
client,
service_connection: KinesisConnection,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""

test_fn = {"GetTopics": client.list_streams}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
class KinesisConnection(BaseConnection[KinesisConnectionConfig, Any]):
def _get_client(self) -> Any:
connection = self.service_connection
return AWSClient(connection.awsConfig).get_kinesis_client()

def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
client = self.client
service_connection = self.service_connection

test_fn = {"GetTopics": client.list_streams}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value, # pyright: ignore[reportOptionalMemberAccess]
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from metadata.ingestion.source.messaging.kinesis.connection import KinesisConnection
from metadata.ingestion.source.messaging.kinesis.metadata import KinesisSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=KinesisSource)
ServiceSpec = BaseSpec(metadata_source_class=KinesisSource, connection_class=KinesisConnection) # pyright: ignore[reportArgumentType]
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.messaging.pubSubConnection import (
PubSubConnection,
PubSubConnection as PubSubConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
Expand All @@ -33,6 +33,7 @@
GcpCredentialsValues,
SingleProjectId,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN
Expand All @@ -52,7 +53,7 @@
project_id: str


def _get_project_id(connection: PubSubConnection) -> Optional[str]: # noqa: UP045
def _get_project_id(connection: PubSubConnectionConfig) -> Optional[str]: # noqa: UP045

Check failure on line 56 in ingestion/src/metadata/ingestion/source/messaging/pubsub/connection.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 26 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ67KC5ygNoDi7s0L3jw&open=AZ67KC5ygNoDi7s0L3jw&pullRequest=28994
"""
Get project ID from connection config or from credentials.
Returns None if project ID cannot be determined.
Expand Down Expand Up @@ -82,7 +83,7 @@
return None


def get_connection(connection: PubSubConnection) -> PubSubClient:
def get_connection(connection: PubSubConnectionConfig) -> PubSubClient:
"""
Create Pub/Sub client connection.

Expand Down Expand Up @@ -127,44 +128,49 @@
del os.environ[PUBSUB_EMULATOR_HOST]


def test_connection(
metadata: OpenMetadata,
client: PubSubClient,
service_connection: PubSubConnection,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""

def list_topics_test():
project_path = f"projects/{client.project_id}"
try:
topics_iter = client.publisher.list_topics(request={"project": project_path})
next(iter(topics_iter), None)
except GoogleAPIError as err: # noqa: TRY203
raise err # noqa: TRY201

def schema_registry_test():
if client.schema_client:
class PubSubConnection(BaseConnection[PubSubConnectionConfig, PubSubClient]):
def _get_client(self) -> PubSubClient:
return get_connection(self.service_connection)

def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
client = self.client
service_connection = self.service_connection

def list_topics_test():
project_path = f"projects/{client.project_id}"
try:
schemas_iter = client.schema_client.list_schemas(request={"parent": project_path})
next(iter(schemas_iter), None)
topics_iter = client.publisher.list_topics(request={"project": project_path})
next(iter(topics_iter), None)
except GoogleAPIError as err: # noqa: TRY203
raise err # noqa: TRY201

Check warning on line 154 in ingestion/src/metadata/ingestion/source/messaging/pubsub/connection.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Add logic to this except clause or eliminate it and rethrow the exception automatically.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ67KC5ygNoDi7s0L3jx&open=AZ67KC5ygNoDi7s0L3jx&pullRequest=28994

test_fn = {
"GetTopics": list_topics_test,
"CheckSchemaRegistry": schema_registry_test,
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
def schema_registry_test():
if client.schema_client:
project_path = f"projects/{client.project_id}"
try:
schemas_iter = client.schema_client.list_schemas(request={"parent": project_path})
next(iter(schemas_iter), None)
except GoogleAPIError as err: # noqa: TRY203
raise err # noqa: TRY201

Check warning on line 163 in ingestion/src/metadata/ingestion/source/messaging/pubsub/connection.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Add logic to this except clause or eliminate it and rethrow the exception automatically.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ67KC5ygNoDi7s0L3jy&open=AZ67KC5ygNoDi7s0L3jy&pullRequest=28994

test_fn = {
"GetTopics": list_topics_test,
"CheckSchemaRegistry": schema_registry_test,
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value, # pyright: ignore[reportOptionalMemberAccess]
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
Pub/Sub service spec
"""

from metadata.ingestion.source.messaging.pubsub.connection import PubSubConnection
from metadata.ingestion.source.messaging.pubsub.metadata import PubsubSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=PubsubSource)
ServiceSpec = BaseSpec(metadata_source_class=PubsubSource, connection_class=PubSubConnection) # pyright: ignore[reportArgumentType]
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
RedpandaConnection,
RedpandaConnection as RedpandaConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.messaging.kafka.connection import KafkaClient
from metadata.ingestion.source.messaging.kafka.connection import (
Expand All @@ -38,29 +39,20 @@
logger = ingestion_logger()


def get_connection(connection: RedpandaConnection) -> KafkaClient:
"""
Create connection
"""
return get_kafka_connection(connection)


def test_connection(
metadata: OpenMetadata,
client: KafkaClient,
service_connection: RedpandaConnection,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""

return test_kafka_connection(
metadata=metadata,
client=client,
service_connection=service_connection,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
class RedpandaConnection(BaseConnection[RedpandaConnectionConfig, KafkaClient]):
def _get_client(self) -> KafkaClient:
return get_kafka_connection(self.service_connection)

def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
return test_kafka_connection(
metadata=metadata,
client=self.client,
service_connection=self.service_connection,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from metadata.ingestion.source.messaging.redpanda.connection import RedpandaConnection
from metadata.ingestion.source.messaging.redpanda.metadata import RedpandaSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=RedpandaSource)
ServiceSpec = BaseSpec(metadata_source_class=RedpandaSource, connection_class=RedpandaConnection) # pyright: ignore[reportArgumentType]
Loading
Loading