diff --git a/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartBody.java b/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartBody.java index a1fbb6087..abb656b9a 100644 --- a/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartBody.java +++ b/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartBody.java @@ -108,7 +108,12 @@ public BodyState transferTo(ByteBuf target) throws IOException { } } - return BodyState.CONTINUE; + // Signal STOP on the same call that finishes the last part (done just became true), so consumers + // don't need an extra empty readChunk/nextChunk cycle to discover the end. Both ByteBuf consumers + // send any bytes written this call before honouring STOP — BodyChunkedInput returns the buffer and + // sets endOfInput; the HTTP/2 pump (BodyChunkSource) returns a readable buffer before checking the + // state — so the terminal chunk is never dropped. + return done ? BodyState.STOP : BodyState.CONTINUE; } // RandomAccessBody API, suited for HTTP but not for HTTPS (zero-copy) diff --git a/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java b/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java index f31046ea2..54e0dbf5a 100644 --- a/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java +++ b/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java @@ -84,10 +84,15 @@ private static long transferWithCopy(MultipartBody multipartBody, int bufferSize long transferred = 0; final ByteBuf buffer = Unpooled.buffer(bufferSize); try { - while (multipartBody.transferTo(buffer) != BodyState.STOP) { + // Count bytes written on EVERY call, including the terminal STOP: transferTo now reports STOP on + // the same call that writes the body's last bytes (mirroring the real consumers, which send the + // buffer's contents before honouring STOP). + BodyState state; + do { + state = multipartBody.transferTo(buffer); transferred += buffer.readableBytes(); buffer.clear(); - } + } while (state != BodyState.STOP); return transferred; } finally { buffer.release(); @@ -144,4 +149,23 @@ public void transferZeroCopy() throws Exception { } } } + + @RepeatedIfExceptionsTest(repeats = 5) + public void finishingChunkReportsStopAndCarriesAllBytes() throws Exception { + try (MultipartBody multipartBody = buildMultipart()) { + // A buffer large enough for the whole body: the single transferTo that writes the last bytes must + // report STOP (previously it returned CONTINUE and forced an extra empty readChunk to reach STOP), + // and that STOP call must still carry every byte of the body. + int capacity = (int) MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE + 100; + ByteBuf buffer = Unpooled.buffer(capacity); + try { + BodyState state = multipartBody.transferTo(buffer); + assertEquals(BodyState.STOP, state, "the call that finishes the body must report STOP"); + assertEquals(multipartBody.getContentLength(), buffer.readableBytes(), + "the finishing STOP call must still carry all of the body's bytes"); + } finally { + buffer.release(); + } + } + } }