Source code for minsci.xmu.tools.mapper

"""Alias handling for processing EMu data that doesn't use full paths"""

import os

from ...xmu import XMu, MinSciRecord, is_table, is_reference


try:
    _STRING_TYPES = (basestring,)  # Python 2: matches both str and unicode
except NameError:
    _STRING_TYPES = (str,)  # Python 3: basestring no longer exists


class FieldMapper(object):
    """Map field aliases to full paths in EMu

    Attributes:
        aliases (dict): maps aliases to full paths (tuples) in EMu
        module (str): the name of the EMu module being matched against
        paths (dict): maps full paths (tuples) back to aliases
        references (dict): maps reference columns to the fields that use
            them (populated by get_references)
        schema (dict): the EMu schema
        tables (dict): maps table columns to the fields that use them
            (populated by get_tables)

    Args:
        module (str): the name of the module being matched against
    """

    def __init__(self, module):
        xmudata = XMu(None, module=module, container=MinSciRecord)
        self.schema = xmudata.fields.schema
        self.module = module
        self.aliases = {}
        self.paths = {}
        self.read_aliases(self.module)
        self.tables = {}
        self.references = {}
        self._safe = False  # True if fields have been verified

    def __call__(self, field, schema_path=False):
        """Convenience function calling the get_path function"""
        return self.get_path(field, schema_path)

    def read_aliases(self, module):
        """Read aliases for the given module from file

        The file is read from mapper/<module>.txt, relative to the current
        working directory. Each line has the form "alias: seg1/seg2/...".
        Lines starting with "#", lines without a colon and lines with an
        empty path are skipped. Every alias read is registered through
        set_alias.

        Args:
            module (str): the backend name of an EMu module

        Returns:
            A dict mapping each alias read from the file to its path (tuple)
        """
        aliases = {}
        fp = os.path.join('mapper', '{}.txt'.format(module))
        with open(fp, 'rb') as f:
            for line in f:
                # Binary mode yields bytes on Python 3; str methods with
                # str arguments need text. On Python 2 lines are already str.
                if not isinstance(line, str):
                    line = line.decode('utf-8')
                if line.startswith('#'):
                    continue
                try:
                    alias, path = [s.strip() for s in line.split(':', 1)]
                except ValueError:
                    # No colon on this line, so it is not an alias definition
                    continue
                if path:
                    self.set_alias(alias, path.split('/'))
                    aliases[alias] = self.aliases[alias]
        return aliases

    def set_alias(self, alias, path):
        """Sets the path for a given alias in class-wide lookups

        Args:
            alias (str): the alias to assign to the given path
            path (str or iterable): the full path
        """
        # Standardize the form of the path
        if isinstance(path, _STRING_TYPES):
            path = [path]
        path = tuple(path)
        # Verify the given path
        candidates = [path]
        try:
            schema_path = tuple(self.get_data(*path)['path'].split('/'))
        except KeyError:
            # The path check fails for reference tables. This does not cause
            # any obvious downstream issues.
            pass
        else:
            candidates.append(schema_path)
            # If alias points to a table, add that path as well
            if any(is_table(field) for field in schema_path):
                candidates.append(schema_path[:-1])
        # Set aliases. The first alias registered for a path wins in the
        # reverse lookup; later aliases for the same path do not replace it.
        self.aliases[alias] = path
        for candidate in candidates:
            self.paths.setdefault(candidate, alias)

    def get_path(self, alias, schema_path=False):
        """Returns the path for a given alias

        Args:
            alias (str): the alias for a given path
            schema_path (bool): if true, uses the format needed for schema

        Returns:
            If schema_path is True, returns a tuple with the path formatted
            for schema. If not, returns a list containing the path.
        """
        if schema_path:
            return tuple(self.get_data(alias)['path'].split('/'))
        try:
            return list(self.aliases[alias])
        except KeyError:
            return list(self._guess_path(alias))

    def get_alias(self, path):
        """Returns the alias for a given path

        Args:
            path (str or iterable): the full path to an EMu field

        Returns:
            Alias for a given path, if it exists

        Raises:
            KeyError: if no alias is registered for the path
        """
        if isinstance(path, _STRING_TYPES):
            path = [path]
        return self.paths[tuple(path)]

    def _guess_path(self, alias):
        """Attempts to guess the path for an unrecognized field

        Tries the alias with each of a fixed list of suffixes against the
        schema for the current module. The first match is registered as an
        alias and returned.

        Raises:
            Exception: if no candidate field exists in the schema
        """
        suffixes = ['', '0', '_tab', '_nesttab', 'Ref', 'Ref_tab']
        for field in [alias + suffix for suffix in suffixes]:
            try:
                self.schema[self.module][field]
            except KeyError:
                continue
            path = [field]
            if is_table(field) and not is_reference(field):
                path.append(field.split('_', 1)[0].rstrip('0'))
            self.set_alias(alias, path)
            return path
        raise Exception('Could not resolve alias "{}" to a field in'
                        ' module "{}"'.format(alias, self.module))

    def get_data(self, *args):
        """Returns data for a given path or alias

        Args:
            *args: either the segments of a full path or a single alias

        Returns:
            The schema entry found by walking the path
        """
        val = self.schema[self.module]
        # Check if first key is an alias
        if len(args) == 1 and val.get(args[0]) is None:
            args = self.get_path(args[0])
        for arg in args:
            val = val[arg]
            # Test for references (changes in module)
            try:
                val = self.schema[val['schema']['RefTable']]
            except (KeyError, TypeError):
                pass
        return val

    def _group_fields(self, fields, matches):
        """Groups fields by the first path segment accepted by matches"""
        groups = {}
        for field in fields:
            path = self(field)
            if matches(*path):
                col = [seg for seg in path if matches(seg)][0]
                groups.setdefault(col, []).append(field)
        return groups

    def get_tables(self, fields):
        """Map columns in tables

        Args:
            fields (list): list of fields and aliases

        Returns:
            Dict mapping each table column to the list of fields using it.
            The same dict is stored as self.tables.
        """
        self.tables = self._group_fields(fields, is_table)
        return self.tables

    def get_references(self, fields):
        """Map columns in references

        Args:
            fields (list): list of fields and aliases

        Returns:
            Dict mapping each reference column to the list of fields using
            it. The same dict is stored as self.references.
        """
        self.references = self._group_fields(fields, is_reference)
        return self.references

    def expand(self, rec):
        """Expand fields in record based on known aliases

        This should be used instead of the DeepDict.expand() function for
        records constructed from spreadsheets using the Mineral Sciences
        alias set. The record is modified in place: empty fields are
        removed, and fields whose path has more than one segment are moved
        into nested containers.

        Args:
            rec (dict): record data

        Returns:
            The same rec object, expanded

        Raises:
            Exception: if a list with more than one value is given for an
                atomic reference
        """
        # Iterate over a snapshot of the keys because entries are deleted
        # (and new top-level keys added) while looping.
        for field in list(rec):
            if not rec[field]:
                del rec[field]
                continue
            path = self(field)
            if isinstance(path, list) and len(path) > 1:
                d = rec
                last = path.pop()
                for segment in path:
                    container = [] if is_table(segment) else {}
                    try:
                        d = d.setdefault(segment, container)
                    except AttributeError:
                        # d is a list (table); descend into its first row,
                        # creating the row if needed
                        try:
                            d = d[0].setdefault(segment, container)
                        except IndexError:
                            d.append({})
                            d = d[0].setdefault(segment, container)
                if isinstance(rec[field], list):
                    # This conditional tries to handle atomic references
                    # inside a reference table. In this case, the list
                    # index applies to the reference table, and the
                    # internal reference is atomic.
                    # NOTE(review): '_nesttab' in path tests for a segment
                    # equal to '_nesttab', not one containing it -- confirm
                    # that is intended.
                    if ((is_reference(segment) or '_nesttab' in path)
                            and not is_table(segment)):
                        if len(rec[field]) > 1:
                            raise Exception('Reference length error')
                        d[last] = rec[field][0]
                    else:
                        for i, val in enumerate(rec[field]):
                            try:
                                d[i][last] = val
                            except KeyError:
                                # NOTE(review): this rebinds the local name
                                # only; the new list is not attached to
                                # rec -- confirm that is intended.
                                d = [{last: val}]
                            except IndexError:
                                d.append({last: val})
                else:
                    d[last] = rec[field]
                del rec[field]
        return rec