diff --git a/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java b/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java index 99b964735..07b7cb188 100755 --- a/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java +++ b/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java @@ -136,6 +136,9 @@ public final class NettyResponseFuture implements ListenableFuture { private volatile List roundRobinAddresses; private volatile Uri roundRobinBaseUri; private volatile ScramContext scramContext; + // Memoized base (host/scheme/port) partition key; see basePartitionKey(). proxyServer is final and + // targetRequest is its only other input, so this is invalidated only by setTargetRequest. + private volatile Object basePartitionKeyCache; public NettyResponseFuture(Request originalRequest, AsyncHandler asyncHandler, @@ -370,6 +373,9 @@ public Request getTargetRequest() { public void setTargetRequest(Request targetRequest) { this.targetRequest = targetRequest; + // Invalidate the memoized base partition key: a redirect/retry may target a different + // host/scheme/port, which changes the key. + basePartitionKeyCache = null; } public Request getCurrentRequest() { @@ -552,8 +558,16 @@ public Object getPartitionKey() { * initially selected. */ public Object basePartitionKey() { - return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), - proxyServer); + // Memoized: the same key is needed at several sites per request attempt (semaphore acquire, pool + // poll/offer, HTTP/2 registration). It depends only on targetRequest (host/scheme/port + virtualHost) + // and the final proxyServer, so it is recomputed only when setTargetRequest changes the target. + Object key = basePartitionKeyCache; + if (key == null) { + key = connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), + proxyServer); + basePartitionKeyCache = key; + } + return key; } /** diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index a92789fb7..0042fe83d 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -1205,15 +1205,18 @@ private Channel pollPooledChannel(NettyResponseFuture future, Request request // connection would send the WS handshake as a plain HTTP/2 request and the WebSocket handler would // receive raw frames ("Invalid message ... AdaptiveByteBuf"). Fall through to an HTTP/1.1 connection. // See Issue #2160. + // Compute the base partition key once and reuse it for both the HTTP/2 registry poll and the + // HTTP/1.1 pool poll, instead of recomputing (and re-allocating) it inside each channelManager call. + Object partitionKey = request.getChannelPoolPartitioning().getPartitionKey(uri, virtualHost, proxy); if (!uri.isWebSocket()) { - Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning()); + Channel h2Channel = channelManager.pollHttp2Connection(partitionKey); if (h2Channel != null) { LOGGER.debug("Using HTTP/2 multiplexed Channel '{}' for '{}' to '{}'", h2Channel, request.getMethod(), uri); return h2Channel; } } - final Channel channel = channelManager.poll(uri, virtualHost, proxy, request.getChannelPoolPartitioning()); + final Channel channel = channelManager.poll(partitionKey); if (channel != null) { LOGGER.debug("Using pooled Channel '{}' for '{}' to '{}'", channel, request.getMethod(), uri); diff --git a/client/src/test/java/org/asynchttpclient/netty/NettyResponseFutureTest.java b/client/src/test/java/org/asynchttpclient/netty/NettyResponseFutureTest.java index a05289300..a0cb2c2c5 100644 --- a/client/src/test/java/org/asynchttpclient/netty/NettyResponseFutureTest.java +++ b/client/src/test/java/org/asynchttpclient/netty/NettyResponseFutureTest.java @@ -17,12 +17,18 @@ import io.github.artsok.RepeatedIfExceptionsTest; import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.Request; +import org.asynchttpclient.channel.ChannelPoolPartitioning; +import org.junit.jupiter.api.Test; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import static org.asynchttpclient.Dsl.get; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -90,4 +96,29 @@ public void testGetThrowsExceptionOnAbort() throws Exception { assertThrows(ExecutionException.class, () -> nettyResponseFuture.get(), "An ExecutionException must have occurred by now as 'abort' was called before 'get'"); } + + @Test + public void basePartitionKeyIsMemoizedAndInvalidatedOnTargetChange() { + AsyncHandler asyncHandler = mock(AsyncHandler.class); + Request reqA = get("http://hosta.example/").build(); + ChannelPoolPartitioning partitioning = reqA.getChannelPoolPartitioning(); + NettyResponseFuture future = new NettyResponseFuture<>(reqA, asyncHandler, null, 3, partitioning, null, null); + + Object k1 = future.basePartitionKey(); + Object k2 = future.basePartitionKey(); + // Memoized: repeat calls return the SAME instance (previously each call allocated a fresh key). + assertSame(k1, k2, "base partition key must be memoized (same instance on repeat calls)"); + // ...and it equals a fresh computation for the same target, so behavior is unchanged. + assertEquals(partitioning.getPartitionKey(reqA.getUri(), reqA.getVirtualHost(), null), k1, + "memoized key must equal a fresh computation for the current target"); + + // Changing the target host must invalidate the memo and yield the new host's key — otherwise a + // redirect could reuse a pooled connection to the wrong host. + Request reqB = get("http://hostb.example/").build(); + future.setTargetRequest(reqB); + Object k3 = future.basePartitionKey(); + assertNotEquals(k1, k3, "changing the target host must invalidate the memo and yield a different key"); + assertEquals(partitioning.getPartitionKey(reqB.getUri(), reqB.getVirtualHost(), null), k3, + "after setTargetRequest the key must match a fresh computation for the new target"); + } }