Source code for minsci.xmu.tools.operations

"""Schedules operations using the EMu Operations module"""

from datetime import timedelta

from dateparser import parse

from ...xmu import XMu, MinSciRecord, write


class Operation(XMu):
    """Reads EMu records and collects data for Operations-related imports"""

    def __init__(self, *args, **kwargs):
        super(Operation, self).__init__(*args, **kwargs)
        self.module = 'eoperations'
        # Records queued for writing to an import file
        self.group = []
        # Existing notes keyed to irn (populated by read_notes)
        self.records = {}

    def read_notes(self, element):
        """Stores the note from an EMu record in the records lookup

        Trailing periods and spaces are removed from the note before it
        is stored.

        Args:
            element (etree.XML): an EMu record as XML
        """
        record = self.parse(element)
        note = record('NotNotes').rstrip('. ')
        self.records[record('irn')] = note

    def retired_merged(self, element, lookup):
        """Queues updates that retire the records merged by an operation

        For each irn processed by a completed operation, a retirement
        notice is appended to the existing note and the record is marked
        as retired. Irns that are missing from the lookup are skipped.

        Args:
            element (etree.XML): an EMu record as XML
            lookup (dict): contains existing notes keyed to irn

        Returns:
            True if the operation has not processed all of its records,
            otherwise None

        Raises:
            Exception: if the module named in the record differs from the
                module set on this object
        """
        record = self.parse(element)
        target_module = record('OpeModule')
        # NOTE(review): __init__ sets self.module to 'eoperations', so the
        # None branch only applies if the attribute is reset elsewhere.
        # Confirm that this comparison behaves as intended.
        if self.module is None:
            self.module = target_module
        elif target_module != self.module:
            raise Exception('Multiple modules in record set')
        # Skip operations that have not processed all of their records
        all_processed = record('NumberToProcess') == record('NumberProcessed')
        if not all_processed:
            return True
        for irn in record('IrnsToBeProcessedRef_tab', 'IrnsToBeProcessedRef'):
            notice = 'Retired: Duplicate of irn {}'.format(
                record('MerTargetRef')
            )
            try:
                existing = lookup[irn]
            except KeyError:
                # IRN doesn't exist in the lookup, so nothing to do
                continue
            # Combine the existing note with the retirement notice
            note = '\n\n'.join([existing, notice])
            update = self.container({
                'irn': irn,
                'NotNotes': note.lstrip('. ').lstrip(),
                'SecRecordStatus': 'Retired'
            })
            self.group.append(update)
# Module-level Operation instance created without a source file. merge() and
# delete() use its container() method to build operation records.
OPERATIONS = Operation(path=None, module='eoperations')
def write_operation(func, module, username, records, date,
                    outpath='operations.xml', interval=60):
    """Writes an EMu import file containing a set of operations

    One operation is created per item in records. Operations are staggered
    so that each one is scheduled interval seconds after the previous one.

    Args:
        func (callable): the function used to create the operation. It is
            called as func(module, username, date=..., delay=..., **rec)
            for each rec in records.
        module (str): the backend name for an EMu module
        username (str): the user whose account will be used to import
            and run the operation
        records (list): a list of dicts, each containing the additional
            keyword arguments passed to func for one operation
        date (mixed): the date on which to run the operation as either a
            datetime.datetime object or a parseable date string
        outpath (str): the path to which to write the import file
        interval (int): the number of seconds between consecutive
            operations

    Raises:
        ValueError: if date is a string that cannot be parsed
    """
    try:
        parsed = parse(date)
    except (AttributeError, TypeError):
        # Not a string (for example, already a datetime), so use the value
        # as given. Which of the two errors is raised for non-string input
        # depends on the dateparser version.
        pass
    else:
        # dateparser returns None instead of raising when it cannot
        # interpret a string. Fail here rather than pass None along.
        if parsed is None:
            raise ValueError('Could not parse date: {!r}'.format(date))
        date = parsed
    operations = [
        func(module, username, date=date, delay=interval * i, **rec)
        for i, rec in enumerate(records)
    ]
    write(outpath, operations, 'eoperations')
