"""Subclass of DeepDict with methods specific to XMu"""
import re
from collections import namedtuple
from datetime import datetime
from itertools import izip_longest
from pytz import timezone
from dateparser import parse
from ..constants import FIELDS
from ...dicts import DeepDict
Row = namedtuple('Row', ['irn', 'field', 'row', 'val'])
[docs]class XMuRecord(DeepDict):
"""Contains methods for reading data from EMu XML exports"""
def __init__(self, *args):
    """Initializes the record and the attributes used by its methods

    Args:
        *args: positional arguments handed unchanged to the parent
            class initializer
    """
    super(XMuRecord, self).__init__(*args)
    # Carryover attributes all start out as None
    self._attributes = ['fields', 'module']
    for name in self._attributes:
        setattr(self, name, None)
    # Suffixes used to recognize table fields and reference fields
    self.tabends = ('0', '_nesttab', '_nesttab_inner', '_tab')
    self.refends = ('Ref', 'Ref_tab')
    # Swap the None placeholder for the imported field definitions
    self.fields = FIELDS
def __call__(self, *args, **kwargs):
    """Shorthand for XMuRecord.smart_pull(*args)

    Keyword arguments are accepted but are not passed along.
    """
    pulled = self.smart_pull(*args)
    return pulled
'''
def __getattribute__(self, attr):
try:
val = super(XMuRecord, self).__getattribute__(attr)
except AttributeError:
if attr == 'fields':
self.fields = FIELDS
return FIELDS
raise
else:
if attr == 'fields' and val is None:
self.fields = FIELDS
return FIELDS
return val
'''
def finalize(self, *args, **kwargs):
    """Placeholder hook that accepts any arguments and does nothing"""
    return None
def add(self, path, val, delim='|'):
    """Adds a value to the record at the given path

    Args:
        path (mixed): the path to an EMu field, either as a list/tuple of
            segments or as a string with segments separated by slashes
        val (mixed): the value to store. For the final segment this is
            a delimited string (or None); for a reference table it may
            also be a list of numeric irns.
        delim (str): delimiter used to split val into rows when the
            final segment is a table

    Raises:
        ValueError: if a non-empty value is given for a path that ends on
            a reference table, or if a non-final segment is neither a
            reference nor a reference table
    """
    # Anything that is not already a sequence of segments is treated as
    # a slash-delimited string
    if not isinstance(path, (list, tuple)):
        path = path.split('/')
    rec = self
    for i, seg in enumerate(path):
        is_tab = seg.endswith(self.tabends)
        is_ref = seg.endswith(self.refends)
        if is_tab and is_ref:
            # This catches lists of irns
            if isinstance(val, list) and all(v.isnumeric() for v in val):
                rec.setdefault(seg, []).extend(val)
            # This catches tables with values that don't work with this func
            elif val and i == len(path) - 1:
                raise ValueError('{}: {}'.format(path, val))
            else:
                # Append a new row and descend into that row (not into
                # the list holding it) so later segments can be set on it
                rows = rec.setdefault(seg, [])
                rows.append(self.clone())
                rec = rows[-1]
        elif is_ref:
            rec = rec.setdefault(seg, self.clone())
        elif path[-1] == seg:
            vals = []
            if val is not None:
                vals = [s.strip() for s in val.split(delim)]
            else:
                val = u''
            # Tables store the split rows, atomic fields the raw value
            rec[seg] = vals if is_tab else val
        else:
            raise ValueError('{}: {}'.format(path, val))
def setdefault(self, key, val, delim='|'):
    """Sets the default for a single key, splitting table values on delim

    Args:
        key (str): a single key. Paths containing a slash are rejected.
        val (mixed): the default value. If the key has a table suffix and
            val is not a list, val is split on delim.
        delim (str): delimiter used to split val for table keys

    Returns:
        Result of the parent class setdefault for the key

    Raises:
        KeyError: if key contains a slash
    """
    if '/' in key:
        raise KeyError('Illegal key: {}'.format(key))
    needs_split = key.endswith(self.tabends) and not isinstance(val, list)
    default = val.split(delim) if needs_split else val
    return super(XMuRecord, self).setdefault(key, default)
def simple_pull(self, path):
    """Pulls the value found at path by calling the record

    Args:
        path (mixed): the path to an EMu field as a string or list

    Returns:
        Value for the given path
    """
    # Strings are passed as a single argument, anything else is unpacked
    if not isinstance(path, basestring):
        return self(*path)
    return self(path)
def _guess_module(self):
    """Attempts to guess the module if no module attribute set

    The guess is made by looking for keys that are characteristic of one
    module. It succeeds only if every characteristic key present in the
    record points to the same module.

    Raises:
        AssertionError: if the module is not set and cannot be determined
            unambiguously from the keys in the record
    """
    # FIXME: Fill out and move to a config file
    keys = {
        'ArtTitle': 'ebibliography',
        'CatPrefix': 'ecatalogue',
        'CatNumber': 'ecatalogue',
        'CatSuffix': 'ecatalogue',
        'LocCountry': 'ecollectionevents',
        'LocStateProvinceTerritory': 'ecollectionevents',
        'LocDistrictCountyShire': 'ecollectionevents',
        'LocTownship': 'ecollectionevents',
        'MulTitle': 'emultimedia',
        'NamLast': 'eparties',
        'NamOrganisation': 'eparties',
        'NamRoles_tab': 'eparties',
        'ShpContactRef': 'eshipments',
        'TraNumber': 'enmnhtransactions'
    }
    # Explicit check instead of an assert statement so that the guess is
    # still made when Python runs with assertions disabled (-O)
    if self.module:
        return
    modules = set()
    for key, module in keys.items():
        if self.get(key) is not None:
            modules.add(module)
    if len(modules) != 1:
        # Same exception type callers saw from the original assert
        raise AssertionError('Could not guess module from record keys')
    self.module = modules.pop()
def smart_pull(self, *args, **kwargs):
    """Pull data from the record, formatting the result based on the path

    Args:
        *args: the path to a value in the dictionary, with one component
            of that path per arg. If args[0] contains one or more dots,
            the path will be expanded from that and ignore subsequent args.
        **kwargs: passed through to the pull, get_rows and get_reference
            calls made for the path

    Returns:
        Value for the given path, formatted as follows:
            An atomic field returns a string
            A reference pointing to a single field returns a string
            A simple table returns a list of values
            A reference table that specifies a field returns a list
            A reference table returns a list of XMuRecord objects
            A nested table returns a list of lists

    Raises:
        KeyError: if nothing is found and the path is not in the schema
        TypeError: if the value to return is None
    """
    self._guess_module()
    if '.' in args[0]:
        args = args[0].split('.')
    # Nested tables need to be handled very carefully
    nested = [arg for arg in args if arg.endswith('_nesttab')]
    if nested:
        args = list(args)
        nesttab = nested[0]
        nesttab_inner = nesttab + '_inner'
        if nesttab_inner not in args:
            args.insert(args.index(nesttab) + 1, nesttab_inner)
        # Split into inner and outer tables. The inner table always
        # starts with the _nesttab_inner key, so it is never empty.
        split_at = args.index(nesttab_inner)
        outer_table = args[:split_at]
        inner_table = args[split_at:]
        try:
            retval = [row.get_rows(*inner_table, **kwargs)
                      for row in self.pull(*outer_table)]
        except (AttributeError, KeyError):
            retval = [[]]
    # Reference tables return a list of dictionaries, unless a field
    # is specified, in which case they return a list of values
    elif any(arg.endswith('Ref_tab') for arg in args):
        retval = self.get_reference(*args, **kwargs)
        if retval is None:
            retval = []
    # One-dimensional tables return a list of values
    elif any(arg.endswith(self.tabends) for arg in args):
        retval = self.get_rows(*args, **kwargs)
    # Atomic references return a single dictionary, whereas atomic
    # fields return a value
    else:
        default = self.clone() if args[-1].endswith(self.refends) else u''
        try:
            val = self.pull(*args, **kwargs)
        except KeyError:
            retval = default
        else:
            retval = val if val is not None else default
    # Update module attribute for references/attachments
    if args[-1].endswith(self.refends):
        path = [self.module] + list(args)
        field_data = self.fields.get(*path)
        if isinstance(retval, list):
            for i, val in enumerate(retval):
                try:
                    val.module
                except AttributeError:
                    # Rows without a module attribute are converted and
                    # written back so the conversion is not thrown away
                    val = self.clone(val)
                    retval[i] = val
                val.module = field_data['schema'].get('RefTable')
        else:
            retval.module = field_data['schema']['RefTable']
    # Verify path against the schema if no value is returned. A failed
    # call does not itself return an error because not all fields will
    # be present in all records.
    if not retval:
        path = [self.module] + list(args)
        try:
            self.fields.get(*path)
        except KeyError:
            raise KeyError('/'.join(args))
    # Last check
    if retval is None:
        raise TypeError('No value found for {}'.format('/'.join(args)))
    return retval
