Xupeng's blog

圆外之大,心向往之

SCGI 与线程

最近在写一个配置推送客户端,结构如下图:

cfgreceiver architecture

每一个应用服务进程会起一个额外的线程,与 ZooKeeper 保持连接,需要变更配置时,将新配置更新到 ZooKeeper,ZooKeeper 将配置推送到所有的客户端,客户端收到配置之后,即时更新进程内的配置信息,并将更新配置成功与否、延时、错误等信息反馈到 redis,以这样的方式做到不重启服务更新配置。

同时也会有一个独立的客户端与 ZooKeeper 保持连接,收到 ZooKeeper 推送的配置之后,将配置写回并提交到 Puppet 配置仓库中,这样仍然保持只在一个地方修改配置的习惯。

这个推送客户端的第一个应用场景是更新线上的 MySQL 配置,线上服务是 Quixote + SCGI,由于在 SCGI server fork 子进程之前就已经 import 了部分自有库,其中包括这个配置推送客户端的使用者,因此,这个客户端必须是 lazy 的,只能在 fork 发生之后才能启动线程、建立与 ZooKeeper 和 redis 的连接。

将这个客户端与 Quixote + SCGI 服务进行联调时,奇怪的事情发生了,配置发生变更时,客户端并没有立刻收到 ZooKeeper 的推送,而是等到下一次用户请求到达时才会收到,起初以为是线程根本就没有在工作,后来发现,线程也并不是完全不工作,而是阻塞在了某个地方,当有用户请求到达时,线程才会接着执行。这就意味着,如果一个进程闲置了比较长的时间,ZooKeeper 会认为客户端已失去响应,从而断开连接,而客户端重连 ZooKeeper 也只会发生在下一次请求到来之后,这是个很别扭很诡异的问题。

去掉目前使用的 Quixote 包装,写了一个最简单的 Quixote app,发现同样的问题依然存在,这排除了自有 Quixote 包装的问题。接下来又写了一个裸的 SCGI app:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
# t.py
import sys
import time
import threading

class T(threading.Thread):
    daemon = True
    def run(self):
        while True:
            print >> sys.stderr, 'sleeping'
            time.sleep(1)
            print >> sys.stderr, 'id:%s time:%s\n' % (id(self), time.time())

t = T()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# s.py
import sys
from scgi import scgi_server
from t import t

class MyHandler(scgi_server.SCGIHandler):
def produce_cgilike(self, env, bodysize):
    if not t.is_alive():
        print >> sys.stderr, 'start thread: %s' % id(t)
        t.start()
    else:
        sys.stdout.write('Content-Type: text/plain\r\n\r\n')
        print id(t)

scgi_server.SCGIServer(MyHandler, host='0.0.0.0',port=9002).serve()

这样的一个服务仍然有上面提到的问题,每有一次用户访问,就会输出一行 “id:xxx time:xxx”,然后输出一行 “sleeping”,之后就阻塞在了 time.sleep(1) 处(起码看起来是阻塞在了这里)。至此,也可以排除 Quixote 的问题,问题应该出在 SCGI 这里,向 hongqn 请教之后,最终定位到了问题所在。

在 SCGIHandler.serve 方法中:

1
2
os.write(self.parent_fd, "1") # indicates that child is ready
fd = passfd.recvfd(self.parent_fd)</pre>

passfd.recvfd 是一个阻塞读,它的代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static PyObject *
passfd_recvfd(PyObject *self, PyObject *args)
{
    int sockfd, fd;

    if (!PyArg_ParseTuple(args, "i:revcfd", &sockfd))
        return NULL;

    if ((fd = recv_fd(sockfd)) < 0) {
        PyErr_SetFromErrno(PyExc_IOError);
        return NULL;
    }

    return PyInt_FromLong((long) fd);
}

fd = recv_fd(sockfd) 是一个阻塞读,在开始阻塞读之前没有释放 GIL ,于是就导致了整个解释器阻塞,这也与之前问题的症状吻合。将代码作如下修改,在开始阻塞读之前释放 GIL,可以解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static PyObject *
passfd_recvfd(PyObject *self, PyObject *args)
{
    int sockfd, fd;

    if (!PyArg_ParseTuple(args, "i:revcfd", &sockfd))
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    fd = recv_fd(sockfd);
    Py_END_ALLOW_THREADS
    if (fd < 0) {
        PyErr_SetFromErrno(PyExc_IOError);
        return NULL;
    }

    return PyInt_FromLong((long) fd);
}

当然了,还需要做进一步的测试和观察,看一下打这样的 patch 之后会不会有其他的副作用。

参考:http://docs.python.org/release/2.6.7/c-api/init.html#thread-state-and-the-global-interpreter-lock

Comments