#!/usr/bin/env python3 import sys import math import functools import argparse import json import shutil from pathlib import Path import png import hilbert import numpy as np import polars as pl from cmap import Colormap from multiprocessing import Pool from dataclasses import dataclass from typing import Literal def dedup_preserving_order(vals: list) -> list: seen = set() result = [] for item in vals: if item not in seen: seen.add(item) result.append(item) return result def convert_to_parquet(csv_path: Path, parquet_path: Path, *, quiet = False): if not quiet: print(f"scanning csv '{csv_path}' into parquet '{parquet_path}'...", end = " ", flush = True) lf = pl.scan_csv(csv_path, schema={ "saddr": pl.String, "rtt_us": pl.UInt64, "success": pl.UInt8 }) lf = lf.filter(pl.col("success") == 1) lf = lf.drop("success") lf = lf.with_columns(rtt_us = pl.col("rtt_us").clip(0, 0xFFFFFFFF).cast(pl.UInt32)) lf = lf.with_columns(saddr = pl.col("saddr").str.split_exact(".", 3).struct.rename_fields(["a", "b", "c", "d"])) lf = lf.with_columns(saddr = pl.col("saddr").struct.field("a").cast(pl.UInt32) * 0x1000000 + pl.col("saddr").struct.field("b").cast(pl.UInt32) * 0x10000 + pl.col("saddr").struct.field("c").cast(pl.UInt32) * 0x100 + pl.col("saddr").struct.field("d").cast(pl.UInt32)) lf = lf.unique("saddr") lf = lf.with_columns(coords = pl.col("saddr").map_batches(functools.partial(hilbert.decode, num_dims = 2, num_bits = 16), pl.Array(pl.UInt16, 2), is_elementwise = True)) lf = lf.with_columns(x = pl.col("coords").arr.get(0), y = pl.col("coords").arr.get(1)) lf = lf.drop("coords") lf.sink_parquet(parquet_path) if not quiet: print("done") def write_tile(path: Path, rows: np.ndarray, *, alpha = False): path.parent.mkdir(exist_ok = True, parents = True) png.Writer(rows.shape[1], rows.shape[0], greyscale = False, alpha = alpha).write_packed(path.open("wb"), rows) default_tile_size = 256 default_colormaps = ["viridis"] default_variants = ["density", "rtt"] default_processes = 16 def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_tile_size, alpha = False, variants: list[str] = default_variants, colormaps: list[str] = default_colormaps, processes = default_processes, num_rows: int | None = None, skip_iters: int | None = None, json_path: Path | None = None, quiet = False): if tile_size < 1 or tile_size > 0x10000 or tile_size & (tile_size - 1) != 0: raise ValueError(f"tile size must be a power of 2 between 1 and {0x10000}") if len(variants) == 0: raise ValueError("must specify at least one variant") if len(colormaps) == 0: raise ValueError("must specify at least one colormap") colormaps = dedup_preserving_order(colormaps) channels = 4 if alpha else 3 colormaps_by_name = { colormap: [bytes(c) for c in (Colormap(colormap).lut()[:,0:channels] * (256.0 - np.finfo(np.float32).eps)).astype(np.uint8)] for colormap in colormaps } generate_density = False generate_rtt = False for variant in variants: if variant == "density": generate_density = True elif variant == "rtt": generate_rtt = True else: raise ValueError(f"unknown variant '{variant}'") if json_path is not None: if json_path.is_dir(): raise ValueError("json path must not be a directory") try: tiles_dir_parts = tiles_dir.relative_to(json_path.parent).parts except ValueError: raise ValueError("tiles path must be relative to the json path") else: tiles_dir_parts = None if not quiet: print(f"reading parquet '{parquet_path}'...", end = " ", flush = True) df = pl.read_parquet(parquet_path, columns = ["x", "y", "rtt_us"], n_rows=num_rows).with_columns(count = pl.lit(1, pl.UInt32)) if not quiet: print("done") tiles_per_side = int(math.sqrt(0x100000000)) // tile_size rtt_div: float = df.get_column("rtt_us").std() / 4 possible_overlaps = 1 write_tile_p = functools.partial(write_tile, alpha = alpha) def generate_images(colormap: str, type_name: str, col_name: str, divisor: int | float): nonlocal df if not quiet: print(f"creating {type_name} image data with {colormap} colormap...", end = " ", flush = True) image_data = np.zeros((tiles_per_side * tile_size, tiles_per_side * tile_size), dtype = f"S{channels}") image_data[(df.get_column("y"), df.get_column("x"))] = (df.get_column(col_name) / divisor * 255.9999).clip(0, 255).cast(pl.UInt8).replace(pl.int_range(256), colormaps_by_name[colormap], return_dtype = pl.Binary) if not quiet: print("done") if not quiet: print(f"writing {tiles_per_side}x{tiles_per_side}={tiles_per_side * tiles_per_side} {type_name} images with {colormap} colormap...", end = " ", flush = True) with Pool(processes) as pool: z = tiles_per_side.bit_length() - 1 z_path = tiles_dir / type_name / colormap / f"{z}" z_path.mkdir(exist_ok = True, parents = True) pool.starmap(write_tile_p, [ (z_path / f"{y}" / f"{x}.png", image_data[ y * tile_size : y * tile_size + tile_size, x * tile_size : x * tile_size + tile_size, ]) for y in range(tiles_per_side) for x in range(tiles_per_side) ]) if not quiet: print("done") del image_data def scale_down_coords(scale = 2): nonlocal df nonlocal tiles_per_side nonlocal possible_overlaps prev_tiles_per_side = tiles_per_side tiles_per_side //= scale possible_overlaps *= scale * scale if not quiet: print(f"scaling {len(df)} coords down from {prev_tiles_per_side}x{prev_tiles_per_side} tiles to {tiles_per_side}x{tiles_per_side} tiles...", end = " ", flush = True) df = df.with_columns(x = pl.col("x") // scale, y = pl.col("y") // scale).group_by(["x", "y"]).agg(count = pl.sum("count"), rtt_us = pl.median("rtt_us")) if not quiet: print(f"done with {len(df)} coords remaining") if skip_iters and skip_iters > 0: remaining_iters = tiles_per_side.bit_length() - skip_iters if remaining_iters <= 0: if not quiet: print("skipping all iters") return scale_down_coords(1 << skip_iters) while True: for colormap in colormaps: if generate_density: generate_images(colormap, "density", "count", 256 if possible_overlaps == 1 else possible_overlaps) if generate_rtt: generate_images(colormap, "rtt", "rtt_us", rtt_div) if tiles_per_side == 1: break scale_down_coords() if json_path is not None and tiles_dir_parts is not None: try: text = json_path.read_text(encoding = "UTF-8") except: if not quiet: print("json file not found at provided path, so it will be created instead") tile_metadata = {} else: try: tile_metadata: dict = json.loads(text) except: if not quiet: print("invalid json found at provided path, so re-creating file") tile_metadata = {} tile_metadata_cur = tile_metadata for part in tiles_dir_parts: if not part in tile_metadata_cur: tile_metadata_cur[part] = {} tile_metadata_cur = tile_metadata_cur[part] for variant in variants: if not variant in tile_metadata_cur: tile_metadata_cur[variant] = colormaps else: tile_metadata_cur[variant] = dedup_preserving_order(tile_metadata_cur[variant] + colormaps) if not quiet: print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True) json_path.write_text(json.dumps(tile_metadata, indent=2), encoding = "UTF-8") if not quiet: print("done") def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None, quiet = False): if not tiles_dir.is_dir(): raise ValueError(f"'{tiles_dir}' is not an existing directory") if json_path: if json_path.is_dir(): raise ValueError("json path must not be a directory") try: *tiles_dir_parts, tiles_dir_final = tiles_dir.relative_to(json_path.parent).parts except ValueError: raise ValueError("tiles path must be relative to but not containing the json path") try: text = json_path.read_text(encoding = "UTF-8") except: raise ValueError("json file not found at provided path") try: tile_metadata = json.loads(text) except: raise ValueError("invalid json found at provided path") tile_metadata_cur = tile_metadata try: for part in tiles_dir_parts: tile_metadata_cur = tile_metadata_cur[part] if isinstance(tile_metadata_cur, list): tile_metadata_cur = tile_metadata_cur.remove(tiles_dir_final) else: del tile_metadata_cur[tiles_dir_final] except: raise ValueError(f"unable to find path '{'/'.join([*tiles_dir_parts, tiles_dir_final])}' within json file") if not quiet: print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True) json_path.write_text(json.dumps(tile_metadata, indent=2), encoding = "UTF-8") if not quiet: print("done") if not quiet: print(f"removing files from '{tiles_dir}'...", end = " ", flush = True) shutil.rmtree(tiles_dir) if not quiet: print("done") @dataclass class IpMapArgs: command: Literal["convert", "generate", "remove"] quiet: bool input: str output: str tile_size: int alpha: bool colormaps: str variants: str processes: int num_rows: int | None skip_iters: int | None json: str | None def parse_list_arg(arg: str): return [x.strip().lower() for x in arg.split(",") if x.strip()] def main(): parser = argparse.ArgumentParser("ipmap") parser.add_argument("-q", "--quiet", action = "store_true", help = "decrease output verbosity") subparsers = parser.add_subparsers(dest = "command", required = True, help = "the command to run") convert_parser = subparsers.add_parser("convert", help = "convert scan data from csv to parquet format") convert_parser.add_argument("input", help = "the input path of the csv file to read the scan data from") convert_parser.add_argument("output", help = "the output path of the parquet file to save the converted scan data to") generate_parser = subparsers.add_parser("generate", help = "generate tile images from scan data in parquet format") generate_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)") generate_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black") generate_parser.add_argument("-v", "--variants", default = ",".join(default_variants), help = "a comma separated list of variants to generate (default: %(default)s)") generate_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormaps), help = "a comma separated list of colormaps to generate (default: %(default)s)") generate_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "how many processes to spawn for saving images (default: %(default)s)") generate_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)") generate_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)") generate_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)") generate_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from") generate_parser.add_argument("output", help = "the output path to save the generated tile images to") remove_parser = subparsers.add_parser("remove", help = "remove tile images") remove_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)") remove_parser.add_argument("input", help = "the path containing tile images to remove") args = parser.parse_args(namespace = IpMapArgs) try: if args.command == "convert": convert_to_parquet(csv_path = Path(args.input), parquet_path = Path(args.output), quiet = args.quiet) elif args.command == "generate": generate_tiles(parquet_path = Path(args.input), tiles_dir = Path(args.output), tile_size = args.tile_size, alpha = args.alpha, variants = parse_list_arg(args.variants), colormaps = parse_list_arg(args.colormaps), processes = args.processes, num_rows = args.num_rows, skip_iters = args.skip_iters, json_path = Path(args.json) if args.json else None, quiet = args.quiet) elif args.command == "remove": remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None, quiet = args.quiet) else: raise ValueError("invalid command") except ValueError as e: print(f"error: {e}", file = sys.stderr) sys.exit(1) if __name__ == "__main__": main()