Skip to content

Commit 04b98e4

Browse files
authored
IGNITE-27270 Use MessageSerializer for DataStreamerResponse (#12558)
1 parent 7bd89b0 commit 04b98e4

4 files changed

Lines changed: 68 additions & 101 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
4949
import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer;
5050
import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer;
51+
import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer;
5152
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
5253
import org.apache.ignite.internal.codegen.ExchangeInfoSerializer;
5354
import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -420,7 +421,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
420421
factory.register((short)59, GridCacheQueryResponse::new, new GridCacheQueryResponseSerializer());
421422
factory.register((short)61, GridContinuousMessage::new);
422423
factory.register((short)62, DataStreamerRequest::new);
423-
factory.register((short)63, DataStreamerResponse::new);
424+
factory.register((short)63, DataStreamerResponse::new, new DataStreamerResponseSerializer());
424425
factory.register((short)76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer());
425426
factory.register((short)77, GridTaskResultResponse::new, new GridTaskResultResponseSerializer());
426427
factory.register((short)78, MissingMappingRequestMessage::new, new MissingMappingRequestMessageSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) {
270270
topic,
271271
req.requestId(),
272272
new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
273-
", req=" + req + ']'),
274-
false);
273+
", req=" + req + ']'));
275274

276275
return;
277276
}
@@ -290,7 +289,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) {
290289
catch (IgniteCheckedException e) {
291290
U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
292291

293-
sendResponse(nodeId, topic, req.requestId(), e, false);
292+
sendResponse(nodeId, topic, req.requestId(), e);
294293

295294
return;
296295
}
@@ -356,7 +355,7 @@ else if (!topFut.isDone())
356355
}
357356

358357
if (remapErr != null) {
359-
sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment());
358+
sendResponse(nodeId, topic, req.requestId(), remapErr);
360359

361360
return;
362361
}
@@ -392,15 +391,15 @@ else if (topWaitFut == null) {
392391
try {
393392
job.call();
394393

395-
sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
394+
sendResponse(nodeId, topic, req.requestId(), null);
396395
}
397396
finally {
398397
if (waitFut != null)
399398
waitFut.onDone();
400399
}
401400
}
402401
catch (Throwable e) {
403-
sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment());
402+
sendResponse(nodeId, topic, req.requestId(), e);
404403

