Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/destregistry/basepublisher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package destregistry

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,14 +66,14 @@ func (p *BasePublisher) StartClose() {

func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) map[string]string {
systemMetadata := map[string]string{
"timestamp": fmt.Sprintf("%d", timestamp.Unix()),
"timestamp": timestamp.UTC().Format(time.RFC3339),
"event-id": event.ID,
"topic": event.Topic,
}

// Add millisecond timestamp if enabled
if p.includeMillisecondTimestamp {
systemMetadata["timestamp-ms"] = fmt.Sprintf("%d", timestamp.UnixMilli())
systemMetadata["timestamp-ms"] = timestamp.UTC().Format(time.RFC3339Nano)
}

// Merge with priority: system < deliveryMetadata < event.Metadata
Expand Down
18 changes: 9 additions & 9 deletions internal/destregistry/basepublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestMakeMetadata_WithoutDeliveryMetadata(t *testing.T) {
metadata := publisher.MakeMetadata(&event, timestamp)

// System metadata should be present
assert.Equal(t, "1609459200", metadata["timestamp"])
assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"])
assert.Equal(t, "evt_123", metadata["event-id"])
assert.Equal(t, "user.created", metadata["topic"])

Expand Down Expand Up @@ -60,7 +60,7 @@ func TestMakeMetadata_WithDeliveryMetadata(t *testing.T) {
metadata := publisher.MakeMetadata(&event, timestamp)

// System metadata should be present
assert.Equal(t, "1609459200", metadata["timestamp"])
assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"])
assert.Equal(t, "evt_123", metadata["event-id"])
assert.Equal(t, "user.created", metadata["topic"])

Expand Down Expand Up @@ -138,9 +138,9 @@ func TestMakeMetadata_WithMillisecondTimestamp(t *testing.T) {

metadata := publisher.MakeMetadata(&event, timestamp)

// Should include both timestamp and timestamp-ms
assert.Equal(t, "1609459200", metadata["timestamp"])
assert.Equal(t, "1609459200123", metadata["timestamp-ms"])
// Should include both ISO timestamps
assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"])
assert.Equal(t, "2021-01-01T00:00:00.123456789Z", metadata["timestamp-ms"])
}

func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T) {
Expand All @@ -150,7 +150,7 @@ func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T)
publisher := destregistry.NewBasePublisher(
destregistry.WithMillisecondTimestamp(true),
destregistry.WithDeliveryMetadata(map[string]string{
"timestamp-ms": "999999999999", // Override the millisecond timestamp
"timestamp-ms": "custom-override",
}),
)
event := testutil.EventFactory.Any(
Expand All @@ -162,7 +162,7 @@ func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T)
metadata := publisher.MakeMetadata(&event, timestamp)

// Delivery metadata should override system timestamp-ms
assert.Equal(t, "999999999999", metadata["timestamp-ms"])
assert.Equal(t, "custom-override", metadata["timestamp-ms"])
}

func TestMakeMetadata_EmptyDeliveryMetadata(t *testing.T) {
Expand All @@ -182,7 +182,7 @@ func TestMakeMetadata_EmptyDeliveryMetadata(t *testing.T) {
metadata := publisher.MakeMetadata(&event, timestamp)

// Should only have system metadata
assert.Equal(t, "1609459200", metadata["timestamp"])
assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"])
assert.Equal(t, "evt_123", metadata["event-id"])
assert.Equal(t, "user.created", metadata["topic"])
assert.Len(t, metadata, 3)
Expand All @@ -207,7 +207,7 @@ func TestMakeMetadata_NilEventMetadata(t *testing.T) {
metadata := publisher.MakeMetadata(&event, timestamp)

// System metadata
assert.Equal(t, "1609459200", metadata["timestamp"])
assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"])
assert.Equal(t, "evt_123", metadata["event-id"])
assert.Equal(t, "user.created", metadata["topic"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (a *KinesisAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mess

// Verify system metadata
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"])
testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"])
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (a *S3Asserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message,
// 2. Assert system metadata is present
metadata := msg.Metadata
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"])
testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"])
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (a *SQSAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message,

// Verify system metadata
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"])
testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"])
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (a *AzureServiceBusAsserter) AssertMessage(t testsuite.TestingT, msg testsu

// Verify system metadata
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"])
testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"])
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (a *KafkaAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Messag
// Verify system metadata in headers
metadata := msg.Metadata
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"])
testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"])
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (a *RabbitMQAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mes
// Verify system metadata
metadata := msg.Metadata
assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"])
testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"])
assert.Equal(t, event.ID, metadata["event-id"], "event-id should match")
assert.Equal(t, event.Topic, metadata["topic"], "topic should match")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (a *WebhookAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mess
timestampHeader := req.Header.Get(a.headerPrefix + "timestamp")
assert.NotEmpty(t, timestampHeader, "timestamp header should be present")

// Verify timestamp is in Unix seconds (not milliseconds)
testsuite.AssertTimestampIsUnixSeconds(t, timestampHeader)
// Verify timestamp is in ISO 8601 format
testsuite.AssertTimestampIsISO8601(t, timestampHeader)

assert.Equal(t, event.ID, req.Header.Get(a.headerPrefix+"event-id"), "event-id header should match")
assert.Equal(t, event.Topic, req.Header.Get(a.headerPrefix+"topic"), "topic header should match")
Expand All @@ -111,13 +111,6 @@ func (a *WebhookAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mess
signatureHeader := req.Header.Get(a.headerPrefix + "signature")
assertSignatureFormat(t, signatureHeader, a.expectedSignatures)

// Verify timestamp in signature header matches the timestamp header
signatureParts := strings.SplitN(signatureHeader, ",", 2)
if len(signatureParts) >= 2 {
signatureTimestampStr := strings.TrimPrefix(signatureParts[0], "t=")
assert.Equal(t, timestampHeader, signatureTimestampStr, "timestamp in signature header should match timestamp header")
}

// Verify each expected signature
for _, secret := range a.secrets {
assertValidSignature(t, secret, msg.Data, signatureHeader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"net/http"
"strconv"
"net/http/httptest"
"strings"
"sync"
Expand Down Expand Up @@ -119,7 +120,9 @@ func (a *StandardWebhookAsserter) AssertMessage(t testsuite.TestingT, msg testsu

webhookTimestamp := req.Header.Get(prefix + "timestamp")
assert.NotEmpty(t, webhookTimestamp, prefix+"timestamp should be present")
testsuite.AssertTimestampIsUnixSeconds(t, webhookTimestamp)
// Standard Webhooks spec requires Unix seconds for the webhook-timestamp header
_, err := strconv.ParseInt(webhookTimestamp, 10, 64)
assert.NoError(t, err, "webhook-timestamp should be a valid Unix timestamp integer")

webhookSignature := req.Header.Get(prefix + "signature")
assert.NotEmpty(t, webhookSignature, prefix+"signature should be present")
Expand Down
35 changes: 5 additions & 30 deletions internal/destregistry/testing/publisher_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -37,37 +36,13 @@ type TestingT interface {
Helper()
}

// AssertTimestampIsUnixSeconds verifies that a timestamp string is in Unix seconds format (not milliseconds).
// It checks if the timestamp is within a reasonable range for Unix seconds (between year 2000 and 2100).
func AssertTimestampIsUnixSeconds(t TestingT, timestampStr string, msgAndArgs ...interface{}) {
// AssertTimestampIsISO8601 verifies that a timestamp string is a valid ISO 8601 / RFC3339 timestamp.
func AssertTimestampIsISO8601(t TestingT, timestampStr string, msgAndArgs ...interface{}) {
t.Helper()

timestampInt, err := strconv.ParseInt(timestampStr, 10, 64)
assert.NoError(t, err, "timestamp should be a valid integer")

// Check if timestamp is in a reasonable range for Unix seconds
// Year 2000: ~946,684,800
// Year 2100: ~4,102,444,800
// Current time in seconds: ~1,700,000,000 (2023-2024)
// Current time in millis: ~1,700,000,000,000

minUnixSeconds := int64(946684800) // Jan 1, 2000
maxUnixSeconds := int64(4102444800) // Jan 1, 2100

if timestampInt < minUnixSeconds || timestampInt > maxUnixSeconds {
// Likely milliseconds - check if dividing by 1000 gives a reasonable timestamp
possibleSeconds := timestampInt / 1000
if possibleSeconds >= minUnixSeconds && possibleSeconds <= maxUnixSeconds {
assert.Fail(t, "timestamp appears to be in milliseconds, expected Unix seconds",
"timestamp %d is likely in milliseconds (would be %s if converted to seconds), expected Unix seconds (around %s)",
timestampInt,
time.Unix(possibleSeconds, 0).Format(time.RFC3339),
time.Now().Format(time.RFC3339))
} else {
assert.Fail(t, "timestamp is out of reasonable range",
"timestamp %d is not within reasonable Unix seconds range (year 2000-2100)", timestampInt)
}
}
parsed, err := time.Parse(time.RFC3339Nano, timestampStr)
assert.NoError(t, err, "timestamp should be a valid ISO 8601 / RFC3339 string, got: %s", timestampStr)
assert.Equal(t, time.UTC, parsed.Location(), "timestamp should be in UTC, got: %s", timestampStr)
}

// MessageConsumer is the interface that providers must implement
Expand Down
Loading