Handling Concurrency with select

1. Introduction

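By default, the socket module works in blocking mode, so handling many concurrent connections usually means using threads. Threads in turn require mutual exclusion or synchronization, implemented with locks or events. This inevitably makes the code more complex and makes it easier to introduce bugs.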

A comparatively simple alternative is Python's built-in select module, which is suitable wherever performance requirements are not especially demanding.

2. Function Overview

The select module is implemented in native code, and its most commonly used function is select.select:

rfds, wfds, xfds = select.select(rlist, wlist, xlist[, timeout])

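Parameters:

  • rlist: the sockets to watch for readability (incoming data, or a pending connection on a listening socket)
  • wlist: the sockets to watch for writability
  • xlist: the sockets to watch for exceptional conditions
  • timeout: the maximum time to wait, in seconds (a float is allowed). When it expires, the function returns three empty lists instead of blocking; if it is omitted, the call blocks until at least one socket is ready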

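Return values:

  • rfds: the sockets from rlist that are ready for reading
  • wfds: the sockets from wlist that are ready for writing
  • xfds: the sockets from xlist that have an exceptional condition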

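In most cases only rlist needs to be supplied, because blocking mainly happens on reads: accept() and recv() wait for the peer, while send() usually returns immediately unless the socket's send buffer is full.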
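A minimal sketch of these return values, assuming a connected pair of sockets from socket.socketpair() (standard library; Windows support since Python 3.5) in place of a real client and server. With nothing to read, select times out and returns three empty lists; once the peer sends data, the receiving socket appears in the first list.

```python
import select
import socket

# A connected pair of sockets stands in for a client/server connection.
a, b = socket.socketpair()

# Nothing has been sent yet: select waits 0.1 s, then returns three empty lists.
timed_out = select.select([b], [], [], 0.1)
print(timed_out)  # ([], [], [])

# After the peer sends data, b is reported as ready for reading.
a.sendall(b'ping')
rfds, wfds, xfds = select.select([b], [], [], 1)
data = b.recv(4096)
print(rfds == [b], data)  # True b'ping'

a.close()
b.close()
```

Note that select returns the very objects that were passed in, so the result can be compared directly against the sockets in rlist.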

3. A Simple Example

3.1. Server code:

import select
import socket
import time

def create_server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('0.0.0.0', port))
    s.listen(5)

    rlist = [s]  # sockets to watch: the listener plus every accepted client
    while True:
        # Wait at most 1 second; rfds holds the sockets that are ready to read
        rfds, _, _ = select.select(rlist, [], [], 1)
        for fd in rfds:
            if fd is s:
                # The listening socket is readable: a new connection is pending
                conn, addr = s.accept()
                print('Recv connection from %s:%d' % addr)
                rlist.append(conn)
            else:
                try:
                    buff = fd.recv(4096)
                except OSError as e:
                    print('Connection error: %s' % e)
                    rlist.remove(fd)
                    fd.close()
                else:
                    if not buff:
                        # recv() returned b'': the client closed the connection
                        print('Connection closed')
                        rlist.remove(fd)
                        fd.close()
                    else:
                        print(b'Recv %r from client' % buff)
                        fd.sendall(b'[%f] %s' % (time.time(), buff))  # echo back with a timestamp

3.2. Client code:

import socket
import threading

def create_client(port, text=b'Hello python!!!'):
    try:
        # The with block closes the socket even if send/recv fails
        with socket.create_connection(('127.0.0.1', port)) as s:
            s.sendall(text)
            print(b'Recv: ' + s.recv(4096))
    except OSError as e:
        print('Client error: %s' % e)

def create_ten_clients(port):
    thread_list = []
    for i in range(10):
        t = threading.Thread(target=create_client, args=(port, b'Text from thread %d' % (i + 1)))
        t.daemon = True  # setDaemon() is deprecated since Python 3.10
        t.start()
        thread_list.append(t)

    # Wait for every client thread to finish instead of busy-polling is_alive()
    for t in thread_list:
        t.join()
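The server and client listings above are meant to run in separate processes. To try both from a single script, a sketch like the following can be used. It assumes two hypothetical changes that are not part of the original code: the select loop (here called serve()) receives an already bound listening socket on a port chosen by the OS, and it exits when a threading.Event is set.

```python
import select
import socket
import threading
import time

def serve(listener, stop):
    # Same select loop as create_server(), but stoppable via `stop` (hypothetical addition).
    rlist = [listener]
    while not stop.is_set():
        rfds, _, _ = select.select(rlist, [], [], 0.1)
        for fd in rfds:
            if fd is listener:
                conn, _ = listener.accept()
                rlist.append(conn)
                continue
            try:
                buff = fd.recv(4096)
            except OSError:
                buff = b''  # treat errors like a closed connection
            if not buff:
                rlist.remove(fd)
                fd.close()
            else:
                fd.sendall(b'[%f] %s' % (time.time(), buff))
    for s in rlist:
        s.close()

def client(port, text, replies):
    with socket.create_connection(('127.0.0.1', port)) as s:
        s.sendall(text)
        replies.append(s.recv(4096))

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
listener.listen(16)
port = listener.getsockname()[1]

stop = threading.Event()
server = threading.Thread(target=serve, args=(listener, stop), daemon=True)
server.start()

replies = []
clients = [threading.Thread(target=client, args=(port, b'Text from thread %d' % (i + 1), replies))
           for i in range(10)]
for t in clients:
    t.start()
for t in clients:
    t.join()

stop.set()
server.join()
print(len(replies))  # 10
```

Only one thread ever touches the server-side sockets; the client threads exist purely to generate concurrent load, just as in create_ten_clients().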

3.3. Test results:

Server:

Recv connection from 127.0.0.1:49409
b"Recv b'Text from thread 8' from client"
Recv connection from 127.0.0.1:49410
Recv connection from 127.0.0.1:49411
b"Recv b'Text from thread 7' from client"
b"Recv b'Text from thread 6' from client"
Recv connection from 127.0.0.1:49412
Recv connection from 127.0.0.1:49413
Recv connection from 127.0.0.1:49414
Recv connection from 127.0.0.1:49415
Recv connection from 127.0.0.1:49416
b"Recv b'Text from thread 3' from client"
b"Recv b'Text from thread 5' from client"
b"Recv b'Text from thread 4' from client"
Recv connection from 127.0.0.1:49417
b"Recv b'Text from thread 10' from client"
b"Recv b'Text from thread 2' from client"
Recv connection from 127.0.0.1:49418
b"Recv b'Text from thread 1' from client"
b"Recv b'Text from thread 9' from client"
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed
Connection closed

Client:

b'Recv: [1550212519.656387] Text from thread 8'
b'Recv: [1550212519.666388] Text from thread 7'
b'Recv: [1550212519.667388] Text from thread 6'
b'Recv: [1550212519.677389] Text from thread 3'
b'Recv: [1550212519.677389] Text from thread 5'
b'Recv: [1550212519.681390] Text from thread 9'
b'Recv: [1550212519.679389] Text from thread 10'
b'Recv: [1550212519.678389] Text from thread 4'
b'Recv: [1550212519.680389] Text from thread 1'
b'Recv: [1550212519.679389] Text from thread 2'

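As the output shows, the single-threaded server handled 10 concurrent clients, and all 10 responses were timestamped within about 25 ms of one another.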

4. Advanced Usage

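On Windows, select only supports sockets; on Unix-like systems it accepts any file descriptor, including files and pipes (regular files, however, are always reported as ready).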
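A minimal sketch of the Unix behavior, assuming a pipe from os.pipe() as the non-socket descriptor. The block is guarded by a platform check because the same call is expected to fail on Windows.

```python
import os
import select
import sys

# On Unix-like systems select() accepts any file descriptor, e.g. a pipe.
# On Windows select() only supports sockets, so the demo is skipped there.
if sys.platform != 'win32':
    r, w = os.pipe()
    before, _, _ = select.select([r], [], [], 0)  # empty pipe: not readable
    os.write(w, b'data')
    after, _, _ = select.select([r], [], [], 1)   # pipe now holds data: readable
    print(before, after == [r])  # [] True
    os.close(r)
    os.close(w)
```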

In practice, however, you sometimes need to select on objects that are not sockets at all, such as the DummyStream object below:

class DummyStream(object):

    def __init__(self):
        self._buff = b''

    def write(self, data):
        self._buff += data

    def read(self, size):
        buff = self._buff
        self._buff = b''
        return buff

Passing it to select raises the following error:

TypeError: argument must be an int, or have a fileno() method.
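The error is easy to reproduce. The sketch below passes an instance of the original DummyStream (condensed, still without fileno()) to select.select; the TypeError is raised while the argument lists are converted, before any waiting happens.

```python
import select

class DummyStream(object):
    # Condensed copy of the original DummyStream: note there is no fileno() method.
    def __init__(self):
        self._buff = b''

stream = DummyStream()
try:
    select.select([stream], [], [], 0)
except TypeError as e:
    error = str(e)
    print('TypeError:', error)
```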

The message shows that select accepts any object with a fileno() method, so add the following method to DummyStream:

    def fileno(self):
        return 0

On Windows, however, this still fails:

OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed.

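In other words, on Windows fileno() must return the descriptor of a real socket. (On Unix the call succeeds, but descriptor 0 is standard input, so select would actually be watching stdin rather than the DummyStream.)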

So by borrowing a real socket, any object can be made selectable:

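import socket

class DummyStream(object):
    def __init__(self):
        self._buff = b''
        # Build a loopback TCP connection: a temporary listener accepts one connection
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('127.0.0.1', 0))  # port 0: use any free port
        s.listen(1)
        self._in_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._in_sock.connect(('127.0.0.1', s.getsockname()[1]))
        self._out_sock, _ = s.accept()
        s.close()  # the listener is not needed once both ends exist

    def write(self, data):
        if not data:
            return
        was_empty = not self._buff
        self._buff += data
        if was_empty:
            # Notify only on the empty -> non-empty transition, so exactly one
            # notification byte is pending whenever the buffer holds data
            self._in_sock.send(b'0')

    def read(self, size):
        self._out_sock.recv(1)  # consume the notification byte
        buff = self._buff  # size is ignored: the whole buffer is returned
        self._buff = b''
        return buff

    def fileno(self):
        # select() watches the receiving end of the loopback connection
        return self._out_sock.fileno()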

The code above creates two sockets, _in_sock and _out_sock, which are the two ends of a single TCP connection: data written to one end arrives at the other, so select sees a read event.

Because fileno() returns _out_sock.fileno(), the b'0' that write() sends through _in_sock makes _out_sock readable, and select therefore reports the DummyStream itself as readable.
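A minimal usage sketch, assuming a condensed copy of the socket-backed DummyStream (repeated so the block runs on its own; it sends one notification byte only when the buffer goes from empty to non-empty). A background thread writes after a short delay, and the main thread waits on the stream with select.select exactly as it would on a socket. The helper delayed_write() is hypothetical, added only for the demo.

```python
import select
import socket
import threading
import time

class DummyStream(object):
    def __init__(self):
        self._buff = b''
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('127.0.0.1', 0))  # any free port
        s.listen(1)
        self._in_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._in_sock.connect(('127.0.0.1', s.getsockname()[1]))
        self._out_sock, _ = s.accept()
        s.close()

    def write(self, data):
        was_empty = not self._buff
        self._buff += data
        if data and was_empty:
            self._in_sock.send(b'0')  # notify: data arrived

    def read(self, size):
        self._out_sock.recv(1)  # consume the notification byte
        buff, self._buff = self._buff, b''
        return buff

    def fileno(self):
        return self._out_sock.fileno()

stream = DummyStream()
before, _, _ = select.select([stream], [], [], 0)  # nothing written yet

def delayed_write():
    time.sleep(0.1)  # give the main thread time to start waiting
    stream.write(b'hello')

writer = threading.Thread(target=delayed_write)
writer.start()
rfds, _, _ = select.select([stream], [], [], 5)  # wakes up when write() notifies
data = rfds[0].read(4096) if rfds else None
writer.join()
print(before, data)  # [] b'hello'
```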

To reuse this logic more generally, it can be wrapped in a SelectableObject class:

import socket

class SelectableObject(object):
    '''Make an object selectable by backing fileno() with a loopback TCP connection.
    '''

    def __init__(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('127.0.0.1', 0)) # use any free port
        s.listen(1)
        self._in_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._in_sock.connect(('127.0.0.1', s.getsockname()[1]))
        self._out_sock, _ = s.accept()
        s.close()

    def fileno(self):
        return self._out_sock.fileno()

    def fire_event(self):
        '''fire read event
        '''
        self._in_sock.send(b'0') # send any data

    def clear_event(self):
        '''clear read event
        '''
        self._out_sock.recv(1) # recv data

DummyStream can then be simplified as follows:

class DummyStream(SelectableObject):
    def __init__(self):
        super(DummyStream, self).__init__()
        self._buff = b''

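    def write(self, data):
        if not data:
            return
        was_empty = not self._buff
        self._buff += data
        if was_empty:
            self.fire_event()  # notify only on the empty -> non-empty transition

    def read(self, size):
        self.clear_event()  # consume the pending notification byte
        buff = self._buff  # size is ignored: the whole buffer is returned
        self._buff = b''
        return buff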
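As a possible simplification (a substitution, not the original author's code): socket.socketpair(), available in the standard library on Windows as well since Python 3.5, returns an already connected pair of sockets and can replace the listen/connect/accept sequence in SelectableObject. The sketch below uses it and then waits on three DummyStream objects at once; only the stream that was written to is reported, and once read() consumes the notification byte, it is no longer ready.

```python
import select
import socket

class SelectableObject(object):
    '''Variant of SelectableObject built on socket.socketpair().'''

    def __init__(self):
        # _in_sock is the notifying end; _out_sock is the end select() watches
        self._in_sock, self._out_sock = socket.socketpair()

    def fileno(self):
        return self._out_sock.fileno()

    def fire_event(self):
        self._in_sock.send(b'0')  # make _out_sock readable

    def clear_event(self):
        self._out_sock.recv(1)  # consume the notification byte

class DummyStream(SelectableObject):
    def __init__(self):
        super(DummyStream, self).__init__()
        self._buff = b''

    def write(self, data):
        was_empty = not self._buff
        self._buff += data
        if data and was_empty:
            self.fire_event()

    def read(self, size):
        self.clear_event()
        buff, self._buff = self._buff, b''
        return buff

streams = [DummyStream() for _ in range(3)]
streams[1].write(b'only the second stream has data')

rfds, _, _ = select.select(streams, [], [], 1)
data = rfds[0].read(4096)
after, _, _ = select.select(streams, [], [], 0)  # notification consumed: nothing ready
print(rfds == [streams[1]], data, after)
```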
drunkdream.cn. All rights reserved. Last revised: 2021-01-07 18:27:03
