# Source code for minsci.xmu.xmu

"""Reads and writes XML formatted for Axiell EMu"""

import glob
import hashlib
import json
import os
import re
from collections import namedtuple
from datetime import datetime

from lxml import etree

from .constants import FIELDS
from .containers import XMuRecord
from .fields import is_table, is_reference
from ..exceptions import RowMismatch
from ..helpers import cprint


# Pairs the pipe-delimited field names of a grid (fields) with the row
# operator requested for it (operator). Instances are built in _emuize.
Grid = namedtuple('Grid', ['fields', 'operator'])


class XMu(object):
    """Read and search XML export files from EMu

    Attributes:
        path (str): path passed to the constructor
        fields (XMuFields): based on fields kwarg
        module (str): name of base module
        keep (list): names of attributes cached by save/load
        verbose (bool): triggers verbose output
        from_json (bool): True once load() has restored cached attributes
        xpaths (list): valid paths read from the source file(s)
        newest (float): most recent modification time among the source
            files, or an empty list if no file was read

    Args:
        path (str): path to EMu XML report or directory containing multiple
            reports. If multiple reports are found, they are handled from
            newest to oldest. May be None to create a reader with no file.
        fields (XMuFields): contains data about field. Defaults to FIELDS.
        container (DeepDict): class to use to store EMu data. Defaults to
            XMuRecord.
        module (str): name of base module. Overridden by the module named
            in the source file(s) when any paths are found.
    """

    # Text type of the running interpreter (unicode on Python 2)
    _TEXT_TYPE = type(u'')

    def __init__(self, path, fields=None, container=None, module=None):
        # Class-wide switches
        self.path = path
        self.keep = []
        self.verbose = False
        self.module = module
        self.from_json = False
        # Fall back to the default fields object if none provided
        if fields is None:
            fields = FIELDS
        self.fields = fields
        # DeepDict or subclass to use as container for EMu data
        if container is None:
            container = XMuRecord
        self._attributes = ['fields', 'module']
        self._container = container
        self.xpaths = []
        self.newest = []
        self._files = []
        self._paths_found = {}
        # Collect the paths used in the source file(s)
        if path is None:
            xpaths = []
        elif os.path.isdir(path):
            # Newest files first
            self._files = glob.glob(os.path.join(path, '*.xml'))
            self._files.sort(key=os.path.getmtime, reverse=True)
            xpaths = []
            for fp in self._files:
                xpaths.extend(self.fields.read_fields(fp))
            xpaths = list(set(xpaths))
        elif path.endswith('.xml'):
            xpaths = self.fields.read_fields(path)
            self._files = [path]
        else:
            raise Exception('Invalid path: {}'.format(path))
        # Check that all xpaths are valid according to schema. The invalid
        # set holds the xpath strings themselves so the filter below can
        # actually match them.
        invalid = set()
        for xpath in xpaths:
            try:
                self.fields(*xpath.split('/'))
            except NameError:
                cprint('Removed invalid path: {}'.format(xpath))
                invalid.add(xpath)
        self.xpaths = [xpath for xpath in xpaths if xpath not in invalid]
        # Record basic metadata about the import file. Each lookup is
        # guarded so an empty or fully invalid export does not raise.
        if xpaths or self.module is None:
            if self.xpaths:
                self.module = self.xpaths[0].split('/')[0]
            if self._files:
                self.newest = max(os.path.getmtime(fp) for fp in self._files)
        self._paths_found = {}

    def parse(self, element):
        """Converts XML record to XMu dictionary

        Args:
            element (lxml.etree): an EMu XML record

        Returns:
            The unwrapped, finalized record read from element
        """
        rec = self.read(element).unwrap()
        rec.finalize()
        return rec

    def container(self, *args):
        """Wraps dict in custom container with attributes needed for export

        Args:
            *args: passed through to the container class

        Returns:
            Finalized container carrying the attributes named in
            self._attributes (None for any attribute this reader lacks)
        """
        container = self._container(*args)
        for attr in self._attributes:
            setattr(container, attr, getattr(self, attr, None))
        container.finalize()
        return container

    def set_carryover(self, *args):
        """Update the list of carryover attributes

        Args:
            *args (str): names of attributes to copy onto each container
        """
        self._attributes = args

    def iterate(self, element):
        """Placeholder for iteration method

        Raises:
            Exception: always. Subclasses are expected to override this.
        """
        raise Exception('No iterate method is defined for this subclass')

    def finalize(self):
        """Placeholder for finalize method run at end of iteration"""
        pass

    def fast_iter(self, func=None, report=0, skip=0, limit=0,
                  callback=None, callback_kwargs=None, **kwargs):
        """Use callback to iterate through an EMu export file

        Args:
            func (function): name of iteration function. Defaults to
                self.iterate. Iteration stops if it returns False. A
                record counts as successful unless it returns True.
            report (int): number of records at which to report progress.
                If 0, no progress report is made.
            skip (int): number of records to skip before processing
            limit (int): number of records to process before stopping.
                If 0, the whole file is processed.
            callback (function): name of function to run upon completion
            callback_kwargs (dict): keyword arguments for callback
            **kwargs: passed through to func

        Returns:
            True. NOTE(review): the return value does not currently
            reflect whether iteration was cut short.
        """
        if func is None:
            func = self.iterate
        if report:
            starttime = datetime.now()
        keep_going = True
        n_total = 0
        n_success = 0
        # n_total includes skipped records, so offset the limit by skip.
        # A limit of 0 means "no limit" and must stay 0.
        if limit:
            limit += skip
        for fp in self._files:
            if report:
                cprint('Reading {}...'.format(fp))
            context = etree.iterparse(fp, events=['end'], tag='tuple')
            for _, element in context:
                # Process children of module table only
                parent = element.getparent().get('name')
                if parent is None or not parent.startswith('e'):
                    continue
                n_total += 1
                skipped = n_total <= skip
                if not skipped:
                    result = func(element, **kwargs)
                    if result is False:
                        keep_going = False
                        break
                    elif result is not True:
                        n_success += 1
                # Release the record and the siblings already read. This is
                # done for skipped records too so memory stays flat.
                element.clear()
                while element.getprevious() is not None:
                    del element.getparent()[0]
                if skipped:
                    continue
                if report and not n_total % report:
                    now = datetime.now()
                    elapsed = now - starttime
                    starttime = now
                    print('{:,} records processed! ({:,}'
                          ' successful, t={}s)'.format(n_total,
                                                       n_success,
                                                       elapsed))
                if limit and n_total >= limit:
                    keep_going = False
                    break
            del context
            if not keep_going:
                break
        print('{:,} records processed!'
              ' ({:,} successful)'.format(n_total, n_success))
        self.finalize()
        if callback is not None:
            if callback_kwargs is None:
                callback_kwargs = {}
            callback(**callback_kwargs)
        return True

    def autoiterate(self, keep=None, **kwargs):
        """Automatically iterates over the source file and caches the result

        If a list of attributes to keep is available, tries to restore them
        with load() and only reads the XML (saving afterwards by default)
        when that fails. Otherwise reads the XML without caching.

        Args:
            keep (list): names of attributes to cache. Defaults to
                self.keep if that is populated.
            **kwargs: passed through to fast_iter
        """
        if keep is None and self.keep:
            keep = self.keep
        if keep is not None:
            self.keep = keep
            try:
                self.load(**(kwargs.get('callback_kwargs') or {}))
            except (IOError, OSError, ValueError):
                callback = kwargs.pop('callback', self.save)
                self.fast_iter(callback=callback, **kwargs)
        else:
            self.fast_iter(**kwargs)

    def save(self, fp=None):
        """Save attributes listed in the self.keep as json

        Args:
            fp (str): path to the JSON file. Defaults to self.path with
                the extension replaced by .json.
        """
        if fp is None:
            fp = os.path.splitext(self.path)[0] + '.json'
        print('Saving data to {}...'.format(fp))
        data = {key: getattr(self, key) for key in self.keep}
        # Text mode works on both Python 2 and 3; the context manager
        # guarantees the file is flushed and closed.
        with open(fp, 'w') as f:
            json.dump(data, f, cls=ABCEncoder)

    def load(self, fp=None):
        """Load data from json file created by self.save

        Args:
            fp (str): path to the JSON file. Defaults to self.path with
                the extension replaced by .json.

        Raises:
            IOError: if the JSON file is not newer than self.path
            OSError: if either file cannot be read
        """
        if fp is None:
            fp = os.path.splitext(self.path)[0] + '.json'
        # Always recreate the JSON if XML is newer
        if os.path.getmtime(fp) <= os.path.getmtime(self.path):
            raise IOError('{} is older than {}'.format(fp, self.path))
        print('Reading data from {}...'.format(fp))
        with open(fp, 'rb') as f:
            data = json.load(f)
        for attr, val in data.items():
            setattr(self, attr, val)
        self.from_json = True

    def set_keep(self, fields):
        """Sets the attributes to load/save when using JSON functions

        Args:
            fields (list): names of attributes on this object
        """
        self.keep = fields

    @staticmethod
    def _next_name(child, keys, counter):
        """Return the child's name, or its row index for an unnamed tuple"""
        name = child.get('name')
        # Unnamed tuples represent rows inside a table; number them per
        # parent path starting from zero
        if name is None:
            path = tuple(keys)
            counter[path] = counter.get(path, -1) + 1
            name = counter[path]
        return name

    @staticmethod
    def _squeeze(val):
        """Collapse runs of spaces to a single space and trim the ends"""
        while '  ' in val:
            val = val.replace('  ', ' ')
        return val.strip()

    def read1(self, root, keys=None, result=None, counter=None):
        """Read an EMu XML record to a path-keyed dictionary

        This is much faster than iterating through the XMu.xpaths list.

        Args:
            root (lxml.etree): an EMu XML record
            keys (list): parents of the current key
            result (XMuRecord): path-keyed representation of root updated
                as the record is read
            counter (dict): tracks row counts by path

        Returns:
            Path-keyed dictionary representing root
        """
        if keys is None:
            keys = [self.module]
        if result is None:
            result = self.container()
        if counter is None:
            counter = {}
        for child in root:
            keys.append(self._next_name(child, keys, counter))
            if not len(child):
                # lxml returns byte strings for ascii text in Python 2, so
                # convert to the text type here
                if child.text is not None:
                    val = self._TEXT_TYPE(child.text)
                else:
                    val = u''
                if child.tag == 'table':
                    # Handle empty tables. These happen with nested tables
                    # and possibly elsewhere.
                    result.push([], *keys)
                # NOTE(review): the literal must match the exact whitespace
                # found inside an empty tuple - confirm against an export
                elif val == '\n ' and isinstance(keys[-1], int):
                    # Handle gaps in reference tables
                    keys.append(None)
                    result.push(None, *keys)
                    keys.pop()
                else:
                    result.push(self._squeeze(val), *keys)
            else:
                # Recurse with the same path-keyed reader so nested values
                # are pushed under their full path
                result = self.read1(child, keys, result, counter)
            keys.pop()
        return result

    def read(self, root, keys=None, result=None, counter=None):
        """Read an EMu XML record to a nested dictionary

        This is much faster than iterating through the XMu.xpaths list.

        Args:
            root (lxml.etree): an EMu XML record
            keys (list): parents of the current key
            result (XMuRecord): representation of root updated as the
                record is read. Tables are stored as lists of containers.
            counter (dict): tracks row counts by path

        Returns:
            Container keyed to the module name when called without a
            result; otherwise the container or list passed as result
        """
        if keys is None:
            keys = [self.module]
        if counter is None:
            counter = {}
        if result is None:
            # Top-level call: wrap the record in a container keyed to the
            # module, then fill that inner container
            result = self.container()
            result[self.module] = self.container()
            self.read(root, keys, result[self.module], counter)
            return result
        for child in root:
            name = self._next_name(child, keys, counter)
            keys.append(name)
            if not len(child):
                # lxml returns byte strings for ascii text in Python 2, so
                # convert to the text type here
                if child.text is not None:
                    val = self._TEXT_TYPE(child.text)
                else:
                    val = u''
                if child.tag == 'table':
                    # Handle empty tables. These happen with nested tables
                    # and possibly elsewhere.
                    result[name] = []
                # NOTE(review): the literal must match the exact whitespace
                # found inside an empty tuple - confirm against an export
                elif val == '\n ' and isinstance(name, int):
                    # Handle gaps in reference tables
                    try:
                        result[name] = None
                    except IndexError:
                        # Catches error if tuple is completely empty
                        result.append(self.container())
                else:
                    result[name] = self._squeeze(val)
            else:
                if isinstance(name, int):
                    # Row in a table: add a container for it to the list
                    try:
                        result.append(self.container())
                    except IndexError:
                        result = [self.container()]
                    self.read(child, keys, result[-1])
                elif name.endswith(('0', '_tab', '_inner', '_nesttab')):
                    # Names with these suffixes are read as tables
                    result[name] = []
                    self.read(child, keys, result[name])
                else:
                    result[name] = self.container()
                    self.read(child, keys, result[name])
            keys.pop()
        return result

    def find(self, rec, *args):
        """Return value(s) for a given path in the EMu XML export

        Args:
            rec (lxml.etree.ElementTree): XML formatted for EMu
            *args (str): strings comprising the path to a field

        Returns:
            String (for atomic field) or list (for table) containing
            value(s) along the path given by *args. Blank rows that
            follow the last populated row in a table are not populated!
        """
        xpath = self.fields('.'.join(args), self.module)['xpath']
        results = []
        for child in rec.xpath(xpath):
            if child.text:
                results.append(self._TEXT_TYPE(child.text))
            else:
                results.append(u'')
        # Track how many values each path returned
        self._paths_found.setdefault(xpath, []).append(len(results))
        # Atomic fields return a single string, empty if nothing matched
        if 'table' not in xpath:
            results = results[0] if results else u''
        return results

    def harmonize(self, new_val, old_val, path, action='fill'):
        """Harmonize new values with existing values on the same path

        Args:
            new_val (str): new or replacement value
            old_val (str): existing value
            path (str): path to field in XMuSchema
            action: can be one of 'fill' (add new value if blank),
                'append' (append new value using either a new row or
                delimiter), or 'replace'. The default is fill.

        Returns:
            Tuple containing (revised value, update boolean), or None if
            action is 'fill' and the existing value is populated

        Raises:
            Exception: if action is not one of the three listed above
        """
        action = action.lower()
        if action not in ('append', 'fill', 'replace'):
            raise Exception('Invalid action: {}'.format(action))
        if new_val == old_val:
            return None, True
        if action == 'fill' and not old_val:
            return new_val, False
        if action == 'append':
            if self.fields(path)['table']:
                return new_val, True
            # Nothing to delimit against if the existing value is blank
            if not old_val:
                return new_val, False
            return old_val.rstrip('; ') + ';' + new_val, False
        if action == 'replace':
            return new_val, False
        # NOTE(review): a fill against a populated value yields None rather
        # than a tuple - confirm what callers expect here
        return None
