Source code for edumfa.lib.smsprovider.HttpSMSProvider

# -*- coding: utf-8 -*-
#
# License:  AGPLv3
# This file is part of eduMFA. eduMFA is a fork of privacyIDEA which was forked from LinOTP.
# Copyright (c) 2024 eduMFA Project-Team
# Previous authors by privacyIDEA project:
#
# 2018 Pascal Fuks <pascal@foxit.pro>
# 2014 - 2018 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#
# Copyright (C) LinOTP: 2010 - 2014 LSE Leading Security Experts GmbH
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License, version 3, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
#            GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

__doc__ = """This is the SMSClass to send SMS via HTTP Gateways
It can handle HTTP/HTTPS POST and GET requests also with Proxy support

The code is tested in tests/test_lib_smsprovider
"""

import logging
from urllib.parse import urlparse

import requests

from edumfa.lib import _
from edumfa.lib.smsprovider.SMSProvider import ISMSProvider, SMSError

log = logging.getLogger(__name__)


[docs] class HttpSMSProvider(ISMSProvider):
[docs] def submit_message(self, phone, message): """ send a message to a phone via an http sms gateway :param phone: the phone number :param message: the message to submit to the phone :return: """ parameter = {} headers = {} if self.smsgateway: phone = self._mangle_phone(phone, self.smsgateway.option_dict) url = self.smsgateway.option_dict.get("URL") method = self.smsgateway.option_dict.get("HTTP_METHOD", "GET") username = self.smsgateway.option_dict.get("USERNAME") password = self.smsgateway.option_dict.get("PASSWORD") ssl_verify = self.smsgateway.option_dict.get("CHECK_SSL", "yes") == "yes" json_data = ( self.smsgateway.option_dict.get("SEND_DATA_AS_JSON", "no") == "yes" ) # FIXME: The Proxy option is deprecated and will be removed a version > 2.21 proxy = self.smsgateway.option_dict.get("PROXY") http_proxy = self.smsgateway.option_dict.get("HTTP_PROXY") https_proxy = self.smsgateway.option_dict.get("HTTPS_PROXY") timeout = self.smsgateway.option_dict.get("TIMEOUT") or 3 for k, v in self.smsgateway.option_dict.items(): if k not in self.parameters().get("parameters"): # This is an additional option parameter[k] = v.format(otp=message, phone=phone) headers = self.smsgateway.header_dict else: phone = self._mangle_phone(phone, self.config) url = self.config.get("URL") method = self.config.get("HTTP_Method", "GET") username = self.config.get("USERNAME") password = self.config.get("PASSWORD") ssl_verify = self.config.get("CHECK_SSL", True) json_data = False # FIXME: The Proxy option is deprecated and will be removed a version > 2.21 proxy = self.config.get("PROXY") http_proxy = self.config.get("HTTP_PROXY") https_proxy = self.config.get("HTTPS_PROXY") parameter = self._get_parameters(message, phone) timeout = self.config.get("TIMEOUT") or 3 log.debug("submitting message {0!r} to {1!s}".format(message, phone)) if url is None: log.warning("can not submit message. URL is missing.") raise SMSError(-1, "No URL specified in the provider config.") basic_auth = None # there might be the basic authentication in the request url # like http://user:passw@hostname:port/path if password is None and username is None: parsed_url = urlparse(url) if "@" in parsed_url[1]: puser, server = parsed_url[1].split("@") username, password = puser.split(":") if username and password is not None: basic_auth = (username, password) proxies = {} if http_proxy: proxies["http"] = http_proxy if https_proxy: proxies["https"] = https_proxy if not proxies and proxy: # No new proxy config but only the old one. protocol = proxy.split(":")[0] proxies = {protocol: proxy} # url, parameter, username, password, method requestor = requests.get params = parameter data = None json_param = None if method == "POST": requestor = requests.post params = None if json_data: json_param = parameter log.debug("passing JSON data: {0!s}".format(json_param)) else: data = parameter log_dict = { "params": params, "headers": headers, "method": method, "basic_auth": basic_auth, "url": url, "data": data, "json_param": json_param, } log.debug( "issuing request with parameters {params} (data: {data}, " "json: {json_param}), headers {headers}, method {method} and" "authentication {basic_auth} " "to url {url}.".format(**log_dict) ) # Todo: drop basic auth if Authorization-Header is given? r = requestor( url, params=params, headers=headers, data=data, json=json_param, verify=ssl_verify, auth=basic_auth, timeout=float(timeout), proxies=proxies, ) log.debug( "queued SMS on the HTTP gateway. status code returned: {0!s}".format( r.status_code ) ) # We assume, that all gateways return with HTTP Status Code 200, # 201 or 202 if r.status_code not in [200, 201, 202]: raise SMSError(r.status_code, "SMS could not be sent: %s" % r.status_code) success = self._check_success(r) return success
def _get_parameters(self, message, phone): urldata = {} # transfer the phone key phoneKey = self.config.get("SMS_PHONENUMBER_KEY", "phone") urldata[phoneKey] = phone # transfer the sms key messageKey = self.config.get("SMS_TEXT_KEY", "sms") urldata[messageKey] = message params = self.config.get("PARAMETER", {}) urldata.update(params) log.debug("[getParameters] urldata: {0!s}".format(urldata)) return urldata def _check_success(self, response): """ Check the success according to the reply 1. if RETURN_SUCCESS is defined 2. if RETURN_FAIL is defined :response reply: A response object. """ reply = response.text ret = False if self.smsgateway: return_success = self.smsgateway.option_dict.get("RETURN_SUCCESS") return_fail = self.smsgateway.option_dict.get("RETURN_FAIL") else: return_success = self.config.get("RETURN_SUCCESS") return_fail = self.config.get("RETURN_FAIL") if return_success: if return_success in reply: log.debug("sending sms success") ret = True else: log.warning( "failed to send sms. Reply %s does not match " "the RETURN_SUCCESS definition" % reply ) raise SMSError( response.status_code, "We received a none success reply from the " "SMS Gateway: {0!s} ({1!s})".format(reply, return_success), ) elif return_fail: if return_fail in reply: log.warning( "sending sms failed. %s was not found in %s" % (return_fail, reply) ) raise SMSError( response.status_code, "We received the predefined error from the SMS Gateway.", ) else: log.debug("sending sms success") ret = True else: ret = True return ret
[docs] @classmethod def parameters(cls): """ Return a dictionary, that describes the parameters and options for the SMS provider. Parameters are required keys to values. :return: dict """ params = { "options_allowed": True, "headers_allowed": True, "parameters": { "URL": { "required": True, "description": _("The base URL of the HTTP Gateway"), }, "HTTP_METHOD": { "required": True, "description": _( "Should the HTTP Gateway be " "connected via an HTTP GET or POST " "request." ), "values": ["GET", "POST"], }, "RETURN_SUCCESS": { "description": _( "Specify a substring, " "that indicates, that the SMS was " "delivered successfully." ) }, "RETURN_FAIL": { "description": _( "Specify a substring, " "that indicates, that the SMS " "failed to be delivered." ) }, "USERNAME": { "description": _("Username in case of basic authentication.") }, "PASSWORD": { "description": _("Password in case of basic authentication.") }, "CHECK_SSL": { "required": True, "description": _("Should the SSL certificate be verified."), "values": ["yes", "no"], }, "SEND_DATA_AS_JSON": { "required": True, "description": _( "Should the data in a POST Request be sent as JSON." ), "values": ["yes", "no"], }, "REGEXP": {"description": cls.regexp_description}, "PROXY": { "description": _( "An optional proxy string. DEPRECATED. Do not use " "this anymore. Rather use HTTP_PROXY for http " "connections and HTTPS_PROXY for https " "connection. The PROXY option will be removed in " "future." ) }, "HTTP_PROXY": {"description": _("Proxy setting for HTTP connections.")}, "HTTPS_PROXY": { "description": _("Proxy setting for HTTPS connections.") }, "TIMEOUT": {"description": _("The timeout in seconds.")}, }, } return params