Source code for minsci.xmu.tools.biblio.ris

"""Populates an ebibliography record based on RIS data in NotNotes"""

import logging
import pprint as pp
import re
from collections import namedtuple
from itertools import chain

import requests
import requests_cache
from dateparser import parse

from .bibbot import BibBot
from ....helpers import parse_names
from ....xmu import XMu, BiblioRecord




# RIS tags that may occur more than once in a record. ris2dict() collects
# the values of these tags into lists.
LISTS = [
    'AU', 'A1', 'A2', 'A3', 'A4', 'ED',
    'KW', 'N1', 'SN', 'UR',
]

# Maps publication type to field prefix. Only needed for cases where the
# prefix is not simply the first three letters of the publication type.
PREFIXES = {'Book Series': 'Bos'}

# Maps each RIS contributor tag to the role recorded for the names found
# under that tag
ROLES = {
    'AU': u'Author', 'A1': u'First author',
    'A2': u'Secondary author', 'A3': u'Tertiary author',
    'A4': u'Subsidiary author', 'ED': u'Editor',
}


# Contributor pairs a parsed name with a role. Source pairs a publication
# type with the type of the publication that contains it.
Contributor = namedtuple('Contributor', 'name role')
Source = namedtuple('Source', 'type parent_type')

# Module-level BibBot instance; get_ris() calls its download() method to
# fetch RIS exports
bot = BibBot()


class FillFromRIS(XMu):
    """Fills out skeleton bibliography records that have RIS data in notes

    Attributes:
        records (list): EMu-formatted records collected by iterate()
        errors (list): KeyError and ValueError instances raised while
            converting individual records
        parsers (dict): customizers passed to emuize(). The 'default' entry
            lists the RIS keys to discard when no other entry applies.
    """

    def __init__(self, *args, **kwargs):
        """Initializes the reader

        Args:
            *args: passed through to XMu
            **kwargs: passed through to XMu, except for the optional
                keyword 'parsers', a dict of customizers that is merged
                over the built-in default entry. None is treated the same
                as an empty dict.
        """
        parsers = kwargs.pop('parsers', None)
        super(FillFromRIS, self).__init__(*args, **kwargs)
        self.records = []
        self.errors = []
        self.parsers = {'default': ['CY', 'DB', 'N1', 'UR']}
        if parsers:
            self.parsers.update(parsers)

    def iterate(self, element):
        """Converts the RIS data in the notes of one record to EMu format

        Successfully converted records are added to self.records. Errors
        raised during conversion are logged and added to self.errors.

        Args:
            element: the record element handed to self.parse()

        Returns:
            True if the RIS data could not be converted, otherwise None
        """
        rec = self.parse(element)
        ris = rec('NotNotes').strip()
        # The notes may hold a link to the publication instead of RIS data
        if 'http' in ris and 'TY' not in ris:
            ris = get_ris(ris).strip()
        # Nothing to do unless the notes contain RIS data
        if 'TY' not in ris:
            return None
        irn = rec('irn')
        try:
            formatted = emuize(ris, self.parsers)
        except (KeyError, ValueError) as e:
            self.errors.append(e)
            logging.exception('ris')
            return True
        # The irn of the skeleton record is only carried over when the
        # notes produced exactly one record
        if len(formatted) == 1:
            for bib in formatted:
                if irn:
                    bib['irn'] = irn
                # NOTE(review): the original reused the name `rec` for this
                # loop variable, so the comparison has always been made
                # against the note of the converted record, not the note of
                # the source record. That behavior is kept here; confirm
                # which of the two was intended.
                if ris == bib('NotNotes'):
                    del bib['NotNotes']
        self.records.extend(formatted)
        return None
def ris2emu(fp, parsers=None):
    """Builds EMu records from the RIS notes in an ebibliography export

    Args:
        fp: the export to read, passed to FillFromRIS unchanged
        parsers (dict): optional customizers, passed to FillFromRIS

    Returns:
        List of records collected while iterating over the export
    """
    reader = FillFromRIS(fp, container=BiblioRecord, parsers=parsers)
    reader.fast_iter(report=10)
    return reader.records
def split_records(ris):
    """Splits a RIS document into records

    Works on both Python 2 and Python 3. Byte strings are decoded line by
    line as UTF-8, falling back to Latin-1 for lines that are not valid
    UTF-8.

    Args:
        ris (str): text of one or more RIS records

    Returns:
        List of records. Each record is a list of the stripped tag lines
        that belong to it, including the closing ER line. Lines that do not
        start with a RIS tag are discarded, as are tag lines that are not
        followed by an ER line.
    """
    tag = re.compile(r'[A-Z][A-Z0-9] {1,2}-')
    # Split on a separator of the same type as the input
    newline = b'\n' if isinstance(ris, bytes) else u'\n'
    records = []
    lines = []
    for line in ris.split(newline):
        line = line.strip()
        if isinstance(line, bytes):
            try:
                line = line.decode('utf-8')
            except UnicodeDecodeError:
                line = line.decode('latin1')
        if tag.match(line):
            lines.append(line)
            # Only a proper tag line can close a record, so free text that
            # happens to begin with ER is ignored
            if line.startswith('ER'):
                records.append(lines)
                lines = []
    return records
