diff --git a/internal/server/event/event_test.go b/internal/server/event/event_test.go index 4e53b6e2a3c..f117d9595c3 100644 --- a/internal/server/event/event_test.go +++ b/internal/server/event/event_test.go @@ -155,143 +155,69 @@ func TestEventServer_GetJobSetEvents_QueueDoNotExist(t *testing.T) { } func TestEventServer_GetJobSetEvents_ErrorIfMissing(t *testing.T) { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) - defer cancel() q := queue.Queue{ Name: "test-queue", PriorityFactor: 1, } - t.Run("job set non existent ErrorIfMissing true", func(t *testing.T) { - withEventServer( - ctx, - t, - func(s *EventServer) { - err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - assert.NoError(t, err) - stream := &eventStreamMock{} - - err = s.GetJobSetEvents(&api.JobSetRequest{ - Id: "job-set-1", - Watch: false, - Queue: "test-queue", - ErrorIfMissing: true, - }, stream) - e, ok := status.FromError(err) - assert.True(t, ok) - assert.Equal(t, codes.NotFound, e.Code()) - }, - ) - }) - - t.Run("job set non existent ErrorIfMissing false", func(t *testing.T) { - withEventServer( - ctx, - t, - func(s *EventServer) { - err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - assert.NoError(t, err) - stream := &eventStreamMock{} - err = s.GetJobSetEvents(&api.JobSetRequest{ - Id: "job-set-1", - Watch: false, - Queue: "test-queue", - ErrorIfMissing: false, - }, stream) - assert.NoError(t, err) - }, - ) - }) + tests := map[string]struct { + publishEvent bool + errorIfMissing bool + expectErrorCode codes.Code // codes.OK means no error expected + expectMessages int + }{ + "job set non existent, ErrorIfMissing true": { + errorIfMissing: true, expectErrorCode: codes.NotFound, + }, + "job set non existent, ErrorIfMissing false": { + errorIfMissing: false, expectErrorCode: codes.OK, + }, + "job set exists, ErrorIfMissing true": { + publishEvent: true, errorIfMissing: true, expectErrorCode: codes.OK, expectMessages: 1, + }, + "job set exists, ErrorIfMissing false": { + publishEvent: true, errorIfMissing: false, expectErrorCode: codes.OK, expectMessages: 1, + }, + } - t.Run("job set exists ErrorIfMissing true", func(t *testing.T) { - withEventServer( - ctx, - t, - func(s *EventServer) { + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) + defer cancel() + withEventServer(ctx, t, func(s *EventServer) { err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - assert.NoError(t, err) - stream := &eventStreamMock{} + require.NoError(t, err) - jobId := "01f3j0g1md4qx7z5qb148qnh4r" - runId := "123e4567-e89b-12d3-a456-426614174000" - baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") - baseTimeProto := protoutil.ToTimestamp(baseTime) - - assigned := &armadaevents.EventSequence_Event{ - Created: baseTimeProto, - Event: &armadaevents.EventSequence_Event_JobRunAssigned{ - JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runId, - JobId: jobId, - }, - }, + if tc.publishEvent { + require.NoError(t, reportPulsarEvent(ctx, &armadaevents.EventSequence{ + Queue: "test-queue", + JobSetName: "job-set-1", + Events: []*armadaevents.EventSequence_Event{jobRunAssignedEvent()}, + })) } - err = reportPulsarEvent(ctx, &armadaevents.EventSequence{ - Queue: "test-queue", - JobSetName: "job-set-1", - Events: []*armadaevents.EventSequence_Event{assigned}, - }) - require.NoError(t, err) - + stream := &eventStreamMock{} err = s.GetJobSetEvents(&api.JobSetRequest{ Id: "job-set-1", Watch: false, Queue: "test-queue", - ErrorIfMissing: true, + ErrorIfMissing: tc.errorIfMissing, }, stream) - require.NoError(t, err) - assert.Equal(t, 1, len(stream.sendMessages)) - }, - ) - }) - t.Run("job set exists ErrorIfMissing false", func(t *testing.T) { - withEventServer( - ctx, - t, - func(s *EventServer) { - err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - require.NoError(t, err) - stream := &eventStreamMock{} - - jobId := "01f3j0g1md4qx7z5qb148qnh4r" - runId := "123e4567-e89b-12d3-a456-426614174000" - baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") - baseTimeProto := protoutil.ToTimestamp(baseTime) - assigned := &armadaevents.EventSequence_Event{ - Created: baseTimeProto, - Event: &armadaevents.EventSequence_Event_JobRunAssigned{ - JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runId, - JobId: jobId, - }, - }, + if tc.expectErrorCode == codes.OK { + require.NoError(t, err) + } else { + e, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, tc.expectErrorCode, e.Code()) } - - err = reportPulsarEvent(ctx, &armadaevents.EventSequence{ - Queue: "test-queue", - JobSetName: "job-set-1", - Events: []*armadaevents.EventSequence_Event{assigned}, - }) - require.NoError(t, err) - - err = s.GetJobSetEvents(&api.JobSetRequest{ - Id: "job-set-1", - Watch: false, - Queue: "test-queue", - ErrorIfMissing: false, - }, stream) - require.NoError(t, err) - assert.Equal(t, 1, len(stream.sendMessages)) - }, - ) - }) + assert.Equal(t, tc.expectMessages, len(stream.sendMessages)) + }) + }) + } } func TestEventServer_GetJobSetEvents_Permissions(t *testing.T) { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) - defer cancel() emptyPerms := make(map[permission.Permission][]string) perms := map[permission.Permission][]string{ permissions.WatchAllEvents: {"watch-all-events-group"}, @@ -310,43 +236,27 @@ func TestEventServer_GetJobSetEvents_Permissions(t *testing.T) { PriorityFactor: 1, } - t.Run("no permissions", func(t *testing.T) { - withEventServer( - ctx, - t, - func(s *EventServer) { - s.authorizer = auth.NewAuthorizer(auth.NewPrincipalPermissionChecker(perms, emptyPerms, emptyPerms)) - err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - assert.NoError(t, err) - - principal := auth.NewStaticPrincipal("alice", "test", []string{}) - ctx := auth.WithPrincipal(armadacontext.Background(), principal) - stream := &eventStreamMock{ctx: ctx} - - err = s.GetJobSetEvents(&api.JobSetRequest{ - Id: "job-set-1", - Watch: false, - Queue: "test-queue", - }, stream) - e, ok := status.FromError(err) - assert.True(t, ok) - assert.Equal(t, codes.PermissionDenied, e.Code()) - }, - ) - }) + tests := map[string]struct { + userGroups []string + expectCode codes.Code + }{ + "no permissions": {userGroups: []string{}, expectCode: codes.PermissionDenied}, + "global permissions": {userGroups: []string{"watch-all-events-group"}, expectCode: codes.OK}, + "queue permission": {userGroups: []string{"watch-events-group", "watch-queue-group"}, expectCode: codes.OK}, + } - t.Run("global permissions", func(t *testing.T) { - withEventServer( - ctx, - t, - func(s *EventServer) { + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) + defer cancel() + withEventServer(ctx, t, func(s *EventServer) { s.authorizer = auth.NewAuthorizer(auth.NewPrincipalPermissionChecker(perms, emptyPerms, emptyPerms)) err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - assert.NoError(t, err) + require.NoError(t, err) - principal := auth.NewStaticPrincipal("alice", "test", []string{"watch-all-events-group"}) - ctx := auth.WithPrincipal(armadacontext.Background(), principal) - stream := &eventStreamMock{ctx: ctx} + principal := auth.NewStaticPrincipal("alice", "test", tc.userGroups) + principalCtx := auth.WithPrincipal(armadacontext.Background(), principal) + stream := &eventStreamMock{ctx: principalCtx} err = s.GetJobSetEvents(&api.JobSetRequest{ Id: "job-set-1", @@ -355,31 +265,23 @@ func TestEventServer_GetJobSetEvents_Permissions(t *testing.T) { }, stream) e, ok := status.FromError(err) assert.True(t, ok) - assert.Equal(t, codes.OK, e.Code()) - }, - ) - }) - - t.Run("queue permission", func(t *testing.T) { - withEventServer(ctx, t, func(s *EventServer) { - s.authorizer = auth.NewAuthorizer(auth.NewPrincipalPermissionChecker(perms, emptyPerms, emptyPerms)) - err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) - assert.NoError(t, err) - - principal := auth.NewStaticPrincipal("alice", "test", []string{"watch-events-group", "watch-queue-group"}) - ctx := auth.WithPrincipal(armadacontext.Background(), principal) - stream := &eventStreamMock{ctx: ctx} - - err = s.GetJobSetEvents(&api.JobSetRequest{ - Id: "job-set-1", - Watch: false, - Queue: "test-queue", - }, stream) - e, ok := status.FromError(err) - assert.True(t, ok) - assert.Equal(t, codes.OK, e.Code()) + assert.Equal(t, tc.expectCode, e.Code()) + }) }) - }) + } +} + +func jobRunAssignedEvent() *armadaevents.EventSequence_Event { + baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") + return &armadaevents.EventSequence_Event{ + Created: protoutil.ToTimestamp(baseTime), + Event: &armadaevents.EventSequence_Event_JobRunAssigned{ + JobRunAssigned: &armadaevents.JobRunAssigned{ + RunId: "123e4567-e89b-12d3-a456-426614174000", + JobId: "01f3j0g1md4qx7z5qb148qnh4r", + }, + }, + } } func reportPulsarEvent(ctx *armadacontext.Context, es *armadaevents.EventSequence) error {