-
Notifications
You must be signed in to change notification settings - Fork 76
feat(optimizer): [0/N] Optimizer Data Model #527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| plugins { | ||
| id 'openhouse.springboot-ext-conventions' | ||
| id 'org.springframework.boot' version '2.7.8' | ||
| } | ||
|
|
||
| dependencies { | ||
| implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' | ||
| implementation 'com.vladmihalcea:hibernate-types-55:2.21.1' | ||
| implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8' | ||
| // Pinned to an exact release: a dynamic selector such as '8.+' makes builds non-reproducible. | ||
| // NOTE(review): 8.0.33 is assumed to be what '8.+' resolved to; confirm the intended version. | ||
| implementation 'mysql:mysql-connector-java:8.0.33' | ||
| testImplementation 'com.h2database:h2:2.2.224' | ||
| testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' | ||
| } | ||
|
|
||
| test { | ||
| useJUnitPlatform() | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| package com.linkedin.openhouse.optimizer; | ||
|
|
||
| import org.springframework.boot.SpringApplication; | ||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||
|
|
||
| /** Spring Boot entry point for the Optimizer Service. */ | ||
| @SpringBootApplication | ||
| public class OptimizerServiceApplication { | ||
|
|
||
| /** | ||
| * Starts the service by passing this class and the command-line arguments to {@code | ||
| * SpringApplication.run}. | ||
| * | ||
| * @param args command-line arguments, forwarded unchanged to Spring Boot | ||
| */ | ||
| public static void main(String[] args) { | ||
| SpringApplication.run(OptimizerServiceApplication.class, args); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package com.linkedin.openhouse.optimizer.api.mapper; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto; | ||
| import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; | ||
| import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; | ||
| import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto; | ||
| import com.linkedin.openhouse.optimizer.entity.TableOperationsHistoryRow; | ||
| import com.linkedin.openhouse.optimizer.entity.TableOperationsRow; | ||
| import com.linkedin.openhouse.optimizer.entity.TableStatsHistoryRow; | ||
| import com.linkedin.openhouse.optimizer.entity.TableStatsRow; | ||
| import org.mapstruct.Mapper; | ||
|
|
||
| /** | ||
| * MapStruct mapper for converting between optimizer JPA entities and their corresponding DTOs. | ||
| * | ||
| * <p>The implementation is generated by MapStruct at compile time and, because of {@code | ||
| * componentModel = "spring"}, is managed as a Spring bean. Inject via {@code @Autowired} or | ||
| * constructor injection. | ||
| */ | ||
| @Mapper(componentModel = "spring") | ||
| public interface OptimizerMapper { | ||
|
|
||
| /** Map a {@link TableOperationsRow} to its DTO. */ | ||
| TableOperationsDto toDto(TableOperationsRow row); | ||
|
|
||
| /** Map a {@link TableOperationsHistoryRow} to its DTO. */ | ||
| TableOperationsHistoryDto toDto(TableOperationsHistoryRow row); | ||
|
|
||
| /** Map a {@link TableStatsRow} to its DTO. */ | ||
| TableStatsDto toDto(TableStatsRow row); | ||
|
|
||
| /** Map a {@link TableStatsHistoryRow} to its DTO. */ | ||
| TableStatsHistoryDto toDto(TableStatsHistoryRow row); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** | ||
| * Request body for {@code POST /v1/table-operations/{id}/complete}. | ||
| * | ||
| * <p>Reports the outcome of a completed operation. The backend looks up the operation row by {@code | ||
| * id} and writes a history entry with the operation's table metadata and the supplied result. | ||
| */ | ||
| @Data | ||
| @Builder | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class CompleteOperationRequest { | ||
|
|
||
| /** Outcome of the operation. */ | ||
| private OperationHistoryStatus status; | ||
|
|
||
| /** | ||
| * Error details on failure; {@code null} on success. | ||
| * | ||
| * <p>NOTE(review): {@link JobResult} describes success as a payload whose two fields are {@code | ||
| * null}; confirm whether a successful request sends {@code null} or an empty {@link JobResult}. | ||
| */ | ||
| private JobResult result; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** | ||
| * Result payload for a completed Spark maintenance job. | ||
| * | ||
| * <p>Stored as JSON in the {@code result} column of {@code table_operations_history}. Both fields | ||
| * are {@code null} on success; populated on failure. | ||
| */ | ||
| @Data | ||
| @Builder | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class JobResult { | ||
|
|
||
| /** Human-readable error message; {@code null} if the job succeeded. */ | ||
| private String errorMessage; | ||
|
|
||
| /** Error category (e.g., {@code OOM}, {@code TIMEOUT}); {@code null} if the job succeeded. */ | ||
| private String errorType; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| /** Terminal states for a completed Spark maintenance job. */ | ||
| public enum OperationHistoryStatus { | ||
| SUCCESS, | ||
| FAILED | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| /** Lifecycle states for a table operation recommendation. */ | ||
| public enum OperationStatus { | ||
|
|
||
| /** Recommended by the Analyzer but not yet claimed by the Scheduler. */ | ||
| PENDING, | ||
|
|
||
| /** Claimed by the Scheduler; waiting for the Jobs Service to return a job ID. */ | ||
| SCHEDULING, | ||
|
|
||
| /** Job submitted to the Jobs Service; the row now carries a {@code jobId}. */ | ||
| SCHEDULED, | ||
|
|
||
| /** | ||
| * Marked by the Scheduler when it detects duplicate PENDING rows for the same {@code (table_uuid, | ||
| * operation_type)}. Only the most-recent PENDING row is claimed; older duplicates are CANCELED | ||
| * before the claim step. | ||
| */ | ||
| CANCELED | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| /** Maintenance operation types supported by the continuous optimizer. */ | ||
| public enum OperationType { | ||
| /** Removes orphaned data files no longer referenced by table metadata. */ | ||
| ORPHAN_FILES_DELETION | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import java.time.Instant; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** DTO for {@code table_operations} — Analyzer recommendations read by the Scheduler. */ | ||
| @Data | ||
| @Builder | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class TableOperationsDto { | ||
|
|
||
| /** Client-generated UUID identifying this specific operation recommendation. */ | ||
| private String id; | ||
|
|
||
| /** Stable table identity from the Tables Service. */ | ||
| private String tableUuid; | ||
|
|
||
| /** Denormalized database name for display; not part of the primary key. */ | ||
| private String databaseName; | ||
|
|
||
| /** Denormalized table name for display; not part of the primary key. */ | ||
| private String tableName; | ||
|
|
||
| /** The type of maintenance operation (e.g. ORPHAN_FILES_DELETION). */ | ||
| private OperationType operationType; | ||
mkuchenbecker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Current lifecycle state: {@code PENDING}, {@code SCHEDULING}, {@code SCHEDULED}, or {@code | ||
| * CANCELED} (see {@link OperationStatus}). Defaults to {@code PENDING} on creation. | ||
| */ | ||
| private OperationStatus status; | ||
|
|
||
| /** Server-set when the row is first created by the Analyzer. */ | ||
| private Instant createdAt; | ||
|
|
||
| /** Set by the Scheduler when claiming; {@code null} while PENDING. */ | ||
| private Instant scheduledAt; | ||
|
|
||
| /** Job ID returned by the Jobs Service after successful submission. */ | ||
| private String jobId; | ||
|
|
||
| /** | ||
| * Reserved for future per-operation metadata; currently unused. | ||
| * | ||
| * <p>NOTE(review): the string's format (e.g. serialized JSON) is not specified here; confirm | ||
| * before relying on it. | ||
| */ | ||
| private String metrics; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have a class instead to capture more info? Or do we plan to capture json string here? |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import java.time.Instant; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** DTO for {@code table_operations_history} — append-only operation results. */ | ||
| @Data | ||
| @Builder | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class TableOperationsHistoryDto { | ||
|
|
||
| /** Same UUID as the originating {@code table_operations.id}; supplied by the caller. */ | ||
| private String id; | ||
|
|
||
| /** Stable table identity from the Tables Service. */ | ||
| private String tableUuid; | ||
|
|
||
| /** Database name of the table the operation ran against. */ | ||
| private String databaseName; | ||
| /** Name of the table the operation ran against. */ | ||
| private String tableName; | ||
| /** The type of maintenance operation this history entry records. */ | ||
| private OperationType operationType; | ||
|
|
||
| /** | ||
| * When the operation completed, as recorded by the complete endpoint. | ||
| * | ||
| * <p>NOTE(review): the field name suggests a submission time while this text describes a | ||
| * completion time; confirm which one is intended. | ||
| */ | ||
| private Instant submittedAt; | ||
|
|
||
| /** {@code SUCCESS} or {@code FAILED}. */ | ||
| private OperationHistoryStatus status; | ||
|
|
||
| /** Job ID from the Jobs Service. */ | ||
| private String jobId; | ||
|
|
||
| /** Job result payload; both fields null on success. */ | ||
| private JobResult result; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** Combined stats payload stored as a single JSON blob per table. */ | ||
| @Data | ||
| @Builder(toBuilder = true) | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| public class TableStats { | ||
|
|
||
| /** Snapshot fields — overwritten on every upsert. */ | ||
| private SnapshotMetrics snapshot; | ||
|
|
||
| /** Delta fields — accumulated across commit events. */ | ||
| private CommitDelta delta; | ||
|
|
||
| /** Point-in-time metadata read from Iceberg at scan time. */ | ||
| @Data | ||
| @Builder(toBuilder = true) | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| public static class SnapshotMetrics { | ||
| private String clusterId; | ||
| private String tableVersion; | ||
| private String tableLocation; | ||
| private Long tableSizeBytes; | ||
| /** Total number of data files as of the latest snapshot — used for bin-packing. */ | ||
| private Long numCurrentFiles; | ||
| } | ||
|
|
||
| /** | ||
| * Per-commit incremental counters; accumulated across all recorded commit events. | ||
| * | ||
| * <p>Unknown JSON properties are ignored, matching {@link TableStats} and {@link | ||
| * SnapshotMetrics}, so a stored payload that carries fields this version does not declare can | ||
| * still be deserialized. | ||
| */ | ||
| @Data | ||
| @Builder(toBuilder = true) | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| public static class CommitDelta { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this also require @JsonIgnoreProperties ? could provide forward compatibility or safeguard during upgrades in case of new fields addition |
||
| private Long numFilesAdded; | ||
| private Long numFilesDeleted; | ||
| private Long addedSizeBytes; | ||
| private Long deletedSizeBytes; | ||
mkuchenbecker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.Map; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** DTO for {@code table_stats} — used for response payloads. */ | ||
| @Data | ||
| @Builder | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class TableStatsDto { | ||
|
|
||
| /** Stable Iceberg table UUID. Primary key of the stats row. */ | ||
| private String tableUuid; | ||
|
|
||
| /** | ||
| * Denormalized database name for display. | ||
| * | ||
| * <p>NOTE(review): {@link TableOperationsDto} names the equivalent field {@code databaseName}; | ||
| * confirm whether the {@code databaseId} / {@code databaseName} difference is intentional. | ||
| */ | ||
| private String databaseId; | ||
|
|
||
| /** Denormalized table name for display. */ | ||
| private String tableName; | ||
|
|
||
| /** Combined snapshot + delta stats payload, stored as JSON. */ | ||
| private TableStats stats; | ||
|
|
||
| /** Current table properties snapshot (e.g. maintenance opt-in flags). */ | ||
| private Map<String, String> tableProperties; | ||
|
|
||
| /** When this row was last written. Used for staleness monitoring. */ | ||
| private Instant updatedAt; | ||
mkuchenbecker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| package com.linkedin.openhouse.optimizer.api.model; | ||
|
|
||
| import java.time.Instant; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Data; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| /** DTO for {@code table_stats_history} — used for response payloads. */ | ||
| @Data | ||
| @Builder | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class TableStatsHistoryDto { | ||
|
|
||
| /** Auto-increment primary key. */ | ||
| private Long id; | ||
|
|
||
| /** Stable Iceberg table UUID. */ | ||
| private String tableUuid; | ||
|
|
||
| /** | ||
| * Denormalized database name for display. | ||
| * | ||
| * <p>NOTE(review): {@link TableOperationsDto} names the equivalent field {@code databaseName}; | ||
| * confirm whether the {@code databaseId} / {@code databaseName} difference is intentional. | ||
| */ | ||
| private String databaseId; | ||
|
|
||
| /** Denormalized table name for display. */ | ||
| private String tableName; | ||
|
|
||
| /** Snapshot + delta stats from this commit event. */ | ||
| private TableStats stats; | ||
|
|
||
| /** When this history row was recorded. */ | ||
| private Instant recordedAt; | ||
mkuchenbecker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should keep the existing statuses such as canceled, queued, etc. These are valid statuses, as sometimes jobs cannot be submitted due to GGW/Yarn issues.