def is_new(self, found):
    """Checks if current module:irn exists in found

    Args:
        found (dict): marks irns already found as True

    Returns:
        True the first time a module:irn key is seen (the key is then
        recorded in found as True); otherwise the negation of the value
        stored in found for that key

    This method can be invoked manually inside the XMu subclass when
    reading XML exports from a directory containing multiple, potentially
    overlapping record sets to prevent (a) the same record from being read
    twice or (b) an older version of a record from overwriting a more
    recent one.
    """
    key = ':'.join((self.module, self('irn')))
    try:
        seen = found[key]
    except KeyError:
        # First sighting: record it and report the record as new
        found[key] = True
        return True
    return not seen
def verify(self):
    """Checks every path in the record against the field definitions

    Row logic (anything from the last opening parenthesis on) and
    trailing underscores are removed from each segment before lookup.

    Raises:
        KeyError: if a path is not found in the field definitions and
            none of its segments starts with an underscore
    """
    for raw_path in self.get_paths():
        raw_path.insert(0, self.module)
        cleaned = [seg.rsplit('(', 1)[0].rstrip('_') for seg in raw_path]
        try:
            self.fields.get(*cleaned)
        except KeyError:
            # Paths with an underscore-prefixed segment are tolerated
            if not any(seg.startswith('_') for seg in cleaned):
                raise KeyError('/'.join(cleaned))
def get_paths(self, rec=None, path=None, paths=None):
    """Collects the paths to the values found in a record

    Args:
        rec (dict): the record to walk. Defaults to this record.
        path (list): the segments leading to rec. Used during recursion.
        paths (list): the paths found so far. Used during recursion.

    Returns:
        List of paths, each a list of keys. Values that are dicts are
        walked recursively; any other value ends a path.
    """
    rec = self if rec is None else rec
    path = [] if path is None else path
    paths = [] if paths is None else paths
    for key in rec:
        path.append(key)
        try:
            child = rec(key)
        except IOError:
            # NOTE(review): unclear when calling rec raises IOError;
            # the fallback walks each item of rec instead
            for child in rec:
                paths = self.get_paths(rec=child, path=path, paths=paths)
        else:
            if isinstance(child, dict):
                paths = self.get_paths(rec=child, path=path, paths=paths)
            else:
                # Store a copy because path keeps changing during the walk
                paths.append(list(path))
        path.pop()
    return paths
'''
def smart_push(self, val, *args):
"""Add value to paths stipulated by args
Not recommended, needs testing.
"""
# Confim that paths to tables are valid
temp = []
for i in xrange(len(args)):
temp.append(args[i])
stripped = args[i].rstrip('+')
stem = stripped.rsplit('_', 1)[0]
if (stripped.endswith(self.tabends)
and not stripped.endswith('Ref_tab')
and (i == (len(args) - 1) or args[i+1] != stem)):
temp.append(stem)
args = temp
# Process args using a modification of the base pull function
d = self
for i in xrange(len(args) - 1):
arg = args[i]
append = False
table = False
if arg.rstrip('+').endswith(self.tabends):
table = True
if arg.endswith('+'):
append = True
arg = arg.rstrip('+')
try:
d = d[arg]
except KeyError:
if table:
d[arg] = [self.__class__()]
d = d[arg][0]
else:
d[arg] = self.__class__()
d = d[arg]
else:
# Append to an existing table
if table and append:
d.append(self.__class__())
# Replace an existing table
elif table and not append:
d = [self.__class__()]
d = d[-1]
d[args[-1].rstrip('+')] = val
'''
def get_rows(self, *args):
    """Returns a list of values corresponding to the table rows

    Args:
        *args: the path to a value in the dictionary, with one component
            of that path per arg

    Returns:
        List of values, one per row, or an empty list if the table is
        not found

    Raises:
        AttributeError: if a row in the table has no values method
    """
    # Clean up tables by dropping anything after the last table segment
    for offset, arg in enumerate(reversed(args)):
        if arg.endswith(self.tabends):
            if offset:
                args = args[:-offset]
            break
    try:
        table = self.pull(*args)
    except KeyError:
        return []
    rows = []
    for row in table:
        try:
            cells = row.values()
        except AttributeError:
            raise AttributeError('No values attribute found for {}. Try'
                                 ' expanding the record.'.format(args))
        rows.extend(cells)
    return rows
