Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 13 additions & 28 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
Expand Down Expand Up @@ -81,10 +80,9 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
self.defaultWrapperLocation = "DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py"

# Timeleft
self.initTimes = os.times()
self.initTimeLeft = 0.0
self.timeLeft = self.initTimeLeft
self.timeLeftUtil = None
self.initTime = time.time()
self.pilotInfoReportedFlag = False

# Attributes related to the processed jobs, it should take the following form:
Expand All @@ -109,16 +107,11 @@ def initialize(self):
if not result["OK"]:
return result

result = self._getCEDict(self.computingElement)
if not result["OK"]:
return result
ceDict = result["Value"][0]

self.initTimeLeft = ceDict.get("CPUTime", self.initTimeLeft)
self.initTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", self.initTimeLeft)
# Read initial CPU work left from config (seeded by pilot via dirac-wms-get-queue-cpu-time)
self.initTimeLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", self.initTimeLeft)
self.timeLeft = self.initTimeLeft

self.initTimes = os.times()
self.initTime = time.time()
# Localsite options
self.siteName = siteName()
self.pilotReference = gConfig.getValue("/LocalSite/PilotReference", self.pilotReference)
Expand All @@ -136,9 +129,6 @@ def initialize(self):
self.logLevel = self.am_getOption("DefaultLogLevel", self.logLevel)
self.defaultWrapperLocation = self.am_getOption("JobWrapperTemplate", self.defaultWrapperLocation)

# Utilities
self.timeLeftUtil = TimeLeft()

# Some innerCEs may want to make use of CGroup2 support, so we prepare it globally here
res = CG2Manager().setUp()
if res["OK"]:
Expand Down Expand Up @@ -403,24 +393,19 @@ def _checkCEAvailability(self, computingElement):
return S_OK()

#############################################################################
def _computeCPUWorkLeft(self, processors=1):
def _computeCPUWorkLeft(self):
"""
Compute CPU Work Left in hepspec06 seconds
Compute CPU Work Left in hepspec06 seconds.

Uses a simple wall-clock countdown from the initial value (seeded by the pilot
via dirac-wms-get-queue-cpu-time). The elapsed wall-clock time is multiplied by
the CPU normalization factor to get the consumed CPU work.

:param int processors: number of processors available
:return: cpu work left (cpu time left * cpu power of the cpus)
"""
# Sum all times but the last one (elapsed_time) and remove times at init (is this correct?)
cpuTimeConsumed = sum(os.times()[:-1]) - sum(self.initTimes[:-1])
result = self.timeLeftUtil.getTimeLeft(cpuTimeConsumed, processors)
if not result["OK"]:
self.log.warn("There were errors calculating time left using the Timeleft utility", result["Message"])
self.log.warn("The time left will be calculated using os.times() and the info in our possession")
self.log.info(f"Current raw CPU time consumed is {cpuTimeConsumed}")
if self.cpuFactor:
return self.initTimeLeft - cpuTimeConsumed * self.cpuFactor
return self.timeLeft
return result["Value"]
elapsed = time.time() - self.initTime
cpuWorkConsumed = elapsed * self.cpuFactor
return self.initTimeLeft - cpuWorkConsumed