def emuize(ris, parsers=None):
    """Converts RIS records to EMu ebibliography format

    Args:
        ris (str): text of one or more RIS records
        parsers (dict): customizers keyed to a fragment of the UR value
            they apply to. A customizer is either a callable that takes
            and returns the record dict or a list of RIS keys to discard.
            The 'default' entry is used when no fragment matches. If
            omitted, the keys CY, DB, N1, and UR are discarded.

    Returns:
        List of expanded BiblioRecord objects, one per RIS record. Records
        that include a DOI are skipped.

    Raises:
        ValueError: if a record has more than one ISSN, an invalid ISSN,
            or more than 50 lines, or if a helper rejects the record
        KeyError: if an ISSN is given without a source title, if the
            publication type is not recognized, or if any keys are left
            over once the record has been converted
    """
    if parsers is None:
        parsers = {'default': ['CY', 'DB', 'N1', 'UR']}
    bibs = []
    for lines in split_records(ris):
        rec = ris2dict(lines)
        # Anything with a DOI should be handled using doi.py
        if rec.get('DO'):
            continue
        # Look for customizers based on UR
        urls = rec.get('UR', [])
        for key, candidate in parsers.items():
            if any(key in url for url in urls):
                parser = candidate
                break
        else:
            parser = parsers['default']
        # Checking callable() instead of catching TypeError keeps errors
        # raised inside a customizer from being mistaken for a key list
        if callable(parser):
            rec = parser(rec)
        else:
            rec = generic(rec, parser)
        source = get_type(rec)
        parent_prefix = None
        if source.parent_type is not None:
            parent_prefix = PREFIXES.get(source.parent_type,
                                         source.parent_type[:3])
        # Create a bibliography record. The {} in each key is a placeholder
        # for the prefix, which is filled in once the record is complete.
        bib = {}
        bib['BibRecordType'] = source.type
        bib['{}PublicationLanguage'] = rec.pop('LA', None)
        bib['{}Title'] = get_title(rec)
        bib['{}Volume'] = rec.pop('VL', None)
        bib['{}Pages'] = get_pages(rec)
        # Issue. Books that have no volume store the issue as the volume.
        issue = rec.pop('IS', None)
        if source.type == 'Book' and not bib['{}Volume']:
            bib['{}Volume'] = issue
        else:
            bib['{}Issue'] = issue
        # Source title
        # NOTE(review): a source title on a type with no parent (Thesis)
        # yields a parent keyed with the prefix None; confirm whether that
        # case can occur.
        source_title = get_source(rec)
        if source_title is not None:
            bib['{}ParentRef'] = {
                'BibRecordType': source.parent_type,
                '{}Title'.format(parent_prefix): source_title
            }
        # Check for issn
        issns = rec.pop('SN', [])
        if len(issns) > 1:
            raise ValueError('Too many ISSNs!')
        if issns:
            issn = issns[0]
            if not re.match(r'^\d{4}-\d{3}[\dX]$', issn, re.I):
                raise ValueError('Not an ISSN: {}'.format(issn))
            # The ISSN belongs to the parent, so there is nowhere to put it
            # if the record did not name one
            if '{}ParentRef' not in bib:
                raise KeyError('ISSN {} given without a source'
                               ' title'.format(issn))
            bib['{}ParentRef']['{}ISSN'.format(parent_prefix)] = issn
        # Contributors. Names that parse to a bare person record are
        # dropped and flagged as et al.
        contributors = get_contributors(rec)
        blank = {'NamPartyType': 'Person'}
        parties = [p for p in contributors if p.name != blank]
        if len(parties) != len(contributors):
            bib['{}AuthorsEtAl'] = 'Yes'
        # Special handling for authors of theses/dissertations, which take
        # a single author
        if source.type == 'Thesis':
            if parties:
                bib['{}AuthorsRef'] = parties[0].name
        else:
            bib['{}AuthorsRef_tab'] = [p.name for p in parties]
            bib['{}Role_tab'] = [p.role for p in parties]
        # Publication date
        pub_date = get_date(rec)
        if pub_date is not None:
            bib['{}PublicationDates'] = pub_date
            bib['{}PublicationDate'] = pub_date
        # DOI
        doi = rec.pop('DO', None)
        if doi is not None:
            bib['AdmGUIDType_tab'] = ['DOI']
            bib['AdmGUIDIsPreferred_tab'] = ['Yes']
            bib['AdmGUIDValue_tab'] = [doi]
        # Store the original RIS file as a note
        bib['NotNotes'] = '\n'.join(lines)
        if len(lines) > 50:
            raise ValueError('RIS file is too long to fit in EMu notes field')
        # Add publisher info (books only)
        publisher = rec.pop('PB', None)
        if publisher is not None and source.type == 'Book':
            published_by = {
                'NamPartyType': 'Organization',
                'NamOrganisation': publisher
            }
            if source_title is not None:
                bib['{}ParentRef']['BosPublishedByRef'] = published_by
                bib['{}ParentRef']['BosPublicationCity'] = rec.pop('CY', None)
            else:
                bib['BooPublishedByRef'] = published_by
                bib['BooPublicationCity'] = rec.pop('CY', None)
        # Apply prefix and drop empty fields
        prefix = PREFIXES.get(source.type, source.type[:3])
        bib = {key.format(prefix): val for key, val in bib.items() if val}
        try:
            bibs.append(BiblioRecord(bib).expand())
        except Exception:
            # Show the offending record before passing the error along
            BiblioRecord(bib).pprint(True)
            raise
        u1 = rec.pop('U1', None)
        if u1:
            print('Info: {}'.format(u1))
        # Anything left over at this point was not mapped to an EMu field
        rec = remove_duplicate_fields(rec, ris2dict(lines))
        if rec:
            pp.pprint(ris2dict(lines))
            raise KeyError('Found unhandled keys: {}'.format(list(rec)))
    return bibs
