Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,13 @@ private class HttpTransfer extends RedirectedTransfer<String> {
private final String _requestPath;
private String _transferTag = "";

private static final String HEADER_SCITAG = "SciTag";
private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag";
private static final String[] SCITAG_HEADERS = {
HEADER_SCITAG,
HEADER_TRANSFER_HEADER_SCITAG
};

public HttpTransfer(PnfsHandler pnfs, Subject subject,
Restriction restriction, FsPath path) throws URISyntaxException {
super(pnfs, subject, restriction, path);
Expand All @@ -1709,7 +1716,38 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject,
var request = ServletRequest.getRequest();
request.setAttribute(TRANSACTION_ATTRIBUTE, getTransaction());
_requestPath = Requests.stripToPath(request.getRequestURL().toString());
_transferTag = request.getHeader("SciTag");
_transferTag = readTransferTag(request);
}

private String readTransferTag(HttpServletRequest request) {
// SciTag takes precedence because it is checked first.
for (String header : SCITAG_HEADERS) {
String transferTag = request.getHeader(header);
if (transferTag != null && !transferTag.isBlank()) {
String trimmed = transferTag.trim();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} header found: {} (from client={})",
header, trimmed, request.getRemoteAddr());
}
return trimmed;
}
}

String flowFromQuery = request.getParameter("scitag.flow");
if (flowFromQuery != null && !flowFromQuery.isBlank()) {
String trimmed = flowFromQuery.trim();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("scitag.flow query parameter found: {} (from client={})",
trimmed, request.getRemoteAddr());
}
return trimmed;
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("No SciTag header/parameter found in request (client={})",
request.getRemoteAddr());
}
return "";
}

