
PostgreSQL connection caching and reuse #131

Merged
tmikula-dev merged 18 commits into master from feature/postgresql-connection-pooling on May 5, 2026

Conversation

@tmikula-dev
Collaborator

@tmikula-dev tmikula-dev commented Apr 13, 2026

Overview

This pull request introduces connection caching and reuse for both PostgreSQL readers and writers, improving efficiency and reliability by maintaining a single connection per instance. It also adds robust reconnection logic, updates tests to reflect the new behavior, and enhances configuration for dependency updates.

Release Notes

  • PostgreSQL connection pooling for

Related

Closes #115

Summary by CodeRabbit

  • New Features

    • Shared Postgres backend with packaged SQL, topic schema loading, and health endpoint now reporting per-dependency statuses.
  • Bug Fixes

    • Robust DB error handling and guaranteed connection cleanup on failures.
  • Performance Improvements

    • Cached reusable DB connections with reconnect/retry behavior.
  • Tests

    • Added integration and unit tests covering connection reuse, retries, and updated health/write semantics.
  • Documentation

    • Updated contributor/agent guidance.
  • Chores

    • Dependabot grouping rules and dependency updates; lint config bumped.

@tmikula-dev tmikula-dev self-assigned this Apr 13, 2026
@tmikula-dev tmikula-dev added the "enhancement" (New feature or request) label Apr 13, 2026
@coderabbitai

coderabbitai Bot commented Apr 13, 2026

Walkthrough

Adds a shared PostgresBase with cached connection, retry/close semantics and typed config; refactors ReaderPostgres and WriterPostgres to inherit it and load SQL via aiosql; switches writers/health APIs to exception-based WriteError/HealthCheckError; adds SQL files and tests for connection reuse; logging and OpenAPI health response updated.

Changes

Postgres connection reuse and writer/reader refactor

Layer / File(s) and summary of changes:

  • Data Shape / SQL (src/utils/constants.py, src/readers/sql/stats.sql, src/writers/sql/inserts.sql): Adds Postgres tuning constants and REQUIRED_CONNECTION_FIELDS; introduces TOPIC_* constants and supported topic sets; adds named stats queries and parameterized INSERT statements for writer topics.
  • Core Implementation (src/utils/postgres_base.py, src/readers/reader_postgres.py, src/writers/writer_postgres.py, src/writers/writer.py): New PostgresBase (typed PostgresConfig, lazy _pg_config, cached _get_connection, _close_connection, _execute_with_retry); Reader/Writer now subclass PostgresBase, load SQL via aiosql, reuse the cached connection, and adopt the exception-based WriteError/HealthCheckError contract with `check_health() -> str`.
  • Wiring / Integration (src/handlers/handler_topic.py, src/handlers/handler_health.py, src/handlers/handler_stats.py, src/utils/config_loader.py, src/utils/logging_levels.py, src/event_gate_lambda.py, src/event_stats_lambda.py): Handlers use TOPIC_* constants and catch WriteError; the health handler aggregates per-dependency statuses and treats HealthCheckError as a degraded dependency; Lambdas use init_root_logger.
  • Tests / Coverage (tests/unit/*, tests/integration/test_connection_reuse.py, tests/unit/utils/test_postgres_base.py): Unit and integration tests added/updated to validate PostgresBase config, connection reuse/close/retry semantics, and the new exception-based writer/health contracts; mocks switched to the src.utils.postgres_base psycopg2 shim.
  • Dependencies / Docs / CI (requirements.txt, .github/dependabot.yml, .github/copilot-instructions.md, api.yaml, .pylintrc): Adds aiosql and pins botocore; Dependabot grouping rules added; copilot instructions note Postgres connection caching and type-alias guidance; OpenAPI /health now exposes dependencies; pylint py-version bumped.

Sequence Diagram(s)

sequenceDiagram
    participant Lambda as Lambda Handler
    participant PostgresBase as PostgresBase
    participant Secrets as AWS SecretsManager
    participant Postgres as PostgreSQL

    Lambda->>PostgresBase: request connection / run operation (read/write)
    alt first access
        PostgresBase->>Secrets: load_postgres_config(secret_name, region)
        Secrets-->>PostgresBase: secret JSON
        PostgresBase-->>Postgres: psycopg2.connect(**cfg, options=...)
        Postgres-->>PostgresBase: connection
        PostgresBase-->>Lambda: provide cached connection
    else cached connection exists
        PostgresBase-->>Lambda: return cached connection
    end
    Lambda->>Postgres: execute SQL (via cached connection)
    alt OperationalError
        Postgres-->>PostgresBase: OperationalError
        PostgresBase->>PostgresBase: close cached connection, retry (up to POSTGRES_MAX_RETRIES)
        PostgresBase-->>Postgres: connect again
        Postgres-->>PostgresBase: connection
        PostgresBase-->>Lambda: retry operation
    end
    Postgres-->>Lambda: query result / acknowledgement
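To make the cached-connection and retry flow above concrete, here is a minimal sketch of how a PostgresBase-style helper could work. The names load_postgres_config, POSTGRES_MAX_RETRIES, _get_connection, _close_connection and _execute_with_retry come from the PR summary; the method bodies, the retry count and the connect_timeout value are illustrative assumptions, not the merged implementation.

```python
import psycopg2
from psycopg2 import OperationalError

POSTGRES_MAX_RETRIES = 2  # assumed value; the real constant lives in src/utils/constants.py


class PostgresBase:
    """Sketch of a shared base that caches a single connection per instance."""

    def __init__(self, secret_name: str, secret_region: str) -> None:
        self._secret_name = secret_name
        self._secret_region = secret_region
        self._connection = None

    def _get_connection(self):
        # Reuse the cached connection while it is open; otherwise (re)connect.
        if self._connection is None or self._connection.closed:
            cfg = load_postgres_config(self._secret_name, self._secret_region)  # hypothetical secret loader
            self._connection = psycopg2.connect(
                dbname=cfg["database"],
                host=cfg["host"],
                user=cfg["user"],
                password=cfg["password"],
                port=cfg["port"],
                connect_timeout=10,
            )
        return self._connection

    def _close_connection(self) -> None:
        # Drop the cached connection so the next call reconnects from scratch.
        if self._connection is not None:
            try:
                self._connection.close()
            finally:
                self._connection = None

    def _execute_with_retry(self, operation):
        # Re-run the operation on OperationalError, reconnecting between attempts.
        last_error = None
        for _ in range(POSTGRES_MAX_RETRIES + 1):
            try:
                return operation(self._get_connection())
            except OperationalError as err:
                last_error = err
                self._close_connection()
        raise last_error
```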

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

refactoring

Suggested reviewers

  • petr-pokorny-absa
  • lsulak
  • oto-macenauer-absa

Poem

🐰 I nibble secrets, stash and keep,
One cozy connection for lambdas to sleep.
I close on hiccups, then try once more,
No new TCPs knocking at the door.
Hop — reuse wins, the rabbit says "encore!"

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 38.92%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Title check: ✅ Passed. The title 'PostgreSQL connection caching and reuse' directly and clearly summarizes the main change introduced in the PR.
  • Description check: ✅ Passed. The PR description covers the Overview and is mostly complete, but the Release Notes section is incomplete (ends with 'PostgreSQL connection pooling for' without finishing the thought).
  • Linked Issues check: ✅ Passed. The PR successfully implements connection caching for PostgreSQL readers and writers with automatic reconnection logic, matching the core acceptance criteria from issue #115.
  • Out of Scope Changes check: ✅ Passed. All changes are in scope: PostgreSQL infrastructure refactoring, configuration updates, test improvements, and handler/logging adjustments directly support the connection caching objective.



@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
tests/unit/readers/test_reader_postgres.py (1)

385-405: Unused variable pagination flagged by static analysis.

The variable pagination on line 402 is unpacked but never used. Consider using an underscore prefix to indicate it's intentionally unused.

🔧 Suggested fix
-            rows, pagination = reader.read_stats(limit=10)
+            rows, _pagination = reader.read_stats(limit=10)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/readers/test_reader_postgres.py` around lines 385 - 405, The test
test_retries_on_operational_error unpacks reader.read_stats into rows,
pagination but never uses pagination; rename the unused variable to _pagination
(or simply use an underscore `_`) to satisfy static analysis and indicate
intentional non-use. Update the line that calls reader.read_stats in
test_retries_on_operational_error to unpack as rows, _pagination (or rows, _) so
the test behavior (asserting connect call count and rows) remains unchanged
while eliminating the unused-variable warning.
src/writers/writer_postgres.py (1)

60-60: Consider adding explicit connection cleanup for non-Lambda deployments.

The cached connection has no explicit close() mechanism. While this works well for Lambda (connections naturally close when the container terminates), long-running processes or integration tests may benefit from explicit cleanup. The Writer base class could be extended with an optional close() method.

This is not blocking for the current Lambda use case.
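If such a hook were added, a minimal sketch could look like the following; the attribute name self._connection and placing the method on the shared base are assumptions, not the merged code.

```python
def close(self) -> None:
    """Optional cleanup hook for long-running processes and integration tests."""
    if self._connection is not None:
        try:
            self._connection.close()
        finally:
            # Clear the cache so any later call transparently reconnects.
            self._connection = None
```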

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/writers/writer_postgres.py` at line 60, Add an explicit cleanup hook that
closes the cached DB connection: extend the Writer base class with an optional
close() method and implement it in the Postgres writer to call and null out
self._connection.close() (or await if async) when a connection exists; update
any connection-creating methods that set self._connection to ensure close() will
be safe to call, and add a brief unit test or integration cleanup call to
exercise Writer.close() in long-running tests or processes to avoid leaked
connections.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/writers/writer_postgres.py`:
- Line 60: Add an explicit cleanup hook that closes the cached DB connection:
extend the Writer base class with an optional close() method and implement it in
the Postgres writer to call and null out self._connection.close() (or await if
async) when a connection exists; update any connection-creating methods that set
self._connection to ensure close() will be safe to call, and add a brief unit
test or integration cleanup call to exercise Writer.close() in long-running
tests or processes to avoid leaked connections.

In `@tests/unit/readers/test_reader_postgres.py`:
- Around line 385-405: The test test_retries_on_operational_error unpacks
reader.read_stats into rows, pagination but never uses pagination; rename the
unused variable to _pagination (or simply use an underscore `_`) to satisfy
static analysis and indicate intentional non-use. Update the line that calls
reader.read_stats in test_retries_on_operational_error to unpack as rows,
_pagination (or rows, _) so the test behavior (asserting connect call count and
rows) remains unchanged while eliminating the unused-variable warning.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6c04b6fc-8b9c-4a86-a7cd-246694e57bde

📥 Commits

Reviewing files that changed from the base of the PR and between ccc1639 and 452008d.

📒 Files selected for processing (7)
  • .github/copilot-instructions.md
  • .github/dependabot.yml
  • src/readers/reader_postgres.py
  • src/writers/writer_postgres.py
  • tests/integration/test_connection_reuse.py
  • tests/unit/readers/test_reader_postgres.py
  • tests/unit/writers/test_writer_postgres.py

Collaborator

@oto-macenauer-absa oto-macenauer-absa left a comment


lgtm

Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated

@lsulak lsulak Apr 16, 2026


I saw these queries. A couple of upgrade ideas; if you want, you can pick any of these (ordered from lower to higher effort and better practice):

  • put them at least into triple double-quotes (""" query here """); multi-line strings are perfect for this.
  • put them into a separate SQL file and load it. Combining SQL and Python is a bad practice. In simple projects like this it's okay, but as projects scale it is not maintainable (not to mention typical engineering practices: formatting, testing, discovery of these SQL files).
  • put them into a separate JINJA2 file; with this, you can parametrize it from Python.
  • my most favourite option, AIOSQL: https://nackjicholson.github.io/aiosql/; the idea here is also to have a separate file with the SQL that you load and work with (a minimal sketch follows below).
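For illustration, a minimal aiosql sketch; the file name stats.sql, the query name and the table are placeholders rather than the actual names used in this PR.

```python
import aiosql
import psycopg2

# stats.sql contains named queries, e.g.:
# -- name: get_topic_counts
# SELECT topic, COUNT(*) AS events FROM event_log GROUP BY topic;

queries = aiosql.from_path("stats.sql", "psycopg2")

with psycopg2.connect(dbname="events", host="localhost", user="app", password="secret") as connection:
    rows = queries.get_topic_counts(connection)
```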



[TL;DR, but please read the above still]

on the last option, AIOSQL: https://nackjicholson.github.io/aiosql/

I think that this is actually the simplest, clearest, and fastest option at the same time (I elaborated the previous options mostly as a quick list of ideas for how I've seen it done in projects over years and years of doing this). If you are bored with doing this manually, just tell this to Copilot and it might blow you away how clean and quick a solution it can give you (and it's also a good exercise for us to work on our 'AI agentic / context engineering' muscle memory :D), preferably using Opus 4.6 for this. Or even 4.7 if it's available; it was released 2 days ago :) https://www.anthropic.com/news/claude-opus-4-7

Collaborator Author


Thanks! That was a whole new way for me of handling SQL that used to live inside the Python file. I implemented that solution, please see the result.



ok. I will leave this comment unresolved, so that it can be found if someone / we want to check it again. Thanks!

Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/writers/writer_postgres.py Outdated
Comment thread src/writers/writer_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
db_cursor.execute(query, params)
col_names = [desc[0] for desc in db_cursor.description] # type: ignore[union-attr]
raw_rows = db_cursor.fetchall()
connection.rollback()


rollback on read? what am I missing here?

Collaborator Author


I understand. But this is the correct way to leave the cached connection in a reusable state. PostgreSQL automatically opens a transaction for every statement, even a plain SELECT. Until that transaction is explicitly closed, the connection is stuck in an "in transaction" state, so we use a rollback. I added a comment to this logic to make it clearer.

# Rollback closes the implicit transaction opened by the SELECT,
# leaving the cached connection in a clean idle state for reuse.
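For context, a minimal sketch of this read path on the cached connection (autocommit disabled); the helper name and the row-to-dict conversion are illustrative, not the exact code in reader_postgres.py.

```python
def run_read_query(connection, query, params):
    # Execute a read on the long-lived cached connection.
    with connection.cursor() as cursor:
        cursor.execute(query, params)
        col_names = [desc[0] for desc in cursor.description]
        raw_rows = cursor.fetchall()
    # Rollback closes the implicit transaction opened by the SELECT,
    # leaving the cached connection in a clean idle state for reuse.
    connection.rollback()
    return [dict(zip(col_names, row)) for row in raw_rows]
```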


@lsulak lsulak Apr 30, 2026


yep, because you don't have autocommit=true I assume. It's fine, can be like this

Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/writers/writer_postgres.py Outdated
@lsulak

lsulak commented Apr 16, 2026

I find the title a bit misleading considering the implementation. There is no connection pooling, just caching implemented manually. Either change the title and PR description to say that the connection is reused / cached, or introduce SimpleConnectionPool or something like that.

One idea on the connection pooling: check how many connections Aurora can tolerate with our current infrastructure setup. It's good to be setting this up on the 'application' (Lambda) side, but for this you must understand the Lambda execution lifecycle: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html, because some things, like connection pooling, can be implemented in a 'shared' way between individual Lambda invocations. It's a slightly more advanced topic, though, and your current approach would need to change significantly. Something like: the connection pool must be mutable and at global level, in the place where the lambda handler is called, I think. This is completely possible and would lead to more robustness (having fewer bottlenecks on DB connections, not that we have bottlenecks currently?), just saying that it's more work. I would read the documentation I sent you and then ask the AI about this (you can learn how it works and check the approach and your code).

From the above, see Optimizing static initialization section:

Static initialization happens before the handler code starts running in a function. This is the initialization code that you provide, that is outside of the main handler. This code is often used to import libraries and dependencies, set up configurations, and initialize connections to other services.

...

The largest contributor of latency before function execution comes from initialization code. This code runs when a new execution environment is created for the first time. 

...

Static initialization is also often the best place to open database connections to allow a function to reuse connections over multiple invocations to the same execution environment. However, you may have large numbers of objects that are only used in certain execution paths in your function. In this case, you can lazily load variables in the global scope to reduce the static initialization duration.
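For illustration, the pattern from the documentation applied to this code base could look like the sketch below; the handler body, the empty WriterPostgres config and the event shape are assumptions, not the merged wiring.

```python
# Static initialization: module-level code runs once per execution environment,
# so the writer (and its cached DB connection) is shared by warm invocations.
from src.writers.writer_postgres import WriterPostgres

_WRITER = WriterPostgres({})


def lambda_handler(event, context):
    # Warm invocations reuse _WRITER and its cached connection;
    # only a cold start pays the connection-setup cost.
    _WRITER.write(event["topic"], event["message"])
    return {"statusCode": 200}
```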

Two more resources:

@tmikula-dev tmikula-dev changed the title from "PostgreSQL connection pooling" to "PostgreSQL connection caching and reuse" Apr 20, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

🧹 Nitpick comments (1)
tests/unit/utils/test_postgres_base.py (1)

76-86: Move PostgreSQL/AWS-secret mocks into shared fixtures.

These tests define external-service mocks inline in several places. Please centralize the pb.psycopg2 and load_postgres_config mocks in conftest.py fixtures to keep the unit-suite isolation consistent.

As per coding guidelines, tests/unit/**/*.py: Unit tests must mock external services (Kafka, EventBridge, PostgreSQL, S3) via conftest.py.

Also applies to: 108-139, 187-220
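A minimal sketch of what such shared fixtures could look like in conftest.py; the fixture names and the secret values returned are assumptions.

```python
# conftest.py (sketch)
import pytest


@pytest.fixture
def postgres_config_mock(mocker):
    # Patch the secret loader so tests never reach AWS Secrets Manager.
    return mocker.patch(
        "src.utils.postgres_base.load_postgres_config",
        return_value={"database": "db", "host": "localhost", "user": "u", "password": "p", "port": 5432},
    )


@pytest.fixture
def psycopg2_mock(mocker):
    # Patch the psycopg2 shim used by PostgresBase so no real connection is opened.
    return mocker.patch("src.utils.postgres_base.psycopg2")
```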

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/utils/test_postgres_base.py` around lines 76 - 86, Tests
inline-patch external services (pb.psycopg2 and load_postgres_config) inside
test_postgres_base.py; move those mocks into reusable fixtures in conftest.py
and update tests to use them. Create fixtures in conftest.py that patch
src.utils.postgres_base.load_postgres_config (returning the secret dict) and
pb.psycopg2 connection behavior, then remove the inline with patch(...) blocks
from test_pg_config_builds_correct_values and related tests (including the ones
at 108-139 and 187-220) and accept the fixtures (e.g., postgres_config_mock,
psycopg2_mock) as function args so tests simply access base._pg_config and other
behavior without in-test patching. Ensure fixture scope is appropriate (function
or module) and that mocks expose call_count/assertions used by the tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/readers/reader_postgres.py`:
- Around line 159-162: The finally block currently calls connection.rollback()
unguarded which can raise and replace the original execute() error; wrap the
rollback call in its own try/except so any exception from connection.rollback()
is caught and logged/suppressed, and ensure the cached connection is discarded
(close the connection and clear whatever cache variable holds it, e.g. set
self._cached_connection = None or call the connection-cache cleanup helper) so a
bad connection isn’t reused; also make sure to re-raise the original exception
from the execute path (do not let rollback exceptions replace it).

In `@src/utils/postgres_base.py`:
- Around line 55-68: The _build_postgres_config currently masks missing/invalid
fields by coercing to empty strings and port 0; change it to validate required
keys from aws_secret and raise clear errors instead of silent defaults: in
_build_postgres_config check aws_secret contains non-empty
"database","host","user","password" and that "port" is present and a positive
integer (raise ValueError with descriptive messages on failures), then construct
and return PostgresConfig using those validated values so malformed AWS secrets
fail fast; reference the function _build_postgres_config, the aws_secret input
dict, and the PostgresConfig constructor when making the changes.
- Around line 123-142: The _execute_with_retry method currently retries any
operation (operation: Callable[..., T]) on OperationalError which can duplicate
non-idempotent writes; change the API so retries are opt-in: add a parameter
(e.g. retry: bool or idempotent: bool) to _execute_with_retry and require
callers of writer paths to pass retry=True only when they guarantee idempotency,
or alternatively require callers to supply an idempotency token/callback; inside
_execute_with_retry (referencing POSTGRES_MAX_RETRIES, _get_connection,
_close_connection, OperationalError) only perform the retry loop when the new
flag indicates it is safe, otherwise raise immediately after the first
OperationalError with the original exception preserved.
- Around line 99-109: The code calling psycopg2.connect via connect_kwargs lacks
a connect_timeout, which can cause Lambda functions to hang on network/DNS
stalls; update the connection construction in the method that builds
connect_kwargs (referencing connect_kwargs, pg_config, _connect_options and
psycopg2.connect) to include a connect_timeout value (make it configurable via
pg_config or a class attribute with a sensible default, e.g., 5–10 seconds) and
ensure that value is added to connect_kwargs before calling psycopg2.connect so
self._connection is created with the timeout applied.

In `@src/writers/writer_postgres.py`:
- Around line 188-189: The exception log currently builds a string into err_msg
and calls logger.exception(err_msg) which forces eager formatting and may omit a
trailing period; change the call to use lazy logging with logger.exception("The
Postgres writer failed with unknown error: %s.", e) (or logger.exception("The
Postgres writer failed with unknown error: %s.", str(e))) and keep err_msg for
the return value if needed; update the message to ensure it ends with a period
and reference the err_msg variable and logger.exception call in your edit.
- Around line 185-186: The retry wrapper around _write_topic via
_execute_with_retry is re-running non-idempotent INSERT batches (which include
connection.commit()) and can cause duplicate rows if the commit actually
succeeded but the client perceived failure; update the implementation to make
retries idempotent by either modifying the INSERT statements in
src/writers/sql/inserts.sql to include an ON CONFLICT ... DO NOTHING/DO UPDATE
targeting event_id, or ensure a UNIQUE constraint exists on event_id in the DB
schema, or change _execute_with_retry/_write_topic so retries only occur before
any DML is sent (i.e., detect/avoid retrying after commit); pick one of these
fixes and apply it consistently to _write_topic and the SQL files.

In `@tests/unit/readers/test_reader_postgres.py`:
- Line 405: The test currently unpacks a second value from
reader.read_stats(limit=10) into pagination but never uses it, triggering
RUF059; change the unpack target to an intentionally unused variable (e.g.,
_pagination or _) when calling reader.read_stats in the test so the linter knows
the value is intentionally ignored while still preserving the two-value
unpacking for the read_stats function.

In `@tests/unit/utils/test_postgres_base.py`:
- Around line 41-47: The test currently asserts empty-string/0 defaults for
missing DB fields; instead make the behavior fail-fast by updating
_build_postgres_config to validate required fields (database, host, user,
password, port) and raise a clear exception (e.g., ValueError) when any are
missing or invalid, and change
test_build_postgres_config_defaults_for_missing_keys to assert that calling
_build_postgres_config({}) raises that exception; reference the
_build_postgres_config function for where to add the validation and the
test_test_build_postgres_config_defaults_for_missing_keys for the assertion
change.

In `@tests/unit/writers/test_writer_postgres.py`:
- Around line 188-192: Replace manual assignments/deletions of
type(writer)._pg_config in tests (e.g., test_write_skips_when_no_database) with
monkeypatch to avoid leaks: use the monkeypatch fixture to call
monkeypatch.setattr(type(writer), "_pg_config", property(lambda self:
{"database": ""})) before invoking WriterPostgres.write and remove the manual
del; apply this pattern for all tests that override _pg_config (e.g., those
referencing WriterPostgres and _pg_config at the listed lines).

---

Nitpick comments:
In `@tests/unit/utils/test_postgres_base.py`:
- Around line 76-86: Tests inline-patch external services (pb.psycopg2 and
load_postgres_config) inside test_postgres_base.py; move those mocks into
reusable fixtures in conftest.py and update tests to use them. Create fixtures
in conftest.py that patch src.utils.postgres_base.load_postgres_config
(returning the secret dict) and pb.psycopg2 connection behavior, then remove the
inline with patch(...) blocks from test_pg_config_builds_correct_values and
related tests (including the ones at 108-139 and 187-220) and accept the
fixtures (e.g., postgres_config_mock, psycopg2_mock) as function args so tests
simply access base._pg_config and other behavior without in-test patching.
Ensure fixture scope is appropriate (function or module) and that mocks expose
call_count/assertions used by the tests.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 641ba377-cd75-4685-b525-f0b77848a77c

📥 Commits

Reviewing files that changed from the base of the PR and between 452008d and 3cdabd0.

📒 Files selected for processing (15)
  • .github/copilot-instructions.md
  • requirements.txt
  • src/handlers/handler_topic.py
  • src/readers/reader_postgres.py
  • src/readers/sql/stats.sql
  • src/utils/config_loader.py
  • src/utils/constants.py
  • src/utils/postgres_base.py
  • src/utils/utils.py
  • src/writers/sql/inserts.sql
  • src/writers/writer_postgres.py
  • tests/unit/readers/test_reader_postgres.py
  • tests/unit/utils/test_postgres_base.py
  • tests/unit/utils/test_trace_logging.py
  • tests/unit/writers/test_writer_postgres.py
✅ Files skipped from review due to trivial changes (5)
  • requirements.txt
  • .github/copilot-instructions.md
  • src/utils/utils.py
  • src/readers/sql/stats.sql
  • src/writers/sql/inserts.sql

Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/utils/postgres_base.py
Comment thread src/utils/postgres_base.py
Comment thread src/utils/postgres_base.py Outdated
Comment thread src/writers/writer_postgres.py Outdated
Comment thread src/writers/writer_postgres.py Outdated
Comment thread tests/unit/readers/test_reader_postgres.py Outdated
Comment thread tests/unit/utils/test_postgres_base.py Outdated
Comment thread tests/unit/writers/test_writer_postgres.py Outdated
@coderabbitai

coderabbitai Bot commented Apr 21, 2026

Caution

Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted.

Error details
{}

@tmikula-dev tmikula-dev added the "work in progress" (Work on this item is not yet finished, mainly intended for PRs) label Apr 21, 2026
@tmikula-dev tmikula-dev requested a review from lsulak April 22, 2026 06:47
@tmikula-dev tmikula-dev removed the "work in progress" (Work on this item is not yet finished, mainly intended for PRs) label Apr 22, 2026
# Conflicts:
#	requirements.txt
#	src/handlers/handler_topic.py
#	src/readers/reader_postgres.py
#	src/writers/writer_postgres.py
Comment thread src/utils/constants.py Outdated
Comment thread src/utils/constants.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/readers/reader_postgres.py Outdated
# Rollback closes the implicit transaction opened by the SELECT,
# leaving the cached connection in a clean idle state for reuse.
try:
connection.rollback()

@lsulak lsulak Apr 30, 2026


I think I understand. It's not because of the SELECT query itself; it's that in psycopg2, every query runs inside a transaction unless autocommit is enabled. And it's not enabled (I found only 1 occurrence, in integration tests). So it's more like this:

End the implicit transaction started by the query. Even SELECTs open a transaction in PostgreSQL; if left open, the connection remains "idle in transaction", which can cause MVCC bloat and issues when reusing the connection.

Maybe improve the comment, but I think the overall approach is okay like this. It's a more defensive strategy.

Collaborator Author

@tmikula-dev tmikula-dev May 4, 2026


The comment is also updated in commit 0e4fbc8. You do agree with this approach, where we do not use autocommit, right?



Yes, I do agree.

Comment thread src/writers/writer_postgres.py Outdated
Comment thread src/writers/writer_postgres.py Outdated

@lsulak lsulak left a comment


Thanks for addressing the comments. I finished another review iteration and found a couple of things, but I think this is a good improvement.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/writers/writer_kafka.py (1)

145-153: ⚠️ Potential issue | 🔴 Critical

Fail the write when Kafka still has queued messages after all flush retries.

A positive remaining from flush() means delivery is still unconfirmed. Right now this path only logs a warning and returns success when errors is empty, allowing the Lambda to acknowledge an event that never reached Kafka. This is a critical reliability bug.

Additionally, the current log message at line 148 is missing a period (violates the guideline to end all log messages with a period).

Suggested fix
-        if isinstance(remaining, int) and remaining > 0:
-            logger.warning(
-                "Kafka flush timeout after %ss: %d message(s) still pending.", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
-            )
+        if isinstance(remaining, int) and remaining > 0:
+            failure_text = (
+                f"Kafka writer failed: flush timed out after {_KAFKA_FLUSH_TIMEOUT_SEC}s "
+                f"with {remaining} message(s) still pending"
+            )
+            logger.error("%s.", failure_text)
+            raise WriteError(failure_text)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/writers/writer_kafka.py` around lines 145 - 153, The code currently logs
a warning when Kafka flush() returns a positive remaining but does not fail the
write; change the behavior in the block handling remaining to treat any int
remaining > 0 as a write failure: update the logger.warning message to end with
a period and then raise WriteError (similar to the existing failure path that
uses failure_text) so the function does not return success when delivery is
unconfirmed; ensure you reference the same symbols (remaining, flush(),
_KAFKA_FLUSH_TIMEOUT_SEC, logger, WriteError) and reuse the errors/has_exception
pattern to include any existing error text if present.
🧹 Nitpick comments (2)
tests/unit/writers/test_writer_eventbridge.py (1)

80-93: ⚡ Quick win

Use mocker.patch here to match the repo test pattern.

These two tests still use unittest.mock.patch, while the repo standard for tests is mocker.patch(...) / mocker.patch.object(...). Keeping one patching style makes the fixture behavior from conftest.py consistent across the suite.

Suggested rewrite
-def test_check_health_success():
+def test_check_health_success(mocker):
     writer = WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/bus"})
-    with patch("boto3.client") as mock_client:
-        mock_client.return_value = MagicMock()
-        writer.check_health()
+    mocker.patch("boto3.client", return_value=MagicMock())
+    writer.check_health()
     assert writer._client is not None
 
 
-def test_check_health_client_error():
+def test_check_health_client_error(mocker):
     class DummyError(BotoCoreError):
         fmt = "Dummy error"
 
     writer = WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/bus"})
-    with patch("boto3.client", side_effect=DummyError()):
-        with pytest.raises(HealthCheckError):
-            writer.check_health()
+    mocker.patch("boto3.client", side_effect=DummyError())
+    with pytest.raises(HealthCheckError):
+        writer.check_health()

As per coding guidelines tests/**/*.py: Use mocker.patch("module.dependency") or mocker.patch.object(Class, "method") for mocking in tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/writers/test_writer_eventbridge.py` around lines 80 - 93, Replace
the use of unittest.mock.patch in the tests test_check_health and
test_check_health_client_error with the pytest-mock fixture by using
mocker.patch (or mocker.patch.object) so the repo's test pattern is followed;
specifically, in test_check_health replace with mocker.patch("boto3.client") and
set its return_value to a MagicMock before calling
WriterEventBridge.check_health (ensure writer = WriterEventBridge(...) remains),
and in test_check_health_client_error use mocker.patch("boto3.client",
side_effect=DummyError()) so that calling writer.check_health() raises
HealthCheckError; keep the same DummyError class and assertions but swap the
patch/context-manager usage to mocker.patch to match conftest fixtures.
tests/unit/writers/test_writer_postgres.py (1)

376-382: 💤 Low value

Consider a cleaner alternative to the generator .throw() trick.

(_ for _ in ()).throw(ValueError(...)) is correct but obscure. Switching to monkeypatch (already used by all sibling tests) with a named helper makes intent clear and keeps the fixture style consistent.

♻️ Proposed refactor
-def test_check_health_load_config_exception(mocker):
+def test_check_health_load_config_exception(monkeypatch):
     """check_health raises HealthCheckError when _pg_config raises."""
     writer = WriterPostgres({})
     writer._secret_name = "mysecret"
     writer._secret_region = "eu-west-1"

-    mocker.patch.object(
-        type(writer),
-        "_pg_config",
-        new_callable=lambda: property(lambda self: (_ for _ in ()).throw(ValueError("secret fetch failed"))),
-    )
+    def _raise_config_error(self):
+        raise ValueError("secret fetch failed")
+
+    monkeypatch.setattr(type(writer), "_pg_config", property(_raise_config_error))
     with pytest.raises(HealthCheckError, match="secret fetch failed"):
         writer.check_health()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/writers/test_writer_postgres.py` around lines 376 - 382, The test
uses an obscure generator `.throw()` to make the _pg_config property raise;
replace that with a clearer monkeypatch-based helper so intent matches sibling
tests: use the pytest monkeypatch fixture to set type(writer)._pg_config to a
property that raises ValueError("secret fetch failed") (e.g., define a small
helper function or lambda that raises and wrap it with property) and then assert
writer.check_health() raises HealthCheckError with the same message; update the
test to call monkeypatch.setattr(type(writer), "_pg_config",
property(raising_helper), raising=True) and keep the
pytest.raises(HealthCheckError, match="secret fetch failed") assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/readers/reader_postgres.py`:
- Around line 99-118: In read_stats(), move the resolution and validation of
self._pg_config (including checking "database" and REQUIRED_CONNECTION_FIELDS)
inside the existing try block that wraps the DB call so any
secret-loading/validation errors are raised as RuntimeError and caught by
HandlerStats.handle_request; ensure the try includes evaluating config,
computing limit/ts_start/ts_end as needed, and only then calls
_execute_with_retry(lambda conn: self._run_stats_query(...)), preserving the
existing PsycopgError handling and self._close_connection() behavior.

In `@src/utils/logging_levels.py`:
- Around line 58-59: Normalize and validate LOG_LEVEL before passing to
setLevel: read os.environ.get("LOG_LEVEL", "INFO"), strip() and .upper() it,
then resolve it to a numeric level via logging.getLevelName(normalized) and
check that the result is an int (valid level); if not, fall back to a safe
default (e.g., logging.INFO) and optionally log a warning. Then call
root_logger.setLevel with the numeric level (the int) instead of the raw string;
update occurrences of log_level and root_logger.setLevel in this module
accordingly.

In `@src/writers/writer_postgres.py`:
- Around line 169-174: The try/except around accessing self._pg_config currently
catches RuntimeError, BotoCoreError, and ClientError but not ValueError, so
ValueError (raised when database is present but other fields are missing)
escapes and is not translated into a WriteError; update the except tuple in the
block that accesses _pg_config to also catch ValueError and raise
WriteError(err_msg) from e (same pattern as for the other exceptions) so
HandlerTopic sees the writer-level error; keep the same err_msg formatting and
use logger.exception as done now.
- Around line 231-235: check_health() can raise RuntimeError when accessing
self._pg_config (secret loading), but it only catches BotoCoreError,
ClientError, ValueError, KeyError; update the exception handling in the
try/except around accessing self._pg_config to also catch RuntimeError and
re-raise it as HealthCheckError (raise HealthCheckError(str(err)) from err) so
HandlerHealth (which only handles HealthCheckError) will correctly treat
secret-loading failures as degraded health; reference symbols: check_health(),
self._pg_config, HealthCheckError, HandlerHealth, write().
- Around line 186-188: The current early-return when
self._is_psycopg2_available() is False causes silent success and dropped events;
instead fail fast by raising a clear exception (e.g., RuntimeError or the
module's WriterError) from the same method (the writer method containing the
check, e.g., PostgresWriter._write or PostgresWriter.publish) so callers see a
write failure; preserve the log call but follow it with a raised exception that
includes context (psycopg2 missing and PostgreSQL configured) and any relevant
config identifiers.

In `@src/writers/writer.py`:
- Around line 41-58: The Writer interface was changed to exception-driven
returns but must adhere to the documented tuple contract; update the abstract
methods on class Writer so write(self, topic_name: str, message: dict[str, Any])
-> tuple[bool, str | None] returns (success, optional_message) instead of
raising, and check_health(self) -> tuple[bool, str] returns (is_healthy,
status_message); ensure all concrete writers in src/writers/*.py implement these
signatures and propagate errors by returning (False, "error message") or (True,
"ok") rather than raising exceptions so existing consumers keep the original
tuple-based API.

---

Outside diff comments:
In `@src/writers/writer_kafka.py`:
- Around line 145-153: The code currently logs a warning when Kafka flush()
returns a positive remaining but does not fail the write; change the behavior in
the block handling remaining to treat any int remaining > 0 as a write failure:
update the logger.warning message to end with a period and then raise WriteError
(similar to the existing failure path that uses failure_text) so the function
does not return success when delivery is unconfirmed; ensure you reference the
same symbols (remaining, flush(), _KAFKA_FLUSH_TIMEOUT_SEC, logger, WriteError)
and reuse the errors/has_exception pattern to include any existing error text if
present.

---

Nitpick comments:
In `@tests/unit/writers/test_writer_eventbridge.py`:
- Around line 80-93: Replace the use of unittest.mock.patch in the tests
test_check_health and test_check_health_client_error with the pytest-mock
fixture by using mocker.patch (or mocker.patch.object) so the repo's test
pattern is followed; specifically, in test_check_health replace with
mocker.patch("boto3.client") and set its return_value to a MagicMock before
calling WriterEventBridge.check_health (ensure writer = WriterEventBridge(...)
remains), and in test_check_health_client_error use mocker.patch("boto3.client",
side_effect=DummyError()) so that calling writer.check_health() raises
HealthCheckError; keep the same DummyError class and assertions but swap the
patch/context-manager usage to mocker.patch to match conftest fixtures.

In `@tests/unit/writers/test_writer_postgres.py`:
- Around line 376-382: The test uses an obscure generator `.throw()` to make the
_pg_config property raise; replace that with a clearer monkeypatch-based helper
so intent matches sibling tests: use the pytest monkeypatch fixture to set
type(writer)._pg_config to a property that raises ValueError("secret fetch
failed") (e.g., define a small helper function or lambda that raises and wrap it
with property) and then assert writer.check_health() raises HealthCheckError
with the same message; update the test to call monkeypatch.setattr(type(writer),
"_pg_config", property(raising_helper), raising=True) and keep the
pytest.raises(HealthCheckError, match="secret fetch failed") assertion.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 24f9cb44-6a1d-4037-87d6-3bf11d9eda97

📥 Commits

Reviewing files that changed from the base of the PR and between 3cdabd0 and 65ef0fe.

📒 Files selected for processing (27)
  • .github/copilot-instructions.md
  • .pylintrc
  • api.yaml
  • requirements.txt
  • src/event_gate_lambda.py
  • src/event_stats_lambda.py
  • src/handlers/handler_health.py
  • src/handlers/handler_stats.py
  • src/handlers/handler_topic.py
  • src/readers/reader_postgres.py
  • src/utils/config_loader.py
  • src/utils/constants.py
  • src/utils/logging_levels.py
  • src/utils/postgres_base.py
  • src/writers/writer.py
  • src/writers/writer_eventbridge.py
  • src/writers/writer_kafka.py
  • src/writers/writer_postgres.py
  • tests/unit/handlers/test_handler_health.py
  • tests/unit/handlers/test_handler_topic.py
  • tests/unit/readers/test_reader_postgres.py
  • tests/unit/test_event_stats_lambda.py
  • tests/unit/utils/test_postgres_base.py
  • tests/unit/utils/test_trace_logging.py
  • tests/unit/writers/test_writer_eventbridge.py
  • tests/unit/writers/test_writer_kafka.py
  • tests/unit/writers/test_writer_postgres.py
✅ Files skipped from review due to trivial changes (2)
  • .github/copilot-instructions.md
  • tests/unit/utils/test_postgres_base.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • tests/unit/utils/test_trace_logging.py
  • requirements.txt
  • src/utils/postgres_base.py
  • src/utils/config_loader.py

Comment thread src/readers/reader_postgres.py Outdated
Comment thread src/utils/logging_levels.py Outdated
Comment thread src/writers/writer_postgres.py
Comment thread src/writers/writer_postgres.py Outdated
Comment thread src/writers/writer_postgres.py
Comment thread src/writers/writer.py

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/unit/readers/test_reader_postgres.py`:
- Line 354: The regex in the pytest.raises assertion is using an unescaped
trailing dot in the match string ("Failed to load."), which treats the dot as a
wildcard; update the pytest.raises(...) call (the match= argument on the
pytest.raises invocation) to escape the period (e.g., use "Failed to load\\." or
raw string r"Failed to load\.") so the assertion matches a literal final period
instead of any character.
- Line 232: The pytest.raises call that uses match="PostgreSQL configuration
error.*Missing PostgreSQL secret fields" contains an unescaped backslash-style
regex sequence and should use a raw string to satisfy RUF043; update the
pytest.raises(...) invocation (the match= argument in the pytest.raises call
inside the Postgres reader test) to use a raw string prefix (r"...") so the
regex is treated correctly.

In `@tests/unit/writers/test_writer_postgres.py`:
- Around line 202-203: The test assertion passes a non-raw string to
pytest.raises(match=...) which triggers RUF043 because the pattern "host.*not
configured" contains regex metacharacters; update the test in
tests/unit/writers/test_writer_postgres.py to pass a raw string literal for the
match parameter (e.g., r"host.*not configured") in the pytest.raises call that
wraps writer.write("public.cps.za.test", {}) so the intent as a regex is
explicit and the quality gate violation is resolved.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 260f790c-4e89-4c09-98e2-835a43312e51

📥 Commits

Reviewing files that changed from the base of the PR and between 65ef0fe and 0f8c338.

📒 Files selected for processing (6)
  • requirements.txt
  • src/readers/reader_postgres.py
  • src/utils/logging_levels.py
  • src/writers/writer_postgres.py
  • tests/unit/readers/test_reader_postgres.py
  • tests/unit/writers/test_writer_postgres.py
✅ Files skipped from review due to trivial changes (1)
  • src/writers/writer_postgres.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/utils/logging_levels.py
  • src/readers/reader_postgres.py

Comment thread tests/unit/readers/test_reader_postgres.py
Comment thread tests/unit/readers/test_reader_postgres.py
Comment thread tests/unit/writers/test_writer_postgres.py

@lsulak lsulak left a comment


Okay, approving! Thanks for the hard work!

@tmikula-dev tmikula-dev merged commit c7c3b75 into master May 5, 2026
11 checks passed
@tmikula-dev tmikula-dev deleted the feature/postgresql-connection-pooling branch May 5, 2026 15:53

Labels

enhancement (New feature or request)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PostgreSQL connection pooling for WriterPostgres and ReaderPostgres

3 participants