class ABCEncoder(json.JSONEncoder):
    """JSON encoder that serializes wrapper objects via their obj attribute"""

    def default(self, abc):
        """Return abc.obj if abc has one, else defer to the base encoder"""
        try:
            payload = abc.obj
        except AttributeError:
            return super(ABCEncoder, self).default(abc)
        return payload
def check_table(rec, *args):
    """Check that the columns in a table are all the same length

    Args:
        rec: record providing smart_pull() and pprint()
        *args: names of the columns to pull from rec and compare

    Returns:
        Whatever check_columns returns for the pulled columns
    """
    try:
        columns = [rec.smart_pull(column) for column in args]
        return check_columns(*columns)
    except TypeError:
        # Show the offending record before passing the error along
        rec.pprint()
        raise
def check_columns(*args):
    """Check if columns in the same table are the same length

    Columns that are None or contain no truthy values are ignored.

    Args:
        *args: Lists of value for each column

    Raises:
        RowMismatch: if the populated columns differ in length
    """
    lengths = set()
    for column in args:
        if column is not None and any(column):
            lengths.add(len(column))
    if len(lengths) > 1:
        raise RowMismatch(args)
def _emuize(rec, root=None, path=None, handlers=None,
            module=None, fields=None, group=None):
    """Formats record in XML suitable for EMu

    Called once per record by emuize() and then recursively for every key
    or row in the record.

    Args:
        rec (minsci.xmu.XMuRecord): contains data to be written
        root (lxml.etree.ElementTree): XML document updated as the record
            is written. Created if None.
        path (str): key (or row index) within rec to write. Defaults to
            the name of the outermost table.
        handlers (dict): passed through to recursive calls
        module (str): name of module. Read from rec if root is None.
        fields (XMuFields): defaults to rec.fields
        group (Grid): grid fields and row operator inherited from the
            parent table, if any

    Return:
        EMu-formatted XML

    Raises:
        ValueError: on an unassigned table column, a non-string path for
            an atom, or an illegal row operator
    """
    if root is None:
        module = rec.keys()[0]
        root = etree.Element('table')
        root.set('name', module)
        root.addprevious(etree.Comment('Data'))
    if path is None:
        path = root.getroottree().getroot().get('name')
        root = etree.SubElement(root, 'tuple')
    if handlers is None:
        handlers = {}
    if fields is None:
        fields = rec.fields
    rec = rec[path]
    if rec is None:
        return root
    # Check for append, prepend, and replacement operators, which are
    # written in parentheses at the end of the path. If found, determines
    # the necessary attributes and passes them to any immediate children.
    if hasattr(path, 'endswith') and path.endswith(')'):
        path, operator = path.rstrip(')').rsplit('(', 1)
        try:
            table = fields.map_tables[(module, path)]
        except KeyError:
            # Check for tables that aren't being handled
            if path.endswith(('tab', '0')):
                raise ValueError('Unassigned column: {}.{}'.format(module,
                                                                   path))
        except AttributeError:
            pass
        else:
            grid_flds = '|'.join(['|'.join(field) for field in sorted(table)])
            group = Grid(grid_flds, operator)
    if isinstance(rec, (int, long, float, basestring)):
        atom = etree.SubElement(root, 'atom')
        # Set path to parent if is a row in a table
        if isinstance(path, int):
            path = root.getparent().get('name').rsplit('_', 1)[0].rstrip('0')
        # Test multimedia. The file is opened only to confirm it can be
        # read, so close it again immediately.
        if rec and path in ('Multimedia', 'Supplementary'):
            with open(rec, 'rb'):
                pass
        # Handle empties in the supplementary table. Empties are used as
        # placekeepers but should not themselves be loaded into EMu.
        operator = root.get('row')
        if path == 'Supplementary' and not rec and operator is not None:
            parent = root.getparent()
            parent.remove(root)
            root = parent
        try:
            atom.set('name', path.rstrip('_'))
        except TypeError:
            parent = etree.tostring(root.getparent())
            raise ValueError('Path must be string. Got {} instead. Parent'
                             ' is {}'.format(path, parent))
        try:
            atom.text = str(rec)
        except UnicodeEncodeError:
            atom.text = rec
    else:
        # Containers are walked by key, lists by row index
        try:
            paths = rec.keys()
        except AttributeError:
            paths = list(xrange(len(rec)))
        if isinstance(path, (int, long)):
            root = etree.SubElement(root, 'tuple')
            # Add append attributes if required
            if group is not None:
                hashed = hashlib.md5(group.fields
                                     + '|{}'.format(path)).hexdigest()
                operator = group.operator.format(path + 1)
                if not re.match(r'^(\+|-|\d+=)$', operator):
                    raise ValueError('Illegal operator: {}'.format(operator))
                root.set('row', operator)
                if group.operator == '+':
                    root.set('group', hashed)
                # Only the row itself carries the operator
                group = None
        elif is_table(path.rstrip('_')):
            root = etree.SubElement(root, 'table')
            root.set('name', path.rstrip('_'))
        elif is_reference(path):
            root = etree.SubElement(root, 'tuple')
            root.set('name', path)
        for path in _sort(paths):
            _emuize(rec, root, path, handlers, module, fields, group)
        # Get parent returns None when you hit the outermost container
        parent = root.getparent()
        if parent is not None:
            root = parent
    return root


