Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
- Retried `server/discover` with an advertised compatible stateless protocol
version after `UnsupportedProtocolVersionError` instead of falling back to
legacy initialization.
- Added client-side `subscriptions/listen` handles that correlate stream
notifications by `io.modelcontextprotocol/subscriptionId`, validate the
acknowledgment, and cancel long-lived streams with `notifications/cancelled`.

## 2.2.0

Expand Down
265 changes: 265 additions & 0 deletions lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,36 @@ class McpClientOptions extends ProtocolOptions {
@Deprecated('Use McpClientOptions instead')
typedef ClientOptions = McpClientOptions;

/// Handle for an active `subscriptions/listen` stream opened by [McpClient].
class McpSubscription {
final void Function([Object? reason]) _cancel;

/// JSON-RPC request ID that identifies this subscription stream.
final int id;

/// Acknowledgment sent as the first message on the subscription stream.
final Future<SubscriptionsAcknowledgedNotification> acknowledged;

/// Notifications delivered on this subscription stream after acknowledgment.
final Stream<JsonRpcNotification> notifications;

/// Completes when the `subscriptions/listen` request ends.
final Future<EmptyResult> done;

McpSubscription._({
required this.id,
required this.acknowledged,
required this.notifications,
required this.done,
required void Function([Object? reason]) cancel,
}) : _cancel = cancel;

/// Cancels this subscription stream.
void cancel([Object? reason]) {
_cancel(reason);
}
}

// Recursively applies default values from a JSON Schema to a data object.
void _applyElicitationDefaults(JsonSchema schema, Map<String, dynamic> data) {
if (schema is! JsonObject) return;
Expand Down Expand Up @@ -99,6 +129,7 @@ class McpClient extends Protocol {
final Map<String, JsonSchema> _cachedToolOutputSchemas = {};
final Set<String> _cachedRequiredTaskTools = {};
final ToolParameterHeaderMappings _cachedToolParameterHeaders = {};
final Map<Object, _ClientSubscriptionState> _activeSubscriptions = {};

/// Callback for handling elicitation requests from the server.
///
Expand Down Expand Up @@ -559,6 +590,31 @@ class McpClient extends Protocol {
}
}

@override
void onIncomingNotificationAccepted(JsonRpcNotification notification) {
final subscriptionId = notification.meta?[McpMetaKey.subscriptionId];
if (subscriptionId is! int && subscriptionId is! String) {
return;
}

final activeSubscription = _activeSubscriptions[subscriptionId];
activeSubscription?.handleNotification(notification);
}

@override
void onConnectionClosed() {
final subscriptions = List<_ClientSubscriptionState>.from(
_activeSubscriptions.values,
);
_activeSubscriptions.clear();
for (final subscription in subscriptions) {
subscription.fail(
McpError(ErrorCode.connectionClosed.value, 'Connection closed'),
StackTrace.current,
);
}
}

@override
void assertCapabilityForMethod(String method) {
final serverCaps = _serverCapabilities;
Expand Down Expand Up @@ -837,6 +893,47 @@ class McpClient extends Protocol {
return request<EmptyResult>(req, (json) => const EmptyResult(), options);
}

/// Opens a `subscriptions/listen` stream and demultiplexes notifications.
McpSubscription listenSubscriptions(SubscriptionsListenRequest params) {
if (transport == null) {
throw StateError('Not connected to a transport.');
}

final requestId = reserveRequestId();
final abortController = BasicAbortController();
final state = _ClientSubscriptionState(
id: requestId,
requestedNotifications: params.notifications,
abortController: abortController,
onClose: () => _activeSubscriptions.remove(requestId),
);
_activeSubscriptions[requestId] = state;

final requestData = JsonRpcSubscriptionsListenRequest(
id: requestId,
listenParams: params,
meta: _usesStatelessProtocol ? _statelessRequestMeta(null) : null,
);
final requestDone = super.requestWithReservedId<EmptyResult>(
requestId,
requestData,
(json) => const EmptyResult(),
RequestOptions(
signal: abortController.signal,
timeoutEnabled: false,
),
);
state.trackRequest(requestDone);

return McpSubscription._(
id: requestId,
acknowledged: state.acknowledged,
notifications: state.notifications,
done: state.done,
cancel: state.cancel,
);
}

/// Sends a `tools/call` request to invoke a tool on the server.
Future<CallToolResult> callTool(
CallToolRequest params, {
Expand Down Expand Up @@ -1067,6 +1164,174 @@ class McpClient extends Protocol {
@Deprecated('Use McpClient instead')
typedef Client = McpClient;

class _ClientSubscriptionState {
final int id;
final SubscriptionFilter requestedNotifications;
final BasicAbortController abortController;
final void Function() onClose;
final StreamController<JsonRpcNotification> _notifications =
StreamController<JsonRpcNotification>.broadcast();
final Completer<SubscriptionsAcknowledgedNotification> _acknowledged =
Completer<SubscriptionsAcknowledgedNotification>();
final Completer<EmptyResult> _done = Completer<EmptyResult>();

SubscriptionFilter? _acknowledgedNotifications;
bool _closed = false;
bool _localCancellation = false;

_ClientSubscriptionState({
required this.id,
required this.requestedNotifications,
required this.abortController,
required this.onClose,
});

Future<SubscriptionsAcknowledgedNotification> get acknowledged =>
_acknowledged.future;

Stream<JsonRpcNotification> get notifications => _notifications.stream;

Future<EmptyResult> get done => _done.future;

void handleNotification(JsonRpcNotification notification) {
if (_closed) {
return;
}

if (_acknowledgedNotifications == null) {
if (notification.method !=
Method.notificationsSubscriptionsAcknowledged) {
fail(
McpError(
ErrorCode.invalidRequest.value,
'Subscription $id received ${notification.method} before '
'${Method.notificationsSubscriptionsAcknowledged}.',
),
StackTrace.current,
);
return;
}

final acknowledgedParams =
(notification as JsonRpcSubscriptionsAcknowledgedNotification)
.acknowledgedParams;
final acknowledgedNotifications = acknowledgedParams.notifications;
if (!acknowledgedNotifications.isSubsetOf(requestedNotifications)) {
fail(
McpError(
ErrorCode.invalidRequest.value,
'Subscription $id acknowledged notifications that were not '
'requested.',
),
StackTrace.current,
);
return;
}

_acknowledgedNotifications = acknowledgedNotifications;
if (!_acknowledged.isCompleted) {
_acknowledged.complete(acknowledgedParams);
}
return;
}

final acknowledgedNotifications = _acknowledgedNotifications!;
if (!acknowledgedNotifications.allowsNotification(notification)) {
fail(
McpError(
ErrorCode.invalidRequest.value,
'${notification.method} was not requested or acknowledged for '
'subscription $id.',
),
StackTrace.current,
);
return;
}

_notifications.add(notification);
}

void trackRequest(Future<EmptyResult> requestDone) {
requestDone.then(
complete,
onError: (Object error, StackTrace stackTrace) {
if (_localCancellation) {
complete(const EmptyResult());
} else {
fail(error, stackTrace, abort: false);
}
},
);
}

void cancel([Object? reason]) {
if (_closed) {
return;
}

_localCancellation = true;
if (!_acknowledged.isCompleted) {
_acknowledged.completeError(AbortError(reason), StackTrace.current);
}
abortController.abort(reason);
complete(const EmptyResult());
}

void complete(EmptyResult result) {
if (_closed) {
return;
}

final missingAcknowledgment = _acknowledgedNotifications == null &&
!_localCancellation &&
!abortController.signal.aborted;
if (missingAcknowledgment) {
fail(
McpError(
ErrorCode.invalidRequest.value,
'Subscription $id completed before '
'${Method.notificationsSubscriptionsAcknowledged}.',
),
StackTrace.current,
abort: false,
);
return;
}

_closed = true;
onClose();
if (!_done.isCompleted) {
_done.complete(result);
}
_notifications.close();
}

void fail(
Object error,
StackTrace stackTrace, {
bool abort = true,
}) {
if (_closed) {
return;
}

_closed = true;
onClose();
if (abort && !abortController.signal.aborted) {
abortController.abort(error);
}
if (!_acknowledged.isCompleted) {
_acknowledged.completeError(error, stackTrace);
}
if (!_done.isCompleted) {
_done.completeError(error, stackTrace);
}
_notifications
..addError(error, stackTrace)
..close();
}
}

class _ToolParameterHeaderValidation {
final Map<String, String> mappings;
final String? rejectionReason;
Expand Down
Loading