Skip to content

Run scheduler of SubprocessCluster in subprocess#7727

Merged
hendrikmakait merged 9 commits into
dask:mainfrom
hendrikmakait:subprocess-scheduler
Apr 6, 2023
Merged

Run scheduler of SubprocessCluster in subprocess#7727
hendrikmakait merged 9 commits into
dask:mainfrom
hendrikmakait:subprocess-scheduler

Conversation

@hendrikmakait

@hendrikmakait hendrikmakait commented Mar 30, 2023

Copy link
Copy Markdown
Member

In #7431, we added the SubprocessCluster, which runs a local cluster where all workers run in subprocesses. This PR also moves the scheduler to a subprocess.

  • Tests added / passed
  • Passes pre-commit run --all-files

@github-actions

github-actions Bot commented Mar 30, 2023

Copy link
Copy Markdown
Contributor

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       26 files  ±  0         26 suites  ±0   14h 21m 40s ⏱️ + 1h 13m 4s
  3 547 tests +  2    3 433 ✔️  -     2     106 💤 +    1  7 +2  1 🔥 +1 
44 872 runs  +11  42 623 ✔️  - 110  2 240 💤 +118  8 +2  1 🔥 +1 

For more details on these failures and errors, see this check.

Results for commit d07f83b. ± Comparison against base commit 78a926d.

This pull request skips 1 test.
distributed.protocol.tests.test_protocol ‑ test_large_messages

♻️ This comment has been updated with latest results.

@hendrikmakait hendrikmakait self-assigned this Apr 3, 2023
Comment thread distributed/deploy/subprocess.py Outdated
line = (await self.process.stderr.readline()).decode()
if not line.strip():
raise RuntimeError("Scheduler failed to start")
logger.info(line.strip())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.info(line.strip())
sys.stderr.write(line)

We're not redirecting stderr from the workers; seems like output from the scheduler should be treated the same? Subtle, but logging could be configured differently from plain stderr—including to prefix some additional information which could be confusing—so just forwarding to stderr seems more consistent with the worker subprocesses.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #7727 (comment) for the general discrepancy between the generally configured log level and the one required to retrieve the address.

Comment on lines +91 to +99
while True:
line = (await self.process.stderr.readline()).decode()
if not line.strip():
raise RuntimeError("Scheduler failed to start")
logger.info(line.strip())
if "Scheduler at" in line:
self.address = line.split("Scheduler at:")[1].strip()
break
logger.debug(line)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once the Scheduler at: message happens, further logs from the scheduler will be swallowed, right? I think it would be nicer UX to keep forwarding scheduler stderr the whole time, just like stderr from workers will be visible. I'd even wonder if that pipe could get filled up in rare cases, blocking writes on the scheduler side.

That's a little more work though, since you'd probably need to start a Task in this start method that forwards stderr, and clean it up in close.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue we face here is that the default log level is WARN, but we need to enable INFO logs on the scheduler to be able to retrieve the scheduler address. I've borrowed this pattern from the SSHCluster implementation, which seemed "good enough". I'm not keen to dive into more work on this for now. , in particular, since it should also entail that we only write logs with the applicable minimum log level.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense.

Comment thread distributed/deploy/subprocess.py Outdated
hendrikmakait and others added 2 commits April 5, 2023 19:59
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
@hendrikmakait hendrikmakait merged commit e72c309 into dask:main Apr 6, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants