Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions apps/optimizer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Build script for the apps/optimizer module.
plugins {
// Project-local convention plugin; what it configures is not visible from this script.
id 'openhouse.java-minimal-conventions'
}

// Avoid build-directory collision with services:optimizer (same project.name 'optimizer').
// Output is redirected to a dedicated "apps-optimizer" folder under the root project's build dir.
buildDir = "${rootProject.buildDir}/apps-optimizer"

dependencies {
// Spring Data JPA starter; the JpaRepository interfaces and @Query annotations come from here.
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
// Supplies com.vladmihalcea.hibernate.type.json.JsonStringType, which the TableStatsRow entity
// registers as its "json" Hibernate type for the stats and table_properties columns.
implementation 'com.vladmihalcea:hibernate-types-55:2.21.1'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where exactly this dependency is used? Seems like new dependency.

// Test-only dependencies. Version pinned to match the data-jpa starter above.
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
// NOTE(review): no version is declared for H2 here; presumably it is resolved by dependency
// management configured elsewhere (e.g. the convention plugin) - confirm.
testRuntimeOnly 'com.h2database:h2'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linkedin.openhouse.optimizer.entity;

import java.time.Instant;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Lightweight JPA entity for reading {@code table_operations_history} rows.
 *
 * <p>Only getters are generated (there is no {@code @Setter}), so instances are populated either by
 * the persistence provider through the no-argument constructor or by callers through the builder /
 * all-argument constructor. The all-argument constructor takes the fields in declaration order.
 */
@Entity
@Table(name = "table_operations_history")
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TableOperationHistoryRow {

/** Primary key, mapped to the non-null {@code id} column (at most 36 characters). */
@Id
@Column(name = "id", nullable = false, length = 36)
private String id;

/**
 * Value of the non-null {@code table_uuid} column (at most 36 characters); presumably the UUID of
 * the table the operation ran against.
 */
@Column(name = "table_uuid", nullable = false, length = 36)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to capture databaseId and tableId for getting historical insights on table operations.

private String tableUuid;

/** Operation type, kept as a plain string in {@code operation_type} (at most 50 characters). */
@Column(name = "operation_type", nullable = false, length = 50)
private String operationType;

/** Non-null timestamp from the {@code submitted_at} column. */
@Column(name = "submitted_at", nullable = false)
private Instant submittedAt;

/** Status, kept as a plain string in the non-null {@code status} column (at most 20 characters). */
@Column(name = "status", nullable = false, length = 20)
private String status;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.linkedin.openhouse.optimizer.entity;

import java.time.Instant;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * One row of the optimizer database's {@code table_operations} table, mapped through JPA.
 *
 * <p>Lombok supplies getters, setters, a builder, and both the no-argument and all-argument
 * constructors. The all-argument constructor follows the field declaration order below, so that
 * order must not be changed casually.
 */
@Table(name = "table_operations")
@Entity
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
public class TableOperationRow {

  /** Primary key; required, at most 36 characters. */
  @Id
  @Column(length = 36, nullable = false, name = "id")
  private String id;

  /** Required {@code table_uuid} value, at most 36 characters. */
  @Column(length = 36, nullable = false, name = "table_uuid")
  private String tableUuid;

  /** Required {@code database_name} value, at most 255 characters. */
  @Column(length = 255, nullable = false, name = "database_name")
  private String databaseName;

  /** Required {@code table_name} value, at most 255 characters. */
  @Column(length = 255, nullable = false, name = "table_name")
  private String tableName;

  /** Required operation type, stored as a string of at most 50 characters. */
  @Column(length = 50, nullable = false, name = "operation_type")
  private String operationType;

  /** Required status, stored as a string of at most 20 characters. */
  @Column(length = 20, nullable = false, name = "status")
  private String status;

  /** Optional {@code created_at} timestamp. */
  @Column(name = "created_at")
  private Instant createdAt;

  /** Optional {@code scheduled_at} timestamp. */
  @Column(name = "scheduled_at")
  private Instant scheduledAt;

  /** Optional {@code job_id} value, at most 255 characters. */
  @Column(length = 255, name = "job_id")
  private String jobId;

  /**
   * Version number persisted as an ordinary column. It intentionally carries no {@code @Version}
   * annotation, so JPA does not use it for optimistic locking.
   */
  @Column(name = "version")
  private Long version;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.linkedin.openhouse.optimizer.entity;

import com.linkedin.openhouse.optimizer.model.TableStats;
import com.vladmihalcea.hibernate.type.json.JsonStringType;
import java.time.Instant;
import java.util.Map;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;

/**
 * Row of the optimizer {@code table_stats} table, keyed by table UUID.
 *
 * <p>The Tables Service writes a row on every Iceberg commit, and the Analyzer and Scheduler read
 * these rows directly through JPA. Both the {@code stats} and {@code table_properties} columns hold
 * JSON text, converted by the {@code "json"} Hibernate type registered on this class.
 */
@Entity
@Table(name = "table_stats")
@TypeDef(typeClass = JsonStringType.class, name = "json")
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
public class TableStatsRow {

  /** Primary key; required, at most 36 characters. */
  @Id
  @Column(length = 36, nullable = false, name = "table_uuid")
  private String tableUuid;

  /** Required {@code database_id} value, at most 255 characters. */
  @Column(length = 255, nullable = false, name = "database_id")
  private String databaseId;

  /** Required {@code table_name} value, at most 255 characters. */
  @Column(length = 255, nullable = false, name = "table_name")
  private String tableName;

  /** Stats payload, serialized as JSON into the {@code stats} TEXT column. */
  @Column(columnDefinition = "TEXT", name = "stats")
  @Type(type = "json")
  private TableStats stats;

  /** String-to-string properties, serialized as JSON into the {@code table_properties} column. */
  @Column(columnDefinition = "TEXT", name = "table_properties")
  @Type(type = "json")
  private Map<String, String> tableProperties;

  /** Required {@code updated_at} timestamp. */
  @Column(nullable = false, name = "updated_at")
  private Instant updatedAt;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.linkedin.openhouse.optimizer.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Stats payload kept as one JSON document per table in {@code table_stats}.
 *
 * <p>It has two parts: a {@link SnapshotMetrics} section that is replaced wholesale on each upsert,
 * and a {@link CommitDelta} section whose counters build up over successive commit events. Every
 * type here offers a no-argument constructor, an all-argument constructor (in field declaration
 * order), and a builder that can be re-opened via {@code toBuilder()}.
 */
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
@Data
public class TableStats {

  /** Snapshot section; overwritten on every upsert. */
  private SnapshotMetrics snapshot;

  /** Delta section; accumulated across commit events. */
  private CommitDelta delta;

  /** Metadata captured from Iceberg at a single point in time, when the table is scanned. */
  @NoArgsConstructor
  @AllArgsConstructor
  @Builder(toBuilder = true)
  @Data
  public static class SnapshotMetrics {
    private String clusterId;
    private String tableVersion;
    private String tableLocation;
    private Long tableSizeBytes;

    /** Count of data files in the latest snapshot; this is the figure bin-packing relies on. */
    private Long numCurrentFiles;
  }

  /** Incremental per-commit counters, summed over every commit event recorded so far. */
  @NoArgsConstructor
  @AllArgsConstructor
  @Builder(toBuilder = true)
  @Data
  public static class CommitDelta {
    private Long numFilesAdded;
    private Long numFilesDeleted;
    private Long deletedSizeBytes;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.openhouse.optimizer.repository;

import com.linkedin.openhouse.optimizer.entity.TableOperationHistoryRow;
import java.time.Instant;
import java.util.List;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

/**
 * Read-side Spring Data repository over {@code table_operations_history}, used by the Analyzer.
 */
public interface TableOperationHistoryRepository
    extends JpaRepository<TableOperationHistoryRow, String> {

  /**
   * Looks up history rows, newest {@code submittedAt} first.
   *
   * <p>All filters are optional and combined with AND; a {@code null} argument disables the
   * corresponding filter. The query orders by {@code submittedAt} only, so rows sharing the same
   * timestamp have no guaranteed relative order.
   *
   * @param operationType exact operation type to match, or {@code null} for any
   * @param tableUuid exact table UUID to match, or {@code null} for any
   * @param status exact status to match, or {@code null} for any
   * @param since inclusive lower bound on {@code submittedAt}, or {@code null} for no bound
   * @param pageable paging request applied to the result
   * @return the matching rows for the requested page
   */
  @Query(
      value =
          "SELECT r FROM TableOperationHistoryRow r"
              + " WHERE (:operationType IS NULL OR r.operationType = :operationType)"
              + " AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid)"
              + " AND (:status IS NULL OR r.status = :status)"
              + " AND (:since IS NULL OR r.submittedAt >= :since)"
              + " ORDER BY r.submittedAt DESC")
  List<TableOperationHistoryRow> find(
      @Param("operationType") String operationType,
      @Param("tableUuid") String tableUuid,
      @Param("status") String status,
      @Param("since") Instant since,
      Pageable pageable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.openhouse.optimizer.repository;

import com.linkedin.openhouse.optimizer.entity.TableOperationRow;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

/** Spring Data JPA access to the {@code table_operations} rows of the optimizer database. */
public interface TableOperationsRepository extends JpaRepository<TableOperationRow, String> {

  /**
   * Looks up operations by any combination of the supported filters.
   *
   * <p>Filters are ANDed together; passing {@code null} for an argument disables that filter. The
   * query declares no ordering.
   *
   * @param operationType exact operation type to match, or {@code null} for any
   * @param status exact status to match, or {@code null} for any
   * @param tableUuid exact table UUID to match, or {@code null} for any
   * @param databaseName exact database name to match, or {@code null} for any
   * @param tableName exact table name to match, or {@code null} for any
   * @return every row satisfying the active filters
   */
  @Query(
      value =
          "SELECT r FROM TableOperationRow r"
              + " WHERE (:operationType IS NULL OR r.operationType = :operationType)"
              + " AND (:status IS NULL OR r.status = :status)"
              + " AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid)"
              + " AND (:databaseName IS NULL OR r.databaseName = :databaseName)"
              + " AND (:tableName IS NULL OR r.tableName = :tableName)")
  List<TableOperationRow> find(
      @Param("operationType") String operationType,
      @Param("status") String status,
      @Param("tableUuid") String tableUuid,
      @Param("databaseName") String databaseName,
      @Param("tableName") String tableName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.openhouse.optimizer.repository;

import com.linkedin.openhouse.optimizer.entity.TableStatsRow;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

/** Spring Data JPA access to the {@code table_stats} rows of the optimizer database. */
public interface TableStatsRepository extends JpaRepository<TableStatsRow, String> {

  /**
   * Looks up stats rows by any combination of the supported filters.
   *
   * <p>Filters are ANDed together; passing {@code null} for an argument disables that filter. The
   * query declares no ordering.
   *
   * @param databaseId exact database id to match, or {@code null} for any
   * @param tableName exact table name to match, or {@code null} for any
   * @param tableUuid exact table UUID to match, or {@code null} for any
   * @return every row satisfying the active filters
   */
  @Query(
      value =
          "SELECT r FROM TableStatsRow r"
              + " WHERE (:databaseId IS NULL OR r.databaseId = :databaseId)"
              + " AND (:tableName IS NULL OR r.tableName = :tableName)"
              + " AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid)")
  List<TableStatsRow> find(
      @Param("databaseId") String databaseId,
      @Param("tableName") String tableName,
      @Param("tableUuid") String tableUuid);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.openhouse.optimizer.repository;

import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus;
import com.linkedin.openhouse.optimizer.api.model.OperationType;
import com.linkedin.openhouse.optimizer.entity.TableOperationsHistoryRow;
import java.time.Instant;
import java.util.List;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

/**
 * Spring Data repository for {@link TableOperationsHistoryRow}. The table is append-only and its
 * primary key is the auto-increment {@code id}.
 */
@Repository
public interface TableOperationsHistoryRepository
    extends JpaRepository<TableOperationsHistoryRow, Long> {

  /**
   * Looks up history rows, newest {@code submittedAt} first.
   *
   * <p>All filters are optional and combined with AND; a {@code null} argument disables the
   * corresponding filter. The query orders by {@code submittedAt} only, so rows sharing the same
   * timestamp have no guaranteed relative order.
   *
   * @param databaseName exact database name to match, or {@code null} for any
   * @param tableName exact table name to match, or {@code null} for any
   * @param tableUuid exact table UUID to match, or {@code null} for any
   * @param operationType operation type to match, or {@code null} for any
   * @param status history status to match, or {@code null} for any
   * @param since inclusive lower bound on {@code submittedAt}, or {@code null} for no bound
   * @param until inclusive upper bound on {@code submittedAt}, or {@code null} for no bound
   * @param pageable paging request applied to the result
   * @return the matching rows for the requested page
   */
  @Query(
      value =
          "SELECT r FROM TableOperationsHistoryRow r"
              + " WHERE (:databaseName IS NULL OR r.databaseName = :databaseName)"
              + " AND (:tableName IS NULL OR r.tableName = :tableName)"
              + " AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid)"
              + " AND (:operationType IS NULL OR r.operationType = :operationType)"
              + " AND (:status IS NULL OR r.status = :status)"
              + " AND (:since IS NULL OR r.submittedAt >= :since)"
              + " AND (:until IS NULL OR r.submittedAt <= :until)"
              + " ORDER BY r.submittedAt DESC")
  List<TableOperationsHistoryRow> find(
      @Param("databaseName") String databaseName,
      @Param("tableName") String tableName,
      @Param("tableUuid") String tableUuid,
      @Param("operationType") OperationType operationType,
      @Param("status") OperationHistoryStatus status,
      @Param("since") Instant since,
      @Param("until") Instant until,
      Pageable pageable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.linkedin.openhouse.optimizer.repository;

import com.linkedin.openhouse.optimizer.api.model.OperationStatus;
import com.linkedin.openhouse.optimizer.api.model.OperationType;
import com.linkedin.openhouse.optimizer.entity.TableOperationsRow;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

/**
 * Spring Data repository for {@link TableOperationsRow}, whose primary key is the client-generated
 * UUID {@code id}.
 */
@Repository
public interface TableOperationsRepository extends JpaRepository<TableOperationsRow, String> {

  /**
   * Looks up operations by any combination of the supported filters.
   *
   * <p>Filters are ANDed together; passing {@code null} for an argument disables that filter, and
   * passing {@code null} for all of them returns every row. The query declares no ordering.
   *
   * @param operationType operation type to match, or {@code null} for any
   * @param status operation status to match, or {@code null} for any
   * @param databaseName exact database name to match, or {@code null} for any
   * @param tableName exact table name to match, or {@code null} for any
   * @param tableUuid exact table UUID to match, or {@code null} for any
   * @return every row satisfying the active filters
   */
  @Query(
      value =
          "SELECT r FROM TableOperationsRow r"
              + " WHERE (:operationType IS NULL OR r.operationType = :operationType)"
              + " AND (:status IS NULL OR r.status = :status)"
              + " AND (:databaseName IS NULL OR r.databaseName = :databaseName)"
              + " AND (:tableName IS NULL OR r.tableName = :tableName)"
              + " AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid)")
  List<TableOperationsRow> find(
      @Param("operationType") OperationType operationType,
      @Param("status") OperationStatus status,
      @Param("databaseName") String databaseName,
      @Param("tableName") String tableName,
      @Param("tableUuid") String tableUuid);
}
Loading