From 2119555c2fe7748f704dfdf245fb32921349ba52 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 3 Apr 2026 11:00:58 -0700 Subject: [PATCH 1/3] =?UTF-8?q?feat(optimizer):=20add=20data=20model=20?= =?UTF-8?q?=E2=80=94=20schema,=20entities,=20DTOs,=20converters?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces the optimizer service module with: - MySQL/H2 schema for table_operations, table_stats, table_stats_history, and table_operations_history - JPA entities with JSON column support (vladmihalcea hibernate-types) - All model/DTO/enum types: OperationType, OperationStatus, TableStats, CompleteOperationRequest, JobResult, OperationMetrics, etc. - JPA AttributeConverters for JobResult and OperationMetrics JSON columns - MapStruct mapper (OptimizerMapper) for entity→DTO conversion - Spring Boot application shell and build wiring (settings.gradle, build.gradle dockerPrereqs) No repositories, controllers, or service layer yet — those follow in subsequent PRs. 
Co-Authored-By: Claude Opus 4.6 --- build.gradle | 3 + services/optimizer/build.gradle | 17 ++++ .../OptimizerServiceApplication.java | 13 +++ .../optimizer/api/mapper/OptimizerMapper.java | 32 ++++++ .../api/model/CompleteOperationRequest.java | 31 ++++++ .../optimizer/api/model/JobResult.java | 25 +++++ .../api/model/OperationHistoryStatus.java | 7 ++ .../optimizer/api/model/OperationMetrics.java | 24 +++++ .../optimizer/api/model/OperationStatus.java | 21 ++++ .../optimizer/api/model/OperationType.java | 12 +++ .../api/model/TableOperationsDto.java | 40 ++++++++ .../api/model/TableOperationsHistoryDto.java | 43 ++++++++ .../optimizer/api/model/TableStats.java | 48 +++++++++ .../optimizer/api/model/TableStatsDto.java | 23 +++++ .../api/model/TableStatsHistoryDto.java | 22 +++++ .../model/UpsertTableOperationsRequest.java | 26 +++++ .../api/model/UpsertTableStatsRequest.java | 25 +++++ .../optimizer/config/JobResultConverter.java | 39 ++++++++ .../config/OperationMetricsConverter.java | 44 +++++++++ .../entity/TableOperationsHistoryRow.java | 91 +++++++++++++++++ .../optimizer/entity/TableOperationsRow.java | 99 +++++++++++++++++++ .../entity/TableStatsHistoryRow.java | 64 ++++++++++++ .../optimizer/entity/TableStatsRow.java | 57 +++++++++++ .../optimizer/entity/package-info.java | 2 + .../src/main/resources/application.properties | 20 ++++ .../main/resources/db/optimizer-schema.sql | 53 ++++++++++ .../resources/application-test.properties | 12 +++ settings.gradle | 1 + 28 files changed, 894 insertions(+) create mode 100644 services/optimizer/build.gradle create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/mapper/OptimizerMapper.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java create mode 100644 
services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/JobResultConverter.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java create mode 100644 
services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/package-info.java create mode 100644 services/optimizer/src/main/resources/application.properties create mode 100644 services/optimizer/src/main/resources/db/optimizer-schema.sql create mode 100644 services/optimizer/src/test/resources/application-test.properties diff --git a/build.gradle b/build.gradle index 4699ca592..4cfac4a5d 100644 --- a/build.gradle +++ b/build.gradle @@ -157,6 +157,7 @@ tasks.register('CopyGitHooksTask', Copy) { // tables-service.Dockerfile -> :services:tables:bootJar // housetables-service.Dockerfile -> :services:housetables:bootJar // jobs-service.Dockerfile -> :services:jobs:bootJar +// optimizer-service.Dockerfile -> :services:optimizer:bootJar // jobs-scheduler.Dockerfile -> :apps:openhouse-spark-apps_2.12:shadowJar (uber JAR) // spark-base-hadoop2.8.dockerfile -> // :integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar (uber JAR) @@ -176,6 +177,7 @@ tasks.register('dockerPrereqs') { dependsOn ':services:tables:bootJar' dependsOn ':services:housetables:bootJar' dependsOn ':services:jobs:bootJar' + dependsOn ':services:optimizer:bootJar' // Spark runtime uber JARs (shadowJar) dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar' @@ -196,6 +198,7 @@ tasks.register('dockerPrereqs') { println ' build/tables/libs/tables.jar' println ' build/housetables/libs/housetables.jar' println ' build/jobs/libs/jobs.jar' + println ' build/optimizer/libs/optimizer.jar' println ' build/openhouse-spark-runtime_2.12/libs/openhouse-spark-runtime_2.12-uber.jar' println ' build/openhouse-spark-3.5-runtime_2.12/libs/openhouse-spark-3.5-runtime_2.12-uber.jar' println ' 
build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-uber.jar' diff --git a/services/optimizer/build.gradle b/services/optimizer/build.gradle new file mode 100644 index 000000000..c05c7f9c3 --- /dev/null +++ b/services/optimizer/build.gradle @@ -0,0 +1,17 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'org.springframework.boot' version '2.7.8' +} + +dependencies { + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + implementation 'com.vladmihalcea:hibernate-types-55:2.21.1' + implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8' + implementation 'mysql:mysql-connector-java:8.+' + testImplementation 'com.h2database:h2:2.2.224' + testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' +} + +test { + useJUnitPlatform() +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java new file mode 100644 index 000000000..38eb363a8 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java @@ -0,0 +1,13 @@ +package com.linkedin.openhouse.optimizer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** Spring Boot entry point for the Optimizer Service. 
*/ +@SpringBootApplication +public class OptimizerServiceApplication { + + public static void main(String[] args) { + SpringApplication.run(OptimizerServiceApplication.class, args); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/mapper/OptimizerMapper.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/mapper/OptimizerMapper.java new file mode 100644 index 000000000..8c0b17462 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/mapper/OptimizerMapper.java @@ -0,0 +1,32 @@ +package com.linkedin.openhouse.optimizer.api.mapper; + +import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto; +import com.linkedin.openhouse.optimizer.entity.TableOperationsHistoryRow; +import com.linkedin.openhouse.optimizer.entity.TableOperationsRow; +import com.linkedin.openhouse.optimizer.entity.TableStatsHistoryRow; +import com.linkedin.openhouse.optimizer.entity.TableStatsRow; +import org.mapstruct.Mapper; + +/** + * MapStruct mapper for converting between optimizer JPA entities and their corresponding DTOs. + * + *
<p>
Spring-instantiated at compile time. Inject via {@code @Autowired} or constructor injection. + */ +@Mapper(componentModel = "spring") +public interface OptimizerMapper { + + /** Map a {@link TableOperationsRow} to its DTO. */ + TableOperationsDto toDto(TableOperationsRow row); + + /** Map a {@link TableOperationsHistoryRow} to its DTO. */ + TableOperationsHistoryDto toDto(TableOperationsHistoryRow row); + + /** Map a {@link TableStatsRow} to its DTO. */ + TableStatsDto toDto(TableStatsRow row); + + /** Map a {@link TableStatsHistoryRow} to its DTO. */ + TableStatsHistoryDto toDto(TableStatsHistoryRow row); +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java new file mode 100644 index 000000000..c26893197 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java @@ -0,0 +1,31 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Request body for {@code POST /v1/table-operations/{id}/complete}. + * + *
<p>
Reports the outcome of a completed operation. The backend looks up the operation row by {@code + * id} and writes a history entry with the operation's table metadata and the supplied result. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CompleteOperationRequest { + + /** Outcome of the operation. */ + private OperationHistoryStatus status; + + /** Error details on failure; {@code null} on success. */ + private JobResult result; + + /** Number of orphan files deleted; set by OFD Spark app on success. */ + private Integer orphanFilesDeleted; + + /** Bytes reclaimed by orphan file deletion; set by OFD Spark app on success. */ + private Long orphanBytesDeleted; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java new file mode 100644 index 000000000..74942243c --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java @@ -0,0 +1,25 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Result payload for a completed Spark maintenance job. + * + *
<p>
Stored as JSON in the {@code result} column of {@code table_operations_history}. Both fields + * are {@code null} on success; populated on failure. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class JobResult { + + /** Human-readable error message; {@code null} if the job succeeded. */ + private String errorMessage; + + /** Error category (e.g., {@code OOM}, {@code TIMEOUT}); {@code null} if the job succeeded. */ + private String errorType; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java new file mode 100644 index 000000000..791d910a6 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java @@ -0,0 +1,7 @@ +package com.linkedin.openhouse.optimizer.api.model; + +/** Terminal states for a completed Spark maintenance job. */ +public enum OperationHistoryStatus { + SUCCESS, + FAILED +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java new file mode 100644 index 000000000..d6f788fcc --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java @@ -0,0 +1,24 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Denormalized stats snapshot captured by the Analyzer at analysis time. + * + *
<p>
Stored as JSON in the {@code metrics} column of {@code table_operations}. These values are + * point-in-time snapshots — they record what the Analyzer saw when it recommended the operation, + * not cumulative totals. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OperationMetrics { + + private Long tableSizeBytes; + private Integer numFilesAdded; + private Integer numFilesDeleted; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java new file mode 100644 index 000000000..c97be441b --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java @@ -0,0 +1,21 @@ +package com.linkedin.openhouse.optimizer.api.model; + +/** Lifecycle states for a table operation recommendation. */ +public enum OperationStatus { + + /** Recommended by the Analyzer but not yet claimed by the Scheduler. */ + PENDING, + + /** Claimed by the Scheduler; waiting for the Jobs Service to return a job ID. */ + SCHEDULING, + + /** Job submitted to the Jobs Service; the row now carries a {@code jobId}. */ + SCHEDULED, + + /** + * Marked by the Scheduler when it detects duplicate PENDING rows for the same {@code (table_uuid, + * operation_type)}. Only the most-recent PENDING row is claimed; older duplicates are CANCELED + * before the claim step. 
+ */ + CANCELED +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java new file mode 100644 index 000000000..05e4a1e7b --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java @@ -0,0 +1,12 @@ +package com.linkedin.openhouse.optimizer.api.model; + +/** + * Maintenance operation types supported by the continuous optimizer. + * + *
<p>
Only {@code ORPHAN_FILES_DELETION} is currently implemented. Additional types will be added as + * they are built out. + */ +public enum OperationType { + /** Removes orphaned data files no longer referenced by table metadata. */ + ORPHAN_FILES_DELETION +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java new file mode 100644 index 000000000..5eb5eaaa6 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java @@ -0,0 +1,40 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_operations} — Analyzer recommendations read by the Scheduler. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationsDto { + + /** Client-generated UUID identifying this specific operation recommendation. */ + private String id; + + /** Stable table identity from the Tables Service. */ + private String tableUuid; + + private String databaseName; + private String tableName; + private OperationType operationType; + + /** {@code PENDING} or {@code SCHEDULED}. Defaults to {@code PENDING} on creation. */ + private OperationStatus status; + + /** Server-set when the row is first created by the Analyzer. */ + private Instant createdAt; + + /** Set by the Scheduler when claiming; {@code null} while PENDING. */ + private Instant scheduledAt; + + /** Job ID returned by the Jobs Service after successful submission. */ + private String jobId; + + /** Denormalized stats snapshot captured at analysis time. 
*/ + private OperationMetrics metrics; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java new file mode 100644 index 000000000..7dca34271 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java @@ -0,0 +1,43 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_operations_history} — append-only Spark job results. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationsHistoryDto { + + /** Same UUID as the originating {@code table_operations.id}; supplied by the caller. */ + private String id; + + /** Stable table identity from the Tables Service. */ + private String tableUuid; + + private String databaseName; + private String tableName; + private OperationType operationType; + + /** When the Spark job was submitted / ran. */ + private Instant submittedAt; + + /** {@code SUCCESS} or {@code FAILED}. */ + private OperationHistoryStatus status; + + /** Spark job ID. */ + private String jobId; + + /** Job result payload; both fields null on success. */ + private JobResult result; + + /** Number of orphan files deleted; null for non-OFD operations or before completion. */ + private Integer orphanFilesDeleted; + + /** Bytes reclaimed by orphan file deletion; null for non-OFD operations. 
*/ + private Long orphanBytesDeleted; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java new file mode 100644 index 000000000..cb77d994f --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java @@ -0,0 +1,48 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Combined stats payload stored as a single JSON blob per table. */ +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class TableStats { + + /** Snapshot fields — overwritten on every upsert. */ + private SnapshotMetrics snapshot; + + /** Delta fields — accumulated across commit events. */ + private CommitDelta delta; + + /** Point-in-time metadata read from Iceberg at scan time. */ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class SnapshotMetrics { + private String clusterId; + private String tableVersion; + private String tableLocation; + private Long tableSizeBytes; + /** Total number of data files as of the latest snapshot — used for bin-packing. */ + private Long numCurrentFiles; + } + + /** Per-commit incremental counters; accumulated across all recorded commit events. 
*/ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + public static class CommitDelta { + private Long numFilesAdded; + private Long numFilesDeleted; + private Long deletedSizeBytes; + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java new file mode 100644 index 000000000..1663d5ab0 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java @@ -0,0 +1,23 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import java.time.Instant; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_stats} — used for response payloads. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableStatsDto { + + private String tableUuid; + private String databaseId; + private String tableName; + private TableStats stats; + private Map tableProperties; + private Instant updatedAt; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java new file mode 100644 index 000000000..142f00245 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java @@ -0,0 +1,22 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_stats_history} — used for response payloads. 
*/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableStatsHistoryDto { + + private Long id; + private String tableUuid; + private String databaseId; + private String tableName; + private TableStats stats; + private Instant recordedAt; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java new file mode 100644 index 000000000..19dd1baac --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java @@ -0,0 +1,26 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * PUT request body for {@code /v1/table-operations/{id}}. + * + *
<p>
The Analyzer supplies the operation {@code id} (client-generated UUID) in the path and all + * table-identifying fields in this body. The service upserts by {@code id}: creates on first call, + * updates {@code metrics} on subsequent calls with the same {@code id}. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class UpsertTableOperationsRequest { + + private String tableUuid; + private String databaseName; + private String tableName; + private OperationType operationType; + private OperationMetrics metrics; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java new file mode 100644 index 000000000..3214a85a6 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java @@ -0,0 +1,25 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Request body for {@code PUT /v1/table-stats/{tableUuid}}. + * + *
<p>
{@code tableUuid} comes from the path variable. {@code databaseId} and {@code tableName} are + * denormalized display columns carried in the body. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class UpsertTableStatsRequest { + + private String databaseId; + private String tableName; + private TableStats stats; + private Map tableProperties; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/JobResultConverter.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/JobResultConverter.java new file mode 100644 index 000000000..4c9bfbe76 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/JobResultConverter.java @@ -0,0 +1,39 @@ +package com.linkedin.openhouse.optimizer.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.openhouse.optimizer.api.model.JobResult; +import java.io.IOException; +import javax.persistence.AttributeConverter; +import javax.persistence.Converter; + +/** JPA {@link AttributeConverter} that serializes {@link JobResult} to/from a JSON string. 
*/ +@Converter +public class JobResultConverter implements AttributeConverter { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public String convertToDatabaseColumn(JobResult attribute) { + if (attribute == null) { + return null; + } + try { + return OBJECT_MAPPER.writeValueAsString(attribute); + } catch (JsonProcessingException e) { + throw new IllegalStateException("Failed to serialize JobResult to JSON", e); + } + } + + @Override + public JobResult convertToEntityAttribute(String dbData) { + if (dbData == null) { + return null; + } + try { + return OBJECT_MAPPER.readValue(dbData, JobResult.class); + } catch (IOException e) { + throw new IllegalStateException("Failed to deserialize JobResult from JSON: " + dbData, e); + } + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java new file mode 100644 index 000000000..27f0882f5 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java @@ -0,0 +1,44 @@ +package com.linkedin.openhouse.optimizer.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.openhouse.optimizer.api.model.OperationMetrics; +import java.io.IOException; +import javax.persistence.AttributeConverter; +import javax.persistence.Converter; + +/** + * JPA {@link AttributeConverter} that serializes {@link OperationMetrics} to/from a JSON string. + */ +@Converter +public class OperationMetricsConverter implements AttributeConverter { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public String convertToDatabaseColumn(OperationMetrics attribute) { + // Null metrics are valid for PENDING operations that have not yet produced output. 
+ if (attribute == null) { + return null; + } + try { + return OBJECT_MAPPER.writeValueAsString(attribute); + } catch (JsonProcessingException e) { + throw new IllegalStateException("Failed to serialize OperationMetrics to JSON", e); + } + } + + @Override + public OperationMetrics convertToEntityAttribute(String dbData) { + // Null is stored for PENDING rows; return null so the entity reflects that state. + if (dbData == null) { + return null; + } + try { + return OBJECT_MAPPER.readValue(dbData, OperationMetrics.class); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to deserialize OperationMetrics from JSON: " + dbData, e); + } + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java new file mode 100644 index 000000000..6a47b5022 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java @@ -0,0 +1,91 @@ +package com.linkedin.openhouse.optimizer.entity; + +import com.linkedin.openhouse.optimizer.api.model.JobResult; +import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import com.linkedin.openhouse.optimizer.config.JobResultConverter; +import java.time.Instant; +import javax.persistence.Column; +import javax.persistence.Convert; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.Table; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * Append-only record of a completed Spark maintenance job. + * + *
<p>
Written by the Spark app after each table's operation finishes. The {@code id} is the same + * UUID as the originating {@code table_operations.id}, tying each history entry directly back to + * the specific operation cycle that produced it. Multiple runs of the same operation on the same + * table produce multiple rows (each cycle gets a new UUID from the Analyzer). + */ +@Entity +@Table( + name = "table_operations_history", + indexes = { + @Index(name = "idx_table_uuid_hist", columnList = "table_uuid"), + @Index(name = "idx_op_type_hist", columnList = "operation_type"), + @Index(name = "idx_submitted_at", columnList = "submitted_at"), + @Index(name = "idx_status_hist", columnList = "status"), + @Index(name = "idx_job_id", columnList = "job_id") + }) +@Getter +@EqualsAndHashCode +@Builder(toBuilder = true) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor(access = AccessLevel.PROTECTED) +public class TableOperationsHistoryRow { + + /** Same UUID as the originating {@code table_operations.id}. Set by the caller; not generated. */ + @Id + @Column(name = "id", nullable = false, length = 36) + private String id; + + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "database_name", nullable = false, length = 255) + private String databaseName; + + @Column(name = "table_name", nullable = false, length = 255) + private String tableName; + + @Enumerated(EnumType.STRING) + @Column(name = "operation_type", nullable = false, length = 50) + private OperationType operationType; + + /** When the Spark job was submitted / ran, as reported by the job itself. */ + @Column(name = "submitted_at", nullable = false) + private Instant submittedAt; + + /** {@code SUCCESS} or {@code FAILED}. */ + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 20) + private OperationHistoryStatus status; + + /** Spark job ID; indexed for job → result lookups. 
*/ + @Column(name = "job_id", length = 255) + private String jobId; + + /** Job result: error details on failure, both fields null on success. */ + @Convert(converter = JobResultConverter.class) + @Column(name = "result") + private JobResult result; + + /** Number of orphan files deleted by the Spark job; null for non-OFD operations. */ + @Column(name = "orphan_files_deleted") + private Integer orphanFilesDeleted; + + /** Bytes reclaimed by orphan file deletion; null for non-OFD operations. */ + @Column(name = "orphan_bytes_deleted") + private Long orphanBytesDeleted; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java new file mode 100644 index 000000000..9d835aa20 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java @@ -0,0 +1,99 @@ +package com.linkedin.openhouse.optimizer.entity; + +import com.linkedin.openhouse.optimizer.api.model.OperationMetrics; +import com.linkedin.openhouse.optimizer.api.model.OperationStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import com.linkedin.openhouse.optimizer.config.OperationMetricsConverter; +import java.time.Instant; +import javax.persistence.Column; +import javax.persistence.Convert; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.Table; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * JPA entity representing an Analyzer recommendation for a table maintenance operation. + * + *
<p>
Each row is identified by a client-generated UUID ({@code id}). The Analyzer creates a new row + * when it first recommends an operation for a table, or when re-recommending after a prior terminal + * state (SUCCESS/FAILED). Old terminal rows accumulate — they serve as implicit history. {@code + * table_uuid} is the stable identity for the table (survives renames; rotates on drop+recreate). + * The application enforces one active (PENDING or SCHEDULED) row per {@code (table_uuid, + * operation_type)} at a time. + */ +@Entity +@Table( + name = "table_operations", + indexes = { + @Index(name = "idx_table_uuid", columnList = "table_uuid"), + @Index(name = "idx_op_type", columnList = "operation_type"), + @Index(name = "idx_status", columnList = "status"), + @Index(name = "idx_created_at", columnList = "created_at"), + @Index(name = "idx_scheduled_at", columnList = "scheduled_at") + }) +@Getter +@EqualsAndHashCode +@Builder(toBuilder = true) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor(access = AccessLevel.PROTECTED) +public class TableOperationsRow { + + /** Client-generated UUID identifying this specific operation recommendation. */ + @Id + @Column(name = "id", nullable = false, length = 36) + private String id; + + /** Stable table identity from the Tables Service. Survives renames; rotates on drop+recreate. */ + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "database_name", nullable = false, length = 255) + private String databaseName; + + @Column(name = "table_name", nullable = false, length = 255) + private String tableName; + + @Enumerated(EnumType.STRING) + @Column(name = "operation_type", nullable = false, length = 50) + private OperationType operationType; + + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 20) + private OperationStatus status; + + /** When the Analyzer first created this row. Set by the service on insert; never updated. 
*/ + @Column(name = "created_at", nullable = false) + private Instant createdAt; + + /** Set when the operation is claimed; {@code null} while {@code PENDING}. */ + @Column(name = "scheduled_at") + private Instant scheduledAt; + + /** Job ID returned by the Jobs Service after successful submission. */ + @Column(name = "job_id", length = 255) + private String jobId; + + /** + * Manual optimistic lock for the Scheduler claim. Incremented by the raw {@code claimOperation} + * UPDATE query; must NOT use JPA {@code @Version} since the claim bypasses JPA entity management. + */ + @Column(name = "version") + private Long version; + + /** + * Denormalized stats snapshot captured at analysis time: table size, snapshot count, and file + * counts as of the moment the Analyzer ran. + */ + @Convert(converter = OperationMetricsConverter.class) + @Column(name = "metrics") + private OperationMetrics metrics; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java new file mode 100644 index 000000000..85d97a5eb --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java @@ -0,0 +1,64 @@ +package com.linkedin.openhouse.optimizer.entity; + +import com.linkedin.openhouse.optimizer.api.model.TableStats; +import com.vladmihalcea.hibernate.type.json.JsonStringType; +import java.time.Instant; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.Table; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.hibernate.annotations.Type; +import 
org.hibernate.annotations.TypeDef; + +/** + * Append-only record of per-commit stats reported by the Tables Service. + * + *
<p>
Each Iceberg commit produces one row. The {@code stats} JSON contains both the snapshot + * metrics (point-in-time) and the commit delta (files added/deleted in this commit). Consumers can + * query this table to reconstruct change rates over arbitrary time windows. + */ +@TypeDef(name = "json", typeClass = JsonStringType.class) +@Entity +@Table( + name = "table_stats_history", + indexes = { + @Index(name = "idx_tsh_table_uuid", columnList = "table_uuid"), + @Index(name = "idx_tsh_recorded_at", columnList = "recorded_at") + }) +@Getter +@EqualsAndHashCode +@Builder(toBuilder = true) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor(access = AccessLevel.PROTECTED) +public class TableStatsHistoryRow { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "id", nullable = false) + private Long id; + + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "database_id", nullable = false, length = 255) + private String databaseId; + + @Column(name = "table_name", nullable = false, length = 255) + private String tableName; + + @Type(type = "json") + @Column(name = "stats", columnDefinition = "TEXT") + private TableStats stats; + + @Column(name = "recorded_at", nullable = false) + private Instant recordedAt; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java new file mode 100644 index 000000000..71d6a9421 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java @@ -0,0 +1,57 @@ +package com.linkedin.openhouse.optimizer.entity; + +import com.linkedin.openhouse.optimizer.api.model.TableStats; +import com.vladmihalcea.hibernate.type.json.JsonStringType; +import java.time.Instant; +import java.util.Map; +import javax.persistence.Column; +import javax.persistence.Entity; 
+import javax.persistence.Id; +import javax.persistence.Table; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; + +/** + * JPA entity representing a per-table stats snapshot in the optimizer DB. + * + *
<p>
Written by the Tables Service on every Iceberg commit. Read by the Analyzer directly via JPA + * to enumerate tables and check scheduling eligibility. + */ +@TypeDef(name = "json", typeClass = JsonStringType.class) +@Entity +@Table(name = "table_stats") +@Getter +@EqualsAndHashCode +@Builder(toBuilder = true) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor(access = AccessLevel.PROTECTED) +public class TableStatsRow { + + @Id + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "database_id", nullable = false, length = 255) + private String databaseId; + + @Column(name = "table_name", nullable = false, length = 255) + private String tableName; + + @Type(type = "json") + @Column(name = "stats", columnDefinition = "TEXT") + private TableStats stats; + + @Type(type = "json") + @Column(name = "table_properties", columnDefinition = "TEXT") + private Map tableProperties; + + /** Set on every upsert. Used for stats pipeline staleness monitoring. */ + @Column(name = "updated_at", nullable = false) + private Instant updatedAt; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/package-info.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/package-info.java new file mode 100644 index 000000000..7c0ca1f67 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/package-info.java @@ -0,0 +1,2 @@ +/** JPA entities for the optimizer service. 
*/ +package com.linkedin.openhouse.optimizer.entity; diff --git a/services/optimizer/src/main/resources/application.properties b/services/optimizer/src/main/resources/application.properties new file mode 100644 index 000000000..c6c3f8437 --- /dev/null +++ b/services/optimizer/src/main/resources/application.properties @@ -0,0 +1,20 @@ +spring.application.name=openhouse-optimizer-service +server.port=8080 + +spring.jpa.hibernate.ddl-auto=none +spring.sql.init.mode=always +spring.jpa.defer-datasource-initialization=true +spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql + +spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect +spring.jpa.properties.hibernate.show_sql=false +spring.jpa.properties.hibernate.physical_naming_strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl + +spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver +spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:mysql://localhost:3306/oh_db} +spring.datasource.username=${OPTIMIZER_DB_USERNAME:oh_user} +spring.datasource.password=${OPTIMIZER_DB_PASSWORD:oh_password} +spring.datasource.hikari.maximum-pool-size=20 + +management.endpoints.web.exposure.include=health,prometheus +management.endpoint.health.enabled=true diff --git a/services/optimizer/src/main/resources/db/optimizer-schema.sql b/services/optimizer/src/main/resources/db/optimizer-schema.sql new file mode 100644 index 000000000..53062c5ad --- /dev/null +++ b/services/optimizer/src/main/resources/db/optimizer-schema.sql @@ -0,0 +1,53 @@ +-- Optimizer Service Schema +-- Compatible with MySQL (production) and H2 in MySQL mode (tests). 
+CREATE TABLE IF NOT EXISTS table_operations ( + id VARCHAR(36) NOT NULL, + table_uuid VARCHAR(36) NOT NULL, + database_name VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + operation_type VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL, + created_at TIMESTAMP(6) NOT NULL, + scheduled_at TIMESTAMP(6), + job_id VARCHAR(255), + version BIGINT, + metrics TEXT, + PRIMARY KEY (id) +); + +CREATE TABLE IF NOT EXISTS table_stats ( + table_uuid VARCHAR(36) NOT NULL, + database_id VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + stats TEXT, + table_properties TEXT, + updated_at TIMESTAMP(6) NOT NULL, + PRIMARY KEY (table_uuid) +); + +CREATE TABLE IF NOT EXISTS table_stats_history ( + id BIGINT NOT NULL AUTO_INCREMENT, + table_uuid VARCHAR(36) NOT NULL, + database_id VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + stats TEXT, + recorded_at TIMESTAMP(6) NOT NULL, + PRIMARY KEY (id), + INDEX idx_tsh_table_uuid (table_uuid), + INDEX idx_tsh_recorded_at (recorded_at) +); + +CREATE TABLE IF NOT EXISTS table_operations_history ( + id VARCHAR(36) NOT NULL, + table_uuid VARCHAR(36) NOT NULL, + database_name VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + operation_type VARCHAR(50) NOT NULL, + submitted_at TIMESTAMP(6) NOT NULL, + status VARCHAR(20) NOT NULL, + job_id VARCHAR(255), + result TEXT, + orphan_files_deleted INT, + orphan_bytes_deleted BIGINT, + PRIMARY KEY (id) +); diff --git a/services/optimizer/src/test/resources/application-test.properties b/services/optimizer/src/test/resources/application-test.properties new file mode 100644 index 000000000..97b7841dc --- /dev/null +++ b/services/optimizer/src/test/resources/application-test.properties @@ -0,0 +1,12 @@ +spring.datasource.url=jdbc:h2:mem:optimizer_test;MODE=MySQL;DATABASE_TO_LOWER=TRUE;DB_CLOSE_DELAY=-1 +spring.datasource.driver-class-name=org.h2.Driver +spring.datasource.username=sa +spring.datasource.password= + +spring.jpa.hibernate.ddl-auto=none 
+spring.sql.init.mode=always +spring.jpa.defer-datasource-initialization=true +spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect +spring.jpa.properties.hibernate.physical_naming_strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl + +spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql diff --git a/settings.gradle b/settings.gradle index 035e54349..cad06785e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -49,6 +49,7 @@ include ':libs:datalayout' include ':services:common' include ':services:housetables' include ':services:jobs' +include ':services:optimizer' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5' From 3c93d52f21ce82cc01ae37fef8ca5c1dba2522e1 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 3 Apr 2026 11:35:45 -0700 Subject: [PATCH 2/3] fix: address PR review feedback on optimizer data model - Remove OperationMetrics class and converter; stats are read directly from table_stats instead of duplicating into operations - Remove orphanFilesDeleted/orphanBytesDeleted from history entity, DTO, and schema; operation-specific data belongs in the result JSON - Add addedSizeBytes to CommitDelta for tracking write volume - Fix OperationType javadoc to describe current state, not roadmap - Fix TableOperationsHistoryRow javadoc: written on operation complete, not by Spark app directly - Add field comments to all DTOs and request objects Co-Authored-By: Claude Opus 4.6 --- .../optimizer/api/model/OperationMetrics.java | 24 ---------- .../optimizer/api/model/OperationType.java | 7 +-- .../api/model/TableOperationsDto.java | 9 +++- .../api/model/TableOperationsHistoryDto.java | 12 ++--- .../optimizer/api/model/TableStats.java | 1 + .../optimizer/api/model/TableStatsDto.java | 11 +++++ .../api/model/TableStatsHistoryDto.java | 11 +++++ .../model/UpsertTableOperationsRequest.java | 11 +++-- 
.../api/model/UpsertTableStatsRequest.java | 7 +++ .../config/OperationMetricsConverter.java | 44 ------------------- .../entity/TableOperationsHistoryRow.java | 20 +++------ .../optimizer/entity/TableOperationsRow.java | 10 ++--- .../main/resources/db/optimizer-schema.sql | 2 - 13 files changed, 58 insertions(+), 111 deletions(-) delete mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java delete mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java deleted file mode 100644 index d6f788fcc..000000000 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationMetrics.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.linkedin.openhouse.optimizer.api.model; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -/** - * Denormalized stats snapshot captured by the Analyzer at analysis time. - * - *
<p>
Stored as JSON in the {@code metrics} column of {@code table_operations}. These values are - * point-in-time snapshots — they record what the Analyzer saw when it recommended the operation, - * not cumulative totals. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class OperationMetrics { - - private Long tableSizeBytes; - private Integer numFilesAdded; - private Integer numFilesDeleted; -} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java index 05e4a1e7b..8507bae12 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java @@ -1,11 +1,6 @@ package com.linkedin.openhouse.optimizer.api.model; -/** - * Maintenance operation types supported by the continuous optimizer. - * - *
<p>
Only {@code ORPHAN_FILES_DELETION} is currently implemented. Additional types will be added as - * they are built out. - */ +/** Maintenance operation types supported by the continuous optimizer. */ public enum OperationType { /** Removes orphaned data files no longer referenced by table metadata. */ ORPHAN_FILES_DELETION diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java index 5eb5eaaa6..9c33d8907 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java @@ -19,8 +19,13 @@ public class TableOperationsDto { /** Stable table identity from the Tables Service. */ private String tableUuid; + /** Denormalized database name for display; not part of the primary key. */ private String databaseName; + + /** Denormalized table name for display; not part of the primary key. */ private String tableName; + + /** The type of maintenance operation (e.g. ORPHAN_FILES_DELETION). */ private OperationType operationType; /** {@code PENDING} or {@code SCHEDULED}. Defaults to {@code PENDING} on creation. */ @@ -35,6 +40,6 @@ public class TableOperationsDto { /** Job ID returned by the Jobs Service after successful submission. */ private String jobId; - /** Denormalized stats snapshot captured at analysis time. */ - private OperationMetrics metrics; + /** Reserved for future per-operation metadata; currently unused. 
*/ + private String metrics; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java index 7dca34271..efc9bebbb 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java @@ -6,7 +6,7 @@ import lombok.Data; import lombok.NoArgsConstructor; -/** DTO for {@code table_operations_history} — append-only Spark job results. */ +/** DTO for {@code table_operations_history} — append-only operation results. */ @Data @Builder @NoArgsConstructor @@ -23,21 +23,15 @@ public class TableOperationsHistoryDto { private String tableName; private OperationType operationType; - /** When the Spark job was submitted / ran. */ + /** When the operation completed, as recorded by the complete endpoint. */ private Instant submittedAt; /** {@code SUCCESS} or {@code FAILED}. */ private OperationHistoryStatus status; - /** Spark job ID. */ + /** Job ID from the Jobs Service. */ private String jobId; /** Job result payload; both fields null on success. */ private JobResult result; - - /** Number of orphan files deleted; null for non-OFD operations or before completion. */ - private Integer orphanFilesDeleted; - - /** Bytes reclaimed by orphan file deletion; null for non-OFD operations. 
*/ - private Long orphanBytesDeleted; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java index cb77d994f..51aa8a712 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java @@ -43,6 +43,7 @@ public static class SnapshotMetrics { public static class CommitDelta { private Long numFilesAdded; private Long numFilesDeleted; + private Long addedSizeBytes; private Long deletedSizeBytes; } } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java index 1663d5ab0..a668af434 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java @@ -14,10 +14,21 @@ @AllArgsConstructor public class TableStatsDto { + /** Stable Iceberg table UUID. Primary key of the stats row. */ private String tableUuid; + + /** Denormalized database name for display. */ private String databaseId; + + /** Denormalized table name for display. */ private String tableName; + + /** Combined snapshot + delta stats payload, stored as JSON. */ private TableStats stats; + + /** Current table properties snapshot (e.g. maintenance opt-in flags). */ private Map tableProperties; + + /** When this row was last written. Used for staleness monitoring. 
*/ private Instant updatedAt; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java index 142f00245..0604e07de 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsHistoryDto.java @@ -13,10 +13,21 @@ @AllArgsConstructor public class TableStatsHistoryDto { + /** Auto-increment primary key. */ private Long id; + + /** Stable Iceberg table UUID. */ private String tableUuid; + + /** Denormalized database name for display. */ private String databaseId; + + /** Denormalized table name for display. */ private String tableName; + + /** Snapshot + delta stats from this commit event. */ private TableStats stats; + + /** When this history row was recorded. */ private Instant recordedAt; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java index 19dd1baac..21174c337 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java @@ -9,8 +9,7 @@ * PUT request body for {@code /v1/table-operations/{id}}. * *
<p>
The Analyzer supplies the operation {@code id} (client-generated UUID) in the path and all - * table-identifying fields in this body. The service upserts by {@code id}: creates on first call, - * updates {@code metrics} on subsequent calls with the same {@code id}. + * table-identifying fields in this body. The service creates the row on first call. */ @Data @Builder @@ -18,9 +17,15 @@ @AllArgsConstructor public class UpsertTableOperationsRequest { + /** Stable Iceberg table UUID identifying the target table. */ private String tableUuid; + + /** Denormalized database name for display. */ private String databaseName; + + /** Denormalized table name for display. */ private String tableName; + + /** The type of maintenance operation to create. */ private OperationType operationType; - private OperationMetrics metrics; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java index 3214a85a6..721c3deaf 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java @@ -18,8 +18,15 @@ @AllArgsConstructor public class UpsertTableStatsRequest { + /** Denormalized database name for display. */ private String databaseId; + + /** Denormalized table name for display. */ private String tableName; + + /** Combined snapshot + delta stats payload from this commit. */ private TableStats stats; + + /** Current table properties snapshot (e.g. maintenance opt-in flags). 
*/ private Map tableProperties; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java deleted file mode 100644 index 27f0882f5..000000000 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/config/OperationMetricsConverter.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.linkedin.openhouse.optimizer.config; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.linkedin.openhouse.optimizer.api.model.OperationMetrics; -import java.io.IOException; -import javax.persistence.AttributeConverter; -import javax.persistence.Converter; - -/** - * JPA {@link AttributeConverter} that serializes {@link OperationMetrics} to/from a JSON string. - */ -@Converter -public class OperationMetricsConverter implements AttributeConverter { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - @Override - public String convertToDatabaseColumn(OperationMetrics attribute) { - // Null metrics are valid for PENDING operations that have not yet produced output. - if (attribute == null) { - return null; - } - try { - return OBJECT_MAPPER.writeValueAsString(attribute); - } catch (JsonProcessingException e) { - throw new IllegalStateException("Failed to serialize OperationMetrics to JSON", e); - } - } - - @Override - public OperationMetrics convertToEntityAttribute(String dbData) { - // Null is stored for PENDING rows; return null so the entity reflects that state. 
- if (dbData == null) { - return null; - } - try { - return OBJECT_MAPPER.readValue(dbData, OperationMetrics.class); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to deserialize OperationMetrics from JSON: " + dbData, e); - } - } -} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java index 6a47b5022..e7493024c 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java @@ -21,12 +21,12 @@ import lombok.NoArgsConstructor; /** - * Append-only record of a completed Spark maintenance job. + * Append-only record of a completed maintenance operation. * - *
<p>
Written by the Spark app after each table's operation finishes. The {@code id} is the same - * UUID as the originating {@code table_operations.id}, tying each history entry directly back to - * the specific operation cycle that produced it. Multiple runs of the same operation on the same - * table produce multiple rows (each cycle gets a new UUID from the Analyzer). + *
<p>
Written when the operation-complete endpoint is called. The {@code id} is the same UUID as the + * originating {@code table_operations.id}, tying each history entry back to the operation cycle + * that produced it. Multiple runs of the same operation on the same table produce multiple rows + * (each cycle gets a new UUID from the Analyzer). */ @Entity @Table( @@ -63,7 +63,7 @@ public class TableOperationsHistoryRow { @Column(name = "operation_type", nullable = false, length = 50) private OperationType operationType; - /** When the Spark job was submitted / ran, as reported by the job itself. */ + /** When the operation completed, as recorded by the complete endpoint. */ @Column(name = "submitted_at", nullable = false) private Instant submittedAt; @@ -80,12 +80,4 @@ public class TableOperationsHistoryRow { @Convert(converter = JobResultConverter.class) @Column(name = "result") private JobResult result; - - /** Number of orphan files deleted by the Spark job; null for non-OFD operations. */ - @Column(name = "orphan_files_deleted") - private Integer orphanFilesDeleted; - - /** Bytes reclaimed by orphan file deletion; null for non-OFD operations. 
*/ - @Column(name = "orphan_bytes_deleted") - private Long orphanBytesDeleted; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java index 9d835aa20..e5493b510 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java @@ -1,12 +1,9 @@ package com.linkedin.openhouse.optimizer.entity; -import com.linkedin.openhouse.optimizer.api.model.OperationMetrics; import com.linkedin.openhouse.optimizer.api.model.OperationStatus; import com.linkedin.openhouse.optimizer.api.model.OperationType; -import com.linkedin.openhouse.optimizer.config.OperationMetricsConverter; import java.time.Instant; import javax.persistence.Column; -import javax.persistence.Convert; import javax.persistence.Entity; import javax.persistence.EnumType; import javax.persistence.Enumerated; @@ -90,10 +87,9 @@ public class TableOperationsRow { private Long version; /** - * Denormalized stats snapshot captured at analysis time: table size, snapshot count, and file - * counts as of the moment the Analyzer ran. + * Reserved for future per-operation metadata. Stored as JSON text; currently unused. The Analyzer + * reads stats directly from {@code table_stats} instead of duplicating them here. 
*/ - @Convert(converter = OperationMetricsConverter.class) @Column(name = "metrics") - private OperationMetrics metrics; + private String metrics; } diff --git a/services/optimizer/src/main/resources/db/optimizer-schema.sql b/services/optimizer/src/main/resources/db/optimizer-schema.sql index 53062c5ad..098380e7f 100644 --- a/services/optimizer/src/main/resources/db/optimizer-schema.sql +++ b/services/optimizer/src/main/resources/db/optimizer-schema.sql @@ -47,7 +47,5 @@ CREATE TABLE IF NOT EXISTS table_operations_history ( status VARCHAR(20) NOT NULL, job_id VARCHAR(255), result TEXT, - orphan_files_deleted INT, - orphan_bytes_deleted BIGINT, PRIMARY KEY (id) ); From 02a5ab31c62a0847e665f674b1fb3e8684bb3433 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Mon, 6 Apr 2026 12:19:37 -0700 Subject: [PATCH 3/3] fix: remove orphan fields from CompleteOperationRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These fields never belonged in the data model — remove them at the source rather than adding then deleting in a later PR. Co-Authored-By: Claude Opus 4.6 --- .../optimizer/api/model/CompleteOperationRequest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java index c26893197..35f7ba782 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java @@ -22,10 +22,4 @@ public class CompleteOperationRequest { /** Error details on failure; {@code null} on success. */ private JobResult result; - - /** Number of orphan files deleted; set by OFD Spark app on success. 
*/ - private Integer orphanFilesDeleted; - - /** Bytes reclaimed by orphan file deletion; set by OFD Spark app on success. */ - private Long orphanBytesDeleted; }