Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
- Rejected server-initiated JSON-RPC requests received on 2026 stateless
Streamable HTTP client response streams; servers must use MRTR
`input_required` results instead.
- Enforced `subscriptions/listen` stream ordering and filters for 2026
subscription notifications.

## 2.2.0

Expand Down
142 changes: 126 additions & 16 deletions lib/src/shared/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ final _logger = Logger("mcp_dart.shared.protocol");
bool _isProgressToken(Object? token) => token is int || token is String;

final _lastProgressByExtra = Expando<double>();
final _subscriptionStateByExtra = Expando<_SubscriptionStreamState>();

class _SubscriptionStreamState {
bool acknowledgmentSent = false;
SubscriptionFilter acknowledgedNotifications = const SubscriptionFilter();
}

/// Callback for progress notifications.
typedef ProgressCallback = void Function(Progress progress);
Expand Down Expand Up @@ -152,6 +158,16 @@ class RequestHandlerExtra {
this.closeStandaloneSSEStream,
});

_SubscriptionStreamState get _activeSubscriptionState =>
(_subscriptionStateByExtra[this] ??= _SubscriptionStreamState());

void _validateSubscriptionNotification(JsonRpcNotification notification) {
_recordOrValidateSubscriptionNotification(
_activeSubscriptionState,
notification,
);
}

