# -*- coding: utf-8 -*-
#
# License: AGPLv3
# This file is part of eduMFA. eduMFA is a fork of privacyIDEA which was forked from LinOTP.
# Copyright (c) 2024 eduMFA Project-Team
# Previous authors by privacyIDEA project:
#
# 2014 - 2016 Cornelius Kölbel <cornelius@privacyidea.org>
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """The SQL Audit Module is used to write audit entries to an SQL
database.
The SQL Audit Module is configured like this:
EDUMFA_AUDIT_MODULE = "edumfa.lib.auditmodules.sqlaudit"
EDUMFA_AUDIT_KEY_PRIVATE = "tests/testdata/private.pem"
EDUMFA_AUDIT_KEY_PUBLIC = "tests/testdata/public.pem"
EDUMFA_AUDIT_SERVERNAME = "your choice"
Optional:
EDUMFA_AUDIT_SQL_URI = "sqlite://"
EDUMFA_AUDIT_SQL_TRUNCATE = True | False
EDUMFA_AUDIT_SQL_COLUMN_LENGTH = {"user": 60, "info": 10 ...}
If the EDUMFA_AUDIT_SQL_URI is omitted the Audit data is written to the
token database.
"""
import logging
from collections import OrderedDict
from edumfa.lib.auditmodules.base import (Audit as AuditBase, Paginate)
from edumfa.lib.crypto import Sign
from edumfa.lib.pooling import get_engine
from edumfa.lib.utils import censor_connect_string
from edumfa.lib.lifecycle import register_finalizer
from edumfa.lib.utils import truncate_comma_list, is_true
from sqlalchemy import MetaData, cast, String
from sqlalchemy import asc, desc, and_, or_
from sqlalchemy.sql.expression import FunctionElement
from sqlalchemy.ext.compiler import compiles
import datetime
import traceback
from edumfa.models import audit_column_length as column_length
from edumfa.models import Audit as LogEntry
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# Module-level logger for this audit backend.
log = logging.getLogger(__name__)
# NOTE(review): ``metadata`` is not referenced anywhere in the visible module —
# presumably kept for backwards compatibility; confirm before removing.
metadata = MetaData()
# Define function to convert SQL DateTime objects to an ISO-format string
# By using <https://docs.sqlalchemy.org/en/14/core/compiler.html> we can
# differentiate between different dialects.
class to_isodate(FunctionElement):
    """SQL function expression that renders a DateTime column as an
    ISO-format string.

    The dialect-specific SQL is produced by the ``@compiles`` handlers
    registered below (Oracle/PostgreSQL, SQLite, and a default).
    """
    name = 'to_isodate'
    # Allow SQLAlchemy to cache the compiled form of this construct.
    inherit_cache = True
@compiles(to_isodate, 'oracle')
@compiles(to_isodate, 'postgresql')
def fn_to_isodate_oracle_pg(element, compiler, **kw):
    """Render ``to_isodate`` for Oracle and PostgreSQL via ``to_char``.

    Renamed from ``fn_to_isodate``: the three compiler hooks previously all
    shared one name and shadowed each other at module level (``@compiles``
    registers them by construct/dialect, so the rename is behavior-neutral).

    :param element: the ``to_isodate`` construct wrapping the DateTime column
    :param compiler: the SQL compiler of the active dialect
    :return: SQL string producing an ISO-8601 formatted date
    """
    # IYYY is the ISO year, matching client-side ISO date strings.
    return "to_char(%s, 'IYYY-MM-DD HH24:MI:SS')" % compiler.process(element.clauses, **kw)
@compiles(to_isodate, 'sqlite')
def fn_to_isodate_sqlite(element, compiler, **kw):
    """Render ``to_isodate`` for SQLite: pass the column through unchanged.

    SQLite has no native DateTime type; values are stored as ISO strings
    already. Renamed from ``fn_to_isodate`` to stop the module-level name
    shadowing between the three compiler hooks (registration is unaffected).
    """
    return "%s" % compiler.process(element.clauses, **kw)
