from decimal import Decimal
import json
import re
import sys
from overpy.__about__ import (
__author__, __copyright__, __email__, __license__, __summary__, __title__,
__uri__, __version__
)
from overpy import exception
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
if PY2:
from urllib2 import urlopen
from urllib2 import HTTPError
elif PY3:
from urllib.request import urlopen
from urllib.error import HTTPError
[docs]class Overpass(object):
"""
Class to access the Overpass API
"""
default_read_chunk_size = 4096
def __init__(self, read_chunk_size=None):
"""
:param read_chunk_size: Max size of each chunk read from the server response
:type read_chunk_size: Integer
"""
self.url = "http://overpass-api.de/api/interpreter"
self._regex_extract_error_msg = re.compile(b"\<p\>(?P<msg>\<strong\s.*?)\</p\>")
self._regex_remove_tag = re.compile(b"<[^>]*?>")
if read_chunk_size is None:
read_chunk_size = self.default_read_chunk_size
self.read_chunk_size = read_chunk_size
[docs] def query(self, query):
"""
Query the Overpass API
:param String|Bytes query: The query string in Overpass QL
:return: The parsed result
:rtype: overpy.Result
"""
if not isinstance(query, bytes):
query = bytes(query, "utf-8")
try:
f = urlopen(self.url, query)
except HTTPError as e:
f = e
response = f.read(self.read_chunk_size)
while True:
data = f.read(self.read_chunk_size)
if len(data) == 0:
break
response = response + data
f.close()
if f.code == 200:
if PY2:
http_info = f.info()
content_type = http_info.getheader("content-type")
else:
content_type = f.getheader("Content-Type")
if content_type == "application/json":
return self.parse_json(response)
if content_type == "application/osm3s+xml":
return self.parse_xml(response)
raise exception.OverpassUnknownContentType(content_type)
if f.code == 400:
msgs = []
for msg in self._regex_extract_error_msg.finditer(response):
tmp = self._regex_remove_tag.sub(b"", msg.group("msg"))
try:
tmp = tmp.decode("utf-8")
except UnicodeDecodeError:
tmp = repr(tmp)
msgs.append(tmp)
raise exception.OverpassBadRequest(
query,
msgs=msgs
)
if f.code == 429:
raise exception.OverpassTooManyRequests
if f.code == 504:
raise exception.OverpassGatewayTimeout
raise exception.OverpassUnknownHTTPStatusCode(f.code)
[docs] def parse_json(self, data, encoding="utf-8"):
"""
Parse raw response from Overpass service.
:param data: Raw JSON Data
:type data: String or Bytes
:param encoding: Encoding to decode byte string
:type encoding: String
:return: Result object
:rtype: overpy.Result
"""
if isinstance(data, bytes):
data = data.decode(encoding)
data = json.loads(data, parse_float=Decimal)
return Result.from_json(data, api=self)
[docs] def parse_xml(self, data, encoding="utf-8"):
"""
:param data: Raw XML Data
:type data: String or Bytes
:param encoding: Encoding to decode byte string
:type encoding: String
:return: Result object
:rtype: overpy.Result
"""
if isinstance(data, bytes):
data = data.decode(encoding)
if PY2 and not isinstance(data, str):
# Python 2.x: Convert unicode strings
data = data.encode(encoding)
import xml.etree.ElementTree as ET
root = ET.fromstring(data)
return Result.from_xml(root, api=self)
[docs]class Result(object):
"""
Class to handle the result.
"""
def __init__(self, elements=None, api=None):
"""
:param List elements:
:param api:
:type api: overpy.Overpass
"""
if elements is None:
elements = []
self._elements = elements
self.api = api
[docs] def expand(self, other):
"""
Add all elements from an other result to the list of elements of this result object.
It is used by the auto resolve feature.
:param other: Expand the result with the elements from this result.
:type other: overpy.Result
:raises ValueError: If provided parameter is not instance of :class:`overpy.Result`
"""
if not isinstance(other, Result):
raise ValueError("Provided argument has to be instance of overpy:Result()")
def do(ids, elements):
for element in elements:
if element.id is not None and element.id not in ids:
self._elements.append(element)
ids.append(element.id)
do(self.node_ids, other.nodes)
do(self.way_ids, other.ways)
do(self.relation_ids, other.relations)
[docs] def append(self, element):
"""
Append a new element to the result.
:param element: The element to append
:type element: overpy.Element
"""
self._elements.append(element)
[docs] def get_elements(self, filter_cls, elem_id=None):
"""
Get a list of elements from the result and filter the element type by a class.
:param filter_cls:
:param elem_id: ID of the object
:type elem_id: Integer
:return: List of available elements
:rtype: List
"""
result = []
for e in self._elements:
if not isinstance(e, filter_cls):
continue
if elem_id is not None and e.id != elem_id:
continue
result.append(e)
return result
[docs] def get_ids(self, filter_cls):
"""
:param filter_cls:
:return:
"""
result = []
for node in self.get_elements(filter_cls):
if node.id is not None and node.id not in result:
result.append(node.id)
return result
def get_node_ids(self):
return self.get_ids(filter_cls=Node)
def get_way_ids(self):
return self.get_ids(filter_cls=Way)
def get_relation_ids(self):
return self.get_ids(filter_cls=Relation)
@classmethod
[docs] def from_json(cls, data, api=None):
"""
Create a new instance and load data from json object.
:param data: JSON data returned by the Overpass API
:type data: Dict
:param api:
:type api: overpy.Overpass
:return: New instance of Result object
:rtype: overpy.Result
"""
result = cls(api=api)
for elem_cls in [Node, Way, Relation]:
for element in data.get("elements", []):
e_type = element.get("type")
if hasattr(e_type, "lower") and e_type.lower() == elem_cls._type_value:
result.append(elem_cls.from_json(element, result=result))
return result
@classmethod
[docs] def from_xml(cls, root, api=None):
"""
Create a new instance and load data from xml object.
:param data: Root element
:type data: xml.etree.ElementTree.Element
:param api:
:type api: Overpass
:return: New instance of Result object
:rtype: Result
"""
result = cls(api=api)
for elem_cls in [Node, Way, Relation]:
for child in root:
if child.tag.lower() == elem_cls._type_value:
result.append(elem_cls.from_xml(child, result=result))
return result
[docs] def get_node(self, node_id, resolve_missing=False):
"""
Get a node by its ID.
:param node_id: The node ID
:type node_id: Integer
:param resolve_missing: Query the Overpass API if the node is missing in the result set.
:return: The node
:rtype: overpy.Node
:raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
"""
nodes = self.get_nodes(node_id=node_id)
if len(nodes) == 0:
if not resolve_missing:
raise exception.DataIncomplete("Resolve missing nodes is disabled")
query = ("\n"
"[out:json];\n"
"node({node_id});\n"
"out body;\n"
)
query = query.format(
node_id=node_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
nodes = self.get_nodes(node_id=node_id)
if len(nodes) == 0:
raise exception.DataIncomplete("Unable to resolve all nodes")
return nodes[0]
[docs] def get_nodes(self, node_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Node()
:param node_id: The Id of the node
:type node_id: Integer
:return: List of elements
"""
return self.get_elements(Node, elem_id=node_id, **kwargs)
[docs] def get_relation(self, rel_id, resolve_missing=False):
"""
Get a relation by its ID.
:param rel_id: The relation ID
:type rel_id: Integer
:param resolve_missing: Query the Overpass API if the relation is missing in the result set.
:return: The relation
:rtype: overpy.Relation
:raises overpy.exception.DataIncomplete: The requested relation is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and the relation can't be resolved.
"""
relations = self.get_relations(rel_id=rel_id)
if len(relations) == 0:
if resolve_missing is False:
raise exception.DataIncomplete("Resolve missing relations is disabled")
query = ("\n"
"[out:json];\n"
"relation({relation_id});\n"
"out body;\n"
)
query = query.format(
relation_id=rel_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
relations = self.get_relations(rel_id=rel_id)
if len(relations) == 0:
raise exception.DataIncomplete("Unable to resolve requested reference")
return relations[0]
[docs] def get_relations(self, rel_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Relation
:param rel_id: Id of the relation
:type rel_id: Integer
:return: List of elements
"""
return self.get_elements(Relation, elem_id=rel_id, **kwargs)
[docs] def get_way(self, way_id, resolve_missing=False):
"""
Get a way by its ID.
:param way_id: The way ID
:type way_id: Integer
:param resolve_missing: Query the Overpass API if the way is missing in the result set.
:return: The way
:rtype: overpy.Way
:raises overpy.exception.DataIncomplete: The requested way is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and the way can't be resolved.
"""
ways = self.get_ways(way_id=way_id)
if len(ways) == 0:
if resolve_missing is False:
raise exception.DataIncomplete("Resolve missing way is disabled")
query = ("\n"
"[out:json];\n"
"way({way_id});\n"
"out body;\n"
)
query = query.format(
way_id=way_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
ways = self.get_ways(way_id=way_id)
if len(ways) == 0:
raise exception.DataIncomplete("Unable to resolve requested way")
return ways[0]
[docs] def get_ways(self, way_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Way
:param way_id: The Id of the way
:type way_id: Integer
:return: List of elements
"""
return self.get_elements(Way, elem_id=way_id, **kwargs)
node_ids = property(get_node_ids)
nodes = property(get_nodes)
relation_ids = property(get_relation_ids)
relations = property(get_relations)
way_ids = property(get_way_ids)
ways = property(get_ways)
[docs]class Element(object):
"""
Base element
"""
def __init__(self, attributes=None, result=None, tags=None):
"""
:param attributes: Additional attributes
:type attributes: Dict
:param result: The result object this element belongs to
:param tags: List of tags
:type tags: Dict
"""
self._result = result
self.attributes = attributes
self.tags = tags
[docs]class Node(Element):
"""
Class to represent an element of type node
"""
_type_value = "node"
def __init__(self, node_id=None, lat=None, lon=None, **kwargs):
"""
:param lat: Latitude
:type lat: Decimal or Float
:param lon: Longitude
:type long: Decimal or Float
:param node_id: Id of the node element
:type node_id: Integer
:param kwargs: Additional arguments are passed directly to the parent class
"""
Element.__init__(self, **kwargs)
self.id = node_id
self.lat = lat
self.lon = lon
@classmethod
[docs] def from_json(cls, data, result=None):
"""
Create new Node element from JSON data
:param child: Element data from JSON
:type child: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Node
:rtype: overpy.Node
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
node_id = data.get("id")
lat = data.get("lat")
lon = data.get("lon")
attributes = {}
ignore = ["type", "id", "lat", "lon", "tags"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
@classmethod
[docs] def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Node
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
node_id = child.attrib.get("id")
if node_id is not None:
node_id = int(node_id)
lat = child.attrib.get("lat")
if lat is not None:
lat = Decimal(lat)
lon = child.attrib.get("lon")
if lon is not None:
lon = Decimal(lon)
attributes = {}
ignore = ["id", "lat", "lon"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
[docs]class Way(Element):
"""
Class to represent an element of type way
"""
_type_value = "way"
def __init__(self, way_id=None, node_ids=None, **kwargs):
"""
:param node_ids: List of node IDs
:type node_ids: List or Tuple
:param way_id: Id of the way element
:type way_id: Integer
:param kwargs: Additional arguments are passed directly to the parent class
"""
Element.__init__(self, **kwargs)
#: The id of the way
self.id = way_id
#: List of Ids of the associated nodes
self._node_ids = node_ids
@property
[docs] def nodes(self):
"""
List of nodes associated with the way.
"""
return self.get_nodes()
[docs] def get_nodes(self, resolve_missing=False):
"""
Get the nodes defining the geometry of the way
:param resolve_missing: Try to resolve missing nodes.
:type resolve_missing: Boolean
:return: List of nodes
:rtype: List of overpy.Node
:raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
"""
result = []
resolved = False
for node_id in self._node_ids:
try:
node = self._result.get_node(node_id)
except exception.DataIncomplete:
node = None
if node is not None:
result.append(node)
continue
if not resolve_missing:
raise exception.DataIncomplete("Resolve missing nodes is disabled")
# We tried to resolve the data but some nodes are still missing
if resolved:
raise exception.DataIncomplete("Unable to resolve all nodes")
query = ("\n"
"[out:json];\n"
"way({way_id});\n"
"node(w);\n"
"out body;\n"
)
query = query.format(
way_id=self.id
)
tmp_result = self._result.api.query(query)
self._result.expand(tmp_result)
resolved = True
try:
node = self._result.get_node(node_id)
except exception.DataIncomplete:
node = None
if node is None:
raise exception.DataIncomplete("Unable to resolve all nodes")
result.append(node)
return result
@classmethod
[docs] def from_json(cls, data, result=None):
"""
Create new Way element from JSON data
:param child: Element data from JSON
:type child: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Way
:rtype: overpy.Way
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
way_id = data.get("id")
node_ids = data.get("nodes")
attributes = {}
ignore = ["id", "nodes", "tags", "type"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(way_id=way_id, attributes=attributes, node_ids=node_ids, tags=tags, result=result)
@classmethod
[docs] def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Way
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If the ref attribute of the xml node is not provided
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
node_ids = []
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
if sub_child.tag.lower() == "nd":
ref_id = sub_child.attrib.get("ref")
if ref_id is None:
raise ValueError("Unable to find required ref value.")
ref_id = int(ref_id)
node_ids.append(ref_id)
way_id = child.attrib.get("id")
if way_id is not None:
way_id = int(way_id)
attributes = {}
ignore = ["id"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(way_id=way_id, attributes=attributes, node_ids=node_ids, tags=tags, result=result)
[docs]class Relation(Element):
"""
Class to represent an element of type relation
"""
_type_value = "relation"
def __init__(self, rel_id=None, members=None, **kwargs):
"""
:param members:
:param rel_id: Id of the relation element
:type rel_id: Integer
:param kwargs:
:return:
"""
Element.__init__(self, **kwargs)
self.id = rel_id
self.members = members
@classmethod
[docs] def from_json(cls, data, result=None):
"""
Create new Relation element from JSON data
:param child: Element data from JSON
:type child: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Relation
:rtype: overpy.Relation
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
rel_id = data.get("id")
members = []
supported_members = [RelationNode, RelationWay]
for member in data.get("members", []):
type_value = member.get("type")
for member_cls in supported_members:
if member_cls._type_value == type_value:
members.append(
member_cls.from_json(
member,
result=result
)
)
attributes = {}
ignore = ["id", "members", "tags", "type"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(rel_id=rel_id, attributes=attributes, members=members, tags=tags, result=result)
@classmethod
[docs] def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Relation
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
members = []
supported_members = [RelationNode, RelationWay]
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
if sub_child.tag.lower() == "member":
type_value = sub_child.attrib.get("type")
for member_cls in supported_members:
if member_cls._type_value == type_value:
members.append(
member_cls.from_xml(
sub_child,
result=result
)
)
rel_id = child.attrib.get("id")
if rel_id is not None:
rel_id = int(rel_id)
attributes = {}
ignore = ["id"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(rel_id=rel_id, attributes=attributes, members=members, tags=tags, result=result)
[docs]class RelationMember(object):
"""
Base class to represent a member of a relation.
"""
def __init__(self, ref=None, role=None, result=None):
"""
:param ref: Reference Id
:type ref: Integer
:param role: The role of the relation member
:type role: String
:param result:
"""
self.ref = ref
self._result = result
self.role = role
@classmethod
[docs] def from_json(cls, data, result=None):
"""
Create new RelationMember element from JSON data
:param child: Element data from JSON
:type child: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of RelationMember
:rtype: overpy.RelationMember
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
ref = data.get("ref")
role = data.get("role")
return cls(ref=ref, role=role, result=result)
@classmethod
[docs] def from_xml(cls, child, result=None):
"""
Create new RelationMember from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this element belongs to
:type result: overpy.Result
:return: New relation member oject
:rtype: overpy.RelationMember
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
"""
if child.attrib.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
ref = child.attrib.get("ref")
if ref is not None:
ref = int(ref)
role = child.attrib.get("role")
return cls(ref=ref, role=role, result=result)
[docs]class RelationNode(RelationMember):
_type_value = "node"
def resolve(self, resolve_missing=False):
return self._result.get_node(self.ref, resolve_missing=resolve_missing)
[docs]class RelationWay(RelationMember):
_type_value = "way"
def resolve(self, resolve_missing=False):
return self._result.get_way(self.ref, resolve_missing=resolve_missing)