#!/usr/bin/python
"""
GeoBase. Rebooted.
>>> b = NeoBase()
>>> b.get('ORY', 'city_code_list')
['PAR']
>>> b.get('ORY', 'city_name_list')
['Paris']
>>> b.get('ORY', 'country_code')
'FR'
>>> b.distance('ORY', 'CDG')
34.87...
>>> b.get_location('ORY')
LatLng(lat=48.72..., lng=2.35...)
"""
try:
from importlib.resources import open_text
except ImportError:
from importlib_resources import open_text
from os import getenv
import operator
from datetime import datetime
from collections import namedtuple
from math import pi, cos, sin, asin, sqrt
import csv
import heapq
from functools import partial
# ``open`` with the text encoding pre-bound to UTF-8.
open_ = partial(open, encoding="utf-8")
# Names exported by a star-import of this module.
__all__ = ["NeoBase", "LatLng", "OPTD_POR_URL", "UnknownKeyError"]
# Raw GitHub URL of the opentraveldata "optd_por_public.csv" file.
OPTD_POR_URL = (
"https://raw.githubusercontent.com/opentraveldata/opentraveldata/"
"master/opentraveldata/optd_por_public.csv"
)
# Name of the data file that NeoBase.__init__ opens from the "neobase"
# package resources when OPTD_POR_FILE is not set.
_DEF_OPTD_POR_FILE = "optd_por_public.csv"
# Default search radius of find_near / find_near_location, in kilometers.
_DEFAULT_RADIUS = 50
# Latitude/longitude pair, as returned by NeoBase.get_location.
LatLng = namedtuple("LatLng", ["lat", "lng"])
class UnknownKeyError(KeyError):
    """Raised when a requested key is not in the base.

    It subclasses ``KeyError``, so ``except KeyError`` handlers catch it too.
    """
# Sentinel used as the default of the ``default=`` parameters, so that
# "no default supplied" can be told apart from an explicit ``default=None``.
_sentinel = object()
class NeoBase:
    """Dict-backed store of geographical records, exposing a dict-like API."""

    # Column of a CSV row that holds the record key (the IATA code).
    KEY = 0

    # Loaded fields, as (field name, CSV column, converter) triples.
    # A converter of None stores the raw column string unchanged.
    FIELDS = (
        ("iata_code", 0, None),
        ("name", 6, None),
        ("lat", 8, None),
        ("lng", 9, None),
        # Empty column -> None, otherwise parsed as a float.
        ("page_rank", 12, lambda raw: None if not raw else float(raw)),
        ("country_code", 16, None),
        ("country_name", 18, None),
        ("continent_name", 19, None),
        ("timezone", 31, None),
        # Comma-separated codes, "="-separated names.
        ("city_code_list", 36, lambda raw: raw.split(",")),
        ("city_name_list", 37, lambda raw: raw.split("=")),
        # ``list`` turns the string into one item per character.
        ("location_type", 41, list),
        ("currency", 46, None),
    )

    # Default duplicates policy: rows repeating a key are kept unless the
    # OPTD_POR_DUPLICATES environment variable is set to something other than "1".
    DUPLICATES = "1" == getenv("OPTD_POR_DUPLICATES", "1")
@staticmethod
def skip(row, date):
    """Tell whether a row must be ignored for the given reference date.

    :param row: a parsed CSV row; columns 13 and 14 are read as the start
        and the end of the row's date range (either may be empty)
    :param date: the reference date, compared to those columns with ``<``/``>``
    :returns: ``True`` if ``date`` is before a non-empty start or after a
        non-empty end, ``False`` otherwise
    """
    starts, ends = row[13], row[14]
    # An empty bound never excludes the row.
    return (bool(starts) and date < starts) or (bool(ends) and date > ends)
