Python 的多进程编程

标签:Python

在并发编程的时候,多线程和多进程是经常会被使用的两种模式(此外还有协程等)。由于 CPython 的 GIL 限制(Jython 和 IronPython 没有 GIL,PyPy 在尝试去掉 GIL),只有获取了 GIL 的线程才能使用 CPU,所以除了需要处理一些可能会阻塞的 IO(读写文件、访问网络等)之外,基本没人会去使用 CPython 的多线程。因此,本文就来说说更有用的 Python 多进程编程。

注:
  1. 本文描述的环境为 Linux 操作系统(也适用于大部分 POSIX 系统)下的 CPython,可能不适用于 Windows 操作系统或其他的 Python 实现。
  2. 为了避免歧义,下文用「主进程」或「当前进程」表示创建子进程的那个进程,而不使用「父进程」,除非明确指明了「xx 的父进程」。

与多线程编程时需要传递一个 callable 对象不同的是,多进程编程的时候,是将主进程复制到子进程,并不能直接要求子进程执行某个 callable 对象。
在 POSIX 系统中,这个复制操作是由 clone()fork() 系统调用来完成的,一般主要使用后者。
如果 fork() 执行成功的话,会分别在主进程和子进程中返回子进程的 PID0,然后执行代码就开始不同了。如果失败的话(内存不够、PID 达到上限等),子进程就不会创建,主进程会返回 -1errno 会被设置为对应的错误码。
在 CPython 的实现中,os.fork() 主要是对 fork() 函数的封装,差别在于失败时抛出 OSError,该异常的 errno 属性是对应的错误码。
因此,多进程的 Python 代码大致会长成这样:
import os

try:
    pid = os.fork()
    if pid == 0:  # 子进程
        # 子进程的代码
    else:  # 主进程
        # 主进程的代码
except OSError:
    # 主进程处理 fork 失败的代码
如果想在子进程中执行一个 callable 的对象,直接在子进程的代码中调用它就行了。但是因为子进程是主进程的拷贝,所以并不是一个干净的环境,还是可能会出现一些问题的。

于是再来说明一下 fork() 函数有哪些注意事项吧:
  • 设置子进程自己的 PID,将其 PPID 设为主进程的 PID,而 PGID、SID、UID、GID、CWD 和 UMASK 则保持和主进程一致。
  • 复制主进程的所有地址空间到子进程。也就是说,子进程可以直接使用主进程的变量,包括 import 过的模块等。但是这种复制是写时拷贝(copy on write),也就是一旦你对它进行了改动,它和主进程中的变量就不是同一个了,因此表现出来的就像是值拷贝一样。
  • 复制主进程的环境变量到子进程。
  • 复制主进程的所有文件描述符(file descriptors)到子进程。文件描述符的状态在这些进程间是共享的,所以在一个进程中读写文件,会影响另一个进程的文件打开位置。但是在一个进程中关闭文件描述符,并不影响另一个进程对应的文件描述符。
  • 复制主进程的信号处理函数(由 signal.signal() 注册)、信号唤醒 fd(由 signal.set_wakeup_fd() 设置),是否重新启动被信号中断的系统调用(由 signal.siginterrupt() 设置),以及阻塞哪些信号(由 signal.pthread_sigmask() 设置)到子进程。但是,定时器(由 signal.setitimer() 设置)只对当前进程有效,不复制到子进程。
  • 保留通过 atexit()on_exit() 注册的函数。
  • 对于多线程程序而言,只有调用 fork() 的线程被复制到子进程,变成一个单线程的新进程。
  • 异步 IO 被取消。

一般来说,子进程会想要一个相对干净的环境,因此可以在 fork() 之后关闭不需要的 fd,取消注册不需要的信号处理函数,以及设置新的 SID 等。守护进程基本就是这样实现的。

