tornado-tcp-server-code-analysis

编辑

three girls</img>

tornado tcp server流程

tornado除去外层httpserver的封装后, 底下都是教给tcpserver, 因此才可以很容易将http协议的tornado app改造成兼容tcp协议的app. tcpserver的使用doc:

server = TCPServer()
server.listen(8888)
IOLoop.instance().start()

初始化一个TCPServer实例, 在调用listen的时候:

def listen(self, port, address=""):
    sockets = bind_sockets(port, address=address)
    self.add_sockets(sockets)


def add_sockets(self, sockets):
    if self.io_loop is None:
        self.io_loop = IOLoop.current()
    for sock in sockets:
        self._sockets[sock.fileno()] = sock
        add_accept_handler(sock, self._handle_connection,
                           io_loop=self.io_loop) listen时使用创建bind_sockets一个socket,  bind_sockets主要做一些sock的初始化, 比如sock.setblocking,backlog,ipv4, ipv6等, 返回一个包含socket对象的列表; socket的列表sockets,  作为参数给add_sockets,  add_sockets会初始化一个当前的ioloop,  最后会将这些socket, 添加到add_accept_handler内,  add_accept_handler函数是增加一个ioloop的事件监听器,它有一个callback参数, 用于处理监听到sock有新连接时, 重要是调用add_accept_handler时, callback参数传的是tcpserver的self._handle_connection, add_accept_handler会在自己函数内部定义一个闭包函数accept_handler, accept_handler接收fd, events,  这个函数会挂死循环用来connection, address = sock.accept(), 得到connection, address作为callback调用的参数, 

def add_accept_handler(sock, callback, io_loop=None):
    if io_loop is None:
        io_loop = IOLoop.current()


    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)  # 在当前ioloop中注册一个给定事件的监听处理, 当触发时, accept_handler就会被回调,  一路回溯就会回调callback函数,  也就是tcpserver的self._handle_connection函数. 代码中的描述是:

    """Registers the given handler to receive the given events for fd.
    The ``events`` argument is a bitwise or of the constants
    ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
    When an event occurs, ``handler(fd, events)`` will be run.
    """

回到最终被调用的_handle_connection函数, 它属于tcpserver, 记得之前的:

        server = TCPServer()
        server.listen(8888)
        IOLoop.instance().start() 这里在IOLoop.instance().start()之前应该, 有一句 server.handle_stream = app 的, 这个app就是我们应用自己想接收到socket传过来的数据后, 要做的业务逻辑了. 不过我们定义的app函数, 应该要有2个参数, 分别是(stream, address), 是用来提供给后来的iostream的,  这里我们的app例子是: def app(stream, address):
callback = functools.partial(process, stream)
stream.read_until('\n', callback)

回到tcpserver, 在通过handle_stream=app之后, tcpserver的handle_stream已经被函数app替换了, 字面意思就能看出是为了handl 后面的stream流的一个方法, 因此那个app函数才一样需要stream, address这2个参数. 前面说的_handle_connection函数是ioloop检测到对应事件后, 最终的回调函数, 那么_handle_connection具体干了什么呢, 看下面:

def _handle_connection(self, connection, address):
    if self.ssl_options is not None:
        assert ssl, "Python 2.6+ and OpenSSL required for SSL"
        try:
            connection = ssl_wrap_socket(connection,
                                         self.ssl_options,
                                         server_side=True,
                                         do_handshake_on_connect=False)
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_EOF:
                return connection.close()
            else:
                raise
        except socket.error as err:
            if err.args[0] == errno.ECONNABORTED:
                return connection.close()
            else:
                raise
    try:
        if self.ssl_options is not None:
            stream = SSLIOStream(connection, io_loop=self.io_loop)
        else:
            stream = IOStream(connection, io_loop=self.io_loop)
        self.handle_stream(stream, address)
    except Exception:
        app_log.error("Error in connection callback", exc_info=True) 首先会检测是否有使用ssl协议加密, 否的话, 我们就跳过前面那一段代码了.  后面的代码是初始化一个IOStream的实例,  将调用_handle_connection时给的connnection, 和tcpserver的当前ioloop对象作为参数, 初始化IOStream, 然后将这个stream回传给hand_stream也就是我们的app函数处理, IOStream提供一个非阻塞的socket read方法,  我们经常会使用到它的read_until, 就像前面我们的app函数是:

def app(stream, address):
    callback = functools.partial(process, stream)
    stream.read_until('\n', callback)

之后的事情, 都是属于业务逻辑process的了. 框架部分已经基本完成, 需要注意的是process业务逻辑也需要接收stream作为一个参数, 因为业务逻辑完成后, 需要使用stream.write方法来将结果发送给客户端.