def __init__(self, rows=None, date=None, duplicates=None):
    """Build the base from ``rows``, or from the data file when ``rows`` is None.

    :param rows: an iterable of lines handed to ``load`` in place of a file;
        when None, the file named by the OPTD_POR_FILE environment variable
        is read, or the packaged default file if that variable is not set
    :param date: the reference date handed to ``load``; defaults to the
        OPTD_POR_DATE environment variable, or today's date as YYYY-MM-DD
    :param duplicates: whether rows repeating a key are kept; defaults to
        the class attribute ``DUPLICATES``
    """
    if date is None:
        date = getenv("OPTD_POR_DATE", datetime.today().strftime("%Y-%m-%d"))
    if duplicates is None:
        duplicates = self.DUPLICATES

    if rows is not None:
        self._data = self.load(rows, date, duplicates)
        return

    filename = getenv("OPTD_POR_FILE")
    if filename is None:
        source = open_text("neobase", _DEF_OPTD_POR_FILE)
    else:
        source = open_(filename)
    # The context manager closes the file even when load() raises.
    with source as f:
        self._data = self.load(f, date, duplicates)
@staticmethod
def _empty_value():
    """Return a fresh record holding only an empty ``__dup__`` set."""
    return dict(__dup__=set())
@classmethod
def load(cls, f, date, duplicates):
    """Build a dictionary of geographical data from optd_por.

    The first line is skipped as a header. Empty rows, rows whose first
    column starts with ``#``, rows rejected by ``cls.skip`` and rows with
    an empty key are ignored.

    When ``duplicates`` is true, a row repeating an existing key is stored
    under ``KEY@1``, ``KEY@2``, ... and every copy lists all the others in
    its ``__dup__`` set. Otherwise only the first row of a key is kept.

    :param f: an iterable of "^"-delimited lines (open file or list)
    :param date: the reference date passed to ``cls.skip``
    :param duplicates: whether rows repeating a key are kept
    :returns: a dict mapping each key to its record (a dict of fields)

    >>> import os.path as op
    >>> path = op.join(op.dirname(__file__), 'optd_por_public.csv')
    >>> with open_(path) as f:
    ...     b = NeoBase.load(f, '2030-01-01', True)
    >>> b['ORY']['city_code_list']
    ['PAR']
    """
    lines = iter(f)  # lets lists be consumed like file objects
    next(lines, None)  # skip the header line, if there is one

    fields, key_c = cls.FIELDS, cls.KEY
    data = {}

    for row in csv.reader(lines, delimiter="^", quotechar='"'):
        # Comments and empty lines
        if not row or row[0].startswith("#"):
            continue
        if cls.skip(row, date):
            continue

        key = row[key_c]
        if not key:
            continue
        if key in data and not duplicates:
            continue

        d = cls._empty_value()
        for field, c, splitter in fields:
            d[field] = row[c] if splitter is None else splitter(row[c])

        if key not in data:
            data[key] = d
            continue

        # Duplicate key: the first record keeps the bare key and tracks
        # every later copy in its own __dup__ set.
        prev_d = data[key]
        others = prev_d["__dup__"]
        new_key = f"{key}@{1 + len(others)}"

        # The new record knows the first one and all earlier copies ...
        d["__dup__"] = others | {key}
        # ... and the earlier copies learn about the new record too, so
        # that the duplicate information stays symmetric.
        for other in others:
            data[other]["__dup__"].add(new_key)
        others.add(new_key)
        data[new_key] = d

    return data
def __iter__(self):
    """Iterate over every key of the base.

    :returns: an iterator over the keys

    >>> b = NeoBase()
    >>> sorted(b)
    ['AAA', 'AAA@1', 'AAB', ...
    """
    records = self._data
    return iter(records)
def __contains__(self, key):
    """Tell whether a key is in the base, ignoring letter case.

    :param key: the key to look for; ``None`` is looked up as is
    :returns: a boolean

    >>> b = NeoBase()
    >>> 'AN' in b
    False
    >>> 'AGN' in b
    True
    >>> 'agn' in b
    True
    >>> None in b
    False
    """
    lookup = None if key is None else key.upper()
    return lookup in self._data
