Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Lib/multiprocessing/popen_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,17 @@ def _launch(self, process_obj):
code = 1
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
self.pid = os.fork()
# gh-146313: Tell the resource tracker's at-fork handler to keep
# the inherited pipe fd so this child reuses the parent's tracker
# (gh-80849) rather than closing it and launching its own.
from .resource_tracker import _fork_intent
_fork_intent.preserve_fd = True
try:
self.pid = os.fork()
finally:
# Reset in both parent and child so the flag does not leak
# into a subsequent raw os.fork() or nested Process launch.
_fork_intent.preserve_fd = False
if self.pid == 0:
try:
atexit._clear()
Expand Down
88 changes: 83 additions & 5 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import signal
import sys
import threading
import time
import warnings
from collections import deque

Expand Down Expand Up @@ -75,6 +76,10 @@ def __init__(self):
# The reader should understand all formats.
self._use_simple_format = False

# Set to True by _stop_locked() if the waitpid polling loop ran to
# its timeout without reaping the tracker. Exposed for tests.
self._waitpid_timed_out = False

def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
# gets interrupted by a garbage collection, invoking a finalizer (*)
Expand All @@ -87,16 +92,50 @@ def __del__(self):
# making sure child processes are cleaned before ResourceTracker
# gets destructed.
# see https://github.com/python/cpython/issues/88887
self._stop(use_blocking_lock=False)
# gh-146313: use a timeout to avoid deadlocking if a forked child
# still holds the pipe's write end open.
self._stop(use_blocking_lock=False, wait_timeout=1.0)

def _after_fork_in_child(self):
    # gh-146313: os.fork() at-fork handler, run in the child immediately
    # after the fork.
    #
    # The tracker process is a child of the *parent*, not of us, so we
    # could never waitpid() it anyway. Clearing _pid means our __del__
    # becomes a no-op (it returns early when _pid is None).
    #
    # Whether we keep the inherited _fd depends on who forked us:
    #
    # - multiprocessing.Process with the 'fork' start method sets
    #   _fork_intent.preserve_fd before forking. The child keeps the
    #   fd and reuses the parent's tracker (gh-80849). This is safe
    #   because multiprocessing's atexit handler joins all children
    #   before the parent's __del__ runs, so by then the fd copies
    #   are gone and the parent can reap the tracker promptly.
    #
    # - A raw os.fork() leaves the flag unset. We close the fd so
    #   the parent's __del__ can reap the tracker without waiting
    #   for us to exit. If we later need a tracker, ensure_running()
    #   will launch a fresh one.
    # Reinitialize the lock first: _at_fork_reinit() exists precisely so
    # a lock possibly held by another parent thread at fork time does not
    # stay stuck in the child.
    self._lock._at_fork_reinit()
    self._reentrant_messages.clear()
    self._pid = None
    self._exitcode = None
    # _fork_intent is a threading.local(), so preserve_fd only exists on
    # a thread that explicitly set it — hence getattr with a default.
    if (self._fd is not None and
            not getattr(_fork_intent, 'preserve_fd', False)):
        try:
            os.close(self._fd)
        except OSError:
            # Best effort: the fd may already be closed or invalid.
            pass
        self._fd = None

def _stop(self, use_blocking_lock=True):
def _stop(self, use_blocking_lock=True, wait_timeout=None):
if use_blocking_lock:
with self._lock:
self._stop_locked()
self._stop_locked(wait_timeout=wait_timeout)
else:
acquired = self._lock.acquire(blocking=False)
try:
self._stop_locked()
self._stop_locked(wait_timeout=wait_timeout)
finally:
if acquired:
self._lock.release()
Expand All @@ -106,6 +145,10 @@ def _stop_locked(
close=os.close,
waitpid=os.waitpid,
waitstatus_to_exitcode=os.waitstatus_to_exitcode,
monotonic=time.monotonic,
sleep=time.sleep,
WNOHANG=os.WNOHANG,
wait_timeout=None,
):
# This shouldn't happen (it might when called by a finalizer)
# so we check for it anyway.
Expand All @@ -122,7 +165,30 @@ def _stop_locked(
self._fd = None

try:
_, status = waitpid(self._pid, 0)
if wait_timeout is None:
_, status = waitpid(self._pid, 0)
else:
# gh-146313: A forked child may still hold the pipe's write
# end open, preventing the tracker from seeing EOF and
# exiting. Poll with WNOHANG to avoid blocking forever.
deadline = monotonic() + wait_timeout
delay = 0.001
while True:
result_pid, status = waitpid(self._pid, WNOHANG)
if result_pid != 0:
break
remaining = deadline - monotonic()
if remaining <= 0:
# The tracker is still running; it will be
# reparented to PID 1 (or the nearest subreaper)
# when we exit, and reaped there once all pipe
# holders release their fd.
self._pid = None
self._exitcode = None
self._waitpid_timed_out = True
return
delay = min(delay * 2, remaining, 0.1)
sleep(delay)
except ChildProcessError:
self._pid = None
self._exitcode = None
Expand Down Expand Up @@ -308,12 +374,24 @@ def _send(self, cmd, name, rtype):

self._ensure_running_and_write(msg)

# gh-146313: Per-thread flag set by .popen_fork.Popen._launch() just before
# os.fork(), telling _after_fork_in_child() to keep the inherited pipe fd so
# the child can reuse this tracker (gh-80849). Unset for raw os.fork() calls,
# where the child instead closes the fd so the parent's __del__ can reap the
# tracker. Using threading.local() keeps multiple threads calling
# popen_fork.Popen._launch() at once from clobbering each other's intent.
_fork_intent = threading.local()

_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd

# gh-146313: See _after_fork_in_child docstring.
if hasattr(os, 'register_at_fork'):
os.register_at_fork(after_in_child=_resource_tracker._after_fork_in_child)


def _decode_message(line):
if line.startswith(b'{'):
Expand Down
Loading
Loading