Source code for minsci.xmu.containers.mediarecord
"""Subclass of XMuRecord with methods specific to emultimedia"""
import os
import re
import shutil
from collections import namedtuple
from itertools import izip_longest
from unidecode import unidecode
from .xmurecord import XMuRecord
from ..tools.multimedia.embedder import Embedder, EmbedField
from ..tools.multimedia.hasher import hash_file
from ...catnums import get_catnums
from ...helpers import dedupe, format_catnums, oxford_comma, parse_catnum, lcfirst, sort_catnums
VALID_COLLECTIONS = [
'Behind the scenes (Mineral Sciences)',
'Collections objects (Mineral Sciences)',
'Documents and data (Mineral Sciences)',
'Exhibit (Mineral Sciences)',
'Field pictures (Mineral Sciences)',
'Inventory (Mineral Sciences)',
'Macro photographs (Mineral Sciences)',
'Micrographs (Mineral Sciences)',
'Non-collections objects (Mineral Sciences)',
'Pretty pictures (Mineral Sciences)',
'Research pictures (Mineral Sciences)',
'Unidentified objects (Mineral Sciences)'
]
COLLECTION_MAP = {
'Behind the Scenes' : 'Behind the scenes',
'Catalog Cards': 'Documents and data',
'Datasets': 'Documents and data',
'Demonstrations': 'Behind the scenes',
'Documentation': 'Documents and data',
'Exhibit': 'Exhibit',
'Inventory': 'Inventory',
'Ledgers': 'Documents and data',
'Logs': 'Documents and data',
'Maps': 'Documents and data',
'Micrographs': 'Micrographs',
'Miscellaneous': '',
'Other': '',
'Pretty Pictures': 'Pretty pictures',
'Publications': 'Documents and data',
'Research': 'Research pictures',
'Specimens': '',
'Meteorite Datapacks': 'Documents and data'
}
KW_WHITELIST = [
'Allure of Pearls',
'Blue Room',
'Splendor of Diamonds',
'Micrograph, cross-polarized light',
'Micrograph, plane-polarized light',
'Micrograph, reflected light'
]
FORMATS = (
'.cr2',
'.dng',
'.gif',
'.jp2',
'.jpg',
'.jpeg',
'.png',
'.tif',
'.tiff'
)
MediaFile = namedtuple('MediaFile', ['irn', 'filename', 'path', 'hash', 'size',
'width', 'height', 'is_image', 'row'])
[docs]class MediaRecord(XMuRecord):
"""Subclass of XMuRecord with methods specific to emultimedia"""
def __init__(self, *args):
super(MediaRecord, self).__init__(*args)
self.module = 'emultimedia'
self._attributes = ['cataloger', 'embedder', 'fields', 'module']
#self.cataloger = None
#self.embedder = None
self.image_data = {}
# Attributes used with cataloger
self.catnums = []
self.object = None
self.smart_functions = {
'MulTitle': self.smart_title,
'MulDescription': self.smart_caption,
'DetRelation_tab': self.smart_related,
'DetSubject_tab': self.smart_keywords,
'DetCollectionName_tab': self.smart_collections,
'NotNotes': self.smart_note
}
self.defaults = {
'DetSubject_tab': []
}
self.whitelist = KW_WHITELIST
self.masks = {
'MulTitle': u'{name} (NMNH {catnum}) [AUTO]'
}
[docs] def add_embedder(self, embedder, **kwargs):
"""Create an Embedder instance for the MediaRecord"""
self.embedder = embedder(**kwargs)
[docs] def add_cataloger(self, cataloger):
"""Add a Cataloger instance to the MediaRecord"""
self.cataloger = cataloger
[docs] def check_filename(self, primary=True):
"""Verifies that filename follows best practices"""
media = [self.get_primary()] if primary else self.get_all_media()
for mm in media:
stem, ext = os.path.splitext(mm.filename)
matches = re.findall(r'[^a-zA-Z0-9_\-]', stem)
if matches or ext != ext.lower():
return False
return True
[docs] def fix_filename(self, fn=None):
"""Fixes filename to conform with best practices"""
if fn is None:
fn = self.get_primary().filename
stem, ext = os.path.splitext(fn)
stem = stem.replace('-', '_')
stem = re.sub(r'\((\d+)\)', r'_\1_', stem)
stem = re.sub(r'[\s_]+', u'_', unidecode(stem))
stem = re.sub(r'[^a-zA-Z0-9_]', '', stem)
print fn, '=>', stem.rstrip('_') + ext.lower()
return stem.rstrip('_') + ext.lower()
def _get_params(self, for_filename=False):
# Format catalog numbers
catnums = sort_catnums(self.catnums)
if len(catnums) > 1:
mask = '{} and others' if for_filename else '{} and others'
catnum = mask.format(catnums[0])
else:
catnum = catnums[0]
params = {
'catnum': catnum,
'catnum_simple': catnum.replace('NMNH ', '') \
.replace('USNM', '') \
.replace('-00', ''),
'name': self.object.object['xname'],
'primary': self.object.object['xname'].split(' with ')[0],
}
if for_filename:
return {k: v.replace(' ', '_') for k, v in params.iteritems()}
return params
[docs] def set_filename(self, mask):
params = self._get_params(for_filename=True)
ext = os.path.splitext(self('Multimedia'))[1]
self['MulIdentifier'] = mask.format(**params) + ext
[docs] def set_default(self, key):
defaults = {
'DetSource': self.embedder.source,
'DetRights': self.embedder.rights
}
self[key] = defaults[key]
[docs] def get_all_media(self):
"""Gets the filepaths for all media in this record"""
return [self.get_primary()] + self.get_supplementary()
[docs] def get_primary(self):
"""Gets properties for the primary asset"""
filename = self('MulIdentifier')
if not filename:
filename = os.path.basename(self('Multimedia'))
is_image = filename.lower().endswith(FORMATS)
size = self('ChaFileSize')
width = self('ChaImageWidth')
height = self('ChaImageHeight')
return MediaFile(self('irn'),
filename,
self('Multimedia'),
self('ChaMd5Sum'),
int(size) if is_image and size else None,
int(width) if is_image and width else None,
int(height) if is_image and height else None,
is_image,
None)
[docs] def get_supplementary(self):
"""Gets supplementary assets and their basic properites"""
paths = self('Supplementary_tab')
files = self('SupIdentifier_tab')
hashes = self('SupMD5Checksum_tab')
sizes = self('SupFileSize_tab')
widths = self('SupWidth_tab')
heights = self('SupWidth_tab')
supp_files = izip_longest(paths, files, hashes, sizes, widths, heights)
supplementary = []
for i, supp_file in enumerate(supp_files):
path, filename, hexhash, s, w, h = supp_file
if not filename:
filename = os.path.basename(path)
is_image = filename.lower().endswith(FORMATS)
supplementary.append(MediaFile(self('irn'),
filename,
path,
hexhash,
int(s) if is_image and s else None,
int(w) if is_image and w else None,
int(h) if is_image and h else None,
is_image,
i + 1))
return supplementary
[docs] def get_catalog_numbers(self, field='MulTitle', **kwargs):
"""Find catalog numbers in the given field"""
return get_catnums(self(field), **kwargs)
[docs] def get_photo_numbers(self):
"""Gets the photo number"""
return self.get_matching_rows('Photographer number',
'AdmGUIDType_tab',
'AdmGUIDValue_tab')
[docs] def copy_to(self, path, overwrite=False, verify_image=False):
"""Copies the primary file to a new location
Args:
path (str): the directory to copy the image to
overwrite (bool): specifies whether to overwrite existing file
verify_master (bool): specifies whether to verify copied file
"""
primary = self.get_primary()
try:
os.makedirs(path)
except OSError:
pass
dst = os.path.join(path, primary.filename)
try:
open(dst, 'rb')
except IOError:
print 'Copying {} to {}...'.format(primary.path, dst)
shutil.copy2(primary.path, dst)
else:
if overwrite:
print 'Copying {} to {}...'.format(primary.path, dst)
os.remove(dst)
shutil.copy2(primary.path, dst)
# Verify the copy if required
if verify_image and hash_file(dst) != primary.hash:
raise ValueError('Checksums do not match')
self['Multimedia'] = dst
[docs] def embed_metadata(self, verify_image=True):
"""Updates metadata in the primary and supplementary images"""
rec = self.clone(self)
for media in self.get_all_media():
# Embed metadata or add a placeholder for non-image files
fp = media.path
if media.is_image:
if verify_image and rec('irn'):
self.verify_master(media)
# Rename file based on MulIdentifier if that is different
# from the filename in Multimedia
if media.row is None:
new_name = rec('MulIdentifier')
else:
new_name = rec('SupIdentifier_tab')[media.row - 1]
if fp.endswith(new_name):
new_name = None
fp = self.embedder.embed_metadata(self, fp, new_name)
if fp and media.row is None:
rec['Multimedia'] = fp
elif media.row and rec('irn'):
rec.setdefault('Supplementary_tab({}=)', []).append(fp)
if len(rec['Supplementary_tab({}=)']) != media.row:
raise ValueError
else:
try:
rec['Supplementary_tab'][media.row - 1] = fp
except:
print fp
print rec['Supplementary_tab']
print media.row
print media
raise
if rec:
rec['irn'] = media.irn
return rec.strip_derived().expand()
[docs] def verify_master(self, media=None):
"""Verifies download/copy of master file by comparing hashes"""
if media is None:
media = self.get_primary()
verified = hash_file(media.path) == media.hash
if not verified:
raise ValueError('Checksums do not match')
return verified
[docs] def verify_import(self, images, strict=True, test=False):
"""Verifies import against path"""
Image = namedtuple('Image', ['path', 'hash'])
for mm in self.get_all_media():
matches = images.get(mm.filename, [])
# Get MD5 hashes and store them for future use
hashes = {}
if strict:
for i, im in enumerate(matches):
try:
im.hash
except AttributeError:
try:
matches[i] = Image(im, hash_file(im))
except IOError:
print 'File not found: {}'.format(im)
images[mm.filename] = matches
hashes = {im.hash: im.path for im in matches}
# Delete if the filename and hash match (strict) or if
# the filename exists (not strict)
ok_to_delete = ((strict and mm.hash in hashes)
or (not strict and len(matches) == 1))
if ok_to_delete:
fp = hashes[mm.hash] if mm.hash in hashes else matches[0]
if ok_to_delete and test:
print 'Would delete: {}'.format(fp)
elif ok_to_delete:
print 'Deleting {}...'.format(fp)
#os.unlink(paths[0])
elif strict and mm.hash not in hashes:
print 'Hash mismatch: {}'.format(mm.filename)
elif not strict and len(matches) != 1:
print 'Non-unique match (n={}): {}'.format(len(matches), fp)
elif not matches:
print 'File error: No matches found for {}'.format(mm.filename)
else:
print 'Unknown error: {}'.format(mm.filename)
# Provide additional info about hashes if strict
if strict:
print ' File hash:\n {}'.format(mm.hash)
print ' Ref hashes:'
for i, md5 in enumerate(sorted(hashes)):
asterisk = ''
if md5 == mm.hash:
asterisk = '*'
print ' {: >2d}. {}{}'.format(i + 1, md5, asterisk)
print '-' * 60
[docs] def match(self, ignore_suffix=False):
"""Returns list of catalog objects matching data in MulTitle"""
parsed = get_catnums(self('MulTitle'))
records = []
for identifier in parsed:
matches = self.cataloger.get(identifier, [], ignore_suffix)
for match in matches:
if not match in records:
records.append(match)
self.catnums = str(parsed)
return records
[docs] def match_one(self):
"""Returns a matching catalog object if exactly one match found"""
matches = self.match()
catnums = [m.object['catnum'] for m in matches]
matches = [m for i, m in enumerate(matches)
if not m.object['catnum'] in catnums[:i]]
if not matches or len(matches) > 1:
raise ValueError('No unique match: {}'.format(self.catnums))
return matches[0]
[docs] def match_and_fill(self, strict=True):
"""Updates record if unique match in catalog found"""
print 'Matching on identifiers in "{}"...'.format(self('MulTitle'))
self.expand()
try:
match = self.match_one()
except ValueError:
if strict:
raise
else:
print 'Unique match found! Updating record...'
enhanced = self.clone(self)
enhanced.whitelist = self.whitelist
enhanced.masks = self.masks
enhanced.object = match
enhanced.objects = [match]
enhanced.catnums = self.catnums
for key, func in enhanced.smart_functions.iteritems():
enhanced[key] = func() if func is not None else enhanced(key)
# Tweak rights statement for non-collections objects
non_si_coll = 'Non-collections object (Mineral Sciences)'
if non_si_coll in enhanced.get('DetCollectionName_tab', []):
enhanced['DetRights'] = ('One or more objects depicted in this'
' image are not owned by the'
' Smithsonian Institution.')
enhanced['DetRelation_tab'] = [rel.replace('(0/', '(1/') for rel
in enhanced['DetRelation_tab']]
#enhanced['_Objects'] = [match]
return enhanced.expand()
[docs] def strip_derived(self):
"""Strips fields derived by EMu from the record"""
strip = [
'AdmImportIdentifier',
'ChaImageHeight',
'ChaImageWidth',
'ChaMd5Sum',
'MulIdentifier',
'MulMimeFormat',
'SupIdentifier_tab',
'SupHeight_tab',
'SupWidth_tab',
'SupMD5Checksum_tab'
]
strip.extend([key for key in self.keys() if key.startswith('_')])
for key in strip:
try:
del self[key]
except KeyError:
pass
return self
[docs] def smart_title(self):
"""Derives image title from catalog"""
title = self('MulTitle')
if (not title
or (title.startswith('Mineral Sci') and title.endswith('Photo'))
or title.endswith('[AUTO]')):
# Use the catnum originally parsed from the title, not the one
# from the linked record
params = self._get_params()
title = self.masks['MulTitle'].format(**params).replace(' ()', ' ')
return title
[docs] def smart_caption(self):
"""Derives image caption from catalog"""
description = self('MulDescription')
if not description or description.endswith('[AUTO]'):
description = self.object.caption + ' [AUTO]'
return description
[docs] def smart_keywords(self, whitelist=None):
"""Derives keywords from catalog"""
if whitelist is None:
whitelist = self.whitelist
keywords = self.object.keywords
keywords.extend([kw for kw in self('DetSubject_tab')
if ':' in kw or kw in whitelist])
keywords.extend(self.defaults['DetSubject_tab'])
return dedupe(keywords, False)
[docs] def smart_collections(self):
"""Populates DetCollectionName_tab based on catalog record"""
collections = self('DetCollectionName_tab') if self else []
collections = [COLLECTION_MAP.get(c, c) for c in collections]
# Check if micrograph
coll = 'Micrographs (Mineral Sciences)'
if 'micrograph' in self('MulTitle').lower() and not coll in collections:
collections.append(coll)
# Check if there are any non-SI objects in photos. Different
# collections and restrictions are applied for these photos.
if self('DetResourceType') == 'Specimen/Object':
si_object = 'Collections objects (Mineral Sciences)'
non_si_object = 'Non-collections object (Mineral Sciences)'
if self.object.object['status'] != 'active':
rights = ('One or more objects depicted in this image are not'
' owned by the Smithsonian Institution.')
collections.append(non_si_object)
try:
collections.remove(si_object)
except ValueError:
pass
self['DetRights'] = rights
else:
collections.append(si_object)
try:
collections.remove(non_si_object)
except ValueError:
pass
return dedupe(collections, False)
[docs] def smart_note(self):
"""Updates note based on catalog record"""
note = self('NotNotes').split(';')
for i, val in enumerate(note):
if val.strip().startswith('Linked:'):
note[i] = 'Linked: Yes'
break
else:
note.append('Linked: Yes')
return '; '.join(note).strip('; ')
[docs]class EmbedFromEMu(Embedder):
"""Tools to embed metadata into a file based on existing data in EMu"""
def __init__(self, *args, **kwargs):
super(EmbedFromEMu, self).__init__(*args, **kwargs)
# Default rights statement
self.creator = 'Unknown photographer'
self.rights = ('This image was obtained from the Smithsonian'
' Institution. Its contents may be protected by'
' international copyright laws.')
self.source = 'NMNH-Smithsonian Institution'
self.job_id = None
# Use artwork identifiers to store specimen info
object_metadata = {
'object_number': EmbedField('ArtworkSourceInventoryNo', 16,
self.get_object_numbers),
'object_source': EmbedField('ArtworkSource', 16,
self.get_object_sources),
'object_url': EmbedField('ArtworkSourceInvURL', 64,
self.get_object_urls),
'object_title': EmbedField('ArtworkTitle', 64,
self.get_object_titles)
}
self.metadata_fields.update(object_metadata)
[docs] def set_job_id(self, job_id):
"""Sets job id manually for images not imported into EMu yet"""
self.job_id = job_id
[docs] @staticmethod
def get_objects(rec, field='MulTitle'):
"""Returns list of catalog numbers parsed from MulTitle"""
catnums = get_catnums(rec(field), prefixed_only=True)
# FIXME: Only handles one catalog number for now
if catnums:
catnums = catnums.__class__(catnums[:1])
return catnums
[docs] def get_caption(self, rec):
"""Placeholder function returning the caption"""
return rec('MulDescription')
[docs] def get_copyright(self, rec):
"""Placeholder function returning copyright info"""
rights = rec('DetRights')
if not rights:
return self.rights
return rights
[docs] def get_creator(self, rec):
"""Placeholder function returning the creator"""
if not rec('MulCreator_tab'):
return self.creator
return rec('MulCreator_tab')
#return oxford_comma(rec('MulCreator_tab'), False)
[docs] def get_credit_line(self, rec):
"""Returns short credit line"""
if not rec('MulCreator_tab'):
creator = self.creator
else:
creator = rec('MulCreator_tab')[0]
return u'{}'.format(creator)
[docs] def get_date_created(self, rec):
"""Placeholder function returning the date created"""
return self.get_mtime(rec('Multimedia'), '%Y%m%d')
[docs] def get_datetime_created(self, rec):
"""Placeholder function returning the full date and time created"""
return self.get_mtime(rec('Multimedia'))
[docs] def get_headline(self, rec):
"""Placeholder function returning the headline"""
headline = rec('MulTitle')
# Limit to exactly 64 characters
if len(headline) > 64:
if '(NMNH' in headline or '(USNM' in headline:
headline, catnum = headline.rsplit('(', 1)
len_catnum = len(catnum) + 1
headline = headline[:60 - len_catnum].rstrip() + '... (' + catnum
else:
headline = headline[:61].rstrip() + '...'
return headline
[docs] def get_inventory_numbers(self, rec):
"""Returns a list of catalog numbers"""
return self.get_objects()
[docs] def get_job_id(self, rec):
"""Returns the import identifier"""
job_id = rec('AdmImportIdentifier')
if not job_id:
job_id = self.job_id
return job_id
[docs] def get_object_name(self, rec, mask='include_code'):
"""Returns the photo identifier or list of pictured objects"""
object_name = rec.get_guid('Photographer number')
if object_name is None:
objects = []
for obj in self.get_objects(rec):
obj.set_mask(mask)
objects.append(obj.from_mask())
object_name = '; '.join(objects)
return object_name
[docs] def get_source(self, rec):
"""Returns source of the multimedia file"""
return rec('DetSource')
[docs] def get_special_instructions(self, rec):
"""Returns long credit line for special instructions"""
creators = [c if not c.startswith('Unknown') else lcfirst(c)
for c in self.get_creator(rec)]
creator = oxford_comma(creators, False)
credit = ['Full credit line: Photo by {} provided courtesy of the '
'Smithsonian Institution'.format(creator)]
# Add any photo enhancements logged in EMu
contributors = [contrib for contrib in rec('DetContributor_tab')
if 'enhanced by ' in contrib]
credit.extend(contributors)
return '. '.join(credit)
[docs] def get_subjects(self, rec):
"""Returns media topics for this record"""
subjects = ['medtop:20000727'] # geology
if 'NMNH G' in rec('MulTitle'):
subjects.append('medtop:20000012') # jewellery
return subjects
[docs] def get_time_created(self, rec):
"""Placeholder function returning the time created"""
return self.get_mtime(rec('Multimedia'), '%H%M%S%z')
[docs] def get_transmission_reference(self, rec):
"""Returns the import identifier"""
return self.get_job_id(rec)
[docs] def get_object_numbers(self, rec):
"""Returns list of catalog numbers"""
obj_data = []
for obj in rec.objects if hasattr(rec, 'objects') else []:
obj_data.append(obj.object['catnum'])
return obj_data
[docs] def get_object_sources(self, rec, source='NMNH-Smithsonian Institution'):
"""Returns list with museum name"""
return [source] * len(rec.objects if hasattr(rec, 'objects') else [])
[docs] def get_object_titles(self, rec):
"""Returns list of object titles"""
obj_data = []
for obj in rec.objects if hasattr(rec, 'objects') else []:
obj_data.append(obj.object['xname'])
return obj_data
[docs] def get_object_urls(self, rec):
"""Returns list of object URLs"""
"""Returns list of object titles"""
obj_data = []
for obj in rec.objects if hasattr(rec, 'objects') else []:
obj_data.append(obj.object['url'])
return obj_data