Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ dependencies:
- typing >=3.6.6
# Pin OpenSSL to avoid: https://github.com/DIRACGrid/DIRAC/issues/4489
- openssl <1.1
- selectors2
- pip:
- diraccfg
# This is a fork of tornado with a patch to allow for configurable iostream
Expand Down
42 changes: 23 additions & 19 deletions src/DIRAC/Core/DISET/ServiceReactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
from __future__ import division
from __future__ import print_function

import select
import time
import socket
import signal
import os
import sys
import multiprocessing

try:
import selectors
except ImportError:
import selectors2 as selectors

from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.DISET.private.Service import Service
from DIRAC.Core.DISET.private.GatewayService import GatewayService
Expand Down Expand Up @@ -188,14 +192,14 @@ def __startCloneProcess(self, svcName, i):
while self.__alive:
self.__acceptIncomingConnection(svcName)

def __getListeningSocketsList(self, svcName=False):
def __getListeningSelector(self, svcName=False):
sel = selectors.DefaultSelector()
if svcName:
sockets = [self.__listeningConnections[svcName]['socket']]
sel.register(self.__listeningConnections[svcName]['socket'], selectors.EVENT_READ, svcName)
else:
sockets = []
for svcName in self.__listeningConnections:
sockets.append(self.__listeningConnections[svcName]['socket'])
return sockets
for svcName, svcInfo in self.__listeningConnections.items():
sel.register(svcInfo['socket'], selectors.EVENT_READ, svcName)
return sel

def __acceptIncomingConnection(self, svcName=False):
"""
Expand All @@ -207,20 +211,20 @@ def __acceptIncomingConnection(self, svcName=False):
:param str svcName=False: Name of a service if you use multiple
services at the same time
"""
sockets = self.__getListeningSocketsList(svcName)
sel = self.__getListeningSelector(svcName)
while self.__alive:
try:
inList, _outList, _exList = select.select(sockets, [], [], 10)
if len(inList) == 0:
events = sel.select(timeout=10)
if len(events) == 0:
return
for inSocket in inList:
for svcName in self.__listeningConnections:
if inSocket == self.__listeningConnections[svcName]['socket']:
retVal = self.__listeningConnections[svcName]['transport'].acceptConnection()
if not retVal['OK']:
gLogger.warn("Error while accepting a connection: ", retVal['Message'])
return
clientTransport = retVal['Value']
for key, event in events:
if event & selectors.EVENT_READ:
svcName = key.data
retVal = self.__listeningConnections[svcName]['transport'].acceptConnection()
if not retVal['OK']:
gLogger.warn("Error while accepting a connection: ", retVal['Message'])
return
clientTransport = retVal['Value']
except socket.error:
return
self.__maxFD = max(self.__maxFD, clientTransport.oSocket.fileno())
Expand All @@ -243,7 +247,7 @@ def __acceptIncomingConnection(self, svcName=False):
if result['OK']:
renewed = True
if renewed:
sockets = self.__getListeningSocketsList()
sel = self.__getListeningSelector()

def __closeListeningConnections(self):
for svcName in self.__listeningConnections:
Expand Down
47 changes: 24 additions & 23 deletions src/DIRAC/Core/DISET/private/MessageBroker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
__RCSID__ = "$Id$"

import threading
import select
import time
import select
Comment thread
fstagni marked this conversation as resolved.
import socket
import os

try:
import selectors
except ImportError:
import selectors2 as selectors

# TODO: Remove ThreadPool later
useThreadPoolExecutor = False
if os.getenv('DIRAC_USE_NEWTHREADPOOL', 'YES').lower() in ('yes', 'true'):
Expand Down Expand Up @@ -135,40 +140,36 @@ def __listenAutoReceiveConnections(self):
while self.__listeningForMessages:
self.__trInOutLock.acquire()
try:
sIdList = []
# TODO: A single DefaultSelector instance can probably be shared by all threads
sel = selectors.DefaultSelector()
for trid in self.__messageTransports:
mt = self.__messageTransports[trid]
if not mt['listen']:
continue
sIdList.append((trid, mt['transport'].getSocket()))
if not sIdList:
sel.register(mt['transport'].getSocket(), selectors.EVENT_READ, trid)
if not sel.get_map():
self.__listeningForMessages = False
return
finally:
self.__trInOutLock.release()

try:
try:
inList, _outList, _exList = select.select([pos[1] for pos in sIdList], [], [], 1)
if len(inList) == 0:
continue
except socket.error:
time.sleep(0.001)
continue
except select.error:
time.sleep(0.001)
continue
events = sel.select(timeout=1)
except (socket.error, select.error):
# TODO: When can this happen?
time.sleep(0.001)
continue
except Exception as e:
gLogger.exception("Exception while selecting persistent connections", lException=e)
continue
for sock in inList:
for iPos in range(len(sIdList)):
if sock == sIdList[iPos][1]:
trid = sIdList[iPos][0]
if trid in self.__messageTransports:
result = self.__receiveMsgDataAndQueue(trid)
if not result['OK']:
self.removeTransport(trid)
break