@compiles(to_isodate)
def fn_to_isodate_default(element, compiler, **kw):
    """Default rendering of ``to_isodate`` (e.g. MySQL/MariaDB) via
    ``date_format``.

    Renamed from ``fn_to_isodate`` to stop the module-level name shadowing
    between the three compiler hooks (registration is unaffected).
    """
    # The four percent signs survive two rounds of %-substitution (this
    # string formatting and the driver's own escaping), leaving %Y etc.
    # in the final SQL.
    return "date_format(%s, '%%%%Y-%%%%m-%%%%d %%%%H:%%%%i:%%%%s')" % compiler.process(
        element.clauses, **kw)
class Audit(AuditBase):
    """
    This is the SQLAudit module, which writes the audit entries
    to an SQL database table.
    It requires the following configuration parameters in :ref:`cfgfile`:
    * ``EDUMFA_AUDIT_KEY_PUBLIC``
    * ``EDUMFA_AUDIT_KEY_PRIVATE``
    If you want to host the SQL Audit database in another DB than the
    token DB, you can use:
    * ``EDUMFA_AUDIT_SQL_URI`` and
    * ``EDUMFA_AUDIT_SQL_OPTIONS``
    With ``EDUMFA_AUDIT_SQL_OPTIONS = {}`` You can pass options to the DB engine
    creation. If ``EDUMFA_AUDIT_SQL_OPTIONS`` is not set,
    ``SQLALCHEMY_ENGINE_OPTIONS`` will be used.
    This module also takes the following optional parameters:
    * ``EDUMFA_AUDIT_POOL_SIZE``
    * ``EDUMFA_AUDIT_POOL_RECYCLE``
    * ``EDUMFA_AUDIT_SQL_TRUNCATE``
    * ``EDUMFA_AUDIT_NO_SIGN``
    * ``EDUMFA_CHECK_OLD_SIGNATURES``
    You can use ``EDUMFA_AUDIT_NO_SIGN = True`` to avoid signing of the audit log.
    If ``EDUMFA_CHECK_OLD_SIGNATURES = True`` old style signatures (text-book RSA) will
    be checked as well, otherwise they will be marked as ``FAIL``.
    """
    # This backend supports reading entries back (search() / csv_generator()).
    is_readable = True
    def __init__(self, config=None, startdate=None):
        """Initialize the SQL audit backend.

        :param config: application configuration (dict-like); read for the
            ``EDUMFA_AUDIT_*`` settings documented on the class
        :param startdate: request start timestamp, handed to the base class
        """
        super(Audit, self).__init__(config, startdate)
        self.name = "sqlaudit"
        # Signing of audit entries can be switched off entirely.
        self.sign_data = not self.config.get("EDUMFA_AUDIT_NO_SIGN")
        self.sign_object = None
        # If set, legacy (text-book RSA) signatures are accepted on verify.
        self.verify_old_sig = self.config.get('EDUMFA_CHECK_OLD_SIGNATURES')
        # Disable the costly checking of private RSA keys when loading them.
        self.check_private_key = not self.config.get("EDUMFA_AUDIT_NO_PRIVATE_KEY_CHECK", False)
        if self.sign_data:
            self.read_keys(self.config.get("EDUMFA_AUDIT_KEY_PUBLIC"),
                           self.config.get("EDUMFA_AUDIT_KEY_PRIVATE"))
            self.sign_object = Sign(self.private, self.public,
                                    check_private_key=self.check_private_key)
        # Read column_length from the config file
        config_column_length = self.config.get("EDUMFA_AUDIT_SQL_COLUMN_LENGTH", {})
        # fill the missing parts with the default from the models
        self.custom_column_length = {k: (v if k not in config_column_length else config_column_length[k])
                                     for k, v in column_length.items()}
        # We can use "sqlaudit" as the key because the SQLAudit connection
        # string is fixed for a running eduMFA instance.
        # In other words, we will not run into any problems with changing connect strings.
        self.engine = get_engine(self.name, self._create_engine)
        # create a configured "Session" class. ``scoped_session`` is not
        # necessary because we do not share session objects among threads.
        # We use it anyway as a safety measure.
        self.session = scoped_session(sessionmaker(bind=self.engine))
        # Ensure that the connection gets returned to the pool when the request has
        # been handled. This may close an already-closed session, but this is not a problem.
        register_finalizer(self._finalize_session)
        # NOTE(review): presumably a Flask-SQLAlchemy compatibility shim — its
        # session-tracking code expects this attribute; confirm it is still needed.
        self.session._model_changes = {}
