Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion codecarbon/core/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@
from typing import Dict, Optional, Tuple

import pandas as pd
import psutil
from rapidfuzz import fuzz, process, utils

try:
import psutil

PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
psutil = None

from codecarbon.core.rapl import RAPLFile
from codecarbon.core.units import Time
from codecarbon.core.util import count_cpus, detect_cpu_model
Expand Down Expand Up @@ -207,6 +214,9 @@ def is_rapl_available(rapl_dir: Optional[str] = None) -> bool:


def is_psutil_available():
if not PSUTIL_AVAILABLE:
logger.debug("psutil module is not available.")
return False
try:
cpu_times = psutil.cpu_times()

Expand Down
115 changes: 87 additions & 28 deletions codecarbon/core/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,37 @@ def _setup_fallback_tracking(self, tdp, max_power):
self.cpu_tracker = "TDP constant"
model = tdp.model

if (max_power is None) and self.tracker._force_cpu_power:
if self.tracker._force_cpu_power is not None:
user_input_power = self.tracker._force_cpu_power
logger.debug(f"Using user input TDP: {user_input_power} W")
self.cpu_tracker = "User Input TDP constant"
max_power = user_input_power
if max_power is None:
max_power = user_input_power

logger.info(f"CPU Model on constant consumption mode: {model}")
self.tracker._conf["cpu_model"] = model

if tdp:
# Check for forced constant mode first
if self.tracker._conf.get("force_mode_constant", False):
logger.info(
"Force constant mode requested - bypassing psutil and using constant CPU power"
)
model = tdp.model
if self.tracker._force_cpu_power is not None:
self.cpu_tracker = "User Input TDP constant"
else:
self.cpu_tracker = "TDP constant"
logger.info(f"CPU Model on forced constant consumption mode: {model}")
self.tracker._conf["cpu_model"] = model
hardware_cpu = CPU.from_utils(
self.tracker._output_dir, "constant", model, max_power
)
self.tracker._hardware.append(hardware_cpu)
return

if self.tracker._conf.get("force_mode_cpu_load", False) and (
tdp.tdp is not None or self.tracker._force_cpu_power is not None
):
if cpu.is_psutil_available():
logger.warning(
"No CPU tracking mode found. Falling back on CPU load mode."
Expand All @@ -156,7 +177,8 @@ def _setup_fallback_tracking(self, tdp, max_power):
hardware_cpu = CPU.from_utils(
self.tracker._output_dir, "constant", model, max_power
)
self.cpu_tracker = "global constant"
if max_power is None:
self.cpu_tracker = "global constant"
self.tracker._hardware.append(hardware_cpu)
else:
if cpu.is_psutil_available():
Expand All @@ -172,13 +194,45 @@ def _setup_fallback_tracking(self, tdp, max_power):
)
self.cpu_tracker = MODE_CPU_LOAD
else:
logger.warning(
"Failed to match CPU TDP constant. Falling back on a global constant."
)
self.cpu_tracker = "global constant"
hardware_cpu = CPU.from_utils(self.tracker._output_dir, "constant")
if max_power is None:
logger.warning(
"Failed to match CPU TDP constant. Falling back on a global constant."
)
self.cpu_tracker = "global constant"
hardware_cpu = CPU.from_utils(self.tracker._output_dir, "constant")
else:
logger.warning(
"No CPU tracking mode found. Falling back on CPU constant mode."
)
hardware_cpu = CPU.from_utils(
self.tracker._output_dir, "constant", model, max_power
)
self.tracker._hardware.append(hardware_cpu)

def _resolve_tdp_and_max_power(self, cpu_number, tdp=None, max_power=None):
"""Resolve CPU TDP object and max power estimate."""
if tdp is None:
tdp = cpu.TDP()
if max_power is None:
max_power = tdp.tdp * cpu_number if tdp.tdp is not None else None
return tdp, max_power

