feat(java): implement Async Connection Pooling using FixedChannelPool#2606
feat(java): implement Async Connection Pooling using FixedChannelPool#2606rythm-sachdeva wants to merge 1 commit intoapache:masterfrom
Conversation
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
Outdated
Show resolved
Hide resolved
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
Outdated
Show resolved
Hide resolved
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
Outdated
Show resolved
Hide resolved
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
Outdated
Show resolved
Hide resolved
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution! |
2cdb131 to
50e8a36
Compare
|
Hi @mmodzelewski Can you please review . |
Hey @rythm-sachdeva, I will, but I need some time. I might not get round to it until the beginning of next week. |
50e8a36 to
ca4583f
Compare
|
@rythm-sachdeva Please see why CI is failing and fix the errors. |
@rythm-sachdeva I've added a verification if all ByteBufs are properly released (not leaked) and it appears that the new code does not release all resources. See the logs, or run the tests locally, you'll see reports of the leaks in there. |
4eaf354 to
181682b
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2606 +/- ##
============================================
- Coverage 68.52% 68.50% -0.02%
- Complexity 656 673 +17
============================================
Files 743 744 +1
Lines 62808 62934 +126
Branches 59221 59235 +14
============================================
+ Hits 43039 43114 +75
- Misses 17656 17700 +44
- Partials 2113 2120 +7
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
mmodzelewski
left a comment
There was a problem hiding this comment.
Hey @rythm-sachdeva, I left a few comments, please let me know if you have any questions.
| public CompletableFuture<Void> connect() { | ||
| connection = new AsyncTcpConnection(host, port, enableTls, tlsCertificate); | ||
| TCPConnectionPoolConfig.Builder poolConfigBuilder = new TCPConnectionPoolConfig.Builder(); | ||
| if (connectionPoolSize.isPresent()) { |
There was a problem hiding this comment.
instead of isPresent/get, it would be more idiomatic to just call ifPresent with an update lambda
|
|
||
| public AsyncTcpConnection(String host, int port) { | ||
| this(host, port, false, Optional.empty()); | ||
| this(host, port, false, Optional.empty(), new TCPConnectionPoolConfig(5, 1000, 1000)); |
There was a problem hiding this comment.
Please create a no-args constructor for the config, and use it here instead.
| public static final class Builder { | ||
| private int maxConnections = 5; | ||
| private int maxPendingAcquires = 1000; | ||
| private long acquireTimeoutMillis = 5000; |
There was a problem hiding this comment.
assign above values to constants, and use them in no-args constructor for the config
| } | ||
| } | ||
|
|
||
| // Inner Class for Channel pool configurations |
There was a problem hiding this comment.
Please drop all comments that do not introduce any new information. Comments are justified in situations where code is not obvious, something is done in a non-standard way, etc.
| */ | ||
| public CompletableFuture<Void> connect() { | ||
| if (isClosed.get()) { | ||
| return CompletableFuture.failedFuture(new IllegalStateException("Client is Closed")); |
There was a problem hiding this comment.
in the SDK code, please only use exceptions that inherit from IggyException
| poolConfigBuilder.setMaxConnections(connectionPoolSize.get()); | ||
| } | ||
| if (connectionTimeout.isPresent()) { | ||
| poolConfigBuilder.setAcquireTimeoutMillis(connectionTimeout.get().toMillis()); |
There was a problem hiding this comment.
seems like connection timeout should map to CONNECT_TIMEOUT_MILLIS, acquire timeout is semantically different thing
There was a problem hiding this comment.
I think by mistake I have given the wrong name it should be acquireTimeout
| * Metrics for monitoring connection pool performance. | ||
| * Tracks active connections, wait times, and errors. | ||
| */ | ||
| public class PoolMetrics { |
There was a problem hiding this comment.
I'd drop the metrics for now. Let's make sure that the connection pool is working fine first. This can be a follow up.
| if (channelPool != null) { | ||
| channelPool.close(); | ||
| } | ||
| return CompletableFuture.runAsync(eventLoopGroup::shutdownGracefully); |
There was a problem hiding this comment.
It would make sense to wait for the result of shutdownGracefully. Currently, it is ignored.
|
|
||
| public Builder() {} | ||
|
|
||
| public Builder setMaxConnections(int maxConnections) { |
There was a problem hiding this comment.
let's at least add some simple validations for the values inserted to the config
| * Returns the result of the LAST connection's execution, allowing the caller | ||
| * to treat this like a single request. | ||
| */ | ||
| public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf payload) { |
There was a problem hiding this comment.
broadcastAsync() for authentication is unreliable - FixedChannelPool creates channels lazily, so sending poolSize login requests may hit the same channel multiple times (it gets released in channelRead0 and re-acquired) rather than reaching every distinct connection. Connections created later (e.g. reconnect after health-check failure) will also be unauthenticated.
A simpler approach: authenticate lazily in send(). After acquiring a channel, check a channel attribute (e.g. channel.attr(AUTH_KEY)). If not authenticated, send login first, set the attribute on success, then send the actual command. This works naturally with the async model, handles all channel lifecycle events (pool growth, reconnection, idle eviction), and eliminates the need for broadcastAsync() entirely.
There was a problem hiding this comment.
Currently the login is called by the user once After client creation. In this case how would the flow work ?
There was a problem hiding this comment.
The user-facing API stays the same - login() is still called once. Internally, login() would store the credentials and authenticate the one channel it acquires. Then in send(), after acquiring a channel, you check a channel attribute (e.g. AUTH_KEY). If the channel isn't authenticated yet, send login with the stored credentials first, set the attribute, then send the actual command. This way, every channel gets authenticated transparently on first use, including channels created later by the pool.
| } | ||
|
|
||
| @Override | ||
| public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
There was a problem hiding this comment.
A channel should also be released to the pool in case of exception
fix(java-sdk):Added Missing PoolMetrics and linting issues fix(java):fixed connection error fix(java):Fixed Linting Issues fix(java):fixed builder issues fix(java):fixed ci issues fix(java): fixed memory leaks fix(java):fixed memory leak at channelread0 fix(java):fixed broadcastasync memory leak feat(java): Added attribute based channel login fix(java): fixed error handling in inbound handler
181682b to
a31643a
Compare
Description
Implements connection pooling for
AsyncTcpConnectionusing Netty'sFixedChannelPoolto handle concurrent async operations efficiently.Changes
FixedChannelPoolfor concurrent request handlingPoolMetricsclass to track:TCPConnectionPoolConfigwith builder pattern for customizing:ConcurrentLinkedQueuebroadcastAsync()method to execute commands across all pooled connections (useful for authentication)AsyncIggyTcpClient.Builder.connectionPoolSize()Implementation Details
FixedChannelPoolwithChannelHealthChecker.ACTIVEIggyResponseHandlerwith a response queueTesting
testConnectionPoolMetrics()to verify metrics tracking