/// Sends a progress notification for the current request.
///
/// This method automatically retrieves the `progressToken` from the request metadata.
Expand Down Expand Up @@ -215,19 +231,97 @@ class RequestHandlerExtra {
Future<void> sendSubscriptionNotification(
JsonRpcNotification notification,
) {
final meta = <String, dynamic>{
...?notification.meta,
McpMetaKey.subscriptionId: requestId,
};
final subscriptionNotification =
_withSubscriptionId(notification, requestId);

return sendNotification(
JsonRpcNotification(
method: notification.method,
params: notification.params,
meta: meta,
),
_validateSubscriptionNotification(subscriptionNotification);

return sendNotification(subscriptionNotification);
}
}

JsonRpcNotification _withSubscriptionId(
JsonRpcNotification notification,
RequestId requestId,
) {
final meta = <String, dynamic>{
...?notification.meta,
McpMetaKey.subscriptionId: requestId,
};
return JsonRpcNotification(
method: notification.method,
params: notification.params,
meta: meta,
);
}

void _recordOrValidateSubscriptionNotification(
_SubscriptionStreamState state,
JsonRpcNotification notification,
) {
if (notification.method == Method.notificationsSubscriptionsAcknowledged) {
state
..acknowledgmentSent = true
..acknowledgedNotifications =
_acknowledgedSubscriptionFilter(notification);
return;
}

if (!state.acknowledgmentSent) {
throw McpError(
ErrorCode.invalidRequest.value,
'subscriptions/listen streams must send '
'${Method.notificationsSubscriptionsAcknowledged} before '
'${notification.method}.',
);
}

if (!_subscriptionFilterAllowsNotification(
state.acknowledgedNotifications,
notification,
)) {
throw McpError(
ErrorCode.invalidRequest.value,
'${notification.method} was not requested or acknowledged for this '
'subscriptions/listen stream.',
);
}
}

SubscriptionFilter _acknowledgedSubscriptionFilter(
JsonRpcNotification notification,
) {
final params = notification.params;
if (params == null) {
throw const FormatException(
'subscriptions acknowledged notification params are required',
);
}

return SubscriptionsAcknowledgedNotification.fromJson(params).notifications;
}

bool _subscriptionFilterAllowsNotification(
SubscriptionFilter filter,
JsonRpcNotification notification,
) {
switch (notification.method) {
case Method.notificationsToolsListChanged:
return filter.toolsListChanged == true;
case Method.notificationsPromptsListChanged:
return filter.promptsListChanged == true;
case Method.notificationsResourcesListChanged:
return filter.resourcesListChanged == true;
case Method.notificationsResourcesUpdated:
final uri = notification.params?['uri'];
return uri is String &&
(filter.resourceSubscriptions?.contains(uri) ?? false);
case Method.notificationsTasks:
final taskId = notification.params?['taskId'];
return taskId is String && (filter.taskIds?.contains(taskId) ?? false);
default:
return false;
}
}

/// Internal class holding timeout state for a request.
Expand Down Expand Up @@ -1137,6 +1231,9 @@ abstract class Protocol {

final abortController = BasicAbortController();
_requestHandlerAbortControllers[request.id] = abortController;
final subscriptionState = request is JsonRpcSubscriptionsListenRequest
? _SubscriptionStreamState()
: null;

final extra = RequestHandlerExtra(
signal: abortController.signal,
Expand All @@ -1154,12 +1251,22 @@ abstract class Protocol {
: null,
taskRequestedTtl:
(request.params?['task'] as Map<String, dynamic>?)?['ttl'] as int?,
sendNotification: (notification, {relatedTask}) =>
_notificationWithRequestId(
notification,
relatedTask: relatedTask,
relatedRequestId: request.id,
),
sendNotification: (notification, {relatedTask}) {
var outgoingNotification = notification;
if (subscriptionState != null) {
outgoingNotification = _withSubscriptionId(notification, request.id);
_recordOrValidateSubscriptionNotification(
subscriptionState,
outgoingNotification,
);
}

return _notificationWithRequestId(
outgoingNotification,
relatedTask: relatedTask,
relatedRequestId: request.id,
);
},
sendRequest: <T extends BaseResultData>(
JsonRpcRequest req,
T Function(Map<String, dynamic>) resultFactory,
Expand All @@ -1185,6 +1292,9 @@ abstract class Protocol {
);
},
);
if (subscriptionState != null) {
_subscriptionStateByExtra[extra] = subscriptionState;
}

// If task creation is requested, check capability
if (extra.taskRequestedTtl != null ||
Expand Down
111 changes: 111 additions & 0 deletions test/mcp_2026_07_28_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,117 @@ void main() {
expect(transport.sentMessages.last, isA<JsonRpcResponse>());
});

test('server rejects subscription notifications before acknowledgment',
() async {
final server = Server(
const Implementation(name: 'server', version: '1.0.0'),
options: const McpServerOptions(
capabilities: ServerCapabilities(
tools: ServerCapabilitiesTools(listChanged: true),
),
),
);
server.setRequestHandler<JsonRpcSubscriptionsListenRequest>(
Method.subscriptionsListen,
(request, extra) async {
await extra.sendNotification(
const JsonRpcToolListChangedNotification(),
);
return const EmptyResult();
},
(id, params, meta) => JsonRpcSubscriptionsListenRequest(
id: id,
listenParams: SubscriptionsListenRequest.fromJson(params!),
meta: meta,
),
);
final transport = RecordingTransport();
await server.connect(transport);

transport.receive(
JsonRpcSubscriptionsListenRequest(
id: 'sub-1',
listenParams: const SubscriptionsListenRequest(
notifications: SubscriptionFilter(toolsListChanged: true),
),
meta: _clientMeta(),
),
);
await _pump();

final response = transport.sentMessages.single as JsonRpcError;
expect(response.error.code, ErrorCode.invalidRequest.value);
expect(
response.error.message,
contains(Method.notificationsSubscriptionsAcknowledged),
);
});

test('server tags direct subscription notifications with subscription id',
() async {
final server = Server(
const Implementation(name: 'server', version: '1.0.0'),
options: const McpServerOptions(
capabilities: ServerCapabilities(
tools: ServerCapabilitiesTools(listChanged: true),
),
),
);
server.setRequestHandler<JsonRpcSubscriptionsListenRequest>(
Method.subscriptionsListen,
(request, extra) async {
await extra.sendSubscriptionAcknowledged(
request.listenParams.notifications.acknowledgedBy(
const ServerCapabilities(
tools: ServerCapabilitiesTools(listChanged: true),
),
),
);
await extra.sendNotification(
const JsonRpcToolListChangedNotification(),
);
return const EmptyResult();
},
(id, params, meta) => JsonRpcSubscriptionsListenRequest(
id: id,
listenParams: SubscriptionsListenRequest.fromJson(params!),
meta: meta,
),
);
final transport = RecordingTransport();
await server.connect(transport);

transport.receive(
JsonRpcSubscriptionsListenRequest(
id: 'sub-1',
listenParams: const SubscriptionsListenRequest(
notifications: SubscriptionFilter(toolsListChanged: true),
),
meta: _clientMeta(),
),
);
await _pump();

expect(transport.sentMessages, hasLength(3));
expect(
transport.sentMessages.take(2).map((message) => message.toJson()),
everyElement(
containsPair(
'params',
containsPair(
'_meta',
containsPair(McpMetaKey.subscriptionId, 'sub-1'),
),
),
),
);
expect(
(transport.sentMessages[1] as JsonRpcNotification).method,
Method.notificationsToolsListChanged,
);
expect(transport.sentMessages.last, isA<JsonRpcResponse>());
});

test('stateless server responses add complete result and cache defaults',
() async {
final server = Server(
Expand Down
Loading