def _setup_preferred_cpu_backend(self):
"""Set up the best available CPU backend when not forced."""
if cpu.is_powergadget_available() and self.tracker._force_cpu_power is None:
self._setup_power_gadget()
return True
if cpu.is_rapl_available() and self.tracker._force_cpu_power is None:
self._setup_rapl()
return True
if (
powermetrics.is_powermetrics_available()
and self.tracker._force_cpu_power is None
):
self._setup_powermetrics()
return True
return False

def set_CPU_tracking(self):
logger.info("[setup] CPU Tracking...")
cpu_number = self.tracker._conf.get("cpu_physical_count")
Expand All @@ -192,32 +246,37 @@ def set_CPU_tracking(self):
self.cpu_tracker = "User Input TDP constant"
max_power = self.tracker._force_cpu_power

# Force constant mode takes precedence over every other CPU tracking backend.
if self.tracker._conf.get("force_mode_constant", False):
tdp, max_power = self._resolve_tdp_and_max_power(cpu_number, tdp, max_power)

logger.info(
"Force constant mode requested - bypassing dynamic CPU power backends"
)
if self.tracker._force_cpu_power is not None:
self.cpu_tracker = "User Input TDP constant"
else:
self.cpu_tracker = "TDP constant"
self.tracker._conf["cpu_model"] = tdp.model
hardware_cpu = CPU.from_utils(
self.tracker._output_dir, "constant", tdp.model, max_power
)
self.tracker._hardware.append(hardware_cpu)
return

# Try force CPU load mode if requested
if self.tracker._conf.get("force_mode_cpu_load", False):
if tdp is None:
tdp = cpu.TDP()
if max_power is None:
max_power = tdp.tdp * cpu_number if tdp.tdp is not None else None
tdp, max_power = self._resolve_tdp_and_max_power(cpu_number, tdp, max_power)
if tdp.tdp is not None or self.tracker._force_cpu_power is not None:
if self._setup_cpu_load_mode(tdp, max_power):
return

# Try various tracking methods in order of preference
if cpu.is_powergadget_available() and self.tracker._force_cpu_power is None:
self._setup_power_gadget()
elif cpu.is_rapl_available() and self.tracker._force_cpu_power is None:
self._setup_rapl()
elif (
powermetrics.is_powermetrics_available()
and self.tracker._force_cpu_power is None
):
self._setup_powermetrics()
else:
if tdp is None:
tdp = cpu.TDP()
if max_power is None:
max_power = tdp.tdp * cpu_number if tdp.tdp is not None else None
self._setup_fallback_tracking(tdp, max_power)
if self._setup_preferred_cpu_backend():
return

tdp, max_power = self._resolve_tdp_and_max_power(cpu_number, tdp, max_power)
self._setup_fallback_tracking(tdp, max_power)

def set_GPU_tracking(self):
logger.info("[setup] GPU Tracking...")
Expand Down
15 changes: 14 additions & 1 deletion codecarbon/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
from typing import Optional, Union

import cpuinfo
import psutil

try:
import psutil

PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
psutil = None

from codecarbon.external.logger import logger

Expand Down Expand Up @@ -146,6 +153,12 @@ def _windows_get_physical_sockets():


def count_cpus() -> int:
if not PSUTIL_AVAILABLE:
logger.warning("psutil not available, using fallback CPU count detection")
# Fallback to using os.cpu_count() or physical CPU count
cpu_count = os.cpu_count()
return cpu_count if cpu_count is not None else 1

if SLURM_JOB_ID is None:
return psutil.cpu_count()

