Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 91 additions & 27 deletions distributed/deploy/subprocess.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import abc
import asyncio
import copy
import json
Expand All @@ -15,13 +16,85 @@
from distributed.compatibility import WINDOWS
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.utils import nprocesses_nthreads
from distributed.scheduler import Scheduler
from distributed.worker_memory import parse_memory_limit

logger = logging.getLogger(__name__)


class SubprocessWorker(ProcessInterface):
class Subprocess(ProcessInterface, abc.ABC):
    """Abstract base for cluster components that run in a dedicated subprocess

    Subclasses implement :meth:`_start`, which is awaited by :meth:`start`
    before the parent class' ``start`` runs. :meth:`close` kills whatever is
    stored on ``self.process`` together with all of its descendants.

    Raises
    ------
    RuntimeError
        On construction, if running on Windows (see distributed#7434).
    """

    # Handle of the spawned subprocess; ``None`` until a subclass' ``_start``
    # assigns it and again after ``close``.
    process: asyncio.subprocess.Process | None

    def __init__(self):
        if WINDOWS:
            # FIXME: distributed#7434
            raise RuntimeError("Subprocess does not support Windows.")
        self.process = None
        super().__init__()

    async def start(self) -> None:
        """Run the subclass' ``_start`` and then the parent class' ``start``"""
        await self._start()
        await super().start()

    @abc.abstractmethod
    async def _start(self) -> None:
        """Start the subprocess"""

    async def close(self) -> None:
        """Kill the subprocess and all of its descendants, then close

        Processes that have already exited by the time they are signalled are
        ignored, so that the remaining cleanup (resetting ``self.process`` and
        calling the parent class' ``close``) always runs.
        """
        process = self.process
        if process is not None and process.returncode is None:
            # ``returncode`` may still be ``None`` although the process (or one
            # of its children) is already gone; psutil reports that as
            # ``NoSuchProcess``, which must not abort the cleanup.
            try:
                children = psutil.Process(process.pid).children(recursive=True)
            except psutil.NoSuchProcess:
                children = []
            for child in children:
                try:
                    child.kill()
                except psutil.NoSuchProcess:
                    # Child exited between enumeration and kill.
                    pass
            try:
                process.kill()
            except ProcessLookupError:
                # Process was gone before the signal could be delivered.
                pass
            # Wait for termination and drain any piped output.
            await process.communicate()
        self.process = None
        await super().close()


class SubprocessScheduler(Subprocess):
"""A local Dask scheduler running in a dedicated subprocess

Parameters
----------
scheduler_kwargs:
Keywords to pass on to the ``Scheduler`` class constructor
"""

scheduler_kwargs: dict
address: str | None

def __init__(
self,
scheduler_kwargs: dict | None = None,
):
self.scheduler_kwargs = scheduler_kwargs or {}
super().__init__()

async def _start(self):
cmd = [
"dask",
"spec",
"--spec",
json.dumps(
{"cls": "distributed.Scheduler", "opts": {**self.scheduler_kwargs}}
),
]
logger.info(" ".join(cmd))
self.process = await asyncio.create_subprocess_exec(
*cmd,
stderr=asyncio.subprocess.PIPE,
)

while True:
line = (await self.process.stderr.readline()).decode()
if not line.strip():
raise RuntimeError("Scheduler failed to start")
logger.info(line.strip())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.info(line.strip())
sys.stderr.write(line)

We're not redirecting stderr from the workers; seems like output from the scheduler should be treated the same? Subtle, but logging could be configured differently from plain stderr—including to prefix some additional information which could be confusing—so just forwarding to stderr seems more consistent with the worker subprocesses.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #7727 (comment) for the general discrepancy between the generally configured log level and the one required to retrieve the address.

if "Scheduler at" in line:
self.address = line.split("Scheduler at:")[1].strip()
break
logger.debug(line)
Comment on lines +86 to +94

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the "Scheduler at:" message appears, further logs from the scheduler will be swallowed, right? I think it would be nicer UX to keep forwarding scheduler stderr the whole time, just like stderr from workers will be visible. I'd even wonder if that pipe could get filled up in rare cases, blocking writes on the scheduler side.

That's a little more work though, since you'd probably need to start a Task in this start method that forwards stderr, and clean it up in close.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue we face here is that the default log level is WARN, but we need to enable INFO logs on the scheduler to be able to retrieve the scheduler address. I've borrowed this pattern from the SSHCluster implementation, which seemed "good enough". I'm not keen to dive into more work on this for now, in particular since it would also entail that we only write logs with the applicable minimum log level.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense.