def remove_duplicate_fields(rec, orig):
    """Removes fields holding duplicate data

    A field is a duplicate if its value also occurs in the original record
    under a key that is not in rec.

    Args:
        rec (dict): fields to check. Modified in place.
        orig (dict): the original record

    Returns:
        The same dict passed as rec, minus the duplicate fields
    """
    # Values are compared using a list because they may be unhashable
    other_values = [val for key, val in orig.items() if key not in rec]
    # Iterate over a copy of the keys so that entries can be deleted safely
    for key in list(rec):
        if rec[key] in other_values:
            del rec[key]
    return rec
def ris2dict(ris):
    """Converts a RIS record to a dictionary

    Args:
        ris (list): the lines of one RIS record

    Returns:
        Dict mapping each tag to its value. Tags in LISTS map to a list of
        values. Tags that have no value are left out.

    Raises:
        ValueError: if a line does not contain a hyphen or if a tag that
            is not in LISTS occurs more than once
    """
    rec = {}
    for line in ris:
        key, sep, val = line.partition('-')
        if not sep:
            raise ValueError('Not a RIS tag line: {}'.format(line))
        # NOTE: hyphens at either end of the value are stripped along with
        # the spaces, as they always have been
        key = key.strip(' -')
        val = val.strip(' -')
        if key in LISTS:
            rec.setdefault(key, []).append(val)
        elif rec.get(key):
            raise ValueError('{} is not listable'.format(key))
        else:
            rec[key] = val
    # any() is false for an empty string and for a list of empty strings
    return {key: val for key, val in rec.items() if any(val)}
def get_type(rec):
    """Looks up the EMu publication type matching the TY tag

    Removes TY and M3 from the record.

    Args:
        rec (dict): RIS record

    Returns:
        Source giving the type of the record and the type of its parent

    Raises:
        KeyError: if TY is missing or is not one of the types handled here
        ValueError: if M3 is given but is not one of the work types
            expected for TY
    """
    ris_type = rec.pop('TY')
    # Handle M3
    work_type = rec.pop('M3', None)
    if work_type is not None:
        expected = {
            'ABST': ['Abstract'],
            'BOOK': ['Proceedings'],
            'CHAP': ['Book', 'Report'],
            'CPAPER': ['Paper'],
            'JOUR': ['Journal article', 'Paper', 'Report'],
            'RPRT': ['Report']
        }
        if work_type not in expected.get(ris_type, []):
            raise ValueError('Work type {} (bib_type={})'.format(work_type,
                                                                 ris_type))
    # Classify publication by type. Keys with a leading underscore are
    # assigned by customizer functions and are not official RIS types.
    sources = {
        'ABST': Source('Article', 'Journal'),
        'BOOK': Source('Book', 'Book Series'),
        'CHAP': Source('Chapter', 'Book'),
        'CPAPER': Source('Article', 'Journal'),
        'JOUR': Source('Article', 'Journal'),
        'RPRT': Source('Article', 'Journal'),
        'THES': Source('Thesis', None),
        '_MONOGRAPH': Source('Book', 'Book Series')
    }
    return sources[ris_type]
