"""Defines a requests session customized to interact with GeoNames"""
import math
import time
from datetime import datetime
import requests
import requests_cache
from lxml import etree
from .containers import GeoList, TO_COUNTRY_CODE, NAME_TO_ABBR
[docs]class GeoBot(requests_cache.CachedSession):
"""Methods to handle and retry HTTP requests for georeferencing"""
def __init__(self, wait, *args, **kwargs):
super(GeoBot, self).__init__(*args, **kwargs)
self.wait = wait
def _retry(self, func, *args, **kwargs):
"""Retries failed requests using a simple exponential backoff"""
for i in xrange(8):
try:
response = func(*args, **kwargs)
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout):
seconds = 30 * 2 ** i
print 'Retrying in {:,} seconds...'.format(seconds)
time.sleep(seconds)
else:
if not response.from_cache:
print 'Resting up for the big push...'
time.sleep(self.wait)
return response
raise Exception('Maximum retries exceeded')
[docs]class GeoNamesBot(GeoBot):
"""A cacheable requests object customized for GeoNames webservices
Attributes:
username (str): a valid username for GeoNames
headers (dict): headers data for requests
"""
def __init__(self, username, user_id=None):
wait = 3600. / 2000.
super(GeoNamesBot, self).__init__(wait, cache_name='geobots')
self.username = username
if user_id is None:
user_id = username
user_agent = 'MinSciBot/0.2 ({})'.format(user_id)
self.headers.update({
'User-Agent': user_agent
})
# Maps simple names to GeoNames field names
self._params = {
'country': 'countryName',
'state': 'adminName1',
'county': 'adminName2'
}
def _query_geonames(self, url, **params):
"""Generalized method for querying the GeoNames webservices
Args:
url (str): the url to query
params (dict): query parameters
Returns:
Result set as JSON
"""
defaults = {
'formatted': 'True',
'style': 'full',
'username': self.username,
}
defaults.update(params)
# Make and parse query
response = self._retry(self.get, url, params=defaults)
if response.status_code == 200:
print response.url
content = response.json()
status = content.get('status')
if status is None:
return GeoList(content.get('geonames', content), **self._params)
elif response.from_cache:
# If bad response comes from cache, delete that entry and
# try again
self.cache.delete_url(response.url)
return self._query_geonames(url, **self._params)
else:
# If bad response is live, kill the process
self.cache.delete_url(response.url)
print '{message} (code={value})'.format(**status)
if status.get('value') in (18, 19, 20):
raise RuntimeError('Out of credits')
# If not a credit error, try again in 30 seconds
time.sleep(30)
return self._query_geonames(url, **self._params)
[docs] def get_by_id(self, geoname_id):
"""Returns feature data for a given GeoNames ID
Args:
geoname_id (str): the ID of a feature in GeoNames
Returns:
JSON representation of the matching feature
"""
assert geoname_id
url = 'http://api.geonames.org/getJSON'
return self._query_geonames(url, geonameId=geoname_id)
[docs] def search(self, query, countries=None, **params):
"""Searches all GeoNames fields for a query string
Args:
query (str): query string
countries (mixed): a list or pipe-delimited string of countries
featureClass (list)
featureCode (list)
Returns:
JSON representation of matching locations
"""
url = 'http://api.geonames.org/searchJSON'
if query:
params['q'] = query
if countries is not None:
if isinstance(countries, basestring):
countries = countries.split('|')
codes = [TO_COUNTRY_CODE.get(c.strip()) for c in countries if c]
codes = [code for code in codes if code is not None]
if len(codes) == len(countries):
params['country'] = codes
return self._query_geonames(url, **params)
else:
return GeoList([], **self._params)
[docs] def find_nearby(self, lat, lng, dec_places=None):
"""Returns geographical information for a lat-long pair
Args:
lat (float): latitide
lng (float): longitude
dec_places (int): decimal places
Returns:
JSON representation of point
"""
url = 'http://api.geonames.org/findNearbyJSON'
return self._find_latlong(url, lat, lng, dec_places)
[docs] def country_subdivision(self, lat, lng, dec_places=None):
"""Returns basic geographical information for a lat-long pair
Args:
lat (float): latitide
lng (float): longitude
dec_places (int): decimal places
Returns:
JSON representation of point
"""
url = 'http://api.geonames.org/countrySubdivisionJSON'
return self._find_latlong(url, lat, lng, dec_places)
def _find_latlong(self, url, lat, lng, dec_places=None):
"""Returns information for a lat-long pair from the given url
Args:
url (str): url of webservice. Must accept lat/lng as params.
lat (float): latitide
lng (float): longitude
dec_places (int): decimal places
Returns:
JSON representation of point
"""
if dec_places is not None:
mask = '{0:.' + str(dec_places) + 'f}'
lat = mask.format(lat)
lng = mask.format(lng)
params = {'lat': lat, 'lng': lng}
print 'Populating geography for {}...'.format(params)
return self._query_geonames(url, **params)
[docs]class GEOLocateBot(GeoBot):
"""A cacheable requests object customized for GEOLocate webservices
FIXME: This whole class needs to be cleaned up and tested
"""
[docs] def search(self, loc_string, country, state, county=None, **kwargs):
"""Use the GeoLocate webservice to geolocate the query string
Args:
loc_string (str): a query string
country (str): name or abbreviation of country
state (str): name or abbreviation of state or equivalent
country (str): name of county or equivalent
Returns:
Tuple including best match and payload. Best match is a list
including lat, lng, radius, precision, and score of match. payload
is a dict of search parameters.
"""
print u'Geolocating "{}" using GeoLocate...'.format(loc_string)
url = ('http://www.museum.tulane.edu/webservices'
'/geolocatesvcv2/geolocatesvc.asmx/Georef2')
headers = {'content-type': 'application/x-www-form-urlencoded'}
params = {
'Country': country,
'State': state,
'County': county,
'LocalityString': loc_string,
'HwyX': True,
'FindWaterbody': True,
'RestrictToLowestAdm': False,
'doUncert': True,
'doPoly': False,
'displacePoly': False,
'polyAsLinkID': False,
'LanguageKey': 0
}
params.update(kwargs)
response = self.get(url, headers=headers, params=params)
if response.status_code == 200:
if not response.from_cache:
print u' Caching {}...'.format(response.url)
time.sleep(3) # GeoLocate asks for a 3-second gap b/w requests
nmsp = 'http://www.museum.tulane.edu/webservices/'
base = '/nmsp:Georef_Result_Set/nmsp:ResultSet'
# Process results file. Keep only the best match.
root = etree.fromstring(response.text.encode('utf8')) # why is encode here
lat = root.xpath('{}/nmsp:WGS84Coordinate/nmsp:Latitude'.format(base),
namespaces={'nmsp': nmsp})
lng = root.xpath('{}/nmsp:WGS84Coordinate/nmsp:Longitude'.format(base),
namespaces={'nmsp': nmsp})
radius = root.xpath('{}/nmsp:UncertaintyRadiusMeters'.format(base),
namespaces={'nmsp': nmsp})
precision = root.xpath('{}/nmsp:Precision'.format(base),
namespaces={'nmsp': nmsp})
score = root.xpath('{}/nmsp:Score'.format(base),
namespaces={'nmsp': nmsp})
results = [[s.text for s in row] for row
in zip(lat, lng, radius, precision, score)]
try:
high_score = max([int(x[4]) for x in results])
except ValueError:
return None, params
try:
result = [r for r in results if int(r[4]) == high_score][0]
except IndexError:
# No match found
result = None
result[0] = float(result[0]) # decimalize latitude
result[1] = float(result[1]) # decimalize longitude
return result, params
else:
raw_input(response.text)
[docs] @staticmethod
def geolocate_to_emu(result, payload):
"""Create EMu import based on GeoLocate result"""
note = (u'Coordinates determined using the GEOLocate'
' Georef2 webservice for locality string'
' "' + payload['LocalityString'] + '."'
' Additional search parameters were: ')
for key in ('Country',
'State',
'County',
'HwyX',
'FindWaterbody',
'RestrictToLowestAdm',
'doUncert',
'doPoly',
'displacePoly',
'polyAsLinkID',
'LanguageKey'):
val = payload[key]
if val is True:
val = 'TRUE'
elif val is False:
val = 'FALSE'
else:
val = str(val)
if bool(val):
note += key + '=' + str(val) + '; '
note = note.rstrip('; ')
return {
'LatLatitudeDecimal': [result[0]],
'LatLongitudeDecimal': [result[1]],
'LatComment': [result[3] + ' confidence'],
'LatGeoreferencingNotes': [note],
'LatDetSource': ['Georeference'],
'LatRadiusVerbatim': [result[2] + ' m'],
'LatRadiusProbability': [result[4]],
'LatRadiusNumeric': [result[2]],
'LatRadiusUnit': ['m'],
'LatDatum': ['WGS84'],
'LatDeterminedByRef': ['1006206'],
'LatDetDate': [datetime.now().strftime('%d%m%Y')]
}
[docs]class TownshipGeocoder(GeoBot):
"""A cacheable requests object customized for BLM geocoder webservice"""
[docs] def geocommunicator(self, trs, state, meridian=None):
"""Use the BLM TownshipGeocoder webservice to geolocate TRS
Args:
trs (str): well-formed section-township-range
state (str): name or abbreviation of a U.S. state
meridian (str): number of principal meridian. This is required to
geolocate a TRS, but rarely recorded, so the function will try
out all principal meridians in a state if it is not provided.
Returns:
List of lat-lng pairs
"""
print u'Geolocating "{}" using GeoCommunicator...'.format(trs)
# Get two-letter abberviation for state
if len(state) != 2:
try:
state = NAME_TO_ABBR[state.lower()]
except KeyError:
print u'"{}" is not a valid state'.format(state)
return []
# Format TRS following GeoCommunicator guidelines
twn, rng, sec = trs.upper().split(' ', 2)
qtr = ''
if sec.count(' ') > 1:
sec, qtr = sec.rsplit(' ', 1)
gc_trs = [
state, # two-letter state abbreviation
None, # principal meridian (populated before request)
twn.strip('TNS'), # township number
0, # township fraction (?)
twn[-1], # township direction
rng.strip('REW'), # range number
0, # range fraction (?)
rng[-1], # range direction
sec.strip('SEC. '), # section number
qtr, # quarter section as NW, N2NW, NWNWSW, etc.
0
]
# Identify prime meridians in the given state if meridian not given
if meridian is None:
params = {'StateAbbrev': state}
url = ('http://www.geocommunicator.gov/TownshipGeocoder'
'/TownshipGeocoder.asmx/GetPMList')
response = self._retry(self.get, url, params=params)
if response.status_code == 200:
if not response.from_cache:
print u' Caching {}...'.format(response.url)
time.sleep(3)
root = etree.fromstring(response.text.encode('utf8'))
meridians = root.xpath('/nmsp:TownshipGeocoderResult/nmsp:Data',
namespaces={'nmsp': 'http://www.esri.com/'})
meridians = [meridian.strip()[:2] for meridian
in meridians[0].text.split(',')]
else:
meridians = [meridian.zfill(2)]
# Get coordinates for trs for all principal meridians
url = ('http://www.geocommunicator.gov/TownshipGeocoder'
'/TownshipGeocoder.asmx/GetLatLon')
coordinates = []
for meridian in meridians:
gc_trs[1] = meridian
params = {'TRS': ','.join([str(s) for s in gc_trs])}
response = self._retry(self.get, url, params=params)
if response.status_code == 200:
if not response.from_cache:
print u' Caching {}...'.format(response.url)
time.sleep(3)
root = etree.fromstring(response.text.encode('utf8'))
result = root.xpath('/nmsp:TownshipGeocoderResult/nmsp:Data',
namespaces={'nmsp': 'http://www.esri.com/'})
if len(result):
root = etree.fromstring(result[0].text)
point = root.xpath('/rss/channel/item/georss:point',
namespaces={'georss': 'http://www.georss.org/georss'})
lng, lat = [float(c) for c in point[0].text.split(' ')]
coordinates.append((lat, lng))
return coordinates
[docs]def distance_on_unit_sphere(lat1, long1, lat2, long2):
"""Calculates the distance in km between two points on a sphere
From http://www.johndcook.com/blog/python_longitude_latitude/
Args:
lat1 (int or float): latitude of first coordinate pair
long1 (int or float): longitude of first coordinate pair
lat2 (int or float): latitude of second coordinate pair
long2 (int or float): longitdue of second coordinate pair
Returns:
Distance between two points in km
"""
# Convert latitude and longitude to spherical coordinates in radians.
degrees_to_radians = math.pi / 180.0
# phi = 90 - latitude
phi1 = (90.0 - lat1) * degrees_to_radians
phi2 = (90.0 - lat2) * degrees_to_radians
# theta = longitude
theta1 = long1 * degrees_to_radians
theta2 = long2 * degrees_to_radians
# Compute spherical distance from spherical coordinates.
# For two locations in spherical coordinates
# (1, theta, phi) and (1, theta', phi')
# cosine( arc length ) =
# sin phi sin phi' cos(theta-theta') + cos phi cos phi'
# distance = rho * arc length
cos = (math.sin(phi1) * math.sin(phi2) * math.cos(theta1 - theta2) +
math.cos(phi1) * math.cos(phi2))
arc = math.acos(cos)
# Remember to multiply arc by the radius of the earth
# in your favorite set of units to get length.
return arc * 6371.
[docs]def dec2dms(dec, is_lat):
"""Converts decimal degrees to degrees-minutes-seconds
Args:
dec (float): a coordinate as a decimal
is_lat (bool): specifies if the coordinate is a latitude
Returns:
Coordinate in degrees-minutes-seconds
"""
# Force longitude if decimal degrees more than 90
if is_lat and dec > 90:
raise ValueError('Invalid latitude: {}'.format(dec))
# Get degrees-minutes-seconds
degrees = abs(int(dec))
minutes = 60. * (abs(dec) % 1)
seconds = 60. * (minutes % 1)
minutes = int(minutes)
if seconds >= 60:
minutes += 1
seconds -= 60
if minutes == 60:
degrees += 1
minutes = 0
if dec >= 0:
hemisphere = 'N' if is_lat else 'E'
else:
hemisphere = 'S' if is_lat else 'W'
# FIXME: Estimate precision based on decimal places
mask = '{} {} {} {}'
return mask.format(degrees, minutes, seconds, hemisphere)