from edumfa.api.lib.utils import getParam, attestation_certificate_allowed
from edumfa.lib.config import get_from_config
from edumfa.lib.tokenclass import TokenClass, CLIENTMODE, ROLLOUTSTATE
from edumfa.lib.token import get_tokens
from edumfa.lib.log import log_with
import logging
from edumfa.models import Challenge
from edumfa.lib import _
from edumfa.lib.decorators import check_token_locked
from edumfa.lib.crypto import geturandom
from edumfa.lib.tokens.u2f import (check_registration_data, url_decode,
parse_registration_data, url_encode,
parse_response_data, check_response,
from edumfa.lib.error import ValidateError, PolicyError, ParameterError
from edumfa.lib.policy import SCOPE, GROUP, ACTION, get_action_values_from_options
from edumfa.lib.policy import Match
from edumfa.lib.challenge import get_challenges
from edumfa.lib.utils import is_true, hexlify_and_unicode, to_unicode, convert_imagefile_to_dataimage
import binascii
import json
__doc__ = """
U2F is the "Universal 2nd Factor" specified by the FIDO Alliance.
The register and authentication process is described here:
But you do not need to be aware of this. eduMFA wraps all FIDO specific
communication, which should make it easier for you, to integrate the U2F
tokens managed by eduMFA into your application.
U2F Tokens can be either
* registered by administrators for users or
* registered by the users themselves.
The enrollment/registering can be completely performed within eduMFA.
But if you want to enroll the U2F token via the REST API you need to do it in
two steps:
1. Step
.. sourcecode:: http
POST /token/init HTTP/1.1
Host: example.com
Accept: application/json
This step returns a serial number.
2. Step
.. sourcecode:: http
POST /token/init HTTP/1.1
Host: example.com
Accept: application/json
*clientdata* and *regdata* are the values returned by the U2F device.
You need to call the javascript function
.. sourcecode:: javascript
u2f.register([registerRequest], [], function(u2fData) {} );
and the responseHandler needs to send the *clientdata* and *regdata* back to
eduMFA (2. step).
The U2F token is a challenge response token. I.e. you need to trigger a
challenge e.g. by sending the OTP PIN/Password for this token.
Get the challenge
.. sourcecode:: http
POST /validate/check HTTP/1.1
Host: example.com
Accept: application/json
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
"detail": {
"attributes": {
"hideResponseInput": true,
"img": ...imageUrl...
"u2fSignRequest": {
"challenge": "...",
"appId": "...",
"keyHandle": "...",
"version": "U2F_V2"
"message": "Please confirm with your U2F token (Yubico U2F EE ...)"
"transaction_id": "02235076952647019161"
"id": 1,
"jsonrpc": "2.0",
"result": {
"status": true,
"value": false,
"version": "eduMFA unknown"
Send the Response
The application now needs to call the javascript function *u2f.sign* with the
*u2fSignRequest* from the response.
var signRequests = [ error.detail.attributes.u2fSignRequest ];
u2f.sign(signRequests, function(u2fResult) {} );
The response handler function needs to call the */validate/check* API again with
the signatureData and clientData returned by the U2F device in the *u2fResult*:
.. sourcecode:: http
POST /validate/check HTTP/1.1
Host: example.com
Accept: application/json
# Images of the keys shown during enrollment.
# The solokeys image is copyright (C) 2020 Solokeys. License: CC-BY-SA 4.0
# The image is a relative file system path.
IMAGES = {"yubico": "edumfa/static/img/FIDO-U2F-Security-Key-444x444.png",
"plug-up": "edumfa/static/img/plugup.jpg",
"u2fzero.com": "edumfa/static/img/u2fzero.png",
"solokeys": "edumfa/static/img/solokeys.png"}
U2F_Version = "U2F_V2"
log = logging.getLogger(__name__)
optional = True
required = False
class U2FACTION:
FACETS = "u2f_facets"
REQ = "u2f_req"
NO_VERIFY_CERT = "u2f_no_verify_certificate"
class U2fTokenClass(TokenClass):
The U2F Token implementation.
client_mode = CLIENTMODE.U2F
def get_class_type():
Returns the internal token type identifier
:return: u2f
:rtype: basestring
return "u2f"
def get_class_prefix():
Return the prefix, that is used as a prefix for the serial numbers.
:return: U2F
:rtype: basestring
return "U2F"
def get_class_info(key=None, ret='all'):
returns a subtree of the token definition
:param key: subsection identifier
:type key: string
:param ret: default return value, if nothing is found
:type ret: user defined
:return: subsection if key exists or user defined
:rtype: dict or scalar
res = {'type': 'u2f',
'title': 'U2F Token',
'description': 'U2F: Enroll a U2F token.',
'init': {},
'config': {},
'user': ['enroll'],
# This tokentype is enrollable in the UI for...
'ui_enroll': ["admin", "user"],
'policy': {
'type': 'str',
'desc': _("This is a list of FQDN hostnames "
"trusting the registered U2F tokens.")},
'type': 'str',
'desc': _('Use an alternate challenge text for telling the '
'user to confirm with his U2F device.')
'type': 'str',
'desc': _("Only specified U2F tokens are "
'type': 'str',
'desc': _("Only specified U2F tokens are allowed "
"to be registered."),
'group': GROUP.TOKEN},
'type': 'bool',
'desc': _("Do not verify the U2F attestation certificate."),
'group': GROUP.TOKEN
'type': 'int',
'desc': _("The user may only have this maximum number of U2F tokens assigned."),
'group': GROUP.TOKEN
'type': 'int',
'desc': _(
"The user may only have this maximum number of active U2F tokens assigned."),
'group': GROUP.TOKEN
if key:
ret = res.get(key, {})
if ret == 'all':
ret = res
return ret
def __init__(self, db_token):
Create a new U2F Token object from a database object
:param db_token: instance of the orm db object
:type db_token: DB object
TokenClass.__init__(self, db_token)
self.hKeyRequired = False
def update(self, param, reset_failcount=True):
This method is called during the initialization process.
:param param: parameters from the token init
:type param: dict
:return: None
TokenClass.update(self, param)
reg_data = getParam(param, "regdata")
verify_cert = is_true(getParam(param, "u2f.verify_cert", default=True))
if not reg_data:
self.token.rollout_state = ROLLOUTSTATE.CLIENTWAIT
# Set the description in the first enrollment step
if "description" in param:
self.set_description(getParam(param, "description", default=""))
elif reg_data and self.token.rollout_state == ROLLOUTSTATE.CLIENTWAIT:
attestation_cert, user_pub_key, key_handle, \
signature, automatic_description = parse_registration_data(reg_data,
client_data = getParam(param, "clientdata", required)
client_data_str = url_decode(client_data)
app_id = self.get_tokeninfo("appId", "")
# Verify the registration data
# In case of any crypto error, check_data raises an exception
check_registration_data(attestation_cert, app_id, client_data_str,
user_pub_key, key_handle, signature)
self.add_tokeninfo("pubKey", user_pub_key)
# add attestation certificate info
issuer = x509name_to_string(attestation_cert.get_issuer())
serial = "{!s}".format(attestation_cert.get_serial_number())
subject = x509name_to_string(attestation_cert.get_subject())
self.add_tokeninfo("attestation_issuer", issuer)
self.add_tokeninfo("attestation_serial", serial)
self.add_tokeninfo("attestation_subject", subject)
# Reset rollout state
self.token.rollout_state = ""
# If no description has already been set, set the automatic description or the
# description given in the 2nd request
if not self.token.description:
self.set_description(getParam(param, "description", default=automatic_description))
raise ParameterError("regdata provided but token not in clientwait rollout_state.")
def get_init_detail(self, params=None, user=None):
At the end of the initialization we ask the user to press the button
response_detail = {}
# get_init_details runs after "update" method. So in the first step clientwait has already been set
if self.token.rollout_state == ROLLOUTSTATE.CLIENTWAIT:
# This is the first step of the init request
app_id = get_from_config("u2f.appId", "").strip("/")
from edumfa.lib.error import TokenAdminError
if not app_id:
raise TokenAdminError(_("You need to define the appId in the "
"token config!"))
nonce = url_encode(geturandom(32))
response_detail = TokenClass.get_init_detail(self, params, user)
register_request = {"version": U2F_Version,
"challenge": nonce,
"appId": app_id}
response_detail["u2fRegisterRequest"] = register_request
self.add_tokeninfo("appId", app_id)
elif self.token.rollout_state == "":
# This is the second step of the init request, the clientwait rollout state has been reset
response_detail["u2fRegisterResponse"] = {"subject":
return response_detail
def is_challenge_request(self, passw, user=None, options=None):
check, if the request would start a challenge
In fact every Request that is not a response needs to start a
challenge request.
At the moment we do not think of other ways to trigger a challenge.
This function is not decorated with ``@challenge_response_allowed``
as the U2F token is always a challenge response token!
:param passw: The PIN of the token.
:param options: dictionary of additional request parameters
:return: returns true or false
trigger_challenge = False
options = options or {}
pin_match = self.check_pin(passw, user=user, options=options)
if pin_match is True:
trigger_challenge = True
return trigger_challenge
def create_challenge(self, transactionid=None, options=None):
This method creates a challenge, which is submitted to the user.
The submitted challenge will be preserved in the challenge
If no transaction id is given, the system will create a transaction
id and return it, so that the response can refer to this transaction.
:param transactionid: the id of this challenge
:param options: the request context parameters / data
:type options: dict
:return: tuple of (bool, message, transactionid, attributes)
:rtype: tuple
The return tuple builds up like this:
``bool`` if submit was successful;
``message`` which is displayed in the JSON response;
additional challenge ``reply_dict``, which are displayed in the JSON challenges response.
options = options or {}
message = get_action_values_from_options(SCOPE.AUTH,
options)or _('Please confirm with your U2F token ({0!s})').format(
validity = int(get_from_config('DefaultChallengeValidityTime', 120))
tokentype = self.get_tokentype().lower()
lookup_for = tokentype.capitalize() + 'ChallengeValidityTime'
validity = int(get_from_config(lookup_for, validity))
# if a transaction id is given, check if there are other u2f token and
# reuse the challenge
challenge = None
if transactionid:
for c in get_challenges(transaction_id=transactionid):
if get_tokens(serial=c.serial, tokentype=self.get_class_type(),
challenge = c.challenge
if not challenge:
nonce = geturandom(32)
challenge = hexlify_and_unicode(nonce)
nonce = binascii.unhexlify(challenge)
# Create the challenge in the database
db_challenge = Challenge(self.token.serial,
sec_object = self.token.get_otpkey()
key_handle_hex = sec_object.getKey()
key_handle_bin = binascii.unhexlify(key_handle_hex)
key_handle_url = url_encode(key_handle_bin)
challenge_url = url_encode(nonce)
u2f_sign_request = {"appId": self.get_tokeninfo("appId"),
"version": U2F_Version,
"challenge": challenge_url,
"keyHandle": key_handle_url}
image_url = IMAGES.get(self.token.description.lower().split()[0], "")
dataimage = convert_imagefile_to_dataimage(image_url) if image_url else ""
reply_dict = {"attributes": {"u2fSignRequest": u2f_sign_request,
"hideResponseInput": self.client_mode != CLIENTMODE.INTERACTIVE,
"img": dataimage},
"image": dataimage}
return True, message, db_challenge.transaction_id, reply_dict
def check_otp(self, otpval, counter=None, window=None, options=None):
This checks the response of a previous challenge.
:param otpval: N/A
:param counter: The authentication counter
:param window: N/A
:param options: contains "clientdata", "signaturedata" and
:return: A value > 0 in case of success
ret = -1
clientdata = options.get("clientdata")
signaturedata = options.get("signaturedata")
transaction_id = options.get("transaction_id")
# The challenge in the challenge DB object is saved in hex
challenge = binascii.unhexlify(options.get("challenge", ""))
if clientdata and signaturedata and transaction_id and challenge:
# This is a valid response for a U2F token
challenge_url = url_encode(challenge)
clientdata = url_decode(clientdata)
clientdata_dict = json.loads(to_unicode(clientdata))
client_challenge = clientdata_dict.get("challenge")
if challenge_url != client_challenge:
return ret
if clientdata_dict.get("typ") != "navigator.id.getAssertion":
raise ValidateError("Incorrect navigator.id")
#client_origin = clientdata_dict.get("origin")
signaturedata = url_decode(signaturedata)
signaturedata_hex = hexlify_and_unicode(signaturedata)
user_presence, counter, signature = parse_response_data(
user_pub_key = self.get_tokeninfo("pubKey")
app_id = self.get_tokeninfo("appId")
if check_response(user_pub_key, app_id, clientdata,
hexlify_and_unicode(signature), counter,
# Signature verified.
# check, if the counter increased!
if counter > self.get_otp_count():
ret = counter
# At this point we can check, if the attestation
# certificate is authorized.
# If not, we can raise a policy exception
if not attestation_certificate_allowed(
"attestation_issuer": self.get_tokeninfo("attestation_issuer"),
"attestation_serial": self.get_tokeninfo("attestation_serial"),
"attestation_subject": self.get_tokeninfo("attestation_subject")
user_object=self.user if self.user else None)
"The U2F device {0!s} is not allowed to authenticate due to policy restriction"
raise PolicyError("The U2F device is not allowed "
"to authenticate due to policy "
log.warning("The signature of %s was valid, but contained "
"an old counter." % self.token.serial)
log.warning("Checking response for token {0!s} failed.".format(
return ret
def api_endpoint(cls, request, g):
This provides a function to be plugged into the API endpoint
The u2f token can return the facet list at this URL.
:param request: The Flask request
:param g: The Flask global object g
:return: Flask Response or text
configured_app_id = get_from_config("u2f.appId")
if configured_app_id is None:
raise ParameterError("u2f is not configured")
app_id = configured_app_id.strip("/")
# Read the facets from the policies
pol_facets = Match.action_only(g, scope=SCOPE.AUTH, action=U2FACTION.FACETS).action_values(unique=False)
facet_list = ["https://{0!s}".format(x) for x in pol_facets]
log.debug("Sending facets lists for appId {0!s}: {1!s}".format(app_id,
res = {"trustedFacets": [{"version": {"major": 1,
"minor": 0},
"ids": facet_list
return "fido.trusted-apps+json", res