def get_reference(self, *args):
    """Returns the rows, or one field from each row, of a reference table

    Args:
        *args: the path to a value in the dictionary, with one component
            of that path per arg. The path must include a segment ending
            in Ref_tab.

    Returns:
        If the last arg is a field (as opposed to a reference table),
        this function will return a list of values, one per row. If the
        last arg is a reference table, it will return the rows of that
        table. An empty list is returned if the table is missing or empty.
    """
    # Check for key within reference. Segments after the reference table
    # are trimmed and the one directly following the table is the key.
    key = None
    while not args[-1].endswith('Ref_tab'):
        key = args[-1]
        args = args[:-1]
    try:
        ref = self.pull(*args)
    except KeyError:
        return []
    # An empty table yields an empty list, the same as a missing one
    if not ref:
        return []
    if key is None:
        return ref
    return [row.get(key, []) for row in ref]
def get_matching_rows(self, match, label_field, value_field):
    """Helper function to find rows in any table matching a kind/label

    Labels and the match string are compared after being passed through
    standardize. The two columns are paired row by row, with the shorter
    one padded with None.

    Args:
        match (str): the name of the label to match
        label_field (str): field in a table containing the label
        value_field (str): field in a table containing the value

    Returns:
        List of values matching the match string
    """
    labels = self.simple_pull(label_field)
    values = self.simple_pull(value_field)
    target = standardize(match)
    matched = []
    for label, val in izip_longest(labels, values):
        if standardize(label) == target:
            matched.append(val)
    return matched
def get_location(self, current=False, keyword=None):
    """Returns the current or permanent location of a specimen

    Args:
        current (bool): if True, use the LocLocationRef_tab table. If
            False, use the single LocPermanentLocationRef reference.
        keyword (str): if given, the first location containing the
            keyword (case-insensitive) is returned

    Returns:
        The location as a string. The SummaryData of a location is used
        if present; otherwise the non-empty LocLevel1 to LocLevel8 values
        are joined and uppercased. If no location matches the keyword,
        or no keyword is given, the last location is returned.
    """
    if current:
        records = self('LocLocationRef_tab')
    else:
        records = [self('LocPermanentLocationRef')]
    # Build a separate list of strings so the location records stored
    # in this record are left untouched
    locs = []
    for loc in records:
        try:
            locs.append(loc['SummaryData'])
        except KeyError:
            levels = [loc('LocLevel{}'.format(x)) for x in range(1, 9)]
            locs.append(' - '.join([s for s in levels if s]).upper())
    # Filter multiple locations on keyword
    if keyword:
        for name in locs:
            if keyword.lower() in name.lower():
                return name
    return locs[-1]
def get_date(self, date_from, date_to=None, date_format='%Y-%m-%d'):
    """Returns dates and date ranges

    Args:
        date_from (mixed): path to date from field
        date_to (mixed): path to date to field
        date_format (str): formatting mask for date

    Returns:
        Date or date range as a string. Empty dates are skipped and a
        range whose two dates format identically collapses to one date.
    """
    pulled = [self.simple_pull(date_from)]
    if date_to is not None:
        pulled.append(self.simple_pull(date_to))
    date_range = []
    for date in pulled:
        if not date:
            continue
        formatted = parse(date).strftime(date_format)
        if formatted not in date_range:
            date_range.append(formatted)
    return ' to '.join(date_range)
def get_datetime(self, date_from, date_to=None, date_modifier=None,
                 time_from=None, time_to=None, time_modifier=None,
                 conjunction=' to ', format='%Y%m%dT%H%M%S'):
    """Placeholder that is not implemented and always returns None"""
    return None
def get_notes(self, kind):
    """Return the notes matching the given kind

    The first pair of type/text fields that are both present in the
    record is used. An empty list is returned if neither pair is present.
    """
    candidates = (
        ('NotNmnhType_tab', 'NotNmnhText0'),
        ('NteType_tab', 'NteText0'),
    )
    for type_field, text_field in candidates:
        if type_field not in self or text_field not in self:
            continue
        return self.get_matching_rows(kind, type_field, text_field)
    return []
