Skip to content

Commit bf0a4b3

Browse files
mykauldkropachev
authored andcommitted
Optimize write path in protocol.py to reduce copies
Refactored `_ProtocolHandler.encode_message` to reduce memory copies and allocations. - Implemented 'Reserve and Seek' strategy for the write path. - In uncompressed scenarios (including Protocol V5+), we now write directly to the final buffer instead of an intermediate one, avoiding `bytes` creation and buffer copying. - Reserved space for the frame header, wrote the body, and then back-filled the header with the correct length. - Unified buffer initialization and header writing logic for cleaner code. - Optimized conditional checks for compression support. Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent 0688d6f commit bf0a4b3

1 file changed

Lines changed: 24 additions & 12 deletions

File tree

cassandra/protocol.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,20 +1085,10 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta
10851085
:param compressor: optional compression function to be used on the body
10861086
"""
10871087
flags = 0
1088-
body = io.BytesIO()
10891088
if msg.custom_payload:
10901089
if protocol_version < 4:
10911090
raise UnsupportedOperation("Custom key/value payloads can only be used with protocol version 4 or higher")
10921091
flags |= CUSTOM_PAYLOAD_FLAG
1093-
write_bytesmap(body, msg.custom_payload)
1094-
msg.send_body(body, protocol_version)
1095-
body = body.getvalue()
1096-
1097-
# With checksumming, the compression is done at the segment frame encoding
1098-
if (not ProtocolVersion.has_checksumming_support(protocol_version)
1099-
and compressor and len(body) > 0):
1100-
body = compressor(body)
1101-
flags |= COMPRESSED_FLAG
11021092

11031093
if msg.tracing:
11041094
flags |= TRACING_FLAG
@@ -1107,9 +1097,31 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta
11071097
flags |= USE_BETA_FLAG
11081098

11091099
buff = io.BytesIO()
1110-
cls._write_header(buff, protocol_version, flags, stream_id, msg.opcode, len(body))
1111-
buff.write(body)
1100+
buff.seek(9)
1101+
1102+
# With checksumming, the compression is done at the segment frame encoding
1103+
if (compressor and not ProtocolVersion.has_checksumming_support(protocol_version)):
1104+
body = io.BytesIO()
1105+
if msg.custom_payload:
1106+
write_bytesmap(body, msg.custom_payload)
1107+
msg.send_body(body, protocol_version)
1108+
body = body.getvalue()
1109+
1110+
if len(body) > 0:
1111+
body = compressor(body)
1112+
flags |= COMPRESSED_FLAG
1113+
1114+
buff.write(body)
1115+
length = len(body)
1116+
else:
1117+
if msg.custom_payload:
1118+
write_bytesmap(buff, msg.custom_payload)
1119+
msg.send_body(buff, protocol_version)
1120+
1121+
length = buff.tell() - 9
11121122

1123+
buff.seek(0)
1124+
cls._write_header(buff, protocol_version, flags, stream_id, msg.opcode, length)
11131125
return buff.getvalue()
11141126

11151127
@staticmethod

0 commit comments

Comments
 (0)