def _checkCPUWorkLeft(self, cpuWorkLeft):
"""Check that fillingMode is enabled and time left is sufficient to continue the execution"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error

from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import badJobScript, jobScript
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
Expand Down Expand Up @@ -150,28 +149,27 @@ def test__checkCEAvailability(mocker, ceType, mockCEReply, expectedResult):


@pytest.mark.parametrize(
"initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft",
"initTimeLeft, cpuFactor, elapsedSeconds, expectedTimeLeft",
[
(100000, 75000, None, {"OK": False, "Message": "Error"}, 75000),
(100000, 75000, 10, {"OK": False, "Message": "Error"}, 100000),
(100000, 75000, 10, {"OK": True, "Value": 25000}, 25000),
# No CPU factor: no work consumed, time left equals initial
(100000, 0, 100, 100000),
# With CPU factor: elapsed * cpuFactor is subtracted from initTimeLeft
(100000, 10, 100, 99000),
# Longer elapsed time
(100000, 10, 5000, 50000),
],
)
def test__computeCPUWorkLeft(mocker, initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft):
def test__computeCPUWorkLeft(mocker, initTimeLeft, cpuFactor, elapsedSeconds, expectedTimeLeft):
"""Test JobAgent()._computeCPUWorkLeft()"""
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
mocker.patch(
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.TimeLeft.getTimeLeft", return_value=mockTimeLeftReply
)

jobAgent = JobAgent("Test", "Test1")
jobAgent.log = gLogger
jobAgent.log.setLevel("DEBUG")
jobAgent.timeLeftUtil = TimeLeft()

jobAgent.initTimeLeft = initTimeLeft
jobAgent.timeLeft = timeLeft
jobAgent.cpuFactor = cpuFactor
jobAgent.initTime = time.time() - elapsedSeconds
result = jobAgent._computeCPUWorkLeft()

assert abs(result - expectedTimeLeft) < 10
Expand Down
101 changes: 50 additions & 51 deletions src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@


def getCPUTime(cpuNormalizationFactor):
"""Trying to get CPUTime left for execution (in seconds).
"""Compute the initial CPUTime left for execution (in seconds).

It will first try to get the work left by looking for batch system information using the TimeLeft utility.
If it succeeds, it will convert it in real second, and return it.

If it fails, it tries to get it from the static info found in CS.
If it fails, it returns the default, which is a large 9999999, that we may consider as "Infinite".

This is a generic method, independent from the middleware of the resource if TimeLeft doesn't return a value
This is called at pilot bootstrap (via dirac-wms-get-queue-cpu-time) to seed
the initial CPUTimeLeft value. It queries the batch system first, then falls
back to static CS configuration.

args:
cpuNormalizationFactor (float): the CPU power of the current Worker Node.
Expand All @@ -31,55 +27,58 @@ def getCPUTime(cpuNormalizationFactor):
returns:
cpuTimeLeft (int): the CPU time left, in seconds
"""
cpuTimeLeft = 0.0
cpuWorkLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", 0)

if not cpuWorkLeft:
# Try and get the information from the CPU left utility
result = TimeLeft().getTimeLeft()
if result["OK"]:
cpuWorkLeft = result["Value"]

if cpuWorkLeft > 0:
# This is in HS06 seconds
# We need to convert in real seconds
if not cpuNormalizationFactor: # if cpuNormalizationFactor passed in is 0, try get it from the local cfg

# 1. Try to compute time left from the batch system (sacct, qstat, etc.)
result = TimeLeft().getTimeLeft()
if result["OK"]:
cpuWorkLeft = result["Value"]
# Batch system answered — trust it, even if 0
if not cpuNormalizationFactor:
cpuNormalizationFactor = gConfig.getValue("/LocalSite/CPUNormalizationFactor", 0.0)
if cpuNormalizationFactor:
cpuTimeLeft = cpuWorkLeft / cpuNormalizationFactor
return int(cpuWorkLeft / cpuNormalizationFactor)
return 0

if not cpuTimeLeft:
# now we know that we have to find the CPUTimeLeft by looking in the CS
# this is not granted to be correct as the CS units may not be real seconds
gridCE = gConfig.getValue("/LocalSite/GridCE")
ceQueue = gConfig.getValue("/LocalSite/CEQueue")
if not ceQueue:
# we have to look for a ceQueue in the CS
# A bit hacky. We should better profit from something generic
gLogger.warn("No CEQueue in local configuration, looking to find one in CS")
siteName = DIRAC.siteName()
queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues"
res = gConfig.getSections(queueSection)
if not res["OK"]:
raise RuntimeError(res["Message"])
queues = res["Value"]
cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 9999999.0) for queue in queues]
# These are (real, wall clock) minutes - damn BDII!
cpuTimeLeft = 0.0

# 2. Fall back to queue configuration in the CS.
# These values are wall-clock minutes from BDII, so we convert to seconds.
gridCE = gConfig.getValue("/LocalSite/GridCE")
ceQueue = gConfig.getValue("/LocalSite/CEQueue")
if not ceQueue:
# we have to look for a ceQueue in the CS
# A bit hacky. We should better profit from something generic
gLogger.warn("No CEQueue in local configuration, looking to find one in CS")
siteName = DIRAC.siteName()
queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues"
res = gConfig.getSections(queueSection)
if not res["OK"]:
raise RuntimeError(res["Message"])
queues = res["Value"]
cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 0.0) for queue in queues]
cpuTimes = [t for t in cpuTimes if t > 0]
if cpuTimes:
cpuTimeLeft = min(cpuTimes) * 60
else:
queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}")
if not queueInfo["OK"] or not queueInfo["Value"]:
gLogger.warn("Can't find a CE/queue in CS")
else:
queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}")
cpuTimeLeft = 9999999.0
if not queueInfo["OK"] or not queueInfo["Value"]:
gLogger.warn("Can't find a CE/queue, defaulting CPUTime to %d" % cpuTimeLeft)
queueCSSection = queueInfo["Value"]["QueueCSSection"]
cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0)
if cpuTimeInMinutes:
cpuTimeLeft = cpuTimeInMinutes * 60.0
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}")
else:
queueCSSection = queueInfo["Value"]["QueueCSSection"]
# These are (real, wall clock) minutes - damn BDII!
cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0)
if cpuTimeInMinutes:
cpuTimeLeft = cpuTimeInMinutes * 60.0
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}")
else:
gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}, defaulting CPUTime to {cpuTimeLeft:f}")
gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}")

