I am writing a little application server for microservices written as compiled
binaries, and I would like to log execution statistics from `getrusage(2)`.
The application server is written using asyncio, and processes are managed using asyncio subprocesses.
Unfortunately, asyncio uses `os.waitpid` instead of `os.wait4` to reap child
processes, so to get rusage information one has to delve into the asyncio
innards and provide a custom `ChildWatcher` implementation. Here's how I did it:
import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os
class ExtendedResults:
    """
    Mutable holder for the exit information of one monitored child process.

    Both fields start as ``None`` and are meant to be filled in by whoever
    reaps the process.
    """

    def __init__(self):
        # Resource usage reported for the child, or None if not (yet) known.
        self.rusage = None
        # Exit code reported for the child, or None if not (yet) known.
        self.returncode = None

    def __repr__(self):
        return "{}(rusage={!r}, returncode={!r})".format(
            type(self).__name__, self.rusage, self.returncode)
class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.

    Callers register interest in a process with :meth:`monitor`; when that
    process is reaped by :meth:`_do_waitpid`, its return code and resource
    usage are copied into the registered results object.
    """

    # pid -> results object for every process currently inside a monitor()
    # block. Kept on the class so that the classmethod monitor() and
    # whichever watcher instance does the reaping share one registry.
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits

        ``proc`` must expose a positive ``pid``. The results object is only
        registered while the ``with`` block is active, so the process has to
        be reaped before the block is left for the fields to be populated.
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            # Always unregister, even if the body raised, so the registry
            # does not keep entries for processes nobody is waiting on.
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        """
        Reap ``expected_pid`` without blocking and notify interested parties.

        Does nothing if the child is still running. Otherwise the return code
        (and the rusage, when available) is stored in the results object
        registered through :meth:`monitor`, if any, and the callback
        registered for the pid is invoked with ``(pid, returncode, *args)``.
        """
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            # wait4() gave us nothing in this branch: bind rusage explicitly
            # so that the bookkeeping below does not fail on an unbound name.
            rusage = None
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            # NOTE(review): relies on a private helper of the base class;
            # confirm it exists on the Python version being targeted.
            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        # Publish the extended results before running the callback, so they
        # are already filled in when code waiting on the process resumes.
        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        """
        Create a watcher, attach it to the current event loop and make it the
        child watcher used by asyncio.
        """
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)
To use it:
# Install the rusage-aware child watcher on the current event loop, so that
# asyncio subprocesses created afterwards are reaped through it.
from .hacks import SafeChildWatcherWithRusage
SafeChildWatcherWithRusage.install()
...
@coroutine
def run(self, *args, **kw):
    """
    Run a subprocess to completion and record its exit information.

    ``args`` and ``kw`` are forwarded to asyncio.create_subprocess_exec,
    except that stdin, stdout and stderr are always replaced with pipes.
    On return, ``self.started`` / ``self.ended`` hold the wall-clock
    timestamps around the run, ``self.returncode`` the process exit code and
    ``self.rusage`` whatever the child watcher recorded (``None`` if it did
    not record anything).
    """
    kw["stdin"] = asyncio.subprocess.PIPE
    kw["stdout"] = asyncio.subprocess.PIPE
    kw["stderr"] = asyncio.subprocess.PIPE

    self.started = time.time()

    self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

    from .hacks import SafeChildWatcherWithRusage
    with SafeChildWatcherWithRusage.monitor(self.proc) as results:
        # Service the three pipes concurrently until all helpers finish.
        yield from asyncio.tasks.gather(
            self.write_stdin(self.proc.stdin),
            self.read_stdout(self.proc.stdout),
            self.read_stderr(self.proc.stderr)
        )
        # Wait inside the monitor block: the results entry is unregistered
        # when the block exits, so the child must be reaped before that.
        self.returncode = yield from self.proc.wait()

    self.rusage = results.rusage
    self.ended = time.time()