def __bool__(self):
    """Truth value of the base: false when it holds no key.

    :returns: a boolean

    >>> b = NeoBase()
    >>> if b:
    ...     print('not empty')
    not empty
    """
    return bool(self._data)


# ``__nonzero__`` is the Python 2 name of this hook and is never called by
# Python 3; it is kept as an alias for code that calls it explicitly.
__nonzero__ = __bool__
def __len__(self):
    """Number of keys held by the base.

    :returns: an integer

    >>> b = NeoBase()
    >>> 18000 < len(b) < 20000
    True
    """
    size = len(self._data)
    return size
def keys(self):
    """Iterate over every key of the base, exactly as ``iter(base)`` does.

    :returns: an iterator over the keys

    >>> b = NeoBase()
    >>> sorted(b.keys())
    ['AAA', 'AAA@1', 'AAB', ...
    """
    all_keys = iter(self)
    return all_keys
def set(self, key, **data):
    """Create or update the record of a key.

    The key is upper-cased first (``None`` is left untouched). A key that
    is not in the base yet gets a fresh empty record before ``data`` is
    merged into it.

    :param key: the key of the record
    :param data: the fields to store, as keyword arguments

    >>> b = NeoBase()
    >>> b.get('ORY', 'name')
    'Paris Orly Airport'
    >>> b.set('ORY', name='test')
    >>> b.get('ORY', 'name')
    'test'
    >>> b.set('Wow!', name='test')
    >>> b.get('Wow!', 'name')
    'test'
    """
    target = key if key is None else key.upper()
    if target not in self:
        self._data[target] = self._empty_value()
    record = self._data[target]
    record.update(data)
def get(self, key, field=None, default=_sentinel):
    """Read a record, or one field of a record.

    The key is upper-cased before the lookup (``None`` is left untouched).

    :param key: the key of the record
    :param field: the field to return; when None the whole record
        dictionary is returned
    :param default: the value returned when the key is unknown
    :returns: the field value, the record, or ``default``
    :raises UnknownKeyError: if the key is unknown and no default is given
    :raises KeyError: if the record has no such field

    >>> b = NeoBase()
    >>> b.get('OR', 'city_code_list', default=None)
    >>> b.get('ORY', 'city_code_list')
    ['PAR']
    >>> b.get('nce', 'city_code_list')
    ['NCE']
    """
    key = key.upper() if key is not None else key

    try:
        d = self._data[key]
    except KeyError as e:
        # A supplied default wins over raising
        if default is not _sentinel:
            return default
        raise UnknownKeyError(f"Key not found: {key}") from e

    if field is None:
        return d  # no field requested: hand back the whole record

    try:
        return d[field]
    except KeyError as e:
        raise KeyError(f"Field '{field}' (for key '{key}') not in {list(d)}") from e
def get_location(self, key, default=_sentinel):
    """Return the geocode of a key, or None if it cannot be decoded.

    :param key: the key of the location
    :param default: the value returned when the key is unknown
    :returns: a ``LatLng``; ``None`` when the ``lat``/``lng`` fields are
        missing or are not convertible to floats; ``default`` when the key
        is unknown and a default was given
    :raises UnknownKeyError: if the key is unknown and no default is given

    >>> b = NeoBase()
    >>> b.get_location('ORY')
    LatLng(lat=48.72..., lng=2.35...)
    """
    if key in self:
        try:
            lat = float(self.get(key, "lat"))
            lng = float(self.get(key, "lng"))
        except (ValueError, TypeError, KeyError):
            # TypeError : the stored value is not a string, probably None
            # ValueError: the stored value could not be converted to float
            # KeyError  : the record has no lat or lng field
            return None
        return LatLng(lat, lng)

    # Unknown key: a supplied default wins over raising
    if default is _sentinel:
        raise UnknownKeyError(f"Key not found: {key}")
    return default
