diff --git a/internal/destregistry/basepublisher.go b/internal/destregistry/basepublisher.go index 98a75339..f35b8765 100644 --- a/internal/destregistry/basepublisher.go +++ b/internal/destregistry/basepublisher.go @@ -1,7 +1,6 @@ package destregistry import ( - "fmt" "sync" "sync/atomic" "time" @@ -67,14 +66,14 @@ func (p *BasePublisher) StartClose() { func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) map[string]string { systemMetadata := map[string]string{ - "timestamp": fmt.Sprintf("%d", timestamp.Unix()), + "timestamp": timestamp.UTC().Format(time.RFC3339), "event-id": event.ID, "topic": event.Topic, } // Add millisecond timestamp if enabled if p.includeMillisecondTimestamp { - systemMetadata["timestamp-ms"] = fmt.Sprintf("%d", timestamp.UnixMilli()) + systemMetadata["timestamp-ms"] = timestamp.UTC().Format(time.RFC3339Nano) } // Merge with priority: system < deliveryMetadata < event.Metadata diff --git a/internal/destregistry/basepublisher_test.go b/internal/destregistry/basepublisher_test.go index a111e57d..4e4026b6 100644 --- a/internal/destregistry/basepublisher_test.go +++ b/internal/destregistry/basepublisher_test.go @@ -26,7 +26,7 @@ func TestMakeMetadata_WithoutDeliveryMetadata(t *testing.T) { metadata := publisher.MakeMetadata(&event, timestamp) // System metadata should be present - assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"]) assert.Equal(t, "evt_123", metadata["event-id"]) assert.Equal(t, "user.created", metadata["topic"]) @@ -60,7 +60,7 @@ func TestMakeMetadata_WithDeliveryMetadata(t *testing.T) { metadata := publisher.MakeMetadata(&event, timestamp) // System metadata should be present - assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"]) assert.Equal(t, "evt_123", metadata["event-id"]) assert.Equal(t, "user.created", metadata["topic"]) @@ -138,9 +138,9 @@ func TestMakeMetadata_WithMillisecondTimestamp(t *testing.T) { metadata := publisher.MakeMetadata(&event, timestamp) - // Should include both timestamp and timestamp-ms - assert.Equal(t, "1609459200", metadata["timestamp"]) - assert.Equal(t, "1609459200123", metadata["timestamp-ms"]) + // Should include both ISO timestamps + assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"]) + assert.Equal(t, "2021-01-01T00:00:00.123456789Z", metadata["timestamp-ms"]) } func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T) { @@ -150,7 +150,7 @@ func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T) publisher := destregistry.NewBasePublisher( destregistry.WithMillisecondTimestamp(true), destregistry.WithDeliveryMetadata(map[string]string{ - "timestamp-ms": "999999999999", // Override the millisecond timestamp + "timestamp-ms": "custom-override", }), ) event := testutil.EventFactory.Any( @@ -162,7 +162,7 @@ func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T) metadata := publisher.MakeMetadata(&event, timestamp) // Delivery metadata should override system timestamp-ms - assert.Equal(t, "999999999999", metadata["timestamp-ms"]) + assert.Equal(t, "custom-override", metadata["timestamp-ms"]) } func TestMakeMetadata_EmptyDeliveryMetadata(t *testing.T) { @@ -182,7 +182,7 @@ func TestMakeMetadata_EmptyDeliveryMetadata(t *testing.T) { metadata := publisher.MakeMetadata(&event, timestamp) // Should only have system metadata - assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"]) assert.Equal(t, "evt_123", metadata["event-id"]) assert.Equal(t, "user.created", metadata["topic"]) assert.Len(t, metadata, 3) @@ -207,7 +207,7 @@ func TestMakeMetadata_NilEventMetadata(t *testing.T) { metadata := publisher.MakeMetadata(&event, timestamp) // System metadata - assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "2021-01-01T00:00:00Z", metadata["timestamp"]) assert.Equal(t, "evt_123", metadata["event-id"]) assert.Equal(t, "user.created", metadata["topic"]) diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go index 1b323fad..98f5973b 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go @@ -173,7 +173,7 @@ func (a *KinesisAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mess // Verify system metadata assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"]) assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") assert.Equal(t, event.Topic, metadata["topic"], "topic should match") diff --git a/internal/destregistry/providers/destawss3/destawss3_publish_test.go b/internal/destregistry/providers/destawss3/destawss3_publish_test.go index 7fafeb0e..16c7225b 100644 --- a/internal/destregistry/providers/destawss3/destawss3_publish_test.go +++ b/internal/destregistry/providers/destawss3/destawss3_publish_test.go @@ -137,7 +137,7 @@ func (a *S3Asserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, // 2. Assert system metadata is present metadata := msg.Metadata assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"]) assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") assert.Equal(t, event.Topic, metadata["topic"], "topic should match") diff --git a/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go b/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go index 2d58502b..9c564d1f 100644 --- a/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go +++ b/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go @@ -94,7 +94,7 @@ func (a *SQSAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, // Verify system metadata assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"]) assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") assert.Equal(t, event.Topic, metadata["topic"], "topic should match") diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go index f4e7ef2c..cf549fb3 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go @@ -119,7 +119,7 @@ func (a *AzureServiceBusAsserter) AssertMessage(t testsuite.TestingT, msg testsu // Verify system metadata assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"]) assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") assert.Equal(t, event.Topic, metadata["topic"], "topic should match") diff --git a/internal/destregistry/providers/destkafka/destkafka_publish_test.go b/internal/destregistry/providers/destkafka/destkafka_publish_test.go index fec90b9d..312db41f 100644 --- a/internal/destregistry/providers/destkafka/destkafka_publish_test.go +++ b/internal/destregistry/providers/destkafka/destkafka_publish_test.go @@ -113,7 +113,7 @@ func (a *KafkaAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Messag // Verify system metadata in headers metadata := msg.Metadata assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"]) assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") assert.Equal(t, event.Topic, metadata["topic"], "topic should match") diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go index 5a194775..0b3acaf0 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go @@ -148,7 +148,7 @@ func (a *RabbitMQAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mes // Verify system metadata metadata := msg.Metadata assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + testsuite.AssertTimestampIsISO8601(t, metadata["timestamp"]) assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") assert.Equal(t, event.Topic, metadata["topic"], "topic should match") diff --git a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go index 4b0d9aa4..e63981c4 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go +++ b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go @@ -95,8 +95,8 @@ func (a *WebhookAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mess timestampHeader := req.Header.Get(a.headerPrefix + "timestamp") assert.NotEmpty(t, timestampHeader, "timestamp header should be present") - // Verify timestamp is in Unix seconds (not milliseconds) - testsuite.AssertTimestampIsUnixSeconds(t, timestampHeader) + // Verify timestamp is in ISO 8601 format + testsuite.AssertTimestampIsISO8601(t, timestampHeader) assert.Equal(t, event.ID, req.Header.Get(a.headerPrefix+"event-id"), "event-id header should match") assert.Equal(t, event.Topic, req.Header.Get(a.headerPrefix+"topic"), "topic header should match") @@ -111,13 +111,6 @@ func (a *WebhookAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Mess signatureHeader := req.Header.Get(a.headerPrefix + "signature") assertSignatureFormat(t, signatureHeader, a.expectedSignatures) - // Verify timestamp in signature header matches the timestamp header - signatureParts := strings.SplitN(signatureHeader, ",", 2) - if len(signatureParts) >= 2 { - signatureTimestampStr := strings.TrimPrefix(signatureParts[0], "t=") - assert.Equal(t, timestampHeader, signatureTimestampStr, "timestamp in signature header should match timestamp header") - } - // Verify each expected signature for _, secret := range a.secrets { assertValidSignature(t, secret, msg.Data, signatureHeader) diff --git a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go index 933b8160..4038b3d6 100644 --- a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go +++ b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "io" "net/http" + "strconv" "net/http/httptest" "strings" "sync" @@ -119,7 +120,9 @@ func (a *StandardWebhookAsserter) AssertMessage(t testsuite.TestingT, msg testsu webhookTimestamp := req.Header.Get(prefix + "timestamp") assert.NotEmpty(t, webhookTimestamp, prefix+"timestamp should be present") - testsuite.AssertTimestampIsUnixSeconds(t, webhookTimestamp) + // Standard Webhooks spec requires Unix seconds for the webhook-timestamp header + _, err := strconv.ParseInt(webhookTimestamp, 10, 64) + assert.NoError(t, err, "webhook-timestamp should be a valid Unix timestamp integer") webhookSignature := req.Header.Get(prefix + "signature") assert.NotEmpty(t, webhookSignature, prefix+"signature should be present") diff --git a/internal/destregistry/testing/publisher_suite.go b/internal/destregistry/testing/publisher_suite.go index 600bc15a..92256807 100644 --- a/internal/destregistry/testing/publisher_suite.go +++ b/internal/destregistry/testing/publisher_suite.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "strconv" "sync" "sync/atomic" "time" @@ -37,37 +36,13 @@ type TestingT interface { Helper() } -// AssertTimestampIsUnixSeconds verifies that a timestamp string is in Unix seconds format (not milliseconds). -// It checks if the timestamp is within a reasonable range for Unix seconds (between year 2000 and 2100). -func AssertTimestampIsUnixSeconds(t TestingT, timestampStr string, msgAndArgs ...interface{}) { +// AssertTimestampIsISO8601 verifies that a timestamp string is a valid ISO 8601 / RFC3339 timestamp. +func AssertTimestampIsISO8601(t TestingT, timestampStr string, msgAndArgs ...interface{}) { t.Helper() - timestampInt, err := strconv.ParseInt(timestampStr, 10, 64) - assert.NoError(t, err, "timestamp should be a valid integer") - - // Check if timestamp is in a reasonable range for Unix seconds - // Year 2000: ~946,684,800 - // Year 2100: ~4,102,444,800 - // Current time in seconds: ~1,700,000,000 (2023-2024) - // Current time in millis: ~1,700,000,000,000 - - minUnixSeconds := int64(946684800) // Jan 1, 2000 - maxUnixSeconds := int64(4102444800) // Jan 1, 2100 - - if timestampInt < minUnixSeconds || timestampInt > maxUnixSeconds { - // Likely milliseconds - check if dividing by 1000 gives a reasonable timestamp - possibleSeconds := timestampInt / 1000 - if possibleSeconds >= minUnixSeconds && possibleSeconds <= maxUnixSeconds { - assert.Fail(t, "timestamp appears to be in milliseconds, expected Unix seconds", - "timestamp %d is likely in milliseconds (would be %s if converted to seconds), expected Unix seconds (around %s)", - timestampInt, - time.Unix(possibleSeconds, 0).Format(time.RFC3339), - time.Now().Format(time.RFC3339)) - } else { - assert.Fail(t, "timestamp is out of reasonable range", - "timestamp %d is not within reasonable Unix seconds range (year 2000-2100)", timestampInt) - } - } + parsed, err := time.Parse(time.RFC3339Nano, timestampStr) + assert.NoError(t, err, "timestamp should be a valid ISO 8601 / RFC3339 string, got: %s", timestampStr) + assert.Equal(t, time.UTC, parsed.Location(), "timestamp should be in UTC, got: %s", timestampStr) } // MessageConsumer is the interface that providers must implement