Skip to content

Commit c2944a1

Browse files
joerg1985diemol
andauthored
Handle fragmented websocket messages (#11962)
Co-authored-by: Diego Molina <[email protected]>
1 parent bb95c2c commit c2944a1

File tree

1 file changed

+83
-14
lines changed

1 file changed

+83
-14
lines changed

java/src/org/openqa/selenium/netty/server/MessageInboundConverter.java

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,48 +22,117 @@
2222
import io.netty.channel.SimpleChannelInboundHandler;
2323
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2424
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
25+
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
2526
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
2627
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
2728
import org.openqa.selenium.remote.http.BinaryMessage;
2829
import org.openqa.selenium.remote.http.CloseMessage;
2930
import org.openqa.selenium.remote.http.Message;
3031
import org.openqa.selenium.remote.http.TextMessage;
3132

33+
import java.io.ByteArrayOutputStream;
34+
import java.io.IOException;
35+
import java.io.UncheckedIOException;
3236
import java.nio.ByteBuffer;
3337
import java.util.logging.Logger;
3438

3539
class MessageInboundConverter extends SimpleChannelInboundHandler<WebSocketFrame> {
3640

41+
private enum Continuation {
42+
Text, Binary, None
43+
}
44+
3745
private static final Logger LOG = Logger.getLogger(MessageInboundConverter.class.getName());
3846

47+
private Continuation next;
48+
private StringBuilder builder;
49+
private ByteArrayOutputStream buffer;
50+
51+
public MessageInboundConverter() {
52+
next = Continuation.None;
53+
buffer = new ByteArrayOutputStream();
54+
builder = new StringBuilder();
55+
}
56+
3957
@Override
4058
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
41-
if (!frame.isFinalFragment()) {
42-
LOG.warning("Frame is not final. Chaos may ensue");
43-
}
59+
boolean finalFragment = frame.isFinalFragment();
60+
Message message;
61+
62+
if (frame instanceof ContinuationWebSocketFrame) {
63+
switch (next) {
64+
case Binary:
65+
try {
66+
ByteBuf content = frame.content();
67+
content.readBytes(buffer, content.readableBytes());
68+
} catch (IOException e) {
69+
throw new UncheckedIOException("failed to transfer buffer", e);
70+
}
4471

45-
Message message = null;
72+
if (finalFragment) {
73+
message = new BinaryMessage(buffer.toByteArray());
74+
buffer.reset();
75+
next = Continuation.None;
76+
} else {
77+
message = null;
78+
}
79+
break;
80+
case Text:
81+
builder.append(((ContinuationWebSocketFrame) frame).text());
4682

47-
if (frame instanceof TextWebSocketFrame) {
48-
message = new TextMessage(((TextWebSocketFrame) frame).text());
83+
if (finalFragment) {
84+
message = new TextMessage(builder.toString());
85+
builder.setLength(0);
86+
next = Continuation.None;
87+
} else {
88+
message = null;
89+
}
90+
break;
91+
case None:
92+
ctx.write(frame);
93+
return;
94+
default:
95+
throw new IllegalStateException("unexpected enum: " + next);
96+
}
97+
} else if (next != Continuation.None) {
98+
throw new IllegalStateException("expected a continuation frame");
99+
} else if (frame instanceof TextWebSocketFrame) {
100+
if (finalFragment) {
101+
message = new TextMessage(((TextWebSocketFrame) frame).text());
102+
} else {
103+
next = Continuation.Text;
104+
message = null;
105+
builder.append(((TextWebSocketFrame) frame).text());
106+
}
49107
} else if (frame instanceof BinaryWebSocketFrame) {
50-
ByteBuf buf = frame.content();
51-
if (buf.nioBufferCount() != -1) {
52-
message = new BinaryMessage(buf.nioBuffer());
53-
} else if (buf.hasArray()) {
54-
message = new BinaryMessage(ByteBuffer.wrap(buf.array()));
108+
ByteBuf content = frame.content();
109+
if (finalFragment) {
110+
if (content.nioBufferCount() != -1) {
111+
message = new BinaryMessage(content.nioBuffer());
112+
} else if (content.hasArray()) {
113+
message = new BinaryMessage(ByteBuffer.wrap(content.array()));
114+
} else {
115+
throw new IllegalStateException("Unable to handle bytebuf: " + content);
116+
}
55117
} else {
56-
throw new IllegalStateException("Unable to handle bytebuf: " + buf);
118+
next = Continuation.Binary;
119+
message = null;
120+
try {
121+
content.readBytes(buffer, content.readableBytes());
122+
} catch (IOException e) {
123+
throw new UncheckedIOException("failed to transfer buffer", e);
124+
}
57125
}
58126
} else if (frame instanceof CloseWebSocketFrame) {
59127
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
60128
message = new CloseMessage(closeFrame.statusCode(), closeFrame.reasonText());
129+
} else {
130+
ctx.write(frame);
131+
return;
61132
}
62133

63134
if (message != null) {
64135
ctx.fireChannelRead(message);
65-
} else {
66-
ctx.write(frame);
67136
}
68137
}
69138
}

0 commit comments

Comments
 (0)