有几个特殊的 fd 是需要特别注意的:0、1 和 2,它们分别代表 stdinstdoutstderr
如果不关闭的话,子进程就会使用主进程的标准输入输出。这有可能会带来麻烦,因此在子进程中,经常会将它们重定向到 /dev/null 或磁盘文件。
Python 的 sys 模块中定义了 stdinstdoutstderr 这 3 个变量。如果只是想临时性地重定向它们,将其赋值为新的文件对象即可;如果要恢复的话,sys 模块中还定义了 __stdin____stdout____stderr__ 这 3 个变量,将其赋值回来即可:
try:
    with open('/tmp/stdout', 'wb') as stdout:
        sys.stdout = stdout
        print 'this will be written to file'
finally:
    sys.stdout = sys.__stdout__
但这样做只在 Python 中有效,一旦子进程跳出了 Python 解释器的范围,这个设置就失效了。
在 C 语言中,正确的做法是关闭该 fd,打开一个新的 fd,将其 dup2() 到原 fd,再关闭新的 fd。而在 Python 中,上述的几个文件变量的 close() 方法并不会关闭对应的 fd,而只是将这个文件对象标记为 closed,对应的 fd 设为 None。于是只能模拟 C 的实现了:
def redirect_fd(fd, path, mode):
    try:
        os.close(fd)
    except OSError:
        pass
    new_fd = os.open(path, mode)
    if new_fd != fd:
        os.dup2(new_fd, fd)
        os.close(new_fd)

