Source code for edumfa.lib.tokens.radiustoken

# -*- coding: utf-8 -*-
#
# License:  AGPLv3
# This file is part of eduMFA. eduMFA is a fork of privacyIDEA which was forked from LinOTP.
# Copyright (c) 2024 eduMFA Project-Team
# Previous authors by privacyIDEA project:
#
# 2014 - 2019 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
# License:  LSE
# contact:  http://www.linotp.org
#           http://www.lsexperts.de
#           linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This module defines the RadiusTokenClass. The RADIUS token
forwards the authentication request to another RADIUS server.

The code is tested in tests/test_lib_tokens_radius
"""

import binascii
import logging
import traceback

import pyrad.packet
from pyrad.client import Client, Timeout
from pyrad.dictionary import Dictionary
from pyrad.packet import AccessAccept, AccessChallenge, AccessReject

from edumfa.api.lib.utils import ParameterError, getParam
from edumfa.lib import _
from edumfa.lib.challenge import get_challenges
from edumfa.lib.config import get_from_config
from edumfa.lib.decorators import check_token_locked
from edumfa.lib.log import log_with
from edumfa.lib.policy import ACTION, GROUP, SCOPE
from edumfa.lib.policydecorators import challenge_response_allowed
from edumfa.lib.radiusserver import get_radius
from edumfa.lib.tokenclass import AUTHENTICATIONMODE, TOKENKIND, TokenClass
from edumfa.lib.tokens.remotetoken import RemoteTokenClass
from edumfa.lib.utils import hexlify_and_unicode, is_true, to_bytes, to_unicode
from edumfa.models import Challenge

optional = True
required = False

log = logging.getLogger(__name__)


###############################################
[docs] class RadiusTokenClass(RemoteTokenClass): mode = [AUTHENTICATIONMODE.AUTHENTICATE, AUTHENTICATIONMODE.CHALLENGE] def __init__(self, db_token): RemoteTokenClass.__init__(self, db_token) self.set_type("radius")
[docs] @staticmethod def get_class_type(): return "radius"
[docs] @staticmethod def get_class_prefix(): return "PIRA"
[docs] @staticmethod @log_with(log) def get_class_info(key=None, ret="all"): """ returns a subtree of the token definition :param key: subsection identifier :type key: string :param ret: default return value, if nothing is found :type ret: user defined :return: subsection if key exists or user defined :rtype: dict or string """ res = { "type": "radius", "title": "RADIUS Token", "description": _( "RADIUS: Forward authentication request to a RADIUS server." ), "user": ["enroll"], # This tokentype is enrollable in the UI for... "ui_enroll": ["admin", "user"], "policy": { SCOPE.ENROLL: { ACTION.MAXTOKENUSER: { "type": "int", "desc": _( "The user may only have this maximum number of RADIUS tokens assigned." ), "group": GROUP.TOKEN, }, ACTION.MAXACTIVETOKENUSER: { "type": "int", "desc": _( "The user may only have this maximum number of active RADIUS tokens assigned." ), "group": GROUP.TOKEN, }, } }, } if key: ret = res.get(key, {}) else: if ret == "all": ret = res return ret
[docs] @log_with(log) def update(self, param): # New value radius_identifier = getParam(param, "radius.identifier") self.add_tokeninfo("radius.identifier", radius_identifier) # old values if not radius_identifier: radiusServer = getParam(param, "radius.server", optional=required) self.add_tokeninfo("radius.server", radiusServer) radius_secret = getParam(param, "radius.secret", optional=required) self.token.set_otpkey(hexlify_and_unicode(radius_secret)) system_settings = getParam(param, "radius.system_settings", default=False) self.add_tokeninfo("radius.system_settings", system_settings) if not (radiusServer or radius_secret) and not system_settings: raise ParameterError("Missing parameter: radius.identifier", id=905) # if another OTP length would be specified in /admin/init this would # be overwritten by the parent class, which is ok. self.set_otplen(6) TokenClass.update(self, param) val = getParam(param, "radius.local_checkpin", optional) or 0 self.add_tokeninfo("radius.local_checkpin", val) val = getParam(param, "radius.user", required) self.add_tokeninfo("radius.user", val) self.add_tokeninfo("tokenkind", TOKENKIND.VIRTUAL)
[docs] @log_with(log) @challenge_response_allowed def is_challenge_request(self, passw, user=None, options=None): """ This method checks, if this is a request, that triggers a challenge. It depends on the way, the pin is checked - either locally or remotely. In addition, the RADIUS token has to be configured to allow challenge response. communication with RADIUS server: yes modification of options: The communication with the RADIUS server can change the options, radius_state, radius_result, radius_message :param passw: password, which might be pin or pin+otp :type passw: string :param user: The user from the authentication request :type user: User object :param options: dictionary of additional request parameters :type options: dict :return: true or false """ if options is None: options = {} # should we check the pin locally? if self.check_pin_local: # With a local PIN the challenge response is always a eduMFA challenge response! res = self.check_pin(passw, user=user, options=options) return res else: state = options.get("radius_state") # The pin is checked remotely res = options.get("radius_result") if res is None: res = self._check_radius(passw, options=options, radius_state=state) return res == AccessChallenge
[docs] @log_with(log) def create_challenge(self, transactionid=None, options=None): """ create a challenge, which is submitted to the user This method is called after ``is_challenge_request`` has verified, that a challenge needs to be created. communication with RADIUS server: no modification of options: no :param transactionid: the id of this challenge :param options: the request context parameters / data :return: tuple of (bool, message and data) bool, if submit was successful message is submitted to the user data is preserved in the challenge reply_dict - additional attributes, which are displayed in the output """ if options is None: options = {} message = options.get("radius_message") or "Enter your RADIUS tokencode:" state = hexlify_and_unicode(options.get("radius_state") or b"") reply_dict = {"attributes": {"state": transactionid}} validity = int(get_from_config("DefaultChallengeValidityTime", 120)) db_challenge = Challenge( self.token.serial, transaction_id=transactionid, data=state, challenge=message, validitytime=validity, ) db_challenge.save() self.challenge_janitor() return True, message, db_challenge.transaction_id, reply_dict
[docs] @log_with(log) def is_challenge_response(self, passw, user=None, options=None): """ This method checks, if this is a request, that is the response to a previously sent challenge. But we do not query the RADIUS server. This is the first method in the loop ``check_token_list``. communication with RADIUS server: no modification of options: The "radius_result" key is set to None :param passw: password, which might be pin or pin+otp :type passw: string :param user: the requesting user :type user: User object :param options: dictionary of additional request parameters :type options: dict :return: true or false :rtype: bool """ if options is None: options = {} challenge_response = False # clear the radius_result since this is the first function called in the chain # this value will be utilized to ensure we do not _check_radius more than once in the loop options.update({"radius_result": None}) # fetch the transaction_id transaction_id = options.get("transaction_id") if transaction_id is None: transaction_id = options.get("state") if transaction_id: # get the challenges for this transaction ID challengeobject_list = get_challenges( serial=self.token.serial, transaction_id=transaction_id ) for challengeobject in challengeobject_list: if challengeobject.is_valid(): challenge_response = True return challenge_response
[docs] @log_with(log) @check_token_locked def check_challenge_response(self, user=None, passw=None, options=None): """ This method verifies if there is a matching question for the given passw and also verifies if the answer is correct. It then returns the the otp_counter = 1 :param user: the requesting user :type user: User object :param passw: the password - in fact it is the answer to the question :type passw: string :param options: additional arguments from the request, which could be token specific. Usually "transaction_id" :type options: dict :return: return otp_counter. If -1, challenge does not match :rtype: int """ if options is None: options = {} otp_counter = -1 # fetch the transaction_id transaction_id = options.get("transaction_id") or options.get("state") # get the challenges for this transaction ID if transaction_id is not None: challengeobject_list = get_challenges( serial=self.token.serial, transaction_id=transaction_id ) for challengeobject in challengeobject_list: if challengeobject.is_valid(): state = binascii.unhexlify(challengeobject.data) # challenge is still valid radius_response = self._check_radius( passw, options=options, radius_state=state ) if radius_response == AccessAccept: # We found the matching challenge, # and the RADIUS server returned AccessAccept challengeobject.delete() otp_counter = 1 break elif radius_response == AccessChallenge: # The response was valid but triggered a new challenge # Note: The second challenge currently does not work correctly # see https://github.com/privacyidea/privacyidea/issues/1792 challengeobject.delete() _, _, transaction_id, _ = self.create_challenge(options=options) options["transaction_id"] = transaction_id otp_counter = -1 break else: otp_counter = -1 # increase the received_count challengeobject.set_otp_status() self.challenge_janitor() return otp_counter
@property def check_pin_local(self): """ lookup if pin should be checked locally or on radius host :return: bool """ local_check = is_true(self.get_tokeninfo("radius.local_checkpin")) log.debug("local checking pin? {0!r}".format(local_check)) return local_check
[docs] @log_with(log) def split_pin_pass(self, passw, user=None, options=None): """ Split the PIN and the OTP value. Only if it is locally checked and not remotely. """ res = True pin = "" otpval = passw if self.check_pin_local: (res, pin, otpval) = TokenClass.split_pin_pass(self, passw) return res, pin, otpval
[docs] @log_with(log) @check_token_locked def authenticate(self, passw, user=None, options=None): """ do the authentication on base of password / otp and user and options, the request parameters. This is only called after it is verified, that the upper level is no challenge-request or challenge-response The "options" are read-only in this method. They are not modified here. authenticate is the last method in the loop ``check_token_list``. communication with RADIUS server: yes, if is no previous "radius_result" If there is a "radius" result in the options, we do not query the radius server modification of options: options can be modified if we query the radius server. However, this is not important since authenticate is the last call. :param passw: the password / otp :param user: the requesting user :param options: the additional request parameters :return: tuple of (success, otp_count - 0 or -1, reply) """ options = options or {} res = False otp_counter = -1 reply = None otpval = passw # should we check the pin locally? if self.check_pin_local: (_res, pin, otpval) = self.split_pin_pass(passw, user, options=options) if not self.check_pin(pin, user=user, options=options): return False, -1, {"message": "Wrong PIN"} # attempt to retrieve saved state/result state = options.get("radius_state") result = options.get("radius_result") if result is None: radius_response = self._check_radius( otpval, options=options, radius_state=state ) else: radius_response = result if radius_response == AccessAccept: res = True otp_counter = 1 return res, otp_counter, reply
[docs] @log_with(log) @check_token_locked def check_otp(self, otpval, counter=None, window=None, options=None): """ Originally check_otp returns an OTP counter. I.e. in a failed attempt we return -1. In case of success we return 1 :param otpval: :param counter: :param window: :param options: :return: """ res = self._check_radius(otpval, options=options) if res == AccessAccept: return 1 else: return -1
@log_with(log) @check_token_locked def _check_radius(self, otpval, options=None, radius_state=None): """ run the RADIUS request against the RADIUS server :param otpval: the OTP value :param options: additional token specific options :type options: dict :return: counter of the matching OTP value. :rtype: AccessAccept, AccessReject, AccessChallenge """ result = AccessReject radius_message = None if options is None: options = {} radius_dictionary = None radius_identifier = self.get_tokeninfo("radius.identifier") radius_user = self.get_tokeninfo("radius.user") system_radius_settings = self.get_tokeninfo("radius.system_settings") radius_timeout = 5 radius_retries = 3 if radius_identifier: # New configuration radius_server_object = get_radius(radius_identifier) radius_server = radius_server_object.config.server radius_port = radius_server_object.config.port radius_server = "{0!s}:{1!s}".format(radius_server, radius_port) radius_secret = radius_server_object.get_secret() radius_dictionary = radius_server_object.config.dictionary radius_timeout = int(radius_server_object.config.timeout or 10) radius_retries = int(radius_server_object.config.retries or 1) elif system_radius_settings: # system configuration radius_server = get_from_config("radius.server") radius_secret = get_from_config("radius.secret") else: # individual token settings radius_server = self.get_tokeninfo("radius.server") # Read the secret secret = self.token.get_otpkey() radius_secret = binascii.unhexlify(secret.getKey()) # here we also need to check for radius.user log.debug( "checking OTP len:{0!s} on radius server: {1!s}, user: {2!r}".format( len(otpval), radius_server, radius_user ) ) try: # pyrad does not allow to set timeout and retries. # it defaults to retries=3, timeout=5 # TODO: At the moment we support only one radius server. # No round robin. server = radius_server.split(":") r_server = server[0] r_authport = 1812 if len(server) >= 2: r_authport = int(server[1]) nas_identifier = get_from_config("radius.nas_identifier", "eduMFA") if not radius_dictionary: radius_dictionary = get_from_config( "radius.dictfile", "/etc/edumfa/dictionary" ) log.debug( "NAS Identifier: %r, " "Dictionary: %r" % (nas_identifier, radius_dictionary) ) log.debug( "constructing client object " "with server: %r, port: %r, secret: %r" % (r_server, r_authport, to_unicode(radius_secret)) ) srv = Client( server=r_server, authport=r_authport, secret=to_bytes(radius_secret), dict=Dictionary(radius_dictionary), ) # Set retries and timeout of the client srv.timeout = radius_timeout srv.retries = radius_retries req = srv.CreateAuthPacket( code=pyrad.packet.AccessRequest, User_Name=radius_user.encode("utf-8"), NAS_Identifier=nas_identifier.encode("ascii"), ) req["User-Password"] = req.PwCrypt(otpval) if radius_state: req["State"] = radius_state log.info( "Sending saved challenge to radius server: {0!r} ".format( radius_state ) ) try: response = srv.SendPacket(req) except Timeout: log.warning( "The remote RADIUS server {0!s} timeout out for user {1!s}.".format( r_server, radius_user ) ) return AccessReject # handle the RADIUS challenge if response.code == pyrad.packet.AccessChallenge: # now we map this to a eduMFA challenge if "State" in response: radius_state = response["State"][0] if "Reply-Message" in response: radius_message = response["Reply-Message"][0] result = AccessChallenge elif response.code == pyrad.packet.AccessAccept: radius_state = "<SUCCESS>" radius_message = "RADIUS authentication succeeded" log.info( "RADIUS server {0!s} granted access to user {1!s}.".format( r_server, radius_user ) ) result = AccessAccept else: radius_state = "<REJECTED>" radius_message = "RADIUS authentication failed" log.debug("radius response code {0!s}".format(response.code)) log.info( "Radiusserver {0!s} rejected access to user {1!s}.".format( r_server, radius_user ) ) result = AccessReject except Exception as ex: # pragma: no cover log.error("Error contacting radius Server: {0!r}".format((ex))) log.info("{0!s}".format(traceback.format_exc())) options.update({"radius_result": result}) options.update({"radius_state": radius_state}) options.update({"radius_message": radius_message}) return result