ipapi/ipapi.py

163 lines
7.2 KiB
Python
Executable file

#!/usr/bin/env python3
from argparse import ArgumentParser, Namespace
from functools import lru_cache
from sys import stderr
from pathlib import Path
from time import time
from zlib import decompress
from re import compile
from urllib.parse import unquote
from hilbertcurve.hilbertcurve import HilbertCurve
from ipaddress import IPv4Network
from dns.resolver import resolve_address, NXDOMAIN, NoAnswer, LifetimeTimeout, NoNameservers
from dns.exception import SyntaxError
# Command-line interface. The parsed values are read further down while the
# application and its routes are being set up.
parser = ArgumentParser("ipmap")

# Server / runtime options.
parser.add_argument(
    "-a", "--address",
    default="127.0.0.1",
    help="the address to use for the api (default: %(default)s)",
)
parser.add_argument(
    "-p", "--port",
    default=8000,
    type=int,
    help="the port to use for the api (default: %(default)s)",
)
parser.add_argument(
    "--processes",
    default=1,
    type=int,
    help="the number of processes to use for the api (default: %(default)s)",
)
parser.add_argument(
    "--workers",
    default=1,
    type=int,
    help="the number of workers to use for the api (default: %(default)s)",
)
parser.add_argument(
    "--log-level",
    default="INFO",
    help="the log level use for the api (default: %(default)s)",
)

# Rate limiting.
parser.add_argument(
    "-l", "--limit",
    default=3600,
    type=int,
    help="the number of requests per hour to limit usage of the api to (default: %(default)s)",
)

# Scan-data options; `--raws` has no default, so it is None when omitted.
parser.add_argument(
    "-r", "--raws",
    help="the path containing raw tile data (default: %(default)s)",
)
parser.add_argument(
    "-t", "--tile-size",
    default=256,
    type=int,
    help="the tile size to use (default: %(default)s)",
)
parser.add_argument(
    "-c", "--cache-size",
    default=4096,
    type=int,
    help="the number of compressed tiles to keep in the cache (default: %(default)s)",
)
parser.add_argument(
    "-d", "--decomp-cache-size",
    default=1024,
    type=int,
    help="the number of decompressed tiles to keep in the cache (default: %(default)s)",
)
parser.add_argument(
    "-v", "--variants",
    default="density,rtt",
    help="the variants to return data for (default: %(default)s)",
)

args = parser.parse_args()
def parse_list_arg(arg: str):
    """Split a comma-separated option value into a list of cleaned items.

    Each item has surrounding whitespace removed and is lower-cased. Items
    that are empty after stripping are dropped, so stray or trailing commas
    are harmless.
    """
    items = []
    for piece in arg.split(","):
        cleaned = piece.strip()
        if cleaned:
            items.append(cleaned.lower())
    return items
from robyn import Robyn, Request, Response, ALLOW_CORS, jsonify
# Build the Robyn application. The second argument is a hand-made Namespace
# carrying the configuration fields, with the process/worker counts and log
# level taken from this script's own command line.
# NOTE(review): presumably this stands in for the config object Robyn would
# normally build from its own CLI - confirm against the installed version.
app = Robyn(
    __file__,
    Namespace(
        processes=args.processes,
        workers=args.workers,
        dev=False,
        create=False,
        docs=False,
        open_browser=False,
        version=False,
        compile_rust_path=None,
        create_rust_file=None,
        log_level=args.log_level,
    ),
)

# Allow cross-origin requests from any origin.
ALLOW_CORS(app, origins=["*"])
def response(body: dict, status_code=200, headers: "dict | None" = None):
    """Build a Robyn ``Response`` whose body is ``jsonify(body)``.

    Args:
        body: The payload to serialise into the response body.
        status_code: HTTP status code of the response (default 200).
        headers: Optional response headers. When omitted, a fresh empty dict
            is used for every call.

    Returns:
        The constructed ``Response``.
    """
    # A ``{}`` default would be one dict object shared by every call (and by
    # every Response built from it); create a new one per call instead.
    if headers is None:
        headers = {}
    return Response(status_code, headers, jsonify(body))
def error_response(message: str, status_code: int, headers: "dict | None" = None):
    """Build an error response of the form ``{"detail": message}``.

    Args:
        message: Human-readable error description placed under ``"detail"``.
        status_code: HTTP status code of the response.
        headers: Optional response headers. When omitted, a fresh empty dict
            is used for every call.

    Returns:
        Whatever ``response`` returns for the given payload.
    """
    # Avoid a shared mutable ``{}`` default: hand a new dict to ``response``
    # whenever the caller did not supply headers.
    if headers is None:
        headers = {}
    return response({"detail": message}, status_code, headers)
