Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
372a76d
feat: BundleCE implementation, first approach
AcquaDiGiorgio Mar 26, 2025
83568fe
feat: add BundleDB and BundleService
AcquaDiGiorgio Apr 11, 2025
2f376d3
fix: SQL syntax error
AcquaDiGiorgio Apr 25, 2025
763b141
feat(test): Add BundleDB integration tests
AcquaDiGiorgio Apr 25, 2025
08ac0d4
fix(test): Bug during BundleDB integration tests
AcquaDiGiorgio Apr 25, 2025
dd70f9d
feat(BundleDB): Change TEXT datatype to VARCHAR
AcquaDiGiorgio May 8, 2025
da323d4
chore: Improve Bundle template and logging
AcquaDiGiorgio May 8, 2025
81abf8f
chore(BundleDB): Return input files as list
AcquaDiGiorgio May 8, 2025
d9e11e7
feat: Add BundlerService in ConfigTemplate
AcquaDiGiorgio May 8, 2025
f3b72f9
chore(BundleCE): Adapt communication to the Service (untested)
AcquaDiGiorgio May 8, 2025
be44c0d
chore(BundleDB): Change status to a PilotStatus style
AcquaDiGiorgio Jun 3, 2025
beccb6b
fix: refactor bundle bash template string into a constant
AcquaDiGiorgio Jun 3, 2025
f879cce
chore: Adapt Bundle status management and CE Building
AcquaDiGiorgio Jun 3, 2025
b855db6
chore: Improve BundleCE, BundleDB and BundleService readability
AcquaDiGiorgio Jun 6, 2025
34abe0a
feat: Add outputs and proxy to bundle creation
AcquaDiGiorgio Jun 25, 2025
4bcf49e
fix: Couple of bugs at BundleCE, BundleDB and BundleService
AcquaDiGiorgio Jul 9, 2025
f9b74cb
feat: Add a proper job status notification and output retrival
AcquaDiGiorgio Jul 21, 2025
9d688f9
chore(BundleCE): Setup bundled CE proxy
AcquaDiGiorgio Sep 17, 2025
8cc1339
feat(BundleDB): Add new table for long input treatment
AcquaDiGiorgio Sep 17, 2025
08edb0b
chore(BundleService): Change input insertion and status retrieval
AcquaDiGiorgio Sep 17, 2025
7a5e17e
chore(BundleTemplates): Remove unnecessary background process
AcquaDiGiorgio Sep 17, 2025
c84915e
fix: Obtain node variables at job wrapper offline wrapper (temporary)
AcquaDiGiorgio Sep 17, 2025
fd031bd
chore(BundleCE): Improve output retrieval
AcquaDiGiorgio Oct 1, 2025
644203b
chore(BundleTemplates): Added extra runner file at wrapper
AcquaDiGiorgio Oct 1, 2025
d8ce358
feat(BundleDB): Add a timestamp to avoid bundle stallin
AcquaDiGiorgio Oct 1, 2025
3f8ac48
chore: Remove unnecesary status files
AcquaDiGiorgio Oct 8, 2025
6114b59
fix(BundleDB): Avoid job insertion in running or finished bundles
AcquaDiGiorgio Oct 8, 2025
8fb75bc
chore: Remove debugging code
AcquaDiGiorgio Oct 8, 2025
c7140cc
feat: Add agent to monitor bundles (untested)
AcquaDiGiorgio Oct 8, 2025
3c71969
chore(PushJobAgent): Modify ce.submitJob to be the same as the submis…
AcquaDiGiorgio Oct 17, 2025
c71ca70
chore: Add BundleManagerAgent to ConfigTemplate
AcquaDiGiorgio Oct 17, 2025
c1e38f5
chore(BundleTemplate): Remove debug bundle monitoring
AcquaDiGiorgio Oct 17, 2025
100bd0f
feat(BundleDB): Add flags to control Bundle stages and accept the Job…
AcquaDiGiorgio Oct 17, 2025
b0c181a
feat(BundleManagerAgent): Add possibility to force-submit bundles
AcquaDiGiorgio Oct 17, 2025
6e2bd44
chore(BundleDB): Generalize Bundle Status using PilotStatus
AcquaDiGiorgio Oct 23, 2025
b2894ab
feat(BundleHandler): Send heartbeat to keep bundles alive
AcquaDiGiorgio Oct 23, 2025
3861d93
fix(PushJobAgent): Bug while obtaining job output in failed job wrappers
AcquaDiGiorgio Mar 2, 2026
0ad48f0
chore: Clean and document
AcquaDiGiorgio Mar 2, 2026
be0ebf8
chore(BundleManagerAgent): Remove unnecesary _cleanFinishedBundles
AcquaDiGiorgio Mar 2, 2026
5e241f6
chore(BundleDB): Remove unnecesary proxyPath
AcquaDiGiorgio Mar 2, 2026
6ba3e9e
chore: Remove ExecTemplate from BundleCE and BundleDB
AcquaDiGiorgio Mar 2, 2026
7cfc79e
chore(JobWrapperOfflineTemplate): Remove temporary debugging code
AcquaDiGiorgio Mar 2, 2026
a3e1623
chore(BundleDB): Split getJobsOfBundle in 2 functions
AcquaDiGiorgio Mar 3, 2026
6599af5
fix(BundleService): Wrapping inputs instead of executables
AcquaDiGiorgio Mar 3, 2026
906ba46
chore(BundleCE): Improve job output obtaining, tmp dir no longer needed
AcquaDiGiorgio Mar 3, 2026
d4ce6a4
chore: Remove unused imports
AcquaDiGiorgio Mar 3, 2026
944f38b
fix(BundleDB): Add extra safeguard while selecting best bundle
AcquaDiGiorgio Mar 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def _writeXRSL(self, executableFile, inputs, outputs):

