diff --git a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java index e7be0392ca9..30aa97a5514 100644 --- a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java +++ b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java @@ -1700,6 +1700,13 @@ private class HttpTransfer extends RedirectedTransfer { private final String _requestPath; private String _transferTag = ""; + private static final String HEADER_SCITAG = "SciTag"; + private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag"; + private static final String[] SCITAG_HEADERS = { + HEADER_SCITAG, + HEADER_TRANSFER_HEADER_SCITAG + }; + public HttpTransfer(PnfsHandler pnfs, Subject subject, Restriction restriction, FsPath path) throws URISyntaxException { super(pnfs, subject, restriction, path); @@ -1709,7 +1716,38 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject, var request = ServletRequest.getRequest(); request.setAttribute(TRANSACTION_ATTRIBUTE, getTransaction()); _requestPath = Requests.stripToPath(request.getRequestURL().toString()); - _transferTag = request.getHeader("SciTag"); + _transferTag = readTransferTag(request); + } + + private String readTransferTag(HttpServletRequest request) { + // SciTag takes precedence because it is checked first. + for (String header : SCITAG_HEADERS) { + String transferTag = request.getHeader(header); + if (transferTag != null && !transferTag.isBlank()) { + String trimmed = transferTag.trim(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} header found: {} (from client={})", + header, trimmed, request.getRemoteAddr()); + } + return trimmed; + } + } + + String flowFromQuery = request.getParameter("scitag.flow"); + if (flowFromQuery != null && !flowFromQuery.isBlank()) { + String trimmed = flowFromQuery.trim(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("scitag.flow query parameter found: {} (from client={})", + trimmed, request.getRemoteAddr()); + } + return trimmed; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("No SciTag header/parameter found in request (client={})", + request.getRemoteAddr()); + } + return ""; } protected ProtocolInfo createProtocolInfo(InetSocketAddress address) { @@ -1729,6 +1767,10 @@ protected ProtocolInfo createProtocolInfo(InetSocketAddress address) { wantedChecksums); protocolInfo.setSessionId((int) getId()); protocolInfo.setTransferTag(_transferTag); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("ProtocolInfo created with transferTag='{}' for path={}", + _transferTag, _requestPath); + } return protocolInfo; } diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java index 2a44adaadc0..abbcc49e626 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java @@ -20,9 +20,13 @@ import org.dcache.xrootd.protocol.XrootdProtocol; import org.dcache.xrootd.tpc.XrootdTpcInfo; import org.dcache.xrootd.util.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class XrootdTransfer extends RedirectedTransfer { + private static final Logger LOGGER = LoggerFactory.getLogger(XrootdTransfer.class); + private UUID _uuid; private InetSocketAddress _doorAddress; private InetSocketAddress _internalAddress; @@ -39,6 +43,15 @@ public XrootdTransfer(PnfsHandler pnfs, Subject subject, this.restriction = requireNonNull(restriction); tpcInfo = new XrootdTpcInfo(opaque); _transferTag = opaque.getOrDefault("scitag.flow", ""); + if (LOGGER.isDebugEnabled()) { + if (!_transferTag.isEmpty()) { + LOGGER.debug("scitag.flow parameter found: {}", _transferTag); + } else if (opaque.containsKey("scitag.flow")) { + LOGGER.debug("scitag.flow parameter found but empty"); + } else { + LOGGER.debug("No scitag.flow parameter in this request"); + } + } try { tpcInfo.setUid(Subjects.getUid(subject)); } catch (NoSuchElementException e) { @@ -121,7 +134,11 @@ private XrootdProtocolInfo createXrootdProtocolInfo() { _uuid, _doorAddress); - protocolInfo.setTransferTag(_transferTag); + protocolInfo.setTransferTag(_transferTag); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("XrootdProtocolInfo created with transferTag='{}' for pnfs={}", + _transferTag, getPnfsId()); + } return protocolInfo; } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java index d451d696a69..b9fe3b702d9 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.net.InetAddresses.forString; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.net.HostAndPort; @@ -50,6 +51,15 @@ public class TransferLifeCycle { private final static Logger LOGGER = LoggerFactory.getLogger(TransferLifeCycle.class); + private static final int MIN_VALID_TRANSFER_TAG = 64; + private static final int MAX_VALID_TRANSFER_TAG = 65535; + private static final int EXPERIMENT_ID_BIT_SHIFT = 6; + private static final int ACTIVITY_ID_MASK = 0x3F; + private static final int DEFAULT_ACTIVITY_ID = 1; + private static final Splitter FQAN_GROUP_SPLITTER = Splitter.on('/') + .trimResults() + .omitEmptyStrings() + .limit(2); /** * The UDP firefly default port as described in @@ -83,7 +93,7 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p return; } - if (isLocalTransfer(src)) { + if (isExcludedTransfer(src, dst)) { return; } @@ -126,7 +136,7 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage return; } - if (isLocalTransfer(src)) { + if (isExcludedTransfer(src, dst)) { return; } @@ -254,12 +264,12 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject) if (protocolInfo.getTransferTag() != null && !protocolInfo.getTransferTag().isEmpty()) { try { int transferTag = Integer.parseInt(protocolInfo.getTransferTag()); - if (transferTag <= 64 || transferTag >= 65536) { + if (transferTag < MIN_VALID_TRANSFER_TAG || transferTag > MAX_VALID_TRANSFER_TAG) { LOGGER.warn("Invalid integer range for transfer tag: {}", protocolInfo.getTransferTag()); return OptionalInt.empty(); } // scitag = exp_id << 6 | act_id - return OptionalInt.of(transferTag >> 6); + return OptionalInt.of(transferTag >> EXPERIMENT_ID_BIT_SHIFT); } catch (NumberFormatException e) { LOGGER.warn("Invalid transfer tag: {}", protocolInfo.getTransferTag()); return OptionalInt.empty(); @@ -271,23 +281,33 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject) return OptionalInt.empty(); } - return voToExpId.containsKey(vo.getGroup().toLowerCase()) - ? OptionalInt.of(voToExpId.get(vo.getGroup().toLowerCase())) + String groupPath = vo.getGroup(); + if (groupPath == null || groupPath.isBlank()) { + return OptionalInt.empty(); + } + + groupPath = groupPath.toLowerCase(); + String voName = FQAN_GROUP_SPLITTER.splitToList(groupPath).get(0); + + return voToExpId.containsKey(voName) + ? OptionalInt.of(voToExpId.get(voName)) : OptionalInt.empty(); } - private boolean isLocalTransfer(InetSocketAddress dst) { - InetAddress addr = dst.getAddress(); - return localSubnet.test(addr); + private boolean isExcludedTransfer(InetSocketAddress src, InetSocketAddress dst) { + InetAddress srcAddress = src.getAddress(); + InetAddress dstAddress = dst.getAddress(); + return srcAddress != null && dstAddress != null + && localSubnet.test(srcAddress) + && localSubnet.test(dstAddress); } private int getActivity(ProtocolInfo protocolInfo) { if (!protocolInfo.getTransferTag().isEmpty()) { // scitag = exp_id << 6 | act_id - return Integer.parseInt(protocolInfo.getTransferTag()) & 0x3F; + return Integer.parseInt(protocolInfo.getTransferTag()) & ACTIVITY_ID_MASK; } else { - // default activity id = 1 - return 1; + return DEFAULT_ACTIVITY_ID; } } diff --git a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java new file mode 100644 index 00000000000..af1b35adcae --- /dev/null +++ b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java @@ -0,0 +1,143 @@ +package org.dcache.pool.movers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import diskCacheV111.vehicles.ProtocolInfo; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.lang.reflect.Method; +import java.util.OptionalInt; +import javax.security.auth.Subject; +import org.dcache.auth.FQANPrincipal; +import org.junit.Before; +import org.junit.Test; + +public class TransferLifeCycleTest { + + private Method getExperimentId; + private TransferLifeCycle transferLifeCycle; + + @Before + public void setup() throws Exception { + transferLifeCycle = new TransferLifeCycle(); + transferLifeCycle.setVoMapping("atlas:2,cms:3"); + + getExperimentId = TransferLifeCycle.class.getDeclaredMethod( + "getExperimentId", ProtocolInfo.class, Subject.class); + getExperimentId.setAccessible(true); + } + + @Test + public void shouldAcceptMinimumValidSciTagValue() throws Exception { + OptionalInt experimentId = resolveExperimentId("64", new Subject()); + + assertTrue(experimentId.isPresent()); + assertEquals(1, experimentId.getAsInt()); + } + + @Test + public void shouldRejectSciTagValueBelowValidRange() throws Exception { + OptionalInt experimentId = resolveExperimentId("63", new Subject()); + + assertFalse(experimentId.isPresent()); + } + + @Test + public void shouldMapSlashPrefixedFqanToVoName() throws Exception { + Subject subject = new Subject(); + subject.getPrincipals().add(new FQANPrincipal("/atlas/usatlas", true)); + + OptionalInt experimentId = resolveExperimentId("", subject); + + assertTrue(experimentId.isPresent()); + assertEquals(2, experimentId.getAsInt()); + } + + @Test + public void shouldSuppressMarkerWhenBothEndpointsAreExcluded() throws Exception { + assertFalse(sendsStartMarker("10.10.10.10", "10.20.20.20", "10.0.0.0/8")); + } + + @Test + public void shouldNotSuppressMarkerWhenOnlySourceIsExcluded() throws Exception { + assertTrue(sendsStartMarker("10.10.10.10", "203.0.113.20", "10.0.0.0/8")); + } + + @Test + public void shouldNotSuppressMarkerWhenOnlyDestinationIsExcluded() throws Exception { + assertTrue(sendsStartMarker("203.0.113.20", "10.20.20.20", "10.0.0.0/8")); + } + + private OptionalInt resolveExperimentId(String transferTag, Subject subject) throws Exception { + return (OptionalInt) getExperimentId.invoke(transferLifeCycle, + new TestProtocolInfo("xrootd", transferTag), subject); + } + + private boolean sendsStartMarker(String srcIp, String dstIp, String excludes) throws Exception { + try (DatagramSocket socket = new DatagramSocket(0, InetAddress.getByName("127.0.0.1"))) { + socket.setSoTimeout(700); + + TransferLifeCycle lifecycle = new TransferLifeCycle(); + lifecycle.setEnabled(true); + lifecycle.setVoMapping("atlas:2"); + lifecycle.setExcludes(new String[]{excludes}); + lifecycle.setFireflyDestination("127.0.0.1:" + socket.getLocalPort()); + + lifecycle.onStart( + new InetSocketAddress(srcIp, 40000), + new InetSocketAddress(dstIp, 20066), + new TestProtocolInfo("xrootd", "129"), + new Subject()); + + var packet = new DatagramPacket(new byte[4096], 4096); + try { + socket.receive(packet); + return true; + } catch (SocketTimeoutException ignored) { + return false; + } + } + } + + private static class TestProtocolInfo implements ProtocolInfo { + + private static final long serialVersionUID = 1L; + private final String protocol; + private final String transferTag; + + private TestProtocolInfo(String protocol, String transferTag) { + this.protocol = protocol; + this.transferTag = transferTag; + } + + @Override + public String getProtocol() { + return protocol; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public String getVersionString() { + return "test"; + } + + @Override + public String getTransferTag() { + return transferTag; + } + } +}