def get_title(rec):
    """Pulls the publication title out of a RIS record

    Removes both TI and T1 from the record. TI takes precedence if both
    are populated.

    Args:
        rec (dict): RIS record

    Returns:
        The title

    Raises:
        ValueError: if neither tag holds a title
    """
    candidates = [rec.pop('TI', None), rec.pop('T1', None)]
    for title in candidates:
        if title:
            return title
    raise ValueError('No publication title provided')
def get_date(rec):
    """Returns publication date

    Removes PY and Y1 from the record.

    Args:
        rec (dict): RIS record

    Returns:
        Y1 formatted as YYYY-MM-DD if given, otherwise the value of PY,
        which is None if that tag is missing as well

    Raises:
        ValueError: if Y1 cannot be parsed, if PY is not a number, or if
            PY does not match the year parsed from Y1
    """
    year = rec.pop('PY', None)
    date = rec.pop('Y1', None)
    if not date:
        return year
    parsed = parse(date)
    # The parser signals failure by returning None. Raise an error that
    # callers handle instead of failing on the attribute access below.
    if parsed is None:
        raise ValueError('Could not parse date: {}'.format(date))
    if year is not None and int(year) != parsed.year:
        # Report the popped values; they are no longer part of rec
        raise ValueError('Inconsistent dates: PY={}, Y1={}'.format(year, date))
    return parsed.strftime(r'%Y-%m-%d')
def get_source(rec):
    """Pulls the title of the source (journal, book, etc.) out of a record

    Removes JF, T2, J1, and T3 from the record.

    Args:
        rec (dict): RIS record

    Returns:
        The first populated value, checked in the order above, or None if
        none of the tags is populated
    """
    found = None
    for key in ('JF', 'T2', 'J1', 'T3'):
        val = rec.pop(key, None)
        # Keep popping after a match so that all four tags are consumed
        if found is None and val:
            found = val
    return found
def get_pages(rec):
    """Returns page range

    Removes SP, EP, and LP from the record. Repeated values are only used
    once, so a record that gives the same last page under both EP and LP
    produces 1-10, not 1-10-10.

    Args:
        rec (dict): RIS record

    Returns:
        The distinct page values joined by hyphens in the order SP, EP,
        LP. This is a single value if only one page is given and an empty
        string if none are.
    """
    pages = []
    for key in ('SP', 'EP', 'LP'):
        val = rec.pop(key, None)
        if val and val not in pages:
            pages.append(val)
    return '-'.join(pages)
def get_contributors(rec):
    """Returns parsed list of contributors

    Removes every tag listed in ROLES from the record. Tags are processed
    in a fixed order so that the result does not depend on the ordering of
    the ROLES dict.

    Args:
        rec (dict): RIS record

    Returns:
        List of Contributor tuples, each pairing one of the names returned
        by parse_names() with the role for the tag it was found under
    """
    preferred = ('AU', 'A1', 'A2', 'A3', 'A4', 'ED')
    keys = [key for key in preferred if key in ROLES]
    # Pick up any roles that are not covered by the preferred order
    keys.extend(sorted(set(ROLES) - set(preferred)))
    contributors = []
    for key in keys:
        names = rec.pop(key, [])
        if not names:
            continue
        if not isinstance(names, list):
            names = [names]
        role = ROLES[key]
        for name in names:
            contributors.extend(
                Contributor(party, role) for party in parse_names(name, True)
            )
    return contributors
def get_ris(url):
    """Retrieves RIS from a url

    Only links to the USGS publications warehouse are supported. The
    export is requested once; the original code downloaded it twice.

    Args:
        url (str): link to a publication

    Returns:
        The result of the download for supported links. Any other value is
        returned unchanged.
    """
    if 'pubs.er.usgs.gov' in url or 'pubs.usgs.gov' in url:
        url = url.replace('pubs.usgs.gov', 'pubs.er.usgs.gov')
        ris_url = url.rstrip('/? \n\r') + '?mimetype=ris'
        logging.debug('Requesting RIS for %s', url)
        result = bot.download(ris_url)
        logging.debug('Response from %s: %s', ris_url, result)
        return result
    logging.warning('Failed to retrieve %s', url)
    return url
def generic(ris, keys):
    """Discards the given keys from a RIS record

    Args:
        ris (dict): RIS record. Modified in place.
        keys (list): keys to discard. Keys that are not in the record are
            ignored.

    Returns:
        The same dict passed as ris
    """
    for unwanted in keys:
        if unwanted in ris:
            del ris[unwanted]
    return ris