diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 03db54860b703..503a4817d5c0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer; +import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.ExchangeInfoSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -418,7 +419,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)59, GridCacheQueryResponse::new, new GridCacheQueryResponseSerializer()); factory.register((short)61, GridContinuousMessage::new); factory.register((short)62, DataStreamerRequest::new); - factory.register((short)63, DataStreamerResponse::new); + factory.register((short)63, DataStreamerResponse::new, new DataStreamerResponseSerializer()); factory.register((short)76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer()); factory.register((short)77, GridTaskResultResponse::new, new GridTaskResultResponseSerializer()); factory.register((short)78, MissingMappingRequestMessage::new, new MissingMappingRequestMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 65f225b22a923..f00624a4e6aae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -270,8 +270,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { topic, req.requestId(), new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + - ", req=" + req + ']'), - false); + ", req=" + req + ']')); return; } @@ -290,7 +289,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); - sendResponse(nodeId, topic, req.requestId(), e, false); + sendResponse(nodeId, topic, req.requestId(), e); return; } @@ -356,7 +355,7 @@ else if (!topFut.isDone()) } if (remapErr != null) { - sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), remapErr); return; } @@ -392,7 +391,7 @@ else if (topWaitFut == null) { try { job.call(); - sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), null); } finally { if (waitFut != null) @@ -400,7 +399,7 @@ else if (topWaitFut == null) { } } catch (Throwable e) { - sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), e); if (e instanceof Error) throw (Error)e; @@ -412,22 +411,11 @@ else if (topWaitFut == null) { * @param resTopic Response topic. * @param reqId Request ID. * @param err Error. - * @param forceLocDep Force local deployment. */ - private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, - boolean forceLocDep) { - byte[] errBytes; + private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err) { + DataStreamerResponse res = new DataStreamerResponse(reqId, err); - try { - errBytes = err != null ? U.marshal(marsh, err) : null; - } - catch (Exception e) { - U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); - - errBytes = marshErrBytes; - } - - DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); + res.prepareMarshal(marsh, log, marshErrBytes); try { ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index f154866fc9805..3eda0cb9707b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -2099,11 +2099,11 @@ void onResponse(DataStreamerResponse res, UUID nodeId) { try { GridPeerDeployAware jobPda0 = jobPda; - final Throwable cause = U.unmarshal( - ctx, - errBytes, + res.finishUnmarshal(ctx.marshaller(), U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())); + final Throwable cause = res.error(); + final String msg = "DataStreamer request failed [node=" + nodeId + "]"; if (cause instanceof ClusterTopologyCheckedException) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java index 20ac080bca01b..83b51fb08001f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java @@ -17,34 +17,37 @@ package org.apache.ignite.internal.processors.datastreamer; -import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; /** * */ public class DataStreamerResponse implements Message { /** */ + @Order(value = 0, method = "requestId") private long reqId; /** */ - private byte[] errBytes; + private @Nullable Throwable err; /** */ - private boolean forceLocDep; + @Order(value = 1, method = "errorBytes") + private @Nullable byte[] errBytes; /** * @param reqId Request ID. - * @param errBytes Error bytes. - * @param forceLocDep Force local deployment. + * @param err Error. */ - public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) { + public DataStreamerResponse(long reqId, @Nullable Throwable err) { this.reqId = reqId; - this.errBytes = errBytes; - this.forceLocDep = forceLocDep; + this.err = err; } /** @@ -61,92 +64,67 @@ public long requestId() { return reqId; } + /** + * @param reqId Request ID. + */ + public void requestId(long reqId) { + this.reqId = reqId; + } + /** * @return Error bytes. */ - public byte[] errorBytes() { + public @Nullable byte[] errorBytes() { return errBytes; } /** - * @return {@code True} to force local deployment. + * @param errBytes Error bytes. */ - public boolean forceLocalDeployment() { - return forceLocDep; + public void errorBytes(@Nullable byte[] errBytes) { + this.errBytes = errBytes; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DataStreamerResponse.class, this); + /** + * @return Error. + */ + public Throwable error() { + return err; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); + /** + * @param marsh Marshaller. + * @param log Logger. + * @param marshErrBytes Marshalled error bytes. + */ + public void prepareMarshal(Marshaller marsh, IgniteLogger log, byte[] marshErrBytes) { + if (err != null && errBytes == null) { + try { + errBytes = U.marshal(marsh, err); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); + + errBytes = marshErrBytes; + } } + } - switch (writer.state()) { - case 0: - if (!writer.writeByteArray(errBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeBoolean(forceLocDep)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong(reqId)) - return false; - - writer.incrementState(); + /** + * @param marsh Marshaller. + * @param ldr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + if (errBytes != null && err == null) { + err = U.unmarshal(marsh, errBytes, ldr); + errBytes = null; } - - return true; } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - errBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - forceLocDep = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - reqId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; + @Override public String toString() { + return S.toString(DataStreamerResponse.class, this); } /** {@inheritDoc} */