# Rate-limiting state used by the request middleware.
# `limit_ttl` is the length of the window in seconds and `limit` is the
# number of requests a single client IP may make within that window.
limit_ttl = 3600
limit = int(args.limit)

# Maps a client IP to the timestamps (whole seconds) of its recent requests.
# NOTE(review): this is plain in-process state; if the server runs several
# processes, each one presumably keeps its own counters - confirm.
ttl_cache: dict[str, list[int]] = {}
@app.before_request()
def middleware(request: Request):
    """Apply the per-client rate limit before a request is handled.

    Every request that carries a client IP is recorded with its arrival time
    in whole seconds. If that client has made more than ``limit`` requests
    (rejected ones included) within the last ``limit_ttl`` seconds, a 429
    error response is returned; otherwise the request is returned unchanged.
    Requests without an IP address are never limited.
    """
    client = request.ip_addr
    if not client:
        return request

    now = int(time())
    cutoff = now - limit_ttl
    recent = [t for t in ttl_cache.get(client, ()) if t > cutoff]
    recent.append(now)

    # Only the newest ``limit + 1`` timestamps can influence the decision
    # below, so anything older is dropped. Without this cap the list kept
    # growing for as long as an over-limit client continued to send requests.
    keep = max(limit, 0) + 1
    if len(recent) > keep:
        del recent[:-keep]
    ttl_cache[client] = recent

    if len(recent) > limit:
        return error_response("Rate limit exceeded!", 429)
    return request
@app.get("/api/rdns/:ip")
def get_rdns(request: Request):
    """Return the reverse-DNS name for the IP address given in the URL.

    On success the response body is ``{"ip": ..., "rdns": ...}`` (the name
    without its trailing dot) and ``Cache-Control: max-age`` is set to the TTL
    of the answer's record set. Resolver failures are translated into error
    responses: 400 for an invalid address, 404 when no record exists, 504 on
    timeout, 503 when no nameserver is available and 500 for anything else.
    """
    ip = unquote(request.path_params["ip"])

    # Expected resolver failures and the error response each one maps to.
    # ``SyntaxError`` here is ``dns.exception.SyntaxError`` (imported at the
    # top of the file), not the builtin.
    known_failures = (
        ((SyntaxError,), "Invalid IP address", 400),
        ((NXDOMAIN, NoAnswer), "No rDNS information found for IP", 404),
        ((LifetimeTimeout,), "Request for rDNS information timed out", 504),
        ((NoNameservers,), "No nameservers currently available to fulfil request", 503),
    )

    try:
        answer = resolve_address(ip, search=True)
    except Exception as exc:
        for exc_types, message, status in known_failures:
            if isinstance(exc, exc_types):
                return error_response(message, status)
        # Anything unexpected is logged to stderr and reported as a 500.
        print(exc, file=stderr)
        return error_response("Unexpected error occurred", 500)

    hostname = str(answer[0]).rstrip(".")
    cache_control = f"max-age={answer.rrset.ttl}"
    return response({"ip": ip, "rdns": hostname}, headers={"Cache-Control": cache_control})