def _sort(paths):
    """Forces fields in an export to print in a certain order

    Paths are sorted, then the members of each rule group whose key field
    is present are moved to the end of the list in the order given by the
    rule.

    Args:
        paths (list): list of paths in the current record set

    Returns:
        Sorted list of paths. The list passed in is not modified.
    """
    paths = sorted(paths)
    # (key field, required order) pairs, applied in the order listed
    rules = (
        ('NamOrganisation',
         ['NamPartyType', 'NamInstitution', 'NamOrganisation']),
        ('OpeDateToRun',
         ['OpeExecutionTime', 'OpeDateToRun', 'OpeTimeToRun']),
        ('ClaScientificName',
         ['ClaScientificNameAuto', 'ClaScientificName'])
    )
    for key, group in rules:
        if key in paths:
            keep = [path for path in group if path in paths]
            for path in keep:
                paths.remove(path)
            paths.extend(keep)
    return paths


def _check(rec, module=None):
    """Validate the data in a record, including tables

    Args:
        rec (xmu.DeepDict): object data
        module (str): the backend name of an EMu module. Defaults to
            rec.module.

    Returns:
        The original record, with rec.fields set to FIELDS if it was
        missing or None
    """
    if module is None:
        module = rec.module
    if getattr(rec, 'fields', None) is None:
        rec.fields = FIELDS
    # Collect the columns of every table represented in the record
    tables = []
    for key in rec.keys():
        try:
            table = rec.fields.map_tables[(module, key.strip('+'))]
        except KeyError:
            # Check for tables that aren't being handled
            if key.endswith('tab'):
                print('Unassigned column: {}'.format(key))
        else:
            # Carry the trailing plus over to the other columns so they
            # are pulled under the same names as key
            fields = [field[1] for field in table]
            if key.endswith('+'):
                fields = [field + '+' for field in fields]
            tables.append(fields)
    # Verify that all columns in tables are the correct length
    for table in tables:
        check_table(rec, *table)
    return rec
