I am writing a little application server for microservices written as compiled
binaries, and I would like to log execution statistics from getrusage(2).
The application server is written using asyncio, and processes are managed using asyncio subprocesses.
Unfortunately, asyncio uses `os.waitpid` instead of `os.wait4` to reap child
processes, so to get rusage information one has to delve into the asyncio
innards and provide a custom `ChildWatcher` implementation. Here's how I did it:
import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os


class ExtendedResults:
    """
    Exit information for one monitored child process.

    Both attributes start as None and are filled in by
    SafeChildWatcherWithRusage._do_waitpid when the child is reaped.
    """

    def __init__(self):
        # Resource usage as returned by os.wait4; stays None when the child
        # was already reaped elsewhere and no usage data could be collected.
        self.rusage = None
        # Return code that is reported to the asyncio callbacks.
        self.returncode = None


class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.
    """

    # Maps the pid of each monitored process to its ExtendedResults.
    # Shared at class level so that monitor() can be used as a classmethod.
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits.

        The results stay registered only for the duration of the ``with``
        block, so the process has to be waited for inside that block.
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _returncode_from_status(self, status):
        """Translate a raw wait status into a process return code."""
        compute = getattr(self, "_compute_returncode", None)
        if compute is not None:
            # Older asyncio base watchers provide the conversion themselves.
            return compute(status)
        # Where the base class has no such method, fall back to
        # os.waitstatus_to_exitcode (available since Python 3.9).
        try:
            return os.waitstatus_to_exitcode(status)
        except ValueError:
            # The status is not understood: report it unchanged rather than
            # failing, so the callback still runs.
            return status

    def _do_waitpid(self, expected_pid):
        """
        Reap expected_pid without blocking, record its rusage and returncode
        in the matching ExtendedResults (if monitored), and run its callback.
        """
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            # No usage data is available in this case; bind the name anyway
            # so that storing the results below cannot raise.
            rusage = None
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._returncode_from_status(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        """
        Create a watcher, attach it to the current event loop and make it the
        child watcher used by asyncio.
        """
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)
To use it:
from .hacks import SafeChildWatcherWithRusage

# Replace asyncio's child watcher with the rusage-aware one: install()
# attaches a new watcher to the current event loop and registers it.
SafeChildWatcherWithRusage.install()

...

# Excerpt: `run` takes `self`, so it is presumably a method of the
# application's process wrapper class (class header not shown) -- TODO confirm.
@coroutine
def run(self, *args, **kw):
    """
    Start a subprocess with piped stdin/stdout/stderr, pump its streams,
    and record start/end times, returncode and rusage on self.
    """
    # All three standard streams are always piped, overriding any caller
    # supplied values for these keys.
    kw["stdin"] = asyncio.subprocess.PIPE
    kw["stdout"] = asyncio.subprocess.PIPE
    kw["stderr"] = asyncio.subprocess.PIPE

    self.started = time.time()
    self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

    from .hacks import SafeChildWatcherWithRusage
    # monitor() registers the pid only for the duration of the with block
    # and removes it again on exit.
    with SafeChildWatcherWithRusage.monitor(self.proc) as results:
        yield from asyncio.tasks.gather(
            self.write_stdin(self.proc.stdin),
            self.read_stdout(self.proc.stdout),
            self.read_stderr(self.proc.stderr)
        )
        # NOTE(review): wait() is placed inside the with block so that the
        # results entry is still registered when the child is reaped; the
        # collapsed original does not show the block's extent -- confirm.
        self.returncode = yield from self.proc.wait()

    # results.rusage is still None if the watcher never stored usage data.
    self.rusage = results.rusage
    self.ended = time.time()