Handling Concurrency with select
1. Introduction
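By default, the socket module works in blocking mode, so handling many concurrent connections usually means using threads. Threads in turn need locks or events for mutual exclusion and synchronization, which inevitably makes the code more complex and more prone to bugs.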
A simpler approach is to use Python's built-in select module, which is adequate wherever performance requirements are not especially demanding.
2. Function Overview
The select module is implemented entirely in native code (C). Its most commonly used function is select.select.
rfds, wfds, xfds = select.select(rlist, wlist, xlist[, timeout])
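Parameters:
rlist: list of sockets to monitor for readability
wlist: list of sockets to monitor for writability
xlist: list of sockets to monitor for exceptional conditions
timeout: optional maximum wait in seconds; when it expires, select returns three empty lists instead of blocking any longer. If it is omitted, select blocks until at least one socket is ready.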
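Return values:
rfds: the sockets from rlist that are ready for reading
wfds: the sockets from wlist that are ready for writing
xfds: the sockets from xlist that have an exceptional condition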
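In most cases only rlist needs to contain anything, because blocking mainly occurs on reads; wlist and xlist can be passed as empty lists.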
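To make the return values concrete, here is a minimal sketch. It assumes Python 3.5 or later, where socket.socketpair() is available on every platform; the helper readable_now and the names a and b are illustrative only and are not part of the article's code.

```python
import select
import socket


def readable_now(socks, timeout=0):
    """Return the sockets in `socks` that can be read without blocking."""
    rfds, _, _ = select.select(socks, [], [], timeout)
    return rfds


# socketpair() returns two already-connected sockets. It is used here only
# to keep the sketch short; any connected pair of sockets would do.
a, b = socket.socketpair()

before = readable_now([b])      # nothing has been sent yet, so nothing is readable
a.sendall(b'ping')
after = readable_now([b], 1)    # wait up to 1 second for the data to arrive
data = b.recv(4096)             # will not block: select reported b as readable

a.close()
b.close()
```

A timeout of 0 turns the call into a pure poll, which is handy when the loop has other work to do between checks.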
3. A Simple Example
3.1. Server code:
import select
import socket
import time


def create_server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('0.0.0.0', port))
    s.listen(5)
    rlist = [s]
    while True:
        # Wait up to 1 second for any monitored socket to become readable
        rfds, _, _ = select.select(rlist, [], [], 1)
        for fd in rfds:
            if fd is s:
                # Accept connection
                conn, addr = s.accept()
                print('Recv connection from %s:%d' % addr)
                rlist.append(conn)
            else:
                try:
                    buff = fd.recv(4096)
                except socket.error as e:
                    print('Connection error: %s' % e)
                    rlist.remove(fd)
                    fd.close()
                else:
                    if not buff:
                        # An empty read means the peer closed the connection
                        print('Connection closed')
                        rlist.remove(fd)
                        fd.close()
                    else:
                        print(b'Recv %r from client' % buff)
                        fd.sendall(b'[%f] %s' % (time.time(), buff))  # Add timestamp
3.2. Client code:
import socket
import threading


def create_client(port, text=b'Hello python!!!'):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(('127.0.0.1', port))
    s.send(text)  # TODO: wrap in try/except
    print(b'Recv: ' + s.recv(4096))  # TODO: wrap in try/except
    s.close()


def create_ten_clients(port):
    thread_list = []
    for i in range(10):
        t = threading.Thread(target=create_client, args=(port, b'Text from thread %d' % (i + 1)))
        t.daemon = True
        t.start()
        thread_list.append(t)
    # Wait for every client thread to finish
    for t in thread_list:
        t.join()
3.3. Test results:
Server:
Recv connection from 127.0.0.1:49409
b"Recv b'Text from thread 8' from client"
Recv connection from 127.0.0.1:49410
Recv connection from 127.0.0.1:49411
b"Recv b'Text from thread 7' from client"
b"Recv b'Text from thread 6' from client"
Recv connection from 127.0.0.1:49412
Recv connection from 127.0.0.1:49413
Recv connection from 127.0.0.1:49414
Recv connection from 127.0.0.1:49415
Recv connection from 127.0.0.1:49416
b"Recv b'Text from thread 3' from client"
b"Recv b'Text from thread 5' from client"
b"Recv b'Text from thread 4' from client"
Recv connection from 127.0.0.1:49417
b"Recv b'Text from thread 10' from client"
b"Recv b'Text from thread 2' from client"
Recv connection from 127.0.0.1:49418
b"Recv b'Text from thread 1' from client"
b"Recv b'Text from thread 9' from client"
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Client:
b'Recv: [1550212519.656387] Text from thread 8'
b'Recv: [1550212519.666388] Text from thread 7'
b'Recv: [1550212519.667388] Text from thread 6'
b'Recv: [1550212519.677389] Text from thread 3'
b'Recv: [1550212519.677389] Text from thread 5'
b'Recv: [1550212519.681390] Text from thread 9'
b'Recv: [1550212519.679389] Text from thread 10'
b'Recv: [1550212519.678389] Text from thread 4'
b'Recv: [1550212519.680389] Text from thread 1'
b'Recv: [1550212519.679389] Text from thread 2'
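As the output shows, the server responds quickly even with 10 clients connecting at once: all ten replies carry timestamps within about 25 ms of one another.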
4. Advanced Usage
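On Windows, select only supports sockets; on Unix-like systems it works with any file descriptor, such as files and pipes (although regular files always report as ready).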
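As an illustration of the Unix side of this difference, the sketch below selects on a pipe. It is Unix-only (it will not work on Windows), and the variable names are chosen purely for the example.

```python
import os
import select

# Unix-only sketch: os.pipe() returns plain integer file descriptors,
# which select() accepts directly on Unix-like systems.
read_fd, write_fd = os.pipe()

idle, _, _ = select.select([read_fd], [], [], 0)    # empty pipe: not readable
os.write(write_fd, b'x')
ready, _, _ = select.select([read_fd], [], [], 1)   # one byte is now waiting
payload = os.read(read_fd, 1)

os.close(read_fd)
os.close(write_fd)
```

Note that select hands back exactly what was passed in, so here the ready list contains the integer read_fd.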
In practice, though, we sometimes need to call select on an object that is not a socket, such as the DummyStream object below:
class DummyStream(object):
    def __init__(self):
        self._buff = b''

    def write(self, data):
        self._buff += data

    def read(self, size):
        buff = self._buff
        self._buff = b''
        return buff
Calling select on it raises the following error:
TypeError: argument must be an int, or have a fileno() method.
The error message tells us that select accepts objects that implement a fileno() method. So we add the following method to the DummyStream class:
    def fileno(self):
        return 0
On Windows, however, this still fails with the following error:
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed.
In other words, on Windows fileno() must return the descriptor of a real socket.
So, with the help of a real socket, we can make any object work with select.
import socket


class DummyStream(object):
    def __init__(self):
        self._buff = b''
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('127.0.0.1', 0))  # use any free port
        s.listen(1)
        self._in_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._in_sock.connect(('127.0.0.1', s.getsockname()[1]))
        self._out_sock, _ = s.accept()
        s.close()

    def write(self, data):
        self._buff += data
        self._in_sock.send(b'0')  # notify data arrived

    def read(self, size):
        self._out_sock.recv(1)  # consume the notification byte
        buff = self._buff
        self._buff = b''
        return buff

    def fileno(self):
        return self._out_sock.fileno()
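The code above creates two socket objects, _in_sock and _out_sock, which are the two ends of a single TCP connection: data written to one end arrives at the other, and that is what lets select see a read event.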
Because fileno() returns _out_sock.fileno(), the b'0' that _in_sock sends whenever write() is called makes _out_sock readable, so a select on the DummyStream object reports it as readable too.
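On Python 3.5 and later, socket.socketpair() is also available on Windows, so the listen/connect/accept sequence can be replaced by a single call. The sketch below is an alternative under that assumption, not the article's original code; the class name PairStream and its close() method are made up for illustration.

```python
import select
import socket


class PairStream(object):
    """Hypothetical variant of DummyStream built on socket.socketpair()."""

    def __init__(self):
        self._buff = b''
        # Two connected sockets: a write to _in_sock makes _out_sock readable.
        self._in_sock, self._out_sock = socket.socketpair()

    def write(self, data):
        self._buff += data
        self._in_sock.send(b'0')   # notify data arrived

    def read(self, size):
        self._out_sock.recv(1)     # consume the notification byte
        buff = self._buff
        self._buff = b''
        return buff

    def fileno(self):
        return self._out_sock.fileno()

    def close(self):
        self._in_sock.close()
        self._out_sock.close()


stream = PairStream()
idle, _, _ = select.select([stream], [], [], 0)    # nothing written yet
stream.write(b'hello')
ready, _, _ = select.select([stream], [], [], 1)   # write() triggered a read event
data = stream.read(4096)
stream.close()
```

The object itself is passed to select, which calls its fileno() method and returns the same object in the ready list.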
To make this logic reusable, it can be wrapped in a SelectableObject class.
import socket


class SelectableObject(object):
    '''Make an object selectable.
    '''

    def __init__(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('127.0.0.1', 0))  # use any free port
        s.listen(1)
        self._in_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._in_sock.connect(('127.0.0.1', s.getsockname()[1]))
        self._out_sock, _ = s.accept()
        s.close()

    def fileno(self):
        return self._out_sock.fileno()

    def fire_event(self):
        '''Fire a read event.
        '''
        self._in_sock.send(b'0')  # send any data

    def clear_event(self):
        '''Clear a read event.
        '''
        self._out_sock.recv(1)  # recv data
With that, DummyStream can be simplified as follows.
class DummyStream(SelectableObject):
    def __init__(self):
        super(DummyStream, self).__init__()
        self._buff = b''

    def write(self, data):
        self._buff += data
        self.fire_event()  # notify data arrived

    def read(self, size):
        self.clear_event()
        buff = self._buff
        self._buff = b''
        return buff
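One thing to watch in this design: every write() sends a notification byte, but a single read() drains the whole buffer. Two writes followed by one read therefore leave a stale byte behind, and select keeps reporting the stream as readable when there is nothing left to read. A possible way around this, sketched below, is to send the byte only when the buffer goes from empty to non-empty. The class name CoalescingStream, the lock, and the use of socket.socketpair() (Python 3.5+ on Windows) are all assumptions made for this sketch rather than part of the original code.

```python
import select
import socket
import threading


class CoalescingStream(object):
    """Hypothetical stream that keeps at most one notification byte pending."""

    def __init__(self):
        self._buff = b''
        self._lock = threading.Lock()
        self._in_sock, self._out_sock = socket.socketpair()

    def fileno(self):
        return self._out_sock.fileno()

    def write(self, data):
        with self._lock:
            if not self._buff and data:
                self._in_sock.send(b'0')   # buffer goes from empty to non-empty
            self._buff += data

    def read(self, size):
        with self._lock:
            if not self._buff:
                return b''                 # nothing pending, so do not block
            self._out_sock.recv(1)         # clear the single notification byte
            buff, self._buff = self._buff, b''
            return buff

    def close(self):
        self._in_sock.close()
        self._out_sock.close()


stream = CoalescingStream()
stream.write(b'a')
stream.write(b'b')                                      # no second notification byte
ready, _, _ = select.select([stream], [], [], 1)
data = stream.read(4096)
idle_after, _, _ = select.select([stream], [], [], 0)   # no stale event remains
empty = stream.read(4096)
stream.close()
```

The lock keeps the buffer and the pending byte consistent when write() and read() are called from different threads.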