405404
if (e instanceof Error)
406405
throw (Error)e;
@@ -412,22 +411,11 @@ else if (topWaitFut == null) {
412411
* @param resTopic Response topic.
413412
* @param reqId Request ID.
414413
* @param err Error.
415-
* @param forceLocDep Force local deployment.
416414
*/
417-
private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
418-
boolean forceLocDep) {
419-
byte[] errBytes;
415+
private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err) {
416+
DataStreamerResponse res = new DataStreamerResponse(reqId, err);
420417

421-
try {
422-
errBytes = err != null ? U.marshal(marsh, err) : null;
423-
}
424-
catch (Exception e) {
425-
U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
426-
427-
errBytes = marshErrBytes;
428-
}
429-
430-
DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
418+
res.prepareMarshal(marsh, log, marshErrBytes);
431419

432420
try {
433421
ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy());

modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2099,11 +2099,11 @@ void onResponse(DataStreamerResponse res, UUID nodeId) {
20992099
try {
21002100
GridPeerDeployAware jobPda0 = jobPda;
21012101

2102-
final Throwable cause = U.unmarshal(
2103-
ctx,
2104-
errBytes,
2102+
res.finishUnmarshal(ctx.marshaller(),
21052103
U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
21062104

2105+
final Throwable cause = res.error();
2106+
21072107
final String msg = "DataStreamer request failed [node=" + nodeId + "]";
21082108

21092109
if (cause instanceof ClusterTopologyCheckedException)

modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java

Lines changed: 55 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,37 @@
1717

1818
package org.apache.ignite.internal.processors.datastreamer;
1919

20-
import java.nio.ByteBuffer;
20+
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.IgniteLogger;
22+
import org.apache.ignite.internal.Order;
2123
import org.apache.ignite.internal.util.typedef.internal.S;
24+
import org.apache.ignite.internal.util.typedef.internal.U;
25+
import org.apache.ignite.marshaller.Marshaller;
2226
import org.apache.ignite.plugin.extensions.communication.Message;
23-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
24-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
27+
import org.jetbrains.annotations.Nullable;
2528

2629
/**
2730
*
2831
*/
2932
public class DataStreamerResponse implements Message {
3033
/** */
34+
@Order(value = 0, method = "requestId")
3135
private long reqId;
3236

3337
/** */
34-
private byte[] errBytes;
38+
private @Nullable Throwable err;
3539

3640
/** */
37-
private boolean forceLocDep;
41+
@Order(value = 1, method = "errorBytes")
42+
private @Nullable byte[] errBytes;
3843

3944
/**
4045
* @param reqId Request ID.
41-
* @param errBytes Error bytes.
42-
* @param forceLocDep Force local deployment.
46+
* @param err Error.
4347
*/
44-
public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
48+
public DataStreamerResponse(long reqId, @Nullable Throwable err) {
4549
this.reqId = reqId;
46-
this.errBytes = errBytes;
47-
this.forceLocDep = forceLocDep;
50+
this.err = err;
4851
}
4952

5053
/**
@@ -61,92 +64,67 @@ public long requestId() {
6164
return reqId;
6265
}
6366

67+
/**
68+
* @param reqId Request ID.
69+
*/
70+
public void requestId(long reqId) {
71+
this.reqId = reqId;
72+
}
73+
6474
/**
6575
* @return Error bytes.
6676
*/
67-
public byte[] errorBytes() {
77+
public @Nullable byte[] errorBytes() {
6878
return errBytes;
6979
}
7080

7181
/**
72-
* @return {@code True} to force local deployment.
82+
* @param errBytes Error bytes.
7383
*/
74-
public boolean forceLocalDeployment() {
75-
return forceLocDep;
84+
public void errorBytes(@Nullable byte[] errBytes) {
85+
this.errBytes = errBytes;
7686
}
7787

78-
/** {@inheritDoc} */
79-
@Override public String toString() {
80-
return S.toString(DataStreamerResponse.class, this);
88+
/**
89+
* @return Error.
90+
*/
91+
public Throwable error() {
92+
return err;
8193
}
8294

83-
/** {@inheritDoc} */
84-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
85-
writer.setBuffer(buf);
86-
87-
if (!writer.isHeaderWritten()) {
88-
if (!writer.writeHeader(directType()))
89-
return false;
90-
91-
writer.onHeaderWritten();
95+
/**
96+
* @param marsh Marshaller.
97+
* @param log Logger.
98+
* @param marshErrBytes Marshalled error bytes.
99+
*/
100+
public void prepareMarshal(Marshaller marsh, IgniteLogger log, byte[] marshErrBytes) {
101+
if (err != null && errBytes == null) {
102+
try {
103+
errBytes = U.marshal(marsh, err);
104+
}
105+
catch (IgniteCheckedException e) {
106+
U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
107+
108+
errBytes = marshErrBytes;
109+
}
92110
}
111+
}
93112

94-
switch (writer.state()) {
95-
case 0:
96-
if (!writer.writeByteArray(errBytes))
97-
return false;
98-
99-
writer.incrementState();
100-
101-
case 1:
102-
if (!writer.writeBoolean(forceLocDep))
103-
return false;
104-
105-
writer.incrementState();
106-
107-
case 2:
108-
if (!writer.writeLong(reqId))
109-
return false;
110-
111-
writer.incrementState();
113+
/**
114+
* @param marsh Marshaller.
115+
* @param ldr Class loader.
116+
*/
117+
public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
118+
if (errBytes != null && err == null) {
119+
err = U.unmarshal(marsh, errBytes, ldr);
112120

121+
errBytes = null;
113122
}
114-
115-
return true;
116123
}
117124

118125
/** {@inheritDoc} */
119-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
120-
reader.setBuffer(buf);
121-
122-
switch (reader.state()) {
123-
case 0:
124-
errBytes = reader.readByteArray();
125-
126-
if (!reader.isLastRead())
127-
return false;
128-
129-
reader.incrementState();
130-
131-
case 1:
132-
forceLocDep = reader.readBoolean();
133-
134-
if (!reader.isLastRead())
135-
return false;
136-
137-
reader.incrementState();
138-
139-
case 2:
140-
reqId = reader.readLong();
141-
142-
if (!reader.isLastRead())
143-
return false;
144-
145-
reader.incrementState();
146-
147-
}
148-
149-
return true;
126+
@Override public String toString() {
127+
return S.toString(DataStreamerResponse.class, this);
150128
}
151129

152130
/** {@inheritDoc} */

0 commit comments

Comments
 (0)