@staticmethod
def distance_between_locations(l0, l1):
    """Great circle distance between two locations (haversine formula).

    :param l0: the LatLng tuple of the first location, in degrees
    :param l1: the LatLng tuple of the second location, in degrees
    :returns: the distance in kilometers, or None if either location is None

    >>> NeoBase.distance_between_locations((48.84, 2.367), (43.70, 7.26))  # Paris -> Nice
    683.85...

    Case of unknown location.

    >>> NeoBase.distance_between_locations(None, (43.70, 7.26))  # returns None
    """
    if l0 is None or l1 is None:
        return None

    # Degrees to radians
    lat0 = l0[0] / 180 * pi
    lng0 = l0[1] / 180 * pi
    lat1 = l1[0] / 180 * pi
    lng1 = l1[1] / 180 * pi

    haversine = (
        sin(0.5 * (lat0 - lat1)) ** 2
        + sin(0.5 * (lng0 - lng1)) ** 2 * cos(lat0) * cos(lat1)
    )
    # For (near-)antipodal points, rounding can push the term slightly
    # above 1.0, which is outside the domain of asin: clamp it.
    haversine = min(1.0, haversine)

    # 6371 is the Earth radius in kilometers
    return 2 * 6371.0 * asin(sqrt(haversine))
def distance(self, key_0, key_1, default=_sentinel):
    """Compute the distance between two keys of the base.

    This looks up both geocodes, then delegates to
    ``distance_between_locations``.

    :param key_0: the first key
    :param key_1: the second key
    :param default: the value returned when a key lookup raises ``KeyError``
    :returns: the distance (km), or ``default``
    :raises KeyError: if a key lookup fails and no default is given

    >>> b = NeoBase()
    >>> b.distance('ORY', 'CDG')
    34.87...
    """
    try:
        endpoints = (self.get_location(key_0), self.get_location(key_1))
    except KeyError:
        if default is not _sentinel:
            return default
        raise
    return self.distance_between_locations(*endpoints)
def _build_distances(self, lat_lng_ref, keys):
    """Yield ``(dist, key)`` for each key, measured from a reference point.

    Keys that are not in the base, or whose geocode is None, are left out.
    Nothing is yielded when the reference itself is None.

    :param lat_lng_ref: the lat_lng of the reference location
    :param keys: an iterable of keys

    >>> b = NeoBase()
    >>> list(b._build_distances((0,0), ['ORY', 'CDG']))
    [(5422.74..., 'ORY'), (5455.45..., 'CDG')]
    """
    if lat_lng_ref is None:
        return

    for key in keys:
        if key not in self:
            continue
        here = self.get_location(key)
        if here is None:
            continue
        yield self.distance_between_locations(lat_lng_ref, here), key
def find_near_location(self, lat_lng, radius=_DEFAULT_RADIUS, from_keys=None):
    """Find the keys lying within a radius of a latitude/longitude.

    Distances are great circle distances in kilometers, so the radius
    is expressed in kilometers too.

    :param lat_lng: the lat_lng of the location
    :param radius: the radius of the search (kilometers)
    :param from_keys: an iterable of keys to search in; when None,
        every key of the base is considered
    :returns: an iterable of (dist, key)

    >>> b = NeoBase()
    >>> # Paris, points of reference within 5 km
    >>> [b.get(k, 'iata_code') for d, k in sorted(b.find_near_location((48.84, 2.367), 5))]
    ['PAR', 'XGB', 'XHP', 'XPG', 'XEX']
    """
    candidates = iter(self) if from_keys is None else from_keys
    yield from (
        (dist, key)
        for dist, key in self._build_distances(lat_lng, candidates)
        if dist <= radius
    )
