"""Reads and returns information about EMu's schema"""
import json as serialize
import glob
import os
import re
from ..dicts import DeepDict
from ..helpers import cprint
class XMuFields(object):
    """Reads and stores metadata about fields in EMu

    Args:
        schema_path (str): path to EMu schema file. If None, looks for
            a copy of the schema stored in files.
        whitelist (list): list of EMu modules to include. If None,
            anything not on the blacklist is included.
        blacklist (list): list of EMu modules to exclude. If None or
            empty, the default blacklist defined on this class is used.
        cache (bool): if true, the parsed schema is read from and written
            to a JSON file stored alongside the schema file (same path
            with a .json extension)
        verbose (bool): triggers verbose output

    Attributes:
        schema (dict): path-keyed dicts of field data
        tables (dict): module-keyed lists of tables, where each table is
            a sorted tuple of column paths
        map_tables (dict): maps each column path to the table (tuple of
            column paths) that contains it
        hashed_tables (dict): maps hash(table) to table. Used to detect
            tables that have already been added.
        verbose (bool): triggers verbose output
    """

    # Modules excluded when the caller does not supply a blacklist
    _DEFAULT_BLACKLIST = (
        'eaccessionlots',
        'edocuments',
        'eevents',
        'eexhibitobjects',
        'eexports',
        'egazetteer',
        'einternal',
        'eloans',
        'eluts',
        'eregistry',
        'erights',
        'eschedule',
        'esites',
        'estatistics',
        'etemplate',
        'etrapevents',
        'etraps',
        'evaluations',
        'ewebgroups',
        'ewebusers',
    )

    def __init__(self, schema_path=None, whitelist=None, blacklist=None,
                 cache=True, verbose=False):
        self.verbose = verbose
        self._fpath = os.path.join(os.path.dirname(__file__), 'files')
        # Fall back to the defaults for anything the caller left out
        blacklist = set(blacklist if blacklist else self._DEFAULT_BLACKLIST)
        if not schema_path:
            schema_path = os.path.join(self._fpath, 'schema.pl')
        # The cache lives next to the schema file. cache_path is always
        # bound, so disabling the cache no longer raises when params is
        # built below.
        cache_path = None
        if cache:
            cache_path = os.path.splitext(schema_path)[0] + '.json'
        # Set params. These will be added to the cache file to determine
        # if the cache request is valid. Lists are sorted so the comparison
        # does not depend on set iteration order.
        params = {
            'blacklist': sorted(blacklist),
            'cache_path': cache_path,
            'schema_path': schema_path,
            'whitelist': sorted(whitelist) if whitelist else None
        }
        # Check cache. On a hit, _check_cache populates the instance.
        cached = self._check_cache(params) if cache else None
        if cached is None:
            cprint('Reading EMu schema...')
            self.schema = self._read_schema(schema_path, whitelist, blacklist)
            # Tables are stored as tuples. These containers must exist
            # before _read_tables runs because it calls add_table.
            self.tables = {}         # maps modules to tables
            self.map_tables = {}     # maps column paths to tables
            self.hashed_tables = {}  # maps hash of tables to tables
            self.tables = self._read_tables()
            self._map_fields_to_tables()  # adds table fields to schema dict
            if cache:
                self._write_cache(cache_path, params)

    def __call__(self, *args):
        """Shorthand for :py:func:`~XMuFields.get(*args)`"""
        return self.get(*args)

    @staticmethod
    def _freeze_table(columns):
        """Convert a table read from JSON back to a tuple of tuples"""
        return tuple(tuple(column) for column in columns)

    def _write_cache(self, cache_path, params):
        """Write the schema and table data to a JSON cache file

        Args:
            cache_path (str): path to the cache file
            params (dict): parameters used to validate the cache later
        """
        cprint('Caching XMuFields object...')
        # JSON object keys must be strings, so join the tuple keys
        map_tables = {'|'.join(key): val
                      for key, val in self.map_tables.items()}
        fields = {
            'params': params,
            'schema': self.schema,
            'tables': self.tables,
            'map_tables': map_tables,
            'hashed_tables': self.hashed_tables,
        }
        with open(cache_path, 'wb') as f:
            serialize.dump(fields, f)

    def _check_cache(self, params):
        """Check for cached XMuFields object

        The cache is used only if it is newer than the schema file, is
        valid JSON, and was written with the same params. On success the
        schema and table attributes of this instance are populated.

        Args:
            params (dict): parameters for the current request. Must
                include schema_path and cache_path.

        Returns:
            The cached dict if the cache is usable, otherwise None
        """
        schema_path = params['schema_path']
        cache_path = params['cache_path']
        # Check if JSON is newer than the schema file
        try:
            json_newer = (os.path.getmtime(cache_path)
                          > os.path.getmtime(schema_path))
        except (IOError, OSError):
            return None
        if not json_newer:
            return None
        cprint('Reading cached XMuFields object...')
        try:
            with open(cache_path, 'rb') as f:
                cached = serialize.load(f)
        except IOError:
            cprint('Cache file not found!')
            return None
        except ValueError:
            # json raises ValueError (or a subclass) on malformed input
            cprint('Cache file not JSON!')
            return None
        # Check the params logged in the cached file
        if not isinstance(cached, dict) or params != cached.get('params'):
            return None
        try:
            # JSON stores tuples as lists and coerces keys to strings, so
            # restore the types produced by add_table. Hash keys are
            # recomputed rather than parsed so they always match the
            # restored tuples.
            schema = cached['schema']
            tables = {module: [self._freeze_table(t) for t in module_tables]
                      for module, module_tables in cached['tables'].items()}
            map_tables = {tuple(key.split('|')): self._freeze_table(val)
                          for key, val in cached['map_tables'].items()}
            frozen = [self._freeze_table(t)
                      for t in cached['hashed_tables'].values()]
            hashed_tables = {hash(t): t for t in frozen}
        except (KeyError, TypeError, AttributeError):
            cprint('Cache file missing required keys!')
            return None
        # NOTE(review): schema comes back from json as plain dicts; other
        # methods call self.schema(...) and self.schema.push(...), so this
        # presumably needs converting to a DeepDict -- confirm.
        self.schema = schema
        self.tables = tables
        self.map_tables = map_tables
        self.hashed_tables = hashed_tables
        return cached

    def get(self, *args):
        """Return data for an EMu export path

        Modified from DeepDict.pull() to jump to a different module when
        a reference is encountered.

        Args:
            *args: the path to a value in the dictionary, with one component
                of that path per arg

        Returns:
            Dictionary with information about the given path. If the
            schema is empty, a message is printed and the empty schema
            is returned.

        Raises:
            KeyError: if the path cannot be resolved in a non-empty schema
        """
        mapping = self.schema
        i = 0
        while i < len(args):
            try:
                mapping = mapping[args[i]]
            except KeyError:
                try:
                    # Try jumping to a referenced module. The index is not
                    # advanced so the same component is retried there.
                    mapping = self.schema[mapping['schema']['RefTable']]
                except KeyError:
                    if args[0] is None:
                        raise KeyError('No module specified: {}'.format(args))
                    if not self.schema:
                        # No error on bad path if the schema is not defined.
                        # Stop here; retrying would loop forever.
                        print('No schema defined')
                        break
                    raise KeyError('Illegal path: {}'.format(args))
            else:
                i += 1
        return mapping

    def _read_schema(self, fp, whitelist=None, blacklist=None):
        """Reads EMu schema file to dictionary

        See the class for details about the arguments used by this function.
        The EMu schema file includes (but is not limited to) the following:

            ColumnName: Name of field, table, or reference in current module
            DataKind: dkAtom, dkNested, dkTable, dkTuple
            DataType: Currency, Date, Float, Integer, Latitude,
                Longitude, String, Text, Time, UserId, UserName
            ItemName: Field name in current module
            RefLink: Name with Ref
            RefKey: Field used to link with other module
            LookupName: Name of lookup list. Appears only in highest field
                in a given lookup hierarchy.
            LookupParent: The name of next highest field in a lookup
                hierarchy.

        Returns:
            DeepDict with information about the XML schema. Empty if the
            schema file cannot be read.
        """
        # Regexes are used to split the .pl file into modules and fields
        re_module = re.compile(r'\te[a-z]+ =>.*?\{.*?\n\t\}', re.DOTALL)
        # NOTE(review): [A-z] also matches [ \ ] ^ _ and backtick. Kept
        # as-is because narrowing it could change which fields are found.
        re_field = re.compile(r"'[A-z].*?\},", re.DOTALL)
        try:
            with open(fp, 'rb') as f:
                modules = re_module.findall(f.read())
        except (IOError, OSError):
            # Deliberately best-effort: an unreadable schema is not an error
            return DeepDict()
        schema = DeepDict()
        for module in sorted(modules):
            module_name = module.split('\n')[0].strip().split(' ')[0]
            # Check module name against whitelist and blacklist
            if blacklist is not None and module_name in blacklist:
                continue
            if whitelist is not None and module_name not in whitelist:
                continue
            schema[module_name] = {}
            for field in re_field.findall(module):
                lines = [s.strip() for s in field.split('\n') if s.strip()]
                schema_data = {}
                # The first two lines and the last line of each field are
                # skipped; the rest are read as key => value pairs
                for line in lines[2:-1]:
                    try:
                        key, val = [s.strip('",\'')
                                    for s in line.split(' => ')]
                    except ValueError:
                        # Not a simple key => value pair
                        continue
                    schema_data[key] = val.decode('cp1252')
                schema_data['ModuleName'] = module_name
                # ItemName appears only for fields that are editable in EMu
                # (I think), so use it to cull copy fields, etc.
                if 'ItemName' not in schema_data:
                    continue
                # Get additional information about this field
                path = self._derive_path(schema_data)
                field_data = {
                    'path': '/'.join(path),
                    'table': is_table(*path),
                    'schema': schema_data
                }
                schema.push(field_data, *path)
        return schema

    @staticmethod
    def _derive_path(schema_data):
        """Derive full path to field based on EMu schema

        Args:
            schema_data (dict): field-specific data from the EMu schema
                file. Must include ModuleName.

        Returns:
            Tuple of path components, starting with the module name, with
            consecutive duplicates removed
        """
        path = [schema_data['ModuleName']]
        for key in ('ColumnName', 'ItemName', 'ItemBase'):
            if key not in schema_data:
                continue
            val = schema_data[key]
            # NOTE(review): the nesting of these branches was reconstructed
            # from a copy of the source without indentation -- confirm.
            if path[-1].endswith('_nesttab'):
                # Nested tables
                path.append(val.rstrip('0') + '_nesttab_inner')
                if not val.endswith(('0', 'Ref')):
                    path.append(val)
            # Skip ItemName for references. This allows their
            # paths to match the EMu import/export schema.
            elif not (val.endswith('Ref') and key == 'ItemName'):
                path.append(val)
        # Drop any component that repeats the one immediately before it
        deduped = path[:1]
        for part in path[1:]:
            if part != deduped[-1]:
                deduped.append(part)
        return tuple(deduped)

    def _read_tables(self):
        """Read data about tables from text files in files/tables

        Each file is named for a module and lists table,column pairs, one
        per line. Lines starting with # are ignored. Every table found is
        also registered using add_table.

        Returns:
            Dict of module-keyed lists of tables, where each table is a
            sorted tuple of column paths
        """
        tables = {}
        pattern = os.path.join(self._fpath, 'tables', 'e*.txt')
        for fp in glob.iglob(pattern):
            module_name = os.path.splitext(os.path.basename(fp))[0]
            module_tables = {}
            with open(fp, 'rb') as f:
                rows = [row.strip() for row in f.read().splitlines()
                        if ',' in row and not row.startswith('#')]
            for row in rows:
                table, column = row.split(',')
                column = (module_name, column)
                module_tables.setdefault(table, []).append(column)
                # Map nested tables as well
                if column[1].endswith('_nesttab'):
                    inner = (column[0], column[1], column[1] + '_inner')
                    module_tables.setdefault(table + 'Inner', []).append(inner)
            for table in module_tables.values():
                self.add_table(table)
            tables[module_name] = [tuple(sorted(t))
                                   for t in module_tables.values()]
        return tables

    def _map_fields_to_tables(self):
        """Add table data to field data in self.schema"""
        cprint('Mapping tables...')
        # Only modules that appear in both the tables and the schema
        for module in set(self.tables) & set(self.schema):
            for table in self.tables[module]:
                for column in table:
                    data = self.schema(*column)
                    data['columns'] = table
                    self.schema.push(data, *column)

    def _map_aliases(self, module=None):
        """Update schema with user-defined aliases based on files/aliases.txt

        Aliases can be called directly from schema. Additional aliases
        can be set using set_alias().

        Args:
            module (str): name of base module. If given, aliases whose
                path does not start with this name are skipped.

        Returns:
            Dict with one key per alias that was added, each mapped to True
        """
        cprint('Reading user-defined aliases...')
        aliases = {}
        with open(os.path.join(self._fpath, 'aliases.txt')) as f:
            rows = [row.strip() for row in f.read().splitlines()
                    if ',' in row and not row.startswith('#')]
        for row in rows:
            alias, path = row.split(',')
            # Exclude shortcuts to other modules if module specified
            if module is not None and not path.startswith(module):
                continue
            try:
                mapping = self.schema(path)
            except KeyError:
                cprint(' Alias error: Path not found: {}'.format(alias))
                continue
            mapping['alias'] = alias
            # Value first, then path, matching the other calls to push in
            # this class. NOTE(review): confirm against DeepDict.push.
            self.schema.push(mapping, alias)
            aliases[alias] = True
            try:
                self(path)['columns']
            except KeyError:
                if is_table(path):
                    cprint((' Alias error: Related table not'
                            ' found: {}'.format(alias)))
        return aliases

    @staticmethod
    def get_xpath(*args):
        """Reformat plain-text path to xpath

        Args:
            *args: the components of an XMuFields path, one per arg

        Returns:
            Path string reformatted as in an EMu export
        """
        xpath = []
        for arg in args:
            # Tables are checked first, so Ref_tab is treated as a table
            if is_table(arg):
                xpath.append("table[@name='{}']".format(arg))
                xpath.append('tuple')
            elif is_reference(arg):
                xpath.append("tuple[@name='{}']".format(arg))
            else:
                xpath.append("atom[@name='{}']".format(arg))
        return '/'.join(xpath)

    @staticmethod
    def read_fields(fp):
        """Reads paths from the schema in an EMu XML export

        Only the header of the file is read. Reading stops at the first
        line consisting of ?>.

        Args:
            fp (str): path to the EMu XML report

        Returns:
            List of slash-delimited paths in the EMu schema. If no schema
            block is found, a one-item list containing the module name
            (or None if no module was found).
        """
        header = []
        module = None
        with open(fp, 'rb') as f:
            for line in f:
                if module is None and 'table name="e' in line:
                    module = line.split('"')[1]
                header.append(line.rstrip())
                if line.strip() == '?>':
                    break
        try:
            start = header.index('<?schema')
        except ValueError:
            return [module]
        paths = []
        # The first container pushed is the module itself; the leading
        # placeholder is dropped when each path is assembled
        containers = ['schema']
        # Skip the opening <?schema line and the closing line
        for entry in header[start + 1:-1]:
            kind, name = [s.strip() for s in entry.rsplit(' ', 1)]
            if kind in ('table', 'tuple'):
                containers.append(name)
            elif name == 'end':
                containers.pop()
            else:
                paths.append('/'.join(containers[1:] + [name]))
        return paths

    def set_alias(self, alias, path):
        """Add alias: path to self.schema

        Args:
            alias (str): name of alias
            path (str): path to the field the alias points to
        """
        # Value first, then path, matching the other calls to push in this
        # class. NOTE(review): confirm against DeepDict.push.
        self.schema.push(self.schema(path), alias)

    def add_table(self, columns):
        """Update table containers with new table

        Tables that have already been added are ignored. The caller's
        sequence is not modified.

        Args:
            columns (list): columns in the table being added, each given
                as a tuple that starts with the module name
        """
        module = columns[0][0]
        columns = tuple(sorted(columns))
        hkey = hash(columns)
        if hkey not in self.hashed_tables:
            self.tables.setdefault(module, []).append(columns)
            self.hashed_tables[hkey] = columns
            for column in columns:
                self.map_tables[column] = columns
def is_table(*args):
    """Checks whether any component of a path names a table

    Args:
        *args: the components of a path, one string per arg

    Returns:
        True if at least one component ends with a table suffix,
        otherwise False
    """
    table_suffixes = ('0', '_nesttab', '_nesttab_inner', '_tab')
    # Every component is tested, so a non-string anywhere still raises
    matches = [part.endswith(table_suffixes) for part in args]
    return any(matches)
def is_reference(*args):
    """Checks whether any component of a path names a reference

    Args:
        *args: the components of a path, one string per arg

    Returns:
        True if at least one component ends with a reference suffix,
        otherwise False
    """
    reference_suffixes = ('Ref', 'Ref_tab')
    # Every component is tested, so a non-string anywhere still raises
    matches = [part.endswith(reference_suffixes) for part in args]
    return any(matches)