Skip to content

Commit eb4f029

Browse files
committed
定时任务改成redis
1 parent e5985c0 commit eb4f029

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

app/scheduler/scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
from apscheduler.triggers.cron import CronTrigger
2-
from funboost import funboost_aps_scheduler
1+
from funboost.timing_job.apscheduler_use_redis_store import funboost_background_scheduler_redis_store
32

43
from core.lib import logger
54

65
LOGGER = logger.for_service('scheduler')
76

87

98
def register_scheduler(app):
10-
funboost_aps_scheduler.start()
9+
funboost_background_scheduler_redis_store.start()

app/tasks/common.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,29 @@
66

77
class FunboostCommonConfig(BoosterParams):
88
# 中间件选型见3.1章节 https://funboost.readthedocs.io/zh/latest/articles/c3.html
9-
broker_kind: str = BrokerEnum.SQLITE_QUEUE
9+
broker_kind: BrokerEnum = BrokerEnum.REDIS_STREAM
1010
# 最大自动重试次数
1111
max_retry_times: int = 4
1212
# 函数出错后间隔多少秒再重试.
1313
retry_interval: typing.Union[float, int] = 30
1414
# 消费者和发布者的日志级别
15-
log_level: int = logging.INFO
15+
log_level: int = logging.DEBUG
1616
# 是否使用分布式控频
17-
is_using_distributed_frequency_control: bool = False
17+
is_using_distributed_frequency_control: bool = True
1818
# # 函数达到最大重试次数仍然没成功,是否发送到死信队列,死信队列的名字是 队列名字 + _dlx。
1919
is_push_to_dlx_queue_when_retry_max_times: bool = True
2020
# 任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 , 30分钟前发布过1 + 2 的任务,现在仍然执行,如果是30分钟以内发布过这个任务,则不执行1 + 2
2121
task_filtering_expire_seconds: int = 0
2222
# 消息过期时间,为0永不过期,为10则代表,10秒之前发布的任务如果现在才轮到消费则丢弃任务。
23-
msg_expire_senconds: int = 120
23+
msg_expire_senconds: int = 20
2424
# # 是否对函数入参进行过滤去重.
2525
do_task_filtering: bool = False
2626
# 运行时候,是否记录从消息队列获取出来的消息内容
2727
is_show_message_get_from_broker: bool = True
2828
# 提供一个用户自定义的保存消息处理记录到某个地方例如mysql数据库的函数,函数仅仅接受一个入参,入参类型是 FunctionResultStatus,用户可以打印参数
2929
# user_custom_record_process_info_func: typing.Callable = save_result_status_to_sqlalchemy
3030
# 是否将发布者的心跳发送到redis,有些功能的实现需要统计活跃消费者。因为有的中间件不是真mq。这个功能,需要安装redis.
31-
is_send_consumer_hearbeat_to_redis: bool = False
31+
is_send_consumer_hearbeat_to_redis: bool = True
3232
# 是否支持远程任务杀死功能,如果任务数量少,单个任务耗时长,确实需要远程发送命令来杀死正在运行的函数,才设置为true,否则不建议开启此功能。
3333
is_support_remote_kill_task: bool = False
3434
function_timeout: typing.Union[int, float] = 600

app/tasks/task_syncer.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from apscheduler.triggers.cron import CronTrigger
77
from funboost import boost, funboost_aps_scheduler, funboost_current_task
8+
from funboost.timing_job.apscheduler_use_redis_store import funboost_background_scheduler_redis_store
89
from nb_time import NbTime
910

1011
from app.model.syncer_model import Jobs, DiscoveryInstance, Registration
@@ -51,7 +52,7 @@ def get_gateway_client(name: str) -> Gateway:
5152

5253

5354
def clear_client():
54-
funboost_aps_scheduler.remove_all_jobs()
55+
funboost_background_scheduler_redis_store.remove_all_jobs()
5556
from app.model.config import discovery_clients, gateway_clients
5657

5758
discovery_clients.clear()
@@ -206,18 +207,18 @@ def reload():
206207
Jobs(**target.dict()).save_or_update(sqla_helper)
207208
# 注册定时任务
208209
if next_run_time:
209-
funboost_aps_scheduler.add_push_job(syncer, id=target.id, name=target.id,
210+
funboost_background_scheduler_redis_store.add_push_job(syncer, id=target.id, name=target.id,
210211
next_run_time=next_run_time,
211212
kwargs={"target": target.model_dump()}, replace_existing=True)
212213
else:
213-
funboost_aps_scheduler.add_push_job(syncer, id=target.id, name=target.id,
214+
funboost_background_scheduler_redis_store.add_push_job(syncer, id=target.id, name=target.id,
214215
trigger=CronTrigger(second=second, minute=minute, hour=hour,
215216
day=day, month=month,
216217
day_of_week=day_of_week),
217218
kwargs={"target": target.model_dump()}, replace_existing=True)
218219
# 健康检查
219220
if target.healthcheck:
220-
funboost_aps_scheduler.add_push_job(health_check, id="health-check",
221+
funboost_background_scheduler_redis_store.add_push_job(health_check, id="health-check",
221222
name="health-check",
222223
trigger=CronTrigger(second=second, minute=minute, hour=hour,
223224
day=day, month=month,

0 commit comments

Comments
 (0)