def emuize(records, module=None):
    """Checks record set and formats as EMu XML

    Args:
        records (list): list of records
        module (str): name of module. Defaults to the module of the
            first record.

    Returns:
        Root of the EMu-formatted XML, or None if no records were written
    """
    if module is None:
        module = records[0].module
    # Validate the whole set before writing anything
    validated = []
    for record in records:
        validated.append(_check(record, module))
    root = None
    for record in validated:
        try:
            root = _emuize(record.wrap(module), root, module=module)
        except:
            # Show the offending record, then let the error propagate
            record.pprint()
            raise
    return root
def write(fp, records, module=None):
    """Convenience function for formatting and writing EMu XML

    Prints a notice instead of writing if records is empty.

    Args:
        fp (str): path to file
        records (list): list of XMuRecord() objects
        module (str): name of module
    """
    if not records:
        print('xmu.write: No records found')
        return
    _writer(fp, emuize(records, module))
def _writer(fp, root):
    """Write EMu-formatted XML to file

    Each child of root is preceded by a comment giving its row number,
    counting from one.

    Args:
        fp (str): path to file
        root (lxml.etree.ElementTree): EMu-formatted XML
    """
    # Iterate over a snapshot because the comments are added to root itself
    for row_num, row in enumerate(list(root), 1):
        row.addprevious(etree.Comment('Row {}'.format(row_num)))
    tree = root.getroottree()
    tree.write(fp, pretty_print=True, xml_declaration=True, encoding='utf-8')