def _create_engine(self):
"""
:return: a new SQLAlchemy engine connecting to the database specified in EDUMFA_AUDIT_SQL_URI.
"""
# an Engine, which the Session will use for connection
# resources
connect_string = self.config.get("EDUMFA_AUDIT_SQL_URI", self.config.get(
"SQLALCHEMY_DATABASE_URI"))
log.debug("using the connect string {0!s}".format(censor_connect_string(connect_string)))
# if no specific audit engine options are given, use the default from
# SQLALCHEMY_ENGINE_OPTIONS or none
sqa_options = self.config.get("EDUMFA_AUDIT_SQL_OPTIONS",
self.config.get('SQLALCHEMY_ENGINE_OPTIONS', {}))
log.debug("Using Audit SQLAlchemy engine options: {0!s}".format(sqa_options))
try:
pool_size = self.config.get("EDUMFA_AUDIT_POOL_SIZE", 20)
engine = create_engine(
connect_string,
pool_size=pool_size,
pool_recycle=self.config.get("EDUMFA_AUDIT_POOL_RECYCLE", 600),
**sqa_options)
log.debug("Using SQL pool size of {}".format(pool_size))
except TypeError:
# SQLite does not support pool_size
engine = create_engine(connect_string, **sqa_options)
log.debug("Using no SQL pool_size.")
return engine
def _finalize_session(self):
""" Close current session and dispose connections of db engine"""
self.session.remove()
def _truncate_data(self):
"""
Truncate self.audit_data according to the self.custom_column_length.
:return: None
"""
for column, l in self.custom_column_length.items():
if column in self.audit_data:
data = self.audit_data[column]
if isinstance(data, str):
if column == "policies":
# The policies column is shortened per comma entry
data = truncate_comma_list(data, l)
else:
data = data[:l]
self.audit_data[column] = data
    @staticmethod
    def _create_filter(param, timelimit=None):
        """
        create a filter condition for the logentry

        :param param: dict of search keys and values; ``*`` in a value acts
            as a wildcard and is translated to SQL ``%``
        :param timelimit: if given, restrict the result to entries newer
            than ``now() - timelimit``
        :type timelimit: datetime.timedelta
        :return: SQLAlchemy filter condition (AND of all sub-conditions)
        """
        conditions = []
        param = param or {}
        for search_key in param.keys():
            search_value = param.get(search_key)
            if search_key == "allowed_audit_realm":
                # Add each realm in the allowed_audit_realm list to the
                # search condition
                realm_conditions = []
                for realm in search_value:
                    realm_conditions.append(LogEntry.realm == realm)
                filter_realm = or_(*realm_conditions)
                conditions.append(filter_realm)
            # We do not search if the search value only consists of '*'
            elif search_value.strip() != '' and search_value.strip('*') != '':
                try:
                    if search_key == "success":
                        # "success" is the only integer.
                        search_value = search_value.strip("*")
                        conditions.append(getattr(LogEntry, search_key) ==
                                          int(is_true(search_value)))
                    else:
                        # All other keys are compared as strings
                        column = getattr(LogEntry, search_key)
                        if search_key in ["date", "startdate"]:
                            # but we cast a column with a DateTime type to an
                            # ISO-format string first
                            column = to_isodate(column)
                        search_value = search_value.replace('*', '%')
                        if '%' in search_value:
                            conditions.append(column.like(search_value))
                        else:
                            conditions.append(column == search_value)
                except Exception as exx:
                    # The search_key was no search key but some
                    # bullshit stuff in the param
                    log.debug("Not a valid searchkey: {0!s}".format(exx))
        if timelimit:
            conditions.append(LogEntry.date >= datetime.datetime.now() -
                              timelimit)
        # Combine all conditions with AND into a BooleanClauseList.
        filter_condition = and_(*conditions)
        return filter_condition
[docs]
def get_total(self, param, AND=True, display_error=True, timelimit=None):
"""
This method returns the total number of audit entries
in the audit store
"""
count = 0
# if param contains search filters, we build the search filter
# to only return the number of those entries
filter_condition = self._create_filter(param, timelimit=timelimit)
try:
count = self.session.query(LogEntry.id) \
.filter(filter_condition) \
.count()
finally:
self.session.close()
return count
    def finalize_log(self):
        """
        This method is used to log the data.
        It should hash the data and do a hash chain and sign the data
        """
        try:
            # SQL columns are strings: flatten list values to comma-joined text.
            for entry, value in self.audit_data.items():
                if isinstance(value, list):
                    self.audit_data[entry] = ",".join(value)
            if self.config.get("EDUMFA_AUDIT_SQL_TRUNCATE"):
                self._truncate_data()
            # Legacy key migration: "tokentype" should be "token_type".
            if "tokentype" in self.audit_data:
                log.warning("We have a wrong 'tokentype' key. This should not happen. Fix it!. "
                            "Error occurs in action: {0!r}.".format(self.audit_data.get("action")))
                if not "token_type" in self.audit_data:
                    self.audit_data["token_type"] = self.audit_data.get("tokentype")
            # Duration of the request, if a start timestamp was recorded.
            if self.audit_data.get("startdate"):
                duration = datetime.datetime.now() - self.audit_data.get("startdate")
            else:
                duration = None
            le = LogEntry(action=self.audit_data.get("action"),
                          success=int(self.audit_data.get("success", 0)),
                          serial=self.audit_data.get("serial"),
                          token_type=self.audit_data.get("token_type"),
                          user=self.audit_data.get("user"),
                          realm=self.audit_data.get("realm"),
                          resolver=self.audit_data.get("resolver"),
                          administrator=self.audit_data.get("administrator"),
                          action_detail=self.audit_data.get("action_detail"),
                          info=self.audit_data.get("info"),
                          edumfa_server=self.audit_data.get("edumfa_server"),
                          client=self.audit_data.get("client", ""),
                          loglevel=self.audit_data.get("log_level"),
                          clearance_level=self.audit_data.get("clearance_level"),
                          policies=self.audit_data.get("policies"),
                          startdate=self.audit_data.get("startdate"),
                          duration=duration,
                          thread_id=self.audit_data.get("thread_id")
                          )
            self.session.add(le)
            self.session.commit()
            # Add the signature. The entry is committed first so that the
            # database-assigned id is part of the signed string.
            if self.sign_data and self.sign_object:
                s = self._log_to_string(le)
                sign = self.sign_object.sign(s)
                le.signature = sign
                self.session.merge(le)
                self.session.commit()
        except Exception as exx:  # pragma: no cover
            # in case of a Unicode Error in _log_to_string() we won't have
            # a signature, but the log entry is available
            log.error("exception {0!r}".format(exx))
            log.error("DATA: {0!s}".format(self.audit_data))
            log.debug("{0!s}".format(traceback.format_exc()))
            self.session.rollback()
        finally:
            self.session.close()
            # clear the audit data
            self.audit_data = {}
def _check_missing(self, audit_id):
"""
Check if the audit log contains the entries before and after
the given id.
TODO: We can not check at the moment if the first or the last entries
were deleted. If we want to do this, we need to store some signed
meta information:
1. Which one was the first entry. (use initialize_log)
2. Which one was the last entry.
"""
res = False
try:
id_bef = self.session.query(LogEntry.id
).filter(LogEntry.id ==
int(audit_id) - 1).count()
id_aft = self.session.query(LogEntry.id
).filter(LogEntry.id ==
int(audit_id) + 1).count()
# We may not do a commit!
# self.session.commit()
if id_bef and id_aft:
res = True
except Exception as exx: # pragma: no cover
log.error("exception {0!r}".format(exx))
log.debug("{0!s}".format(traceback.format_exc()))
# self.session.rollback()
finally:
# self.session.close()
pass
return res
    @staticmethod
    def _log_to_string(le):
        """
        This function creates a string from the logentry so
        that this string can be signed.

        Note: Not all elements of the LogEntry are used to generate the
        string (the Signature is not!), otherwise we could have used pickle

        :param le: LogEntry object containing the data
        :type le: LogEntry
        :rtype str
        """
        # TODO: Add thread_id. We really should add a versioning to identify which audit data is signed.
        # WARNING: this layout is part of the signature contract — changing it
        # invalidates verification of every existing audit entry.
        s = "id=%s,date=%s,action=%s,succ=%s,serial=%s,t=%s,u=%s,r=%s,adm=%s," \
            "ad=%s,i=%s,ps=%s,c=%s,l=%s,cl=%s" % (le.id,
                                                  le.date,
                                                  le.action,
                                                  le.success,
                                                  le.serial,
                                                  le.token_type,
                                                  le.user,
                                                  le.realm,
                                                  le.administrator,
                                                  le.action_detail,
                                                  le.info,
                                                  le.edumfa_server,
                                                  le.client,
                                                  le.loglevel,
                                                  le.clearance_level)
        # If we have the new log entries, we also add them for signing and verification.
        if le.startdate:
            s += ",{0!s}".format(le.startdate)
        if le.duration:
            s += ",{0!s}".format(le.duration)
        return s
@staticmethod
def _get_logentry_attribute(key):
"""
This function returns the LogEntry attribute for the given key value
"""
sortname = {'number': LogEntry.id,
'action': LogEntry.action,
'success': LogEntry.success,
'serial': LogEntry.serial,
'date': LogEntry.date,
'startdate': LogEntry.startdate,
'duration': LogEntry.duration,
'token_type': LogEntry.token_type,
'user': LogEntry.user,
'realm': LogEntry.realm,
'administrator': LogEntry.administrator,
'action_detail': LogEntry.action_detail,
'info': LogEntry.info,
'edumfa_server': LogEntry.edumfa_server,
'client': LogEntry.client,
'log_level': LogEntry.loglevel,
'policies': LogEntry.policies,
'clearance_level': LogEntry.clearance_level,
'thread_id': LogEntry.thread_id}
return sortname.get(key)
    def csv_generator(self, param=None, user=None, timelimit=None):
        """
        Returns the audit log as csv file.

        :param timelimit: Limit the number of dumped entries by time
        :type timelimit: datetime.timedelta
        :param param: The request parameters
        :type param: dict
        :param user: The user, who issued the request
        :return: None. It yields results as a generator
        """
        filter_condition = self._create_filter(param,
                                               timelimit=timelimit)
        logentries = self.session.query(LogEntry).filter(filter_condition).order_by(LogEntry.date).all()
        for le in logentries:
            audit_dict = self.audit_entry_to_dict(le)
            # NOTE(review): values are wrapped in single quotes without any
            # escaping — a value containing "'" or "," yields a malformed CSV
            # row. Consider the csv module if consumers tolerate a format change.
            yield ",".join(["'{0!s}'".format(x) for x in audit_dict.values()]) + "\n"
[docs]
def get_count(self, search_dict, timedelta=None, success=None):
# create filter condition
filter_condition = self._create_filter(search_dict)
conditions = [filter_condition]
if success is not None:
conditions.append(LogEntry.success == int(is_true(success)))
if timedelta is not None:
conditions.append(LogEntry.date >= datetime.datetime.now() -
timedelta)
filter_condition = and_(*conditions)
log_count = self.session.query(LogEntry).filter(filter_condition).count()
return log_count
    def search(self, search_dict, page_size=15, page=1, sortorder="asc",
               timelimit=None):
        """
        This function returns the audit log as a Pagination object.

        :param search_dict: the search filters
        :param page_size: number of entries per page
        :param page: 1-based page number
        :param sortorder: "asc" or "desc", passed through to search_query()
        :param timelimit: Only audit entries newer than this timedelta will
            be searched
        :type timelimit: timedelta
        :return: a Paginate object with the matching audit entries
        """
        page = int(page)
        page_size = int(page_size)
        paging_object = Paginate()
        paging_object.page = page
        paging_object.total = self.get_total(search_dict, timelimit=timelimit)
        if page > 1:
            paging_object.prev = page - 1
        if paging_object.total > (page_size * page):
            paging_object.next = page + 1
        auditIter = self.search_query(search_dict, page_size=page_size,
                                      page=page, sortorder=sortorder,
                                      timelimit=timelimit)
        while True:
            try:
                le = next(auditIter)
                # Fill the list
                paging_object.auditdata.append(self.audit_entry_to_dict(le))
            except StopIteration as _e:
                log.debug("Interation stopped.")
                break
            except UnicodeDecodeError as _e:
                # Unfortunately if one of the audit entries fails, the whole
                # iteration stops and we return an empty paging_object.
                # TODO: Check if we can return the other entries in the auditIter
                # or some meaningful error for the user.
                log.warning('Could not read audit log entry! '
                            'Possible database encoding mismatch.')
                log.debug("{0!s}".format(traceback.format_exc()))
                # NOTE(review): there is no ``break`` here, so the loop in
                # fact continues with the next entry, contradicting the
                # comment above — confirm which behavior is intended.
        return paging_object
[docs]
def search_query(self, search_dict, page_size=15, page=1, sortorder="asc",
sortname="number", timelimit=None):
"""
This function returns the audit log as an iterator on the result
:param timelimit: Only audit entries newer than this timedelta will
be searched
:type timelimit: timedelta
"""
logentries = None
try:
limit = int(page_size)
offset = (int(page) - 1) * limit
# create filter condition
filter_condition = self._create_filter(search_dict,
timelimit=timelimit)
if sortorder == "desc":
logentries = self.session.query(LogEntry).filter(
filter_condition).order_by(
desc(self._get_logentry_attribute("number"))).limit(
limit).offset(offset)
else:
logentries = self.session.query(LogEntry).filter(
filter_condition).order_by(
asc(self._get_logentry_attribute("number"))).limit(
limit).offset(offset)
except Exception as exx: # pragma: no cover
log.error("exception {0!r}".format(exx))
log.debug("{0!s}".format(traceback.format_exc()))
self.session.rollback()
finally:
self.session.close()
if logentries is None:
return iter([])
else:
return iter(logentries)
    def clear(self):
        """
        Deletes all entries in the database table.

        This is only used for test cases!
        :return:
        """
        # Bulk-delete every LogEntry row and persist the change immediately.
        self.session.query(LogEntry).delete()
        self.session.commit()
[docs]
def audit_entry_to_dict(self, audit_entry):
sig = None
if self.sign_data:
try:
sig = self.sign_object.verify(self._log_to_string(audit_entry),
audit_entry.signature,
self.verify_old_sig)
except UnicodeDecodeError as _e:
# TODO: Unless we trace and eliminate the broken unicode in the
# audit_entry, we will get issues when packing the response.
log.warning('Could not verify log entry! We get invalid values '
'from the database, please check the encoding.')
log.debug('{0!s}'.format(traceback.format_exc()))
is_not_missing = self._check_missing(int(audit_entry.id))
# is_not_missing = True
audit_dict = OrderedDict()
audit_dict['number'] = audit_entry.id
audit_dict['date'] = audit_entry.date.isoformat()
audit_dict['sig_check'] = "OK" if sig else "FAIL"
audit_dict['missing_line'] = "OK" if is_not_missing else "FAIL"
audit_dict['action'] = audit_entry.action
audit_dict['success'] = audit_entry.success
audit_dict['serial'] = audit_entry.serial
audit_dict['token_type'] = audit_entry.token_type
audit_dict['user'] = audit_entry.user
audit_dict['realm'] = audit_entry.realm
audit_dict['resolver'] = audit_entry.resolver
audit_dict['administrator'] = audit_entry.administrator
audit_dict['action_detail'] = audit_entry.action_detail
audit_dict['info'] = audit_entry.info
audit_dict['edumfa_server'] = audit_entry.edumfa_server
audit_dict['policies'] = audit_entry.policies
audit_dict['client'] = audit_entry.client
audit_dict['log_level'] = audit_entry.loglevel
audit_dict['clearance_level'] = audit_entry.clearance_level
audit_dict['startdate'] = audit_entry.startdate.isoformat() if audit_entry.startdate else None
audit_dict['duration'] = audit_entry.duration.total_seconds() if audit_entry.duration else None
audit_dict['thread_id'] = audit_entry.thread_id
return audit_dict