diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index c70a6a6a28c..1c58d85b102 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -490,7 +490,7 @@ def _writeXRSL(self, executableFile, inputs, outputs): def _bundlePreamble(self, executableFile): """Bundle the preamble with the executable file""" - wrapperContent = f"{self.preamble}\n./{executableFile}" + wrapperContent = f"{self.preamble}\n./{os.path.basename(executableFile)}" # We need to make sure the executable file can be executed by the wrapper # By adding the execution mode to the file, the file will be processed as an "executable" in the XRSL diff --git a/src/DIRAC/Resources/Computing/AREXEnhancedComputingElement.py b/src/DIRAC/Resources/Computing/AREXEnhancedComputingElement.py new file mode 100644 index 00000000000..45c1cbe76a6 --- /dev/null +++ b/src/DIRAC/Resources/Computing/AREXEnhancedComputingElement.py @@ -0,0 +1,115 @@ +import os +import shutil + +from DIRAC import S_ERROR, S_OK +from DIRAC.Resources.Computing.AREXComputingElement import AREXComputingElement + + +class AREXEnhancedComputingElement(AREXComputingElement): + def _getListOfAvailableOutputs(self, jobID, arcJobID, path=None): + """Request a list of outputs available for a given jobID. 
+ + :param str jobID: job reference without the DIRAC stamp + :param str arcJobID: ARC job ID + :param str path: remote path + :return list: names of the available outputs + """ + query = self._urlJoin(os.path.join("jobs", arcJobID, "session", path or "")) + + # Submit the GET request to retrieve the names of the outputs + # self.log.debug(f"Retrieving the names of the outputs for {jobID}") + self.log.debug(f"Retrieving the names of the outputs with {query}") + result = self._request("get", query) + if not result["OK"]: + self.log.error("Failed to retrieve at least some outputs", f"for {jobID}: {result['Message']}") + return S_ERROR(f"Failed to retrieve at least some outputs for {jobID}") + response = result["Value"] + + if not response.text: + return S_ERROR(f"There is no output for job {jobID}") + + # return S_OK(response.json()["file"]) + return S_OK(response.json()) + + def getJobOutput(self, jobID, workingDirectory=None, path=None): + """Get the outputs of the given job reference. + + Outputs and stored in workingDirectory if present, else in a new directory named . + + :param str jobID: job reference followed by the DIRAC stamp. + :param str workingDirectory: name of the directory containing the retrieved outputs. 
+ :param str path: remote path + :return: content of stdout and stderr + """ + result = self._checkSession() + if not result["OK"]: + self.log.error("Cannot get job outputs", result["Message"]) + return result + + # Extract stamp from the Job ID + if ":::" in jobID: + jobRef, stamp = jobID.split(":::") + else: + return S_ERROR(f"DIRAC stamp not defined for {jobID}") + arcJob = self._jobReferenceToArcID(jobRef) + + # Get the list of available outputs + result = self._getListOfAvailableOutputs(jobRef, arcJob, path) + if not result["OK"]: + return result + remoteOutputs = result["Value"] + self.log.debug("Outputs to get are", remoteOutputs) + + remoteOutputsFiles = [] + if "file" in remoteOutputs: + remoteOutputsFiles = remoteOutputs["file"] + + remoteOutputsDirs = [] + if "dir" in remoteOutputs: + remoteOutputsDirs = remoteOutputs["dir"] + + if not workingDirectory: + if "WorkingDirectory" in self.ceParameters: + # We assume that workingDirectory exists + workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob) + else: + workingDirectory = arcJob + + if not os.path.exists(workingDirectory): + os.mkdir(workingDirectory) + + # Directories + for remoteOutput in remoteOutputsDirs: + self.getJobOutput( + jobID, + workingDirectory=os.path.join(workingDirectory, remoteOutput), + path=os.path.join(path or "", remoteOutput), + ) + + # Files + stdout = None + stderr = None + for remoteOutput in remoteOutputsFiles: + # Prepare the command + # query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput)) + query = self._urlJoin(os.path.join("jobs", arcJob, "session", path or "", remoteOutput)) + + # Submit the GET request to retrieve outputs + result = self._request("get", query, stream=True) + if not result["OK"]: + self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}") + return S_ERROR(f"Error downloading {remoteOutput} for {jobID}") + response = result["Value"] + + localOutput = 
os.path.join(workingDirectory, remoteOutput) + with open(localOutput, "wb") as f: + shutil.copyfileobj(response.raw, f) + + if remoteOutput == f"{stamp}.out": + with open(localOutput) as f: + stdout = f.read() + if remoteOutput == f"{stamp}.err": + with open(localOutput) as f: + stderr = f.read() + + return S_OK((stdout, stderr)) diff --git a/src/DIRAC/Resources/Computing/BundleComputingElement.py b/src/DIRAC/Resources/Computing/BundleComputingElement.py new file mode 100644 index 00000000000..da28a7cf928 --- /dev/null +++ b/src/DIRAC/Resources/Computing/BundleComputingElement.py @@ -0,0 +1,331 @@ +"""Bundle Computing Elemenet + +Allows grouping jobs in a single big job prior to their submission in an actual CE. + +**Configuration Parameters** + +Configuration for the BundleComputingElemenet submission can be done via the configuration system. +Below, you can find a list of parameters specific to the BundleCE. + +InnerCEType: + Type of the CE that will end up executing the templated wrapper. + +**CE Configuration** + +This CE must be configure in the same way as the one that will execute the jobs, the only +difference is that the CEType will become InnerCEType and it must have configured the template +to be used. + +For example: + +CEs +{ + host + { + CEType = SSH + SSHHost = host + SSHUser = user + SSHPassword = password + ... + Queues + { + dirac + { + ... + } + } + } +} + +Will become: + +CEs +{ + host + { + CEType = BUNDLE + InnerCEType = SSH + + SSHHost = host + SSHUser = user + SSHPassword = password + ... + Queues + { + dirac + { + ... 
+ } + } + } +} + +**Code Documentation** +""" + +import copy +import inspect +import os +import uuid + +from DIRAC import S_ERROR, S_OK, gConfig +from DIRAC.Resources.Computing.ComputingElement import ComputingElement +from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Client.BundlerClient import BundlerClient + + +class BundleTaskDict(dict): + def __init__(self, getProperty): + self.getProperty = getProperty + + def __contains__(self, jobId): + if super().__contains__(jobId): + return True + + res = self.getProperty(jobId) + if res: + self.__setitem__(jobId, res) + return True + + return False + + def __getitem__(self, jobId): + if jobId in self: + return super().__getitem__(jobId) + + res = self.getProperty(jobId) + if res: + super().__setitem__(jobId, res) + + return res + + +class BundleComputingElement(ComputingElement): + def __init__(self, ceUniqueID): + """Standard constructor.""" + if not ceUniqueID.startswith("bundled-"): + ceUniqueID = f"bundled-{ceUniqueID}" + + super().__init__(ceUniqueID) + + self.mandatoryParameters = ["InnerCEType"] + + self.innerCE = None + self.innerCEParams = {} + + self.bundler = BundlerClient() + self.ceFactory = ComputingElementFactory() + + self.taskResults = BundleTaskDict(self.__getTraskResult) + + ############################################################################# + + def _reset(self): + self.taskResults = BundleTaskDict(self.__getTraskResult) + + # Force the CE to make the job submissions asynchronous + self.ceParameters["AsyncSubmission"] = True + + # Create the InnerCE from the config obtained from the BundleCE + innerCEParams = copy.deepcopy(self.ceParameters) + innerCEType = innerCEParams.pop("InnerCEType") + innerCEParams["CEType"] = innerCEType + + innerCeName = self.ceParameters["GridCE"][len("bundled-") :] + + innerCEParams["GridCE"] = innerCeName + + # Building of the 
InnerCE + result = self.ceFactory.getCE(ceType=innerCEType, ceName=innerCeName, ceParametersDict=innerCEParams) + + if not result["OK"]: + self.log.error("Failure while creating the InnerCE") + return result + + self.innerCE = result["Value"] + self.innerCE.setParameters(innerCEParams) + self.innerCEParams = innerCEParams + + self.innerCEMethods = [ + name for name, _ in inspect.getmembers(self.innerCE, predicate=inspect.ismethod) if name[0] != "_" + ] + + self.bundlesBaseDir = gConfig.getValue("/LocalSite/BundlesBaseDir", "/tmp/bundles") + + return S_OK() + + ############################################################################# + + def submitJob(self, executableFile, proxy=None, numberOfProcessors=1, inputs=[], outputs=[], **kwargs): + jobId = str(uuid.uuid4().hex) + + proxy = self.proxy if self.proxy else proxy + + if not proxy: + self.log.error("Proxy not defined. Use setProxy or send proxy during job submission") + return S_ERROR("PROXY NOT DEFINED") + + # Store the job in a bundle using the ceDict of the InnerCE (containing the template) + if isinstance(proxy, str): + return S_ERROR("PROXY CANNOT BE IN A STRING FORMAT") + + proxyStr = proxy.dumpAllToString()["Value"] + result = self.writeProxyToFile(proxyStr) + + if not result["OK"]: + return result + + proxyPath = result["Value"] + + diracId = kwargs.get("jobDesc", {}).get("jobID", None) + if diracId: + diracId = int(diracId) + + result = self.bundler.storeInBundle( + jobId, executableFile, inputs, outputs, proxyPath, numberOfProcessors, self.innerCEParams, diracId + ) + + if not result["OK"]: + self.log.error(f"Failure while storing in the Bundle: {result}") + return result + + bundleId = result["Value"]["BundleID"] + submitted = result["Value"]["Executing"] # For logging purposes + + result = S_OK([jobId]) + result["PilotStampDict"] = {jobId: bundleId} + + if not submitted: + self.log.info(f"Job {jobId} stored successfully in bundle: ", bundleId) + else: + self.log.info("Submitting job to CE: ", 
self.innerCE.ceName) + + # Return the id of the job, setting the "PilotStamp" to the BundleID + return result + + def getJobOutput(self, jobId, workingDirectory="."): + bundleId = None + if ":::" in jobId: + jobId, bundleId = jobId.split(":::") + + if not bundleId: + bundleId = self.bundler.bundleIdFromJobId(jobId) + + result = self.bundler.getTaskInfo(bundleId) + + if not result["OK"]: + return result + + if result["Value"]["Status"] not in PilotStatus.PILOT_FINAL_STATES: + return S_ERROR("Output not ready yet") + + taskId = result["Value"]["TaskID"] + + result = self.innerCE.getJobOutput(taskId, workingDirectory=workingDirectory, path=jobId) + + error = os.path.join(workingDirectory, f"{bundleId}.err") + output = os.path.join(workingDirectory, f"{bundleId}.out") + + if not os.path.exists(output) or not os.path.exists(error): + return S_ERROR("Outputs unable to be obtained") + + with open(output) as f: + output = f.read() + + with open(error) as f: + error = f.read() + + return S_OK((output, error)) + + def getJobStatus(self, jobIDList): + resultDict = {} + + if not isinstance(jobIDList, list): + jobIDList = [jobIDList] + + for job in jobIDList: + jobId = job + bundleId = None + if ":::" in job: + jobId, bundleId = job.split(":::") + + if not bundleId: + result = self.bundler.bundleIdFromJobId(jobId) + if not result["OK"]: + return result + bundleId = result["Value"] + + self.log.debug(f"Obtaining the status of job: '{jobId}' with bundleID: '{bundleId}'") + result = self.bundler.getBundleStatus(bundleId) + + if not result["OK"]: + return result + + # Default Value: The one from the Bundle + resultDict[jobId] = result["Value"] + self.log.debug(f"Status of bundle '{bundleId}': {result['Value']}") + + return S_OK(resultDict) + + ############################################################################# + + def getCEStatus(self): + return self.innerCE.getCEStatus() + + def setProxy(self, proxy): + super().setProxy(proxy) + self.innerCE.setProxy(proxy) + + def 
setToken(self, token, valid=0): + super().setToken(token, valid) + self.innerCE.setToken(token, valid) + + def cleanJob(self, jobIDList): + if "cleanJob" not in self.innerCEMethods: + self.log.error(f"Inner CE {self.innerCE.ceName} has no function called 'cleanJob'") + return S_ERROR(f"Inner CE {self.innerCE.ceName} has no function called 'cleanJob'") + + if not isinstance(jobIDList, list): + jobIDList = [jobIDList] + + for job in jobIDList: + if ":::" in job: + job, bundleId = job.split(":::") + + return self.bundler.cleanJob(job) + + def killJob(self, jobIDList): + resultDict = {} + + for job in jobIDList: + if ":::" in job: + jobId, bundleId = job.split(":::") + + result = self.bundler.tryToKillJob(jobId) + resultDict[jobId] = result + + return resultDict + + ############################################################################# + + def __getTraskResult(self, jobId): + self.log.debug(f"Obtaining the task results of {jobId}") + + result = self.getJobStatus(jobId) + + if not result["OK"]: + return result + + if ":::" in jobId: + jobId, _ = jobId.split(":::") + + status = result["Value"][jobId] + + if status not in PilotStatus.PILOT_FINAL_STATES: + return S_OK() + + if status == PilotStatus.DONE: + return S_OK(0) + + return S_OK(1) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/BundleManagerAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/BundleManagerAgent.py new file mode 100644 index 00000000000..2ac6054684c --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Agent/BundleManagerAgent.py @@ -0,0 +1,158 @@ +from datetime import datetime, timedelta, timezone + +from DIRAC import S_ERROR, S_OK +from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus +from DIRAC.WorkloadManagementSystem.Client.BundlerClient import BundlerClient +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient +from DIRAC.WorkloadManagementSystem.DB.BundleDB import 
BundleDB +from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB + + +class BundleManagerAgent(AgentModule): + def __init__(self, agentName, loadName, baseAgentName=False, properties=None): + if not properties: + properties = {} + super().__init__(agentName, loadName, baseAgentName, properties) + + self.bundleDB = None + self.jobDB = None + + ############################################################################# + + def initialize(self): + self.bundleDB = BundleDB() + self.jobDB = JobDB() + self.jobMonitor = JobMonitoringClient() + self.bundler = BundlerClient() + self.maxMinsInBundle = self.am_getOption("MaxMinutesInBundle", defaultValue=10) + return S_OK() + + def execute(self): + self.log.info("Sending stalled Bundles") + result = self._sendStalledBundles() + if not result["OK"]: + self.log.warn(f"Failed send the bundles: {result}") + + self.log.info("Deleting killed jobs from bundles") + result = self._removeKilledJobs() + if not result["OK"]: + self.log.warn(f"Failed to delete the inputs: {result}") + + self._checkHeartBeat() + + return S_OK() + + def finalize(self): + return S_OK() + + ############################################################################# + + def _removeKilledJobs(self): + killedJobs = [] + + result = self.bundleDB.getWaitingBundles() + if not result["OK"]: + return result + + bundles = result["Value"] + self.log.debug(f"> Found {len(bundles)} waiting bundles") + + for bundleInfo in bundles: + bundleId = bundleInfo["BundleID"] + + result = self.bundleDB.getJobsOfBundle(bundleId) + if not result["OK"]: + self.log.error(f"Failed to get the jobs of the bundle '{bundleId}'") + return result + + jobs = result["Value"] + jobIds = list(jobs.keys()) + + diracIds = [] + diracIdToJobId = {} + for jobId in jobIds: + if "DiracID" not in jobs[jobId]: + continue + + diracId = jobs[jobId]["DiracID"] + if diracId: + diracIds.append(diracId) + diracIdToJobId[diracId] = jobId + + result = self.jobMonitor.getJobsStatus(diracIds) + if not 
result["OK"]: + self.log.error(f"Failed to get the status of the jobs with ids: {diracIds}") + self.log.error(result) + return result + + statusDict = result["Value"] + for diracId, status in statusDict.items(): + if status == JobStatus.KILLED: + self.log.info(f"> Status of job '{diracId}' is 'Killed', adding it to the deletion list") + killedJobs.append(diracIdToJobId[diracId]) + + if not killedJobs: + self.log.verbose("Nothing to delete...") + return S_OK() + + result = self.bundleDB.removeJobsFromBundle(killedJobs) + if not result["OK"]: + return result + + deletedDict = result["Value"] + + failedDeletions = {} + for jobId, jobResult in deletedDict.items(): + if not jobResult["OK"]: + failedDeletions[jobId] = jobResult + + if failedDeletions: + return S_ERROR(f"Failed to delete the following jobs: {failedDeletions}") + + return S_OK() + + def _sendStalledBundles(self): + result = self.bundleDB.getWaitingBundles() + if not result["OK"]: + return result + + bundles = result["Value"] + self.log.debug(f"> Found {len(bundles)} waiting bundles") + + bundleIds = [] + currentTime = datetime.now(tz=timezone.utc).replace(tzinfo=None) + + for bundleInfo in bundles: + elapsedTime: timedelta = currentTime - bundleInfo["LastTimestamp"] + elapsedMinutes = elapsedTime.total_seconds() // 60 + + if elapsedMinutes > self.maxMinsInBundle: + _id = bundleInfo["BundleID"] + bundleIds.append(bundleInfo["BundleID"]) + + if bundleIds: + self.log.info(f"> Force-Submitting {len(bundleIds)} bundles due to timeout, IDs: ({bundleIds})") + result = self.bundler.forceSubmitBundles(bundleIds) + + return S_OK() + + def _checkHeartBeat(self): + """Hack to avoid stalled jobs when they are not""" + self.log.info("Sending heartbeats to running bundles") + result = self.bundleDB.getRunningBundles() + if not result["OK"]: + return result + + for bundleInfo in result["Value"]: + if bundleInfo["Status"] == PilotStatus.RUNNING: + result = self.bundleDB.getJobsOfBundle(bundleInfo["BundleID"]) + if not 
result["OK"]: + continue + + for _, jobDesc in result["Value"].items(): + diracId = jobDesc["DiracID"] + if not diracId: + continue + + self.jobDB.setHeartBeatData(diracId, {}) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 492c1c0e200..e3724d514c7 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -366,6 +366,9 @@ def execute(self): resourceParams=ceDict, optimizerParams=optimizerParams, processors=submissionParams["processors"], + wholeNode=submissionParams["wholeNode"], + maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], + mpTag=submissionParams["mpTag"], ) if not result["OK"]: self.failedQueues[queueName] += 1 @@ -521,6 +524,9 @@ def _submitJobWrapper( resourceParams: dict, optimizerParams: dict, processors: int, + wholeNode: bool, + maxNumberOfProcessors: int, + mpTag: bool, ): """Submit a JobWrapper to the remote site @@ -618,6 +624,13 @@ def _submitJobWrapper( proxy=None, inputs=inputs, outputs=outputs, + numberOfProcessors=processors, + maxNumberOfProcessors=maxNumberOfProcessors, + wholeNode=wholeNode, + mpTag=mpTag, + jobDesc=jobDesc, + log=self.log, + logLevel=self.logLevel, ) if not result["OK"]: rescheduleResult = rescheduleFailedJob( @@ -801,6 +814,12 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str): self.log.exception("JobWrapper failed the initialization phase", jobID) continue + if status == PilotStatus.FAILED: + job.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Payload failed", sendFlag=False) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + continue + # Get the output of the job self.log.info(f"Getting the outputs of taskID {taskID} for {jobID}") if not (result := ce.getJobOutput(f"{taskID}:::{stamp}", job.jobIDPath))["OK"]: diff --git 
a/src/DIRAC/WorkloadManagementSystem/Client/BundlerClient.py b/src/DIRAC/WorkloadManagementSystem/Client/BundlerClient.py new file mode 100644 index 00000000000..2ffdd986d8a --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/BundlerClient.py @@ -0,0 +1,22 @@ +""" Module that contains simple client access to Bundler service +""" + +from DIRAC.Core.Base.Client import Client, createClient + + +@createClient("WorkloadManagement/Bundler") +class BundlerClient(Client): + """Exposes the functionality available in the WorkloadManagement/BundlerHandler + + This inherits the DIRAC base Client for direct execution of server functionality. + The following methods are available (although not visible here). + """ + + def __init__(self, url=None, **kwargs): + super().__init__(**kwargs) + + if not url: + self.serverURL = "WorkloadManagement/Bundler" + + else: + self.serverURL = url diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index f31d304d764..859d71cf6c2 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -158,6 +158,12 @@ Services { Port = 9175 } + ##BEGIN Bundler + Bundler + { + Port = 9176 + } + ##END } Agents { @@ -330,6 +336,14 @@ Agents PollingTime = 120 } ##END + ##BEGIN BundleManagerAgent + BundleManagerAgent + { + PollingTime = 120 + MaxMinutesInBundle = 60 + MaxDaysInDB = 2 + } + ##END } Executors { diff --git a/src/DIRAC/WorkloadManagementSystem/DB/BundleDB.py b/src/DIRAC/WorkloadManagementSystem/DB/BundleDB.py new file mode 100755 index 00000000000..d8f0c20bac5 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/DB/BundleDB.py @@ -0,0 +1,522 @@ +"""BundleDB class is a front-end to the bundle db""" + +import uuid +from ast import literal_eval +from datetime import datetime, timezone + +from DIRAC import S_ERROR, S_OK +from DIRAC.Core.Base.DB import DB +from DIRAC.FrameworkSystem.Client.Logger import 
contextLogger +from DIRAC.WorkloadManagementSystem.Client import PilotStatus + + +def formatSelectOutput(listOfResults, keys): + retVal = [] + + for kvTuple in listOfResults: + inner = {} + for k, v in zip(keys, list(kvTuple)): + inner[k] = v + retVal.append(inner) + + return retVal + + +class BundleDB(DB): + """BundleDB MySQL Database Manager""" + + def __init__(self, parentLogger=None): + super().__init__("BundleDB", "WorkloadManagement/BundleDB", parentLogger=parentLogger) + self._defaultLogger = self.log + + self.BUNDLES_INFO_TABLE = "BundlesInfo" + self.JOB_TO_BUNDLE_TABLE = "JobToBundle" + self.JOB_INPUTS_TABLE = "JobInputs" + + self.BUNDLES_INFO_COLUMNS = [ + "BundleID", + "ProcessorSum", + "MaxProcessors", + "Site", + "CE", + "Queue", + "CEDict", + "TaskID", + "Status", + "ProxyPath", + "Flags", + "FirstTimestamp", + "LastTimestamp", + ] + + self.JOB_TO_BUNDLE_COLUMNS = [ + "JobID", + "BundleID", + "DiracID", + "ExecutablePath", + "Outputs", + "Processors", + ] + + self.JOB_INPUTS_COLUMNS = [ + "InputID", + "JobID", + "InputPath", + ] + + self.MYSQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" + + self.BUNDLE_FLAGS = { + "Cleaned": 1, + "Purged": 1 << 1, + } + + @property + def log(self): + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value): + self._defaultLogger = value + + ############################################################################# + + def insertJobToBundle(self, jobId, executable, inputs, outputs, processors, ceDict, proxyPath, diracId): + """Inserts a new job in a new or existing Bundle depending of the CE to be submitted.""" + result = self._getBundlesFromCEDict(ceDict) + + if not result["OK"]: + return result + + bundles = result["Value"] + + # No bundles matching ceDict, so create a new one + if not bundles: + result = self._createNewBundle(ceDict, proxyPath) + + if not result["OK"]: + return result + + bundleId = result["Value"] + result = self._insertJobInBundle(jobId, bundleId, executable, inputs, 
outputs, processors, diracId) + + if not result["OK"]: + return result + + return S_OK({"BundleId": bundleId, "Ready": result["Value"]["Ready"]}) + + # Check the best possible bundle to insert the job + bundleId = self.__selectBestBundle(bundles, processors) + + # If it does not fit in an already created bundle, create a new one + if not bundleId: + result = self._createNewBundle(ceDict, proxyPath) + + if not result["OK"]: + return result + + bundleId = result["Value"] + + # Insert it and obtain if it is ready to be submitted + result = self._insertJobInBundle(jobId, bundleId, executable, inputs, outputs, processors, diracId) + + if not result["OK"]: + return result + + return S_OK({"BundleId": bundleId, "Ready": result["Value"]["Ready"]}) + + def removeJobsFromBundle(self, jobIds): + """Receives a list of DIRAC JobIds, matches them to their corresponding bundle and removes them.""" + if not isinstance(jobIds, list): + jobIds = list(jobIds) + + for jobId in jobIds: + result = self.getFields(self.JOB_TO_BUNDLE_TABLE, ["BundleID", "Processors"], {"JobID": jobId}) + + if not result["OK"]: + return result + + jobInfo = result["Value"][0] + bundleId = jobInfo[0] + nProcs = jobInfo[1] + + result = self._reduceProcessorSum(bundleId, nProcs) + + if not result["OK"]: + return result + + result = self.deleteEntries(self.JOB_TO_BUNDLE_TABLE, {"JobID": jobIds}) + return S_OK(result) + + ############################################################################# + + def getUnpurgedBundles(self): + """Obtains the list of Bundles that inputs haven't been removed locally.""" + cmd = 'SELECT BundleID FROM BundlesInfo WHERE Status = "{status}" AND Flags & {flag} != {flag};'.format( + status=PilotStatus.DONE, flag=self.BUNDLE_FLAGS["Purged"] + ) + + result = self._query(cmd) + + if not result["OK"]: + return result + + return S_OK([entry[0] for entry in result["Value"]]) + + def isBundleCleaned(self, bundleId): + """Check if ce.cleanJob has been performed properly.""" + cmd = 
'SELECT BundleID FROM BundlesInfo WHERE BundleID = "{bundleId}" AND Flags & {flag} = {flag};'.format( + bundleId=bundleId, flag=self.BUNDLE_FLAGS["Cleaned"] + ) + + result = self._query(cmd) + + if not result["OK"]: + return result + + cleaned = result["Value"] != [] + + return S_OK(cleaned) + + def getWaitingBundles(self): + return self._getBundlesWithStatus(PilotStatus.WAITING) + + def getRunningBundles(self): + return self._getBundlesWithStatus(PilotStatus.RUNNING) + + def _getBundlesWithStatus(self, status): + """Get Bundles that match certain status.""" + result = self.getFields(self.BUNDLES_INFO_TABLE, self.BUNDLES_INFO_COLUMNS, {"Status": status}) + + if not result["OK"]: + return result + + bundlesDict = formatSelectOutput(result["Value"], self.BUNDLES_INFO_COLUMNS) + return S_OK(bundlesDict) + + ############################################################################# + + def getBundleIdFromJobId(self, jobId): + """Returns the BundleId that corresponds to a DIRAC JobId.""" + result = self.getFields(self.JOB_TO_BUNDLE_TABLE, ["BundleID"], {"JobID": jobId}) + + if not result["OK"]: + return result + + if not result["Value"]: + return S_ERROR("JobId not present in any bundle") + + return S_OK(result["Value"][0][0]) + + def getBundleStatus(self, bundleId): + """Obtain the status of the Bundle.""" + result = self.getFields(self.BUNDLES_INFO_TABLE, ["Status"], {"BundleID": bundleId}) + + if not result["Value"]: + return S_ERROR("Failed to get bundle Status") + + return S_OK(result["Value"][0][0]) + + def getJobsAndInputsOfBundle(self, bundleId): + """Get every Job and Inputs that comprise a Bundle.""" + + cmd = """\ + SELECT JobToBundle.JobID, DiracID, ExecutablePath, Outputs, Processors, InputPath + FROM JobToBundle + LEFT JOIN JobInputs + ON JobToBundle.JobID = JobInputs.JobID + WHERE BundleID = "{bundleId}";""".format( + bundleId=bundleId + ) + + result = self._query(cmd) + + if not result["OK"]: + return result + + rows = list(result["Value"]) + retVal = 
{} + + # For each row (JobID, ExecutablePath, Outputs, Processors, InputPath) + for row in rows: + jobID, diracId, jobExecutablePath, jobOutputs, processors, jobInputPath = row + + if jobID not in retVal: + retVal[jobID] = { + "ExecutablePath": jobExecutablePath, + "DiracID": diracId, + "Outputs": [], + "Processors": processors, + "Inputs": [], + } + + retVal[jobID]["Outputs"].extend(literal_eval(jobOutputs)) + retVal[jobID]["Inputs"].append(jobInputPath) + + return S_OK(retVal) + + def getJobsOfBundle(self, bundleId): + """Get every Job that comprise a Bundle.""" + cmd = f""" SELECT JobID, DiracID, ExecutablePath, Outputs, Processors + FROM JobToBundle + WHERE BundleID = "{bundleId}";""" + + result = self._query(cmd) + + if not result["OK"]: + return result + + rows = list(result["Value"]) + retVal = {} + + # For each row (JobID, ExecutablePath, Outputs, Processors) + for row in rows: + jobID, diracId, jobExecutablePath, jobOutputs, processors = row + + if jobID not in retVal: + retVal[jobID] = { + "ExecutablePath": jobExecutablePath, + "DiracID": diracId, + "Outputs": [], + "Processors": processors, + } + + retVal[jobID]["Outputs"].extend(literal_eval(jobOutputs)) + + return S_OK(retVal) + + def getJobIDsOfBundle(self, bundleId): + """Returns the list of JobIds that are contained in a bundle""" + result = self.getFields(self.JOB_TO_BUNDLE_TABLE, ["JobID"], {"BundleID": bundleId}) + + if not result["OK"]: + return result + + return S_OK([entry[0] for entry in result["Value"]]) + + def removeJobInputs(self, jobIds): + """Removes the contents of the JobInputs table for each corresponding JobID.""" + if not isinstance(jobIds, list): + jobIds = [jobIds] + + return self.deleteEntries(self.JOB_INPUTS_TABLE, {"JobID": jobIds}) + + ############################################################################# + + def setTaskId(self, bundleId, taskId): + """Sets the value of the TaskID generetad by the real CE during Bundle submission.""" + result = self.updateFields( + 
self.BUNDLES_INFO_TABLE, ["TaskID", "Status"], [taskId, PilotStatus.RUNNING], {"BundleID": bundleId} + ) + return result + + def getTaskId(self, bundleId): + """Returns the value of the TaskId stored.""" + result = self.getFields(self.BUNDLES_INFO_TABLE, ["TaskID"], {"BundleID": bundleId}) + + if not result["OK"]: + return result + + return S_OK(result["Value"][0][0]) + + ############################################################################# + + def setBundleAsDone(self, bundleId): + result = self._updateBundleStatus(bundleId, PilotStatus.DONE) + return result + + def setBundleAsFailed(self, bundleId): + result = self._updateBundleStatus(bundleId, PilotStatus.FAILED) + return result + + def setBundleAsPurged(self, bundleId): + cmd = 'UPDATE BundlesInfo SET Flags = Flags | {flag} WHERE BundleID = "{bundleId}";'.format( + bundleId=bundleId, flag=self.BUNDLE_FLAGS["Purged"] + ) + + return self._query(cmd) + + def setBundleAsCleaned(self, bundleId): + cmd = 'UPDATE BundlesInfo SET Flags = Flags | {flag} WHERE BundleID = "{bundleId}";'.format( + bundleId=bundleId, flag=self.BUNDLE_FLAGS["Cleaned"] + ) + + return self._query(cmd) + + ############################################################################# + + def getWholeBundle(self, bundleId): + result = self.getFields(self.BUNDLES_INFO_TABLE, [], {"BundleID": bundleId}) + + if not result["OK"]: + return result + + if not result["Value"]: + return S_ERROR(f"No bundle with id {bundleId}") + + bundleDict = formatSelectOutput(result["Value"], self.BUNDLES_INFO_COLUMNS)[0] + + return S_OK(bundleDict) + + def getBundleCE(self, bundleId): + result = self.getFields(self.BUNDLES_INFO_TABLE, ["CEDict", "ProxyPath"], {"BundleID": bundleId}) + + if not result["OK"]: + return result + + return S_OK(formatSelectOutput(result["Value"], ["CEDict", "ProxyPath"])[0]) + + ############################################################################# + + def _reduceProcessorSum(self, bundleId, nProcessors): + cmd = 'UPDATE 
BundlesInfo SET ProcessorSum = ProcessorSum - {nProcs} WHERE BundleID = "{bundleId}";'.format( + bundleId=bundleId, nProcs=nProcessors + ) + return self._query(cmd) + + def _createNewBundle(self, ceDict, proxyPath): + """Initialize a new Bundle.""" + timestamp = datetime.now(tz=timezone.utc).strftime(self.MYSQL_DATETIME_FORMAT) + + bundleId = uuid.uuid4().hex + insertInfo = { + "BundleID": bundleId, + "ProcessorSum": 0, + "MaxProcessors": ceDict["NumberOfProcessors"], + "Site": ceDict["Site"], + "CE": ceDict["GridCE"], + "Queue": ceDict["Queue"], + "CEDict": str(ceDict), + "ProxyPath": proxyPath, + "FirstTimestamp": timestamp, + "LastTimestamp": timestamp, + } + + result = self.insertFields(self.BUNDLES_INFO_TABLE, list(insertInfo.keys()), list(insertInfo.values())) + + if not result["OK"]: + return result + + return S_OK(bundleId) + + def _insertJobInBundle(self, jobId, bundleId, executable, inputs, outputs, nProcessors, diracId): + """Add the info of a Job to a Bundle.""" + timestamp = datetime.now(tz=timezone.utc).strftime(self.MYSQL_DATETIME_FORMAT) + + # Job Insertion + insertInfo = { + "JobID": jobId, + "BundleID": bundleId, + "ExecutablePath": executable, + "Outputs": str(outputs), + "Processors": nProcessors, + } + + if diracId: + insertInfo["DiracID"] = diracId + + result = self.insertFields(self.JOB_TO_BUNDLE_TABLE, list(insertInfo.keys()), list(insertInfo.values())) + + if not result["OK"]: + return result + + for _input in inputs: + insertInfo = { + "JobID": jobId, + "InputPath": _input, + } + + result = self.insertFields(self.JOB_INPUTS_TABLE, list(insertInfo.keys()), list(insertInfo.values())) + + if not result["OK"]: + return result + + # Modify the number of processors that will be used by the bundle + cmd = """\ + UPDATE BundlesInfo + SET ProcessorSum = ProcessorSum + {nProcs}, LastTimestamp = "{timestamp}" + WHERE BundleID = "{bundleId}"; + """.format( + bundleId=bundleId, nProcs=nProcessors, timestamp=timestamp + ) + result = self._query(cmd) + + 
if not result["OK"]: + return result + + # TODO: Move all of this out of the function + # Obtain the info to be returned to the Service + result = self.getFields( + self.BUNDLES_INFO_TABLE, + ["ProcessorSum", "MaxProcessors", "Status", "FirstTimestamp", "LastTimestamp"], + {"BundleID": bundleId}, + ) + + if not result["OK"]: + return result + + selection = formatSelectOutput( + result["Value"], ["ProcessorSum", "MaxProcessors", "Status", "FirstTimestamp", "LastTimestamp"] + )[0] + + ready = selection["ProcessorSum"] == selection["MaxProcessors"] + + return S_OK({"BundleId": bundleId, "Ready": ready}) + + def _getBundlesFromCEDict(self, ceDict): + """Returns the bundles that match a CE (Site, CE and Queue).""" + cmd = 'SELECT * FROM BundlesInfo WHERE Site = "{Site}" AND CE = "{CE}" AND Queue = "{Queue}";'.format( + Site=ceDict["Site"], + CE=ceDict["GridCE"], + Queue=ceDict["Queue"], + ) + result = self._query(cmd) + + if not result["OK"]: + return result + + if not result["Value"]: + return S_OK([]) + + retVal = formatSelectOutput( + result["Value"], + self.BUNDLES_INFO_COLUMNS, + ) + return S_OK(retVal) + + def _updateBundleStatus(self, bundleId, newStatus): + """Changes the status of a Bundle.""" + cmd = 'UPDATE BundlesInfo SET Status = "{status}" WHERE BundleID = "{bundleId}";'.format( + bundleId=bundleId, status=newStatus + ) + result = self._query(cmd) + + if not result["OK"]: + return result + + return S_OK() + + def __selectBestBundle(self, bundles, nProcessors): + """Return the BundleID of the best match from a list of bundles and the number of processors requested.""" + bestBundleId = None + currentBestProcs = 0 + + for bundle in bundles: + bundleId = bundle["BundleID"] + procs = bundle["ProcessorSum"] + maxProcs = bundle["MaxProcessors"] + status = bundle["Status"] + + newProcSum = procs + nProcessors + + if status != PilotStatus.WAITING or newProcSum > maxProcs: + continue + + if newProcSum == maxProcs: + return bundleId + + if newProcSum > 
currentBestProcs: + currentBestProcs = newProcSum + bestBundleId = bundleId + + return bestBundleId diff --git a/src/DIRAC/WorkloadManagementSystem/DB/BundleDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/BundleDB.sql new file mode 100644 index 00000000000..ba5f3ac4f4f --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/DB/BundleDB.sql @@ -0,0 +1,60 @@ +-- When installing via dirac tools, the following is not needed (still here for reference) +-- +-- DROP DATABASE IF EXISTS BundleDB; +-- CREATE DATABASE BundleDB; +-- ------------------------------------------------------------------------------ +-- Database owner definition +-- USE mysql; +-- +-- Must set passwords for database user by replacing "must_be_set". +-- +-- GRANT SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES ON BundleDB.* TO Dirac@'%' IDENTIFIED BY 'must_be_set'; +-- FLUSH PRIVILEGES; + +USE BundleDB; + +-- ------------------------------------------------------------------------------ +DROP TABLE IF EXISTS `JobInputs`; +DROP TABLE IF EXISTS `JobToBundle`; +DROP TABLE IF EXISTS `BundlesInfo`; + +CREATE TABLE `BundlesInfo` ( + `BundleID` VARCHAR(32) NOT NULL, + `ProcessorSum` INT(5) UNSIGNED NOT NULL DEFAULT 0, + `MaxProcessors` INT(5) UNSIGNED NOT NULL, + `Site` VARCHAR(128) NOT NULL, + `CE` VARCHAR(128) NOT NULL, + `Queue` VARCHAR(128) NOT NULL, + `CEDict` TEXT NOT NULL, + `TaskID` VARCHAR(255), + `Status` ENUM('Waiting', 'Running', 'Done', 'Failed') NOT NULL DEFAULT 'Waiting', + `ProxyPath` VARCHAR(255), + `Flags` SET('Cleaned', 'Purged') NOT NULL DEFAULT '', + `FirstTimestamp` DATETIME, + `LastTimestamp` DATETIME, + PRIMARY KEY (`BundleID`), + INDEX (`Site`,`CE`,`Queue`), + INDEX (`Status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- ------------------------------------------------------------------------------ +CREATE TABLE `JobToBundle` ( + `JobID` VARCHAR(255) NOT NULL, + `BundleID` VARCHAR(32) NOT NULL, + `DiracID` INTEGER, + `ExecutablePath` VARCHAR(255) NOT NULL, + 
`Outputs` VARCHAR(255) NOT NULL, + `Processors` INT(5) UNSIGNED NOT NULL DEFAULT 1, + PRIMARY KEY (`JobID`), + FOREIGN KEY (`BundleID`) REFERENCES `BundlesInfo`(`BundleID`), + INDEX (`DiracID`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- ------------------------------------------------------------------------------ +CREATE TABLE `JobInputs` ( + `InputID` INTEGER NOT NULL AUTO_INCREMENT, + `JobID` VARCHAR(255) NOT NULL, + `InputPath` VARCHAR(255) NOT NULL, + PRIMARY KEY (`InputID`), + FOREIGN KEY (`JobID`) REFERENCES `JobToBundle`(`JobID`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/src/DIRAC/WorkloadManagementSystem/Service/BundlerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/BundlerHandler.py new file mode 100644 index 00000000000..3c5ceaa81ad --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/BundlerHandler.py @@ -0,0 +1,488 @@ +"""The Bundler service provides an interface for bundling jobs into a a big job + +It connects to a BundleDB to store and retrive bundles. 
+"""
+
+import os
+import shutil
+from ast import literal_eval
+
+from DIRAC import S_ERROR, S_OK
+from DIRAC.Core.DISET.RequestHandler import RequestHandler
+from DIRAC.Core.Security.ProxyInfo import getProxyInfo
+from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
+from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
+from DIRAC.WorkloadManagementSystem.Client import PilotStatus
+from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
+from DIRAC.WorkloadManagementSystem.DB.BundleDB import BundleDB
+from DIRAC.WorkloadManagementSystem.Utilities.BundlerTemplates import BASH_RUN_TASK, BASH_WRAPPER
+
+
+class BundlerHandler(RequestHandler):
+    @classmethod
+    def initializeHandler(cls, serviceInfoDict):
+        try:
+            result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.BundleDB", "BundleDB")
+            if not result["OK"]:
+                return result
+            cls.bundleDB: BundleDB = result["Value"](parentLogger=cls.log)
+
+            # Dictionary entries should be removed after some time
+            cls.jobToCE = {}
+            cls.bundleToCE = {}
+            cls.jobToBundle = {}
+
+            cls.ceFactory = ComputingElementFactory()
+
+            cls.jobReports: dict[int, JobReport] = {}
+
+        except RuntimeError as excp:
+            return S_ERROR(f"Can't connect to DB: {excp}")
+
+        return S_OK()
+
+    def initialize(self):
+        self.killBundleOnError = self.getCSOption("KillBundleOnError", True)
+        self.bundlesBaseDir = self.getCSOption("/LocalSite/BundlesBaseDir", "/tmp/bundles")
+
+        if not os.path.exists(self.bundlesBaseDir):
+            os.mkdir(self.bundlesBaseDir)
+
+    #############################################################################
+
+    types_storeInBundle = [str, str, list, list, str, int, dict, [int, type(None)]]
+
+    def export_storeInBundle(self, jobId, executable, inputs, outputs, proxyPath, processors, ceDict, diracId):
+        """Stores a job in a bundle depending on the information on the CEDict.
+        If the bundle fills, it automatically gets sent.
+ """ + result = self._setupCE(ceDict, proxyPath) + + if not result["OK"]: + return result + + # Insert the Job into the DB + result = self.bundleDB.insertJobToBundle( + jobId, executable, inputs, outputs, processors, ceDict, proxyPath, diracId + ) + if not result["OK"]: + self.log.error("Failed to insert into a bundle the job with id ", str(jobId)) + return result + + bundleId = result["Value"]["BundleId"] + readyForSubmission = result["Value"]["Ready"] + + self.log.info("Job inserted in bundle successfully") + + if diracId: + self.jobReports[jobId] = JobReport(diracId, self.__class__.__name__) + + self.__reportJob(jobId, PilotStatus.RUNNING, "Job Stored in a bundle") + + if readyForSubmission: + self._submitBundle(bundleId) + + return S_OK({"BundleID": bundleId, "Executing": readyForSubmission}) + + ############################################################################# + + types_getTaskInfo = [str] + + def export_getTaskInfo(self, bundleId): + """Return the TaskID of the submitted bundle + If the Bundle hasn't been submitted yet, returns S_ERROR + """ + return self._getTaskInfo(bundleId) + + def _getTaskInfo(self, bundleId): + """Return the TaskID of the submitted bundle + If the Bundle hasn't been submitted yet, returns S_ERROR + """ + result = self.bundleDB.getBundleStatus(bundleId) + + if not result["OK"]: + self.log.error("Failed to obtain status of bundle ", str(bundleId)) + return result + + resultDict = {"Status": result["Value"]} + + # If it hasn't been uploaded yet + if resultDict["Status"] == PilotStatus.WAITING: + return S_OK(resultDict) + + result = self.bundleDB.getTaskId(bundleId) + + if not result["OK"]: + self.log.error("Failed to obtain taskId of bundle ", str(bundleId)) + return result + + resultDict["TaskID"] = result["Value"] + + return S_OK(resultDict) + + ############################################################################# + + types_bundleIdFromJobId = [str] + + def export_bundleIdFromJobId(self, jobId): + """Returns the 
BundleID of a specific job from its ID."""
+        return self._getBundleIdFromJobId(jobId)
+
+    #############################################################################
+
+    types_tryToKillJob = [str]
+
+    def export_tryToKillJob(self, jobId):
+        result = self._killJob(jobId)
+        if result["OK"]:
+            self.log.info(f"Job {jobId} killed successfully")
+            return result
+
+        self.log.warn("Failed to ONLY kill the job with id ", str(jobId))
+
+        if self.killBundleOnError:
+            self.log.warn("KillBundleOnError is on, killing the WHOLE bundle containing the job")
+            result = self._killBundleOfJob(jobId)
+            if not result["OK"]:
+                return result
+
+            bundleId = result["Value"]
+            self.log.info(f"Bundle {bundleId} of Job {jobId} killed successfully")
+            return S_OK()
+
+        else:
+            self.log.warn("KillBundleOnError is off, doing nothing")
+            return S_ERROR(message="KillBundleOnError is off, won't kill the bundle")
+
+    def _killBundleOfJob(self, jobId):
+        result = self._getJobCE(jobId)
+        if not result["OK"]:
+            return result
+        ce = result["Value"]["CE"]
+        result = self._getBundleIdFromJobId(jobId)
+
+        if not result["OK"]:
+            return result
+
+        bundleId = result["Value"]
+        result = self._getTaskInfo(bundleId)
+        if not result["OK"]:
+            return result
+
+        if result["Value"]["Status"] in PilotStatus.PILOT_FINAL_STATES:
+            return S_ERROR("Cannot kill finished jobs")
+
+        result = ce.killJob([result["Value"]["TaskID"]])
+
+        if not result["OK"]:
+            return result
+
+        # Mark the whole bundle as failed and hand the bundle id back to the
+        # caller (export_tryToKillJob reads result["Value"]); the previous
+        # bare `return` made the caller crash on `None["OK"]`, and
+        # setBundleAsFailed() was called without its required bundleId.
+        self.bundleDB.setBundleAsFailed(bundleId)
+        return S_OK(bundleId)
+
+    def _killJob(self, jobId):
+        return S_ERROR("CAN'T STOP JOBS")
+
+    #############################################################################
+
+    types_cleanJob = [str]
+
+    def export_cleanJob(self, jobId):
+        """Tries to clean the working directory of a specific job both locally and remotely."""
+        result = self._getBundleIdFromJobId(jobId)
+        if not result["OK"]:
+            return result
+        bundleId = result["Value"]
+
+        result = self.bundleDB.isBundleCleaned(bundleId)
+
+        if not result["OK"]:
+            return result
+
+        #
Bundle already got cleaned
+        if result["Value"]:
+            return S_OK()
+
+        result = self._getTaskInfo(bundleId)
+
+        if not result["OK"]:
+            return result
+        status = result["Value"]["Status"]
+
+        if status not in PilotStatus.PILOT_FINAL_STATES:
+            return S_ERROR(f"The bundle hasn't finished, cleaning is not permitted. Current Status: {status}")
+
+        taskId = result["Value"]["TaskID"]
+
+        result = self._getJobCE(jobId)
+        if not result["OK"]:
+            return result
+        ce = result["Value"]["CE"]
+
+        try:
+            result = ce.cleanJob(taskId)
+            if result["OK"]:
+                self.bundleDB.setBundleAsCleaned(bundleId)
+        except AttributeError as e:  # If the CE has no method 'cleanJob'
+            return S_ERROR(e)
+
+        # Remove bundle specific files (NOT THE OUTPUTS OF THE JOBS)
+        bundlePath = os.path.join(self.bundlesBaseDir, bundleId)
+        for item in os.listdir(bundlePath):
+            itemPath = os.path.join(bundlePath, item)
+            # Must test the full path: `item` is a bare basename, so
+            # os.path.isfile(item) resolved against the CWD and never matched.
+            if os.path.isfile(itemPath):
+                os.remove(itemPath)
+
+        return S_OK()
+
+    #############################################################################
+
+    types_getBundleStatus = [str]
+
+    def export_getBundleStatus(self, bundleId):
+        """Reports the Bundle status.
+ Waiting -> Bundle still waiting for more jobs + Running -> Bundle submitted to CE + Done -> Bundle finished (not the specific job) + Failed -> Bundle failed to execute + """ + result = self._getTaskInfo(bundleId) + + if not result["OK"]: + return result + + status = result["Value"]["Status"] + + if status == PilotStatus.RUNNING: + task = result["Value"]["TaskID"] + + if ":::" in task: + task = task.split(":::")[0] + + result = self._getBundleCE(bundleId) + + if not result["OK"]: + return result + + ce = result["Value"]["CE"] + + result = ce.getJobStatus(task) + + if not result["OK"]: + return result + + status = result["Value"][task] + + if status == PilotStatus.DONE: + self.bundleDB.setBundleAsDone(bundleId) + elif status in PilotStatus.PILOT_FINAL_STATES: # ABORTED, DELETED or FAILED + self.bundleDB.setBundleAsFailed(bundleId) + + return S_OK(status) + + ############################################################################# + + types_forceSubmitBundles = [list] + + def export_forceSubmitBundles(self, bundleIds): + """Forcibly submits a list of bundles. + This is useful for stalled bundles. 
+ """ + resultDict = {} + + if not isinstance(bundleIds, list): + bundleIds = [bundleIds] + + for bundleId in bundleIds: + result = self._submitBundle(bundleId) + resultDict[bundleId] = result + + return S_OK(resultDict) + + def _submitBundle(self, bundleId): + """Submits a Bundle from its ID.""" + result = self._getBundleCE(bundleId) + + if not result["OK"]: + return result + + ce = result["Value"]["CE"] + proxy = result["Value"]["Proxy"] + + result = self._wrapBundle(bundleId) + if not result["OK"]: + return result + + jobIds, bundle_exe, bundle_inputs, bundle_outputs = result["Value"] + extra_outputs = [item for job_id in jobIds for item in [f"{job_id}.out", f"{job_id}.status"]] + bundle_outputs.extend(extra_outputs) + + self.log.info(f"Submitting bundle '{bundleId}' to CE '{ce.ceName}'") + + ce.ceParameters["NumberOfProcessors"] = len(jobIds) + result = ce.submitJob(bundle_exe, proxy=proxy, inputs=bundle_inputs, outputs=bundle_outputs) + + if not result["OK"]: + self.bundleDB.setBundleAsFailed(bundleId) + return result + + innerJobId = result["Value"][0] + taskId = innerJobId + ":::" + result["PilotStampDict"][innerJobId] + + result = self.bundleDB.setTaskId(bundleId, taskId) + + if not result["OK"]: + return S_ERROR("Failed to set the task id of the Bundle") + + for jobId in jobIds: + self.__reportJob(jobId, PilotStatus.RUNNING, "Bundle of Job submitted to CE") + + return S_OK() + + ############################################################################# + + def _getBundleIdFromJobId(self, jobId): + """Obtains the BundleID corresponding to a JobID.""" + if jobId in self.jobToBundle: + return S_OK(self.jobToBundle[jobId]) + + result = self.bundleDB.getBundleIdFromJobId(jobId) + if not result["OK"]: + return result + + self.jobToBundle[jobId] = result["Value"] + return result + + def _wrapBundle(self, bundleId): + """Bundles the jobs in a bundle for its submission.""" + result = self.bundleDB.getJobsAndInputsOfBundle(bundleId) + + if not result["OK"]: + 
self.log.error("Failed to obtain bundled job while wrapping. BundleID=", str(bundleId)) + return result + + jobs: dict = result["Value"] + + executables = [] + inputs = [] + outputs = [] + jobIds = [] + + bundlePath = os.path.join(self.bundlesBaseDir, bundleId) + os.mkdir(bundlePath) + + for jobId, jobInfo in jobs.items(): + jobIds.append(jobId) + + # Copy the original file in a new location with the rest + job_executable = jobInfo["ExecutablePath"] + job_executable_dst = os.path.join(bundlePath, jobId + "_" + os.path.basename(job_executable)) + + shutil.copy(job_executable, job_executable_dst) + + executables.append(os.path.basename(job_executable_dst)) + inputs.append(job_executable_dst) + + for job_input in jobInfo["Inputs"]: + inputBasename = os.path.basename(job_input) + job_input_dst = os.path.join(bundlePath, jobId + "_" + inputBasename) + shutil.copy(job_input, job_input_dst) + inputs.append(job_input_dst) + + outputs.extend(jobInfo["Outputs"]) + + formatted_inputs = "(" + " ".join(executables) + ")" + formatMap = {"inputs": formatted_inputs, "bundleId": bundleId} + wrappedBundle = BASH_WRAPPER.format(**formatMap) + + wrapperPath = os.path.join(bundlePath, "bundle_wrapper") + runnerPath = os.path.join(bundlePath, "run_task.sh") + + with open(wrapperPath, "x") as f: + f.write(wrappedBundle) + + with open(runnerPath, "x") as f: + f.write(BASH_RUN_TASK) + + inputs.append(runnerPath) + + return S_OK((jobIds, wrapperPath, inputs, list(set(outputs)))) + + def _getBundleCEDict(self, bundleId): + """Returns the CEDict of a specific Bundle as a dictionary.""" + result = self.bundleDB.getBundleCE(bundleId) + if not result["OK"]: + return result + + # Convert the CEDict from string to a dictionary + ceDict = literal_eval(result["Value"]["CEDict"]) + + return S_OK({"CEDict": ceDict, "ProxyPath": result["Value"]["ProxyPath"]}) + + def _setupCE(self, ceDict, proxyPath): + """Prepares the CE instance.""" + result = getProxyInfo(proxy=proxyPath) + + if not result["OK"]: + 
self.log.error("Failed to obtain proxy from path") + return result + + proxy = result["Value"]["chain"] + + # CE Initialization + result = self.ceFactory.getCE(ceType=ceDict["CEType"], ceName=ceDict["GridCE"], ceParametersDict=ceDict) + + if not result["OK"]: + self.log.error("Failed obtain the CE with configuration: ", str(ceDict)) + return result + + ce = result["Value"] + + ce.setProxy(proxy) + + return S_OK({"CE": ce, "Proxy": proxy}) + + def _getBundleCE(self, bundleId): + """Returns the CE of a the corresponding Bundle from its ID.""" + if bundleId not in self.bundleToCE: + result = self._getBundleCEDict(bundleId) + + if not result["OK"]: + return result + + result = self._setupCE(result["Value"]["CEDict"], result["Value"]["ProxyPath"]) + + if not result["OK"]: + return result + + self.bundleToCE[bundleId] = result["Value"] # CE + Proxy + + return S_OK(self.bundleToCE[bundleId]) + + def _getJobCE(self, jobId): + """Returns the CE of a the corresponding Job from its ID.""" + if jobId not in self.jobToCE: + result = self._getBundleIdFromJobId(jobId) + + if not result["OK"]: + self.log.error("Failed to obtain BundleId with JobId ", str(jobId)) + return result + + bundleId = result["Value"] + + result = self._getBundleCE(bundleId) + + if not result["OK"]: + return result + + self.jobToCE[jobId] = result["Value"] + + return S_OK(self.jobToCE[jobId]) + + def __reportJob(self, jobId, status, minorStatus): + """Calls the JobReport of the Job if possible.""" + if jobId not in self.jobReports: + return + + self.jobReports[jobId].setJobStatus(status=status, minorStatus=minorStatus) + self.jobReports[jobId].commit() diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/BundlerTemplates.py b/src/DIRAC/WorkloadManagementSystem/Utilities/BundlerTemplates.py new file mode 100644 index 00000000000..31fb49e43e8 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/BundlerTemplates.py @@ -0,0 +1,55 @@ +BASH_WRAPPER = """\ +#!/bin/bash +BASEDIR=${{PWD}} 
+INPUT={inputs}
+BUNDLE_ID={bundleId}
+
+get_id() {{
+    echo $1 | cut -d '_' -f 1
+}}
+
+job_number=0
+chmod u+x run_task.sh
+
+# execute tasks
+for input in ${{INPUT[@]}}; do
+    [ -f "$input" ] || break
+
+    jobId=$(get_id ${{input}})
+    mkdir ${{jobId}}
+
+    for filename in ${{jobId}}*; do
+        [ -f ${{filename}} ] || continue
+        # Move the job specific files to its directory, removing the jobId from its name
+        mv $filename ${{jobId}}/${{filename#${{jobId}}_*}}
+    done
+
+    ${{BASEDIR}}/run_task.sh ${{jobId}} ${{input}} ${{BUNDLE_ID}} ${{BASEDIR}} &
+done
+
+# wait for all tasks
+wait
+"""
+
+BASH_RUN_TASK = """\
+#!/bin/bash
+task_id=$1
+input=${2#${task_id}_*}
+bundle_id=$3
+base_dir=$4
+
+cd "$task_id"
+
+echo "[${task_id}] Executing task"
+
+# 'set -e' inside the job execution to obtain the real exit status in case of failure
+# NOTE(review): outputs registered at submission are <job_id>.out/.status, so
+# the redirects must use ${task_id}, not ${bundle_id} (which every job in the
+# bundle would share).
+bash -e ${input} \\
+    1> ${task_id}.out \\
+    2> ${task_id}.err
+
+task_status=$?
+
+# Persist the exit status so the <job_id>.status output file expected by the
+# submitter actually exists
+echo "${task_status}" > ${task_id}.status
+
+# Report job ending and status
+echo "[${task_id}] Task Finished"
+echo "[${task_id}] Process final status: ${task_status}"
+"""
diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py
index 5a2eb8a13e5..fbb73b92a9d 100644
--- a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py
+++ b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py
@@ -73,13 +73,16 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
         # Request the whole directory as output
         outputs = ["/"]

+        absWorkingDirectory = os.path.abspath(workingDirectory)
+        asbInputs = [os.path.join(absWorkingDirectory, _input) for _input in inputs]
+
         # Interactions with the CE might be unstable, we need to retry the operations
         maxRetries = 10
         timeBetweenRetries = 120

         # Submit the command as a job with retries
         for _ in range(maxRetries):
-            result = workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs)
+            result = workloadCE.submitJob(self.executable,
workloadCE.proxy, inputs=asbInputs, outputs=outputs) if result["OK"]: break else: diff --git a/tests/Integration/WorkloadManagementSystem/Test_BundleDB.py b/tests/Integration/WorkloadManagementSystem/Test_BundleDB.py new file mode 100644 index 00000000000..c3b3fc11a6d --- /dev/null +++ b/tests/Integration/WorkloadManagementSystem/Test_BundleDB.py @@ -0,0 +1,148 @@ +# pylint: disable=invalid-name, missing-docstring +import pytest + +import DIRAC + +DIRAC.initialize() # Initialize configuration + +from DIRAC.WorkloadManagementSystem.DB.BundleDB import BundleDB # noqa: E402 + + +@pytest.fixture(name="jobInfos") +def fixtureJobInfo(): + return [ + { + "Executable": "./executable1.sh", + "Inputs": ["./input1.py", "./input1.json"], + "Proxy": "FAKE-PROXY", + "Processors": 2, + "CEDict": { + "NumberOfProcessors": 3, + "ExecTemplate": "bash {inputs}", + "Site": "DIRAC.Site1.fake", + "GridCE": "FakeCE", + "Queue": "FakeQueue", + }, + }, + { + "Executable": "./executable2.sh", + "Inputs": ["./input2.py", "./input2.json"], + "Proxy": "FAKE-PROXY", + "Processors": 2, + "CEDict": { + "NumberOfProcessors": 3, + "ExecTemplate": "bash {inputs}", + "Site": "DIRAC.Site1.fake", + "GridCE": "FakeCE", + "Queue": "FakeQueue", + }, + }, + { + "Executable": "./executable3.sh", + "Inputs": ["./input3.py", "./input3.json"], + "Proxy": "FAKE-PROXY", + "Processors": 2, + "CEDict": { + "NumberOfProcessors": 2, + "ExecTemplate": "bash {inputs}", + "Site": "DIRAC.Site2.fake", + "GridCE": "FakeCE", + "Queue": "FakeQueue", + }, + }, + { + "Executable": "./executable4.sh", + "Inputs": ["./input4.py", "./input4.json"], + "Proxy": "FAKE-PROXY", + "Processors": 1, + "CEDict": { + "NumberOfProcessors": 3, + "ExecTemplate": "bash {inputs}", + "Site": "DIRAC.Site1.fake", + "GridCE": "FakeCE", + "Queue": "FakeQueue", + }, + }, + ] + + +@pytest.fixture(name="bundleDB") +def fixtureBundleDB(): + db = BundleDB() + yield db + db._query("DELETE FROM JobToBundle") + db._query("DELETE FROM BundlesInfo") + + 
+@pytest.mark.skip(reason="Old tests, need to be remade") +def test_AddToBundle(bundleDB: BundleDB, jobInfos): + jobId = 0 + + # + # Should return error + result = bundleDB.getBundleIdFromJobId(jobId) + assert not result["OK"] + + # + # Should create a new bundle + job = jobInfos[0] + result = bundleDB.insertJobToBundle(jobId, job["Executable"], job["Inputs"], job["Processors"], job["CEDict"]) + assert result["OK"] + assert result["Value"] + assert not result["Value"]["Ready"] + + # Save the bundle and job ids for later use + bundleId1 = result["Value"]["BundleId"] + jobId1 = jobId + + # + # Should return the same bundle + result = bundleDB.getBundleIdFromJobId(jobId) + assert result["OK"] + assert result["Value"] + assert result["Value"] == bundleId1 + + jobId += 1 + + # + # Should create a new bundle because it does not fit + job = jobInfos[1] + result = bundleDB.insertJobToBundle(jobId, job["Executable"], job["Inputs"], job["Processors"], job["CEDict"]) + assert result["OK"] + assert result["Value"] + assert not result["Value"]["Ready"] + bundleId2 = result["Value"]["BundleId"] + assert bundleId2 != bundleId1 + + jobId += 1 + + # + # Should create a new bundle because a different CE + job = jobInfos[2] + result = bundleDB.insertJobToBundle(jobId, job["Executable"], job["Inputs"], job["Processors"], job["CEDict"]) + assert result["OK"] + assert result["Value"] + assert result["Value"]["Ready"] + bundleId3 = result["Value"]["BundleId"] + assert bundleId3 != bundleId2 and bundleId3 != bundleId1 + + jobId += 1 + + # + # Should add it to the very first bundle because it fits + job = jobInfos[3] + result = bundleDB.insertJobToBundle(jobId, job["Executable"], job["Inputs"], job["Processors"], job["CEDict"]) + assert result["OK"] + assert result["Value"] + assert result["Value"]["Ready"] + bundleId4 = result["Value"]["BundleId"] + assert bundleId4 == bundleId1 + jobId4 = jobId + + # + # Should contain the 2 added jobs + result = bundleDB.getJobsOfBundle(bundleId4) + 
assert result["OK"] + assert result["Value"] + jobIds = [job["JobID"] for job in result["Value"]] + assert jobId1 in jobIds and jobId4 in jobIds