def find_near(self, key, radius=_DEFAULT_RADIUS, from_keys=None):
    """Find the keys lying within a radius of another key.

    Same as ``find_near_location``, except that the center is given by
    its key (like ORY or SFO): its geocode is looked up in the base and
    passed on. Nothing is yielded when the key is not in the base.

    :param key: the key of the location
    :param radius: the radius of the search (kilometers)
    :param from_keys: an iterable of keys to search in; when None,
        every key of the base is considered
    :returns: an iterable of (dist, key)

    >>> b = NeoBase()
    >>> sorted(b.find_near('ORY', 10))  # Orly, por <= 10km
    [(0.0, 'ORY'), (6.94..., 'XJY'), (9.96..., 'QFC')]
    """
    candidates = iter(self) if from_keys is None else from_keys
    if key in self:
        center = self.get_location(key)
        yield from self.find_near_location(
            center, radius=radius, from_keys=candidates
        )
def find_closest_from_location(self, lat_lng, N=1, from_keys=None):
    """Find the N keys closest to a latitude/longitude.

    Unlike ``find_near_location`` there is no radius: candidates are
    ranked by distance and the N smallest are returned, closest first.

    :param lat_lng: the lat_lng of the location
    :param N: the N closest results wanted
    :param from_keys: an iterable of keys to search in, which is useful to
        combine searches; when None, every key of the base is considered
    :returns: an iterable of (dist, key)

    >>> b = NeoBase()
    >>> list(b.find_closest_from_location((43.70, 7.26)))  # Nice
    [(0.60..., 'NCE@1')]
    >>> list(b.find_closest_from_location((43.70, 7.26), N=3))  # Nice
    [(0.60..., 'NCE@1'), (5.82..., 'NCE'), (5.89..., 'XBM')]
    """
    candidates = iter(self) if from_keys is None else from_keys
    ranked = heapq.nsmallest(N, self._build_distances(lat_lng, candidates))
    for pair in ranked:
        yield pair
def find_closest_from(self, key, N=1, from_keys=None):
    """Find the N keys closest to another key.

    Same as ``find_closest_from_location``, except that the reference
    point is given by its key (like ORY or SFO): its geocode is looked up
    in the base and passed on. Nothing is yielded when the key is not in
    the base.

    :param key: the key of the location
    :param N: the N closest results wanted
    :param from_keys: an iterable of keys to search in, which is useful to
        combine searches; when None, every key of the base is considered
    :returns: an iterable of (dist, key)

    >>> b = NeoBase()
    >>> list(b.find_closest_from('NCE'))
    [(0.0, 'NCE')]
    >>> list(b.find_closest_from('NCE', N=3))
    [(0.0, 'NCE'), (5.07..., 'XCG@1'), (5.45..., 'XCG')]
    """
    candidates = iter(self) if from_keys is None else from_keys
    if key in self:
        origin = self.get_location(key)
        yield from self.find_closest_from_location(
            origin, N=N, from_keys=candidates
        )
def find_with(self, conditions, from_keys=None, reverse=False):
    """Iterate over the keys whose fields satisfy every condition.

    For example, this finds all the airports in Paris.

    :param conditions: a list of (field, value) conditions
    :param from_keys: an iterable of keys to search in; when None,
        every key of the base is considered
    :param reverse: when true, a condition holds if the field is *not*
        equal to the value
    :returns: an iterator of matching keys

    Testing several conditions.

    >>> b = NeoBase()
    >>> c0 = [('city_code_list', ['PAR'])]
    >>> c1 = [('location_type', ['H'])]
    >>> len(list(b.find_with(c0)))
    16
    >>> len(list(b.find_with(c0 + c1)))
    2
    """
    candidates = iter(self) if from_keys is None else from_keys
    compare = operator.ne if reverse else operator.eq

    for key in candidates:
        # Keys that are not in the base are silently dropped
        if key not in self:
            continue
        if all(compare(self.get(key, name), wanted) for name, wanted in conditions):
            yield key