
use the ProactorEventLoop on windows #7494

Closed
fjetter wants to merge 5 commits into dask:main from fjetter:tornado_proactor

@fjetter

@fjetter fjetter commented Jan 20, 2023

Member

Supersedes #5833
Closes #7492
Closes #7434

@github-actions

github-actions Bot commented Jan 20, 2023

Contributor

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    20 files  ±0        20 suites ±0     11h 25m 12s ⏱️ +4m 44s
 3 704 tests  +4     3 576 ✔️ -10       106 💤 ±0     19 ❌ +11     3 🔥 +3
35 835 runs   +49   34 045 ✔️ +16     1 748 💤 -1     33 ❌ +25     9 🔥 +9

For more details on these failures and errors, see this check.

Results for commit 445d001. ± Comparison against base commit fca4b35.

♻️ This comment has been updated with latest results.

@fjetter

fjetter commented Jan 20, 2023

Member Author

Looks like test_quiet_close_process[True] is a related failure

    @pytest.mark.slow
    # These lines sometimes appear:
    #     Creating scratch directories is taking a surprisingly long time
    #     Future exception was never retrieved
    #     tornado.util.TimeoutError
    #     Batched Comm Closed
    @pytest.mark.flaky(reruns=5, reruns_delay=5)
    @pytest.mark.parametrize("processes", [True, False])
    def test_quiet_close_process(processes, tmp_path):
        with open(tmp_path / "script.py", mode="w") as f:
            f.write(client_script % processes)
    
        with popen([sys.executable, tmp_path / "script.py"], capture_output=True) as proc:
>           out, _ = proc.communicate(timeout=60)

distributed\tests\test_client.py:7894: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Miniconda3\envs\dask-distributed\lib\subprocess.py:1154: in communicate
    stdout, stderr = self._communicate(input, endtime, timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Popen: returncode: 3221225786 args: ['C:\\Miniconda3\\envs\\dask-distribute...>
input = None, endtime = 2187.093, orig_timeout = 60

    def _communicate(self, input, endtime, orig_timeout):
        # Start reader threads feeding into a list hanging off of this
        # object, unless they've already been started.
        if self.stdout and not hasattr(self, "_stdout_buff"):
            self._stdout_buff = []
            self.stdout_thread = \
                    threading.Thread(target=self._readerthread,
                                     args=(self.stdout, self._stdout_buff))
            self.stdout_thread.daemon = True
            self.stdout_thread.start()
        if self.stderr and not hasattr(self, "_stderr_buff"):
            self._stderr_buff = []
            self.stderr_thread = \
                    threading.Thread(target=self._readerthread,
                                     args=(self.stderr, self._stderr_buff))
            self.stderr_thread.daemon = True
            self.stderr_thread.start()
    
        if self.stdin:
            self._stdin_write(input)
    
        # Wait for the reader threads, or time out.  If we time out, the
        # threads remain reading and the fds left open in case the user
        # calls communicate again.
        if self.stdout is not None:
            self.stdout_thread.join(self._remaining_time(endtime))
            if self.stdout_thread.is_alive():
>               raise TimeoutExpired(self.args, orig_timeout)
E               subprocess.TimeoutExpired: Command '['C:\\Miniconda3\\envs\\dask-distributed\\python.exe', WindowsPath('C:/Users/runneradmin/AppData/Local/Temp/pytest-of-runneradmin/pytest-0/test_quiet_close_process_True_5/script.py')]' timed out after 60 seconds

C:\Miniconda3\envs\dask-distributed\lib\subprocess.py:1530: TimeoutExpired

cc @graingert

@crusaderky I believe you have a win machine. could you try reproducing this?
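
For readers less familiar with the failure mode in the traceback: `Popen.communicate(timeout=...)` raises `TimeoutExpired` when the child has not exited in time, and it does not kill the child for you. A minimal sketch of the pattern described in the `subprocess` documentation follows. The helper name `communicate_or_kill` and the child script are made up for illustration and are not part of the test suite.

```python
import subprocess
import sys


def communicate_or_kill(code: str, timeout: float) -> tuple[bool, str]:
    """Run `code` in a child interpreter; return (timed_out, captured stdout)."""
    proc = subprocess.Popen(
        [sys.executable, "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        out, _ = proc.communicate(timeout=timeout)
        return False, out
    except subprocess.TimeoutExpired:
        # The child keeps running after the timeout, so kill it explicitly,
        # then call communicate() again to drain the pipes and reap it.
        proc.kill()
        out, _ = proc.communicate()
        return True, out


# A child that never finishes in time, standing in for the hanging script.
hung_child = "import time; time.sleep(30)"
timed_out, _ = communicate_or_kill(hung_child, timeout=2.0)
```

In the failing test the child is a script that starts a cluster and should exit on its own, so the timeout points at interpreter shutdown hanging rather than at the test harness.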

@crusaderky

Collaborator

@crusaderky I believe you have a win machine. could you try reproducing this?

I'll add it to my pile :)

@crusaderky crusaderky self-assigned this Jan 20, 2023
@crusaderky

Collaborator

reproduced. investigating

@crusaderky

Collaborator

I'm lost. @graingert I could use your expertise.

This issue is specifically about having a LocalCluster running when the interpreter shuts down:

import distributed
if __name__ == "__main__":
    cluster = distributed.LocalCluster(processes=True, n_workers=1)

What I could find:

  1. The atexit hook distributed.deploy.spec.close_clusters() calls LocalCluster.close()
  2. which calls sync(SpecCluster._close)
  3. which calls SpecCluster._correct_state
  4. which calls SpecCluster._correct_state_internal
  5. which calls
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  6. which gets cancelled after 10 seconds by sync() from step 2, never reaching Scheduler.retire_workers:
  File "distributed\deploy\spec.py", line 351, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "distributed\core.py", line 1140, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "distributed\core.py", line 986, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "distributed\comm\tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

On main, the same code does reach Scheduler.retire_workers.
On both branches, Scheduler.close() is only called after retire_workers().
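
The shape of step 6 can be reproduced with plain asyncio: a coroutine that waits for a reply that never arrives is cancelled from the outside when a timeout fires, and the `CancelledError` surfaces at the innermost `await`. This is only a sketch under that assumption. It uses `asyncio.wait_for` in place of distributed's `sync()`, and the names `wait_for_reply` and `close_with_timeout` are hypothetical.

```python
import asyncio


async def wait_for_reply(log: list[str]) -> None:
    """Stands in for an RPC call awaiting a response that never comes."""
    try:
        await asyncio.Event().wait()  # the event is never set
    except asyncio.CancelledError:
        # This is where the traceback above ends: inside the pending read.
        log.append("cancelled while waiting for reply")
        raise


async def close_with_timeout(timeout: float) -> list[str]:
    log: list[str] = []
    try:
        # wait_for cancels the inner coroutine once the timeout expires.
        await asyncio.wait_for(wait_for_reply(log), timeout)
    except asyncio.TimeoutError:
        log.append("close timed out")
    return log


result = asyncio.run(close_with_timeout(0.1))
```

The sketch says nothing about why the reply never arrives on this branch; it only shows why the symptom is a `CancelledError` in the read rather than an error from the scheduler.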

@graingert

Member

@graingert

Member

Maybe on windows we could default to "asyncio_tcp" to avoid going via the tornado selector thread?

@graingert

Member

I opened a draft PR to try this out: #7532

@crusaderky

Collaborator

Blocked by tornadoweb/tornado#3173

@AWangHDA

AWangHDA commented Jul 5, 2023


Hi @fjetter, it looks like tornadoweb/tornado#3173 has been resolved. Is it possible to get this PR merged?

I've been following issue #7492, and it is no longer a problem if I apply the change
made in this PR, i.e. changing

elif event_loop in {"asyncio", "tornado"}:
    if WINDOWS:
        # WindowsProactorEventLoopPolicy is not compatible with tornado 6
        # fallback to the pre-3.8 default of Selector
        # https://github.com/tornadoweb/tornado/issues/2608
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
    raise ValueError(
        "Expected distributed.admin.event-loop to be in ('asyncio', 'tornado', 'uvloop'), got %s"
        % dask.config.get("distributed.admin.event-loop")
    )

to

elif event_loop not in {"asyncio", "tornado"}:
    raise ValueError(
        "Expected distributed.admin.event-loop to be in ('asyncio', 'tornado', 'uvloop'), got %s"
        % dask.config.get("distributed.admin.event-loop")
    )
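
To make the effect of that change concrete: once the Windows override is gone, the branch only validates the setting, and asyncio's own default applies (the proactor loop on Windows since Python 3.8, a selector loop elsewhere). The sketch below is a standalone approximation, not the code in distributed. `check_event_loop_setting` and `default_loop_class_name` are hypothetical helpers.

```python
import asyncio

VALID_EVENT_LOOPS = ("asyncio", "tornado", "uvloop")


def check_event_loop_setting(event_loop: str) -> str:
    """Validate the setting without touching the event loop policy."""
    if event_loop not in VALID_EVENT_LOOPS:
        raise ValueError(
            "Expected distributed.admin.event-loop to be in "
            f"{VALID_EVENT_LOOPS}, got {event_loop}"
        )
    return event_loop


def default_loop_class_name() -> str:
    """Report which loop class asyncio creates when nothing overrides it."""
    loop = asyncio.new_event_loop()
    try:
        return type(loop).__name__
    finally:
        loop.close()


setting = check_event_loop_setting("tornado")
loop_name = default_loop_class_name()
```

Printing `loop_name` on a Windows machine is a quick way to confirm whether some import has already switched the policy back to the selector loop.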

@fjetter fjetter force-pushed the tornado_proactor branch from 5cb4966 to 3783ab2 Compare July 5, 2023 09:19
@fjetter

fjetter commented Jul 5, 2023

Member Author

I rebased the PR and we can have a look

@fjetter

fjetter commented Jul 5, 2023

Member Author

I see that the tornado changes are not yet released. We likely need that new version, and we may even need to implement a compat layer for it. I don't have a problem bumping the minimum version of tornado, but maybe not to the most recent one.

cc @graingert can you chime in and take a look?

@graingert

Member

@fjetter even with the new tornado code, although the AddThreadSelectorEventLoop is correctly closed when asyncio.run is used, tornado still sets an atexit hook that also closes the loop. Because we need the loop running when our atexit callback for Client.close runs, we need to disable the tornado hook.

I've pushed a commit to try this out.
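
The ordering problem can be shown with the standard library alone. `atexit` runs callbacks in reverse registration order, so a hook registered later by a library runs before our own cleanup, and `atexit.unregister` is one general way to drop such a hook. The sketch below uses two stand-in functions in a child interpreter. It does not reference tornado's actual hook, and it is not a claim about how the pushed commit disables it.

```python
import subprocess
import sys

SCRIPT = """
import atexit

def close_loop():      # stand-in for a library hook that closes the loop
    print("library hook: loop closed")

def close_client():    # stand-in for our cleanup, which still needs the loop
    print("our hook: closing client")

atexit.register(close_client)  # registered first, so it runs last
atexit.register(close_loop)    # registered last, so it runs first
if {unregister}:
    atexit.unregister(close_loop)
"""


def run_exit_hooks(unregister: bool) -> list[str]:
    """Run the script in a child interpreter and return what its hooks printed."""
    code = SCRIPT.format(unregister=unregister)
    result = subprocess.run(
        [sys.executable, "-c", code], capture_output=True, text=True, check=True
    )
    return result.stdout.splitlines()


with_library_hook = run_exit_hooks(unregister=False)
without_library_hook = run_exit_hooks(unregister=True)
```

With the library hook left in place, the loop is closed before the client cleanup runs, which is the situation described above.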

Comment thread distributed/utils.py Outdated
@graingert

graingert commented Jul 5, 2023

Member

@crusaderky I'm getting:

FAILED distributed/tests/test_worker_metrics.py::test_send_metrics_to_scheduler - AssertionError: assert [('execute', ...'count'), ...] == [('execute', ...econds'), ...]
  At index 5 diff: ('get-data', 'memory-read', 'count') != ('gather-dep', 'decompress', 'seconds')
  Full diff:
    [
     ('execute', None, 'x', 'deserialize', 'seconds'),
     ('execute', None, 'x', 'thread-cpu', 'seconds'),
     ('execute', None, 'x', 'thread-noncpu', 'seconds'),
     ('execute', None, 'x', 'executor', 'seconds'),
     ('execute', None, 'x', 'other', 'seconds'),
  +  ('get-data', 'memory-read', 'count'),
  +  ('get-data', 'memory-read', 'bytes'),
  +  ('get-data', 'serialize', 'seconds'),
  +  ('get-data', 'compress', 'seconds'),
  +  ('get-data', 'network', 'seconds'),
     ('gather-dep', 'decompress', 'seconds'),
     ('gather-dep', 'deserialize', 'seconds'),
     ('gather-dep', 'network', 'seconds'),
     ('gather-dep', 'other', 'seconds'),
  -  ('get-data', 'memory-read', 'count'),
  -  ('get-data', 'memory-read', 'bytes'),
  -  ('get-data', 'serialize', 'seconds'),
  -  ('get-data', 'compress', 'seconds'),
     ('execute', None, 'x', 'memory-read', 'count'),
     ('execute', None, 'x', 'memory-read', 'bytes'),
  -  ('get-data', 'network', 'seconds'),
    ]

which I don't understand. Do we expect the order of these operations to change if selector calls are deferred to a thread with AddThreadSelectorEventLoop?

@crusaderky

Collaborator

@graingert
The error is saying that, in these lines of get_data,

with context_meter.meter("network", func=time) as m:
    compressed = await comm.write(msg, serializers=serializers)
    response = await comm.read(deserializers=serializers)

  • in main, the response from get_data_from_worker won't arrive until after it's finished decompressing etc.
  • in your PR, the response arrives earlier.

It looks like an (inconsequential) race condition to me. It's ok to change the test to compare sets.
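
A minimal sketch of what comparing sets could look like, using a shortened, hypothetical subset of the tuples from the diff above. The helper `same_metrics` is made up for illustration; the real test may structure the assertion differently.

```python
# Two orderings of the same metric keys, as produced by the race.
observed = [
    ("execute", None, "x", "other", "seconds"),
    ("get-data", "memory-read", "count"),
    ("get-data", "network", "seconds"),
    ("gather-dep", "decompress", "seconds"),
]
expected = [
    ("execute", None, "x", "other", "seconds"),
    ("gather-dep", "decompress", "seconds"),
    ("get-data", "memory-read", "count"),
    ("get-data", "network", "seconds"),
]


def same_metrics(a: list[tuple], b: list[tuple]) -> bool:
    """Order-insensitive comparison; the length check guards against
    duplicates or missing entries that a bare set comparison could hide."""
    return len(a) == len(b) and set(a) == set(b)


order_differs = observed != expected
content_matches = same_metrics(observed, expected)
```

If duplicate keys were ever meaningful, `collections.Counter` would be the stricter order-insensitive comparison.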

@fjetter fjetter closed this Jul 10, 2025
Development

Successfully merging this pull request may close these issues.

  • SelectorEventLoop on Windows does not support subprocesses
  • Import of Dask prevents async subprocess management on Windows

4 participants