Big refactor from polars to numpy

This commit is contained in:
LilyRose2798 2024-04-15 22:21:42 +10:00
parent d0b3d05067
commit 3f61d7fac6
4 changed files with 347 additions and 207 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
/data
/public/assets/tiles
**/*.bin

View file

@ -1,13 +1,10 @@
#!/usr/bin/env python3
import re
import argparse
from argparse import ArgumentParser
from dataclasses import dataclass
import dns.exception
import dns.rdatatype
import dns.resolver
import dns.reversename
import uvicorn
from dns.resolver import resolve_address, NXDOMAIN, NoAnswer, LifetimeTimeout, NoNameservers
from dns.exception import SyntaxError
from uvicorn import run
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from slowapi.errors import RateLimitExceeded
@ -26,22 +23,20 @@ app.add_middleware(
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
ip_regex = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
limit = "1/second"
@app.get("/api/rdns/{ip}")
@limiter.limit(lambda: limit)
async def get_rdns(ip: str, request: Request, response: Response):
try:
answer = dns.resolver.resolve_address(ip, search = True)
except dns.exception.SyntaxError:
answer = resolve_address(ip, search = True)
except SyntaxError:
raise HTTPException(status_code=400, detail="Invalid IP address")
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
except (NXDOMAIN, NoAnswer):
raise HTTPException(status_code=404, detail="No rDNS information found for IP")
except dns.resolver.LifetimeTimeout:
except LifetimeTimeout:
raise HTTPException(status_code=504, detail="Request for rDNS information timed out")
except dns.resolver.NoNameservers:
except NoNameservers:
raise HTTPException(status_code=503, detail="No nameservers currently available to fulfil request")
except:
raise HTTPException(status_code=500, detail="Unexpected error occurred")
@ -59,13 +54,13 @@ def parse_list_arg(arg: str):
def main():
global limit
parser = argparse.ArgumentParser("ipmap")
parser = ArgumentParser("ipmap")
parser.add_argument("-a", "--address", default = "127.0.0.1", help = "the address to use for the api (default: %(default)s)")
parser.add_argument("-p", "--port", default = 8000, type = int, help = "the port to use for the api (default: %(default)s)")
parser.add_argument("-l", "--limit", default = limit, help = "the rate limit for the api (default: %(default)s)")
args = parser.parse_args(namespace = IpApiArgs)
limit = args.limit
uvicorn.run(app, host=args.address, port=args.port)
run(app, host=args.address, port=args.port)
if __name__ == "__main__":
main()

513
ipmap.py
View file

@ -1,98 +1,128 @@
#!/usr/bin/env python3
import sys
import math
import functools
import argparse
import json
import shutil
from os import devnull
from sys import stdout, stderr, exit
from contextlib import redirect_stdout
from argparse import ArgumentParser
from warnings import catch_warnings
from multiprocessing import Pool
from shutil import rmtree
from gc import collect
from json import loads, dumps
from pathlib import Path
import png
import hilbert
from dataclasses import dataclass
from typing import Literal, TypeVar
from png import Writer
from cmap import Colormap
from hilbert import decode
import numpy as np
import polars as pl
from cmap import Colormap
from multiprocessing import Pool
from dataclasses import dataclass
from typing import Literal
def dedup_preserving_order(vals: list) -> list:
    """Return the items of *vals* with duplicates removed.

    The first occurrence of each item is kept and the relative order of the
    kept items is unchanged. Items must be hashable.
    """
    # dict keys are unique and remember insertion order, so this is the
    # standard-library equivalent of the manual "seen set + result list" loop.
    return list(dict.fromkeys(vals))
ip_bytes = 4
ip_bits = ip_bytes * 8
num_ips = 1 << ip_bits
num_ips_sqrt = 1 << ip_bits // 2
def convert_to_parquet(csv_path: Path, parquet_path: Path, *, quiet = False):
if not quiet:
print(f"scanning csv '{csv_path}' into parquet '{parquet_path}'...", end = " ", flush = True)
lf = pl.scan_csv(csv_path, schema={
"saddr": pl.String,
def make_coord_range(start: int, end: int):
    """Return the coordinates for the indices ``start`` (inclusive) to ``end`` (exclusive).

    The indices are built as a uint32 array, passed through ``decode`` with
    ``num_dims = 2`` and ``num_bits = 16``, and the result is cast to uint16.
    """
    indices = np.arange(start, end, dtype = np.uint32)
    points = decode(indices, num_dims = 2, num_bits = 16)
    return points.astype(np.uint16)
default_batches = 64
default_processes = 4


def make_coords(output_path: Path, batches = default_batches, processes = default_processes):
    """Compute the coordinates of every IP address and write them to *output_path*.

    The ``num_ips`` addresses are split into ``batches`` equal batches and each
    batch is split into ``processes`` equal ranges that are handed to
    ``make_coord_range`` in a worker pool. The arrays of each batch are written
    to the file as raw bytes, in address order, before the next batch starts.

    Args:
        output_path: file to (over)write with the coordinate data.
        batches: number of batches to split the work into (1 to 0x10000).
        processes: number of worker processes, and ranges per batch (1 to 256).

    Raises:
        ValueError: if ``batches`` or ``processes`` is out of range, if the
            address count does not divide evenly into batches and processes,
            or if ``output_path`` is a directory.
    """
    if not 1 <= batches <= 0x10000:
        raise ValueError(f"batches must be between 1 and {0x10000}")
    if not 1 <= processes <= 256:
        raise ValueError("processes must be between 1 and 256")
    ips_per_batch, leftover_batch_ips = divmod(num_ips, batches)
    if leftover_batch_ips > 0:
        raise ValueError("the total number of ips must evenly divide into the number of batches")
    ips_per_process, leftover_process_ips = divmod(ips_per_batch, processes)
    if leftover_process_ips > 0:
        raise ValueError("the number of ips within each batch must evenly divide into the number of processes")
    if output_path.is_dir():
        raise ValueError("output path must not be a directory")
    # Open the output once ("wb" truncates any previous content) instead of
    # truncating it and then reopening it in append mode for every batch.
    # The pool is entered first so it is created before the file handle exists.
    with Pool(processes) as p, output_path.open("wb") as f:
        for batch in range(batches):
            print(f"starting batch {batch}...")
            first_offset = batch * processes
            ranges = [
                (offset * ips_per_process, offset * ips_per_process + ips_per_process)
                for offset in range(first_offset, first_offset + processes)
            ]
            arrs = p.starmap(make_coord_range, ranges)
            print("finished batch, writing arrays to file...")
            for arr in arrs:
                f.write(arr.tobytes())
            # Push the batch to disk before reporting it as written.
            f.flush()
            print("finished writing to file")
def convert(input_path: Path, output_path: Path):
print(f"scanning csv '{input_path}' into array...", end = " ", flush = True)
lf = pl.scan_csv(input_path, schema = {
"saddr_raw": pl.UInt32,
"rtt_us": pl.UInt64,
"success": pl.UInt8
})
lf = lf.filter(pl.col("success") == 1)
lf = lf.drop("success")
lf = lf.with_columns(rtt_us = pl.col("rtt_us").clip(0, 0xFFFFFFFF).cast(pl.UInt32))
lf = lf.with_columns(saddr = pl.col("saddr").str.split_exact(".", 3).struct.rename_fields(["a", "b", "c", "d"]))
lf = lf.with_columns(saddr = pl.col("saddr").struct.field("a").cast(pl.UInt32) * 0x1000000 + pl.col("saddr").struct.field("b").cast(pl.UInt32) * 0x10000 + pl.col("saddr").struct.field("c").cast(pl.UInt32) * 0x100 + pl.col("saddr").struct.field("d").cast(pl.UInt32))
lf = lf.unique("saddr")
lf = lf.with_columns(coords = pl.col("saddr").map_batches(functools.partial(hilbert.decode, num_dims = 2, num_bits = 16), pl.Array(pl.UInt16, 2), is_elementwise = True))
lf = lf.with_columns(x = pl.col("coords").arr.get(0), y = pl.col("coords").arr.get(1))
lf = lf.drop("coords")
lf.sink_parquet(parquet_path)
if not quiet:
print("done")
lf = lf.unique("saddr_raw")
arr = lf.collect().to_numpy()
print("done")
print("converting IP addresses from big-endian to little-endian...", end = " ", flush = True)
arr[:, 0].byteswap(inplace = True)
print("done")
print(f"writing array to '{output_path}'")
output_path.write_bytes(arr.tobytes())
print("done")
def write_tile(path: Path, rows: np.ndarray, *, alpha = False):
    """Write *rows* to *path* as a colour PNG, creating parent directories first.

    Args:
        path: destination file; missing parent directories are created.
        rows: pixel data; ``rows.shape[1]`` is used as the image width and
            ``rows.shape[0]`` as the image height.
        alpha: whether the PNG is written with an alpha channel.
    """
    path.parent.mkdir(exist_ok = True, parents = True)
    writer = png.Writer(rows.shape[1], rows.shape[0], greyscale = False, alpha = alpha)
    # Use a context manager so the file handle is closed (and flushed) as soon
    # as the tile is written, instead of leaking one open handle per tile.
    with path.open("wb") as f:
        writer.write_packed(f, rows)
default_tile_size = 256
default_variants = ["density", "rtt"]
default_colormaps = ["viridis"]
default_tile_size = 1 << ip_bits // 4
default_variant_names = ["density", "rtt"]
default_colormap_names = ["viridis"]
default_quantile = 0.995
default_processes = 16
def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_tile_size, alpha = False,
variants: list[str] = default_variants, colormaps: list[str] = default_colormaps,
quantile = default_quantile, processes = default_processes, num_rows: int | None = None,
skip_iters: int | None = None, json_path: Path | None = None, quiet = False):
def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
tile_size = default_tile_size, alpha = False,
variant_names: list[str] = default_variant_names,
colormap_names: list[str] = default_colormap_names,
quantile = default_quantile, num_rows: int | None = None,
skip_iters: int | None = None, json_path: Path | None = None):
if not 1 <= tile_size <= 0x10000 or tile_size & (tile_size - 1) != 0:
raise ValueError(f"tile size must be a power of 2 between 1 and {0x10000}")
tiles_per_side = int(math.sqrt(0x100000000)) // tile_size
if len(variants) == 0:
if not 64 <= tile_size <= num_ips_sqrt or tile_size & (tile_size - 1) != 0:
raise ValueError(f"tile size must be a power of 2 between 64 and {num_ips_sqrt}")
if len(variant_names) == 0:
raise ValueError("must specify at least one variant")
if len(colormaps) == 0:
if len(colormap_names) == 0:
raise ValueError("must specify at least one colormap")
if not 0 <= quantile <= 1:
raise ValueError(f"quantile must be between 0 and 1")
colormaps = dedup_preserving_order(colormaps)
channels = 4 if alpha else 3
colormaps_by_name = { colormap: [bytes(c) for c in (Colormap(colormap).lut()[:,0:channels] * (256.0 - np.finfo(np.float32).eps)).astype(np.uint8)] for colormap in colormaps }
T = TypeVar("T")
def dedup_preserving_order(vals: list[T]) -> list[T]:
seen = set()
result = []
for item in vals:
if item not in seen:
seen.add(item)
result.append(item)
return result
generate_density = False
generate_rtt = False
for variant in variants:
if variant == "density":
generate_density = True
elif variant == "rtt":
generate_rtt = True
colormaps = [(colormap_name, Colormap(colormap_name)) for colormap_name in dedup_preserving_order(colormap_names)]
channels = 4 if alpha else 3
empty_color = np.zeros(channels, dtype = np.uint8)
should_generate_density = False
should_generate_rtt = False
for variant_name in variant_names:
if variant_name == "density":
should_generate_density = True
elif variant_name == "rtt":
should_generate_rtt = True
else:
raise ValueError(f"unknown variant '{variant}'")
raise ValueError(f"unknown variant '{variant_name}'")
if skip_iters is not None:
if skip_iters <= 0:
raise ValueError("cannot skip negative iterations")
elif skip_iters >= tiles_per_side.bit_length():
raise ValueError("cannot skip all iterations")
if not 0 <= skip_iters < (num_ips_sqrt // tile_size).bit_length():
raise ValueError("must skip zero or more but not all iterations")
if json_path is not None:
if json_path.is_dir():
@ -104,103 +134,211 @@ def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_t
else:
tiles_dir_parts = None
if not quiet:
print(f"reading parquet '{parquet_path}'...", end = " ", flush = True)
df = pl.read_parquet(parquet_path, columns = ["x", "y", "rtt_us"], n_rows=num_rows)
if not quiet:
def create_images(data: np.ndarray, colormap: Colormap, num_colors: int, path: Path):
print(f"creating {num_colors} color stop(s) of {colormap.name} colormap...", end = " ", flush = True)
colors = np.concatenate(([empty_color], ((colormap([0.0]) if num_colors == 1 else colormap.lut(num_colors))[:, 0:channels] * 255).astype(np.uint8)))
print("done")
possible_overlaps = 1
rtt_quantile = df.get_column("rtt_us").quantile(quantile) or 1.0
df = df.with_columns(count = pl.lit(possible_overlaps, pl.UInt32), rtt_us = pl.col("rtt_us").clip(0, rtt_quantile))
write_tile_p = functools.partial(write_tile, alpha = alpha)
def generate_images(colormap: str, type_name: str, expr: pl.Expr):
image_size = tiles_per_side * tile_size
if not quiet:
print(f"creating {image_size} by {image_size} pixel {type_name} image data with {colormap} colormap...", end = " ", flush = True)
image_data = np.zeros((image_size, image_size), dtype = f"S{channels}")
image_data[(df.get_column("y"), df.get_column("x"))] = df.select((expr * 255.9999).clip(0, 255).cast(pl.UInt8).replace(pl.int_range(256), colormaps_by_name[colormap], return_dtype = pl.Binary)).to_series()
if not quiet:
print("done")
if not quiet:
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) {type_name} images with {colormap} colormap...", end = " ", flush = True)
with Pool(processes) as pool:
z = tiles_per_side.bit_length() - 1
z_path = tiles_dir / type_name / colormap / f"{z}"
z_path.mkdir(exist_ok = True, parents = True)
pool.starmap(write_tile_p, [
(z_path / f"{y}" / f"{x}.png", image_data[
print(f"creating {data.shape[1]}x{data.shape[0]} pixel image for {colormap.name} colormap...", end = " ", flush = True)
image_data = colors[data]
print("done")
tiles_per_side = image_data.shape[0] // tile_size
z = tiles_per_side.bit_length() - 1
z_path = path / f"{z}"
z_path.mkdir(exist_ok = True, parents = True)
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) images to '{path}'...", end = " ", flush = True)
for y in range(tiles_per_side):
y_path = z_path / f"{y}"
y_path.mkdir(exist_ok = True)
for x in range(tiles_per_side):
path = y_path / f"{x}.png"
rows = image_data[
y * tile_size : y * tile_size + tile_size,
x * tile_size : x * tile_size + tile_size,
])
for y in range(tiles_per_side)
for x in range(tiles_per_side)
])
if not quiet:
]
Writer(tile_size, tile_size, greyscale = False, alpha = alpha).write_packed(path.open("wb"), rows)
print("done")
print(f"reading scan data from file '{input_path}'...", end = " ", flush = True)
data = np.fromfile(input_path, dtype = np.uint32).reshape(-1, 2)
ip_arr = np.copy(data.T[0])
rtt_arr = np.copy(data.T[1])
print("done")
del data
collect()
print(f"reading coordinates from file '{coords_path}'...", end = " ", flush = True)
ip_coords = np.fromfile(coords_path, dtype = np.uint16).reshape(-1, 2)
print("done")
print(f"converting ip addresses to coordinates...", end = " ", flush = True)
xs, ys = ip_coords[ip_arr].T
coords = (ys, xs)
print("done")
del ip_coords
del ip_arr
collect()
def generate_density():
    """Build the density grid and write its images for every colormap.

    Every scanned coordinate is marked in a ``num_ips_sqrt`` x ``num_ips_sqrt``
    uint32 grid. The grid is then halved ``skip_iters`` times without output,
    after which images are written for the current grid and again after each
    further halving, until the grid is no wider than ``tile_size``.
    """
    variant_name = "density"
    possible_overlaps = 1

    print(f"allocating empty {num_ips_sqrt}x{num_ips_sqrt} array of density data...", end = " ", flush = True)
    density_data = np.zeros((num_ips_sqrt, num_ips_sqrt), dtype = np.uint32)
    print("done")

    print("assigning values to density data array...", end = " ", flush = True)
    density_data[coords] = possible_overlaps
    print("done")

    def squish():
        """Halve the grid in both directions by summing each 2x2 cell."""
        nonlocal density_data
        nonlocal possible_overlaps
        half_rows = density_data.shape[0] // 2
        half_cols = density_data.shape[1] // 2
        # Regroup so the last two axes index the position inside a 2x2 cell.
        density_data = np.swapaxes(density_data.reshape(half_rows, 2, half_cols, 2), 1, 2)
        print("calculating density sum...", end = " ", flush = True)
        # Accumulate the other three members of each cell into its top-left slot.
        totals = density_data[:, :, 0, 0]
        for cell_row, cell_col in ((0, 1), (1, 0), (1, 1)):
            totals += density_data[:, :, cell_row, cell_col]
        print(f"done (shrunk density data from {half_rows * 2}x{half_cols * 2} -> {half_rows}x{half_cols})")
        density_data = totals
        # Each output cell can now hold four times as many addresses.
        possible_overlaps *= 4

    def write_all_colormaps():
        """Write the current grid once per requested colormap."""
        variant_dir = tiles_dir / variant_name
        for colormap_name, colormap in colormaps:
            create_images(density_data, colormap, possible_overlaps, variant_dir / colormap_name)

    # range() is empty for None (via the "or 0") and for zero.
    for _ in range(skip_iters or 0):
        squish()

    while True:
        write_all_colormaps()
        if density_data.shape[0] <= tile_size:
            break
        squish()
def generate_rtt():
nonlocal rtt_arr
variant_name = "rtt"
num_colors = (1 << 16) - 1
multiplier = num_colors - 1
print(f"retrieving {quantile:.1%} quantile for rtt data...", end = " ", flush = True)
rtt_quantile = np.quantile(rtt_arr, quantile)
print("done")
print(f"scaling rtt data using rtt quantile...", end = " ", flush = True)
rtt_arr_f = rtt_arr / rtt_quantile
print("done")
del rtt_arr
collect()
print("clipping rtt data between 0 and 1...", end = " ", flush = True)
rtt_arr_f.clip(0, 1, out = rtt_arr_f)
print("done")
print(f"allocating empty {num_ips_sqrt}x{num_ips_sqrt} array for rtt data...", end = " ", flush = True)
rtt_data = np.full((num_ips_sqrt, num_ips_sqrt), np.nan, dtype = np.float32)
print("done")
print(f"assigning values to rtt data array...", end = " ", flush = True)
rtt_data[coords] = rtt_arr_f
print("done")
del rtt_arr_f
collect()
def squish():
nonlocal rtt_data
print(f"sorting rtt values for median calculation...", end = " ", flush = True)
rtt_data = np.swapaxes(rtt_data.reshape(rtt_data.shape[0] // 2, 2, rtt_data.shape[1] // 2, 2), 1, 2)
rtt_data[np.isnan(rtt_data)] = np.inf # convert NaNs to Inf so comparisons work correctly
mask = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_)
np.greater(rtt_data[:, :, 0, 0], rtt_data[:, :, 0, 1], out = mask) # sort first row
rtt_data[mask, 0] = rtt_data[mask, 0, ::-1]
np.greater(rtt_data[:, :, 1, 0], rtt_data[:, :, 1, 1], out = mask) # sort second row
rtt_data[mask, 1] = rtt_data[mask, 1, ::-1]
np.greater(rtt_data[:, :, 0, 0], rtt_data[:, :, 1, 0], out = mask) # sort first column
rtt_data[mask, :, 0] = rtt_data[mask, ::-1, 0]
np.less(rtt_data[:, :, 0, 1], rtt_data[:, :, 1, 1], out = mask) # sort second column in reverse order
rtt_data[mask, :, 1] = rtt_data[mask, ::-1, 1]
np.less(rtt_data[:, :, 1, 0], rtt_data[:, :, 1, 1], out = mask) # sort second row in reverse order
rtt_data[mask, 1] = rtt_data[mask, 1, ::-1]
# rtt_data[:, :, :, 1] = rtt_data[:, :, ::-1, 1] # swap second column (not entirely necessary, just makes indices below nicer)
rtt_data[np.isinf(rtt_data)] = np.nan # restore NaNs
print("done")
def scale_down_coords(scale = 2):
nonlocal df
nonlocal tiles_per_side
nonlocal possible_overlaps
print("calculating median rtt values...", end = " ", flush = True)
mask2 = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_) # need second mask for binary ops
np.invert(np.isnan(rtt_data[:, :, 0, 1], out = mask), out = mask) # four nums populated
rtt_data[mask, 0, 0] = rtt_data[mask, 1, 1] # take average of index 1 and 2
rtt_data[mask, 0, 0] += rtt_data[mask, 1, 0]
rtt_data[mask, 0, 0] /= 2
np.logical_and(np.invert(np.isnan(rtt_data[:, :, 1, 0], out = mask), out = mask), np.isnan(rtt_data[:, :, 0, 1], out = mask2), out = mask) # three nums populated
rtt_data[mask, 0, 0] = rtt_data[mask, 1, 1] # take index 1
np.logical_and(np.invert(np.isnan(rtt_data[:, :, 1, 1], out = mask), out = mask), np.isnan(rtt_data[:, :, 1, 0], out = mask2), out = mask) # two nums populated
rtt_data[mask, 0, 0] = rtt_data[mask, 0, 0] # take average of index 0 and 1
rtt_data[mask, 0, 0] += rtt_data[mask, 1, 1]
rtt_data[mask, 0, 0] /= 2
# everything else (1 or 0 nums populated) don't need any modifications
print(f"done (shrunk rtt data from {rtt_data.shape[0] * 2}x{rtt_data.shape[1] * 2} -> {rtt_data.shape[0]}x{rtt_data.shape[1]})")
rtt_data = rtt_data[:, :, 0, 0]
prev_tiles_per_side = tiles_per_side
tiles_per_side //= scale
possible_overlaps *= scale * scale
if skip_iters is not None:
for _ in range(skip_iters):
squish()
if not quiet:
print(f"scaling {len(df)} coords down from {prev_tiles_per_side}x{prev_tiles_per_side} tiles to {tiles_per_side}x{tiles_per_side} tiles...", end = " ", flush = True)
df = df.with_columns(x = pl.col("x") // scale, y = pl.col("y") // scale).group_by(["x", "y"]).agg(count = pl.sum("count"), rtt_us = pl.median("rtt_us"))
if not quiet:
print(f"done with {len(df)} coords remaining")
def write_all_colormaps():
print(f"normalizing rtt data: multiplying...", end = " ", flush = True)
rtt_data_f = rtt_data * multiplier
print(f"incrementing...", end = " ", flush = True)
rtt_data_f += 1
# print(f"replacing NaNs...", end = " ", flush = True)
# rtt_data_f[np.isnan(rtt_data_f)] = 0.0
print(f"converting to ints...", end = " ", flush = True)
with catch_warnings():
rtt_data_i = rtt_data_f.astype(np.uint16)
print("done")
del rtt_data_f
collect()
for colormap_name, colormap in colormaps:
create_images(rtt_data_i, colormap, num_colors, tiles_dir / variant_name / colormap_name)
write_all_colormaps()
while rtt_data.shape[0] > tile_size:
squish()
write_all_colormaps()
if skip_iters is not None and skip_iters > 0:
scale_down_coords(1 << skip_iters)
while True:
for colormap in colormaps:
if generate_density:
generate_images(colormap, "density", pl.col("count") / (256 if possible_overlaps == 1 else possible_overlaps))
if generate_rtt:
generate_images(colormap, "rtt", pl.col("rtt_us") / rtt_quantile)
if tiles_per_side == 1:
break
scale_down_coords()
if should_generate_rtt:
generate_rtt()
else:
del rtt_arr
collect()
if should_generate_density:
generate_density()
del xs
del ys
del coords
collect()
if json_path is not None and tiles_dir_parts is not None:
try:
text = json_path.read_text(encoding = "UTF-8")
except:
if not quiet:
print("json file not found at provided path, so it will be created instead")
print("json file not found at provided path, so it will be created instead")
tile_metadata = {}
else:
try:
tile_metadata: dict = json.loads(text)
tile_metadata: dict = loads(text)
except:
if not quiet:
print("invalid json found at provided path, so re-creating file")
print("invalid json found at provided path, so re-creating file")
tile_metadata = {}
tile_metadata_cur = tile_metadata
for part in tiles_dir_parts:
if not part in tile_metadata_cur:
tile_metadata_cur[part] = {}
tile_metadata_cur = tile_metadata_cur[part]
for variant in variants:
if not variant in tile_metadata_cur:
tile_metadata_cur[variant] = colormaps
for variant_name in variant_names:
if not variant_name in tile_metadata_cur:
tile_metadata_cur[variant_name] = dedup_preserving_order(colormap_names)
else:
tile_metadata_cur[variant] = dedup_preserving_order(tile_metadata_cur[variant] + colormaps)
if not quiet:
print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
json_path.write_text(json.dumps(tile_metadata, indent=2), encoding = "UTF-8")
if not quiet:
print("done")
tile_metadata_cur[variant_name] = dedup_preserving_order(tile_metadata_cur[variant_name] + colormap_names)
print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
json_path.write_text(dumps(tile_metadata, indent=2), encoding = "UTF-8")
print("done")
def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = False):
def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None):
if not tiles_dir.is_dir():
raise ValueError(f"'{tiles_dir}' is not an existing directory")
@ -216,7 +354,7 @@ def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = Fals
except:
raise ValueError("json file not found at provided path")
try:
tile_metadata = json.loads(text)
tile_metadata = loads(text)
except:
raise ValueError("invalid json found at provided path")
tile_metadata_cur = tile_metadata
@ -229,30 +367,28 @@ def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = Fals
del tile_metadata_cur[tiles_dir_final]
except:
raise ValueError(f"unable to find path '{'/'.join([*tiles_dir_parts, tiles_dir_final])}' within json file")
if not quiet:
print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
json_path.write_text(json.dumps(tile_metadata, indent=2), encoding = "UTF-8")
if not quiet:
print("done")
if not quiet:
print(f"removing files from '{tiles_dir}'...", end = " ", flush = True)
shutil.rmtree(tiles_dir)
if not quiet:
print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
json_path.write_text(dumps(tile_metadata, indent=2), encoding = "UTF-8")
print("done")
print(f"removing files from '{tiles_dir}'...", end = " ", flush = True)
rmtree(tiles_dir)
print("done")
@dataclass
class IpMapArgs:
command: Literal["convert", "generate", "remove"]
command: Literal["mkcoords", "convert", "mktiles", "rmtiles"]
quiet: bool
coords: str
input: str
output: str
batches: int
processes: int
tile_size: int
alpha: bool
variants: str
colormaps: str
quantile: float
processes: int
num_rows: int | None
skip_iters: int | None
json: str | None
@ -261,45 +397,52 @@ def parse_list_arg(arg: str):
return [x.strip().lower() for x in arg.split(",") if x.strip()]
def main():
parser = argparse.ArgumentParser("ipmap")
parser = ArgumentParser("ipmap")
parser.add_argument("-q", "--quiet", action = "store_true", help = "decrease output verbosity")
subparsers = parser.add_subparsers(dest = "command", required = True, help = "the command to run")
mkcoords_parser = subparsers.add_parser("mkcoords", help = "generate coordinates corresponding to each IP address")
mkcoords_parser.add_argument("-b", "--batches", default = default_batches, type = int, help = "the number of batches to split the task into (default: %(default)s)")
mkcoords_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "the number of processes to split the task across (default: %(default)s)")
mkcoords_parser.add_argument("output", help = "the output path to save the generated coordinates to")
convert_parser = subparsers.add_parser("convert", help = "convert scan data from csv to parquet format")
convert_parser.add_argument("input", help = "the input path of the csv file to read the scan data from")
convert_parser.add_argument("output", help = "the output path of the parquet file to save the converted scan data to")
generate_parser = subparsers.add_parser("generate", help = "generate tile images from scan data in parquet format")
generate_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)")
generate_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
generate_parser.add_argument("-v", "--variants", default = ",".join(default_variants), help = "a comma separated list of variants to generate (default: %(default)s)")
generate_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormaps), help = "a comma separated list of colormaps to generate (default: %(default)s)")
generate_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
generate_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "how many processes to spawn for saving images (default: %(default)s)")
generate_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
generate_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
generate_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
generate_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from")
generate_parser.add_argument("output", help = "the output path to save the generated tile images to")
remove_parser = subparsers.add_parser("remove", help = "remove tile images")
remove_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
remove_parser.add_argument("input", help = "the path containing tile images to remove")
mktiles_parser = subparsers.add_parser("mktiles", help = "generate tile images from scan data in parquet format")
mktiles_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)")
mktiles_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
mktiles_parser.add_argument("-v", "--variants", default = ",".join(default_variant_names), help = "a comma separated list of variants to generate (default: %(default)s)")
mktiles_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormap_names), help = "a comma separated list of colormaps to generate (default: %(default)s)")
mktiles_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
mktiles_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
mktiles_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
mktiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
mktiles_parser.add_argument("coords", help = "the path of the binary file containing the coords to map IP addresses to")
mktiles_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from")
mktiles_parser.add_argument("output", help = "the output path to save the generated tile images to")
rmtiles_parser = subparsers.add_parser("rmtiles", help = "remove tile images")
rmtiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
rmtiles_parser.add_argument("input", help = "the path containing tile images to remove")
args = parser.parse_args(namespace = IpMapArgs)
try:
if args.command == "convert":
convert_to_parquet(csv_path = Path(args.input), parquet_path = Path(args.output), quiet = args.quiet)
elif args.command == "generate":
generate_tiles(parquet_path = Path(args.input), tiles_dir = Path(args.output),
tile_size = args.tile_size, alpha = args.alpha, variants = parse_list_arg(args.variants),
colormaps = parse_list_arg(args.colormaps), quantile = args.quantile,
processes = args.processes, num_rows = args.num_rows, skip_iters = args.skip_iters,
json_path = Path(args.json) if args.json else None, quiet = args.quiet)
elif args.command == "remove":
remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None, quiet = args.quiet)
else:
raise ValueError("invalid command")
with redirect_stdout(open(devnull, "w") if args.quiet else stdout):
match args.command:
case "mkcoords":
make_coords(output_path = Path(args.output), batches = args.batches, processes = args.processes)
case "convert":
convert(input_path = Path(args.input), output_path = Path(args.output))
case "mktiles":
make_tiles(coords_path = Path(args.coords), input_path = Path(args.input), tiles_dir = Path(args.output),
tile_size = args.tile_size, alpha = args.alpha, variant_names = parse_list_arg(args.variants),
colormap_names = parse_list_arg(args.colormaps), quantile = args.quantile,
num_rows = args.num_rows, skip_iters = args.skip_iters, json_path = Path(args.json) if args.json else None)
case "rmtiles":
remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None)
case _:
raise ValueError("invalid command")
except ValueError as e:
print(f"error: {e}", file = sys.stderr)
sys.exit(1)
print(f"error: {e}", file = stderr)
exit(1)
if __name__ == "__main__":
main()

13
zmap.sh
View file

@ -7,6 +7,7 @@ REMOTE_HOST=localhost
REMOTE_KEY=~/.ssh/id_rsa
REMOTE_DATA_PATH=/data
REMOTE_TILES_PATH=/tiles
REMOTE_COORDS_PATH=/coords.bin
REMOTE_IPMAP_PATH=/scripts/ipmap.py
COLORMAPS=jet,fake_parula,viridis,plasma,thermal,batlow
VARIANTS=density,rtt
@ -16,27 +17,27 @@ DAYS_KEPT=14
DATE=$(date -u +"%Y%m%d")
KEEP_DATE=$(date -d "$DAYS_KEPT days ago" -u +"%Y%m%d")
CSV_FILENAME="full-scan.csv"
PARQUET_FILENAME="full-scan.parquet"
BIN_FILENAME="full-scan.bin"
JSON_FILENAME="tiles.json"
CURRENT_LOCAL_DATA_PATH="$LOCAL_DATA_PATH/$DATE"
LOCAL_CSV_PATH="$CURRENT_LOCAL_DATA_PATH/$CSV_FILENAME"
REMOTE="$REMOTE_USER@$REMOTE_HOST"
CURRENT_REMOTE_DATA_PATH="$REMOTE_DATA_PATH/$DATE"
REMOTE_CSV_PATH="$CURRENT_REMOTE_DATA_PATH/$CSV_FILENAME"
REMOTE_PARQUET_PATH="$CURRENT_REMOTE_DATA_PATH/$PARQUET_FILENAME"
REMOTE_BIN_PATH="$CURRENT_REMOTE_DATA_PATH/$BIN_FILENAME"
REMOTE_JSON_PATH="$REMOTE_TILES_PATH/$JSON_FILENAME"
CURRENT_REMOTE_TILES_PATH="$REMOTE_TILES_PATH/$DATE"
mkdir -p "$CURRENT_LOCAL_DATA_PATH" &&
zmap -B '100M' -M icmp_echo_time '0.0.0.0/0' -f 'saddr,rtt_us,success' -o "$LOCAL_CSV_PATH" &&
zmap -B '100M' -M icmp_echo_time '0.0.0.0/0' -f 'saddr_raw,rtt_us,success' -o "$LOCAL_CSV_PATH" &&
ssh -i "$REMOTE_KEY" "$REMOTE" "mkdir -p '$CURRENT_REMOTE_DATA_PATH'" &&
scp -i "$REMOTE_KEY" "$LOCAL_CSV_PATH" "$REMOTE:$REMOTE_CSV_PATH" &&
rm "$LOCAL_CSV_PATH" &&
ssh -i "$REMOTE_KEY" "$REMOTE" "
'$REMOTE_IPMAP_PATH' convert '$REMOTE_CSV_PATH' '$REMOTE_PARQUET_PATH' &&
'$REMOTE_IPMAP_PATH' convert '$REMOTE_CSV_PATH' '$REMOTE_BIN_PATH' &&
rm '$REMOTE_CSV_PATH' &&
mkdir -p '$CURRENT_REMOTE_TILES_PATH' &&
'$REMOTE_IPMAP_PATH' generate -a -c '$COLORMAPS' -v '$VARIANTS' -j '$REMOTE_JSON_PATH' '$REMOTE_PARQUET_PATH' '$CURRENT_REMOTE_TILES_PATH'
'$REMOTE_IPMAP_PATH' mktiles -a -c '$COLORMAPS' -v '$VARIANTS' -j '$REMOTE_JSON_PATH' '$REMOTE_COORDS_PATH' '$REMOTE_BIN_PATH' '$CURRENT_REMOTE_TILES_PATH'
cd '$REMOTE_DATA_PATH' && for f in *; do (( \$f < $KEEP_DATE )) && rm \"\$f\"; done
cd '$REMOTE_TILES_PATH' && for f in *; do (( \$f < $KEEP_DATE )) && '$REMOTE_IPMAP_PATH' remove -j '$REMOTE_JSON_PATH' \"\$f\"; done
cd '$REMOTE_TILES_PATH' && for f in *; do (( \$f < $KEEP_DATE )) && '$REMOTE_IPMAP_PATH' rmtiles -j '$REMOTE_JSON_PATH' \"\$f\"; done
"