def get_created_time(self, timezone_id='US/Eastern', mask=None):
    """Gets datetime of record creation

    Args:
        timezone_id (str): name of the timezone passed to
            _localize_datetime
        mask (str): formatting mask passed to _localize_datetime

    Returns:
        Result of _localize_datetime for AdmDateInserted/AdmTimeInserted
    """
    date = self('AdmDateInserted')
    time = self('AdmTimeInserted')
    return self._localize_datetime(date, time, timezone_id, mask)
def get_modified_time(self, timezone_id='US/Eastern', mask=None):
    """Gets datetime of last modification

    Args:
        timezone_id (str): name of the timezone passed to
            _localize_datetime
        mask (str): formatting mask passed to _localize_datetime

    Returns:
        Result of _localize_datetime for AdmDateModified/AdmTimeModified
    """
    date = self('AdmDateModified')
    time = self('AdmTimeModified')
    return self._localize_datetime(date, time, timezone_id, mask)
def get_current_weight(self, decimal_places=2):
    """Gets the current weight of the object

    Args:
        decimal_places (int): the number of decimal places to which to
            round the weight

    Returns:
        Unicode string with the weight and unit. Whole numbers are shown
        with thousands separators, decimals with decimal_places digits.
        An empty string is returned if the weight is empty or zero or if
        there is no unit.
    """
    assert isinstance(decimal_places, int)
    weight = self('MeaCurrentWeight')
    # Only trim zeros that follow a decimal point. rstrip takes a set of
    # characters, so stripping '0.' in one go would also eat the zeros
    # of whole numbers (100 would become 1).
    if '.' in weight:
        weight = weight.rstrip('0').rstrip('.')
    # A weight made up of nothing but zeros counts as no weight
    if not weight.strip('0'):
        weight = ''
    unit = self('MeaCurrentUnit')
    if weight and unit:
        if '.' in weight:
            mask = u'{weight:.' + str(decimal_places) + 'f} {unit}'
            return mask.format(weight=float(weight), unit=unit)
        return u'{weight:,} {unit}'.format(weight=int(weight), unit=unit)
    return u''
@staticmethod
def _localize_datetime(date, time, timezone_id, mask):
    """Combines a date and time into a datetime localized to a timezone

    Args:
        date (str): date formatted as YYYY-MM-DD
        time (str): time formatted as HH:MM:SS
        timezone_id (str): name of the timezone used to localize
        mask (str): formatting mask. If None, no formatting is done.

    Returns:
        The localized datetime, formatted as a string if mask is given

    Raises:
        ValueError: if either date or time is empty
    """
    if not date or not time:
        raise ValueError('Both date and time are required')
    stamp = datetime.strptime('{}T{}'.format(date, time),
                              '%Y-%m-%dT%H:%M:%S')
    localized = timezone(timezone_id).localize(stamp)
    return localized if mask is None else localized.strftime(mask)
def get_guid(self, kind='EZID', allow_multiple=False):
    """Gets value from the GUID table for a given key

    Args:
        kind (str): name of GUID
        allow_multiple (bool): if False, raises error if multiple
            values with same type are found

    Returns:
        First match from the GUID table for the key, or None if there is
        no match (if allow_multiple is False), or the full set of matches
        (if allow_multiple is True)

    Raises:
        Exception: if more than one match is found and allow_multiple
            is False
    """
    matches = self.get_matching_rows(kind,
                                     'AdmGUIDType_tab',
                                     'AdmGUIDValue_tab')
    if allow_multiple:
        return matches
    if len(matches) > 1:
        raise Exception('Multiple values found for {}'.format(kind))
    return matches[0] if matches else None
def get_url(self):
    """Gets the ark link to this record

    Returns:
        The n2t.net link built from the EZID, or None if the record has
        no EZID
    """
    ezid = self.get_guid('EZID')
    if not ezid:
        return None
    return 'http://n2t.net/{}'.format(ezid)
def wrap(self, module):
    """Wraps the XMuRecord with name of module

    Args:
        module (str): name of module to use as key

    Returns:
        Wrapped XMuRecord. In a typical use case, this means the paths
        used to retrieve data need to include the module name.
    """
    wrapped = {module: self}
    return self.clone(wrapped)
