Python基于Select模型实现Popen输出

参考了 pssh 的代码,实现了下面这个小框架,用于并发处理子进程(Popen)产生的标准输出和标准错误输出。IOMap 中保存了所有被监听的文件描述符及其对应的回调函数;一旦 select 返回,就对每个就绪的描述符调用相应的回调,由回调读取数据并进行处理。直接看代码吧。

#!/usr/bin/env python
#coding=utf-8

import fcntl
import os
import select
import signal
import subprocess
import sys
from errno import EINTR

# Maximum number of bytes requested per os.read() call by the Task
# stdout/stderr handlers.
BUFFER_SIZE = 1024

class FatalError(RuntimeError):
    """Raised when the signal wakeup pipe fails in an unrecoverable way."""


class IOMap(object):
    """A manager for file descriptors and their associated handlers.

    Handlers are callables invoked as ``handler(fd, iomap)``.  The poll
    method waits on every registered descriptor with ``select`` and
    dispatches each ready descriptor to the handler registered for it.
    """

    # Number of bytes drained from the wakeup pipe per read.
    _WAKEUP_READ_SIZE = 1024

    def __init__(self):
        """Creates empty handler maps and installs the signal wakeup pipe."""
        self.readmap = {}
        self.writemap = {}

        wakeup_readfd, wakeup_writefd = os.pipe()
        self.register_read(wakeup_readfd, self.wakeup_handler)
        # TODO: remove test when we stop supporting Python <2.5
        if hasattr(signal, 'set_wakeup_fd'):
            # set_wakeup_fd is documented to require a non-blocking
            # descriptor; Python 3 raises ValueError for a blocking one.
            flags = fcntl.fcntl(wakeup_writefd, fcntl.F_GETFL)
            fcntl.fcntl(wakeup_writefd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            signal.set_wakeup_fd(wakeup_writefd)
            self.wakeup_writefd = None
        else:
            self.wakeup_writefd = wakeup_writefd

    def register_read(self, fd, handler):
        """Registers an IO handler for a file descriptor for reading."""
        self.readmap[fd] = handler

    def register_write(self, fd, handler):
        """Registers an IO handler for a file descriptor for writing."""
        self.writemap[fd] = handler

    def unregister(self, fd):
        """Unregisters the given file descriptor.

        Unknown descriptors are ignored, so calling this twice is harmless.
        """
        self.readmap.pop(fd, None)
        self.writemap.pop(fd, None)

    def poll(self, timeout=None):
        """Performs a poll and dispatches the resulting events.

        Args:
            timeout: seconds to wait, passed straight to ``select.select``;
                None blocks until a descriptor is ready.

        Returns immediately when nothing is registered, and returns without
        dispatching when ``select`` is interrupted by a signal (EINTR).
        Any other ``select`` error is re-raised.
        """
        if not self.readmap and not self.writemap:
            return
        try:
            readable, writable, _ = select.select(
                list(self.readmap), list(self.writemap), [], timeout)
        except select.error:
            _, e, _ = sys.exc_info()
            if e.args[0] == EINTR:
                return
            raise
        # A handler may unregister descriptors that were reported ready in
        # this same round, so look each one up again before dispatching.
        for fd in readable:
            handler = self.readmap.get(fd)
            if handler is not None:
                handler(fd, self)
        for fd in writable:
            handler = self.writemap.get(fd)
            if handler is not None:
                handler(fd, self)

    def wakeup_handler(self, fd, iomap):
        """Handles read events on the signal wakeup pipe.

        This ensures that SIGCHLD signals aren't lost.

        Raises:
            FatalError: if reading the pipe fails for any reason other
                than EINTR.
        """
        try:
            os.read(fd, self._WAKEUP_READ_SIZE)
        except (OSError, IOError):
            _, e, _ = sys.exc_info()
            if e.errno != EINTR:
                # Use the exception attributes rather than unpacking
                # e.args, whose length is not always two.
                sys.stderr.write('Fatal error reading from wakeup pipe: %s\n'
                                 % e.strerror)
                raise FatalError('error reading from wakeup pipe: %s'
                                 % e.strerror)

class Task(object):
    """Runs a shell command and streams its stdout/stderr through an IOMap.

    Each chunk read from the child is echoed to this process's standard
    output, prefixed with ``"stdout : "`` or ``"stderr : "``.
    """

    # Demo command: prints 1000 lines on stdout and, through the
    # nonexistent ``abcd`` command, one error per iteration on stderr.
    # The counter uses POSIX arithmetic because ``let`` is a bashism; under
    # a POSIX /bin/sh such as dash it fails and the loop never terminates.
    DEFAULT_COMMAND = ('i=0;while [ $i -lt 1000 ];do echo "hello world $i";'
                       'abcd;i=$((i+1));done')

    def __init__(self,):
        # Placeholders until run() replaces them with the child's pipes.
        self.stdout = ""
        self.stderr = ""

    def run(self, io_map, command=None):
        """Runs a command with Popen and echoes its stdout and stderr.

        Args:
            io_map: object providing ``register_read``, ``unregister``,
                ``poll`` and a ``readmap`` mapping (an IOMap).
            command: shell command line to execute; defaults to
                DEFAULT_COMMAND.  It is run with ``shell=True``, so it
                must come from a trusted source.

        Returns:
            The exit status of the child process.
        """
        if command is None:
            command = self.DEFAULT_COMMAND
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        self.stdout = child.stdout
        self.stderr = child.stderr

        # Remember the descriptor numbers now: fileno() cannot be called
        # on the streams once they have been closed.
        watched = (self.stdout.fileno(), self.stderr.fileno())
        io_map.register_read(watched[0], self.handle_stdout)
        io_map.register_read(watched[1], self.handle_stderr)
        # Poll only while at least one pipe is still open; after both hit
        # EOF there is nothing left to read and polling would block.
        while any(fd in io_map.readmap for fd in watched):
            try:
                io_map.poll()
            except KeyboardInterrupt:
                # This exception handler doesn't print out any fancy status
                # information--it just stops.
                self.interrupted()
        # Reap the child so it does not linger as a zombie.
        return child.wait()

    def handle_stdout(self, fd, iomap):
        """Handles a read event on the child's stdout pipe."""
        self._pump(fd, iomap, "stdout : ", self.close_stdout)

    def handle_stderr(self, fd, iomap):
        """Handles a read event on the child's stderr pipe."""
        self._pump(fd, iomap, "stderr : ", self.close_stderr)

    def close_stderr(self, iomap):
        """Unregisters and closes the child's stderr pipe, if open."""
        self._close(self.stderr, iomap)

    def close_stdout(self, iomap):
        """Unregisters and closes the child's stdout pipe, if open."""
        self._close(self.stdout, iomap)

    def interrupted(self):
        """Cleans up after a keyboard interrupt."""
        sys.exit(255)

    def _pump(self, fd, iomap, prefix, close):
        """Reads one chunk from fd and echoes it, closing the pipe on EOF.

        A read interrupted by a signal (EINTR) is ignored so the next poll
        retries it; any other read error closes the pipe.
        """
        try:
            buf = os.read(fd, BUFFER_SIZE)
        except (OSError, IOError):
            _, e, _ = sys.exc_info()
            if e.errno != EINTR:
                close(iomap)
            return
        if buf:
            self._emit(prefix, buf)
        else:
            # An empty read means the child closed its end of the pipe.
            close(iomap)

    @staticmethod
    def _emit(prefix, buf):
        """Writes prefix + buf to standard output on Python 2 and 3."""
        if not isinstance(buf, str):
            # Python 3: os.read returns bytes.  A multi-byte character
            # split across two chunks is rendered as replacement chars.
            buf = buf.decode('utf-8', 'replace')
        sys.stdout.write(prefix + buf)

    @staticmethod
    def _close(stream, iomap):
        """Unregisters stream from iomap and closes it; safe to repeat."""
        if stream and not stream.closed:
            iomap.unregister(stream.fileno())
            stream.close()

if __name__ == '__main__':
    # Script entry point: run the demo task against a fresh IOMap.
    Task().run(IOMap())

标签:none