diff --git a/environment.yml b/environment.yml index 6b486b6d04b..4750366fb52 100644 --- a/environment.yml +++ b/environment.yml @@ -70,6 +70,7 @@ dependencies: - typing >=3.6.6 # Pin OpenSSL to avoid: https://github.com/DIRACGrid/DIRAC/issues/4489 - openssl <1.1 + - selectors2 - pip: - diraccfg # This is a fork of tornado with a patch to allow for configurable iostream diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index 5a255b34599..cb1e690767b 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -18,7 +18,6 @@ from __future__ import division from __future__ import print_function -import select import time import socket import signal @@ -26,6 +25,11 @@ import sys import multiprocessing +try: + import selectors +except ImportError: + import selectors2 as selectors + from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.DISET.private.Service import Service from DIRAC.Core.DISET.private.GatewayService import GatewayService @@ -188,14 +192,14 @@ def __startCloneProcess(self, svcName, i): while self.__alive: self.__acceptIncomingConnection(svcName) - def __getListeningSocketsList(self, svcName=False): + def __getListeningSelector(self, svcName=False): + sel = selectors.DefaultSelector() if svcName: - sockets = [self.__listeningConnections[svcName]['socket']] + sel.register(self.__listeningConnections[svcName]['socket'], selectors.EVENT_READ, svcName) else: - sockets = [] - for svcName in self.__listeningConnections: - sockets.append(self.__listeningConnections[svcName]['socket']) - return sockets + for svcName, svcInfo in self.__listeningConnections.items(): + sel.register(svcInfo['socket'], selectors.EVENT_READ, svcName) + return sel def __acceptIncomingConnection(self, svcName=False): """ @@ -207,20 +211,20 @@ def __acceptIncomingConnection(self, svcName=False): :param str svcName=False: Name of a service if you use multiple services at the same time """ - sockets = self.__getListeningSocketsList(svcName) + sel = self.__getListeningSelector(svcName) while self.__alive: try: - inList, _outList, _exList = select.select(sockets, [], [], 10) - if len(inList) == 0: + events = sel.select(timeout=10) + if len(events) == 0: return - for inSocket in inList: - for svcName in self.__listeningConnections: - if inSocket == self.__listeningConnections[svcName]['socket']: - retVal = self.__listeningConnections[svcName]['transport'].acceptConnection() - if not retVal['OK']: - gLogger.warn("Error while accepting a connection: ", retVal['Message']) - return - clientTransport = retVal['Value'] + for key, event in events: + if event & selectors.EVENT_READ: + svcName = key.data + retVal = self.__listeningConnections[svcName]['transport'].acceptConnection() + if not retVal['OK']: + gLogger.warn("Error while accepting a connection: ", retVal['Message']) + return + clientTransport = retVal['Value'] except socket.error: return self.__maxFD = max(self.__maxFD, clientTransport.oSocket.fileno()) @@ -243,7 +247,7 @@ def __acceptIncomingConnection(self, svcName=False): if result['OK']: renewed = True if renewed: - sockets = self.__getListeningSocketsList() + sel = self.__getListeningSelector() def __closeListeningConnections(self): for svcName in self.__listeningConnections: diff --git a/src/DIRAC/Core/DISET/private/MessageBroker.py b/src/DIRAC/Core/DISET/private/MessageBroker.py index f8152bbda20..b57925a8e71 100644 --- a/src/DIRAC/Core/DISET/private/MessageBroker.py +++ b/src/DIRAC/Core/DISET/private/MessageBroker.py @@ -7,11 +7,16 @@ __RCSID__ = "$Id$" import threading -import select import time +import select import socket import os +try: + import selectors +except ImportError: + import selectors2 as selectors + # TODO: Remove ThreadPool later useThreadPoolExecutor = False if os.getenv('DIRAC_USE_NEWTHREADPOOL', 'YES').lower() in ('yes', 'true'): @@ -135,40 +140,36 @@ def __listenAutoReceiveConnections(self): while self.__listeningForMessages: self.__trInOutLock.acquire() try: - sIdList = [] + # TODO: A single DefaultSelector instance can probably be shared by all threads + sel = selectors.DefaultSelector() for trid in self.__messageTransports: mt = self.__messageTransports[trid] if not mt['listen']: continue - sIdList.append((trid, mt['transport'].getSocket())) - if not sIdList: + sel.register(mt['transport'].getSocket(), selectors.EVENT_READ, trid) + if not sel.get_map(): self.__listeningForMessages = False return finally: self.__trInOutLock.release() + try: - try: - inList, _outList, _exList = select.select([pos[1] for pos in sIdList], [], [], 1) - if len(inList) == 0: - continue - except socket.error: - time.sleep(0.001) - continue - except select.error: - time.sleep(0.001) - continue + events = sel.select(timeout=1) + except (socket.error, select.error): + # TODO: When can this happen? + time.sleep(0.001) + continue except Exception as e: gLogger.exception("Exception while selecting persistent connections", lException=e) continue - for sock in inList: - for iPos in range(len(sIdList)): - if sock == sIdList[iPos][1]: - trid = sIdList[iPos][0] - if trid in self.__messageTransports: - result = self.__receiveMsgDataAndQueue(trid) - if not result['OK']: - self.removeTransport(trid) - break + + for key, event in events: + if event & selectors.EVENT_READ: + trid = key.data + if trid in self.__messageTransports: + result = self.__receiveMsgDataAndQueue(trid) + if not result['OK']: + self.removeTransport(trid) # Process received data functions diff --git a/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py b/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py index 2a4ee58411c..8dcd9f7f2ac 100755 --- a/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py +++ b/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py @@ -24,11 +24,15 @@ __RCSID__ = "$Id$" import time -import select import six from six import BytesIO from hashlib import md5 +try: + import selectors +except ImportError: + import selectors2 as selectors + from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK from DIRAC.FrameworkSystem.Client.Logger import gLogger from DIRAC.Core.Utilities import MixedEncode @@ -145,8 +149,9 @@ def getSocket(self): def _readReady(self): if not self.iReadTimeout: return True - inList, dummy, dummy = select.select([self.oSocket], [], [], self.iReadTimeout) - if self.oSocket in inList: + sel = selectors.DefaultSelector() + sel.register(self.oSocket, selectors.EVENT_READ) + if sel.select(timeout=self.iReadTimeout): return True return False diff --git a/src/DIRAC/Core/DISET/private/Transports/PlainTransport.py b/src/DIRAC/Core/DISET/private/Transports/PlainTransport.py index ed93623b0fc..5a0876a95cf 100755 --- a/src/DIRAC/Core/DISET/private/Transports/PlainTransport.py +++ b/src/DIRAC/Core/DISET/private/Transports/PlainTransport.py @@ -4,9 +4,14 @@ __RCSID__ = "$Id$" import socket -import select import time import os + +try: + import selectors +except ImportError: + import selectors2 as selectors + from DIRAC.Core.DISET.private.Transports.BaseTransport import BaseTransport from DIRAC.FrameworkSystem.Client.Logger import gLogger from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK @@ -24,8 +29,9 @@ def initAsClient(self): if e.args[0] != 115: return S_ERROR("Can't connect: %s" % str(e)) #Connect in progress - oL = select.select([], [self.oSocket], [], self.extraArgsDict['timeout'])[1] - if len(oL) == 0: + sel = selectors.DefaultSelector() + sel.register(self.oSocket, selectors.EVENT_READ) + if not sel.select(timeout=self.extraArgsDict['timeout']): self.oSocket.close() return S_ERROR("Connection timeout") errno = self.oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) diff --git a/src/DIRAC/Core/DISET/private/Transports/test/Test_SSLTransport.py b/src/DIRAC/Core/DISET/private/Transports/test/Test_SSLTransport.py index 17fedbeb35d..82c934ef2d0 100644 --- a/src/DIRAC/Core/DISET/private/Transports/test/Test_SSLTransport.py +++ b/src/DIRAC/Core/DISET/private/Transports/test/Test_SSLTransport.py @@ -4,12 +4,15 @@ from __future__ import print_function import os -import select import socket import threading -from pytest import fixture +try: + import selectors +except ImportError: + import selectors2 as selectors +from pytest import fixture from diraccfg import CFG from DIRAC.Core.Security.test.x509TestUtilities import CERTDIR, USERCERT, getCertOption @@ -105,10 +108,10 @@ def __acceptIncomingConnection(self): """ This method just gets the incoming connection, and handle it, once. """ - sockets = [self.transport.getSocket()] - try: - _inList, _outList, _exList = select.select(sockets, [], [], 2) + sel = selectors.DefaultSelector() + sel.register(self.transport.getSocket(), selectors.EVENT_READ) + assert sel.select(timeout=2) result = self.transport.acceptConnection() assert result["OK"], result clientTransport = result['Value'] diff --git a/src/DIRAC/Core/Utilities/Subprocess.py b/src/DIRAC/Core/Utilities/Subprocess.py index 4e3225e7de6..84e11705f47 100644 --- a/src/DIRAC/Core/Utilities/Subprocess.py +++ b/src/DIRAC/Core/Utilities/Subprocess.py @@ -33,13 +33,17 @@ from multiprocessing import Process, Manager import threading import time -import select import os import sys import subprocess32 as subprocess import signal import psutil +try: + import selectors +except ImportError: + import selectors2 as selectors + # Very Important: # Here we can not import directly from DIRAC, since this file it is imported # at initialization time therefore the full path is necessary @@ -238,11 +242,11 @@ def __selectFD(self, readSeq, timeout=False): pass if not validList: return False - if self.timeout and not timeout: - timeout = self.timeout - if not timeout: - return select.select(validList, [], [])[0] - return select.select(validList, [], [], timeout)[0] + sel = selectors.DefaultSelector() + for socket in validList: + sel.register(socket, selectors.EVENT_READ) + events = sel.select(timeout=timeout or self.timeout or None) + return [key.fileobj for key, event in events if event & selectors.EVENT_READ] def __killPid(self, pid, sig=9): """ send signal :sig: to process :pid: @@ -372,7 +376,12 @@ def __readFromFile(self, fd, baseLength): try: dataString = "" fn = fd.fileno() - while fd in select.select([fd], [], [], 1)[0]: + + sel = selectors.DefaultSelector() + sel.register(fd, selectors.EVENT_READ) + while True: + if not sel.select(timeout=1): + break if isinstance(fn, int): nB = os.read(fn, self.bufferLimit) else: