"""Tools to parse audits data from EMu"""
from random import randint
from ...xmu import XMu, AuditRecord
from ..containers.auditrecord import Change
[docs]class Auditor(XMu):
"""Processes an eaudits XML export into HTML for easy viewing
Args:
keep (int): percent of records to include in the output
whitelist (list): list of fields to output. All other fields
will be ignored. The whitelist supersedes the blacklist.
blacklist (list): list of fields to exclude from output. all
other fields will be included.
modules (list): list of modules to include in the report
users (list): list of users to include in the report
Attributes:
keep (int): percent of records to include in the output
whitelist (list): list of fields to output. All other fields
will be ignored. The whitelist supersedes the blacklist.
blacklist (list): list of fields to exclude from output. all
other fields will be included.
"""
def __init__(self, *args, **kwargs):
# Process Audtito-specific keywaords
self.percent_to_review = kwargs.pop('percent_to_review', 2)
self.blacklist = kwargs.pop('blacklist', [])
self.whitelist = kwargs.pop('whitelist', [])
# Filter params
self.modules = kwargs.pop('modules', [])
self.users = kwargs.pop('users', [])
super(Auditor, self).__init__(*args, **kwargs)
print 'Reviewing around {}% of records'.format(self.percent_to_review)
# Default values for the blacklist. Fields included here are not
# printed in the HTML report.
if not self.blacklist:
self.blacklist = [
'AdmDateModified',
'AdmModifiedBy',
'AdmTimeModified',
'DarDateLastModified',
'ExiIfd_tab',
'ExiName_tab',
'ExiTag_tab',
'ExiValue_tab',
'ExtendedData',
'SummaryData'
]
self._container = AuditRecord
self.records = {}
self._html = [] # results is a list of self.containers
[docs] def iterate(self, element):
"""Groups audit records by module and irn"""
rec = self.parse(element)
if rec('AudTable') == 'egroups':
return True
key = '-'.join([rec('AudTable'), rec('AudKey')])
self.records.setdefault(key, []).append(rec)
[docs] def itermodified(self, element):
rec = self.parse(element)
self.records.setdefault(rec('AudTable'), []).append(rec('AudKey'))
[docs] def combine(self, records=None, keep_all=False):
"""Parses audit records into HTML"""
if records is None:
records = self.records
combined = {}
for irn, recs in records.iteritems():
# Filter trails that don't include the specified users
if self.users:
if not [rec for rec in recs if rec('AudUser') in self.users]:
continue
# Filter trails that don't include the specified modules
if self.modules:
if not [rec for rec in recs if rec('AudTable') in self.modules]:
continue
# Filter trails that end in a delete
if [rec for rec in recs if rec('AudOperation') == 'delete']:
continue
# Get the audit trail
for rec in recs:
rec.parse_changes(self.whitelist, self.blacklist)
if len(recs) > 1:
# Sort from least to most recent modification time
recs.sort(key=lambda rec: 'T'.join([rec('AudDate'),
rec('AudTime')]))
# Get the original values from the first record
changes = recs[0].changes
oldest = {fld: chg.old for fld, chg in changes.iteritems()}
newest = {fld: chg.new for fld, chg in changes.iteritems()}
for rec in recs[1:]:
# Update original with values not modified in first audit
old = {fld: chg.old for fld, chg in rec.changes.iteritems()}
for key, val in old.iteritems():
if oldest.get(key) is None:
oldest[key] = val
# Overwrite newest with new
new = {fld: chg.new for fld, chg in rec.changes.iteritems()}
newest.update(new)
# Summarize the oldest and newest dictionaries into one
# record combining the metadata of all the records
summarized = AuditRecord()
summarized['AudKey'] = rec('AudKey')
summarized['AudTable'] = rec('AudTable')
fields = ['irn', 'AudUser', 'AudOperation']
for fld in fields:
items = '</li><li>'.join([rec(fld) for rec in recs])
summarized[fld] = '<ul><li>{}</li></ul>'.format(items)
summarized.changes = {
fld: Change(fld, oldest.get(fld), newest.get(fld))
for fld in set(oldest.keys() + newest.keys())
}
#summarized.pprint(True)
recs = [summarized]
combined[irn] = recs[0]
return combined
[docs] def finalize(self):
html = []
try:
combined = self.combine()
except TypeError:
pass
else:
print '{:,} distinct records were modified'.format(len(combined))
for irn, rec in combined.iteritems():
if (self.percent_to_review == 100
or randint(1, 100) <= self.percent_to_review):
html.extend(self.to_html(rec))
self._html = html
return html
[docs] def to_html(self, rec):
"""Converts an audit record to HTML for display"""
html = ['<h1>{}: {}</h1>'.format(rec('AudTable'), rec('AudKey'))]
html.append(u'<table>')
keys = ['irn', 'AudUser', 'AudOperation']
for key in keys:
html.append('<tr><th>{}</th><td colspan="2">{}'
'</td></tr>'.format(key, rec(key)))
if rec.changes is None:
rec.parse_changes(self.whitelist, self.blacklist)
for field in sorted(rec.changes):
change = rec.changes[field]
old = self.format_value(field, change.old)
new = self.format_value(field, change.new)
# Capture changes only
if old != new:
html.append(u'<tr>')
html.append(u'<th>{}</th>'.format(field))
html.append(u'<td>{}</td>'.format(old))
html.append(u'<td>{}</td>'.format(new))
html.append(u'</tr>')
html.append(u'</table>')
return html
[docs] def write_html(self, fp, html=None):
"""Writes HTML document containing the HTMLized records"""
header = ['<!DOCTYPE html>'
'<html>'
'<head><title>Audit Review Tool</title></head>'
'<meta charset="utf-8">'
'<style>'
'body { font: 11pt calibri; }'
'h1 { color: #39f; }'
'table { width: 70%; font: 10pt calibri; border-collapse: collapse; margin-bottom: 1%; }'
'tr:hover { background-color: #e6f2ff; }'
'th, td { vertical-align: top; border: 1px solid #ccc; padding: 1%; }'
'th { width: 10%; font-weight: 800; text-align: left; }'
'td { width: 25%; }'
'ol { padding: 1%; margin: 2%; }'
'</style>'
'<body>']
footer = ['</body>'
'</html>']
html = header + self._html if html is None else html + footer
with open(fp, 'wb') as f:
f.write(''.join([s.encode('utf-8') for s in html]))