Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
015f49b
refactor(service): split CollectionDAO & EntityRepository; fix lineag…
harshach Jun 6, 2026
2d7d9ff
Merge origin/main into harshach/port-louis
harshach Jun 6, 2026
ee8966c
perf(jdbi3): index data_contract lookups (A3) + keyset tag_usage clea…
harshach Jun 6, 2026
bc157cf
perf(jdbi3): batch field_relationship inserts for mention write paths…
harshach Jun 6, 2026
5346830
refactor(jdbi3): extract shared row/mappers from CollectionDAO (Track…
harshach Jun 6, 2026
e32736f
fix(it): redirect moved jdbi3 symbols in integration-tests after the …
harshach Jun 7, 2026
e9fa5c9
fix(jdbi3): make tag_usage cleanup keyset pagination NULL-safe (E1 fo…
harshach Jun 7, 2026
4608148
refactor(jdbi3): extract static custom-property validators into Custo…
harshach Jun 7, 2026
89c3e93
perf(jdbi3): make report_data delete sargable via UTC timestamp range…
harshach Jun 7, 2026
3fcf3f4
perf(jdbi3): batch EntityReference resolution in user/team bulk fetch…
harshach Jun 7, 2026
f0ef396
test+docs: address PR #28778 review — exhaustive composition guard + …
harshach Jun 7, 2026
7d3e082
Merge origin/main into harshach/port-louis
harshach Jun 7, 2026
4b8cc21
Revert "perf(jdbi3): data_contract generated columns + index (A3)" — …
harshach Jun 7, 2026
2163e85
fix(lineage): emit ENTITY_LINEAGE_DELETED for source-based bulk delet…
harshach Jun 7, 2026
090a8b9
Merge branch 'main' into harshach/port-louis
harshach Jun 7, 2026
ff3481a
Merge branch 'main' into harshach/port-louis
harshach Jun 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,26 @@ CREATE TABLE IF NOT EXISTS `user_session` (
KEY `user_session_idle_expiry` (`status`,`idleExpiresAt`),
KEY `user_session_prune` (`status`,`updatedAt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- Perf: UsageDAO.computePercentile runs four correlated COUNT(*) subqueries that each
-- filter entity_usage on (entityType, usageDate). The only existing index is
-- UNIQUE (id, usageDate), which is unusable for that predicate, so every run full-scans
-- the table once per subquery. A composite (entityType, usageDate) index turns the
-- percentile subqueries into range scans. MySQL has no `ADD KEY IF NOT EXISTS`, so guard
-- via information_schema.
SET @ddl = (
SELECT IF(
EXISTS (
SELECT 1
FROM information_schema.statistics
WHERE table_schema = DATABASE()
AND table_name = 'entity_usage'
AND index_name = 'idx_entity_usage_entitytype_usagedate'
),
'SELECT 1',
'ALTER TABLE entity_usage ADD KEY idx_entity_usage_entitytype_usagedate (entityType, usageDate)'
)
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
26 changes: 26 additions & 0 deletions bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,29 @@ CREATE INDEX IF NOT EXISTS user_session_user_status_idx ON user_session USING bt
CREATE INDEX IF NOT EXISTS user_session_expiry_idx ON user_session USING btree (status, expiresat);
CREATE INDEX IF NOT EXISTS user_session_idle_expiry_idx ON user_session USING btree (status, idleexpiresat);
CREATE INDEX IF NOT EXISTS user_session_prune_idx ON user_session USING btree (status, updatedat);

-- Perf: UsageDAO.computePercentile runs four correlated COUNT(*) subqueries that each
-- filter entity_usage on (entityType, usageDate). The only existing index is
-- UNIQUE (id, usageDate), which is unusable for that predicate, so every run sequential-scans
-- the table once per subquery. A composite (entityType, usageDate) index turns the
-- percentile subqueries into range scans.
CREATE INDEX IF NOT EXISTS idx_entity_usage_entitytype_usagedate
ON entity_usage (entityType, usageDate);

-- Correctness: migration 1.6.3 defined the Postgres isBot generated column as
-- (json ->> 'deleted')::boolean instead of (json ->> 'isBot'), so on Postgres isBot has
-- always mirrored `deleted` rather than the real bot flag. countDailyActiveUsers (and any
-- isBot column filter) was therefore wrong on Postgres. Postgres cannot alter a generated
-- column's expression in place, so backfill any rows missing $.isBot, drop the column
-- (this also drops idx_isBot) and recreate it reading the correct path.
-- Operational note: ADD COLUMN ... STORED rewrites the whole user_entity table and holds an
-- ACCESS EXCLUSIVE lock for its duration, and the backfill UPDATE below also scans the full
-- table. On deployments with a very large user_entity, run this migration in a maintenance
-- window; runtime scales with row count (typically seconds, but minutes for millions of users).
-- The change is one-time, idempotent, and Postgres-only (MySQL 1.6.3 was already correct).
UPDATE user_entity SET json = jsonb_set(json, '{isBot}', 'false'::jsonb, true)
WHERE (json ->> 'isBot') IS NULL;
ALTER TABLE user_entity DROP COLUMN IF EXISTS isBot;
ALTER TABLE user_entity
ADD COLUMN isBot BOOLEAN GENERATED ALWAYS AS ((json ->> 'isBot')::boolean) STORED NOT NULL;
CREATE INDEX IF NOT EXISTS idx_isBot ON user_entity (isBot);
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EntityCaches;
import org.openmetadata.service.util.RequestEntityCache;

/**
Expand Down Expand Up @@ -124,11 +124,11 @@ void testGuavaCacheEntryRemovedAfterUpdate(TestNamespace ns) throws Exception {
UUID domainId = domain.getId();

// Force a cache load
EntityRepository.CACHE_WITH_ID.get(new ImmutablePair<>(Entity.DOMAIN, domainId));
EntityCaches.CACHE_WITH_ID.get(new ImmutablePair<>(Entity.DOMAIN, domainId));

// Verify cache has the entity
String cachedJson =
EntityRepository.CACHE_WITH_ID.getIfPresent(new ImmutablePair<>(Entity.DOMAIN, domainId));
EntityCaches.CACHE_WITH_ID.getIfPresent(new ImmutablePair<>(Entity.DOMAIN, domainId));
assertNotNull(cachedJson, "Cache should contain the entity after get()");
Domain cachedEntity = JsonUtils.readValue(cachedJson, Domain.class);
assertEquals("before update", cachedEntity.getDescription());
Expand All @@ -138,8 +138,7 @@ void testGuavaCacheEntryRemovedAfterUpdate(TestNamespace ns) throws Exception {
SdkClients.adminClient().domains().update(domainId.toString(), domain);

// After update, re-loading from cache should get fresh data
String freshJson =
EntityRepository.CACHE_WITH_ID.get(new ImmutablePair<>(Entity.DOMAIN, domainId));
String freshJson = EntityCaches.CACHE_WITH_ID.get(new ImmutablePair<>(Entity.DOMAIN, domainId));
Domain freshEntity = JsonUtils.readValue(freshJson, Domain.class);
assertEquals(
"after update",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexRetryQueueDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexRetryQueueDAO.SearchIndexRetryRecord;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexRetryQueueDAO;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexRetryQueueDAO.SearchIndexRetryRecord;
import org.openmetadata.service.search.SearchIndexRetryQueue;
import org.openmetadata.service.search.SearchIndexRetryWorker;
import org.openmetadata.service.search.SearchRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.rules.RuleEngine;
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.CustomPropertyValidator;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.RestUtil.PutResponse;
Expand Down Expand Up @@ -848,7 +849,8 @@ private Object parseEnumType(
String propertyConfig) {
List<String> enumKeys = listOrEmpty(fieldToInternalArray(fieldValue.toString()));
try {
EntityRepository.validateEnumKeys(fieldName, JsonUtils.valueToTree(enumKeys), propertyConfig);
CustomPropertyValidator.validateEnumKeys(
fieldName, JsonUtils.valueToTree(enumKeys), propertyConfig);
} catch (Exception e) {
deferredFailure(
csvRecord,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.BulkExecutor;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityCaches;
import org.openmetadata.service.jdbi3.EntityRelationshipRepository;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.MigrationDAO;
Expand Down Expand Up @@ -1230,8 +1231,8 @@ public void start() {

@Override
public void stop() throws InterruptedException, SchedulerException {
LOG.info("Cache with Id Stats {}", EntityRepository.CACHE_WITH_ID.stats());
LOG.info("Cache with name Stats {}", EntityRepository.CACHE_WITH_NAME.stats());
LOG.info("Cache with Id Stats {}", EntityCaches.CACHE_WITH_ID.stats());
LOG.info("Cache with name Stats {}", EntityCaches.CACHE_WITH_NAME.stats());
EventPubSub.shutdown();
EventSubscriptionScheduler.shutDown();
AsyncService.getInstance().shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipObject;
import org.openmetadata.service.jdbi3.CoreRelationshipDAOs.EntityRelationshipObject;
import org.openmetadata.service.rdf.RdfRepository;

@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.openmetadata.service.apps.bundles.rdf.distributed.RdfIndexJob;
import org.openmetadata.service.exception.AppException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipObject;
import org.openmetadata.service.jdbi3.CoreRelationshipDAOs.EntityRelationshipObject;
import org.openmetadata.service.jdbi3.EntityDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import org.openmetadata.service.apps.bundles.searchIndex.distributed.PartitionStatus;
import org.openmetadata.service.apps.bundles.searchIndex.distributed.ServerIdentityResolver;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexJobDAO.RdfIndexJobRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfAggregatedStatsRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfEntityStatsRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfIndexPartitionRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfServerPartitionStatsRecord;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.RdfInfraDAOs.RdfIndexJobDAO.RdfIndexJobRecord;
import org.openmetadata.service.jdbi3.RdfInfraDAOs.RdfIndexPartitionDAO.RdfAggregatedStatsRecord;
import org.openmetadata.service.jdbi3.RdfInfraDAOs.RdfIndexPartitionDAO.RdfEntityStatsRecord;
import org.openmetadata.service.jdbi3.RdfInfraDAOs.RdfIndexPartitionDAO.RdfIndexPartitionRecord;
import org.openmetadata.service.jdbi3.RdfInfraDAOs.RdfIndexPartitionDAO.RdfServerPartitionStatsRecord;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.RestUtil;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingConfiguration;
import org.openmetadata.service.apps.bundles.searchIndex.SearchIndexEntityTypes;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexJobDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexJobDAO.SearchIndexJobRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexPartitionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexPartitionDAO.AggregatedStatsRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexPartitionDAO.EntityStatsRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexPartitionDAO.SearchIndexPartitionRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexPartitionDAO.ServerStatsRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchReindexLockDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexJobDAO;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexJobDAO.SearchIndexJobRecord;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexPartitionDAO;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexPartitionDAO.AggregatedStatsRecord;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexPartitionDAO.EntityStatsRecord;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexPartitionDAO.SearchIndexPartitionRecord;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexPartitionDAO.ServerStatsRecord;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchReindexLockDAO;
import org.openmetadata.service.util.RestUtil;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchReindexLockDAO;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchReindexLockDAO;

/**
* Manages recovery of distributed indexing jobs after server crashes or restarts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexServerStatsDAO.AggregatedServerStats;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexServerStatsDAO.EntityStats;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexServerStatsDAO.AggregatedServerStats;
import org.openmetadata.service.jdbi3.SearchReindexDAOs.SearchIndexServerStatsDAO.EntityStats;

/**
* Manages stats trackers for all entity types within a job. Provides a simple API to get trackers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.AccessControlDAOs.ChangeEventDAO.ChangeEventRecord;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.ChangeEventDAO.ChangeEventRecord;
import org.openmetadata.service.util.DIContainer;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipObject;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.jdbi3.CoreRelationshipDAOs.EntityRelationshipObject;
import org.openmetadata.service.jdbi3.CoreRelationshipDAOs.EntityRelationshipRecord;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.FullyQualifiedName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run(OpenMetadataApplicationConfig configuration, Environment environ
}
return;
}
org.openmetadata.service.jdbi3.EntityRepository.onRemoteCacheInvalidate(
org.openmetadata.service.jdbi3.EntityCacheInvalidator.onRemoteCacheInvalidate(
msg.type(), msg.id(), msg.fqn());
if (msg.id() != null && cachedReadBundle != null) {
cachedReadBundle.invalidate(msg.type(), msg.id());
Expand Down Expand Up @@ -220,7 +220,7 @@ public static void registerInvalidatable(Invalidatable layer) {

/**
* Fan an entity-write invalidation out to every registered {@link Invalidatable}. Today
* this is invoked from {@code EntityRepository.invalidateCacheForEntity(type, id, fqn)}
* this is invoked from {@code EntityCacheInvalidator.invalidateCacheForEntity(type, id, fqn)}
* (the static helper called from {@code postCreate} and other mutation paths), from the
* pub-sub handler above when a remote pod publishes a write, and from the admin
* {@code POST /system/cache/invalidate} endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface Invalidatable {
* have both; implementations should drop what they can.
*
* <p>Called on the local pod via {@link CacheBundle#invalidateEntity(String, UUID, String)},
* which is wired into {@code EntityRepository.invalidateCacheForEntity} (called from
* which is wired into {@code EntityCacheInvalidator.invalidateCacheForEntity} (called from
* {@code postCreate}, write-through bulk update paths, and the admin invalidate endpoint).
* Note that {@code postUpdate} / {@code postDelete} / {@code restoreEntity} do NOT call
* this fan-out today — they rely on the write-through cache + L1 eviction. If a new
Expand Down
Loading
Loading