From 87a8d2b9ae5e6d33ce96b4200e0afd78edcb2cfb Mon Sep 17 00:00:00 2001 From: Peter Law Date: Fri, 21 May 2021 19:36:17 +0100 Subject: [PATCH 01/15] Add a mechanism to pause work on (redis) queues This can be specified either as a duration or a time to un-pause, though either way the internal implementation relies on Redis' SETEX command. Both Redis backends are supported and use the same approach for compatibility. --- django_lightweight_queue/backends/redis.py | 38 ++++++++ .../backends/reliable_redis.py | 39 +++++++- .../management/commands/queue_pause.py | 92 +++++++++++++++++++ django_lightweight_queue/utils.py | 35 ++++++- tests/test_reliable_redis_backend.py | 22 +++++ 5 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 django_lightweight_queue/management/commands/queue_pause.py diff --git a/django_lightweight_queue/backends/redis.py b/django_lightweight_queue/backends/redis.py index 7eb44e3..126d438 100644 --- a/django_lightweight_queue/backends/redis.py +++ b/django_lightweight_queue/backends/redis.py @@ -1,8 +1,13 @@ +import datetime + import redis from .. import app_settings from ..job import Job from .base import BaseBackend +from ..utils import block_for_time + +QueueName = str class RedisBackend(BaseBackend): @@ -20,6 +25,15 @@ def enqueue(self, job, queue): self.client.lpush(self._key(queue), job.to_json().encode('utf-8')) def dequeue(self, queue, worker_num, timeout): + if self.is_paused(queue): + # Block for a while to avoid constant polling ... + block_for_time( + lambda: self.is_paused(queue), + timeout=datetime.timedelta(minutes=5), + ) + # ... but always indicate that we did no work + return None + try: _, data = self.client.brpop(self._key(queue), timeout) @@ -30,6 +44,27 @@ def dequeue(self, queue, worker_num, timeout): def length(self, queue): return self.client.llen(self._key(queue)) + def pause(self, queue: QueueName, until: datetime.datetime) -> None: + """ + Pause the given queue by setting a pause marker. + """ + + pause_key = self._pause_key(queue) + + now = datetime.datetime.now(datetime.timezone.utc) + delta = until - now + + self.client.setex( + pause_key, + time=int(delta.total_seconds()), + # Store the value for debugging, we rely on setex behaviour for + # implementation. + value=until.isoformat(' '), + ) + + def is_paused(self, queue: QueueName) -> bool: + return self.client.exists(self._pause_key(queue)) + def _key(self, queue): if app_settings.REDIS_PREFIX: return '{}:django_lightweight_queue:{}'.format( @@ -38,3 +73,6 @@ def _key(self, queue): ) return 'django_lightweight_queue:{}'.format(queue) + + def _pause_key(self, queue: QueueName) -> str: + return self._key(queue) + ':pause' diff --git a/django_lightweight_queue/backends/reliable_redis.py b/django_lightweight_queue/backends/reliable_redis.py index 18e5667..65389d1 100644 --- a/django_lightweight_queue/backends/reliable_redis.py +++ b/django_lightweight_queue/backends/reliable_redis.py @@ -1,11 +1,15 @@ +import datetime + import redis from .. import app_settings from ..job import Job from .base import BaseBackend -from ..utils import get_worker_numbers +from ..utils import block_for_time, get_worker_numbers from ..progress_logger import NULL_PROGRESS_LOGGER +QueueName = str + class ReliableRedisBackend(BaseBackend): """ @@ -86,6 +90,15 @@ def dequeue(self, queue, worker_number, timeout): main_queue_key = self._key(queue) processing_queue_key = self._processing_key(queue, worker_number) + if self.is_paused(queue): + # Block for a while to avoid constant polling ... + block_for_time( + lambda: self.is_paused(queue), + timeout=datetime.timedelta(minutes=5), + ) + # ... but always indicate that we did no work + return None + # Get any jobs off our 'processing' queue - but do not block doing so - # this is to catch the fact there may be a job already in our # processing queue if this worker crashed and has just been restarted. @@ -163,11 +176,35 @@ def deduplicate(self, queue, *, progress_logger=NULL_PROGRESS_LOGGER): return original_size, self.client.llen(main_queue_key) + def pause(self, queue: QueueName, until: datetime.datetime) -> None: + """ + Pause the given queue by setting a pause marker. + """ + + pause_key = self._pause_key(queue) + + now = datetime.datetime.now(datetime.timezone.utc) + delta = until - now + + self.client.setex( + pause_key, + time=int(delta.total_seconds()), + # Store the value for debugging, we rely on setex behaviour for + # implementation. + value=until.isoformat(' '), + ) + + def is_paused(self, queue: QueueName) -> bool: + return self.client.exists(self._pause_key(queue)) + def _key(self, queue): key = 'django_lightweight_queue:{}'.format(queue) return self._prefix_key(key) + def _pause_key(self, queue: QueueName) -> str: + return self._key(queue) + ':pause' + def _processing_key(self, queue, worker_number): key = 'django_lightweight_queue:{}:processing:{}'.format( queue, diff --git a/django_lightweight_queue/management/commands/queue_pause.py b/django_lightweight_queue/management/commands/queue_pause.py new file mode 100644 index 0000000..4924068 --- /dev/null +++ b/django_lightweight_queue/management/commands/queue_pause.py @@ -0,0 +1,92 @@ +import re +import argparse +import datetime + +from django.core.management.base import BaseCommand, CommandError + +from ...utils import get_backend + +QueueName = str + +DURATION_PATTERN = r'^((?P\d+)h)?((?P\d+)m)?((?P\d+)s)?$' +TIME_FORMAT = r'%Y-%m-%dT%H:%M:%S%z' + + +def utcnow() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc) + + +def parse_duration_to_time(duration: str) -> datetime.datetime: + match = re.match(DURATION_PATTERN, duration) + if match is None: + raise ValueError( + f"Unknown duration format {duration!r}. Try something like '1h2m3s'.", + ) + + delta = datetime.timedelta( + hours=int(match['hours'] or 0), + minutes=int(match['minutes'] or 0), + seconds=int(match['seconds'] or 0), + ) + + return utcnow() + delta + + +def parse_time(date_string: str) -> datetime.datetime: + return datetime.datetime.strptime(date_string, TIME_FORMAT) + + +class Command(BaseCommand): + help = """ + Command to pause work on a redis-backed queue. + + New jobs can still be added to the queue, however no jobs will be pulled off + the queue for processing. + """ # noqa:A003 # inherited name + + def add_arguments(self, parser: argparse.ArgumentParser) -> None: + parser.add_argument( + 'queue', + action='store', + help="The queue to pause.", + ) + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + '--for', + dest='until', + action='store', + type=parse_duration_to_time, + help=( + "The duration to pause the queue for. Specify a value like 1h2m3s, " + "all levels of precision are optional, so 5m and 1h are both valid." + ), + ) + group.add_argument( + '--until', + action='store', + type=parse_time, + help=( + "The time at which the queue should reactivate. Specify as an " + "ISO 8601 value, specifically one parsable via datetime.strptime " + f"using {TIME_FORMAT!r}." + ), + ) + + def handle(self, queue: QueueName, until: datetime.datetime, **options: object) -> None: + if until < utcnow(): + raise CommandError("Refusing to pause until a time in the past.") + + backend = get_backend(queue) + + if not hasattr(backend, 'pause'): + raise CommandError( + "Configured backend '{}.{}' doesn't support pausing".format( + type(backend).__module__, + type(backend).__name__, + ), + ) + + backend.pause(queue, until) + + self.stdout.write(f"Paused queue {queue} until {until.isoformat(' ')}.") diff --git a/django_lightweight_queue/utils.py b/django_lightweight_queue/utils.py index f97bbaa..ee79ffe 100644 --- a/django_lightweight_queue/utils.py +++ b/django_lightweight_queue/utils.py @@ -1,7 +1,9 @@ import imp +import time +import datetime import warnings import importlib -from typing import Mapping, Collection +from typing import Mapping, Callable, Collection from functools import lru_cache from django.apps import apps @@ -12,6 +14,8 @@ _accepting_implied_queues = True +THIRTY_SECONDS = datetime.timedelta(seconds=30) + def load_extra_config(file_path): extra_settings = imp.load_source('extra_settings', file_path) @@ -124,6 +128,35 @@ def load_all_tasks(): import_all_submodules('tasks', app_settings.IGNORE_APPS) +def block_for_time( + should_continue_blocking: Callable[[], bool], + timeout: datetime.timedelta, + check_frequency: datetime.timedelta = THIRTY_SECONDS, +) -> bool: + """ + Block until a cancellation function or timeout indicates otherwise. + + Returns whether or not the timeout was encountered. + """ + if not should_continue_blocking: + return False + + end = time.time() + timeout.total_seconds() + + while should_continue_blocking: + now = time.time() + if now > end: + # timed out + return True + + time.sleep(min( + check_frequency.total_seconds(), + end - now, + )) + + return False + + try: import setproctitle diff --git a/tests/test_reliable_redis_backend.py b/tests/test_reliable_redis_backend.py index 869633d..9d5e636 100644 --- a/tests/test_reliable_redis_backend.py +++ b/tests/test_reliable_redis_backend.py @@ -1,5 +1,7 @@ +import time import datetime import unittest +import itertools import contextlib import unittest.mock from typing import Mapping, Iterator @@ -324,3 +326,23 @@ def test_startup_doesnt_move_job_on_known_queue(self): actual_job.as_dict(), "The queue job should be the original one", ) + + def test_pause(self): + QUEUE = 'the-queue' + + self.enqueue_job(QUEUE) + + now = datetime.datetime.now(datetime.timezone.utc) + five_minutes_time = now + datetime.timedelta(minutes=5) + self.backend.pause(QUEUE, five_minutes_time) + + with unittest.mock.patch( + 'time.time', + side_effect=itertools.count(time.time(), 30), + ), unittest.mock.patch( + 'time.sleep', + ) as mock_sleep: + job = self.backend.dequeue(QUEUE, 2, 2) + + self.assertIsNone(job, "Should have indicated no work was done") + self.assertTrue(mock_sleep.called) From c142bbd9ee174bf2e8f28c0d05cab4f41d827a39 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 22 May 2021 13:27:56 +0100 Subject: [PATCH 02/15] Add a resume command to remove a pause --- django_lightweight_queue/backends/redis.py | 6 ++++ .../backends/reliable_redis.py | 6 ++++ .../management/commands/queue_resume.py | 36 +++++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 django_lightweight_queue/management/commands/queue_resume.py diff --git a/django_lightweight_queue/backends/redis.py b/django_lightweight_queue/backends/redis.py index 126d438..327a0cc 100644 --- a/django_lightweight_queue/backends/redis.py +++ b/django_lightweight_queue/backends/redis.py @@ -62,6 +62,12 @@ def pause(self, queue: QueueName, until: datetime.datetime) -> None: value=until.isoformat(' '), ) + def resume(self, queue: QueueName) -> None: + """ + Resume the given queue by deleting the pause marker (if present). + """ + self.client.delete(self._pause_key(queue)) + def is_paused(self, queue: QueueName) -> bool: return self.client.exists(self._pause_key(queue)) diff --git a/django_lightweight_queue/backends/reliable_redis.py b/django_lightweight_queue/backends/reliable_redis.py index 65389d1..8060fa6 100644 --- a/django_lightweight_queue/backends/reliable_redis.py +++ b/django_lightweight_queue/backends/reliable_redis.py @@ -194,6 +194,12 @@ def pause(self, queue: QueueName, until: datetime.datetime) -> None: value=until.isoformat(' '), ) + def resume(self, queue: QueueName) -> None: + """ + Resume the given queue by deleting the pause marker (if present). + """ + self.client.delete(self._pause_key(queue)) + def is_paused(self, queue: QueueName) -> bool: return self.client.exists(self._pause_key(queue)) diff --git a/django_lightweight_queue/management/commands/queue_resume.py b/django_lightweight_queue/management/commands/queue_resume.py new file mode 100644 index 0000000..a9f865a --- /dev/null +++ b/django_lightweight_queue/management/commands/queue_resume.py @@ -0,0 +1,36 @@ +import argparse + +from django.core.management.base import BaseCommand, CommandError + +from ...utils import get_backend + +QueueName = str + + +class Command(BaseCommand): + help = """ + Command to resume work immediately on a redis-backed queue. + + This removes a pause which may be in place for the given queue, though it + may not cause workers to resume work immediately. + """ # noqa:A003 # inherited name + + def add_arguments(self, parser: argparse.ArgumentParser) -> None: + parser.add_argument( + 'queue', + action='store', + help="The queue to resume.", + ) + + def handle(self, queue: QueueName, **options: object) -> None: + backend = get_backend(queue) + + if not hasattr(backend, 'resume'): + raise CommandError( + "Configured backend '{}.{}' doesn't support pausing".format( + type(backend).__module__, + type(backend).__name__, + ), + ) + + backend.resume(queue) From 774cd39915f056fe2e24ded568f1a7225f03c216 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Tue, 15 Jun 2021 18:29:27 +0100 Subject: [PATCH 03/15] Use this check callback correctly --- django_lightweight_queue/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/django_lightweight_queue/utils.py b/django_lightweight_queue/utils.py index ee79ffe..c1bad5c 100644 --- a/django_lightweight_queue/utils.py +++ b/django_lightweight_queue/utils.py @@ -138,12 +138,12 @@ def block_for_time( Returns whether or not the timeout was encountered. """ - if not should_continue_blocking: + if not should_continue_blocking(): return False end = time.time() + timeout.total_seconds() - while should_continue_blocking: + while should_continue_blocking(): now = time.time() if now > end: # timed out From 8a263c8df2447c2febf6eda7367ca96c3606d088 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Tue, 15 Jun 2021 18:43:20 +0100 Subject: [PATCH 04/15] Respect the timeout passed through by the worker even when paused --- django_lightweight_queue/backends/redis.py | 2 +- django_lightweight_queue/backends/reliable_redis.py | 2 +- django_lightweight_queue/utils.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/django_lightweight_queue/backends/redis.py b/django_lightweight_queue/backends/redis.py index 327a0cc..5d75910 100644 --- a/django_lightweight_queue/backends/redis.py +++ b/django_lightweight_queue/backends/redis.py @@ -29,7 +29,7 @@ def dequeue(self, queue, worker_num, timeout): # Block for a while to avoid constant polling ... block_for_time( lambda: self.is_paused(queue), - timeout=datetime.timedelta(minutes=5), + timeout=datetime.timedelta(seconds=timeout), ) # ... but always indicate that we did no work return None diff --git a/django_lightweight_queue/backends/reliable_redis.py b/django_lightweight_queue/backends/reliable_redis.py index 8060fa6..400bd84 100644 --- a/django_lightweight_queue/backends/reliable_redis.py +++ b/django_lightweight_queue/backends/reliable_redis.py @@ -94,7 +94,7 @@ def dequeue(self, queue, worker_number, timeout): # Block for a while to avoid constant polling ... block_for_time( lambda: self.is_paused(queue), - timeout=datetime.timedelta(minutes=5), + timeout=datetime.timedelta(seconds=timeout), ) # ... but always indicate that we did no work return None diff --git a/django_lightweight_queue/utils.py b/django_lightweight_queue/utils.py index c1bad5c..5bbe6e4 100644 --- a/django_lightweight_queue/utils.py +++ b/django_lightweight_queue/utils.py @@ -14,7 +14,7 @@ _accepting_implied_queues = True -THIRTY_SECONDS = datetime.timedelta(seconds=30) +FIVE_SECONDS = datetime.timedelta(seconds=5) def load_extra_config(file_path): @@ -131,7 +131,7 @@ def load_all_tasks(): def block_for_time( should_continue_blocking: Callable[[], bool], timeout: datetime.timedelta, - check_frequency: datetime.timedelta = THIRTY_SECONDS, + check_frequency: datetime.timedelta = FIVE_SECONDS, ) -> bool: """ Block until a cancellation function or timeout indicates otherwise. From b6d750ad5134b78c1307c8276d02f231c6232a54 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Tue, 15 Jun 2021 18:46:09 +0100 Subject: [PATCH 05/15] Fix test now the default wait is shorter --- tests/test_reliable_redis_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_reliable_redis_backend.py b/tests/test_reliable_redis_backend.py index 9d5e636..48f6233 100644 --- a/tests/test_reliable_redis_backend.py +++ b/tests/test_reliable_redis_backend.py @@ -338,7 +338,7 @@ def test_pause(self): with unittest.mock.patch( 'time.time', - side_effect=itertools.count(time.time(), 30), + side_effect=itertools.count(time.time(), 1), ), unittest.mock.patch( 'time.sleep', ) as mock_sleep: From b8ad26ce794ac6fd5e17b87623c988495c148344 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Tue, 15 Jun 2021 18:47:30 +0100 Subject: [PATCH 06/15] Correct this error message --- django_lightweight_queue/management/commands/queue_resume.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_lightweight_queue/management/commands/queue_resume.py b/django_lightweight_queue/management/commands/queue_resume.py index a9f865a..edeb4a6 100644 --- a/django_lightweight_queue/management/commands/queue_resume.py +++ b/django_lightweight_queue/management/commands/queue_resume.py @@ -27,7 +27,7 @@ def handle(self, queue: QueueName, **options: object) -> None: if not hasattr(backend, 'resume'): raise CommandError( - "Configured backend '{}.{}' doesn't support pausing".format( + "Configured backend '{}.{}' doesn't support resuming from paused".format( type(backend).__module__, type(backend).__name__, ), From 602c7b4b737c7972dd7c8b17600a5134f886bd31 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sun, 12 Sep 2021 14:36:11 +0100 Subject: [PATCH 07/15] Use extension backends to signal pause/resume capability --- django_lightweight_queue/backends/base.py | 15 +++++++++++++++ django_lightweight_queue/backends/redis.py | 4 ++-- .../backends/reliable_redis.py | 4 ++-- .../management/commands/queue_pause.py | 3 ++- .../management/commands/queue_resume.py | 3 ++- 5 files changed, 23 insertions(+), 6 deletions(-) diff --git a/django_lightweight_queue/backends/base.py b/django_lightweight_queue/backends/base.py index 4ba9b8a..3fa6bae 100644 --- a/django_lightweight_queue/backends/base.py +++ b/django_lightweight_queue/backends/base.py @@ -1,7 +1,10 @@ +import datetime from abc import ABCMeta, abstractmethod from ..job import Job +QueueName = str + class BaseBackend(metaclass=ABCMeta): def startup(self, queue: str) -> None: @@ -21,3 +24,15 @@ def length(self, queue: str) -> int: def processed_job(self, queue: str, worker_num: int, job: Job) -> None: pass + + +class BackendWithPause(BaseBackend, metaclass=ABCMeta): + @abstractmethod + def pause(self, queue: QueueName, until: datetime.datetime) -> None: + raise NotImplementedError() + + +class BackendWithPauseResume(BackendWithPause, metaclass=ABCMeta): + @abstractmethod + def resume(self, queue: QueueName) -> None: + raise NotImplementedError() diff --git a/django_lightweight_queue/backends/redis.py b/django_lightweight_queue/backends/redis.py index 5d75910..84ca6e3 100644 --- a/django_lightweight_queue/backends/redis.py +++ b/django_lightweight_queue/backends/redis.py @@ -4,13 +4,13 @@ from .. import app_settings from ..job import Job -from .base import BaseBackend +from .base import BackendWithPauseResume from ..utils import block_for_time QueueName = str -class RedisBackend(BaseBackend): +class RedisBackend(BackendWithPauseResume): """ This backend has at-most-once semantics. """ diff --git a/django_lightweight_queue/backends/reliable_redis.py b/django_lightweight_queue/backends/reliable_redis.py index 400bd84..dc3ff6f 100644 --- a/django_lightweight_queue/backends/reliable_redis.py +++ b/django_lightweight_queue/backends/reliable_redis.py @@ -4,14 +4,14 @@ from .. import app_settings from ..job import Job -from .base import BaseBackend +from .base import BackendWithPauseResume from ..utils import block_for_time, get_worker_numbers from ..progress_logger import NULL_PROGRESS_LOGGER QueueName = str -class ReliableRedisBackend(BaseBackend): +class ReliableRedisBackend(BackendWithPauseResume): """ This backend manages a per-queue-per-worker 'processing' queue. E.g. if we had a queue called 'django_lightweight_queue:things', and two workers, we diff --git a/django_lightweight_queue/management/commands/queue_pause.py b/django_lightweight_queue/management/commands/queue_pause.py index 4924068..fb798d8 100644 --- a/django_lightweight_queue/management/commands/queue_pause.py +++ b/django_lightweight_queue/management/commands/queue_pause.py @@ -5,6 +5,7 @@ from django.core.management.base import BaseCommand, CommandError from ...utils import get_backend +from ...backends.base import BackendWithPause QueueName = str @@ -79,7 +80,7 @@ def handle(self, queue: QueueName, until: datetime.datetime, **options: object) backend = get_backend(queue) - if not hasattr(backend, 'pause'): + if not isinstance(backend, BackendWithPause): raise CommandError( "Configured backend '{}.{}' doesn't support pausing".format( type(backend).__module__, diff --git a/django_lightweight_queue/management/commands/queue_resume.py b/django_lightweight_queue/management/commands/queue_resume.py index edeb4a6..b39bde6 100644 --- a/django_lightweight_queue/management/commands/queue_resume.py +++ b/django_lightweight_queue/management/commands/queue_resume.py @@ -3,6 +3,7 @@ from django.core.management.base import BaseCommand, CommandError from ...utils import get_backend +from ...backends.base import BackendWithPauseResume QueueName = str @@ -25,7 +26,7 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: def handle(self, queue: QueueName, **options: object) -> None: backend = get_backend(queue) - if not hasattr(backend, 'resume'): + if not isinstance(backend, BackendWithPauseResume): raise CommandError( "Configured backend '{}.{}' doesn't support resuming from paused".format( type(backend).__module__, From 418272a2ad21fc19dc21e6c7317f9b59a1fa2871 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 16 Oct 2021 14:18:59 +0100 Subject: [PATCH 08/15] Updates for typing now we have more complete support --- django_lightweight_queue/backends/redis.py | 2 +- django_lightweight_queue/backends/reliable_redis.py | 2 +- django_lightweight_queue/management/commands/queue_pause.py | 3 +-- django_lightweight_queue/management/commands/queue_resume.py | 3 +-- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/django_lightweight_queue/backends/redis.py b/django_lightweight_queue/backends/redis.py index d42a88d..bd5aadb 100644 --- a/django_lightweight_queue/backends/redis.py +++ b/django_lightweight_queue/backends/redis.py @@ -69,7 +69,7 @@ def resume(self, queue: QueueName) -> None: self.client.delete(self._pause_key(queue)) def is_paused(self, queue: QueueName) -> bool: - return self.client.exists(self._pause_key(queue)) + return bool(self.client.exists(self._pause_key(queue))) def _key(self, queue: QueueName) -> str: if app_settings.REDIS_PREFIX: diff --git a/django_lightweight_queue/backends/reliable_redis.py b/django_lightweight_queue/backends/reliable_redis.py index ab462ee..3cbaba8 100644 --- a/django_lightweight_queue/backends/reliable_redis.py +++ b/django_lightweight_queue/backends/reliable_redis.py @@ -212,7 +212,7 @@ def resume(self, queue: QueueName) -> None: self.client.delete(self._pause_key(queue)) def is_paused(self, queue: QueueName) -> bool: - return self.client.exists(self._pause_key(queue)) + return bool(self.client.exists(self._pause_key(queue))) def _key(self, queue: QueueName) -> str: key = 'django_lightweight_queue:{}'.format(queue) diff --git a/django_lightweight_queue/management/commands/queue_pause.py b/django_lightweight_queue/management/commands/queue_pause.py index fb798d8..80c12e0 100644 --- a/django_lightweight_queue/management/commands/queue_pause.py +++ b/django_lightweight_queue/management/commands/queue_pause.py @@ -4,11 +4,10 @@ from django.core.management.base import BaseCommand, CommandError +from ...types import QueueName from ...utils import get_backend from ...backends.base import BackendWithPause -QueueName = str - DURATION_PATTERN = r'^((?P\d+)h)?((?P\d+)m)?((?P\d+)s)?$' TIME_FORMAT = r'%Y-%m-%dT%H:%M:%S%z' diff --git a/django_lightweight_queue/management/commands/queue_resume.py b/django_lightweight_queue/management/commands/queue_resume.py index b39bde6..54b5849 100644 --- a/django_lightweight_queue/management/commands/queue_resume.py +++ b/django_lightweight_queue/management/commands/queue_resume.py @@ -2,11 +2,10 @@ from django.core.management.base import BaseCommand, CommandError +from ...types import QueueName from ...utils import get_backend from ...backends.base import BackendWithPauseResume -QueueName = str - class Command(BaseCommand): help = """ From 2a123dabb9a7ca17367ab25880e4e390c4b6743f Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 16 Oct 2021 14:21:44 +0100 Subject: [PATCH 09/15] Remove unused import --- django_lightweight_queue/backends/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_lightweight_queue/backends/redis.py b/django_lightweight_queue/backends/redis.py index bd5aadb..1920d20 100644 --- a/django_lightweight_queue/backends/redis.py +++ b/django_lightweight_queue/backends/redis.py @@ -5,7 +5,7 @@ from .. import app_settings from ..job import Job -from .base import BaseBackend, BackendWithPauseResume +from .base import BackendWithPauseResume from ..types import QueueName, WorkerNumber from ..utils import block_for_time From 1af2e9ee0f1c6799e34315433d7a7293185a60b2 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 16 Oct 2021 14:49:01 +0100 Subject: [PATCH 10/15] Ensure this help text can be rendered This needs to escape % characters which argparse will otherwise try to use as placeholders. --- django_lightweight_queue/management/commands/queue_pause.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_lightweight_queue/management/commands/queue_pause.py b/django_lightweight_queue/management/commands/queue_pause.py index 80c12e0..79c2c0b 100644 --- a/django_lightweight_queue/management/commands/queue_pause.py +++ b/django_lightweight_queue/management/commands/queue_pause.py @@ -69,7 +69,7 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: help=( "The time at which the queue should reactivate. Specify as an " "ISO 8601 value, specifically one parsable via datetime.strptime " - f"using {TIME_FORMAT!r}." + f"using {TIME_FORMAT.replace('%', r'%%')!r}." ), ) From 5fbf6ef7a65a1420cd974e3d9e7c8246cf2b69b1 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 16 Oct 2021 14:49:40 +0100 Subject: [PATCH 11/15] Require that pausable backends implement is_paused too This is needed for testing and will be useful for introspection as well. --- django_lightweight_queue/backends/base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/django_lightweight_queue/backends/base.py b/django_lightweight_queue/backends/base.py index b6a4ac5..73d6148 100644 --- a/django_lightweight_queue/backends/base.py +++ b/django_lightweight_queue/backends/base.py @@ -47,6 +47,10 @@ class BackendWithPause(BaseBackend, metaclass=ABCMeta): def pause(self, queue: QueueName, until: datetime.datetime) -> None: raise NotImplementedError() + @abstractmethod + def is_paused(self, queue: QueueName) -> bool: + raise NotImplementedError() + class BackendWithPauseResume(BackendWithPause, metaclass=ABCMeta): @abstractmethod From 288980ae03e3f42e8f8a7bab2e5d7f586a81459a Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 16 Oct 2021 14:52:31 +0100 Subject: [PATCH 12/15] Clarify this help message --- django_lightweight_queue/management/commands/queue_pause.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_lightweight_queue/management/commands/queue_pause.py b/django_lightweight_queue/management/commands/queue_pause.py index 79c2c0b..58ce46b 100644 --- a/django_lightweight_queue/management/commands/queue_pause.py +++ b/django_lightweight_queue/management/commands/queue_pause.py @@ -69,7 +69,7 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: help=( "The time at which the queue should reactivate. Specify as an " "ISO 8601 value, specifically one parsable via datetime.strptime " - f"using {TIME_FORMAT.replace('%', r'%%')!r}." + f"using {TIME_FORMAT.replace('%', r'%%')!r}, such as {utcnow():{TIME_FORMAT}}." ), ) From e7c96a932cbe019af9cb81f00ceeaaea1fb3d4ae Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 16 Oct 2021 15:49:37 +0100 Subject: [PATCH 13/15] Add testing of the pause & resume commands --- poetry.lock | 32 +++++++++- pyproject.toml | 1 + tests/settings.py | 4 ++ tests/test_pause_resume.py | 120 +++++++++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 tests/test_pause_resume.py diff --git a/poetry.lock b/poetry.lock index eda46e2..1f5c2bd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -207,6 +207,17 @@ python-versions = ">=3.6" flake8 = ">=3.0,<3.2.0 || >3.2.0,<4" importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} +[[package]] +name = "freezegun" +version = "1.1.0" +description = "Let your Python tests travel through time" +category = "dev" +optional = false +python-versions = ">=3.5" + +[package.dependencies] +python-dateutil = ">=2.7" + [[package]] name = "importlib-metadata" version = "4.0.1" @@ -299,6 +310,17 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +[[package]] +name = "python-dateutil" +version = "2.8.2" +description = "Extensions to the standard Python datetime module" +category = "dev" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" + +[package.dependencies] +six = ">=1.5" + [[package]] name = "pytz" version = "2021.1" @@ -411,7 +433,7 @@ redis = ["redis"] [metadata] lock-version = "1.1" python-versions = ">=3.6" -content-hash = "225614c9b143973d7f9de425fe30723c8b7156dcd998ee69cb79c92e5469b350" +content-hash = "44461f804b2d6634e38d568b14367ddff0336c3987ee1a38af86cd0dec0cccd7" [metadata.files] asgiref = [ @@ -477,6 +499,10 @@ flake8-tidy-imports = [ {file = "flake8-tidy-imports-4.2.1.tar.gz", hash = "sha256:52e5f2f987d3d5597538d5941153409ebcab571635835b78f522c7bf03ca23bc"}, {file = "flake8_tidy_imports-4.2.1-py3-none-any.whl", hash = "sha256:76e36fbbfdc8e3c5017f9a216c2855a298be85bc0631e66777f4e6a07a859dc4"}, ] +freezegun = [ + {file = "freezegun-1.1.0-py2.py3-none-any.whl", hash = "sha256:2ae695f7eb96c62529f03a038461afe3c692db3465e215355e1bb4b0ab408712"}, + {file = "freezegun-1.1.0.tar.gz", hash = "sha256:177f9dd59861d871e27a484c3332f35a6e3f5d14626f2bf91be37891f18927f3"}, +] importlib-metadata = [ {file = "importlib_metadata-4.0.1-py3-none-any.whl", hash = "sha256:d7eb1dea6d6a6086f8be21784cc9e3bcfa55872b52309bc5fad53a8ea444465d"}, {file = "importlib_metadata-4.0.1.tar.gz", hash = "sha256:8c501196e49fb9df5df43833bdb1e4328f64847763ec8a50703148b73784d581"}, @@ -521,6 +547,10 @@ pyflakes = [ {file = "pyflakes-2.3.1-py2.py3-none-any.whl", hash = "sha256:7893783d01b8a89811dd72d7dfd4d84ff098e5eed95cfa8905b22bbffe52efc3"}, {file = "pyflakes-2.3.1.tar.gz", hash = "sha256:f5bc8ecabc05bb9d291eb5203d6810b49040f6ff446a756326104746cc00c1db"}, ] +python-dateutil = [ + {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, + {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, +] pytz = [ {file = "pytz-2021.1-py2.py3-none-any.whl", hash = "sha256:eb10ce3e7736052ed3623d49975ce333bcd712c7bb19a58b9e2089d4057d0798"}, {file = "pytz-2021.1.tar.gz", hash = "sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da"}, diff --git a/pyproject.toml b/pyproject.toml index 5cc88fe..747731c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ progress = ["tqdm"] [tool.poetry.dev-dependencies] # Testing tools fakeredis = "^1.1.0" +freezegun = "^1.1.0" # Linting tools flake8 = "^3.8.0" diff --git a/tests/settings.py b/tests/settings.py index f16db1a..aaed1d9 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -1,3 +1,7 @@ SECRET_KEY = 'very-secret-value' LIGHTWEIGHT_QUEUE_REDIS_PREFIX = 'tests:' + +INSTALLED_APPS = [ + 'django_lightweight_queue', +] diff --git a/tests/test_pause_resume.py b/tests/test_pause_resume.py new file mode 100644 index 0000000..40f19f8 --- /dev/null +++ b/tests/test_pause_resume.py @@ -0,0 +1,120 @@ +import io +import datetime +import unittest +from unittest import mock + +import fakeredis +import freezegun +from django_lightweight_queue.types import QueueName +from django_lightweight_queue.utils import get_backend +from django_lightweight_queue.backends.base import BackendWithPauseResume +from django_lightweight_queue.backends.redis import RedisBackend +from django_lightweight_queue.management.commands.queue_pause import ( + parse_duration_to_time, +) + +from django.core.management import ( + call_command, + CommandError, + get_commands, + load_command_class, +) + + +class PauseResumeTests(unittest.TestCase): + longMessage = True + + def assertPaused(self, queue: QueueName, context: str) -> None: + self.assertTrue( + self.backend.is_paused(queue), + f"{queue} should be pauseed {context}", + ) + + def assertNotPaused(self, queue: QueueName, context: str) -> None: + self.assertFalse( + self.backend.is_paused(queue), + f"{queue} should not be pauseed {context}", + ) + + def setUp(self) -> None: + get_backend.cache_clear() + + redis_patch = mock.patch( + 'redis.StrictRedis', + autospec=True, + return_value=fakeredis.FakeStrictRedis(), + ) + redis_patch.start() + self.addCleanup(redis_patch.stop) + + # Arbitrary choice of backend, just needs to match the one used in tests + self.backend: BackendWithPauseResume = RedisBackend() + + super().setUp() + + # Can't use override_settings due to the copying of the settings values into + # module values at startup. + @mock.patch( + 'django_lightweight_queue.app_settings.BACKEND', + new='django_lightweight_queue.backends.redis.RedisBackend', + ) + def test_pause_resume(self) -> None: + QUEUE = QueueName('test-pauseable-queue') + OTHER_QUEUE = QueueName('other-pauseable-queue') + + self.assertNotPaused(QUEUE, "initially") + self.assertNotPaused(OTHER_QUEUE, "initially") + + when = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5) + buffer = io.StringIO() + # Work around https://code.djangoproject.com/ticket/33205 by bypassing + # the argument processing + queue_pause = load_command_class(get_commands()['queue_pause'], 'queue_pause') + defaults = {'no_color': None, 'force_color': None, 'skip_checks': True} + queue_pause.execute(QUEUE, until=when, stdout=buffer, **defaults) + + self.assertPaused(QUEUE, "after being paused") + self.assertNotPaused(OTHER_QUEUE, f"after pausing {QUEUE}") + + self.assertIn(QUEUE, buffer.getvalue()) + self.assertIn(when.isoformat(' '), buffer.getvalue()) + + call_command('queue_resume', QUEUE) + + self.assertNotPaused(QUEUE, "after being resumed") + + def test_pause_resume_unsupported_backend(self) -> None: + QUEUE = QueueName('unsupported-queue') + + when = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5) + # Work around https://code.djangoproject.com/ticket/33205 by bypassing + # the argument processing + queue_pause = load_command_class(get_commands()['queue_pause'], 'queue_pause') + with self.assertRaises(CommandError): + queue_pause.handle(QUEUE, until=when) + + with self.assertRaises(CommandError): + call_command('queue_resume', QUEUE) + + @freezegun.freeze_time() + def test_parse_duration(self) -> None: + durations = [ + ('3h', datetime.timedelta(hours=3)), + ('4m', datetime.timedelta(minutes=4)), + ('5s', datetime.timedelta(seconds=5)), + ('6h7s', datetime.timedelta(hours=6, seconds=7)), + ('2m8s', datetime.timedelta(minutes=2, seconds=8)), + ('6h7m', datetime.timedelta(hours=6, minutes=7)), + ('6h7m8s', datetime.timedelta(hours=6, minutes=7, seconds=8)), + ] + + now = datetime.datetime.now(datetime.timezone.utc) + + for duration, expected in durations: + with self.subTest(duration): + actual = parse_duration_to_time(duration) - now + self.assertEqual( + expected, + actual, + f"Wrong duration parsed for {duration}", + ) From 09a9eaf58c92bf5bc6150c2b168e3cd5243cf758 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Mon, 18 Oct 2021 20:07:55 +0100 Subject: [PATCH 14/15] Remove workarounds for issue by using alternative spelling I wasn't previously aware that this spelling worked. It's even preferable as it appears to run the full command line argument text parsing. --- tests/test_pause_resume.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/test_pause_resume.py b/tests/test_pause_resume.py index 40f19f8..078308c 100644 --- a/tests/test_pause_resume.py +++ b/tests/test_pause_resume.py @@ -10,6 +10,7 @@ from django_lightweight_queue.backends.base import BackendWithPauseResume from django_lightweight_queue.backends.redis import RedisBackend from django_lightweight_queue.management.commands.queue_pause import ( + TIME_FORMAT, parse_duration_to_time, ) @@ -66,18 +67,15 @@ def test_pause_resume(self) -> None: self.assertNotPaused(OTHER_QUEUE, "initially") when = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5) + when_str = when.strftime(TIME_FORMAT) buffer = io.StringIO() - # Work around https://code.djangoproject.com/ticket/33205 by bypassing - # the argument processing - queue_pause = load_command_class(get_commands()['queue_pause'], 'queue_pause') - defaults = {'no_color': None, 'force_color': None, 'skip_checks': True} - queue_pause.execute(QUEUE, until=when, stdout=buffer, **defaults) + call_command('queue_pause', QUEUE, '--until', when_str, stdout=buffer) self.assertPaused(QUEUE, "after being paused") self.assertNotPaused(OTHER_QUEUE, f"after pausing {QUEUE}") self.assertIn(QUEUE, buffer.getvalue()) - self.assertIn(when.isoformat(' '), buffer.getvalue()) + self.assertIn(when.replace(microsecond=0).isoformat(' '), buffer.getvalue()) call_command('queue_resume', QUEUE) @@ -87,11 +85,9 @@ def test_pause_resume_unsupported_backend(self) -> None: QUEUE = QueueName('unsupported-queue') when = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5) - # Work around https://code.djangoproject.com/ticket/33205 by bypassing - # the argument processing - queue_pause = load_command_class(get_commands()['queue_pause'], 'queue_pause') + when_str = when.strftime(TIME_FORMAT) with self.assertRaises(CommandError): - queue_pause.handle(QUEUE, until=when) + call_command('queue_pause', QUEUE, '--until', when_str) with self.assertRaises(CommandError): call_command('queue_resume', QUEUE) From 34428e25f08e0a8775d782844691e72e857ed5e9 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Wed, 1 Dec 2021 17:34:20 +0000 Subject: [PATCH 15/15] Remove unused imports --- tests/test_pause_resume.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/test_pause_resume.py b/tests/test_pause_resume.py index 078308c..d6ae69d 100644 --- a/tests/test_pause_resume.py +++ b/tests/test_pause_resume.py @@ -14,12 +14,7 @@ parse_duration_to_time, ) -from django.core.management import ( - call_command, - CommandError, - get_commands, - load_command_class, -) +from django.core.management import call_command, CommandError class PauseResumeTests(unittest.TestCase):