protected ProtocolInfo createProtocolInfo(InetSocketAddress address) {
Expand All @@ -1729,6 +1767,10 @@ protected ProtocolInfo createProtocolInfo(InetSocketAddress address) {
wantedChecksums);
protocolInfo.setSessionId((int) getId());
protocolInfo.setTransferTag(_transferTag);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("ProtocolInfo created with transferTag='{}' for path={}",
_transferTag, _requestPath);
}
return protocolInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import org.dcache.xrootd.protocol.XrootdProtocol;
import org.dcache.xrootd.tpc.XrootdTpcInfo;
import org.dcache.xrootd.util.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XrootdTransfer extends RedirectedTransfer<InetSocketAddress> {

private static final Logger LOGGER = LoggerFactory.getLogger(XrootdTransfer.class);

private UUID _uuid;
private InetSocketAddress _doorAddress;
private InetSocketAddress _internalAddress;
Expand All @@ -39,6 +43,15 @@ public XrootdTransfer(PnfsHandler pnfs, Subject subject,
this.restriction = requireNonNull(restriction);
tpcInfo = new XrootdTpcInfo(opaque);
_transferTag = opaque.getOrDefault("scitag.flow", "");
if (LOGGER.isDebugEnabled()) {
if (!_transferTag.isEmpty()) {
LOGGER.debug("scitag.flow parameter found: {}", _transferTag);
} else if (opaque.containsKey("scitag.flow")) {
LOGGER.debug("scitag.flow parameter found but empty");
} else {
LOGGER.debug("No scitag.flow parameter in this request");
}
}
try {
tpcInfo.setUid(Subjects.getUid(subject));
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -121,7 +134,11 @@ private XrootdProtocolInfo createXrootdProtocolInfo() {
_uuid,
_doorAddress);

protocolInfo.setTransferTag(_transferTag);
protocolInfo.setTransferTag(_transferTag);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("XrootdProtocolInfo created with transferTag='{}' for pnfs={}",
_transferTag, getPnfsId());
}
return protocolInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.net.InetAddresses.forString;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.net.HostAndPort;

Expand Down Expand Up @@ -50,6 +51,15 @@
public class TransferLifeCycle {

private final static Logger LOGGER = LoggerFactory.getLogger(TransferLifeCycle.class);
private static final int MIN_VALID_TRANSFER_TAG = 64;
private static final int MAX_VALID_TRANSFER_TAG = 65535;
private static final int EXPERIMENT_ID_BIT_SHIFT = 6;
private static final int ACTIVITY_ID_MASK = 0x3F;
private static final int DEFAULT_ACTIVITY_ID = 1;
private static final Splitter FQAN_GROUP_SPLITTER = Splitter.on('/')
.trimResults()
.omitEmptyStrings()
.limit(2);

/**
* The UDP firefly default port as described in
Expand Down Expand Up @@ -83,7 +93,7 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p
return;
}

if (isLocalTransfer(src)) {
if (isExcludedTransfer(src, dst)) {
return;
}

Expand Down Expand Up @@ -126,7 +136,7 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage
return;
}

if (isLocalTransfer(src)) {
if (isExcludedTransfer(src, dst)) {
return;
}

Expand Down Expand Up @@ -254,12 +264,12 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject)
if (protocolInfo.getTransferTag() != null && !protocolInfo.getTransferTag().isEmpty()) {
try {
int transferTag = Integer.parseInt(protocolInfo.getTransferTag());
if (transferTag <= 64 || transferTag >= 65536) {
if (transferTag < MIN_VALID_TRANSFER_TAG || transferTag > MAX_VALID_TRANSFER_TAG) {
LOGGER.warn("Invalid integer range for transfer tag: {}", protocolInfo.getTransferTag());
return OptionalInt.empty();
}
// scitag = exp_id << 6 | act_id
return OptionalInt.of(transferTag >> 6);
return OptionalInt.of(transferTag >> EXPERIMENT_ID_BIT_SHIFT);
} catch (NumberFormatException e) {
LOGGER.warn("Invalid transfer tag: {}", protocolInfo.getTransferTag());
return OptionalInt.empty();
Expand All @@ -271,23 +281,33 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject)
return OptionalInt.empty();
}

return voToExpId.containsKey(vo.getGroup().toLowerCase())
? OptionalInt.of(voToExpId.get(vo.getGroup().toLowerCase()))
String groupPath = vo.getGroup();
if (groupPath == null || groupPath.isBlank()) {
return OptionalInt.empty();
}

groupPath = groupPath.toLowerCase();
String voName = FQAN_GROUP_SPLITTER.splitToList(groupPath).get(0);

return voToExpId.containsKey(voName)
? OptionalInt.of(voToExpId.get(voName))
: OptionalInt.empty();
}

private boolean isLocalTransfer(InetSocketAddress dst) {
InetAddress addr = dst.getAddress();
return localSubnet.test(addr);
private boolean isExcludedTransfer(InetSocketAddress src, InetSocketAddress dst) {
InetAddress srcAddress = src.getAddress();
InetAddress dstAddress = dst.getAddress();
return srcAddress != null && dstAddress != null
&& localSubnet.test(srcAddress)
&& localSubnet.test(dstAddress);
}

private int getActivity(ProtocolInfo protocolInfo) {
if (!protocolInfo.getTransferTag().isEmpty()) {
// scitag = exp_id << 6 | act_id
return Integer.parseInt(protocolInfo.getTransferTag()) & 0x3F;
return Integer.parseInt(protocolInfo.getTransferTag()) & ACTIVITY_ID_MASK;
} else {
// default activity id = 1
return 1;
return DEFAULT_ACTIVITY_ID;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.dcache.pool.movers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import diskCacheV111.vehicles.ProtocolInfo;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.lang.reflect.Method;
import java.util.OptionalInt;
import javax.security.auth.Subject;
import org.dcache.auth.FQANPrincipal;
import org.junit.Before;
import org.junit.Test;

public class TransferLifeCycleTest {

private Method getExperimentId;
private TransferLifeCycle transferLifeCycle;

@Before
public void setup() throws Exception {
transferLifeCycle = new TransferLifeCycle();
transferLifeCycle.setVoMapping("atlas:2,cms:3");

getExperimentId = TransferLifeCycle.class.getDeclaredMethod(
"getExperimentId", ProtocolInfo.class, Subject.class);
getExperimentId.setAccessible(true);
}

@Test
public void shouldAcceptMinimumValidSciTagValue() throws Exception {
OptionalInt experimentId = resolveExperimentId("64", new Subject());

assertTrue(experimentId.isPresent());
assertEquals(1, experimentId.getAsInt());
}

@Test
public void shouldRejectSciTagValueBelowValidRange() throws Exception {
OptionalInt experimentId = resolveExperimentId("63", new Subject());

assertFalse(experimentId.isPresent());
}

@Test
public void shouldMapSlashPrefixedFqanToVoName() throws Exception {
Subject subject = new Subject();
subject.getPrincipals().add(new FQANPrincipal("/atlas/usatlas", true));

OptionalInt experimentId = resolveExperimentId("", subject);

assertTrue(experimentId.isPresent());
assertEquals(2, experimentId.getAsInt());
}

@Test
public void shouldSuppressMarkerWhenBothEndpointsAreExcluded() throws Exception {
assertFalse(sendsStartMarker("10.10.10.10", "10.20.20.20", "10.0.0.0/8"));
}

@Test
public void shouldNotSuppressMarkerWhenOnlySourceIsExcluded() throws Exception {
assertTrue(sendsStartMarker("10.10.10.10", "203.0.113.20", "10.0.0.0/8"));
}

@Test
public void shouldNotSuppressMarkerWhenOnlyDestinationIsExcluded() throws Exception {
assertTrue(sendsStartMarker("203.0.113.20", "10.20.20.20", "10.0.0.0/8"));
}

private OptionalInt resolveExperimentId(String transferTag, Subject subject) throws Exception {
return (OptionalInt) getExperimentId.invoke(transferLifeCycle,
new TestProtocolInfo("xrootd", transferTag), subject);
}

private boolean sendsStartMarker(String srcIp, String dstIp, String excludes) throws Exception {
try (DatagramSocket socket = new DatagramSocket(0, InetAddress.getByName("127.0.0.1"))) {
socket.setSoTimeout(700);

TransferLifeCycle lifecycle = new TransferLifeCycle();
lifecycle.setEnabled(true);
lifecycle.setVoMapping("atlas:2");
lifecycle.setExcludes(new String[]{excludes});
lifecycle.setFireflyDestination("127.0.0.1:" + socket.getLocalPort());

lifecycle.onStart(
new InetSocketAddress(srcIp, 40000),
new InetSocketAddress(dstIp, 20066),
new TestProtocolInfo("xrootd", "129"),
new Subject());

var packet = new DatagramPacket(new byte[4096], 4096);
try {
socket.receive(packet);
return true;
} catch (SocketTimeoutException ignored) {
return false;
}
}
}

private static class TestProtocolInfo implements ProtocolInfo {

private static final long serialVersionUID = 1L;
private final String protocol;
private final String transferTag;

private TestProtocolInfo(String protocol, String transferTag) {
this.protocol = protocol;
this.transferTag = transferTag;
}

@Override
public String getProtocol() {
return protocol;
}

@Override
public int getMinorVersion() {
return 0;
}

@Override
public int getMajorVersion() {
return 0;
}

@Override
public String getVersionString() {
return "test";
}

@Override
public String getTransferTag() {
return transferTag;
}
}
}
Loading