-
-
Notifications
You must be signed in to change notification settings - Fork 761
Run scheduler of SubprocessCluster in subprocess
#7727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d5fae93
d16932a
05c48e9
3ceff54
0b3eec1
9bfb2f4
19f3b47
ea7056b
d07f83b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import abc | ||
| import asyncio | ||
| import copy | ||
| import json | ||
|
|
@@ -15,13 +16,85 @@ | |
| from distributed.compatibility import WINDOWS | ||
| from distributed.deploy.spec import ProcessInterface, SpecCluster | ||
| from distributed.deploy.utils import nprocesses_nthreads | ||
| from distributed.scheduler import Scheduler | ||
| from distributed.worker_memory import parse_memory_limit | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class SubprocessWorker(ProcessInterface): | ||
| class Subprocess(ProcessInterface, abc.ABC): | ||
| process: asyncio.subprocess.Process | None | ||
|
|
||
| def __init__(self): | ||
| if WINDOWS: | ||
| # FIXME: distributed#7434 | ||
| raise RuntimeError("Subprocess does not support Windows.") | ||
| self.process = None | ||
| super().__init__() | ||
|
|
||
| async def start(self) -> None: | ||
| await self._start() | ||
| await super().start() | ||
|
|
||
| @abc.abstractmethod | ||
| async def _start(self) -> None: | ||
| """Start the subprocess""" | ||
|
|
||
| async def close(self) -> None: | ||
| if self.process and self.process.returncode is None: | ||
| for child in psutil.Process(self.process.pid).children(recursive=True): | ||
| child.kill() | ||
| self.process.kill() | ||
| await self.process.communicate() | ||
| self.process = None | ||
| await super().close() | ||
|
|
||
|
|
||
| class SubprocessScheduler(Subprocess): | ||
| """A local Dask scheduler running in a dedicated subprocess | ||
|
|
||
| Parameters | ||
| ---------- | ||
| scheduler_kwargs: | ||
| Keywords to pass on to the ``Scheduler`` class constructor | ||
| """ | ||
|
|
||
| scheduler_kwargs: dict | ||
| address: str | None | ||
|
|
||
| def __init__( | ||
| self, | ||
| scheduler_kwargs: dict | None = None, | ||
| ): | ||
| self.scheduler_kwargs = scheduler_kwargs or {} | ||
| super().__init__() | ||
|
|
||
| async def _start(self): | ||
| cmd = [ | ||
| "dask", | ||
| "spec", | ||
| "--spec", | ||
| json.dumps( | ||
| {"cls": "distributed.Scheduler", "opts": {**self.scheduler_kwargs}} | ||
| ), | ||
| ] | ||
| logger.info(" ".join(cmd)) | ||
| self.process = await asyncio.create_subprocess_exec( | ||
| *cmd, | ||
| stderr=asyncio.subprocess.PIPE, | ||
| ) | ||
|
|
||
| while True: | ||
| line = (await self.process.stderr.readline()).decode() | ||
| if not line.strip(): | ||
| raise RuntimeError("Scheduler failed to start") | ||
| logger.info(line.strip()) | ||
| if "Scheduler at" in line: | ||
| self.address = line.split("Scheduler at:")[1].strip() | ||
| break | ||
| logger.debug(line) | ||
|
Comment on lines
+86
to
+94
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. once the That's a little more work though, since you'd probably need to start a
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One issue we face here is that the default log level is
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that makes sense. |
||
|
|
||
|
|
||
| class SubprocessWorker(Subprocess): | ||
| """A local Dask worker running in a dedicated subprocess | ||
|
|
||
| Parameters | ||
|
|
@@ -36,11 +109,10 @@ class SubprocessWorker(ProcessInterface): | |
| Keywords to pass on to the ``Worker`` class constructor | ||
| """ | ||
|
|
||
| name: str | None | ||
| scheduler: str | ||
| worker_class: str | ||
| worker_kwargs: dict | ||
| name: str | None | ||
| process: asyncio.subprocess.Process | None | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -49,34 +121,22 @@ def __init__( | |
| name: str | None = None, | ||
| worker_kwargs: dict | None = None, | ||
| ) -> None: | ||
| if WINDOWS: | ||
| # FIXME: distributed#7434 | ||
| raise RuntimeError("SubprocessWorker does not support Windows.") | ||
| self.name = name | ||
| self.scheduler = scheduler | ||
| self.worker_class = worker_class | ||
| self.name = name | ||
| self.worker_kwargs = copy.copy(worker_kwargs or {}) | ||
| self.process = None | ||
| super().__init__() | ||
|
|
||
| async def start(self) -> None: | ||
| self.process = await asyncio.create_subprocess_exec( | ||
| async def _start(self) -> None: | ||
| cmd = [ | ||
| "dask", | ||
| "spec", | ||
| self.scheduler, | ||
| "--spec", | ||
| json.dumps({0: {"cls": self.worker_class, "opts": {**self.worker_kwargs}}}), | ||
| ) | ||
| await super().start() | ||
|
|
||
| async def close(self) -> None: | ||
| if self.process and self.process.returncode is None: | ||
| for child in psutil.Process(self.process.pid).children(recursive=True): | ||
| child.kill() | ||
| self.process.kill() | ||
| await self.process.wait() | ||
| self.process = None | ||
| await super().close() | ||
| json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}), | ||
| ] | ||
| logger.info(" ".join(cmd)) | ||
| self.process = await asyncio.create_subprocess_exec(*cmd) | ||
|
|
||
|
|
||
| def SubprocessCluster( | ||
|
|
@@ -91,10 +151,9 @@ def SubprocessCluster( | |
| silence_logs: int = logging.WARN, | ||
| **kwargs: Any, | ||
| ) -> SpecCluster: | ||
| """Create in-process scheduler and workers running in dedicated subprocesses | ||
| """Create a scheduler and workers that run in dedicated subprocesses | ||
|
|
||
| This creates a "cluster" of a scheduler running in the current process and | ||
| workers running in dedicated subprocesses. | ||
| This creates a "cluster" of a scheduler and workers running in dedicated subprocesses. | ||
|
|
||
| .. warning:: | ||
|
|
||
|
|
@@ -178,7 +237,12 @@ def SubprocessCluster( | |
| worker_kwargs, | ||
| ) | ||
|
|
||
| scheduler = {"cls": Scheduler, "options": scheduler_kwargs} | ||
| scheduler = { | ||
| "cls": SubprocessScheduler, | ||
| "options": { | ||
| "scheduler_kwargs": scheduler_kwargs, | ||
| }, | ||
| } | ||
| worker = { | ||
| "cls": SubprocessWorker, | ||
| "options": {"worker_class": worker_class, "worker_kwargs": worker_kwargs}, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're not redirecting stderr from the workers; seems like output from the scheduler should be treated the same? Subtle, but logging could be configured differently from plain stderr—including to prefix some additional information which could be confusing—so just forwarding to stderr seems more consistent with the worker subprocesses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See #7727 (comment) for the general discrepancy between the generally configured log level and the one required to retrieve the address.