class SubprocessWorker(Subprocess):
"""A local Dask worker running in a dedicated subprocess

Parameters
Expand All @@ -36,11 +109,10 @@ class SubprocessWorker(ProcessInterface):
Keywords to pass on to the ``Worker`` class constructor
"""

name: str | None
scheduler: str
worker_class: str
worker_kwargs: dict
name: str | None
process: asyncio.subprocess.Process | None

def __init__(
self,
Expand All @@ -49,34 +121,22 @@ def __init__(
name: str | None = None,
worker_kwargs: dict | None = None,
) -> None:
if WINDOWS:
# FIXME: distributed#7434
raise RuntimeError("SubprocessWorker does not support Windows.")
self.name = name
self.scheduler = scheduler
self.worker_class = worker_class
self.name = name
self.worker_kwargs = copy.copy(worker_kwargs or {})
self.process = None
super().__init__()

async def start(self) -> None:
self.process = await asyncio.create_subprocess_exec(
async def _start(self) -> None:
cmd = [
"dask",
"spec",
self.scheduler,
"--spec",
json.dumps({0: {"cls": self.worker_class, "opts": {**self.worker_kwargs}}}),
)
await super().start()

async def close(self) -> None:
if self.process and self.process.returncode is None:
for child in psutil.Process(self.process.pid).children(recursive=True):
child.kill()
self.process.kill()
await self.process.wait()
self.process = None
await super().close()
json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}),
]
logger.info(" ".join(cmd))
self.process = await asyncio.create_subprocess_exec(*cmd)


def SubprocessCluster(
Expand All @@ -91,10 +151,9 @@ def SubprocessCluster(
silence_logs: int = logging.WARN,
**kwargs: Any,
) -> SpecCluster:
"""Create in-process scheduler and workers running in dedicated subprocesses
"""Create a scheduler and workers that run in dedicated subprocesses

This creates a "cluster" of a scheduler running in the current process and
workers running in dedicated subprocesses.
This creates a "cluster" of a scheduler and workers running in dedicated subprocesses.

.. warning::

Expand Down Expand Up @@ -178,7 +237,12 @@ def SubprocessCluster(
worker_kwargs,
)

scheduler = {"cls": Scheduler, "options": scheduler_kwargs}
scheduler = {
"cls": SubprocessScheduler,
"options": {
"scheduler_kwargs": scheduler_kwargs,
},
}
worker = {
"cls": SubprocessWorker,
"options": {"worker_class": worker_class, "worker_kwargs": worker_kwargs},
Expand Down
18 changes: 16 additions & 2 deletions distributed/deploy/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

from distributed import Client
from distributed.compatibility import WINDOWS
from distributed.deploy.subprocess import SubprocessCluster, SubprocessWorker
from distributed.deploy.subprocess import (
SubprocessCluster,
SubprocessScheduler,
SubprocessWorker,
)
from distributed.utils_test import gen_test


Expand Down Expand Up @@ -53,20 +57,30 @@ async def test_scale_up_and_down():
cluster.scale(2)
await c.wait_for_workers(2)
assert len(cluster.workers) == 2
assert len(cluster.scheduler.workers) == 2

cluster.scale(1)
await cluster

assert len(cluster.workers) == 1


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@gen_test()
async def test_raise_if_scheduler_fails_to_start():
    """Entering a cluster created with ``scheduler_port=-1`` raises RuntimeError"""
    expect_failure = pytest.raises(RuntimeError, match="Scheduler failed to start")
    with expect_failure:
        cluster = SubprocessCluster(scheduler_port=-1, asynchronous=True)
        async with cluster:
            pass


@pytest.mark.skipif(
    not WINDOWS, reason="Windows-specific error testing (distributed#7434)"
)
def test_raise_on_windows():
    """Constructing the cluster, scheduler or worker raises on Windows"""
    # Checked in this order: cluster first, then scheduler, then worker.
    constructors = (
        lambda: SubprocessCluster(),
        lambda: SubprocessScheduler(),
        lambda: SubprocessWorker(scheduler="tcp://127.0.0.1:8786"),
    )
    for construct in constructors:
        with pytest.raises(RuntimeError, match="not support Windows"):
            construct()