diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d7e8569..f722119e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,8 @@ - Rejected server-initiated JSON-RPC requests received on 2026 stateless Streamable HTTP client response streams; servers must use MRTR `input_required` results instead. +- Enforced `subscriptions/listen` stream ordering and filters for 2026 + subscription notifications. ## 2.2.0 diff --git a/lib/src/shared/protocol.dart b/lib/src/shared/protocol.dart index e7fa07f2..80fadda2 100644 --- a/lib/src/shared/protocol.dart +++ b/lib/src/shared/protocol.dart @@ -12,6 +12,12 @@ final _logger = Logger("mcp_dart.shared.protocol"); bool _isProgressToken(Object? token) => token is int || token is String; final _lastProgressByExtra = Expando(); +final _subscriptionStateByExtra = Expando<_SubscriptionStreamState>(); + +class _SubscriptionStreamState { + bool acknowledgmentSent = false; + SubscriptionFilter acknowledgedNotifications = const SubscriptionFilter(); +} /// Callback for progress notifications. typedef ProgressCallback = void Function(Progress progress); @@ -152,6 +158,16 @@ class RequestHandlerExtra { this.closeStandaloneSSEStream, }); + _SubscriptionStreamState get _activeSubscriptionState => + (_subscriptionStateByExtra[this] ??= _SubscriptionStreamState()); + + void _validateSubscriptionNotification(JsonRpcNotification notification) { + _recordOrValidateSubscriptionNotification( + _activeSubscriptionState, + notification, + ); + } + /// Sends a progress notification for the current request. /// /// This method automatically retrieves the `progressToken` from the request metadata. @@ -215,19 +231,97 @@ class RequestHandlerExtra { Future sendSubscriptionNotification( JsonRpcNotification notification, ) { - final meta = { - ...?notification.meta, - McpMetaKey.subscriptionId: requestId, - }; + final subscriptionNotification = + _withSubscriptionId(notification, requestId); - return sendNotification( - JsonRpcNotification( - method: notification.method, - params: notification.params, - meta: meta, - ), + _validateSubscriptionNotification(subscriptionNotification); + + return sendNotification(subscriptionNotification); + } +} + +JsonRpcNotification _withSubscriptionId( + JsonRpcNotification notification, + RequestId requestId, +) { + final meta = { + ...?notification.meta, + McpMetaKey.subscriptionId: requestId, + }; + return JsonRpcNotification( + method: notification.method, + params: notification.params, + meta: meta, + ); +} + +void _recordOrValidateSubscriptionNotification( + _SubscriptionStreamState state, + JsonRpcNotification notification, +) { + if (notification.method == Method.notificationsSubscriptionsAcknowledged) { + state + ..acknowledgmentSent = true + ..acknowledgedNotifications = + _acknowledgedSubscriptionFilter(notification); + return; + } + + if (!state.acknowledgmentSent) { + throw McpError( + ErrorCode.invalidRequest.value, + 'subscriptions/listen streams must send ' + '${Method.notificationsSubscriptionsAcknowledged} before ' + '${notification.method}.', + ); + } + + if (!_subscriptionFilterAllowsNotification( + state.acknowledgedNotifications, + notification, + )) { + throw McpError( + ErrorCode.invalidRequest.value, + '${notification.method} was not requested or acknowledged for this ' + 'subscriptions/listen stream.', + ); + } +} + +SubscriptionFilter _acknowledgedSubscriptionFilter( + JsonRpcNotification notification, +) { + final params = notification.params; + if (params == null) { + throw const FormatException( + 'subscriptions acknowledged notification params are required', ); } + + return SubscriptionsAcknowledgedNotification.fromJson(params).notifications; +} + +bool _subscriptionFilterAllowsNotification( + SubscriptionFilter filter, + JsonRpcNotification notification, +) { + switch (notification.method) { + case Method.notificationsToolsListChanged: + return filter.toolsListChanged == true; + case Method.notificationsPromptsListChanged: + return filter.promptsListChanged == true; + case Method.notificationsResourcesListChanged: + return filter.resourcesListChanged == true; + case Method.notificationsResourcesUpdated: + final uri = notification.params?['uri']; + return uri is String && + (filter.resourceSubscriptions?.contains(uri) ?? false); + case Method.notificationsTasks: + final taskId = notification.params?['taskId']; + return taskId is String && (filter.taskIds?.contains(taskId) ?? false); + default: + return false; + } } /// Internal class holding timeout state for a request. @@ -1137,6 +1231,9 @@ abstract class Protocol { final abortController = BasicAbortController(); _requestHandlerAbortControllers[request.id] = abortController; + final subscriptionState = request is JsonRpcSubscriptionsListenRequest + ? _SubscriptionStreamState() + : null; final extra = RequestHandlerExtra( signal: abortController.signal, @@ -1154,12 +1251,22 @@ abstract class Protocol { : null, taskRequestedTtl: (request.params?['task'] as Map?)?['ttl'] as int?, - sendNotification: (notification, {relatedTask}) => - _notificationWithRequestId( - notification, - relatedTask: relatedTask, - relatedRequestId: request.id, - ), + sendNotification: (notification, {relatedTask}) { + var outgoingNotification = notification; + if (subscriptionState != null) { + outgoingNotification = _withSubscriptionId(notification, request.id); + _recordOrValidateSubscriptionNotification( + subscriptionState, + outgoingNotification, + ); + } + + return _notificationWithRequestId( + outgoingNotification, + relatedTask: relatedTask, + relatedRequestId: request.id, + ); + }, sendRequest: ( JsonRpcRequest req, T Function(Map) resultFactory, @@ -1185,6 +1292,9 @@ abstract class Protocol { ); }, ); + if (subscriptionState != null) { + _subscriptionStateByExtra[extra] = subscriptionState; + } // If task creation is requested, check capability if (extra.taskRequestedTtl != null || diff --git a/test/mcp_2026_07_28_test.dart b/test/mcp_2026_07_28_test.dart index 72157c06..f906f5d3 100644 --- a/test/mcp_2026_07_28_test.dart +++ b/test/mcp_2026_07_28_test.dart @@ -610,6 +610,117 @@ void main() { expect(transport.sentMessages.last, isA()); }); + test('server rejects subscription notifications before acknowledgment', + () async { + final server = Server( + const Implementation(name: 'server', version: '1.0.0'), + options: const McpServerOptions( + capabilities: ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ), + ); + server.setRequestHandler( + Method.subscriptionsListen, + (request, extra) async { + await extra.sendNotification( + const JsonRpcToolListChangedNotification(), + ); + return const EmptyResult(); + }, + (id, params, meta) => JsonRpcSubscriptionsListenRequest( + id: id, + listenParams: SubscriptionsListenRequest.fromJson(params!), + meta: meta, + ), + ); + final transport = RecordingTransport(); + await server.connect(transport); + + transport.receive( + JsonRpcSubscriptionsListenRequest( + id: 'sub-1', + listenParams: const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + meta: _clientMeta(), + ), + ); + await _pump(); + + final response = transport.sentMessages.single as JsonRpcError; + expect(response.error.code, ErrorCode.invalidRequest.value); + expect( + response.error.message, + contains(Method.notificationsSubscriptionsAcknowledged), + ); + }); + + test('server tags direct subscription notifications with subscription id', + () async { + final server = Server( + const Implementation(name: 'server', version: '1.0.0'), + options: const McpServerOptions( + capabilities: ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ), + ); + server.setRequestHandler( + Method.subscriptionsListen, + (request, extra) async { + await extra.sendSubscriptionAcknowledged( + request.listenParams.notifications.acknowledgedBy( + const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ), + ); + await extra.sendNotification( + const JsonRpcToolListChangedNotification(), + ); + return const EmptyResult(); + }, + (id, params, meta) => JsonRpcSubscriptionsListenRequest( + id: id, + listenParams: SubscriptionsListenRequest.fromJson(params!), + meta: meta, + ), + ); + final transport = RecordingTransport(); + await server.connect(transport); + + transport.receive( + JsonRpcSubscriptionsListenRequest( + id: 'sub-1', + listenParams: const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + meta: _clientMeta(), + ), + ); + await _pump(); + + expect(transport.sentMessages, hasLength(3)); + expect( + transport.sentMessages.take(2).map((message) => message.toJson()), + everyElement( + containsPair( + 'params', + containsPair( + '_meta', + containsPair(McpMetaKey.subscriptionId, 'sub-1'), + ), + ), + ), + ); + expect( + (transport.sentMessages[1] as JsonRpcNotification).method, + Method.notificationsToolsListChanged, + ); + expect(transport.sentMessages.last, isA()); + }); + test('stateless server responses add complete result and cache defaults', () async { final server = Server( diff --git a/test/types/subscriptions_test.dart b/test/types/subscriptions_test.dart index 6a10ed35..49842569 100644 --- a/test/types/subscriptions_test.dart +++ b/test/types/subscriptions_test.dart @@ -1,6 +1,27 @@ +import 'package:mcp_dart/src/shared/protocol.dart'; import 'package:mcp_dart/src/types.dart'; import 'package:test/test.dart'; +Future _unusedRequest( + JsonRpcRequest request, + T Function(Map resultJson) resultFactory, + RequestOptions options, +) { + throw StateError('Unexpected request from subscription helper test'); +} + +RequestHandlerExtra _subscriptionExtra(List sent) { + final abort = BasicAbortController(); + return RequestHandlerExtra( + signal: abort.signal, + requestId: 'sub-1', + sendNotification: (notification, {relatedTask}) async { + sent.add(notification); + }, + sendRequest: _unusedRequest, + ); +} + void main() { group('SubscriptionFilter', () { test('serializes and parses requested notification filters', () { @@ -175,4 +196,115 @@ void main() { ); }); }); + + group('RequestHandlerExtra subscription helpers', () { + test('require acknowledgment before stream notifications', () async { + final sent = []; + final extra = _subscriptionExtra(sent); + + expect( + () => extra.sendSubscriptionNotification( + const JsonRpcToolListChangedNotification(), + ), + throwsA( + isA().having( + (error) => error.message, + 'message', + contains(Method.notificationsSubscriptionsAcknowledged), + ), + ), + ); + expect(sent, isEmpty); + }); + + test('allow only acknowledged notification filters', () async { + final sent = []; + final extra = _subscriptionExtra(sent); + + await extra.sendSubscriptionAcknowledged( + const SubscriptionFilter( + toolsListChanged: true, + resourcesListChanged: true, + resourceSubscriptions: ['file:///project/config.json'], + taskIds: ['task-1'], + ), + ); + expect(sent.single.method, Method.notificationsSubscriptionsAcknowledged); + sent.clear(); + + await extra.sendSubscriptionNotification( + const JsonRpcToolListChangedNotification(), + ); + await extra.sendSubscriptionNotification( + JsonRpcResourceUpdatedNotification( + updatedParams: const ResourceUpdatedNotification( + uri: 'file:///project/config.json', + ), + ), + ); + await extra.sendSubscriptionNotification( + const JsonRpcResourceListChangedNotification(), + ); + await extra.sendSubscriptionNotification( + JsonRpcTaskNotification( + task: const TaskExtensionTask( + taskId: 'task-1', + status: TaskStatus.working, + createdAt: '2026-07-28T00:00:00Z', + lastUpdatedAt: '2026-07-28T00:01:00Z', + ttlMs: 300000, + ), + ), + ); + + expect(sent, hasLength(4)); + expect( + sent.map( + (notification) => notification.meta?[McpMetaKey.subscriptionId], + ), + everyElement('sub-1'), + ); + + expect( + () => extra.sendSubscriptionNotification( + const JsonRpcPromptListChangedNotification(), + ), + throwsA( + isA().having( + (error) => error.message, + 'message', + contains('not requested or acknowledged'), + ), + ), + ); + expect( + () => extra.sendSubscriptionNotification( + JsonRpcResourceUpdatedNotification( + updatedParams: const ResourceUpdatedNotification( + uri: 'file:///project/other.json', + ), + ), + ), + throwsA( + isA().having( + (error) => error.message, + 'message', + contains('not requested or acknowledged'), + ), + ), + ); + expect( + () => extra.sendSubscriptionNotification( + const JsonRpcNotification(method: 'notifications/custom'), + ), + throwsA( + isA().having( + (error) => error.message, + 'message', + contains('not requested or acknowledged'), + ), + ), + ); + }); + }); }