Compare commits


4 Commits

6 changed files with 353 additions and 259 deletions

.gitignore vendored (1 change)

@@ -1,2 +1,3 @@
 /data
 /public/assets/tiles
+**/*.bin


@@ -1,13 +1,10 @@
 #!/usr/bin/env python3
-import re
-import argparse
+from argparse import ArgumentParser
 from dataclasses import dataclass
-import dns.exception
-import dns.rdatatype
-import dns.resolver
-import dns.reversename
-import uvicorn
+from dns.resolver import resolve_address, NXDOMAIN, NoAnswer, LifetimeTimeout, NoNameservers
+from dns.exception import SyntaxError
+from uvicorn import run
 from fastapi import FastAPI, HTTPException, Request, Response
 from fastapi.middleware.cors import CORSMiddleware
 from slowapi.errors import RateLimitExceeded
@@ -26,22 +23,20 @@ app.add_middleware(
 app.state.limiter = limiter
 app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
-ip_regex = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
 limit = "1/second"
 @app.get("/api/rdns/{ip}")
 @limiter.limit(lambda: limit)
 async def get_rdns(ip: str, request: Request, response: Response):
 try:
-answer = dns.resolver.resolve_address(ip, search = True)
-except dns.exception.SyntaxError:
+answer = resolve_address(ip, search = True)
+except SyntaxError:
 raise HTTPException(status_code=400, detail="Invalid IP address")
-except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
+except (NXDOMAIN, NoAnswer):
 raise HTTPException(status_code=404, detail="No rDNS information found for IP")
-except dns.resolver.LifetimeTimeout:
+except LifetimeTimeout:
 raise HTTPException(status_code=504, detail="Request for rDNS information timed out")
-except dns.resolver.NoNameservers:
+except NoNameservers:
 raise HTTPException(status_code=503, detail="No nameservers currently available to fulfil request")
 except:
 raise HTTPException(status_code=500, detail="Unexpected error occurred")
@@ -59,13 +54,13 @@ def parse_list_arg(arg: str):
 def main():
 global limit
-parser = argparse.ArgumentParser("ipmap")
+parser = ArgumentParser("ipmap")
 parser.add_argument("-a", "--address", default = "127.0.0.1", help = "the address to use for the api (default: %(default)s)")
 parser.add_argument("-p", "--port", default = 8000, type = int, help = "the port to use for the api (default: %(default)s)")
 parser.add_argument("-l", "--limit", default = limit, help = "the rate limit for the api (default: %(default)s)")
 args = parser.parse_args(namespace = IpApiArgs)
 limit = args.limit
-uvicorn.run(app, host=args.address, port=args.port)
+run(app, host=args.address, port=args.port)
 if __name__ == "__main__":
 main()
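The endpoint's behaviour is unchanged by the import cleanup above; only the module-level names moved. As a quick sanity check against a locally running instance (hypothetical host and port, matching the defaults in main()):

curl http://127.0.0.1:8000/api/rdns/1.1.1.1
# expected: 400 for a malformed address, 404 when no PTR record exists,
# 503/504 for resolver failures or timeouts, and 429 via slowapi once the
# 1/second rate limit is exceeded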

ipmap.py (503 changes)

@@ -1,22 +1,103 @@
 #!/usr/bin/env python3
-import sys
-import math
-import functools
-import argparse
-import json
-import shutil
-from pathlib import Path
-import png
-import hilbert
-import numpy as np
-import polars as pl
-from cmap import Colormap
+from os import devnull
+from sys import stdout, stderr, exit
+from contextlib import redirect_stdout
+from argparse import ArgumentParser
+from warnings import catch_warnings
 from multiprocessing import Pool
+from shutil import rmtree
+from gc import collect
+from json import loads, dumps
+from pathlib import Path
 from dataclasses import dataclass
-from typing import Literal
+from typing import Literal, TypeVar
+from png import Writer
+from cmap import Colormap
+from hilbert import decode
+from numpy.typing import NDArray
+import numpy as np
-def dedup_preserving_order(vals: list) -> list:
+ip_bytes = 4
+ip_bits = ip_bytes * 8
+num_ips = 1 << ip_bits
+num_ips_sqrt = 1 << ip_bits // 2
+def make_coord_range(start: int, end: int):
+return decode(np.arange(start, end, dtype = np.uint32), num_dims = 2, num_bits = 16).astype(np.uint16)
+default_batches = 64
+default_processes = 4
+def make_coords(output_path: Path, batches = default_batches, processes = default_processes):
+if not 1 <= batches <= 0x10000:
+raise ValueError(f"batches must be between 1 and {0x10000}")
+if not 1 <= processes <= 256:
+raise ValueError(f"processes must be between 1 and 256")
+ips_per_batch, leftover_batch_ips = divmod(num_ips, batches)
+if leftover_batch_ips > 0:
+raise ValueError("the total number of ips must evenly divide into the number of batches")
+ips_per_process, leftover_process_ips = divmod(ips_per_batch, processes)
+if leftover_process_ips > 0:
+raise ValueError("the number of ips within each batch must evenly divide into the number of processes")
+if output_path.is_dir():
+raise ValueError("output path must not be a directory")
+output_path.write_bytes(b'')
+with Pool(processes) as p:
+for batch in range(batches):
+print(f"starting batch {batch}...")
+arrs = p.starmap(make_coord_range, ((offset * ips_per_process, offset * ips_per_process + ips_per_process)
+for offset in range(batch * processes, batch * processes + processes)))
+print(f"finished batch, writing arrays to file...")
+with output_path.open("ab") as f:
+for arr in arrs:
+f.write(arr.tobytes())
+print(f"finished writing to file")
+def convert(input_path: Path, output_path: Path):
+print(f"reading csv '{input_path}' into array...", end = " ", flush = True)
+arr = np.loadtxt(input_path, dtype = np.uint32, delimiter = ",", skiprows = 1)
+print("done")
+print("filtering out unsuccessful values...", end = " ", flush = True)
+arr = arr[arr[:, -1] == 1]
+print("done")
+print("removing success column...", end = " ", flush = True)
+arr = arr[:, :-1]
+print("done")
+print("removing duplicate IP addresses...", end = " ", flush = True)
+arr = arr[np.unique(arr[:, 0], return_index = True)[1]]
+print("done")
+print("converting IP addresses from big-endian to little-endian...", end = " ", flush = True)
+arr[:, 0].byteswap(inplace = True)
+print("done")
+print(f"writing array to '{output_path}'")
+output_path.write_bytes(arr.tobytes())
+print("done")
+default_tile_size = 1 << ip_bits // 4
+default_variant_names = ["density", "rtt"]
+default_colormap_names = ["viridis"]
+default_quantile = 0.995
+def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
+tile_size = default_tile_size, alpha = False,
+variant_names: list[str] = default_variant_names,
+colormap_names: list[str] = default_colormap_names,
+quantile = default_quantile, num_rows: int | None = None,
+skip_iters: int | None = None, json_path: Path | None = None):
+if not 64 <= tile_size <= num_ips_sqrt or tile_size & (tile_size - 1) != 0:
+raise ValueError(f"tile size must be a power of 2 between 64 and {num_ips_sqrt}")
+if len(variant_names) == 0:
+raise ValueError("must specify at least one variant")
+if len(colormap_names) == 0:
+raise ValueError("must specify at least one colormap")
+if not 0 <= quantile <= 1:
+raise ValueError(f"quantile must be between 0 and 1")
+T = TypeVar("T")
+def dedup_preserving_order(vals: list[T]) -> list[T]:
 seen = set()
 result = []
 for item in vals:
@@ -25,74 +106,23 @@ def dedup_preserving_order(vals: list) -> list:
 result.append(item)
 return result
-def convert_to_parquet(csv_path: Path, parquet_path: Path, *, quiet = False):
-if not quiet:
-print(f"scanning csv '{csv_path}' into parquet '{parquet_path}'...", end = " ", flush = True)
-lf = pl.scan_csv(csv_path, schema={
-"saddr": pl.String,
-"rtt_us": pl.UInt64,
-"success": pl.UInt8
-})
-lf = lf.filter(pl.col("success") == 1)
-lf = lf.drop("success")
-lf = lf.with_columns(rtt_us = pl.col("rtt_us").clip(0, 0xFFFFFFFF).cast(pl.UInt32))
-lf = lf.with_columns(saddr = pl.col("saddr").str.split_exact(".", 3).struct.rename_fields(["a", "b", "c", "d"]))
-lf = lf.with_columns(saddr = pl.col("saddr").struct.field("a").cast(pl.UInt32) * 0x1000000 + pl.col("saddr").struct.field("b").cast(pl.UInt32) * 0x10000 + pl.col("saddr").struct.field("c").cast(pl.UInt32) * 0x100 + pl.col("saddr").struct.field("d").cast(pl.UInt32))
-lf = lf.unique("saddr")
-lf = lf.with_columns(coords = pl.col("saddr").map_batches(functools.partial(hilbert.decode, num_dims = 2, num_bits = 16), pl.Array(pl.UInt16, 2), is_elementwise = True))
-lf = lf.with_columns(x = pl.col("coords").arr.get(0), y = pl.col("coords").arr.get(1))
-lf = lf.drop("coords")
-lf.sink_parquet(parquet_path)
-if not quiet:
-print("done")
-def write_tile(path: Path, rows: np.ndarray, *, alpha = False):
-path.parent.mkdir(exist_ok = True, parents = True)
-png.Writer(rows.shape[1], rows.shape[0], greyscale = False, alpha = alpha).write_packed(path.open("wb"), rows)
-default_tile_size = 256
-default_variants = ["density", "rtt"]
-default_colormaps = ["viridis"]
-default_quantile = 0.995
-default_processes = 16
-def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_tile_size, alpha = False,
-variants: list[str] = default_variants, colormaps: list[str] = default_colormaps,
-quantile = default_quantile, processes = default_processes, num_rows: int | None = None,
-skip_iters: int | None = None, json_path: Path | None = None, quiet = False):
-if not 1 <= tile_size <= 0x10000 or tile_size & (tile_size - 1) != 0:
-raise ValueError(f"tile size must be a power of 2 between 1 and {0x10000}")
-tiles_per_side = int(math.sqrt(0x100000000)) // tile_size
-if len(variants) == 0:
-raise ValueError("must specify at least one variant")
-if len(colormaps) == 0:
-raise ValueError("must specify at least one colormap")
-if not 0 <= quantile <= 1:
-raise ValueError(f"quantile must be between 0 and 1")
-colormaps = dedup_preserving_order(colormaps)
+colormaps = [(colormap_name, Colormap(colormap_name)) for colormap_name in dedup_preserving_order(colormap_names)]
 channels = 4 if alpha else 3
-colormaps_by_name = { colormap: [bytes(c) for c in (Colormap(colormap).lut()[:,0:channels] * (256.0 - np.finfo(np.float32).eps)).astype(np.uint8)] for colormap in colormaps }
-generate_density = False
-generate_rtt = False
-for variant in variants:
-if variant == "density":
-generate_density = True
-elif variant == "rtt":
-generate_rtt = True
+empty_color = np.zeros(channels, dtype = np.uint8)
+should_generate_density = False
+should_generate_rtt = False
+for variant_name in variant_names:
+if variant_name == "density":
+should_generate_density = True
+elif variant_name == "rtt":
+should_generate_rtt = True
 else:
-raise ValueError(f"unknown variant '{variant}'")
+raise ValueError(f"unknown variant '{variant_name}'")
 if skip_iters is not None:
-if skip_iters <= 0:
-raise ValueError("cannot skip negative iterations")
-elif skip_iters >= tiles_per_side.bit_length():
-raise ValueError("cannot skip all iterations")
+if not 0 <= skip_iters < (num_ips_sqrt // tile_size).bit_length():
+raise ValueError("must skip zero or more but not all iterations")
 if json_path is not None:
 if json_path.is_dir():
@@ -104,89 +134,193 @@ def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_t
 else:
 tiles_dir_parts = None
-if not quiet:
-print(f"reading parquet '{parquet_path}'...", end = " ", flush = True)
-df = pl.read_parquet(parquet_path, columns = ["x", "y", "rtt_us"], n_rows=num_rows)
-if not quiet:
+def create_images(data: np.ndarray, colormap: Colormap, num_colors: int, path: Path):
+print(f"creating {num_colors} color stop(s) of {colormap.name} colormap...", end = " ", flush = True)
+colors = np.concatenate(([empty_color], ((colormap([0.0]) if num_colors == 1 else colormap.lut(num_colors))[:, 0:channels] * 255).astype(np.uint8)))
 print("done")
-possible_overlaps = 1
-rtt_quantile = df.get_column("rtt_us").quantile(quantile) or 1.0
-df = df.with_columns(count = pl.lit(possible_overlaps, pl.UInt32), rtt_us = pl.col("rtt_us").clip(0, rtt_quantile))
-write_tile_p = functools.partial(write_tile, alpha = alpha)
-def generate_images(colormap: str, type_name: str, series: pl.Series):
-nonlocal df
-if not quiet:
-print(f"creating {type_name} image data with {colormap} colormap...", end = " ", flush = True)
-image_data = np.zeros((tiles_per_side * tile_size, tiles_per_side * tile_size), dtype = f"S{channels}")
-image_data[(df.get_column("y"), df.get_column("x"))] = (series * 255.9999).clip(0, 255).cast(pl.UInt8).replace(pl.int_range(256), colormaps_by_name[colormap], return_dtype = pl.Binary)
-if not quiet:
+print(f"creating {data.shape[1]}x{data.shape[0]} pixel image for {colormap.name} colormap...", end = " ", flush = True)
+image_data = colors[data]
 print("done")
-if not quiet:
-print(f"writing {tiles_per_side}x{tiles_per_side}={tiles_per_side * tiles_per_side} {type_name} images with {colormap} colormap...", end = " ", flush = True)
-with Pool(processes) as pool:
+tiles_per_side = image_data.shape[0] // tile_size
 z = tiles_per_side.bit_length() - 1
-z_path = tiles_dir / type_name / colormap / f"{z}"
+z_path = path / f"{z}"
 z_path.mkdir(exist_ok = True, parents = True)
-pool.starmap(write_tile_p, [
-(z_path / f"{y}" / f"{x}.png", image_data[
+print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) images to '{path}'...", end = " ", flush = True)
+for y in range(tiles_per_side):
+y_path = z_path / f"{y}"
+y_path.mkdir(exist_ok = True)
+for x in range(tiles_per_side):
+path = y_path / f"{x}.png"
+rows = image_data[
 y * tile_size : y * tile_size + tile_size,
 x * tile_size : x * tile_size + tile_size,
-])
-for y in range(tiles_per_side)
-for x in range(tiles_per_side)
-])
-if not quiet:
+]
+Writer(tile_size, tile_size, greyscale = False, alpha = alpha).write_packed(path.open("wb"), rows)
 print("done")
-del image_data
-def scale_down_coords(scale = 2):
-nonlocal df
-nonlocal tiles_per_side
+def get_scan_data() -> tuple[NDArray[np.uint32], NDArray[np.uint32]]:
+print(f"reading scan data from file '{input_path}'...", end = " ", flush = True)
+data = np.fromfile(input_path, dtype = np.uint32).reshape(-1, 2)
+ip_arr = np.copy(data.T[0])
+rtt_arr = np.copy(data.T[1])
+print("done")
+return (ip_arr, rtt_arr)
+def get_all_data() -> tuple[tuple[NDArray[np.uint16], NDArray[np.uint16]], NDArray[np.uint32]]:
+ip_arr, rtt_arr = get_scan_data()
+print(f"reading coordinates from file '{coords_path}'...", end = " ", flush = True)
+ip_coords = np.fromfile(coords_path, dtype = np.uint16).reshape(-1, 2)
+print("done")
+print(f"converting ip addresses to coordinates...", end = " ", flush = True)
+xs, ys = ip_coords[ip_arr].T
+print("done")
+return ((ys, xs), rtt_arr)
+coords, rtt_arr = get_all_data()
+def generate_density():
+possible_overlaps = 1
+print(f"allocating empty {num_ips_sqrt}x{num_ips_sqrt} array of density data...", end = " ", flush = True)
+density_data = np.zeros((num_ips_sqrt, num_ips_sqrt), dtype = np.uint32)
+print("done")
+print(f"assigning values to density data array...", end = " ", flush = True)
+density_data[coords] = possible_overlaps
+print("done")
+def squish():
+nonlocal density_data
 nonlocal possible_overlaps
+density_data = np.swapaxes(density_data.reshape(density_data.shape[0] // 2, 2, density_data.shape[1] // 2, 2), 1, 2)
+print("calculating density sum...", end = " ", flush = True)
+density_data[:, :, 0, 0] += density_data[:, :, 0, 1]
+density_data[:, :, 0, 0] += density_data[:, :, 1, 0]
+density_data[:, :, 0, 0] += density_data[:, :, 1, 1]
+print(f"done (shrunk density data from {density_data.shape[0] * 2}x{density_data.shape[1] * 2} -> {density_data.shape[0]}x{density_data.shape[1]})")
+density_data = density_data[:, :, 0, 0]
+possible_overlaps *= 4
-prev_tiles_per_side = tiles_per_side
-tiles_per_side //= scale
-possible_overlaps *= scale * scale
-if not quiet:
-print(f"scaling {len(df)} coords down from {prev_tiles_per_side}x{prev_tiles_per_side} tiles to {tiles_per_side}x{tiles_per_side} tiles...", end = " ", flush = True)
-df = df.with_columns(x = pl.col("x") // scale, y = pl.col("y") // scale).group_by(["x", "y"]).agg(count = pl.sum("count"), rtt_us = pl.median("rtt_us"))
-if not quiet:
-print(f"done with {len(df)} coords remaining")
-if skip_iters is not None and skip_iters > 0:
-scale_down_coords(1 << skip_iters)
-while True:
-for colormap in colormaps:
-if generate_density:
-divisor = 256 if possible_overlaps == 1 else possible_overlaps
-series = df.get_column("count") / divisor
-generate_images(colormap, "density", series)
-if generate_rtt:
-series = df.get_column("rtt_us") / rtt_quantile
-generate_images(colormap, "rtt", series)
-if tiles_per_side == 1:
-break
-scale_down_coords()
+if skip_iters is not None:
+for _ in range(skip_iters):
+squish()
+def write_all_colormaps():
+for colormap_name, colormap in colormaps:
+create_images(density_data, colormap, possible_overlaps, tiles_dir / "density" / colormap_name)
+write_all_colormaps()
+while density_data.shape[0] > tile_size:
+squish()
+write_all_colormaps()
+def generate_rtt():
+num_colors = (1 << 16) - 1
+multiplier = num_colors - 1
+def get_rtt_data():
+nonlocal rtt_arr
+print(f"retrieving {quantile:.1%} quantile for rtt data...", end = " ", flush = True)
+rtt_quantile = np.quantile(rtt_arr, quantile)
+print("done")
+print(f"scaling rtt data using rtt quantile...", end = " ", flush = True)
+rtt_arr_f = rtt_arr / rtt_quantile
+print("done")
+del rtt_arr
+collect()
+print("clipping rtt data between 0 and 1...", end = " ", flush = True)
+rtt_arr_f.clip(0, 1, out = rtt_arr_f)
+print("done")
+print(f"allocating empty {num_ips_sqrt}x{num_ips_sqrt} array for rtt data...", end = " ", flush = True)
+rtt_data = np.full((num_ips_sqrt, num_ips_sqrt), np.nan, dtype = np.float32)
+print("done")
+print(f"assigning values to rtt data array...", end = " ", flush = True)
+rtt_data[coords] = rtt_arr_f
+print("done")
+return rtt_data
+rtt_data = get_rtt_data()
+def squish():
+nonlocal rtt_data
+print(f"sorting rtt values for median calculation...", end = " ", flush = True)
+rtt_data = np.swapaxes(rtt_data.reshape(rtt_data.shape[0] // 2, 2, rtt_data.shape[1] // 2, 2), 1, 2)
+rtt_data[np.isnan(rtt_data)] = np.inf # convert NaNs to Inf so comparisons work correctly
+mask = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_)
+np.greater(rtt_data[:, :, 0, 0], rtt_data[:, :, 0, 1], out = mask) # sort first row
+rtt_data[mask, 0] = rtt_data[mask, 0, ::-1]
+np.greater(rtt_data[:, :, 1, 0], rtt_data[:, :, 1, 1], out = mask) # sort second row
+rtt_data[mask, 1] = rtt_data[mask, 1, ::-1]
+np.greater(rtt_data[:, :, 0, 0], rtt_data[:, :, 1, 0], out = mask) # sort first column
+rtt_data[mask, :, 0] = rtt_data[mask, ::-1, 0]
+np.less(rtt_data[:, :, 0, 1], rtt_data[:, :, 1, 1], out = mask) # sort second column in reverse order
+rtt_data[mask, :, 1] = rtt_data[mask, ::-1, 1]
+np.less(rtt_data[:, :, 1, 0], rtt_data[:, :, 1, 1], out = mask) # sort second row in reverse order
+rtt_data[mask, 1] = rtt_data[mask, 1, ::-1]
+# rtt_data[:, :, :, 1] = rtt_data[:, :, ::-1, 1] # swap second column (not entirely necessary, just makes indices below nicer)
+rtt_data[np.isinf(rtt_data)] = np.nan # restore NaNs
+print("done")
+print("calculating median rtt values...", end = " ", flush = True)
+mask2 = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_) # need second mask for binary ops
+np.invert(np.isnan(rtt_data[:, :, 0, 1], out = mask), out = mask) # four nums populated
+rtt_data[mask, 0, 0] = rtt_data[mask, 1, 1] # take average of index 1 and 2
+rtt_data[mask, 0, 0] += rtt_data[mask, 1, 0]
+rtt_data[mask, 0, 0] /= 2
+np.logical_and(np.invert(np.isnan(rtt_data[:, :, 1, 0], out = mask), out = mask), np.isnan(rtt_data[:, :, 0, 1], out = mask2), out = mask) # three nums populated
+rtt_data[mask, 0, 0] = rtt_data[mask, 1, 1] # take index 1
+np.logical_and(np.invert(np.isnan(rtt_data[:, :, 1, 1], out = mask), out = mask), np.isnan(rtt_data[:, :, 1, 0], out = mask2), out = mask) # two nums populated
+rtt_data[mask, 0, 0] = rtt_data[mask, 0, 0] # take average of index 0 and 1
+rtt_data[mask, 0, 0] += rtt_data[mask, 1, 1]
+rtt_data[mask, 0, 0] /= 2
+# everything else (1 or 0 nums populated) don't need any modifications
+print(f"done (shrunk rtt data from {rtt_data.shape[0] * 2}x{rtt_data.shape[1] * 2} -> {rtt_data.shape[0]}x{rtt_data.shape[1]})")
+rtt_data = rtt_data[:, :, 0, 0]
+if skip_iters is not None:
+for _ in range(skip_iters):
+squish()
+def get_normalized_data():
+print(f"normalizing rtt data: multiplying...", end = " ", flush = True)
+rtt_data_f = rtt_data * multiplier
+print(f"incrementing...", end = " ", flush = True)
+rtt_data_f += 1
+# print(f"replacing NaNs...", end = " ", flush = True)
+# rtt_data_f[np.isnan(rtt_data_f)] = 0.0
+print(f"converting to ints...", end = " ", flush = True)
+with catch_warnings():
+rtt_data_norm = rtt_data_f.astype(np.uint16)
+print("done")
+return rtt_data_norm
+def write_all_colormaps():
+rtt_data_norm = get_normalized_data()
+for colormap_name, colormap in colormaps:
+create_images(rtt_data_norm, colormap, num_colors, tiles_dir / "rtt" / colormap_name)
+write_all_colormaps()
+while rtt_data.shape[0] > tile_size:
+squish()
+write_all_colormaps()
+if should_generate_rtt:
+generate_rtt()
+else:
+del rtt_arr
+collect()
+if should_generate_density:
+generate_density()
 if json_path is not None and tiles_dir_parts is not None:
 try:
 text = json_path.read_text(encoding = "UTF-8")
 except:
-if not quiet:
 print("json file not found at provided path, so it will be created instead")
 tile_metadata = {}
 else:
 try:
-tile_metadata: dict = json.loads(text)
+tile_metadata: dict = loads(text)
 except:
-if not quiet:
 print("invalid json found at provided path, so re-creating file")
 tile_metadata = {}
 tile_metadata_cur = tile_metadata
@@ -194,18 +328,16 @@ def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_t
 if not part in tile_metadata_cur:
 tile_metadata_cur[part] = {}
 tile_metadata_cur = tile_metadata_cur[part]
-for variant in variants:
-if not variant in tile_metadata_cur:
-tile_metadata_cur[variant] = colormaps
+for variant_name in variant_names:
+if not variant_name in tile_metadata_cur:
+tile_metadata_cur[variant_name] = dedup_preserving_order(colormap_names)
 else:
-tile_metadata_cur[variant] = dedup_preserving_order(tile_metadata_cur[variant] + colormaps)
-if not quiet:
+tile_metadata_cur[variant_name] = dedup_preserving_order(tile_metadata_cur[variant_name] + colormap_names)
 print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
-json_path.write_text(json.dumps(tile_metadata, indent=2), encoding = "UTF-8")
-if not quiet:
+json_path.write_text(dumps(tile_metadata, indent=2), encoding = "UTF-8")
 print("done")
-def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = False):
+def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None):
 if not tiles_dir.is_dir():
 raise ValueError(f"'{tiles_dir}' is not an existing directory")
@@ -221,7 +353,7 @@ def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = Fals
 except:
 raise ValueError("json file not found at provided path")
 try:
-tile_metadata = json.loads(text)
+tile_metadata = loads(text)
 except:
 raise ValueError("invalid json found at provided path")
 tile_metadata_cur = tile_metadata
@@ -234,30 +366,28 @@ def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = Fals
 del tile_metadata_cur[tiles_dir_final]
 except:
 raise ValueError(f"unable to find path '{'/'.join([*tiles_dir_parts, tiles_dir_final])}' within json file")
-if not quiet:
 print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
-json_path.write_text(json.dumps(tile_metadata, indent=2), encoding = "UTF-8")
-if not quiet:
+json_path.write_text(dumps(tile_metadata, indent=2), encoding = "UTF-8")
 print("done")
-if not quiet:
 print(f"removing files from '{tiles_dir}'...", end = " ", flush = True)
-shutil.rmtree(tiles_dir)
-if not quiet:
+rmtree(tiles_dir)
 print("done")
 @dataclass
 class IpMapArgs:
-command: Literal["convert", "generate", "remove"]
+command: Literal["mkcoords", "convert", "mktiles", "rmtiles"]
 quiet: bool
+coords: str
 input: str
 output: str
+batches: int
+processes: int
 tile_size: int
 alpha: bool
 variants: str
 colormaps: str
 quantile: float
-processes: int
 num_rows: int | None
 skip_iters: int | None
 json: str | None
@@ -266,45 +396,52 @@ def parse_list_arg(arg: str):
 return [x.strip().lower() for x in arg.split(",") if x.strip()]
 def main():
-parser = argparse.ArgumentParser("ipmap")
+parser = ArgumentParser("ipmap")
 parser.add_argument("-q", "--quiet", action = "store_true", help = "decrease output verbosity")
 subparsers = parser.add_subparsers(dest = "command", required = True, help = "the command to run")
+mkcoords_parser = subparsers.add_parser("mkcoords", help = "generate coordinates corresponding to each IP address")
+mkcoords_parser.add_argument("-b", "--batches", default = default_batches, type = int, help = "the number of batches to split the task into (default: %(default)s)")
+mkcoords_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "the number of processes to split the task across (default: %(default)s)")
+mkcoords_parser.add_argument("output", help = "the output path to save the generated coordinates to")
 convert_parser = subparsers.add_parser("convert", help = "convert scan data from csv to parquet format")
 convert_parser.add_argument("input", help = "the input path of the csv file to read the scan data from")
 convert_parser.add_argument("output", help = "the output path of the parquet file to save the converted scan data to")
-generate_parser = subparsers.add_parser("generate", help = "generate tile images from scan data in parquet format")
-generate_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)")
-generate_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
-generate_parser.add_argument("-v", "--variants", default = ",".join(default_variants), help = "a comma separated list of variants to generate (default: %(default)s)")
-generate_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormaps), help = "a comma separated list of colormaps to generate (default: %(default)s)")
-generate_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
-generate_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "how many processes to spawn for saving images (default: %(default)s)")
-generate_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
-generate_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
-generate_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
-generate_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from")
-generate_parser.add_argument("output", help = "the output path to save the generated tile images to")
-remove_parser = subparsers.add_parser("remove", help = "remove tile images")
-remove_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
-remove_parser.add_argument("input", help = "the path containing tile images to remove")
+mktiles_parser = subparsers.add_parser("mktiles", help = "generate tile images from scan data in parquet format")
+mktiles_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)")
+mktiles_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
+mktiles_parser.add_argument("-v", "--variants", default = ",".join(default_variant_names), help = "a comma separated list of variants to generate (default: %(default)s)")
+mktiles_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormap_names), help = "a comma separated list of colormaps to generate (default: %(default)s)")
+mktiles_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
+mktiles_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
+mktiles_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
+mktiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
+mktiles_parser.add_argument("coords", help = "the path of the binary file containing the coords to map IP addresses to")
+mktiles_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from")
+mktiles_parser.add_argument("output", help = "the output path to save the generated tile images to")
+rmtiles_parser = subparsers.add_parser("rmtiles", help = "remove tile images")
+rmtiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
+rmtiles_parser.add_argument("input", help = "the path containing tile images to remove")
 args = parser.parse_args(namespace = IpMapArgs)
 try:
-if args.command == "convert":
-convert_to_parquet(csv_path = Path(args.input), parquet_path = Path(args.output), quiet = args.quiet)
-elif args.command == "generate":
-generate_tiles(parquet_path = Path(args.input), tiles_dir = Path(args.output),
-tile_size = args.tile_size, alpha = args.alpha, variants = parse_list_arg(args.variants),
-colormaps = parse_list_arg(args.colormaps), quantile = args.quantile,
-processes = args.processes, num_rows = args.num_rows, skip_iters = args.skip_iters,
-json_path = Path(args.json) if args.json else None, quiet = args.quiet)
-elif args.command == "remove":
-remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None, quiet = args.quiet)
-else:
+with redirect_stdout(open(devnull, "w") if args.quiet else stdout):
+match args.command:
+case "mkcoords":
+make_coords(output_path = Path(args.output), batches = args.batches, processes = args.processes)
+case "convert":
+convert(input_path = Path(args.input), output_path = Path(args.output))
+case "mktiles":
+make_tiles(coords_path = Path(args.coords), input_path = Path(args.input), tiles_dir = Path(args.output),
+tile_size = args.tile_size, alpha = args.alpha, variant_names = parse_list_arg(args.variants),
+colormap_names = parse_list_arg(args.colormaps), quantile = args.quantile,
+num_rows = args.num_rows, skip_iters = args.skip_iters, json_path = Path(args.json) if args.json else None)
+case "rmtiles":
+remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None)
+case _:
 raise ValueError("invalid command")
 except ValueError as e:
-print(f"error: {e}", file = sys.stderr)
-sys.exit(1)
+print(f"error: {e}", file = stderr)
+exit(1)
 if __name__ == "__main__":
 main()
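Taken together, the new subcommands replace the old convert/generate/remove flow with mkcoords/convert/mktiles/rmtiles. A minimal end-to-end run, sketched with hypothetical local paths and the argument defaults defined above:

# one-time: precompute the Hilbert-curve (x, y) lookup table for all 2^32 IPv4 addresses
./ipmap.py mkcoords /tmp/coords.bin
# per scan: csv -> packed uint32 binary, then render density/rtt tiles
./ipmap.py convert full-scan.csv full-scan.bin
./ipmap.py mktiles -v density,rtt -c viridis -j tiles.json /tmp/coords.bin full-scan.bin tiles/20240101
# later: retire an old tile set and drop it from the metadata json
./ipmap.py rmtiles -j tiles.json tiles/20231201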

poetry.lock generated (41 changes)

@@ -264,45 +264,6 @@ files = [
 {file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"},
 ]
-[[package]]
-name = "polars-lts-cpu"
-version = "0.20.17"
-description = "Blazingly fast DataFrame library"
-optional = false
-python-versions = ">=3.8"
-files = [
-{file = "polars_lts_cpu-0.20.17-cp38-abi3-macosx_10_12_x86_64.whl", hash = "sha256:c5ba1113df88bd0e46bc2e649279f1e2f09f20d24a7e3a8b07d342d1e117bf40"},
-{file = "polars_lts_cpu-0.20.17-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:514e833c63d2734d9028ca754fe441479cb8d68d06efe9f88fdb348db9578941"},
-{file = "polars_lts_cpu-0.20.17-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3512862da0bcb764ed5e63bb122d265295d503e5294c839d5f46f88937543cc1"},
-{file = "polars_lts_cpu-0.20.17-cp38-abi3-manylinux_2_24_aarch64.whl", hash = "sha256:2a30789e25a07e0c925e6fde030d2ee53024ae621a0194c423ff83f359d5f62c"},
-{file = "polars_lts_cpu-0.20.17-cp38-abi3-win_amd64.whl", hash = "sha256:b5a3487d481517525d7c9b9c69210f123c2d1f233c47487fa058646c2dc3d42c"},
-{file = "polars_lts_cpu-0.20.17.tar.gz", hash = "sha256:e11eb08f9264459339af4942c4be9c187daf2ffe4040d24284582e4e0e492ab7"},
-]
-[package.extras]
-adbc = ["adbc-driver-manager", "adbc-driver-sqlite"]
-all = ["polars[adbc,async,cloudpickle,connectorx,deltalake,fastexcel,fsspec,gevent,numpy,pandas,plot,pyarrow,pydantic,pyiceberg,sqlalchemy,timezone,xlsx2csv,xlsxwriter]"]
-async = ["nest-asyncio"]
-cloudpickle = ["cloudpickle"]
-connectorx = ["connectorx (>=0.3.2)"]
-deltalake = ["deltalake (>=0.14.0)"]
-fastexcel = ["fastexcel (>=0.9)"]
-fsspec = ["fsspec"]
-gevent = ["gevent"]
-matplotlib = ["matplotlib"]
-numpy = ["numpy (>=1.16.0)"]
-openpyxl = ["openpyxl (>=3.0.0)"]
-pandas = ["pandas", "pyarrow (>=7.0.0)"]
-plot = ["hvplot (>=0.9.1)"]
-pyarrow = ["pyarrow (>=7.0.0)"]
-pydantic = ["pydantic"]
-pyiceberg = ["pyiceberg (>=0.5.0)"]
-pyxlsb = ["pyxlsb (>=1.0)"]
-sqlalchemy = ["pandas", "sqlalchemy"]
-timezone = ["backports-zoneinfo", "tzdata"]
-xlsx2csv = ["xlsx2csv (>=0.8.0)"]
-xlsxwriter = ["xlsxwriter"]
 [[package]]
 name = "pydantic"
 version = "2.6.4"
@@ -580,4 +541,4 @@ files = [
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "882810214ec005c8e1d0b99099d0f9fc8d6e8fb9140ac9f452e18e7e3c580176"
+content-hash = "3ca6841a3434879d43d536188bf827e8a74f959cbac3da3d272dc1cc47769620"


@@ -10,7 +10,6 @@ python = "^3.11"
 pypng = "^0.20220715.0"
 numpy = "^1.26.4"
 numpy-hilbert-curve = "^1.0.1"
-polars-lts-cpu = "^0.20.17"
 cmap = "^0.1.3"
 fastapi = "^0.110.1"
 uvicorn = "^0.29.0"

zmap.sh (13 changes)

@@ -7,6 +7,7 @@ REMOTE_HOST=localhost
 REMOTE_KEY=~/.ssh/id_rsa
 REMOTE_DATA_PATH=/data
 REMOTE_TILES_PATH=/tiles
+REMOTE_COORDS_PATH=/coords.bin
 REMOTE_IPMAP_PATH=/scripts/ipmap.py
 COLORMAPS=jet,fake_parula,viridis,plasma,thermal,batlow
 VARIANTS=density,rtt
@@ -16,27 +17,27 @@ DAYS_KEPT=14
 DATE=$(date -u +"%Y%m%d")
 KEEP_DATE=$(date -d "$DAYS_KEPT days ago" -u +"%Y%m%d")
 CSV_FILENAME="full-scan.csv"
-PARQUET_FILENAME="full-scan.parquet"
+BIN_FILENAME="full-scan.bin"
 JSON_FILENAME="tiles.json"
 CURRENT_LOCAL_DATA_PATH="$LOCAL_DATA_PATH/$DATE"
 LOCAL_CSV_PATH="$CURRENT_LOCAL_DATA_PATH/$CSV_FILENAME"
 REMOTE="$REMOTE_USER@$REMOTE_HOST"
 CURRENT_REMOTE_DATA_PATH="$REMOTE_DATA_PATH/$DATE"
 REMOTE_CSV_PATH="$CURRENT_REMOTE_DATA_PATH/$CSV_FILENAME"
-REMOTE_PARQUET_PATH="$CURRENT_REMOTE_DATA_PATH/$PARQUET_FILENAME"
+REMOTE_BIN_PATH="$CURRENT_REMOTE_DATA_PATH/$BIN_FILENAME"
 REMOTE_JSON_PATH="$REMOTE_TILES_PATH/$JSON_FILENAME"
 CURRENT_REMOTE_TILES_PATH="$REMOTE_TILES_PATH/$DATE"
 mkdir -p "$CURRENT_LOCAL_DATA_PATH" &&
-zmap -B '100M' -M icmp_echo_time '0.0.0.0/0' -f 'saddr,rtt_us,success' -o "$LOCAL_CSV_PATH" &&
+zmap -B '100M' -M icmp_echo_time '0.0.0.0/0' -f 'saddr_raw,rtt_us,success' -o "$LOCAL_CSV_PATH" &&
 ssh -i "$REMOTE_KEY" "$REMOTE" "mkdir -p '$CURRENT_REMOTE_DATA_PATH'" &&
 scp -i "$REMOTE_KEY" "$LOCAL_CSV_PATH" "$REMOTE:$REMOTE_CSV_PATH" &&
 rm "$LOCAL_CSV_PATH" &&
 ssh -i "$REMOTE_KEY" "$REMOTE" "
-'$REMOTE_IPMAP_PATH' convert '$REMOTE_CSV_PATH' '$REMOTE_PARQUET_PATH' &&
+'$REMOTE_IPMAP_PATH' convert '$REMOTE_CSV_PATH' '$REMOTE_BIN_PATH' &&
 rm '$REMOTE_CSV_PATH' &&
 mkdir -p '$CURRENT_REMOTE_TILES_PATH' &&
-'$REMOTE_IPMAP_PATH' generate -a -c '$COLORMAPS' -v '$VARIANTS' -j '$REMOTE_JSON_PATH' '$REMOTE_PARQUET_PATH' '$CURRENT_REMOTE_TILES_PATH'
+'$REMOTE_IPMAP_PATH' mktiles -a -c '$COLORMAPS' -v '$VARIANTS' -j '$REMOTE_JSON_PATH' '$REMOTE_COORDS_PATH' '$REMOTE_BIN_PATH' '$CURRENT_REMOTE_TILES_PATH'
 cd '$REMOTE_DATA_PATH' && for f in *; do (( \$f < $KEEP_DATE )) && rm \"\$f\"; done
-cd '$REMOTE_TILES_PATH' && for f in *; do (( \$f < $KEEP_DATE )) && '$REMOTE_IPMAP_PATH' remove -j '$REMOTE_JSON_PATH' \"\$f\"; done
+cd '$REMOTE_TILES_PATH' && for f in *; do (( \$f < $KEEP_DATE )) && '$REMOTE_IPMAP_PATH' rmtiles -j '$REMOTE_JSON_PATH' \"\$f\"; done
 "