Source code for minsci.xmu.xmungo

"""Reads data from NMNH MongoDB collections database"""

import time

import getpass
import json
from datetime import datetime

import pymongo
from pymongo.operations import ReplaceOne, DeleteOne

from .xmu import XMu, XMuRecord
from ..helpers import cprint


[docs]class MongoBot(object): """Contains methods to connect and interact with NMNH MongoDB""" def __init__(self, username, password, instance=None, container=None): self.username = username self.password = password self.instances = { 'production': { 'host': 'nmnh-rcismngo01-int,nmnh-rcismngo02-int', 'login_db': 'admin', 'db': 'cs', 'collections': ['catalog', 'narrative'] }, #'development': { # 'host': 'nmnh-rcisdev2:27017', # 'login_db': 'ms', # 'db': 'ms', # 'collections': ['catalog', 'narrative'] # } } self.jsonpath = 'xmungo.json' self.connections = {} for nickname in self.instances: self.connect(nickname) self.collection = None if instance is not None and container is not None: self.collection = self.set_collection(instance, container)
[docs] def connect(self, nickname): """Store connection to a server in a dict""" instance = self.instances[nickname] host = instance['host'] login_db = instance['login_db'] collections = instance['collections'] login_server = '/'.join([host.rstrip('/'), login_db.strip('/')]) client = pymongo.MongoClient('mongodb://{}'.format(login_server)) while True: print 'Connecting to {}...'.format(login_server) # User credentials if self.password is None: print 'Username: ' + self.username self.password = getpass.getpass('Password: ') try: client[login_db].authenticate(self.username, self.password) except (ValueError, pymongo.errors.OperationFailure): print 'Invalid password!' else: break client_db = client[instance['db']] connection = {} for collection in collections: connection[collection] = client_db[collection] self.connections[nickname] = connection
[docs] def set_collection(self, instance, collection): """Selects the collection to use""" self.collection = self.connections[instance][collection]
[docs] @staticmethod def change_password(username, database): """Changes password on db""" np1 = getpass.getpass('New password : ') np2 = getpass.getpass('Confirm password: ') if np1 == np2: database.add_user(username, np1)
[docs] def sync(self, sync_from, sync_to, collection, query=None): """Synchronizes development server to production""" if sync_to == 'production' and sync_from == 'development': raise Exception('Sync is going the wrong way!') print 'Syncing {} in {} to {}...'.format(collection, sync_to, sync_from) src = self.connections[sync_from][collection] dst = self.connections[sync_to][collection] queue = [] checked = 0 updated = 0 # Set up query if query is None: query = {} query.update({'catdp': 'ms'}) # Update records based on changes in production cursor = src.find(query) print (' {:,} records matching {} have' ' been found!').format(cursor.count(), query) for sdoc in cursor: irn = sdoc['_id'] try: ddoc = dst.find({'_id': irn})[0] except IndexError: ddoc = None if sdoc != ddoc: queue.append(ReplaceOne({'_id': irn}, sdoc, True)) if len(queue) == 500: dst.bulk_write(queue) updated += len(queue) queue = [] checked += 1 if not checked % 1000: print (' {:,} records updated' ' ({:,} checked)').format(updated, checked) if len(queue): dst.bulk_write(queue) updated += len(queue) queue = [] # Look for records that have been deleted from production deleted = 0 print 'Looking for records deleted from {}...'.format(sync_from) sirns = [doc['_id'] for doc in src.find(query, [])] print ' {:,} irns found in {}'.format(len(sirns), sync_from) dirns = [doc['_id'] for doc in dst.find(query, [])] print ' {:,} irns found in {}'.format(len(dirns), sync_to) irns = set(dirns) -set(sirns) queue = [] for irn in irns: queue.append(DeleteOne({'_id': irn})) if len(queue) == 1000: dst.bulk_write(queue) deleted += len(queue) queue = [] if len(queue): dst.bulk_write(queue) deleted += len(queue) queue = [] print ' {:,} records deleted ({:,} checked)'.format(deleted, len(dirns))
[docs]class MongoDoc(dict): """Dict sublass with methods supporting Mongo-style paths""" def __init__(self, *args, **kwargs): super(MongoDoc, self).__init__(*args, **kwargs) self._convert_children(self) def __call__(self, path): """Shorthand to retrieve data from a Mongo path""" return self.getpath(path)
[docs] def getpath(self, path, default=None): """Retrieves value from Mongo-style path""" keys = path.split('.') doc = self for key in keys: doc = doc.get(key, {}) return doc if doc != {} else default
[docs] def pprint(self): """Pretty prints the dict""" cprint(self)
def _convert_children(self, obj): """Converts nested dictionaries to MongoDoc""" if isinstance(obj, dict): if not isinstance(obj, MongoDoc): MongoDoc(obj) for key in obj.keys(): self._convert_children(obj[key]) elif isinstance(obj, list): for i in xrange(len(obj)): self._convert_children(obj[i])
[docs]class XMungo(MongoBot): """Contains methods to interact with Mongo data using XMu tools""" def __init__(self, *args, **kwargs): self._skip = kwargs.pop('skip', 0) container = kwargs.pop('container', XMuRecord) if container.geotree is None: raise AttributeError('Set container.geotree = get_tree()') module = kwargs.pop('module') super(XMungo, self).__init__(*args, **kwargs) # Create a private xmudata attribute so XMungo can use write XML self._xmudata = XMu(None, module=module, container=container) self.from_json = False self.keep = [] # populated using set_keep() method
[docs] def parse(self, doc): """Converts Mongo document to XMu dictionary""" return mongo2xmu(doc, self.container)
[docs] def container(self, *args): """Wraps dict in custom container with attributes needed for export""" return self._xmudata.container(*args)
[docs] def iterate(self, element): """Placeholder for iteration method""" raise Exception('No iterate method is defined for this subclass')
[docs] def finalize(self): """Placeholder for finalize method run at end of iteration""" pass
def _fast_iter(self, query=None, func=None, report=0, skip=0, limit=0, callback=None, **kwargs): if func is None: func = self.iterate if report: starttime = datetime.now() # Forumulate and run query _query = {'catdp': 'ms'} if query is None: query = {} _query.update(query) if skip: self._skip = skip if self._skip: print 'Skipping first {:,} records...'.format(self._skip) cursor = self.collection.find(_query, skip=self._skip) else: cursor = self.collection.find(_query) cursor.batch_size(500) print '{:,} matching records found!'.format(cursor.count()) # Process documents using func n_success = 0 for doc in cursor: self._skip += 1 result = func(doc, **kwargs) if result is False: break elif result is not True: n_success += 1 if report and not self._skip % report: now = datetime.now() elapsed = now - starttime starttime = now print ('{:,} records processed! ({:,}' ' successful, t={}s)').format(self._skip, n_success, elapsed) if limit and not self._skip % limit: break print '{:,} records processed! ({:,} successful)'.format(self._skip, n_success) if callback is not None: callback() self.finalize() return True
[docs] def fast_iter(self, query=None, func=None, report=0, skip=0, limit=0, callback=None, **kwargs): """Use function to iterate through a MongoDB record set This method reproduces most (but not all) of the functionality of the XMu.fast_iter() method. Args: func (function): name of iteration function report (int): number of records at which to report progress. If 0, no progress report is made. limit (int): number of record at which to stop callback (function): name of function to run upon completion Returns: Boolean indicating whether the entire record set was processed successfully. """ # Wrapper in a while loop to catch cursor errors self._skip = kwargs.pop('skip', 0) num_retries = 0 skipped = 0 # used to track consecutive failures while True: try: return self._fast_iter(query, func, report, skip, limit, callback, **kwargs) except pymongo.errors.CursorNotFound: if num_retries > 8: raise # Try to reconnect after backoff backoff = 30 * 2 ** num_retries print ('Cursor not found! Retrying' ' in {} seconds...').format(backoff) time.sleep(backoff) num_retries += 1 # Reset counter if additional records have been processed if skipped != self._skip: num_retries = 0
[docs] def save(self): """Save attributes listed in the self.keep as json""" print 'Saving data to {}...'.format(self.jsonpath) data = {key: getattr(self, key) for key in self.keep} json.dump(data, open(self.jsonpath, 'wb'))
[docs] def load(self): """Load data from json file created by self.save""" print 'Reading data from {}...'.format(self.jsonpath) data = json.load(open(self.jsonpath, 'rb')) for attr, val in data.iteritems(): setattr(self, attr, val) self.from_json = True
[docs] def set_keep(self, fields): """Sets the attributes to load/save when using JSON functions""" self.keep = fields
[docs] def set_skip(self, skip): """Sets the attributes to load/save when using JSON functions""" self._skip = skip
[docs]def mongo2xmu(doc, container): """Maps Mongo document to EMu XML format Args: doc (dict): sample data from mongodb Returns: Sample data as container """ doc = MongoDoc(doc) cat = container({ 'irn': doc.getpath('_id'), 'CatPrefix': doc.getpath('catnb.catpr'), 'CatNumber': doc.getpath('catnb.catnm'), 'CatSuffix': doc.getpath('catnb.catsf'), 'CatDivision': doc.getpath('catdv'), 'CatCatalog': doc.getpath('catct'), 'CatCollectionName_tab': doc.getpath('catcn', []), 'CatSpecimenCount': str(int(doc.getpath('darin'))), 'MinName': doc.getpath('minnm'), 'MinJeweleryType': doc.getpath('minjt'), 'MetMeteoriteName': doc.getpath('metnm'), 'MetMeteoriteType': doc.getpath('metmt'), 'PetEruptionDate': doc.getpath('peted'), 'PetLavaSource': doc.getpath('petls'), 'MeaCurrentWeight': doc.getpath('meacw'), 'MeaCurrentUnit': doc.getpath('meacu'), 'AdmGUIDType_tab': ['EZID'], 'BioEventSiteRef': container({ 'LocSiteNumberSource': doc.getpath('bions'), 'LocSiteStationNumber': doc.getpath('biosn'), 'LocCountry': doc.getpath('darct'), 'LocProvinceStateTerritory': doc.getpath('darst'), 'LocDistrictCountyShire': doc.getpath('darcy'), 'LocTownship': doc.getpath('biotw'), 'LocOcean': doc.getpath('biooc'), 'LocSeaGulf': doc.getpath('biosg'), 'LocIslandName': doc.getpath('daris'), 'LocMineName': doc.getpath('biomn'), 'LocMiningDistrict': doc.getpath('biomt'), 'LocGeologicSetting': doc.getpath('biogs'), 'LocPreciseLocation': doc.getpath('biopl'), 'VolVolcanoName': doc.getpath('biovl'), 'VolVolcanoNumber': doc.getpath('biovm'), 'ColCollectionMethod': doc.getpath('biocm'), 'ColParticipantRole_tab': doc.getpath('biorl', []), 'ExpExpeditionName': doc.getpath('bioex'), 'AquVesselName': doc.getpath('biovn'), 'TerElevationFromMet': doc.getpath('darm1'), 'LatGeoreferencingNotes0': doc.getpath('latgn', []) }), 'LocPermanentLocationRef': container({ 'SummaryData': doc.getpath('locpl') }) }) if doc.getpath('biopr') and '(' in doc.getpath('biopr'): raw_input(doc.getpath('biopr')) # Format EZID guid = doc['admuu'] # this HAS to be present, so use the basic lookup guid = '-'.join([guid[:8], guid[8:12], guid[12:16], guid[16:20], guid[20:]]) cat['AdmGUIDValue_tab'] = [guid] # Map nested tables lat = doc.getpath('darlt') if lat: cat['BioEventSiteRef']['LatLatitudeDecimal_nesttab'] = [[lat]] lng = doc.getpath('darln') if lng: cat['BioEventSiteRef']['LatLongitudeDecimal_nesttab'] = [[lng]] # Map complex arrays for caton in doc.getpath('caton', []): catnt = caton.get('catnt', '') catnv = caton.get('catnv', '') cat.setdefault('CatOtherNumbersType_tab', []).append(catnt) cat.setdefault('CatOtherNumbersValue_tab', []).append(catnv) for agega in doc.getpath('agega', []): agaid = agega.get('agaid', '') #ageaa = agega.get('ageaa', '') ageae = agega.get('ageae', '') ageay = agega.get('ageay', '') ageas = agega.get('ageas', '') ageat = agega.get('ageat', '') cat.setdefault('AgeGeologicAgeAuthorityRef_tab', []).append(agaid) #cat.setdefault('AgeGeologicAgeAuthorityRef_tab.SummaryData', []).append(ageaa) cat.setdefault('AgeGeologicAgeEra_tab', []).append(ageae) cat.setdefault('AgeGeologicAgeSystem_tab', []).append(ageay) cat.setdefault('AgeGeologicAgeSeries_tab', []).append(ageas) cat.setdefault('AgeGeologicAgeStage_tab', []).append(ageat) for agest in doc.getpath('agest', []): asaid = agest.get('asaid', '') #agesa = agest.get('agesa', '') agesf = agest.get('agesf', '') agesg = agest.get('agesg', '') agesm = agest.get('agesm', '') cat.setdefault('AgeStratigraphyAuthorityRef_tab', []).append(asaid) #cat.setdefault('AgeStratigraphyAuthorityRef_tab.SummaryData', []).append(agesa) cat.setdefault('AgeStratigraphyFormation_tab', []).append(agesf) cat.setdefault('AgeStratigraphyGroup_tab', []).append(agesg) cat.setdefault('AgeStratigraphyMember_tab', []).append(agesm) for zoopp in doc.getpath('zoopp', []): zoopr = zoopp.get('zoopr', '') zoopc = zoopp.get('zoopc', '') cat.setdefault('ZooPreparation_tab', []).append(zoopr) cat.setdefault('ZooPreparationCount_tab', []).append(str(zoopc)) for taxon in doc.getpath('ideil', []): cat.setdefault('IdeTaxonRef_tab', []).append( #container({'ClaSpecies': taxon.get('idetx')}) #container({'ClaOtherValue_tab': [{ # 'ClaOtherValue': taxon.get('idetx') #}]}) container({'ClaScientificName': taxon.get('idetx')}) ) # Set collector(s) parties = doc.getpath('biopr', []) cat['BioEventSiteRef']['ColParticipantRef_tab'] = [ container({'SummaryData': party}) for party in parties ] # Map datestamp modtime = doc.getpath('admdm') cat['AdmDateModified'] = modtime.strftime('%Y-%m-%d') cat['AdmTimeModified'] = modtime.strftime('%H:%M:%S') #cat.pprint() cat.expand() #cat.pprint(True) return cat