# -*- coding: utf-8 -*-### License: AGPLv3# This file is part of eduMFA. eduMFA is a fork of privacyIDEA which was forked from LinOTP.# Copyright (c) 2024 eduMFA Project-Team# Previous authors by privacyIDEA project:## 2018 Friedrich Weber <friedrich.weber@netknights.it>## This code is free software; you can redistribute it and/or# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE# License as published by the Free Software Foundation; either# version 3 of the License, or any later version.## This code is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU AFFERO GENERAL PUBLIC LICENSE for more details.## You should have received a copy of the GNU Affero General Public# License along with this program. If not, see <http://www.gnu.org/licenses/>.#importloggingfromhueyimportRedisHueyfromedumfa.lib.queues.baseimportBaseQueue,QueueErrorlog=logging.getLogger(__name__)
[docs]defregister_job(self,name,func):ifnameinself._jobs:raiseQueueError("Job function {!r} already exists".format(name))self._jobs[name]=self._huey.task(name=name)(func)
[docs]defenqueue(self,name,args,kwargs):ifnamenotinself._jobs:raiseQueueError("Unknown job: {!r}".format(name))log.info("Sending {!r} job to the queue ...".format(name))# We do not care about resultsself._jobs[name](*args,**kwargs)