From e8677757754bc7db9c81f67e9fe3b955b0e59196 Mon Sep 17 00:00:00 2001 From: Jhin Lee Date: Sun, 31 May 2026 19:10:25 -0400 Subject: [PATCH] Add client subscription listen support --- CHANGELOG.md | 3 + lib/src/client/client.dart | 265 ++++++++++++++++++ lib/src/shared/protocol.dart | 106 +++---- lib/src/types/completion.dart | 4 +- lib/src/types/prompts.dart | 4 +- lib/src/types/resources.dart | 4 +- lib/src/types/subscriptions.dart | 55 ++++ lib/src/types/tools.dart | 4 +- test/mcp_2026_07_28_test.dart | 428 +++++++++++++++++++++++++++++ test/types/subscriptions_test.dart | 90 ++++++ 10 files changed, 909 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32e1b96b..8dee8ca8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,9 @@ - Retried `server/discover` with an advertised compatible stateless protocol version after `UnsupportedProtocolVersionError` instead of falling back to legacy initialization. +- Added client-side `subscriptions/listen` handles that correlate stream + notifications by `io.modelcontextprotocol/subscriptionId`, validate the + acknowledgment, and cancel long-lived streams with `notifications/cancelled`. ## 2.2.0 diff --git a/lib/src/client/client.dart b/lib/src/client/client.dart index 461c1f87..1932338c 100644 --- a/lib/src/client/client.dart +++ b/lib/src/client/client.dart @@ -40,6 +40,36 @@ class McpClientOptions extends ProtocolOptions { @Deprecated('Use McpClientOptions instead') typedef ClientOptions = McpClientOptions; +/// Handle for an active `subscriptions/listen` stream opened by [McpClient]. +class McpSubscription { + final void Function([Object? reason]) _cancel; + + /// JSON-RPC request ID that identifies this subscription stream. + final int id; + + /// Acknowledgment sent as the first message on the subscription stream. + final Future acknowledged; + + /// Notifications delivered on this subscription stream after acknowledgment. + final Stream notifications; + + /// Completes when the `subscriptions/listen` request ends. + final Future done; + + McpSubscription._({ + required this.id, + required this.acknowledged, + required this.notifications, + required this.done, + required void Function([Object? reason]) cancel, + }) : _cancel = cancel; + + /// Cancels this subscription stream. + void cancel([Object? reason]) { + _cancel(reason); + } +} + // Recursively applies default values from a JSON Schema to a data object. void _applyElicitationDefaults(JsonSchema schema, Map data) { if (schema is! JsonObject) return; @@ -99,6 +129,7 @@ class McpClient extends Protocol { final Map _cachedToolOutputSchemas = {}; final Set _cachedRequiredTaskTools = {}; final ToolParameterHeaderMappings _cachedToolParameterHeaders = {}; + final Map _activeSubscriptions = {}; /// Callback for handling elicitation requests from the server. /// @@ -559,6 +590,31 @@ class McpClient extends Protocol { } } + @override + void onIncomingNotificationAccepted(JsonRpcNotification notification) { + final subscriptionId = notification.meta?[McpMetaKey.subscriptionId]; + if (subscriptionId is! int && subscriptionId is! String) { + return; + } + + final activeSubscription = _activeSubscriptions[subscriptionId]; + activeSubscription?.handleNotification(notification); + } + + @override + void onConnectionClosed() { + final subscriptions = List<_ClientSubscriptionState>.from( + _activeSubscriptions.values, + ); + _activeSubscriptions.clear(); + for (final subscription in subscriptions) { + subscription.fail( + McpError(ErrorCode.connectionClosed.value, 'Connection closed'), + StackTrace.current, + ); + } + } + @override void assertCapabilityForMethod(String method) { final serverCaps = _serverCapabilities; @@ -837,6 +893,47 @@ class McpClient extends Protocol { return request(req, (json) => const EmptyResult(), options); } + /// Opens a `subscriptions/listen` stream and demultiplexes notifications. + McpSubscription listenSubscriptions(SubscriptionsListenRequest params) { + if (transport == null) { + throw StateError('Not connected to a transport.'); + } + + final requestId = reserveRequestId(); + final abortController = BasicAbortController(); + final state = _ClientSubscriptionState( + id: requestId, + requestedNotifications: params.notifications, + abortController: abortController, + onClose: () => _activeSubscriptions.remove(requestId), + ); + _activeSubscriptions[requestId] = state; + + final requestData = JsonRpcSubscriptionsListenRequest( + id: requestId, + listenParams: params, + meta: _usesStatelessProtocol ? _statelessRequestMeta(null) : null, + ); + final requestDone = super.requestWithReservedId( + requestId, + requestData, + (json) => const EmptyResult(), + RequestOptions( + signal: abortController.signal, + timeoutEnabled: false, + ), + ); + state.trackRequest(requestDone); + + return McpSubscription._( + id: requestId, + acknowledged: state.acknowledged, + notifications: state.notifications, + done: state.done, + cancel: state.cancel, + ); + } + /// Sends a `tools/call` request to invoke a tool on the server. Future callTool( CallToolRequest params, { @@ -1067,6 +1164,174 @@ class McpClient extends Protocol { @Deprecated('Use McpClient instead') typedef Client = McpClient; +class _ClientSubscriptionState { + final int id; + final SubscriptionFilter requestedNotifications; + final BasicAbortController abortController; + final void Function() onClose; + final StreamController _notifications = + StreamController.broadcast(); + final Completer _acknowledged = + Completer(); + final Completer _done = Completer(); + + SubscriptionFilter? _acknowledgedNotifications; + bool _closed = false; + bool _localCancellation = false; + + _ClientSubscriptionState({ + required this.id, + required this.requestedNotifications, + required this.abortController, + required this.onClose, + }); + + Future get acknowledged => + _acknowledged.future; + + Stream get notifications => _notifications.stream; + + Future get done => _done.future; + + void handleNotification(JsonRpcNotification notification) { + if (_closed) { + return; + } + + if (_acknowledgedNotifications == null) { + if (notification.method != + Method.notificationsSubscriptionsAcknowledged) { + fail( + McpError( + ErrorCode.invalidRequest.value, + 'Subscription $id received ${notification.method} before ' + '${Method.notificationsSubscriptionsAcknowledged}.', + ), + StackTrace.current, + ); + return; + } + + final acknowledgedParams = + (notification as JsonRpcSubscriptionsAcknowledgedNotification) + .acknowledgedParams; + final acknowledgedNotifications = acknowledgedParams.notifications; + if (!acknowledgedNotifications.isSubsetOf(requestedNotifications)) { + fail( + McpError( + ErrorCode.invalidRequest.value, + 'Subscription $id acknowledged notifications that were not ' + 'requested.', + ), + StackTrace.current, + ); + return; + } + + _acknowledgedNotifications = acknowledgedNotifications; + if (!_acknowledged.isCompleted) { + _acknowledged.complete(acknowledgedParams); + } + return; + } + + final acknowledgedNotifications = _acknowledgedNotifications!; + if (!acknowledgedNotifications.allowsNotification(notification)) { + fail( + McpError( + ErrorCode.invalidRequest.value, + '${notification.method} was not requested or acknowledged for ' + 'subscription $id.', + ), + StackTrace.current, + ); + return; + } + + _notifications.add(notification); + } + + void trackRequest(Future requestDone) { + requestDone.then( + complete, + onError: (Object error, StackTrace stackTrace) { + if (_localCancellation) { + complete(const EmptyResult()); + } else { + fail(error, stackTrace, abort: false); + } + }, + ); + } + + void cancel([Object? reason]) { + if (_closed) { + return; + } + + _localCancellation = true; + if (!_acknowledged.isCompleted) { + _acknowledged.completeError(AbortError(reason), StackTrace.current); + } + abortController.abort(reason); + complete(const EmptyResult()); + } + + void complete(EmptyResult result) { + if (_closed) { + return; + } + + final missingAcknowledgment = _acknowledgedNotifications == null && + !_localCancellation && + !abortController.signal.aborted; + if (missingAcknowledgment) { + fail( + McpError( + ErrorCode.invalidRequest.value, + 'Subscription $id completed before ' + '${Method.notificationsSubscriptionsAcknowledged}.', + ), + StackTrace.current, + abort: false, + ); + return; + } + + _closed = true; + onClose(); + if (!_done.isCompleted) { + _done.complete(result); + } + _notifications.close(); + } + + void fail( + Object error, + StackTrace stackTrace, { + bool abort = true, + }) { + if (_closed) { + return; + } + + _closed = true; + onClose(); + if (abort && !abortController.signal.aborted) { + abortController.abort(error); + } + if (!_acknowledged.isCompleted) { + _acknowledged.completeError(error, stackTrace); + } + if (!_done.isCompleted) { + _done.completeError(error, stackTrace); + } + _notifications + ..addError(error, stackTrace) + ..close(); + } +} + class _ToolParameterHeaderValidation { final Map mappings; final String? rejectionReason; diff --git a/lib/src/shared/protocol.dart b/lib/src/shared/protocol.dart index 80fadda2..7948820e 100644 --- a/lib/src/shared/protocol.dart +++ b/lib/src/shared/protocol.dart @@ -78,6 +78,12 @@ class RequestOptions { /// Maximum total time to wait for a response. final Duration? maxTotalTimeout; + /// Whether this request should use protocol-level timeout handling. + /// + /// Long-lived requests such as `subscriptions/listen` can disable this and + /// rely on explicit cancellation or transport closure instead. + final bool timeoutEnabled; + /// Augments the request with task creation parameters. final TaskCreation? task; @@ -91,6 +97,7 @@ class RequestOptions { this.timeout, this.resetTimeoutOnProgress = false, this.maxTotalTimeout, + this.timeoutEnabled = true, this.task, this.relatedTask, }); @@ -276,10 +283,7 @@ void _recordOrValidateSubscriptionNotification( ); } - if (!_subscriptionFilterAllowsNotification( - state.acknowledgedNotifications, - notification, - )) { + if (!state.acknowledgedNotifications.allowsNotification(notification)) { throw McpError( ErrorCode.invalidRequest.value, '${notification.method} was not requested or acknowledged for this ' @@ -301,29 +305,6 @@ SubscriptionFilter _acknowledgedSubscriptionFilter( return SubscriptionsAcknowledgedNotification.fromJson(params).notifications; } -bool _subscriptionFilterAllowsNotification( - SubscriptionFilter filter, - JsonRpcNotification notification, -) { - switch (notification.method) { - case Method.notificationsToolsListChanged: - return filter.toolsListChanged == true; - case Method.notificationsPromptsListChanged: - return filter.promptsListChanged == true; - case Method.notificationsResourcesListChanged: - return filter.resourcesListChanged == true; - case Method.notificationsResourcesUpdated: - final uri = notification.params?['uri']; - return uri is String && - (filter.resourceSubscriptions?.contains(uri) ?? false); - case Method.notificationsTasks: - final taskId = notification.params?['taskId']; - return taskId is String && (filter.taskIds?.contains(taskId) ?? false); - default: - return false; - } -} - /// Internal class holding timeout state for a request. class _TimeoutInfo { /// The active timer. @@ -1036,6 +1017,10 @@ abstract class Protocol { @protected void onIncomingRequestAccepted(JsonRpcRequest request) {} + /// Subclass hook called after an incoming notification has passed validation. + @protected + void onIncomingNotificationAccepted(JsonRpcNotification notification) {} + /// Subclass hook called after an incoming request handler has completed and /// its response has been sent or enqueued. @protected @@ -1070,6 +1055,8 @@ abstract class Protocol { return; } + onIncomingNotificationAccepted(notification); + if (notification is JsonRpcTaskStatusNotification) { _onTaskStatusNotification(notification); } @@ -1278,6 +1265,7 @@ abstract class Protocol { timeout: options.timeout, resetTimeoutOnProgress: options.resetTimeoutOnProgress, maxTotalTimeout: options.maxTotalTimeout, + timeoutEnabled: options.timeoutEnabled, task: options.task, relatedTask: options.relatedTask ?? (relatedTaskId != null @@ -1558,11 +1546,35 @@ abstract class Protocol { ); } + /// Reserves an outgoing integer request ID for APIs that need to correlate + /// side-channel data before the response arrives. + @protected + int reserveRequestId() => _requestMessageId++; + + /// Sends a request using a previously reserved outgoing integer request ID. + @protected + Future requestWithReservedId( + int requestId, + JsonRpcRequest requestData, + T Function(Map resultJson) resultFactory, [ + RequestOptions? options, + RequestId? relatedRequestId, + ]) { + return _requestWithRequestId( + requestData, + resultFactory, + options, + relatedRequestId, + requestId, + ); + } + Future _requestWithRequestId( JsonRpcRequest requestData, T Function(Map resultJson) resultFactory, [ RequestOptions? options, RequestId? relatedRequestId, + int? reservedRequestId, ]) { if (_transport == null) { return Future.error(StateError("Not connected to a transport.")); @@ -1585,7 +1597,7 @@ abstract class Protocol { return Future.error(e); } - final messageId = _requestMessageId++; + final messageId = reservedRequestId ?? _requestMessageId++; final completer = Completer(); Error? capturedError; Object? progressToken; @@ -1746,27 +1758,29 @@ abstract class Protocol { taskRequestState?.abortSubscription = abortSubscription; } - final timeoutDuration = options?.timeout ?? defaultRequestTimeout; - final maxTotalTimeoutDuration = options?.maxTotalTimeout; - void timeoutHandler() { - cancel( - McpError( - ErrorCode.requestTimeout.value, - "Request $messageId timed out after $timeoutDuration", - {'timeout': timeoutDuration.inMilliseconds}, - ), - fromTimeout: true, + if (options?.timeoutEnabled ?? true) { + final timeoutDuration = options?.timeout ?? defaultRequestTimeout; + final maxTotalTimeoutDuration = options?.maxTotalTimeout; + void timeoutHandler() { + cancel( + McpError( + ErrorCode.requestTimeout.value, + "Request $messageId timed out after $timeoutDuration", + {'timeout': timeoutDuration.inMilliseconds}, + ), + fromTimeout: true, + ); + } + + _setupTimeout( + messageId, + timeoutDuration, + maxTotalTimeoutDuration, + options?.resetTimeoutOnProgress ?? false, + timeoutHandler, ); } - _setupTimeout( - messageId, - timeoutDuration, - maxTotalTimeoutDuration, - options?.resetTimeoutOnProgress ?? false, - timeoutHandler, - ); - // Queue request if related to a task if (options?.relatedTask != null) { final relatedTaskId = options!.relatedTask!.taskId; diff --git a/lib/src/types/completion.dart b/lib/src/types/completion.dart index 25cdb88b..1841ff94 100644 --- a/lib/src/types/completion.dart +++ b/lib/src/types/completion.dart @@ -260,13 +260,13 @@ class CompleteResult implements BaseResultData { 'Stable MCP 2025-11-25 does not define completion list-changed notifications.', ) class JsonRpcCompletionListChangedNotification extends JsonRpcNotification { - const JsonRpcCompletionListChangedNotification() + const JsonRpcCompletionListChangedNotification({super.meta}) : super(method: Method.notificationsExperimentalCompletionsListChanged); factory JsonRpcCompletionListChangedNotification.fromJson( Map json, ) => - const JsonRpcCompletionListChangedNotification(); + JsonRpcCompletionListChangedNotification(meta: extractRequestMeta(json)); } /// Deprecated alias for [CompleteRequest]. diff --git a/lib/src/types/prompts.dart b/lib/src/types/prompts.dart index 24b7fc24..0e4ed466 100644 --- a/lib/src/types/prompts.dart +++ b/lib/src/types/prompts.dart @@ -343,13 +343,13 @@ class GetPromptResult implements BaseResultData { /// Notification from server indicating the list of available prompts has changed. class JsonRpcPromptListChangedNotification extends JsonRpcNotification { - const JsonRpcPromptListChangedNotification() + const JsonRpcPromptListChangedNotification({super.meta}) : super(method: Method.notificationsPromptsListChanged); factory JsonRpcPromptListChangedNotification.fromJson( Map json, ) => - const JsonRpcPromptListChangedNotification(); + JsonRpcPromptListChangedNotification(meta: extractRequestMeta(json)); } /// Deprecated alias for [ListPromptsRequest]. diff --git a/lib/src/types/resources.dart b/lib/src/types/resources.dart index ea60d8a5..a5c3baee 100644 --- a/lib/src/types/resources.dart +++ b/lib/src/types/resources.dart @@ -553,13 +553,13 @@ class ReadResourceResult implements CacheableResultData { /// Notification from server indicating the list of available resources has changed. class JsonRpcResourceListChangedNotification extends JsonRpcNotification { - const JsonRpcResourceListChangedNotification() + const JsonRpcResourceListChangedNotification({super.meta}) : super(method: Method.notificationsResourcesListChanged); factory JsonRpcResourceListChangedNotification.fromJson( Map json, ) => - const JsonRpcResourceListChangedNotification(); + JsonRpcResourceListChangedNotification(meta: extractRequestMeta(json)); } /// Parameters for the `resources/subscribe` request. diff --git a/lib/src/types/subscriptions.dart b/lib/src/types/subscriptions.dart index 5680f052..bcdecd66 100644 --- a/lib/src/types/subscriptions.dart +++ b/lib/src/types/subscriptions.dart @@ -76,6 +76,50 @@ class SubscriptionFilter { ); } + /// Whether this filter is a subset of [requested]. + bool isSubsetOf(SubscriptionFilter requested) { + if (toolsListChanged == true && requested.toolsListChanged != true) { + return false; + } + if (promptsListChanged == true && requested.promptsListChanged != true) { + return false; + } + if (resourcesListChanged == true && + requested.resourcesListChanged != true) { + return false; + } + if (!_stringListSubsetOf( + resourceSubscriptions, + requested.resourceSubscriptions, + )) { + return false; + } + if (!_stringListSubsetOf(taskIds, requested.taskIds)) { + return false; + } + return true; + } + + /// Whether this acknowledged filter allows [notification]. + bool allowsNotification(JsonRpcNotification notification) { + switch (notification.method) { + case Method.notificationsToolsListChanged: + return toolsListChanged == true; + case Method.notificationsPromptsListChanged: + return promptsListChanged == true; + case Method.notificationsResourcesListChanged: + return resourcesListChanged == true; + case Method.notificationsResourcesUpdated: + final uri = notification.params?['uri']; + return uri is String && (resourceSubscriptions?.contains(uri) ?? false); + case Method.notificationsTasks: + final taskId = notification.params?['taskId']; + return taskId is String && (taskIds?.contains(taskId) ?? false); + default: + return false; + } + } + Map toJson() => { if (toolsListChanged != null) 'toolsListChanged': toolsListChanged, if (promptsListChanged != null) @@ -233,6 +277,17 @@ List? _readOptionalStringList(Object? value, String field) { return value.cast(); } +bool _stringListSubsetOf(List? subset, List? superset) { + if (subset == null || subset.isEmpty) { + return true; + } + final allowed = superset?.toSet(); + if (allowed == null) { + return false; + } + return subset.every(allowed.contains); +} + Map? _readOptionalJsonObject(Object? value, String field) { if (value == null) { return null; diff --git a/lib/src/types/tools.dart b/lib/src/types/tools.dart index 1866e6a3..57da2dc2 100644 --- a/lib/src/types/tools.dart +++ b/lib/src/types/tools.dart @@ -456,13 +456,13 @@ class CallToolResult implements BaseResultData { /// Notification from server indicating the list of available tools has changed. class JsonRpcToolListChangedNotification extends JsonRpcNotification { - const JsonRpcToolListChangedNotification() + const JsonRpcToolListChangedNotification({super.meta}) : super(method: Method.notificationsToolsListChanged); factory JsonRpcToolListChangedNotification.fromJson( Map json, ) => - const JsonRpcToolListChangedNotification(); + JsonRpcToolListChangedNotification(meta: extractRequestMeta(json)); } void _validateObjectRootSchema( diff --git a/test/mcp_2026_07_28_test.dart b/test/mcp_2026_07_28_test.dart index 3c38fa5d..84458e85 100644 --- a/test/mcp_2026_07_28_test.dart +++ b/test/mcp_2026_07_28_test.dart @@ -1852,6 +1852,434 @@ void main() { expect(listRequest.meta?[McpMetaKey.clientCapabilities], {}); }); + test('client listenSubscriptions requires a connected transport', () { + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + ); + + expect( + () => client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ), + throwsStateError, + ); + }); + + test('client listenSubscriptions demultiplexes by subscription id', + () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + resources: ServerCapabilitiesResources(), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final toolsSubscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + final resourcesSubscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter( + resourceSubscriptions: ['file:///project/config.json'], + ), + ), + ); + await _pump(); + + final listenRequests = transport.sentMessages + .whereType() + .where((message) => message.method == Method.subscriptionsListen) + .toList(); + expect(listenRequests, hasLength(2)); + expect(listenRequests[0].id, toolsSubscription.id); + expect(listenRequests[1].id, resourcesSubscription.id); + expect( + listenRequests[0].meta?[McpMetaKey.protocolVersion], + draftProtocolVersion2026_07_28, + ); + expect(listenRequests[0].params?['notifications'], { + 'toolsListChanged': true, + }); + + transport.onmessage?.call( + JsonRpcSubscriptionsAcknowledgedNotification( + acknowledgedParams: const SubscriptionsAcknowledgedNotification( + notifications: SubscriptionFilter( + resourceSubscriptions: ['file:///project/config.json'], + ), + ), + meta: {McpMetaKey.subscriptionId: resourcesSubscription.id}, + ), + ); + transport.onmessage?.call( + JsonRpcSubscriptionsAcknowledgedNotification( + acknowledgedParams: const SubscriptionsAcknowledgedNotification( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + meta: {McpMetaKey.subscriptionId: toolsSubscription.id}, + ), + ); + + final toolsAcknowledged = await toolsSubscription.acknowledged; + final resourcesAcknowledged = await resourcesSubscription.acknowledged; + expect(toolsAcknowledged.notifications.toolsListChanged, isTrue); + expect( + resourcesAcknowledged.notifications.resourceSubscriptions, + ['file:///project/config.json'], + ); + + final toolNotification = toolsSubscription.notifications.first; + final resourceNotification = resourcesSubscription.notifications.first; + transport.onmessage?.call( + JsonRpcToolListChangedNotification( + meta: {McpMetaKey.subscriptionId: toolsSubscription.id}, + ), + ); + transport.onmessage?.call( + JsonRpcResourceUpdatedNotification( + updatedParams: const ResourceUpdatedNotification( + uri: 'file:///project/config.json', + ), + meta: {McpMetaKey.subscriptionId: resourcesSubscription.id}, + ), + ); + + expect( + (await toolNotification).method, + Method.notificationsToolsListChanged, + ); + expect( + (await resourceNotification).method, + Method.notificationsResourcesUpdated, + ); + + toolsSubscription.cancel('done'); + resourcesSubscription.cancel('done'); + await expectLater(toolsSubscription.done, completes); + await expectLater(resourcesSubscription.done, completes); + await _pump(); + + final cancellations = + transport.sentMessages.whereType(); + expect( + cancellations + .map((notification) => notification.cancelParams.requestId), + containsAll([toolsSubscription.id, resourcesSubscription.id]), + ); + }); + + test('client subscription rejects notifications before acknowledgment', + () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final subscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + subscription.notifications.listen(null, onError: (_) {}); + await _pump(); + + final acknowledgedExpectation = expectLater( + subscription.acknowledged, + throwsA( + isA().having( + (error) => error.message, + 'message', + contains(Method.notificationsSubscriptionsAcknowledged), + ), + ), + ); + final doneExpectation = expectLater( + subscription.done, + throwsA(isA()), + ); + + transport.onmessage?.call( + JsonRpcToolListChangedNotification( + meta: {McpMetaKey.subscriptionId: subscription.id}, + ), + ); + + await acknowledgedExpectation; + await doneExpectation; + await _pump(); + + final cancellation = transport.sentMessages + .whereType() + .single; + expect(cancellation.cancelParams.requestId, subscription.id); + }); + + test('client subscription fails when the connection closes', () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final subscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + subscription.notifications.listen(null, onError: (_) {}); + await _pump(); + + final acknowledgedExpectation = expectLater( + subscription.acknowledged, + throwsA( + isA().having( + (error) => error.code, + 'code', + ErrorCode.connectionClosed.value, + ), + ), + ); + final doneExpectation = expectLater( + subscription.done, + throwsA( + isA().having( + (error) => error.message, + 'message', + 'Connection closed', + ), + ), + ); + + await transport.close(); + + await acknowledgedExpectation; + await doneExpectation; + }); + + test('client subscription rejects acknowledgments outside requested filter', + () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + prompts: ServerCapabilitiesPrompts(listChanged: true), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final subscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + subscription.notifications.listen(null, onError: (_) {}); + await _pump(); + + final acknowledgedExpectation = expectLater( + subscription.acknowledged, + throwsA( + isA().having( + (error) => error.message, + 'message', + contains('not requested'), + ), + ), + ); + final doneExpectation = expectLater( + subscription.done, + throwsA(isA()), + ); + + transport.onmessage?.call( + JsonRpcNotification( + method: Method.notificationsSubscriptionsAcknowledged, + params: const { + 'notifications': {'promptsListChanged': true}, + }, + meta: {McpMetaKey.subscriptionId: subscription.id}, + ), + ); + + await acknowledgedExpectation; + await doneExpectation; + await _pump(); + + final cancellation = transport.sentMessages + .whereType() + .single; + expect(cancellation.cancelParams.requestId, subscription.id); + }); + + test('client subscription rejects unacknowledged notification types', + () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + prompts: ServerCapabilitiesPrompts(listChanged: true), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final subscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + subscription.notifications.listen(null, onError: (_) {}); + await _pump(); + + transport.onmessage?.call( + JsonRpcSubscriptionsAcknowledgedNotification( + acknowledgedParams: const SubscriptionsAcknowledgedNotification( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + meta: {McpMetaKey.subscriptionId: subscription.id}, + ), + ); + await subscription.acknowledged; + + final doneExpectation = expectLater( + subscription.done, + throwsA( + isA().having( + (error) => error.message, + 'message', + contains(Method.notificationsPromptsListChanged), + ), + ), + ); + + transport.onmessage?.call( + JsonRpcPromptListChangedNotification( + meta: {McpMetaKey.subscriptionId: subscription.id}, + ), + ); + + await doneExpectation; + await _pump(); + + final cancellation = transport.sentMessages + .whereType() + .single; + expect(cancellation.cancelParams.requestId, subscription.id); + }); + + test('client subscription cancel before ack completes done', () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final subscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + await _pump(); + + final acknowledgedExpectation = expectLater( + subscription.acknowledged, + throwsA( + isA().having( + (error) => error.reason, + 'reason', + 'user cancelled', + ), + ), + ); + + subscription.cancel('user cancelled'); + + await acknowledgedExpectation; + await expectLater(subscription.done, completes); + await _pump(); + + final cancellation = transport.sentMessages + .whereType() + .single; + expect(cancellation.cancelParams.requestId, subscription.id); + }); + + test('client subscription rejects completion before acknowledgment', + () async { + final transport = DiscoveringClientTransport( + capabilities: const ServerCapabilities( + tools: ServerCapabilitiesTools(listChanged: true), + ), + ); + final client = McpClient( + const Implementation(name: 'client', version: '1.0.0'), + options: const McpClientOptions(useServerDiscover: true), + ); + await client.connect(transport); + + final subscription = client.listenSubscriptions( + const SubscriptionsListenRequest( + notifications: SubscriptionFilter(toolsListChanged: true), + ), + ); + subscription.notifications.listen(null, onError: (_) {}); + await _pump(); + + final acknowledgedExpectation = expectLater( + subscription.acknowledged, + throwsA( + isA().having( + (error) => error.message, + 'message', + contains('completed before'), + ), + ), + ); + final doneExpectation = expectLater( + subscription.done, + throwsA(isA()), + ); + + transport.onmessage?.call( + JsonRpcResponse( + id: subscription.id, + result: const EmptyResult().toJson(), + ), + ); + + await acknowledgedExpectation; + await doneExpectation; + }); + test('client rejects unrecognized stateless resultType values', () async { final transport = DiscoveringClientTransport( toolsListResult: const { diff --git a/test/types/subscriptions_test.dart b/test/types/subscriptions_test.dart index 49842569..0eec7c22 100644 --- a/test/types/subscriptions_test.dart +++ b/test/types/subscriptions_test.dart @@ -76,6 +76,73 @@ void main() { expect(acknowledged.toJson(), isEmpty); }); + test('checks acknowledged subsets and allowed notifications', () { + const requested = SubscriptionFilter( + toolsListChanged: true, + resourceSubscriptions: [ + 'file:///project/config.json', + 'file:///project/other.json', + ], + ); + const acknowledged = SubscriptionFilter( + toolsListChanged: true, + resourceSubscriptions: ['file:///project/config.json'], + ); + + expect(acknowledged.isSubsetOf(requested), isTrue); + expect( + const SubscriptionFilter(promptsListChanged: true).isSubsetOf( + requested, + ), + isFalse, + ); + expect( + const SubscriptionFilter(resourcesListChanged: true).isSubsetOf( + requested, + ), + isFalse, + ); + expect( + const SubscriptionFilter( + resourceSubscriptions: ['file:///project/missing.json'], + ).isSubsetOf(requested), + isFalse, + ); + + expect( + acknowledged.allowsNotification( + const JsonRpcToolListChangedNotification(), + ), + isTrue, + ); + expect( + acknowledged.allowsNotification( + JsonRpcResourceUpdatedNotification( + updatedParams: const ResourceUpdatedNotification( + uri: 'file:///project/config.json', + ), + ), + ), + isTrue, + ); + expect( + acknowledged.allowsNotification( + JsonRpcResourceUpdatedNotification( + updatedParams: const ResourceUpdatedNotification( + uri: 'file:///project/missing.json', + ), + ), + ), + isFalse, + ); + expect( + acknowledged.allowsNotification( + const JsonRpcPromptListChangedNotification(), + ), + isFalse, + ); + }); + test('rejects malformed filters', () { expect( () => SubscriptionFilter.fromJson( @@ -151,6 +218,29 @@ void main() { }); group('JsonRpcSubscriptionsAcknowledgedNotification', () { + test('preserves subscription metadata on list changed notifications', () { + for (final notification in [ + const JsonRpcToolListChangedNotification( + meta: {McpMetaKey.subscriptionId: 'sub-1'}, + ), + const JsonRpcPromptListChangedNotification( + meta: {McpMetaKey.subscriptionId: 'sub-1'}, + ), + const JsonRpcResourceListChangedNotification( + meta: {McpMetaKey.subscriptionId: 'sub-1'}, + ), + // ignore: deprecated_member_use_from_same_package, deprecated_member_use + const JsonRpcCompletionListChangedNotification( + meta: {McpMetaKey.subscriptionId: 'sub-1'}, + ), + ]) { + final parsed = JsonRpcMessage.fromJson(notification.toJson()) + as JsonRpcNotification; + + expect(parsed.meta?[McpMetaKey.subscriptionId], 'sub-1'); + } + }); + test('serializes and parses subscription acknowledgments', () { final notification = JsonRpcSubscriptionsAcknowledgedNotification( acknowledgedParams: const SubscriptionsAcknowledgedNotification(