if args.raws:
    # The scan-data endpoints are only registered when a raw tile directory
    # was given on the command line.
    raws_path = Path(args.raws)
    tile_size = int(args.tile_size)
    variants = parse_list_arg(args.variants)
    variants_set = set(variants)
    date_pattern = compile(r"^[0-9]{8}$")
    data_bytes = 4  # every stored value occupies 4 bytes (read little-endian)

    @lru_cache(args.cache_size)
    def get_raw_data_compressed(path: Path):
        """Read a tile file and return its (still compressed) bytes.

        Results are kept in an LRU cache of ``args.cache_size`` entries.
        """
        print(f"reading file '{path}'...")
        return path.read_bytes()

    @lru_cache(args.decomp_cache_size)
    def get_raw_data(path: Path):
        """Return the zlib-decompressed contents of a tile file.

        Results are kept in an LRU cache of ``args.decomp_cache_size``
        entries; the compressed bytes come from ``get_raw_data_compressed``.
        """
        compressed_data = get_raw_data_compressed(path)
        print(f"decompressing file '{path}'...")
        return decompress(compressed_data)

    @app.get("/api/scandata/:date/:variant/tile/:z/:y/:x")
    def get_info_tile(request: Request):
        """Return the stored value of one variant at pixel (x, y) of zoom z.

        Path parameters: ``date`` (exactly eight digits), ``variant`` (one of
        the configured variants), ``z`` (integer) and ``y`` / ``x``
        (non-negative integers). The pixel is looked up in the tile file
        ``<raws>/<date>/<variant>/<z>/<y // tile_size>/<x // tile_size>.bin``.

        Responds with ``{variant: value}`` and a one-year ``Cache-Control``
        header, 400 for invalid parameters, 404 when no data exists for the
        requested position and 500 on unexpected errors.
        """
        date = unquote(request.path_params["date"])
        # fullmatch, not match: with match() the "$" anchor also accepts a
        # trailing newline (e.g. "%0A" in the URL), which would then end up
        # in the filesystem path below.
        if date_pattern.fullmatch(date) is None:
            return error_response("Invalid date provided", 400)

        variant = unquote(request.path_params["variant"])
        if variant not in variants_set:
            return error_response("Invalid variant provided", 400)

        try:
            z = int(unquote(request.path_params["z"]))
        except ValueError:
            return error_response("Invalid z value provided", 400)

        try:
            y = int(unquote(request.path_params["y"]))
            if y < 0:
                raise ValueError()
        except ValueError:
            return error_response("Invalid y value provided", 400)

        try:
            x = int(unquote(request.path_params["x"]))
            if x < 0:
                raise ValueError()
        except ValueError:
            return error_response("Invalid x value provided", 400)

        # Split the pixel coordinates into the tile index and the offset
        # inside that tile.
        y_tile, y_off = divmod(y, tile_size)
        x_tile, x_off = divmod(x, tile_size)
        path = raws_path / date / variant / f"{z}" / f"{y_tile}" / f"{x_tile}.bin"

        # For negative zoom levels a stored row is narrower than a full tile.
        img_size = (tile_size >> -z) if z < 0 else tile_size
        if x_off >= img_size:
            # A column beyond the row width would silently address a pixel in
            # a different row, so treat it as "no such data".
            return error_response("Requested data not found", 404)

        off = (y_off * img_size + x_off) * data_bytes
        try:
            raw_value = get_raw_data(path)[off : off + data_bytes]
        except FileNotFoundError:
            return error_response("Requested data not found", 404)
        except Exception as e:
            print(e, file = stderr)
            return error_response("Unexpected error occurred", 500)

        if len(raw_value) != data_bytes:
            # The offset lies (partly) past the end of the tile data. Decoding
            # the short slice would yield a bogus value that clients would
            # then cache for a year.
            return error_response("Requested data not found", 404)

        value = int.from_bytes(raw_value, "little")
        return response({ variant: value }, headers = { "Cache-Control": "max-age=31536000" })

    @app.get("/api/scandata/:date/:variant/range/:range")
    def get_info_range(request: Request):
        """Return the stored value for an IPv4 network given as ``range``.

        The prefix length must be even. The network's index among all
        networks of that prefix length is converted to (x, y) with a
        two-dimensional Hilbert curve of ``prefixlen // 2`` iterations, the
        zoom level is ``(prefixlen - 16) // 2``, and the lookup itself is
        delegated to ``get_info_tile`` by filling in ``x``, ``y`` and ``z`` on
        the request's path parameters.
        """
        try:
            network = IPv4Network(unquote(request.path_params["range"]), strict = False)
        except ValueError:
            return error_response("Invalid IPv4 range", 400)

        prefixlen = network.prefixlen
        if prefixlen % 2 != 0:
            return error_response("Prefix length of IPv4 range must be even", 400)

        if prefixlen == 0:
            # The single /0 network covers everything: one pixel at the origin.
            x, y = 0, 0
        else:
            index = int(network.network_address) >> (32 - prefixlen)
            x, y = HilbertCurve(prefixlen // 2, 2).point_from_distance(index)
        z = (prefixlen - 16) // 2

        # get_info_tile reads (and validates) its inputs from path_params.
        request.path_params["x"] = str(x)
        request.path_params["y"] = str(y)
        request.path_params["z"] = str(z)
        return get_info_tile(request)
# Start the server on the address and port given on the command line.
app.start(
    host=args.address,
    port=args.port,
)