if not cpuTimeLeft:
# 3. Last resort: global default from CS, or 0 (fail safe: match no more jobs)
cpuTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", 0)
if cpuTimeLeft:
gLogger.warn(f"Using fallback MaxCPUTime: {cpuTimeLeft}")
else:
gLogger.warn("Could not determine CPUTime left")

return int(cpuTimeLeft)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""Unit tests for CPUNormalization.getCPUTime()"""
from unittest.mock import patch

from DIRAC import S_OK, S_ERROR


@patch("DIRAC.WorkloadManagementSystem.Client.CPUNormalization.TimeLeft")
@patch("DIRAC.WorkloadManagementSystem.Client.CPUNormalization.gConfig")
class TestGetCPUTime:
"""Tests for getCPUTime() fallback chain."""

def _import_getCPUTime(self):
from DIRAC.WorkloadManagementSystem.Client.CPUNormalization import getCPUTime

return getCPUTime

def test_from_batch_system(self, mock_gConfig, mock_TimeLeft):
"""Primary path: batch system returns CPU work left."""
mock_gConfig.getValue.return_value = 0
mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(30000) # HS06*s

result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)

# 30000 / 10.0 = 3000 seconds
assert result == 3000
mock_TimeLeft.return_value.getTimeLeft.assert_called_once()

def test_batch_system_returns_zero(self, mock_gConfig, mock_TimeLeft):
"""When batch system reports 0 time left, trust it and return 0."""
mock_gConfig.getValue.return_value = 0
mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(0)

result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)

assert result == 0
# Should NOT fall through to CS fallbacks
mock_gConfig.getValue.assert_not_called()

def test_from_queue_cs(self, mock_gConfig, mock_TimeLeft):
"""Fallback: batch system fails, uses queue maxCPUTime from CS."""
mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info")

config_values = {
"/LocalSite/GridCE": "ce.example.com",
"/LocalSite/CEQueue": "default",
"/LocalSite/Site": "LCG.Example.com",
}

def mock_getValue(key, default=0):
if key in config_values:
return config_values[key]
# maxCPUTime in minutes
if "maxCPUTime" in key:
return 120.0 # 120 minutes
return default

mock_gConfig.getValue.side_effect = mock_getValue

with patch(
"DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo",
return_value=S_OK(
{"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"}
),
):
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)

# 120 minutes * 60 = 7200 seconds
assert result == 7200

def test_fallback_max_cpu_time(self, mock_gConfig, mock_TimeLeft):
"""Last resort: everything fails, uses /Resources/Computing/CEDefaults/MaxCPUTime."""
mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info")

config_values = {
"/LocalSite/GridCE": "ce.example.com",
"/LocalSite/CEQueue": "default",
"/LocalSite/Site": "LCG.Example.com",
"/Resources/Computing/CEDefaults/MaxCPUTime": 86400,
}

def mock_getValue(key, default=0):
if key in config_values:
return config_values[key]
return default

mock_gConfig.getValue.side_effect = mock_getValue

with patch(
"DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo",
return_value=S_OK(
{"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"}
),
):
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)

# maxCPUTime from queue returned 0, so falls through to CEDefaults/MaxCPUTime
assert result == 86400

def test_nothing_available_returns_zero(self, mock_gConfig, mock_TimeLeft):
"""Fail safe: no batch info, no CS config, returns 0."""
mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info")

config_values = {
"/LocalSite/GridCE": "ce.example.com",
"/LocalSite/CEQueue": "default",
"/LocalSite/Site": "LCG.Example.com",
}

def mock_getValue(key, default=0):
if key in config_values:
return config_values[key]
return default

mock_gConfig.getValue.side_effect = mock_getValue

with patch(
"DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo",
return_value=S_OK(
{"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"}
),
):
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)

assert result == 0

def test_cpu_normalization_factor_from_config(self, mock_gConfig, mock_TimeLeft):
"""When cpuNormalizationFactor=0, it should be read from local config."""
mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(50000) # HS06*s

mock_gConfig.getValue.side_effect = lambda key, default=0: {
"/LocalSite/CPUNormalizationFactor": 5.0,
}.get(key, default)

result = self._import_getCPUTime()(cpuNormalizationFactor=0)

# 50000 / 5.0 = 10000 seconds
assert result == 10000
Loading
Loading