redirect_fd(0, os.devnull, os.O_RDONLY)
redirect_fd(1, '/tmp/stdout', os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
redirect_fd(2, '/tmp/stderr', os.O_WRONLY | os.O_CREAT | os.O_TRUNC)

另一种常见的做法是打开一个管道(PIPE),将其作为子进程的输入输出,使得主进程可以很容易地利用管道与子进程通信:
r, w = os.pipe()
pid = os.fork()
if pid == 0:
    os.close(w)  # 如果有必要的话,可以重定向到标准输入输出
    print os.read(r, 1024)
else:
    os.close(r)
    os.write(w, 'test')
这种做法非常常见,Linux 的 popen() 函数也是据此实现的,Python 也提供了基于此原理的 os.popen() 函数和 subprocess 模块。

除了标准输入输出外,还需要特别注意有没有使用一些网络库,例如 Requestssession 需要 close() 才会回收连接池,logging 模块中的 handlers 需要 close() 才会关闭 fd。也有一种粗暴的做法,就是将所有 fd
一股脑全关闭:
MAX_FD = os.sysconf("SC_OPEN_MAX")  # 或者 resource.getrlimit(resource.RLIMIT_NOFILE)[0]
os.closerange(3, MAX_FD)

再来说下信号处理。
信号(signal)是操作系统提供的一种进程间异步通知的方式。信号可以由 kill() 系统调用(Python 将其封装成了 os.kill() 函数)产生,也可能由内核根据事件触发。
内核给进程发出信号后,该进程会中断正常的执行流,转向执行该信号的处理函数(signal handler);执行完毕后,又回到正常的执行流。要注意的是,所有非原子的指令(可以理解为单条机器指令)都可以被信号中断,因此在执行信号处理函数时,也可能会收到新的信号,从而再次中断该进程,执行新的信号处理函数。
而在 Python 环境下,信号处理函数是被封装过的,并不会被直接执行。Python 解释器在接收到信号后,只是将其处理函数放到一个队列里,并标记有函数等待执行就返回了。当 Python 解释器开始执行下一条字节码指令时,会检查有没有需要执行的函数,然后看是不是在主线程中。是的话,就执行信号处理函数;否则就进行线程切换,然后继续之前的检查。这也就意味着在 Python 环境下,信号只能在 Python 解释器的原子指令(可以理解为单条 Python 字节码指令)间中断,所以 C 函数的调用、获取列表的 slice 这种可能会执行很久的操作也不会被自定义的信号处理函数中断,并且只有主线程才会执行信号处理函数。

每个线程有个自己的信号掩码(signal mask)。信号产生后,内核会设置对应进程(或线程)的该信号位有信号了。如果进程暂时不想处理这个信号的话,可以通过 sigprocmask()pthread_sigmask() 函数设置自己的信号掩码(signal mask)。内核检查到这个信号掩码被设置了,就会阻塞(block)该信号,而将其置于 pending 状态。此外,当正在执行信号处理函数时,内核会自动阻塞掉该信号,以避免重复调用,除非这个信号的 sigaction.sa_mask 设置了 SA_NODEFER 位。当进程解除阻塞信号时,处于 pending 状态的信号会继续传达给该进程。Python 3.3 起提供了 signal.pthread_sigmask() 函数来封装这个系统调用。

进程可以通过 sigaction()signal() 系统调用来设置信号的处理函数,Python 中则将其封装成了 signal.signal() 函数。
信号处理函数应该写成可重入的(reentrant,也就是可以在执行的过程中被任意中断,重新执行它不会有任何问题)。假如进程在执行时被信号中断了,它原本依赖的一些数据无征兆地被信号处理函数改写了,这个进程就可能就会出现各种未定义的 bug。
要避免这个问题的话,你有两种选择:
  1. 在信号处理函数中,只调用 async-signal-safe 的函数,且对于全局变量(含静态变量)的操作是可重入的。
  2. 在整个程序中,一旦调用非 async-signal-safe 的函数,或者操作可能被信号处理函数使用的全局变量时,先阻塞信号(即不会有信号处理函数被调用)。
一般来说,你肯定会选择前者,否则你需要写很多防御性的代码。
更安全的做法是在信号处理函数中不调用任何函数,只改动 volatile std::sig_atomic_t 类型的全局变量。其中,volatile 会告诉编译器这个变量可能在任意时间被当前作用域之外的代码改动,不能对其做某些优化(比如编译器已经将这个变量的值放入寄存器了,编译器会假定寄存器中的值和内存中的值是一样的,它并不能从当前的代码中预知这个值可能已经被信号处理函数或其他线程修改过了);sig_atomic_t 则是 C++ 定义的一个 typedef,一般会用 int 来实现,主要是能保证对这个变量的操作是原子性的(例如在 32 位的系统上操作一个 64 位的整数,可能会分成 2 条指令来执行)。
虽然 Python 的信号处理函数会被延迟调用,但要求基本也是差不多的。而且 Python 并没有办法定义 volatile 的全局变量,使用锁也不一定是安全的(因为信号处理函数是在主线程执行的,而被中断的线程可能就是主线程),因此最好不要操作全局变量。如果想输出点什么的话,使用 logging 模块是不安全的;实在需要的话,os.write()socket.send()socket.sendto() 是安全的,但是要自行处理被信号中断而导致数据没有完整写入的情况。

除了自定义信号处理函数外,还有两个常量可以使用:SIG_IGNSIG_DFL,前者表示忽略该信号,后者表示使用系统默认的行为。
信号的默认处理行为各不相同,有些会被忽略(如 SIGCHLD),有些会导致进程停止(如 SIGSTOP),有的可以使进程恢复运行(如 SIGCONT),有些会终止进程(如 SIGTERM)等。大多数的信号都可以通过设置信号处理函数来捕获,但是 SIGSTOPSIGKILL 是不能被捕获、忽略和阻塞的。

需要注意的是,系统调用也是可能被信号中断的,这可能会导致一些不必要的麻烦。Linux 允许在调用 sigaction() 时,设置 sa_flagsSA_RESTART 位;这样当被该信号中断时,执行完信号处理函数后,会自动重试这个系统调用。但这个重试机制只对部分系统调用有效,具体可以看 signal 手册的《Interruption of system calls and library functions by signal handlers》部分。
此外,Linux 还提供了 siginterrupt() 函数来修改这个设置。尽管该函数已经被摒弃了,但因为 Python 在调用 signal.signal() 时不设置 SA_RESTART,所以只好提供一个 signal.siginterrupt() 函数来曲线救国:
signal.signal(signal.SIGALRM, handle_alarm)
signal.siginterrupt(signal.SIGALRM, False)
当然,捕捉系统调用的异常,看 errno 是否为 EINTR,再自行重试也是可以的(下文会展示其代码)。

有了以上知识,就已经能正确处理子进程了。但是这样的子进程只能执行 Python 代码,并不是很够用。
为此 Linux 提供了 exec*() 这一系列的函数,用于执行一个可执行文件。这个家族一共有 6 个函数,差别在于是否传递环境变量,是否使用 PATH 环境变量来定位可执行文件,以及参数是用数组还是不定参数。Python 则将其封装成了 8 个 os.exec*() 函数。
成功执行该类函数后,当前进程的镜像(image)将被替换为新的进程的镜像,继承原进程的 UID 和 GID 等,需要注意的主要有:
  • 打开的文件描述符不会被刷新。
  • 设置的信号处理函数恢复为 SIG_DFL,但被设为 SIG_IGN 的保持不变。信号的阻塞也保持不变。
  • 定时器被取消。
  • 共享内存被取消映射。
  • 通过 atexit()on_exit() 注册的函数被取消。
  • 异步 IO 被取消。
  • 除当前线程外,该进程的其他线程都被销毁。

值得一提的是,os.execv() 的第二个参数 args 列表的第 0 号元素和 C 语言的 main() 函数一样,表示进程的执行文件名,但大部分程序其实会忽略它。例如执行 ls /,这两种写法都可以:
os.execv('/bin/ls', ['/bin/ls', '/'])
os.execv('/bin/ls', ['balabala', '/'])
对于较长的命令,解析参数会比较麻烦,可以用 shlex.split() 函数来分割。
如果要省事的话,也可以直接使用 sh -c 来执行:
os.execv('/bin/sh', ['/bin/sh', '-c', 'ls / && sleep 1 && ls /tmp'])
弊端就是多起了一个 sh 进程,而且如果这个 sh 进程意外挂掉的话,是拿不到它的子进程的退出状态的,甚至可能丧失了对其的控制(因为不知道子进程的 PID)。

另外,可以用 fcntl 设置文件描述符的 close-on-exec 标志,这样在执行 exec*() 家族的系统调用后,该文件描述符会被自动关闭:
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

因为 fork() + exec() 是个很常用的模式,Linux 将其封装成了 posix_spawn() 函数,此外,Linux 的 system() 函数、daemon() 函数和 Python 的 subprocess 模块都是用这个模式实现的。

再来看如何处理子进程的退出。
子进程执行结束时,会进入僵尸(zombie)状态,它的父进程会收到 SIGCHLD 信号。僵尸状态的子进程其实已经执行结束了,所以用 kill -SIGKILL 也是杀不掉的,只能由它的收割者(reaper)通过 waitpid()wait()waitid() 系统调用来获取子进程的退出状态并清理。这个收割者一般是该进程的父进程,如果这个父进程先退出了,它的父进程会被操作系统重设为 1 号进程(Linux 下为 init 进程)。
在 Python 中,这些系统调用被封装成了 os.wait()os.waitpid()
其中,os.wait() 不接受参数,等待直到任意一个子进程结束。os.waitpid() 接受 (pid, options) 两个参数。它的 pid 参数如果大于 0,就等待该 pid 对应的子进程退出;如果是 0,就等待该进程组(相同 PGID)的任意子进程结束;如果是 -1,就等待任意子进程结束;如果小于 -1,则等待 PGID 为 -pid 的任意子进程结束。它的 options 参数可以是 0 或者以下常量的或值:WNOHANG 为如果没有结束的子进程也立刻返回,WUNTRACED 为如果有停止状态(可以使用 ctrl + z 来发送 SIGSTOP 信号)的子进程也返回,WCONTINUED 为如果有停止状态的子进程恢复运行了也返回。
它们的返回值是一个由 (pid, status) 组成的元组。其中,pid 是子进程的 PID,如果设置了 WNOHANG 且没有子进程退出的话,会是 0。而 status 是子进程的退出状态,是一个 16 位的无符号整数,高 8 位是退出码,低 8 位是杀死它的信号。可以用 exit_code, kill_signal = divmod(exit_status, 256) 来获取它们,os 模块也提供了一些函数来检查进程是否正常退出等。
它们调用出错时会抛出 OSError,其 errno 属性为错误码。主要的错误码有:ECHILD 表示没有子进程,或者 pid 对应的不是当前进程的子进程,或者 SIGCHLD 信号的处理函数被设为了 SIG_IGNEINTR 表示等待的过程中被其他信号中断了。一般来说可以忽略 EINTR 并重试:
def wait_pid(pid, options):
    while True:
        try:
            return os.waitpid(pid, options)
        except OSError as e:
            if e.errno == errno.EINTR:
                continue
            raise
不过 Python 3 扩展了 OSError 的子类,提供了 InterruptedError,捕捉起来会更方便些。而从 Python 3.5 开始,根据 PEP 475,所有系统调用都会被自动重试,不需要捕捉了。

值得一提的是,一般情况下,只有某个进程的父进程才能拿到它的退出状态。如果这个进程的父进程已经退出了,也就无法得知其退出码了。
如果非要获取的话,可以使用 ptrace() 系统调用来成为这个进程的 tracer,然后就能通过 waitpid() 来获取退出码了。最常使用的 request 常量是 PTRACE_ATTACHPTRACE_SEIZE,前者会向 tracee 进程发送 SIGSTOP 信号,可以手动向其发送 SIGCONT 信号来恢复运行。
如果 ptrace() 调用失败的话,会返回 -1,并设置 errno。主要错误码有:ESRCH 表示进程不存在,EPERM 表示没有权限(例如 1 号进程和已经被 traced 过的进程等)。
Python 下有个叫 python-ptrace 的第三方库封装了这个调用。
此外,Linux 下的 prctl() 系统调用从 3.4 版开始提供了 PR_SET_CHILD_SUBREAPER 这个 option,如果设为 1 的话,会将当前进程的收割者设为离它最近的祖先进程(即如果它的父进程退出了,但爷进程还存活,则设为爷进程)。
Python 下也有个叫 python-prctl 的第三方库封装了这个调用。

进程退出的方式也是挺多的,除了被信号杀死外,在 main() 函数中返回,或者调用 exit() 函数和 _exit() 系统调用都可以退出进程。其中,exit() 函数会调用通过 atexit()on_exit() 注册的函数,刷新和关闭所有打开的 stdio 流,删除用 tmpfile() 创建的文件,调用 C++ 等语言的析构函数;后者则只是关闭所有打开的 fd。它们都接收一个 int status 参数,它的低 8 位(0 ~ 255)将作为进程的退出码。
Python 提供了 __builtin__.exit()sys.exit()os._exit() 这 3 个函数来退出。其中,os._exit() 是对 _exit() 的简单封装,用来立即退出进程;__builtin__.exit()sys.exit() 都是抛出一个 SystemExit 异常,如果没有被捕捉的话,就会退出当前线程(多线程程序一般使用它来退出)。
Python 的文档中提到建议只在子进程中使用 os._exit() 函数。我在用 Python 的 unittest 模块时发现它会捕捉 SystemExit 异常,并将其当成一个失败的用例,而非退出进程。这就导致了子进程将继续执行后面的单元测试,输出也因此变得乱七八糟。所以我建议至少单元测试中的子进程都用 os._exit() 来退出。

此外,还可以用 abort() 函数来进行非正常的退出。它会先解除阻塞 SIGABRT 信号,然后给当前进程发送 SIGABRT 信号。如果当前进程设置了 SIGABRT 信号的处理函数,除非该信号处理函数没有返回(例如使用 longjmp() 跳转走了),否则在执行完处理函数后,或者发现处理函数设为了 SIG_IGN 后,abort() 函数会将其设为 SIG_DFL,然后再次发送 SIGABRT 信号,以确保进程退出。
它的表现行为与 _exit() 很相似,但它会刷新和关闭所有打开的 stdio 流,并且 SIGABRT 信号的默认处理行为会产生 core dump。
Python 同样也提供了 os.abort() 来封装这个函数,但是并不会调用通过 signal.signal() 注册的 SIGABRT 信号处理函数。

这里顺带说下刷新和关闭文件吧。
POSIX 系统提供了非常多的函数用于刷新:
  • fflush() 函数:用于刷新 stdio 流(stream)。
    对于输出流,它强制将用户空间的缓冲区(buffer),通过这个流底层的 write() 函数写入到该流。对于关联到可定位(seekable)文件的输入流,它丢弃掉已经被底层文件读取(fetch),但还没被应用消费(consume)的 buffer(因为对于 seekable 的文件而言,只需要重新定位再读一次即可)。如果它的 FILE *stream 参数为 NULL,则刷新所有打开的流。简单来说,它的作用主要是将用户空间的缓冲区写入内核空间。
    但是用户空间的缓冲区是和所用的语言和库相关的,fflush() 刷新的只是 C 语言的 stdio 库创建的 FILE 类型的 buffer。对于 Python 而言,你应该使用 file.flush() 方法;如果没有输出缓冲区的话,直接使用 os.write()socket.send() 就能写入内核空间了,不需要刷新。
  • fsync()fdatasync() 系统调用:用于将内核中缓存的对文件的修改写入磁盘。两者的区别在于后者不写入元数据(metadata)。它们都是 async-signal-safe 的函数。
    很显然,这两个函数都只对磁盘文件有效,不能用于刷新 socket。此外,磁盘也有自己的缓冲区,内核也只是将数据写入磁盘缓冲区而已,并不一定真正被写入磁盘了。macOS 的 fcntl() 系统调用提供了 F_FULLFSYNC 选项,可以要求将数据写入磁盘的永久存储设备中(只对 HFS、FAT 和 UDF 文件系统有效)。
    大部分的程序都不需要用到它们,因为只要不是断电等意外原因导致非正常关机的话,内核和磁盘驱动最终都会将文件写入磁盘的(尽管那时你可能已经关闭文件或者退出进程了)。要考虑到断电时的数据安全的话,主要也就是数据库之类的应用了。
    Python 提供了 os.fsync() 函数来封装 fsync() 系统调用。
  • aio_fsync() 函数:用于刷新异步 IO 操作到磁盘。
    功能类似于 fsync()fdatasync(2),但是只发出请求,不等待实际的操作完成就返回。
  • sync()syncfs() 系统调用:将文件系统的缓存写入磁盘。二者区别在于后者只刷新该 fd 所对应的文件的文件系统。
    很显然,这是两个很重的系统调用,一般不需要使用。
而 POSIX 的 close()_exit() 系统调用并不会进行任何刷新操作,fclose()exit()abort() 函数可以看成隐式地调用了 fflush()close()
对应到 Python 中,文件对象的 close() 方法会刷新用户空间的缓冲区;os.close()os._exit()os.abort() 与对应的 C 函数行为一致,并且因为 abort() 函数并不知道 Python 是怎样使用用户空间的缓冲区的,所以并不会刷新 Python 的文件对象;其他正常的退出方式会刷新用户空间的缓冲区,并进行其他的清理工作。

最后,我们来利用上述知识来实现一个需求:编写一个长期运行的程序,它每隔一段时间(如 5 秒)访问一次远程服务器,查询一次是否有新的命令。有的话就执行该命令,并且上报命令的执行结果和输出。不会有并发的任务需要执行,但对于执行时间较长的命令,需要每隔一段时间上报它当前的输出。

很显然,这个程序应该是一个多进程程序,使用子进程来执行该命令;如果是多线程程序的话,执行时很可能导致主线程挂掉。
于是先来看看这个主进程的架构:
class Daemon(object):
    def __init__(self):
        self._action_id = None
        self._done_action = False

    def run(self):
        while True:
            start_time = time.time()

            status = self.collect_status()
            if status and self.report_status(status) and self._done_action:
                self.clean_status()

            if not self._action_id:
                action = self.fetch_action()
                if action and self.accept_action(action):
                    self.handle_action(action)

            wait_time = FETCH_ACTION_INTERVAL - (time.time() - start_time)
            if wait_time > 0:
                self.wait(wait_time)

    def fetch_action(self):
        raise NotImplementedError

    def handle_action(self, action):
        raise NotImplementedError

    def accept_action(self, action):
        raise NotImplementedError

    def collect_status(self):
        raise NotImplementedError

    def report_status(self, status):
        raise NotImplementedError

    def clean_status(self):
        raise NotImplementedError

    @staticmethod
    def wait(wait_time):
        time.sleep(wait_time)

FETCH_ACTION_INTERVAL = 5
先来看 wait() 方法,如果子进程提前执行结束了,应该尽快上报结果,而不该傻傻地 sleep。考虑到 signal 模块有个定时器的功能,先用它来实现吧:
class Daemon(object):
    def run(self):
        try:
            self.register_signals()
            
            while True:
                start_time = time.time()

                status = self.collect_status()
                if status and self.report_status(status) and self._done_action:
                    self.clean_status()

                if not self._action_id:
                    action = self.fetch_action()
                    if action and self.accept_action(action):
                        self.handle_action(action)

                wait_time = FETCH_ACTION_INTERVAL - (time.time() - start_time)
                if wait_time > 0:
                    self.wait(wait_time)
        finally:
            self.deregister_signals()

    @staticmethod
    def ignore_signal(signum, frame):
        return

    def register_signals(self):
        signal.signal(signal.SIGALRM, self.ignore_signal)
        signal.signal(signal.SIGCHLD, self.ignore_signal)

    def deregister_signals(self):
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def wait(self):
        signal.setitimer(signal.ITIMER_REAL, wait_time)
        signal.pause()
        signal.setitimer(signal.ITIMER_REAL, 0)
因为 signal.pause() 会在收到任意信号后继续运行,所以定时器的 SIGALRM 信号和子进程退出的 SIGCHLD 都可以唤醒它,也就实现了子进程退出时立刻唤醒,否则等待最多 5 秒的功能。但是我们并不知道这次唤醒是不是由 SIGALRM 信号造成的,所以安全起见还是清除一下定时器,避免它可能打断之后的系统调用。
然而这种实现仍然有不足之处,如果使用 IDE 来调试的话,暂停在 signal.pause() 这步可能导致错过定时器发出的 SIGALRM 信号,使得程序没法继续执行,有时还可能出现各种意外的退出情况。更可靠的方式是使用唤醒 fd,再用 select 等方式监听 fd 是否可读,并设置超时时间。为了使用比较高效的 select 模型,又不用考虑平台的可移植性(例如我经常在 macOS 和 Linux 之间切换),我采用了 selectors 标准库(Python 3.4 之前的版本可以使用 selectors2selectors34 这两个第三方库来替代):
class Daemon(object):
    def __init__(self):
        self._action_id = None
        self._done_action = False
        self._selector = None
        r, w = os.pipe()
        set_non_blocking(r)
        set_non_blocking(w)  # wakeup fd 需要是非阻塞的
        self._waker = r, w

    def register_signals(self):
        signal.signal(signal.SIGCHLD, self.ignore_signal)
        self._selector = selectors.DefaultSelector()
        self._selector.register(self._waker[0], selectors.EVENT_READ)
        signal.set_wakeup_fd(self._waker[1])

    def deregister_signals(self):
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
        signal.set_wakeup_fd(-1)
        if self._selector:
            self._selector.close()
            self._selector = None

    def wait(self, wait_time):
        ready = self._selector.select(wait_time)
        if ready:
            waker = self._waker[0]
            # 读完并忽略所有的数据,避免 _waker[1] 被阻塞而无法写入
            try:
                while True:
                    data = os.read(waker, BUF_SIZE)
                    if not data or len(data) < BUF_SIZE:  # 没有更多数据了
                        break
            except OSError:  # 忽略 EAGAIN 和 EINTR
                pass


def set_non_blocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


BUF_SIZE = 1024
经过这样改造后,就没有上述缺点了。顺带还获得了一个好处,如果和远程服务器之间是长链接,那就可以监听该 socket 的 fd,一有消息就立刻跳到下个循环,加快 fetch_action() 的调用。

再来看子进程的实现:
class Worker(object):
    def __init__(self, action):
        try:
            os.setsid()
        except OSError:
            worker_logger.warning('failed to setsid, the worker should be created in a forked process')

        user_name = action.get('user_name')
        if not user_name:
            raise InvalidAction('user_name is required')
        if not isinstance(user_name, basestring):
            raise InvalidAction('user_name must be a string object')

        try:
            user = pwd.getpwnam(user_name)
        except KeyError:
            raise UserNotFound

        group_name = action.get('group_name')
        if group_name:
            if not isinstance(group_name, basestring):
                raise InvalidAction('group_name must be a string object')
            try:
                user_group = grp.getgrnam(group_name)
            except KeyError:
                raise GroupNotFound
            if user_name not in user_group.gr_mem:
                raise UserNotInGroup('user %s not in group %s' % (user_name, group_name))
            os.setgid(user_group.gr_gid)
        else:
            os.setgid(user.pw_gid)

        os.setuid(user.pw_uid)
        os.chdir(user.pw_dir)

        redirect_fd(0, os.devnull, os.O_RDONLY)  # 避免运行一些需要 TTY 的命令,如 top
        redirect_fd(1, STDOUT_FILE_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
        redirect_fd(2, STDERR_FILE_NAME), os.O_WRONLY | os.O_CREAT | os.O_TRUNC)

        self.action = action

        self.save_worker_pid()

    def do_action(self):
        gc.disable()  # https://bugs.python.org/issue1336
        try:
            pid = os.fork()
        except OSError:
            worker_logger.exception('failed to fork executor for action ' + self.action.id)
            gc.enable()
            raise
        else:
            if pid == 0:  # executor
                worker_logger.handlers[0].close()
                os.execve('/bin/bash', ['/bin/bash', '--norc', '-c', self.action.execution], {})  # 使用干净的环境来执行
            else:  # worker
                gc.enable()
                result = wait_pid(pid, 0)
                executor_exit_status = result[1]
                self.save_executor_exit_status(exit_status)

    def save_worker_pid(self):
        raise NotImplementedError

    def save_executor_exit_status(self, exit_status):
        raise NotImplementedError


def redirect_fd(fd, path, mode):
    try:
        os.close(fd)
    except OSError:
        pass
    new_fd = os.open(path, mode)
    if new_fd != fd:
        os.dup2(new_fd, fd)
        os.close(new_fd)


def wait_pid(pid, options):
    while True:
        try:
            return os.waitpid(pid, options)
        except OSError as e:
            if e.errno == errno.EINTR:  # 忽略等待时被中断
                continue
            raise
这里先做的工作是设置 UID 和 GID 等,因为执行的用户和 woker 的运行用户可能不一样(后者一般是用 root 用户)。
而在 do_action() 里,worker 并没有直接调用 os.execve(),而是让它的子进程去执行,为的是执行完毕后有机会调用 save_executor_exit_status() 方法。这个方法和 save_worker_pid() 方法的实现我就不列出了,只需要写入文件即可。
为什么要将退出状态和  PID 写入文件,而不是直接通知 daemon 进程呢?因为 daemon 进程虽然是长期运行的,但是有可能因为意外情况退出,也可能需要重启。在它重启之后,新的 daemon 已经和正在运行的 worker 没有联系了,就只能通过文件读取到 worker 的 PID 和 executor 的退出状态,从而恢复运行。这也是我不用 subprocess 模块的原因,因为进程退出后 PIPE 就丢失了。FIFO 也是不可靠的,因为读出来后如果 daemon 进程崩溃了,数据也没法找回;甚至也有可能两个进程都崩溃了,这个 FIFO 就被销毁了。

最后,daemon 剩下的一些繁琐的功能实现我也不列出了,只把 handle_action() 实现一下:
class Daemon(object):
    def handle_action(self, action):
        gc.disable()
        try:
            pid = os.fork()
        except OSError:
            daemon_logger.exception('failed to create worker for action ' + action.id)
            gc.enable()
            # 保存 action exit status
        else:
            if pid == 0:  # worker
                self.deregister_signals()
                os.close(self._waker[0])
                os.close(self._waker[1])
                daemon_logger.handlers[0].close()
                gc.enable()
                exit_code = 0
                try:
                    worker = Worker(action)
                    worker.do_action()
                except SystemExit as e:
                    exit_code = e.code
                except:
                    exit_code = 1  # 根据实际的设置返回不同的错误码
                finally:
                    try:
                        logging.shutdown()  # 刷新未写入的日志
                    except:
                        pass
                    os._exit(exit_code)
            else:  # daemon
                gc.enable()
                # 保存 worker PID,因为 worker 有可能还没保存 PID 就退出了

0条评论 你不来一发么↓

    想说点什么呢?