def _bundlePreamble(self, executableFile):
"""Bundle the preamble with the executable file"""
wrapperContent = f"{self.preamble}\n./{executableFile}"
wrapperContent = f"{self.preamble}\n./{os.path.basename(executableFile)}"

# We need to make sure the executable file can be executed by the wrapper
# By adding the execution mode to the file, the file will be processed as an "executable" in the XRSL
Expand Down
115 changes: 115 additions & 0 deletions src/DIRAC/Resources/Computing/AREXEnhancedComputingElement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import os
import shutil

from DIRAC import S_ERROR, S_OK
from DIRAC.Resources.Computing.AREXComputingElement import AREXComputingElement


class AREXEnhancedComputingElement(AREXComputingElement):
def _getListOfAvailableOutputs(self, jobID, arcJobID, path=None):
"""Request a list of outputs available for a given jobID.

:param str jobID: job reference without the DIRAC stamp
:param str arcJobID: ARC job ID
:param str path: remote path
:return list: names of the available outputs
"""
query = self._urlJoin(os.path.join("jobs", arcJobID, "session", path or ""))

# Submit the GET request to retrieve the names of the outputs
# self.log.debug(f"Retrieving the names of the outputs for {jobID}")
self.log.debug(f"Retrieving the names of the outputs with {query}")
result = self._request("get", query)
if not result["OK"]:
self.log.error("Failed to retrieve at least some outputs", f"for {jobID}: {result['Message']}")
return S_ERROR(f"Failed to retrieve at least some outputs for {jobID}")
response = result["Value"]

if not response.text:
return S_ERROR(f"There is no output for job {jobID}")

# return S_OK(response.json()["file"])
return S_OK(response.json())

def getJobOutput(self, jobID, workingDirectory=None, path=None):
"""Get the outputs of the given job reference.

Outputs and stored in workingDirectory if present, else in a new directory named <ARC JobID>.

:param str jobID: job reference followed by the DIRAC stamp.
:param str workingDirectory: name of the directory containing the retrieved outputs.
:param str path: remote path
:return: content of stdout and stderr
"""
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot get job outputs", result["Message"])
return result

# Extract stamp from the Job ID
if ":::" in jobID:
jobRef, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
arcJob = self._jobReferenceToArcID(jobRef)

# Get the list of available outputs
result = self._getListOfAvailableOutputs(jobRef, arcJob, path)
if not result["OK"]:
return result
remoteOutputs = result["Value"]
self.log.debug("Outputs to get are", remoteOutputs)

remoteOutputsFiles = []
if "file" in remoteOutputs:
remoteOutputsFiles = remoteOutputs["file"]

remoteOutputsDirs = []
if "dir" in remoteOutputs:
remoteOutputsDirs = remoteOutputs["dir"]

if not workingDirectory:
if "WorkingDirectory" in self.ceParameters:
# We assume that workingDirectory exists
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
else:
workingDirectory = arcJob

if not os.path.exists(workingDirectory):
os.mkdir(workingDirectory)

# Directories
for remoteOutput in remoteOutputsDirs:
self.getJobOutput(
jobID,
workingDirectory=os.path.join(workingDirectory, remoteOutput),
path=os.path.join(path or "", remoteOutput),
)

# Files
stdout = None
stderr = None
for remoteOutput in remoteOutputsFiles:
# Prepare the command
# query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))
query = self._urlJoin(os.path.join("jobs", arcJob, "session", path or "", remoteOutput))

# Submit the GET request to retrieve outputs
result = self._request("get", query, stream=True)
if not result["OK"]:
self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
response = result["Value"]

localOutput = os.path.join(workingDirectory, remoteOutput)
with open(localOutput, "wb") as f:
shutil.copyfileobj(response.raw, f)

if remoteOutput == f"{stamp}.out":
with open(localOutput) as f:
stdout = f.read()
if remoteOutput == f"{stamp}.err":
with open(localOutput) as f:
stderr = f.read()

return S_OK((stdout, stderr))
Loading
Loading