Expand Down
4 changes: 4 additions & 0 deletions codecarbon/emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def __init__(
wue: Optional[float] = _sentinel,
force_carbon_intensity_g_co2e_kwh: Optional[float] = _sentinel,
force_mode_cpu_load: Optional[bool] = _sentinel,
force_mode_constant: Optional[bool] = _sentinel,
allow_multiple_runs: Optional[bool] = _sentinel,
rapl_include_dram: Optional[bool] = _sentinel,
rapl_prefer_psys: Optional[bool] = _sentinel,
Expand Down Expand Up @@ -275,6 +276,8 @@ def __init__(
(CPU + chipset + PCIe). When False, uses package domains which
are more reliable. Note: psys can report higher values than
CPU TDP and may be unreliable on older systems.
:param force_mode_constant: Force the addition of a CPU in constant mode, bypassing psutil
:param allow_multiple_runs: Allow multiple instances of codecarbon running in parallel. Defaults to False.
"""

# logger.info("base tracker init")
Expand Down Expand Up @@ -377,6 +380,7 @@ def __init__(
self._set_from_conf(force_mode_cpu_load, "force_mode_cpu_load", False, bool)
self._set_from_conf(rapl_include_dram, "rapl_include_dram", False, bool)
self._set_from_conf(rapl_prefer_psys, "rapl_prefer_psys", False, bool)
self._set_from_conf(force_mode_constant, "force_mode_constant", False, bool)
self._set_from_conf(
experiment_id, "experiment_id", "5b0fa12a-3dd7-45bb-9766-cc326314d9f1"
)
Expand Down
29 changes: 29 additions & 0 deletions tests/test_core_util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import builtins
import importlib.util
import shutil
import tempfile
from unittest import mock
Expand All @@ -13,6 +15,21 @@
)


def _load_module_without_psutil(module_name, file_path):
real_import = builtins.__import__

def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
if name == "psutil":
raise ImportError("psutil unavailable for test")
return real_import(name, globals, locals, fromlist, level)

spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
with mock.patch("builtins.__import__", side_effect=fake_import):
spec.loader.exec_module(module)
return module


def test_detect_cpu_model_caching():
"""Test that detect_cpu_model() results are cached."""
# Clear cache to ensure clean state
Expand Down Expand Up @@ -102,6 +119,18 @@ def test_count_cpus_no_slurm():
assert count_cpus() == 4


def test_util_module_import_without_psutil_uses_cpu_count_fallback():
util_module = _load_module_without_psutil(
"codecarbon.core.util_no_psutil_test",
__import__("codecarbon.core.util", fromlist=["util"]).__file__,
)

assert util_module.PSUTIL_AVAILABLE is False
assert util_module.psutil is None
with mock.patch.object(util_module.os, "cpu_count", return_value=6):
assert util_module.count_cpus() == 6


def test_count_cpus_slurm():
with mock.patch("codecarbon.core.util.SLURM_JOB_ID", "12345"):
with mock.patch(
Expand Down
27 changes: 27 additions & 0 deletions tests/test_cpu.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import builtins
import importlib.util
import os
import subprocess
import sys
Expand Down Expand Up @@ -30,7 +32,32 @@
from codecarbon.input import DataSource


def _load_module_without_psutil(module_name, file_path):
real_import = builtins.__import__

def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
if name == "psutil":
raise ImportError("psutil unavailable for test")
return real_import(name, globals, locals, fromlist, level)

spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
with mock.patch("builtins.__import__", side_effect=fake_import):
spec.loader.exec_module(module)
return module


class TestCPU(unittest.TestCase):
def test_cpu_module_import_without_psutil_sets_fallback_flag(self):
cpu_module = _load_module_without_psutil(
"codecarbon.core.cpu_no_psutil_test",
sys.modules["codecarbon.core.cpu"].__file__,
)

self.assertFalse(cpu_module.PSUTIL_AVAILABLE)
self.assertIsNone(cpu_module.psutil)
self.assertFalse(cpu_module.is_psutil_available())

@mock.patch("codecarbon.core.cpu.IntelPowerGadget", side_effect=Exception("boom"))
def test_is_powergadget_available_returns_false_on_exception(
self, mock_powergadget
Expand Down
Loading
Loading