Source code for edumfa.lib.eventhandler.tokenhandler

# -*- coding: utf-8 -*-
#
# License:  AGPLv3
# This file is part of eduMFA. eduMFA is a fork of privacyIDEA which was forked from LinOTP.
# Copyright (c) 2024 eduMFA Project-Team
# Previous authors by privacyIDEA project:
#
# 2018 Paul Lettich <paul.lettich@netknights.it>
# 2016 - 2018 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#
# (c) 2016. Cornelius Kölbel
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This is the event handler module for token actions.
You can attach token actions like enable, disable, delete, unassign,... of the

 * current token
 * all the user's tokens
 * all unassigned tokens
 * all disabled tokens
 * ...
"""

import datetime
import json
import logging

import yaml
from dateutil.tz import tzlocal

from edumfa.api.lib.utils import getParam
from edumfa.lib import _
from edumfa.lib.crypto import generate_password
from edumfa.lib.eventhandler.base import BaseEventHandler
from edumfa.lib.machine import attach_token
from edumfa.lib.realm import get_realms
from edumfa.lib.smsprovider.SMSProvider import get_smsgateway
from edumfa.lib.smtpserver import get_smtpservers
from edumfa.lib.token import (
    add_tokeninfo,
    assign_tokengroup,
    delete_tokeninfo,
    enable_token,
    get_one_token,
    get_token_types,
    init_token,
    remove_token,
    set_count_window,
    set_description,
    set_failcounter,
    set_max_failcount,
    set_pin,
    set_realms,
    set_validity_period_end,
    set_validity_period_start,
    unassign_token,
    unassign_tokengroup,
)
from edumfa.lib.tokenclass import AUTH_DATE_FORMAT, DATE_FORMAT
from edumfa.lib.tokengroup import get_tokengroups
from edumfa.lib.utils import is_true, parse_date, parse_time_offset_from_now

log = logging.getLogger(__name__)


[docs] class ACTION_TYPE: """ Allowed actions """ SET_TOKENREALM = "set tokenrealm" DELETE = "delete" UNASSIGN = "unassign" DISABLE = "disable" ENABLE = "enable" INIT = "enroll" SET_DESCRIPTION = "set description" SET_VALIDITY = "set validity" SET_COUNTWINDOW = "set countwindow" SET_TOKENINFO = "set tokeninfo" SET_FAILCOUNTER = "set failcounter" SET_MAXFAIL = "set max failcount" CHANGE_FAILCOUNTER = "change failcounter" DELETE_TOKENINFO = "delete tokeninfo" SET_RANDOM_PIN = "set random pin" ADD_TOKENGROUP = "add tokengroup" REMOVE_TOKENGROUP = "remove tokengroup" ATTACH_APPLICATION = "attach application"
[docs] class TOKEN_APPLICATIONS: SSH = "ssh" OFFLINE = "offline" LUKS = "luks"
[docs] class VALIDITY: """ Allowed validity options """ START = "valid from" END = "valid till"
[docs] class TokenEventHandler(BaseEventHandler): """ An Eventhandler needs to return a list of actions, which it can handle. It also returns a list of allowed action and conditions It returns an identifier, which can be used in the eventhandlig definitions """ identifier = "Token" description = "This event handler can trigger new actions on tokens." @property def allowed_positions(cls): """ This returns the allowed positions of the event handler definition. :return: list of allowed positions """ return ["post", "pre"] @property def actions(cls): """ This method returns a dictionary of allowed actions and possible options in this handler module. :return: dict with actions """ realm_list = list(get_realms()) actions = { ACTION_TYPE.SET_TOKENREALM: { "realm": { "type": "str", "required": True, "description": _("set a new realm of the token"), "value": realm_list, }, "only_realm": { "type": "bool", "description": _( "The new realm will be the only " "realm of the token. I.e. all " "other realms will be removed " "from this token. If disabled, the " "realm will be added to the token." ), }, }, ACTION_TYPE.DELETE: {}, ACTION_TYPE.UNASSIGN: {}, ACTION_TYPE.DISABLE: {}, ACTION_TYPE.ENABLE: {}, ACTION_TYPE.SET_RANDOM_PIN: { "length": { "type": "int", "required": True, "description": _( "set the PIN of the token to a random PIN of this length." ), "value": list(range(1, 32)), } }, ACTION_TYPE.INIT: { "tokentype": { "type": "str", "required": True, "description": _("Token type to create"), "value": get_token_types(), }, "user": { "type": "bool", "description": _( "Assign token to user in request or to tokenowner." ), }, "realm": { "type": "str", "required": False, "description": _("Set the realm of the newly created token."), "value": realm_list, }, "dynamic_phone": { "type": "bool", "visibleIf": "tokentype", "visibleValue": "sms", "description": _( "Dynamically read the mobile number from the user store." ), }, "dynamic_email": { "type": "bool", "visibleIf": "tokentype", "visibleValue": "email", "description": _( "Dynamically read the email address from the user store." ), }, "smtp_identifier": { "type": "str", "visibleIf": "tokentype", "visibleValue": "email", "description": _( "Use a specific SMTP server configuration for this token." ), "value": [server.config.identifier for server in get_smtpservers()], }, "sms_identifier": { "type": "str", "visibleIf": "tokentype", "visibleValue": "sms", "description": _( "Use a specific SMS gateway configuration for this token." ), "value": [gateway.identifier for gateway in get_smsgateway()], }, "additional_params": { "type": "str", "description": _("A dictionary of additional init parameters."), }, "motppin": { "type": "str", "visibleIf": "tokentype", "visibleValue": "motp", "description": _( "Set the MOTP PIN of the MOTP " "token during enrollment. This " "is a required value for " "enrolling MOTP tokens." ), }, }, ACTION_TYPE.SET_DESCRIPTION: { "description": { "type": "str", "description": _("The new description of the token."), } }, ACTION_TYPE.SET_VALIDITY: { VALIDITY.START: { "type": "str", "description": _( "The token will be valid starting " "at the given date. Can be a fixed " "date or an offset like +10m, " "+24h, +7d." ), }, VALIDITY.END: { "type": "str", "description": _( "The token will be valid until " "the given date. Can be a fixed " "date or an offset like +10m, " "+24h, +7d." ), }, }, ACTION_TYPE.SET_COUNTWINDOW: { "count window": { # TODO: should be "int" but we do not support # this at the moment. "type": "str", "required": True, "description": _("Set the new count window of the token."), } }, ACTION_TYPE.SET_FAILCOUNTER: { "fail counter": { "type": "str", "required": True, "description": _("Set the failcounter of the token."), } }, ACTION_TYPE.CHANGE_FAILCOUNTER: { "change fail counter": { "type": "str", "required": True, "description": _( "Increase or decrease the fail counter of the token. " "Values of +n, -n with n being an integer are accepted." ), } }, ACTION_TYPE.SET_MAXFAIL: { "max failcount": { "type": "str", "required": True, "description": _("Set the maximum failcounter of the token."), } }, ACTION_TYPE.SET_TOKENINFO: { "key": { "type": "str", "required": True, "description": _("Set this tokeninfo key."), }, "value": { "type": "str", "description": _("Set the above key to this value."), }, }, ACTION_TYPE.DELETE_TOKENINFO: { "key": { "type": "str", "required": True, "description": _("Delete this tokeninfo key."), } }, ACTION_TYPE.ADD_TOKENGROUP: { "tokengroup": { "type": "str", "required": True, "description": _("Add a tokengroup to the token."), "value": [tg.name for tg in get_tokengroups()], } }, ACTION_TYPE.REMOVE_TOKENGROUP: { "tokengroup": { "type": "str", "required": True, "description": _("Remove a tokengroup from the token."), "value": [tg.name for tg in get_tokengroups()], } }, ACTION_TYPE.ATTACH_APPLICATION: { "machine ID": { "type": "str", "required": False, "description": _( "The ID of the machine you want to attach the token to" ), }, "service_id": { "type": "str", "required": False, "description": _("Set the service_id for an SSH application."), "visibleIf": "application", "visibleValue": TOKEN_APPLICATIONS.SSH, }, "application": { "type": "str", "required": True, "description": _( "Set a token application like 'offline' or 'SSH'. Note: Not all tokens" " work well with all applications!" ), "value": [ TOKEN_APPLICATIONS.SSH, TOKEN_APPLICATIONS.OFFLINE, TOKEN_APPLICATIONS.LUKS, ], }, "count": { "type": "str", "visibleIf": "application", "visibleValue": TOKEN_APPLICATIONS.OFFLINE, "description": _("The number of offline OTP values available"), "required": False, }, "rounds": { "type": "str", "visibleIf": "application", "visibleValue": TOKEN_APPLICATIONS.OFFLINE, "description": _("The number of rounds for password hashing"), "required": False, }, "user": { "type": "str", "visibleIf": "application", "visibleValue": TOKEN_APPLICATIONS.SSH, "required": False, }, "slot": { "type": "str", "visibleIf": "application", "visibleValue": TOKEN_APPLICATIONS.LUKS, "required": False, }, "partition": { "type": "str", "visibleIf": "application", "visibleValue": TOKEN_APPLICATIONS.LUKS, "required": False, }, }, } return actions
[docs] def do(self, action, options=None): """ This method executes the defined action in the given event. :param action: :param options: Contains the flask parameters g, request, response and the handler_def configuration :type options: dict :return: """ ret = True g = options.get("g") request = options.get("request") response = options.get("response") content = self._get_response_content(response) handler_def = options.get("handler_def") handler_options = handler_def.get("options", {}) serial = ( request.all_data.get("serial") or content.get("detail", {}).get("serial") or g.audit_object.audit_data.get("serial") ) if action.lower() in [ ACTION_TYPE.SET_TOKENREALM, ACTION_TYPE.SET_DESCRIPTION, ACTION_TYPE.DELETE, ACTION_TYPE.DISABLE, ACTION_TYPE.ENABLE, ACTION_TYPE.UNASSIGN, ACTION_TYPE.SET_VALIDITY, ACTION_TYPE.SET_COUNTWINDOW, ACTION_TYPE.SET_TOKENINFO, ACTION_TYPE.SET_FAILCOUNTER, ACTION_TYPE.SET_MAXFAIL, ACTION_TYPE.CHANGE_FAILCOUNTER, ACTION_TYPE.SET_RANDOM_PIN, ACTION_TYPE.DELETE_TOKENINFO, ACTION_TYPE.ADD_TOKENGROUP, ACTION_TYPE.REMOVE_TOKENGROUP, ACTION_TYPE.ATTACH_APPLICATION, ]: if serial: if "," in serial: serials = [t.strip() for t in serial.split(",")] else: serials = [serial] for serial in serials: if action.lower() != ACTION_TYPE.SET_TOKENINFO: log.info("{0!s} for token {1!s}".format(action, serial)) if action.lower() == ACTION_TYPE.SET_TOKENREALM: realm = handler_options.get("realm") only_realm = is_true(handler_options.get("only_realm")) # Set the realm.. log.info( "Setting realm of token {0!s} to {1!s}".format( serial, realm ) ) # Add the token realm set_realms(serial, [realm], add=not only_realm) elif action.lower() == ACTION_TYPE.SET_RANDOM_PIN: # If for any reason we have no value, we default to 6 length = int(handler_options.get("length") or 6) pin = generate_password(size=length) if set_pin(serial, pin): content.setdefault("detail", {})["pin"] = pin options.get("response").data = json.dumps(content) elif action.lower() == ACTION_TYPE.DELETE: remove_token(serial=serial) elif action.lower() == ACTION_TYPE.DISABLE: enable_token(serial, enable=False) elif action.lower() == ACTION_TYPE.ENABLE: enable_token(serial, enable=True) elif action.lower() == ACTION_TYPE.UNASSIGN: unassign_token(serial) elif action.lower() == ACTION_TYPE.SET_DESCRIPTION: description = handler_options.get("description") or "" description, td = parse_time_offset_from_now(description) s_now = (datetime.datetime.now(tzlocal()) + td).strftime( AUTH_DATE_FORMAT ) set_description( serial, description.format( current_time=s_now, now=s_now, client_ip=g.client_ip, ua_browser=request.user_agent.browser, ua_string=request.user_agent.string, ), ) elif action.lower() == ACTION_TYPE.SET_COUNTWINDOW: set_count_window( serial, int(handler_options.get("count window", 50)) ) elif action.lower() == ACTION_TYPE.SET_TOKENINFO: tokeninfo = handler_options.get("value") or "" tokeninfo, td = parse_time_offset_from_now(tokeninfo) s_now = (datetime.datetime.now(tzlocal()) + td).strftime( AUTH_DATE_FORMAT ) try: username = request.User.loginname realm = request.User.realm except Exception: username = "N/A" realm = "N/A" add_tokeninfo( serial, handler_options.get("key"), tokeninfo.format( current_time=s_now, now=s_now, client_ip=g.client_ip, username=username, realm=realm, ua_browser=request.user_agent.browser, ua_string=request.user_agent.string, ), ) log.info( "set tokeninfo for token {0!s} - {1!s}: {2!s}".format( serial, handler_options.get("key"), tokeninfo.format( current_time=s_now, now=s_now, client_ip=g.client_ip, username=username, realm=realm, ua_browser=request.user_agent.browser, ua_string=request.user_agent.string, ), ) ) elif action.lower() == ACTION_TYPE.DELETE_TOKENINFO: delete_tokeninfo(serial, handler_options.get("key")) elif action.lower() == ACTION_TYPE.SET_VALIDITY: start_date = handler_options.get(VALIDITY.START) end_date = handler_options.get(VALIDITY.END) if start_date: d = parse_date(start_date) set_validity_period_start( serial, None, d.strftime(DATE_FORMAT) ) if end_date: d = parse_date(end_date) set_validity_period_end( serial, None, d.strftime(DATE_FORMAT) ) elif action.lower() == ACTION_TYPE.SET_FAILCOUNTER: try: set_failcounter( serial, int(handler_options.get("fail counter")) ) except Exception as exx: log.warning("Misconfiguration: Failed to set fail counter!") elif action.lower() == ACTION_TYPE.SET_MAXFAIL: try: set_max_failcount( serial, int(handler_options.get("max failcount")) ) except Exception as exx: log.warning( "Misconfiguration: Failed to set max failcount!" ) elif action.lower() == ACTION_TYPE.CHANGE_FAILCOUNTER: try: token_obj = get_one_token(serial=serial) token_obj.set_failcount( token_obj.token.failcount + int(handler_options.get("change fail counter")) ) except Exception as exx: log.warning( "Misconfiguration: Failed to increase or decrease fail " "counter!" ) elif action.lower() == ACTION_TYPE.ADD_TOKENGROUP: try: assign_tokengroup(serial, handler_options.get("tokengroup")) except Exception as exx: log.warning( "Misconfiguration: Failed to add tokengroup " "to token {0!s}!".format(serial) ) elif action.lower() == ACTION_TYPE.REMOVE_TOKENGROUP: try: unassign_tokengroup( serial, handler_options.get("tokengroup") ) except Exception as exx: log.warning( "Misconfiguration: Failed to remove tokengroup " "from token {0!s}!".format(serial) ) elif action.lower() == ACTION_TYPE.ATTACH_APPLICATION: try: machine = handler_options.get("machine ID") application = handler_options.get("application") application_options = {} count = handler_options.get("count", None) if not (count is None): application_options.update({"count": count}) rounds = handler_options.get("rounds", None) if not (rounds is None): application_options.update({"rounds": rounds}) slot = handler_options.get("slot", None) if not (slot is None): application_options.update({"slot": slot}) partition = handler_options.get("partition", None) if not (partition is None): application_options.update({"partition": partition}) user = handler_options.get("user", None) if not (user is None): application_options.update({"user": user}) mt = attach_token( serial, application, machine_id=machine, options=application_options, ) except Exception as exx: log.warning( "Misconfiguration: Failed to attach token to machine." " Token serial: {!0s}".format(serial) ) elif not getParam(request.all_data, "webauthn_usernameless_authn"): log.info( "Action {0!s} requires serial number. But no serial " "number could be found in request {1!s}.".format(action, request) ) if action.lower() == ACTION_TYPE.INIT: log.info("Initializing new token") init_param = { "type": handler_options.get("tokentype"), "genkey": 1, "realm": handler_options.get("realm", ""), } user = None if is_true(handler_options.get("user")): user = self._get_tokenowner(request) tokentype = handler_options.get("tokentype") # Some tokentypes need additional parameters if handler_options.get("additional_params"): add_params = yaml.safe_load( handler_options.get("additional_params") ) if type(add_params) == dict: init_param.update(add_params) if tokentype == "sms": if is_true(handler_options.get("dynamic_phone")): init_param["dynamic_phone"] = 1 else: init_param["phone"] = user.get_user_phone( phone_type="mobile", index=0 ) if not init_param["phone"]: log.warning( "Enrolling SMS token. But the user " "{0!r} has no mobile number!".format(user) ) if handler_options.get("sms_identifier"): init_param["sms.identifier"] = handler_options.get( "sms_identifier" ) elif tokentype == "email": if is_true(handler_options.get("dynamic_email")): init_param["dynamic_email"] = 1 else: init_param["email"] = user.info.get("email", "") if not init_param["email"]: log.warning( "Enrolling EMail token. But the user {0!s}" "has no email address!".format(user) ) if handler_options.get("smtp_identifier"): init_param["email.identifier"] = handler_options.get( "smtp_identifier" ) elif tokentype == "motp": init_param["motppin"] = handler_options.get("motppin") t = init_token(param=init_param, user=user) log.info("New token {0!s} enrolled.".format(t.token.serial)) return ret