最近在写一个配置推送客户端,结构如下图:
每一个应用服务进程会起一个额外的线程,与 ZooKeeper 保持连接,需要变更配置时,将新配置更新到 ZooKeeper,ZooKeeper 将配置推送到所有的客户端,客户端收到配置之后,即时更新进程内的配置信息,并将更新配置成功与否、延时、错误等信息反馈到 redis,以这样的方式做到不重启服务更新配置。
同时也会有一个独立的客户端与 ZooKeeper 保持连接,收到 ZooKeeper 推送的配置之后,将配置写回并提交到 Puppet 配置仓库中,这样仍然保持只在一个地方修改配置的习惯。
这个推送客户端的第一个应用场景是更新线上的 MySQL 配置,线上服务是 Quixote + SCGI,由于在 SCGI server fork 子进程之前就已经 import 了部分自有库,其中包括这个配置推送客户端的使用者,因此,这个客户端必须是 lazy 的,只能在 fork 发生之后才能启动线程、建立与 ZooKeeper 和 redis 的连接。
将这个客户端与 Quixote + SCGI 服务进行联调时,奇怪的事情发生了,配置发生变更时,客户端并没有立刻收到 ZooKeeper 的推送,而是等到下一次用户请求到达时才会收到,起初以为是线程根本就没有在工作,后来发现,线程也并不是完全不工作,而是阻塞在了某个地方,当有用户请求到达时,线程才会接着执行。这就意味着,如果一个进程闲置了比较长的时间,ZooKeeper 会认为客户端已失去响应,从而断开连接,而客户端重连 ZooKeeper 也只会发生在下一次请求到来之后,这是个很别扭很诡异的问题。
去掉目前使用的 Quixote 包装,写了一个最简单的 Quixote app,发现同样的问题依然存在,这排除了自有 Quixote 包装的问题。接下来又写了一个裸的 SCGI app:
#!/usr/bin/env python
# t.py
import sys
import time
import threading
class T(threading.Thread):
daemon = True
def run(self):
while True:
print >> sys.stderr, 'sleeping'
time.sleep(1)
print >> sys.stderr, 'id:%s time:%s\n' % (id(self), time.time())
t = T()
#!/usr/bin/env python
# s.py
import sys
from scgi import scgi_server
from t import t
class MyHandler(scgi_server.SCGIHandler):
def produce_cgilike(self, env, bodysize):
if not t.is_alive():
print >> sys.stderr, 'start thread: %s' % id(t)
t.start()
else:
sys.stdout.write('Content-Type: text/plain\r\n\r\n')
print id(t)
scgi_server.SCGIServer(MyHandler, host='0.0.0.0',port=9002).serve()
这样的一个服务仍然有上面提到的问题,每有一次用户访问,就会输出一行 “id:xxx time:xxx”,然后输出一行 “sleeping”,之后就阻塞在了 time.sleep(1) 处(起码看起来是阻塞在了这里)。至此,也可以排除 Quixote 的问题,问题应该出在 SCGI 这里,向 hongqn 请教之后,最终定位到了问题所在。
在 SCGIHandler.serve 方法中:
os.write(self.parent_fd, "1") # indicates that child is ready
fd = passfd.recvfd(self.parent_fd)</pre>
passfd.recvfd 是一个阻塞读,它的代码是这样的:
static PyObject *
passfd_recvfd(PyObject *self, PyObject *args)
{
int sockfd, fd;
if (!PyArg_ParseTuple(args, "i:revcfd", &sockfd))
return NULL;
if ((fd = recv_fd(sockfd)) < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return NULL;
}
return PyInt_FromLong((long) fd);
}
fd = recv_fd(sockfd) 是一个阻塞读,在开始阻塞读之前没有释放 GIL ,于是就导致了整个解释器阻塞,这也与之前问题的症状吻合。将代码作如下修改,在开始阻塞读之前释放 GIL,可以解决这个问题:
static PyObject *
passfd_recvfd(PyObject *self, PyObject *args)
{
int sockfd, fd;
if (!PyArg_ParseTuple(args, "i:revcfd", &sockfd))
return NULL;
Py_BEGIN_ALLOW_THREADS
fd = recv_fd(sockfd);
Py_END_ALLOW_THREADS
if (fd < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return NULL;
}
return PyInt_FromLong((long) fd);
}
当然了,还需要做进一步的测试和观察,看一下打这样的 patch 之后会不会有其他的副作用。
参考: http://docs.python.org/release/2.6.7/c-api/init.html#thread-state-and-the-global-interpreter-lock