feature: Support HTTP/2 stream push for the Watch API in Server Raft mode #7903
base: 2.x
Conversation
```
# Conflicts:
#	changes/en-us/2.x.md
#	changes/zh-cn/2.x.md
#	common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java
```
Codecov Report ❌ — additional details and impacted files:

```diff
@@             Coverage Diff              @@
##                2.x    #7903      +/-   ##
============================================
+ Coverage     71.58%   71.61%   +0.02%
- Complexity      883      884       +1
============================================
  Files          1294     1296       +2
  Lines         49554    49704     +150
  Branches       5884     5913      +29
============================================
+ Hits          35475    35594     +119
- Misses        11155    11172      +17
- Partials       2924     2938      +14
```
Pull request overview
This PR adds HTTP/2 Server-Sent Events (SSE) support for the Watch API in Server Raft mode, replacing the previous callback-based HTTP/2 implementation with a more standard streaming approach.
Key changes:
- Introduced `SeataHttpWatch`, a class providing an iterator-based API for consuming SSE streams
- Replaced callback-based HTTP/2 methods with new `watch()` and `watchPost()` methods in `HttpClientUtil`
- Updated server-side `ClusterWatcherManager` to send SSE-formatted events over HTTP/2 with proper stream management
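To illustrate what an iterator-based SSE consumer like `SeataHttpWatch` has to do, the sketch below parses raw SSE lines into events: `event:` names the type, `data:` lines accumulate the payload, and a blank line dispatches the event. The class and method names here are hypothetical, not taken from the PR; this only demonstrates the SSE framing rules.

```java
import java.util.ArrayList;
import java.util.List;

public class SseEventParser {
    /** A parsed SSE event: an event type plus the joined data payload. */
    public static final class Event {
        public final String type;
        public final String data;
        Event(String type, String data) { this.type = type; this.data = data; }
    }

    /**
     * Parse raw SSE lines into events. "event:" sets the event type,
     * "data:" lines accumulate the payload, and a blank line dispatches
     * the accumulated event.
     */
    public static List<Event> parse(List<String> lines) {
        List<Event> events = new ArrayList<>();
        String type = "message";            // SSE default event type
        StringBuilder data = new StringBuilder();
        for (String line : lines) {
            if (line.isEmpty()) {           // blank line => dispatch event
                if (data.length() > 0) {
                    events.add(new Event(type, data.toString()));
                }
                type = "message";
                data.setLength(0);
            } else if (line.startsWith("event:")) {
                type = line.substring(6).trim();
            } else if (line.startsWith("data:")) {
                if (data.length() > 0) {
                    data.append('\n');      // multiple data lines are joined
                }
                data.append(line.substring(5).trim());
            }
            // comment lines (":") and unknown fields are ignored
        }
        return events;
    }
}
```

An iterator over such events is what lets the client block on `hasNext()`/`next()` instead of supplying a callback.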
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 22 comments.
| File | Description |
|---|---|
| `common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java` | New class implementing HTTP/2 SSE stream consumption with iterator pattern |
| `common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java` | Removed HTTP/2 callback methods, added watch methods for SSE streaming |
| `common/src/main/java/org/apache/seata/common/metadata/ClusterWatchEvent.java` | New DTO class for cluster watch events |
| `common/src/main/java/org/apache/seata/common/executor/HttpCallback.java` | Deleted callback interface (no longer needed) |
| `server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java` | Refactored to send SSE events over HTTP/2 with keepalive support |
| `server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java` | Updated tests to use new watch API instead of callbacks |
| `server/src/test/java/org/apache/seata/server/cluster/manager/ClusterWatcherManagerTest.java` | Removed HTTP/2-specific tests |
| `common/src/test/java/org/apache/seata/common/util/SeataHttpWatchTest.java` | Comprehensive unit tests for SeataHttpWatch class |
| `common/src/test/java/org/apache/seata/common/util/HttpClientUtilTest.java` | Updated with tests for new watch methods |
| `changes/zh-cn/2.x.md` | Added Chinese changelog entry |
| `changes/en-us/2.x.md` | Added English changelog entry |
…il.java Co-authored-by: Copilot <[email protected]>
…chTest.java Co-authored-by: Copilot <[email protected]>
@funky-eyes PTAL
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 10 comments.
```java
for (String group : WATCHERS.keySet()) {
    Optional.ofNullable(WATCHERS.remove(group))
        .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
            if (System.currentTimeMillis() >= watcher.getTimeout()) {
                watcher.setDone(true);
                sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
            }
            if (!watcher.isDone()) {
                // Re-register
                registryWatcher(watcher);
                HttpContext context = watcher.getAsyncContext();
                boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
                if (isHttp2) {
                    if (!context.getContext().channel().isActive()) {
                        watcher.setDone(true);
                        HTTP2_HEADERS_SENT.remove(watcher);
                    } else {
                        registryWatcher(watcher);
                    }
                } else {
                    if (System.currentTimeMillis() >= watcher.getTimeout()) {
                        watcher.setDone(true);
                        sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED, true, false);
                    } else if (!watcher.isDone()) {
                        registryWatcher(watcher);
                    }
                }
            }
        }));
}
```
Copilot AI · Jan 4, 2026
The scheduled task removes all watchers from a group at once using WATCHERS.remove(group), then processes them. If new watchers are registered for the same group during processing, they could be lost. Consider using a more concurrent-safe approach, such as iterating over watchers without removing the entire queue first, or using a copy-on-write approach.
The current implementation is safe for the following reasons:
- New watchers won't be lost: the `registryWatcher` method uses `computeIfAbsent`, so even if the queue is removed by `WATCHERS.remove(group)`, newly registered watchers will create a new queue and be added to it.
- The scheduled task scans periodically: it runs every second, so new watchers will be processed within at most 1 second.
- In concurrent scenarios: after the scheduled task removes a queue, new watchers will be added to a new queue created by `computeIfAbsent`. When the scheduled task re-registers watchers, they will also be added to the same new queue, and all will be processed in the next scan.

For long-polling scenarios, a 1-second delay is acceptable. The current implementation meets the requirements and no additional optimization is needed.
@funky-eyes WDYT?
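The safety argument above rests on plain `ConcurrentHashMap` semantics and can be demonstrated in isolation. The sketch below is a standalone illustration (the map and queue types are assumptions, not the actual Seata code): even after a group's queue is removed by the scheduled task, a subsequent registration via `computeIfAbsent` transparently creates a fresh queue, so no registration is dropped.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WatcherRegistry {
    private final ConcurrentHashMap<String, Queue<String>> watchers = new ConcurrentHashMap<>();

    /** Register a watcher; creates the group's queue on demand. */
    public void register(String group, String watcher) {
        watchers.computeIfAbsent(group, g -> new ConcurrentLinkedQueue<>()).add(watcher);
    }

    /** Remove and return the whole queue for a group, as the scheduled task does. */
    public Queue<String> drain(String group) {
        return watchers.remove(group);
    }

    /** Number of watchers currently queued for a group. */
    public int size(String group) {
        Queue<String> q = watchers.get(group);
        return q == null ? 0 : q.size();
    }
}
```

A registration that arrives after `drain` lands in a brand-new queue and is picked up on the next scan, which is why the 1-second scan interval bounds the worst-case delay.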
```java
// Default to GET if method is not specified or not supported
requestBuilder.get();
```
Copilot AI · Jan 4, 2026
The method allows POST and PUT with requestBody, but silently defaults to GET for any unsupported method (including DELETE, PATCH, etc.). This could mask errors when an incorrect method is specified. Consider throwing an IllegalArgumentException for unsupported methods instead of silently defaulting to GET.
Suggested change:
```diff
-// Default to GET if method is not specified or not supported
-requestBuilder.get();
+throw new IllegalArgumentException("Unsupported HTTP method for watch request: " + method);
```
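The suggested fail-fast behavior could be factored into a small validator like the sketch below. The class and method names are illustrative assumptions, not the actual `HttpClientUtil` code: only the methods the watch API supports pass through, and anything else raises immediately instead of silently becoming a GET.

```java
public class WatchMethodValidator {
    /**
     * Normalize and validate the HTTP method for a watch request.
     * GET, POST and PUT are supported; anything else fails fast so a
     * mistaken "DELETE" or "PATCH" cannot silently turn into a GET.
     */
    public static String validate(String method) {
        String m = method == null ? "GET" : method.trim().toUpperCase();
        switch (m) {
            case "GET":
            case "POST":
            case "PUT":
                return m;
            default:
                throw new IllegalArgumentException(
                        "Unsupported HTTP method for watch request: " + method);
        }
    }
}
```

Treating a `null` method as GET preserves the existing default while making every explicit-but-unsupported method an error.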
```java
// Fast failure during connection phase
.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
// Infinite read timeout to allow continuous listening for server push
.readTimeout(0, TimeUnit.SECONDS)
```
We should make the read timeout configurable instead of hardcoding it to 0. When readTimeout is set to 0, the underlying socket timeout becomes infinite, which can cause the request to block indefinitely. In cases where the server shuts down abruptly (e.g., a crash without a graceful TCP handshake), this may lead to permanent blocking.
The connectTimeout should be limited to around 10 seconds, while the readTimeout should be adjustable based on the specific request type.
```java
watcher.setDone(true);
HTTP2_HEADERS_SENT.remove(watcher);
} else {
    registryWatcher(watcher);
```
If the cluster state changes again during the interval before re-registering for the subscription queue, is there a risk—in extreme cases—that the notification could be lost?
Yes, currently through integration testing, if a change event occurs and then another change event occurs within 500ms, the client will not receive the second notification because it has not re-registered. And it has been stable and reproducible. I will solve this problem.
Currently, optimizations have been made for potential loss issues. Different queues have been created for watching HTTP1 and HTTP2 respectively to store data. HTTP2 does not remove and re-register regularly. The scheduled tasks only check whether the channel is valid to prevent memory leaks.
```java
// Separate watchers for HTTP/1.1 (one-time requests) and HTTP/2 (long-lived connections)
private static final Map<String, Queue<Watcher<HttpContext>>> HTTP1_WATCHERS = new ConcurrentHashMap<>();
private static final Map<String, Queue<Watcher<HttpContext>>> HTTP2_WATCHERS = new ConcurrentHashMap<>();

// Check HTTP/2 watchers for connection validity (don't remove, just check)
for (Map.Entry<String, Queue<Watcher<HttpContext>>> entry : HTTP2_WATCHERS.entrySet()) {
    String group = entry.getKey();
    Queue<Watcher<HttpContext>> watchers = entry.getValue();
    if (watchers == null || watchers.isEmpty()) {
        continue;
    }
    // Create snapshot to avoid concurrent modification
    List<Watcher<HttpContext>> watchersToCheck = new ArrayList<>(watchers);
    watchersToCheck.forEach(watcher -> {
        HttpContext context = watcher.getAsyncContext();
        if (!context.getContext().channel().isActive()) {
            // Remove invalid watcher
            watchers.remove(watcher);
            watcher.setDone(true);
            HTTP2_HEADERS_SENT.remove(watcher);
            logger.debug("Removed inactive HTTP/2 watcher for group: {}", group);
        }
    });
}
```
We also need to consider another issue: since the response is written back using HTTP/2 + SSE, is there any thread safety concern with concurrent writes to the SSE response? Additionally, could there be an out-of-order delivery problem—for example, if two events are generated, with event 1 arriving at the client after event 2, potentially causing the newer event to be overwritten by the older one?
To address this, should we ensure that events are delivered in order and include term validation? Specifically, if the term of a previous event in the response is already greater than that of a subsequent event, the latter should be discarded.
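The term validation proposed here can be kept lock-free with an atomic compare-and-advance. The sketch below assumes each event carries a monotonically increasing Raft term; the class and field names are illustrative, not the PR's code:

```java
import java.util.concurrent.atomic.AtomicLong;

public class TermFilter {
    private final AtomicLong lastSentTerm = new AtomicLong(-1);

    /**
     * Returns true if an event with this term should be delivered.
     * Atomically advances the high-water mark; stale or duplicate terms
     * are discarded, so an older event can never overwrite a newer one.
     */
    public boolean shouldDeliver(long term) {
        while (true) {
            long last = lastSentTerm.get();
            if (term <= last) {
                return false;               // stale or duplicate event: drop it
            }
            if (lastSentTerm.compareAndSet(last, term)) {
                return true;                // we won the race: deliver
            }
            // another thread advanced the term concurrently; re-check
        }
    }
}
```

Such a filter could sit either server-side before writing the DATA frame, or client-side before surfacing the event from the iterator.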
I believe that writing data to the ChannelHandlerContext in a concurrent manner does have this possibility, but it does not affect the data push. It's just that several lines of data are pushed at once. As long as the data sent at one time is complete, it's fine. Because the client parses one line at a time. As for the disorder issue of the term, the client can make its own judgment. Of course, the server can also make some appropriate optimizations.
Concurrent writes here won't corrupt data integrity, so fully sequential processing isn't required; we keep the concurrency for performance and only add the necessary filtering optimization. @funky-eyes wdyt?
```java
// For HTTP/2, headers must be sent first on the initial response
if (sendHeaders) {
    Http2Headers headers = new DefaultHttp2Headers().status(nettyStatus.codeAsText());
    headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream; charset=utf-8");
    headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
    ctx.write(new DefaultHttp2HeadersFrame(headers));
}
String group = watcher.getGroup();
Long finalTerm = term != null ? term : GROUP_UPDATE_TERM.getOrDefault(group, 0L);
String eventData = buildEventData(nettyStatus, closeStream, sendHeaders, group, finalTerm);
ByteBuf content = Unpooled.copiedBuffer(eventData, StandardCharsets.UTF_8);
// Send DATA frame (if closeStream is true, it will end the current stream)
ctx.write(new DefaultHttp2DataFrame(content, closeStream));
ctx.flush();
```
The out-of-order issue I mentioned isn't limited to HTTP/2 responses alone. The onChangeEvent method is annotated with @Async, and notifications to watchers are performed using parallelStream. As a result, when the cluster experiences a rapid series of consecutive changes, this could lead to out-of-order notifications (among other potential issues). Of course, addressing this can be treated as a separate optimization task.
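One possible follow-up along these lines: remove the `@Async` + `parallelStream` reordering while keeping cross-group concurrency by serializing notifications per group on a single-threaded executor. This is a sketch under stated assumptions (executor layout and names are hypothetical, framed as a candidate optimization rather than the PR's code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GroupSerializer {
    // One single-threaded executor per group: notifications for the same
    // group run in submission order, while different groups stay concurrent.
    private final ConcurrentHashMap<String, ExecutorService> perGroup = new ConcurrentHashMap<>();

    public void submit(String group, Runnable notifyTask) {
        perGroup.computeIfAbsent(group, g -> Executors.newSingleThreadExecutor())
                .execute(notifyTask);
    }

    /** Shut down all executors and wait for queued notifications to finish. */
    public void shutdownAndWait() {
        for (ExecutorService e : perGroup.values()) {
            e.shutdown();
            try {
                e.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because each group has exactly one worker thread, two rapid consecutive cluster changes for the same group can no longer be delivered out of order.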
```java
eventType = "cluster-update";
}

String json = String.format(
```
I hope that when an HTTP/2 connection is established, the server directly pushes the cluster change results to the client, rather than requiring the client to initiate a separate request to fetch the cluster information.
…am' into zy_1226_http2_client_server_stream
…lusterWatcherManager.java Co-authored-by: Copilot <[email protected]>
…rControllerTest.java Co-authored-by: Copilot <[email protected]>
Additionally, I don't seem to see any usage of SeataHttpWatch on the client side.
Ⅰ. Describe what this PR did

Core Changes:
- … `keepalive` and `cluster-update`.

Highlights:
- `keepalive` event upon establishment
- `cluster-update` event when cluster changes occur

Ⅱ. Does this pull request fix one issue?

Ⅲ. Why don't you add test cases (unit test/integration test)?

Ⅳ. Describe how to verify it

Unit Tests:

Verification Points:
- `keepalive` event received immediately after establishment
- (`timeout` event received)
- `cluster-update` event is received promptly when cluster changes occur

Ⅴ. Special notes for reviews

Design Decisions:
- … `SeataHttpWatch`.

Key Code Locations:
- `ClusterWatcherManager.init()`: Timeout handling logic (distinguishes HTTP1/HTTP2)
- `ClusterWatcherManager.notifyWatcher()`: HTTP2 re-registration logic
- `ClusterWatcherManager.sendWatcherResponse()`: HTTP2 data frame pushing

Notes:
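For reference, the `keepalive` and `cluster-update` events described in this PR would travel in standard SSE framing: an `event:` line naming the type, a `data:` line carrying the payload, and a blank line terminating the event. A minimal formatter sketch (field layout is an assumption based on the event names above, not the server's exact output):

```java
public class SseFrame {
    /**
     * Format a single SSE event: an "event:" line naming the type, a
     * "data:" line carrying the JSON payload, and a blank line terminator.
     */
    public static String format(String eventType, String jsonData) {
        return "event: " + eventType + "\n"
             + "data: " + jsonData + "\n"
             + "\n";
    }
}
```

A keepalive would then be `format("keepalive", "{}")`, and a cluster change `format("cluster-update", json)` with the metadata payload as JSON.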