for key, event in events:
if event & selectors.EVENT_READ:
trid = key.data
if trid in self.__messageTransports:
result = self.__receiveMsgDataAndQueue(trid)
if not result['OK']:
self.removeTransport(trid)

# Process received data functions

Expand Down
11 changes: 8 additions & 3 deletions src/DIRAC/Core/DISET/private/Transports/BaseTransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
__RCSID__ = "$Id$"

import time
import select
import six
from six import BytesIO
from hashlib import md5

try:
import selectors
except ImportError:
import selectors2 as selectors

from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.Core.Utilities import MixedEncode
Expand Down Expand Up @@ -145,8 +149,9 @@ def getSocket(self):
def _readReady(self):
if not self.iReadTimeout:
return True
inList, dummy, dummy = select.select([self.oSocket], [], [], self.iReadTimeout)
if self.oSocket in inList:
sel = selectors.DefaultSelector()
sel.register(self.oSocket, selectors.EVENT_READ)
if sel.select(timeout=self.iReadTimeout):
return True
return False

Expand Down
12 changes: 9 additions & 3 deletions src/DIRAC/Core/DISET/private/Transports/PlainTransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
__RCSID__ = "$Id$"

import socket
import select
import time
import os

try:
import selectors
except ImportError:
import selectors2 as selectors

from DIRAC.Core.DISET.private.Transports.BaseTransport import BaseTransport
from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
Expand All @@ -24,8 +29,9 @@ def initAsClient(self):
if e.args[0] != 115:
return S_ERROR("Can't connect: %s" % str(e))
#Connect in progress
oL = select.select([], [self.oSocket], [], self.extraArgsDict['timeout'])[1]
if len(oL) == 0:
sel = selectors.DefaultSelector()
sel.register(self.oSocket, selectors.EVENT_READ)
if not sel.select(timeout=self.extraArgsDict['timeout']):
self.oSocket.close()
return S_ERROR("Connection timeout")
errno = self.oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
from __future__ import print_function

import os
import select
import socket
import threading

from pytest import fixture
try:
import selectors
except ImportError:
import selectors2 as selectors

from pytest import fixture

from diraccfg import CFG
from DIRAC.Core.Security.test.x509TestUtilities import CERTDIR, USERCERT, getCertOption
Expand Down Expand Up @@ -105,10 +108,10 @@ def __acceptIncomingConnection(self):
"""
This method just gets the incoming connection, and handle it, once.
"""
sockets = [self.transport.getSocket()]

try:
_inList, _outList, _exList = select.select(sockets, [], [], 2)
sel = selectors.DefaultSelector()
sel.register(self.transport.getSocket(), selectors.EVENT_READ)
assert sel.select(timeout=2)
result = self.transport.acceptConnection()
assert result["OK"], result
clientTransport = result['Value']
Expand Down
23 changes: 16 additions & 7 deletions src/DIRAC/Core/Utilities/Subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
from multiprocessing import Process, Manager
import threading
import time
import select
import os
import sys
import subprocess32 as subprocess
import signal
import psutil

try:
import selectors
except ImportError:
import selectors2 as selectors

# Very Important:
# Here we can not import directly from DIRAC, since this file it is imported
# at initialization time therefore the full path is necessary
Expand Down Expand Up @@ -238,11 +242,11 @@ def __selectFD(self, readSeq, timeout=False):
pass
if not validList:
return False
if self.timeout and not timeout:
timeout = self.timeout
if not timeout:
return select.select(validList, [], [])[0]
return select.select(validList, [], [], timeout)[0]
sel = selectors.DefaultSelector()
for socket in validList:
sel.register(socket, selectors.EVENT_READ)
events = sel.select(timeout=timeout or self.timeout or None)
return [key.fileobj for key, event in events if event & selectors.EVENT_READ]

def __killPid(self, pid, sig=9):
""" send signal :sig: to process :pid:
Expand Down Expand Up @@ -372,7 +376,12 @@ def __readFromFile(self, fd, baseLength):
try:
dataString = ""
fn = fd.fileno()
while fd in select.select([fd], [], [], 1)[0]:

sel = selectors.DefaultSelector()
sel.register(fd, selectors.EVENT_READ)
while True:
if not sel.select(timeout=1):
break
if isinstance(fn, int):
nB = os.read(fn, self.bufferLimit)
else:
Expand Down