diff --git a/CHANGELOG.md b/CHANGELOG.md index f2df165baa..ccd4d0ee2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184 * [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185 * [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210 +* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132 diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 9396a1bd9a..3b1ffab898 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -1000,9 +1000,9 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { require.NoError(t, err) require.False(t, partitionedGroupFileExists) - partitionedGroupFileExists, err = userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath()) + visitMarkerExists, err := userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath()) require.NoError(t, err) - require.False(t, partitionedGroupFileExists) + require.False(t, visitMarkerExists) } func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) { diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 4d9f6811ca..a304239c17 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -632,7 +632,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount partitionID := partitionedGroup.partition.PartitionID partitionedGroupLogger := log.With(g.logger, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "rangeDuration", partitionedGroup.rangeDuration().String(), "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group_hash", groupHash) - visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionID) + visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionedGroup.partitionedGroupInfo.CreationTime, partitionID) visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker) if isVisited, err := g.isGroupVisited(partitionID, visitMarkerManager); err != nil { level.Warn(partitionedGroupLogger).Log("msg", "unable to check if partition is visited", "err", err, "group", partitionedGroup.String()) diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go index 566a62e87e..989315de46 100644 --- a/pkg/compactor/partition_compaction_planner.go +++ b/pkg/compactor/partition_compaction_planner.go @@ -85,7 +85,7 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB // claimed same partition in grouper at same time. time.Sleep(p.plannerDelay) - visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID) + visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionInfo.PartitionedGroupCreationTime, partitionID) visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker) existingPartitionVisitMarker := &partitionVisitMarker{} err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker) diff --git a/pkg/compactor/partition_visit_marker.go b/pkg/compactor/partition_visit_marker.go index 4a5d8fdc4a..128f502936 100644 --- a/pkg/compactor/partition_visit_marker.go +++ b/pkg/compactor/partition_visit_marker.go @@ -28,18 +28,22 @@ type partitionVisitMarker struct { CompactorID string `json:"compactorID"` Status VisitStatus `json:"status"` PartitionedGroupID uint32 `json:"partitionedGroupID"` - PartitionID int `json:"partitionID"` + // VisitTime is a unix timestamp of when the partitioning group plan was created, in order to validate if the marker + // is referring to the latest version of the group plan + PartitionedGroupCreationTime int64 `json:"partitionedGroupCreationTime"` + PartitionID int `json:"partitionID"` // VisitTime is a unix timestamp of when the partition was visited (mark updated). VisitTime int64 `json:"visitTime"` // Version of the file. Version int `json:"version"` } -func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionID int) *partitionVisitMarker { +func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitioned_group_creation_time int64, partitionID int) *partitionVisitMarker { return &partitionVisitMarker{ - CompactorID: compactorID, - PartitionedGroupID: partitionedGroupID, - PartitionID: partitionID, + CompactorID: compactorID, + PartitionedGroupID: partitionedGroupID, + PartitionedGroupCreationTime: partitioned_group_creation_time, + PartitionID: partitionID, } } diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index ffb6f82240..cdc66d1027 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -156,7 +156,8 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( status.PendingPartitions++ allPartitionCompleted = false status.PendingOrFailedPartitions = append(status.PendingOrFailedPartitions, partition) - } else if visitMarker.VisitTime < p.CreationTime { + } else if visitMarker.VisitTime < p.CreationTime || + (visitMarker.PartitionedGroupCreationTime > 0 && visitMarker.PartitionedGroupCreationTime < p.CreationTime) { status.VisitMarkersToDelete = append(status.VisitMarkersToDelete, visitMarker) allPartitionCompleted = false } else if (visitMarker.GetStatus() == Pending || visitMarker.GetStatus() == InProgress) && !visitMarker.IsExpired(partitionVisitMarkerTimeout) {