def unwrap(self):
    """Removes outermost level of XMuRecord

    This simplifies the paths needed to pull data from the record. The
    record will need to be wrapped again before writing to XML.

    Returns:
        Unwrapped XMuRecord, which is the value stored under the first
        key. In a typical use case, this means the paths used to retrieve
        data do not need to include the module name.
    """
    outer_key = list(self.keys())[0]
    return self[outer_key]
def expand(self, keep_empty=False):
    """Expands and verifies a flattened record

    Args:
        keep_empty (bool): passed through to _expand

    Returns:
        This record, after it has been expanded in place and verified
    """
    # Expansion has to happen first because verify checks the result
    self._expand(keep_empty=keep_empty)
    self.verify()
    return self
def _expand(self, keep_empty=False):
"""Expands a flattened record in place

Shorthand values (delimited lists of strings, bare irns) are replaced
with the nested structures built with self.clone.

Args:
keep_empty (bool): if False, row logic is stripped from keys. Always
treated as True if the record contains an irn.

Returns:
This record

Raises:
ValueError: if a key that looks like a table does not hold a list,
or a key that looks atomic holds a non-atomic value
"""
# NOTE(review): indentation was lost in this copy of the source, so the
# nesting of some branches below (for example the elif that deletes keys
# starting with an underscore) should be confirmed against the original.
# A record that includes an irn always has keep_empty forced to True
try:
self['irn']
except KeyError:
pass
else:
keep_empty = True
# Empty atoms should be excluded from appends; they show up as empty
# tags and will therefore erase any value currently in the table.
# Also strips append markers from records that do not include an irn.
for key in self.keys():
if key.endswith(')') and not self[key]:
del self[key]
elif not keep_empty:
# k is the key without its trailing parenthesized row logic. A
# non-empty value is moved to k before the original key is deleted.
k = key.rsplit('(', 1)[0]
if k != key:
if self[key]:
self[k] = self[key]
del self[key]
elif key.startswith('_'):
del self[key]
# Expand shorthand keys, including tables and simple references.
# Keys pointing to other XMuRecord objects are left alone.
for key in self.keys():
val = self[key]
k = key.rsplit('(', 1)[0] # key stripped of row logic
# NOTE(review): rstrip('(0+)') removes any trailing run of the
# characters (, 0, + and ), not that literal suffix
base = key.rstrip('_').split('_', 1)[0].rstrip('(0+)') # strip _tab
# Confirm that data type appears to be correct
if (key.rstrip('_').endswith(('0', 'tab', ')'))
and not isinstance(val, list)):
raise ValueError('{} must be a list'.format(key))
elif (val
and not key.startswith('_')
and not key.rstrip('_').endswith(('0', 'tab', ')', 'Ref'))
and not isinstance(val, (basestring, int, long, float))):
raise ValueError('{} must be atomic'.format(key))
# Handle nested tables
if k.endswith('_nesttab'):
# Test if the table has already been expanded by looking
# for a corresponding _nesttab_inner key
try:
expanded = any([k + '_inner' in v.keys() for v in val])
except (AttributeError, IndexError):
expanded = False
if not expanded and any(val):
# Nested reference tables store their values under irn
if 'Ref_' in k:
base = 'irn'
# A flat list of strings becomes a single outer row; otherwise
# each item of val becomes its own outer row
if isinstance(val[0], basestring):
self[key] = [self.clone({
k + '_inner': [self.clone({base: s}) for s in val]
})]
else:
self[key] = []
for s in val:
self[key].append(self.clone({
k + '_inner': [self.clone({base: s}) for s in s]
}))
elif not expanded:
self[key] = []
# A non-empty int or string in a reference is wrapped as an irn
elif (k.endswith('Ref')
and isinstance(val, (int, str, unicode))
and val):
self[key] = self.clone({'irn': val})
# Any other reference is expanded recursively, cloning it first if it
# has no _expand method
elif k.endswith('Ref'):
try:
self[key]._expand(keep_empty=True)
except AttributeError:
self[key] = self.clone(self[key])._expand(keep_empty=True)
# Reference table whose first value is an int or string: each non-empty
# value is wrapped as an irn, each empty one becomes an empty record
elif (k.endswith('Ref_tab')
and isinstance(val, list)
and any(val)
and isinstance(val[0], (int, str, unicode))):
self[key] = [self.clone({'irn': s}) if s
else self.clone() for s in val]
# Any other non-empty reference table has each row cloned and expanded
elif (k.endswith('Ref_tab')
and isinstance(val, list)
and any(val)):
self[key] = [self.clone(d)._expand(keep_empty=True) for d in val]
# Simple table whose first value is an int or string: each value is
# wrapped in a record keyed by base unless base is already found in it
elif (k.rstrip('_').endswith(self.tabends)
and isinstance(val, list)
and any(val)
and isinstance(val[0], (int, str, unicode))):
# The bare except only prints the offending key before re-raising
try:
self[key] = [self.clone({base: s}) if base not in s else s for s in self[key]]
except:
print key
raise
# A table with no truthy values is reduced to an empty list
elif (k.rstrip('_').endswith(self.tabends)
and isinstance(val, list)
and not any(val)):
self[key] = []
return self
def to_refine(self):
    """Maps EMu data to Google Refine

    FIXME: Needs to be cleaned up and tested

    Returns:
        List of Row tuples. Strings produce one Row with no row number,
        lists produce one Row per item numbered from 1, and XMuRecord
        values (attachments) are left out. Values of any other type are
        printed and left out.
    """
    irn = self('irn')
    rows = []
    for field in self:
        vals = self(field)
        if isinstance(vals, basestring):
            rows.append(Row(irn, field, None, vals))
        elif isinstance(vals, list):
            for i, val in enumerate(vals):
                rows.append(Row(irn, field, i + 1, val))
        elif isinstance(vals, XMuRecord):
            # Excludes attachments
            pass
        else:
            # Report the type of the value pulled for this field. The
            # loop variable from the list branch may be unset or stale.
            print(u'{} {} {}'.format(field, vals, type(vals)))
    return rows