def merge(module, username, primary, duplicates, mask=None, date=None, delay=0):
    """Creates an operation to merge a set of duplicates

    Args:
        module (str): the backend name for an EMu module
        username (str): the user whose account will be used to import
            and run the operation
        primary: the primary record (i.e., the record to merge the
            duplicates into). It is called with 'irn' to get its irn and
            unpacked as keyword arguments when formatting mask.
        duplicates (list): the records to merge into the primary record.
            Each is called with 'irn' to get its irn.
        mask (str): a format string used to name the operation based on
            the primary record. If None, the operation is named 'Merge'.
        date (datetime.datetime): the base date/time for a set of
            operations
        delay (int): the number of seconds between operations

    Returns:
        The result of calling expand() on the container holding the merge
        operation
    """
    target_irn = primary('irn')
    if mask is None:
        name = 'Merge'
    else:
        name = mask.format(**primary)
    notify = [OPERATIONS.container({'AddEmuUserId' : username})]
    operation = OPERATIONS.container({
        'MerTargetRef_': target_irn,
        'OpeName': name,
        'OpeType': 'Merge',
        'OpeModule': module,
        'OpeExecutionTime': 'Yes',
        'OpeRunAsUser': username,
        'OpeNotifyCompleteRef_tab': notify
    })
    _set_operation_time(operation, date, delay)
    # The key is only created if there is at least one duplicate
    for duplicate in duplicates:
        irns = operation.setdefault('IrnsToBeProcessedRef_tab_', [])
        irns.append(duplicate('irn'))
    return operation.expand()
def delete(module, username, irns_to_delete, name='Delete', date=None, delay=0):
    """Creates an operation to delete a set of records

    Args:
        module (str): the backend name for an EMu module
        username (str): the user whose account will be used to import
            and run the operation
        irns_to_delete (list): the records to delete. Each is called with
            'irn' to get its irn.
        name (str): the name of the operation
        date (datetime.datetime): the base date/time for a set of
            operations
        delay (int): the number of seconds between operations

    Returns:
        The result of calling expand() on the container holding the delete
        operation
    """
    notify = [OPERATIONS.container({'AddEmuUserId' : username})]
    operation = OPERATIONS.container({
        'OpeName': name,
        'OpeType': 'Delete',
        'OpeModule': module,
        'OpeExecutionTime': 'Yes',
        'OpeRunAsUser': username,
        'OpeNotifyCompleteRef_tab': notify
    })
    _set_operation_time(operation, date, delay)
    # The key is only created if there is at least one record to delete
    for record in irns_to_delete:
        irns = operation.setdefault('IrnsToBeProcessedRef_tab', [])
        irns.append(record('irn'))
    return operation.expand()
def retire_merged(merged_path, record_path, output='retire.xml'):
    """Writes an EMu import file that retires merged records

    Nothing is written if no records qualify for retirement.

    Args:
        merged_path (str): path to the file of records read with
            Operation.retired_merged (presumably an export of merge
            operations; confirm against the caller)
        record_path (str): path to the file of records from which the
            existing notes are read
        output (str): path to EMu import file containing the retired
            records
    """
    # Collect the existing notes from the records slated for retirement
    noted = Operation(record_path, container=MinSciRecord)
    noted.fast_iter(noted.read_notes, report=10000)
    # Build the updates using the notes collected above
    operations = Operation(merged_path, container=MinSciRecord)
    operations.fast_iter(operations.retired_merged,
                         report=1000,
                         lookup=noted.records)
    if not operations.group:
        return
    write(output, operations.group, operations.module)
def _set_operation_time(operation, date=None, delay=None): """Calculates start time for the current operation Used to stagger operations throughout the delay. The passed operation is modified in place. Args: operation (xmu.DeepDict): data about the current operation date (datetime.datetime): the base date/time for a set of operations delay (int): the number of seconds between operations Returns: The operation data modified to include a starttime """ if delay is not None: #date = date.replace(hour=6, minute=31, second=0, microsecond=0) date += timedelta(seconds=delay) operation['OpeExecutionTime'] = 'No' operation['OpeDateToRun'] = date.strftime('%Y-%m-%d') operation['OpeTimeToRun'] = date.strftime('%H:%M:%S') elif date is not None: operation['OpeExecutionTime'] = 'No' operation['OpeDateToRun'] = date.strftime('%Y-%m-%d') operation['OpeTimeToRun'] = date.strftime('00:00:00') return operation