From d5fae938e4842f9db59e7814df52de1adf2a86bb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 13:58:11 +0200 Subject: [PATCH 1/9] Minor cleanup --- distributed/deploy/subprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 0b4323ba107..8c6491c8498 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -65,7 +65,7 @@ async def start(self) -> None: "spec", self.scheduler, "--spec", - json.dumps({0: {"cls": self.worker_class, "opts": {**self.worker_kwargs}}}), + json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), ) await super().start() From d16932a0ebb065f629462426bcc554752ed39502 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 16:48:03 +0200 Subject: [PATCH 2/9] Subprocess scheduler --- distributed/deploy/subprocess.py | 121 ++++++++++++++++---- distributed/deploy/tests/test_subprocess.py | 18 ++- 2 files changed, 113 insertions(+), 26 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 8c6491c8498..9643d10ee94 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -1,5 +1,6 @@ from __future__ import annotations +import abc import asyncio import copy import json @@ -15,13 +16,88 @@ from distributed.compatibility import WINDOWS from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.utils import nprocesses_nthreads -from distributed.scheduler import Scheduler from distributed.worker_memory import parse_memory_limit logger = logging.getLogger(__name__) -class SubprocessWorker(ProcessInterface): +class Subprocess(ProcessInterface, abc.ABC): + name: str | None + process: asyncio.subprocess.Process | None + + def __init__(self, name: str | None = None): + if WINDOWS: + # FIXME: distributed#7434 + raise RuntimeError("Subprocess does not support Windows.") + self.name = name + self.process = None + super().__init__() + + async def start(self) -> None: + await self._start() + await super().start() + + @abc.abstractmethod + async def _start(self) -> None: + """Start the subprocess""" + raise NotImplementedError + + async def close(self) -> None: + if self.process and self.process.returncode is None: + for child in psutil.Process(self.process.pid).children(recursive=True): + child.kill() + self.process.kill() + await self.process.communicate() + self.process = None + await super().close() + + +class SubprocessScheduler(Subprocess): + """A local Dask scheduler running in a dedicated subprocess""" + + scheduler_kwargs: dict + address: str | None + + def __init__( + self, + name: str | None = None, + scheduler_kwargs: dict | None = None, + ): + self.scheduler_kwargs = scheduler_kwargs or {} + super().__init__(name) + + async def _start(self): + cmd = [ + "dask", + "spec", + "--spec", + json.dumps( + {"cls": "distributed.Scheduler", "opts": {**self.scheduler_kwargs}} + ), + ] + logger.info(" ".join(cmd)) + self.process = await asyncio.create_subprocess_exec( + "dask", + "spec", + "--spec", + json.dumps( + {"cls": "distributed.Scheduler", "opts": {**self.scheduler_kwargs}} + ), + stderr=asyncio.subprocess.PIPE, + ) + + while True: + line = (await self.process.stderr.readline()).decode() + if not line.strip(): + raise RuntimeError("Scheduler failed to start") + logger.info(line.strip()) + if "Scheduler at" in line: + self.address = line.split("Scheduler at:")[1].strip() + break + logger.debug(line) + + +class SubprocessWorker(Subprocess): """A local Dask worker running in a dedicated subprocess Parameters @@ -39,8 +115,6 @@ class SubprocessWorker(ProcessInterface): scheduler: str worker_class: str worker_kwargs: dict - name: str | None - process: asyncio.subprocess.Process | None def __init__( self, @@ -49,34 +123,28 @@ def __init__( name: str | None = None, worker_kwargs: dict | None = None, ) -> None: - if WINDOWS: - # FIXME: distributed#7434 - raise RuntimeError("SubprocessWorker does not support Windows.") self.scheduler = scheduler self.worker_class = worker_class - self.name = name self.worker_kwargs = copy.copy(worker_kwargs or {}) - self.process = None - super().__init__() + super().__init__(name) - async def start(self) -> None: + async def _start(self) -> None: + cmd = [ + "dask", + "spec", + self.scheduler, + "--spec", + json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), + ] + logger.info(" ".join(cmd)) self.process = await asyncio.create_subprocess_exec( "dask", "spec", self.scheduler, "--spec", json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), + stderr=asyncio.subprocess.PIPE, ) - await super().start() - - async def close(self) -> None: - if self.process and self.process.returncode is None: - for child in psutil.Process(self.process.pid).children(recursive=True): - child.kill() - self.process.kill() - await self.process.wait() - self.process = None - await super().close() def SubprocessCluster( @@ -88,7 +156,7 @@ def SubprocessCluster( n_workers: int | None = None, threads_per_worker: int | None = None, worker_kwargs: dict | None = None, - silence_logs: int = logging.WARN, + silence_logs: int = logging.INFO, **kwargs: Any, ) -> SpecCluster: """Create in-process scheduler and workers running in dedicated subprocesses @@ -123,7 +191,7 @@ def SubprocessCluster( worker_kwargs: Keywords to pass on to the ``Worker`` class constructor silence_logs: - Level of logs to print out to stdout, defaults to ``logging.WARN`` + Level of logs to print out to stdout, defaults to ``logging.INFO`` Use a falsy value like False or None to disable log silencing. @@ -178,7 +246,12 @@ def SubprocessCluster( worker_kwargs, ) - scheduler = {"cls": Scheduler, "options": scheduler_kwargs} + scheduler = { + "cls": SubprocessScheduler, + "options": { + "scheduler_kwargs": scheduler_kwargs, + }, + } worker = { "cls": SubprocessWorker, "options": {"worker_class": worker_class, "worker_kwargs": worker_kwargs}, diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index be7a11ce322..3382fef88a8 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -4,7 +4,11 @@ from distributed import Client from distributed.compatibility import WINDOWS -from distributed.deploy.subprocess import SubprocessCluster, SubprocessWorker +from distributed.deploy.subprocess import ( + SubprocessCluster, + SubprocessScheduler, + SubprocessWorker, +) from distributed.utils_test import gen_test @@ -53,7 +57,6 @@ async def test_scale_up_and_down(): cluster.scale(2) await c.wait_for_workers(2) assert len(cluster.workers) == 2 - assert len(cluster.scheduler.workers) == 2 cluster.scale(1) await cluster @@ -61,6 +64,14 @@ async def test_scale_up_and_down(): assert len(cluster.workers) == 1 +@pytest.mark.skipif(WINDOWS, reason="distributed#7434") +@gen_test() +async def test_raise_if_scheduler_fails_to_start(): + with pytest.raises(RuntimeError, match="Scheduler failed to start"): + async with SubprocessCluster(scheduler_port=-1, asynchronous=True): + pass + + @pytest.mark.skipif( not WINDOWS, reason="Windows-specific error testing (distributed#7434)" ) @@ -68,5 +79,8 @@ def test_raise_on_windows(): with pytest.raises(RuntimeError, match="not support Windows"): SubprocessCluster() + with pytest.raises(RuntimeError, match="not support Windows"): + SubprocessScheduler() + with pytest.raises(RuntimeError, match="not support Windows"): SubprocessWorker(scheduler="tcp://127.0.0.1:8786") From 05c48e91b4de897cff7c2d90a5980d891cd054cb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 16:53:11 +0200 Subject: [PATCH 3/9] log level --- distributed/deploy/subprocess.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 9643d10ee94..ce08781e72c 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -156,7 +156,7 @@ def SubprocessCluster( n_workers: int | None = None, threads_per_worker: int | None = None, worker_kwargs: dict | None = None, - silence_logs: int = logging.INFO, + silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: """Create in-process scheduler and workers running in dedicated subprocesses @@ -191,7 +191,7 @@ def SubprocessCluster( worker_kwargs: Keywords to pass on to the ``Worker`` class constructor silence_logs: - Level of logs to print out to stdout, defaults to ``logging.INFO`` + Level of logs to print out to stdout, defaults to ``logging.WARN`` Use a falsy value like False or None to disable log silencing. From 3ceff542149db610631b72215ed5b7835ebecbb1 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 16:57:54 +0200 Subject: [PATCH 4/9] Docs --- distributed/deploy/subprocess.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index ce08781e72c..cb43627fd13 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -159,10 +159,9 @@ def SubprocessCluster( silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: - """Create in-process scheduler and workers running in dedicated subprocesses + """Create a scheduler and workers run in dedicated subprocesses - This creates a "cluster" of a scheduler running in the current process and - workers running in dedicated subprocesses. + This creates a "cluster" of a scheduler and workers running in dedicated subprocesses. .. warning:: From 0b3eec1d6e028eb06432f16086fbda5fc974fa3b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 17:05:09 +0200 Subject: [PATCH 5/9] Parameters --- distributed/deploy/subprocess.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index cb43627fd13..cf7cfdffab7 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -22,14 +22,12 @@ class Subprocess(ProcessInterface, abc.ABC): - name: str | None process: asyncio.subprocess.Process | None - def __init__(self, name: str | None = None): + def __init__(self): if WINDOWS: # FIXME: distributed#7434 raise RuntimeError("Subprocess does not support Windows.") - self.name = name self.process = None super().__init__() @@ -53,18 +51,23 @@ async def close(self) -> None: class SubprocessScheduler(Subprocess): - """A local Dask scheduler running in a dedicated subprocess""" + """A local Dask scheduler running in a dedicated subprocess + + Parameters + ---------- + scheduler_kwargs: + Keywords to pass on to the ``Scheduler`` class constructor + """ scheduler_kwargs: dict address: str | None def __init__( self, - name: str | None = None, scheduler_kwargs: dict | None = None, ): self.scheduler_kwargs = scheduler_kwargs or {} - super().__init__(name) + super().__init__() async def _start(self): cmd = [ @@ -112,6 +115,7 @@ class SubprocessWorker(Subprocess): Keywords to pass on to the ``Worker`` class constructor """ + name: str | None scheduler: str worker_class: str worker_kwargs: dict @@ -123,10 +127,11 @@ def __init__( name: str | None = None, worker_kwargs: dict | None = None, ) -> None: + self.name = name self.scheduler = scheduler self.worker_class = worker_class self.worker_kwargs = copy.copy(worker_kwargs or {}) - super().__init__(name) + super().__init__() async def _start(self) -> None: cmd = [ From 9bfb2f4ce6e0bc490f4f3d064ed1ecbf14c674dc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 17:08:02 +0200 Subject: [PATCH 6/9] Minor --- distributed/deploy/subprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index cf7cfdffab7..df6092fae8f 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -38,7 +38,6 @@ async def start(self) -> None: @abc.abstractmethod async def _start(self) -> None: """Start the subprocess""" - raise NotImplementedError async def close(self) -> None: if self.process and self.process.returncode is None: From 19f3b47a04200dc203fd998762f9a5490b43bb15 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Mar 2023 17:08:42 +0200 Subject: [PATCH 7/9] PIPE --- distributed/deploy/subprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index df6092fae8f..f9373342cf3 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -147,7 +147,6 @@ async def _start(self) -> None: self.scheduler, "--spec", json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), - stderr=asyncio.subprocess.PIPE, ) From ea7056b481bb095f2b165dcf3aa99a7440a07aeb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 5 Apr 2023 19:59:30 +0200 Subject: [PATCH 8/9] Update distributed/deploy/subprocess.py Co-authored-by: Gabe Joseph --- distributed/deploy/subprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index f9373342cf3..35d34846fea 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -162,7 +162,7 @@ def SubprocessCluster( silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: - """Create a scheduler and workers run in dedicated subprocesses + """Create a scheduler and workers that run in dedicated subprocesses This creates a "cluster" of a scheduler and workers running in dedicated subprocesses. From d07f83b38b8f284120aa7ac5d8ed2b228de9f65e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 5 Apr 2023 20:16:09 +0200 Subject: [PATCH 9/9] cmd --- distributed/deploy/subprocess.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 35d34846fea..c2284a138b3 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -79,12 +79,7 @@ async def _start(self): ] logger.info(" ".join(cmd)) self.process = await asyncio.create_subprocess_exec( - "dask", - "spec", - "--spec", - json.dumps( - {"cls": "distributed.Scheduler", "opts": {**self.scheduler_kwargs}} - ), + *cmd, stderr=asyncio.subprocess.PIPE, ) @@ -141,13 +136,7 @@ async def _start(self) -> None: json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), ] logger.info(" ".join(cmd)) - self.process = await asyncio.create_subprocess_exec( - "dask", - "spec", - self.scheduler, - "--spec", - json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), - ) + self.process = await asyncio.create_subprocess_exec(*cmd) def SubprocessCluster(