def delete_rows(self, key, indexes=None, conditions=None):
    """Deletes any rows matching the given conditions from a table

    Args:
        key (str): a field in the table to delete from. Must have a
            table suffix.
        indexes (mixed): the index or list of indexes of the rows to
            delete. Takes precedence over conditions. The list is not
            modified and repeated indexes are deleted once.
        conditions (dict): maps fields to the value a row must have in
            that field. Only rows meeting every condition are deleted.
    """
    assert key.endswith(self.tabends)
    assert indexes is not None or conditions is not None
    if indexes is not None:
        if not isinstance(indexes, list):
            indexes = [indexes]
    else:
        # Collect the matching rows for every condition, including the
        # conditions nothing matches, so that one unmet condition
        # empties the intersection instead of being ignored
        matches = []
        for field, condition in conditions.items():
            matches.append(set(i for i, val in enumerate(self(field))
                               if val == condition))
        indexes = set.intersection(*matches) if matches else set()
    # Delete from the end so the remaining indexes stay valid
    for i in sorted(set(indexes), reverse=True):
        self.delete_row(key, i)
    # Add blank rows for any fields not represented
    for field in self.get_table(key):
        if not self(field):
            self[field] = []
def delete_row(self, key, i):
    """Deletes the row matching the given index

    The row is removed from every field returned by get_table for key.
    Fields that are missing from the record or too short are skipped.

    Args:
        key (str): a field in the table to delete from
        i (int): index of the row to delete
    """
    for field in self.get_table(key):
        try:
            column = self[field]
            del column[i]
        except (IndexError, KeyError):
            pass
def get_table(self, *path):
    """Returns the table to which the field specified in path belongs

    Args:
        *path: the path to a field, not including the module

    Returns:
        List of slash-delimited paths, one per entry under the columns
        key of the field definition, each with its first segment dropped.
        Empty if the definition has no columns key.
    """
    field_data = self.fields.get(self.module, *path)
    table = []
    for column in field_data.get('columns', []):
        table.append('/'.join(column[1:]))
    return table
def zip(self, *args):
    """Zips the set of lists, padding each list to the max length

    Args:
        *args: the paths to pull, one list per path

    Returns:
        Iterator of tuples, one per row, padded with None
    """
    columns = [self(arg) for arg in args]
    return izip_longest(*columns)
def standardize(val):
    """Standardize the format of a value

    Args:
        val (str): the value to standardize. None is treated as an
            empty string.

    Returns:
        The value in uppercase with all non-word characters removed
    """
    text = u'' if val is None else val
    return re.sub(r'[\W]', u'', text.upper()).upper()