Diagnosing an issue with plugin modules, process pools, and the import system
This is a walkthrough of the rather surprising mechanics underlying a failing test that brings together plugin modules (that register themselves on import), process pools, and Python’s import system.
What went wrong
It starts with a failed test:
pat = r"\[\d, \d+.\d+, \d+.\d+, \d+.\d+, '\d\d:\d\d'\]"
test_stdout(lambda: learn.fit(1), pat, regex=True)

The idea is to test that calling learn.fit(1) writes text to stdout (standard output) that matches the regex pattern pat. For those less familiar, this form of testing is common in projects that use nbdev.
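To make the mechanics of such a test concrete, here’s a minimal sketch of a test_stdout-style helper (a hypothetical check_stdout, not fastcore’s actual implementation): it captures whatever the callable prints and asserts that the output matches the pattern.

import io, re
from contextlib import redirect_stdout

def check_stdout(f, pat):
    buf = io.StringIO()
    with redirect_stdout(buf):  # capture anything f prints to stdout
        f()
    out = buf.getvalue().strip()
    assert re.search(pat, out), f"output {out!r} does not match {pat!r}"

Any extra output written to stdout - like a progress bar - can then break the comparison, which is the failure described below.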
Here’s what actually happens:
- The nbprocess_test command creates a ProcessPoolExecutor with some number of workers and tasks (each notebook being a separate task).
- Worker 1 processes one task, say task 1, which creates an IPython InteractiveShell, then runs import fastai.callback.progress, which adds ProgressCallback to the variable fastcore.basics.defaults.
- Worker 1 processes another task, say task 3, which creates a fresh InteractiveShell, calls learner.fit, and tests stdout. Since ProgressCallback has been registered in this process by task 1, a progress bar is also printed to stdout, breaking the test.
Let’s break down the underlying mechanics. There are three behaviours that come together to cause this sequence of events:
- ProcessPoolExecutors reuse processes. It seems obvious in hindsight since that’s how pools usually work, but I had never realised it until now. In the example above, worker 1 executes task 1 and task 3 in the same process.
- fastai callbacks register themselves on import. In this case, fastai.callback.progress adds ProgressCallback to defaults.callbacks.
- Changes to imported modules persist across IPython InteractiveShells. nbprocess_test runs each test in parallel using execnb, which implements a sub-class of InteractiveShell.
Next, we’ll verify these behaviours with tiny experiments. I highly recommend using tiny experiments to understand complex systems.
ProcessPoolExecutors reuse processes
Perhaps we should know this simply from the name, but I didn’t, so we’ll figure it out with a tiny experiment. Start by creating a pool with 2 max_workers:

from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(max_workers=2)
There aren’t any processes in the pool yet:
pool._processes
{}
Submit a task: the function os.getpid, which will return the process ID of the worker that runs it. Since there are no processes in the pool, submit will start a new worker process, and have it execute the task. ProcessPoolExecutor.submit returns a Future object, and Future.result returns the task’s return value:
import os
future = pool.submit(os.getpid)
future.result()
45907
No matter how many times you manually rerun the above cell, it will always be executed on the same process. Notice that the process is now also available in the pool:
pool._processes
{45907: <SpawnProcess name='SpawnProcess-1' pid=45907 parent=45899 started>}
If we submit another task:
future = pool.submit(os.getpid)
future.result()
45907
…it’s still executed on the same process.
Let’s try submitting two tasks at the same time:
futures = [pool.submit(os.getpid) for _ in range(2)]
[o.result() for o in futures]
[45907, 45907]
Weird. They’re both executed on the same process…
pool._processes
{45907: <SpawnProcess name='SpawnProcess-1' pid=45907 parent=45899 started>,
 45908: <SpawnProcess name='SpawnProcess-2' pid=45908 parent=45899 started>}
It looks like another process was started! I haven’t confirmed this, but I suspect that when we submitted two futures, the pool determined that it needed more workers, so it started another. However, the first worker’s task ended before the second worker started up, so the first worker processed both.
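One way to check this suspicion is to make each task slow enough that the first worker can’t finish before the second starts. A sketch (run as a standalone script rather than a notebook cell, since spawned workers need to be able to import the task function):

import os, time
from concurrent.futures import ProcessPoolExecutor

def slow_getpid():
    time.sleep(1)  # keep the first worker busy long enough for the second to start
    return os.getpid()

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow_getpid) for _ in range(2)]
        print([o.result() for o in futures])  # usually two different PIDs

With each task taking a second, the second worker has time to spin up and grab the second task, so we usually see two different PIDs.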
Since we instantiated the pool with 2 max_workers, these two processes will execute all tasks, no matter how many we submit:
futures = [pool.submit(os.getpid) for _ in range(10)]
[o.result() for o in futures]
[45907, 45907, 45907, 45907, 45907, 45907, 45907, 45907, 45907, 45907]
Shut down the pool to free up any resources:

pool.shutdown()

fastai callbacks register themselves on import
This one is easy to demonstrate. defaults has no callbacks attribute to start with:
from fastcore.basics import defaults
defaults
namespace(cpus=4)
defaults.callbacks is populated after importing ProgressCallback:
from fastai.callback.progress import ProgressCallback
defaults
namespace(cpus=4,
benchmark=True,
use_cuda=None,
activation=torch.nn.modules.activation.ReLU,
callbacks=[fastai.callback.core.TrainEvalCallback,
fastai.learner.Recorder,
fastai.learner.CastToTensor,
fastai.callback.progress.ProgressCallback],
lr=0.001)
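Note that importing the module a second time is a no-op: Python only runs a module’s body the first time it is imported in a given process, so a repeat import doesn’t register anything twice. A quick check (assuming nothing else mutates defaults.callbacks in between):

from fastcore.basics import defaults
import fastai.callback.progress          # first import runs the module body

before = list(defaults.callbacks)
import fastai.callback.progress          # already cached in sys.modules: body doesn't run again
assert list(defaults.callbacks) == before

That cache - sys.modules - is exactly what the next section is about.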
Changes to imported modules persist across IPython InteractiveShells
Why is any of the above a problem? Didn’t we say that nbprocess_test creates a separate shell for each notebook? Yes it does, but it turns out that changes to imported modules persist across shells.
from execnb.shell import CaptureShell

First make sure that CaptureShell doesn’t have a foo attribute - this will make sense in a second:
assert not hasattr(CaptureShell, 'foo')

Now add the foo attribute:
CaptureShell.foo = 'bar'

We can see foo inside a CaptureShell:
shell = CaptureShell()
shell.run('from execnb.shell import CaptureShell; CaptureShell.foo')
shell.result
'bar'
This happens because when we first imported from the execnb.shell module it was cached in sys.modules:
import sys
sys.modules['execnb.shell']
<module 'execnb.shell' from '/Users/seem/code/execnb/execnb/shell.py'>
In fact, sys.modules['execnb.shell'].CaptureShell is another name for CaptureShell:
sys.modules['execnb.shell'].CaptureShell is CaptureShell
True
Python caches imports to speed up consecutive imports of the same modules. InteractiveShell (and its sub-classes) run in the same process and therefore share the same sys.modules, which is what causes this behaviour:
shell = CaptureShell()
shell.run("import sys; sys.modules['execnb.shell'].CaptureShell.foo")
shell.result
'bar'
exec with empty globals and locals uses the same sys.modules too, so it doesn’t avoid the issue either:
exec("import sys; print('execnb.shell' in sys.modules)", {}, {})
True
exec("from execnb.shell import CaptureShell; print(CaptureShell.foo)", {}, {})
bar
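For completeness, the cache itself is the only thing in the way. A sketch (not something nbprocess_test does) showing that evicting the cached entry and re-importing re-executes shell.py and produces a fresh CaptureShell without the monkey-patched attribute:

import sys, importlib
import execnb.shell

execnb.shell.CaptureShell.foo = 'bar'
del sys.modules['execnb.shell']                  # evict the cached module object
fresh = importlib.import_module('execnb.shell')  # re-executes shell.py
print(hasattr(fresh.CaptureShell, 'foo'))        # False: a brand-new class object

Of course, any code still holding a reference to the old class wouldn’t see the change, which is part of why reloading modules isn’t a real fix here.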
In the end, we agreed that the test itself was broken, because it made assumptions about its environment without ensuring that they were true. Tests like this may fail - regardless of the above behaviour - if we import any module that registers a callback. For example, this fails too:
from fastai.callback.progress import *
from nbprocess.test import test_nb
test_nb('nbs/nb_with_failing_test.ipynb')

The fix is to explicitly run learner.fit with the precise list of callbacks required.
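A sketch of what that could look like (the class names are the ones shown in the defaults output above; the exact fix in the real test may differ): pin the callback list yourself before constructing the Learner, instead of relying on whatever earlier imports happened to register.

from fastcore.basics import defaults
from fastai.callback.core import TrainEvalCallback
from fastai.learner import Recorder, CastToTensor

# No ProgressCallback: a Learner constructed after this point picks up exactly
# these default callbacks, regardless of which modules were imported earlier.
defaults.callbacks = [TrainEvalCallback, Recorder, CastToTensor]
# ... then build learn and run test_stdout(lambda: learn.fit(1), pat, regex=True) as before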