#!/usr/bin/env python3

from os import devnull
from sys import stdout, stderr, exit
from contextlib import redirect_stdout
from argparse import ArgumentParser
from warnings import catch_warnings
from multiprocessing import Pool
from shutil import rmtree
from gc import collect
from json import loads, dumps
from pathlib import Path
from typing import TypeVar
from cmap import Colormap
from hilbert import decode
from zlib import compress, crc32
from struct import pack
from numpy.typing import NDArray
import numpy as np

ip_bytes = 4
ip_bits = ip_bytes * 8
num_ips = 1 << ip_bits
num_ips_sqrt = 1 << (ip_bits >> 1)
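
# decode a contiguous range of integer offsets into 2D points along a Hilbert
# curve (16 bits per axis), so numerically close IP addresses land in
# spatially close pixels of the final image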
def make_coord_range(start: int, end: int):
    return decode(np.arange(start, end, dtype = np.uint32), num_dims = 2, num_bits = 16).astype(np.uint16)

default_batches = 64
default_processes = 4
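
# computing coordinates for all 2^32 addresses at once would need far too much
# memory, so the work is split into sequential batches, each fanned out across
# a process pool and appended to the output file as raw uint16 coordinate pairs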
def make_coords(output_path: Path, batches = default_batches, processes = default_processes):
    if not 1 <= batches <= 0x10000:
        raise ValueError(f"batches must be between 1 and {0x10000}")
    if not 1 <= processes <= 256:
        raise ValueError("processes must be between 1 and 256")
    ips_per_batch, leftover_batch_ips = divmod(num_ips, batches)
    if leftover_batch_ips > 0:
        raise ValueError("the total number of ips must evenly divide into the number of batches")
    ips_per_process, leftover_process_ips = divmod(ips_per_batch, processes)
    if leftover_process_ips > 0:
        raise ValueError("the number of ips within each batch must evenly divide into the number of processes")
    if output_path.is_dir():
        raise ValueError("output path must not be a directory")

    output_path.write_bytes(b'')
    with Pool(processes) as p:
        for batch in range(batches):
            print(f"starting batch {batch}...")
            arrs = p.starmap(make_coord_range, ((offset * ips_per_process, offset * ips_per_process + ips_per_process)
                for offset in range(batch * processes, batch * processes + processes)))
            print("finished batch, writing arrays to file...")
            with output_path.open("ab") as f:
                for arr in arrs:
                    f.write(arr.tobytes())
            print("finished writing to file")
def convert(input_path: Path, output_path: Path):
    print(f"reading csv '{input_path}' into array...", end = " ", flush = True)
    arr = np.loadtxt(input_path, dtype = np.uint32, delimiter = ",", skiprows = 1)
    print("done")
    print("filtering out unsuccessful values...", end = " ", flush = True)
    arr = arr[arr[:, -1] == 1]
    print("done")
    print("removing success column...", end = " ", flush = True)
    arr = arr[:, :-1]
    print("done")
    print("removing duplicate IP addresses...", end = " ", flush = True)
    arr = arr[np.unique(arr[:, 0], return_index = True)[1]]
    print("done")
    print("converting IP addresses from big-endian to little-endian...", end = " ", flush = True)
    arr[:, 0].byteswap(inplace = True)
    print("done")
    print(f"writing array to '{output_path}'")
    output_path.write_bytes(arr.tobytes())
    print("done")

default_tile_size = 1 << ip_bits // 4
default_variant_names = ["density", "rtt"]
default_colormap_names = ["viridis"]
default_quantile = 0.995
default_compression_level = -1
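
# the "mktiles" step: place every scanned IP at its Hilbert coordinate on a
# 65536x65536 grid, then repeatedly halve the grid, emitting a z/y/x tile
# pyramid (a png per colormap, or zlib-compressed .bin raws) at each zoom level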
def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
               tile_size = default_tile_size, alpha = False, negative_zoom = False,
               variant_names: list[str] = default_variant_names, colormap_names: list[str] = default_colormap_names,
               raws_path: Path | None = None, quantile = default_quantile, num_rows: int | None = None,
               skip_iters: int | None = None, compression_level: int = default_compression_level,
               json_path: Path | None = None):

    if not 64 <= tile_size <= num_ips_sqrt or tile_size & (tile_size - 1) != 0:
        raise ValueError(f"tile size must be a power of 2 between 64 and {num_ips_sqrt}")
    if len(variant_names) == 0:
        raise ValueError("must specify at least one variant")
    if len(colormap_names) == 0 and raws_path is None:
        raise ValueError("must specify at least one colormap or a path to save raws to")
    if not 0 <= quantile <= 1:
        raise ValueError("quantile must be between 0 and 1")
    if not -1 <= compression_level <= 9:
        raise ValueError("compression level must be between 0 and 9, or -1 for the zlib default")

    T = TypeVar("T")
    def dedup_preserving_order(vals: list[T]) -> list[T]:
        seen = set()
        result = []
        for item in vals:
            if item not in seen:
                seen.add(item)
                result.append(item)
        return result

    colormaps = [(colormap_name, Colormap(colormap_name)) for colormap_name in dedup_preserving_order(colormap_names)]
    channels = 4 if alpha else 3
    empty_color = np.zeros(channels, dtype = np.uint8)
    num_colors = (1 << 16) - 1

    final_size = 1 if negative_zoom else tile_size

    should_generate_density = False
    should_generate_rtt = False
    for variant_name in variant_names:
        if variant_name == "density":
            should_generate_density = True
        elif variant_name == "rtt":
            should_generate_rtt = True
        else:
            raise ValueError(f"unknown variant '{variant_name}'")

    if skip_iters is not None:
        num_iters = num_ips_sqrt.bit_length() if negative_zoom else (num_ips_sqrt // tile_size).bit_length()
        if not 0 <= skip_iters < num_iters:
            raise ValueError("must skip zero or more but not all iterations")

    if json_path is not None:
        if json_path.is_dir():
            raise ValueError("json path must not be a directory")
        try:
            tiles_dir_parts = tiles_dir.relative_to(json_path.parent).parts
        except ValueError:
            raise ValueError("tiles path must be relative to the json path")
    else:
        tiles_dir_parts = None
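
    # PNG tiles are assembled by hand rather than through an imaging library:
    # each file is the PNG signature, an IHDR chunk, a single zlib-compressed
    # IDAT chunk, and an IEND chunk, each chunk framed by its length and CRC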
    def get_chunk(tag: bytes, data = b""):
        return b"".join((pack("!I", len(data)), tag, data, pack("!I", crc32(data, crc32(tag)) & (2 ** 32 - 1))))

    signature = b"\x89PNG\r\n\x1a\n"
    end_chunk = get_chunk(b"IEND")
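
    # slice a (power-of-two sized) grid into tiles under <path>/<z>/<y>/<x>,
    # where zoom level z = 0 is the level at which the whole grid fits in one
    # tile; grids at or below tile_size yield a single tile at z <= 0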
    def create_tiles(path: Path, data: np.ndarray, colors: NDArray[np.uint8] | None = None):
        if data.shape[0] > tile_size:
            tiles_per_side = data.shape[0] // tile_size
            z = tiles_per_side.bit_length() - 1
            z_path = path / f"{z}"
            z_path.mkdir(exist_ok = True, parents = True)
            def tile_generator():
                for y in range(tiles_per_side):
                    y_path = z_path / f"{y}"
                    y_path.mkdir(exist_ok = True)
                    for x in range(tiles_per_side):
                        yield (y_path / f"{x}", data[
                            y * tile_size : y * tile_size + tile_size,
                            x * tile_size : x * tile_size + tile_size,
                        ])
        else:
            tiles_per_side = 1
            z = -((tile_size // data.shape[0]).bit_length() - 1)
            z_path = path / f"{z}"
            z_path.mkdir(exist_ok = True, parents = True)
            def tile_generator():
                y_path = z_path / "0"
                y_path.mkdir(exist_ok = True)
                yield (y_path / "0", data)
        print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) tiles to '{z_path}'...", end = " ", flush = True)
        if colors is None:
            for x_path, tile in tile_generator():
                x_path.with_suffix(".bin").write_bytes(compress(tile.tobytes(), level = compression_level))
        else:
            img_size = tile_size if data.shape[0] > tile_size else data.shape[0]
            ihdr_chunk = get_chunk(b"IHDR", pack("!2I5B", img_size, img_size, 8, 2 if colors.shape[1] == 3 else 6, 0, 0, 0)) # 8-bit depth, color type 2 (rgb) or 6 (rgba)
            preamble = signature + ihdr_chunk
            for x_path, tile in tile_generator():
                idat_chunk = get_chunk(b"IDAT", compress(np.insert(colors[tile].reshape(img_size, -1), 0, 0, axis = 1).tobytes(), level = compression_level)) # np.insert prepends filter byte 0 (none) to each scanline
                x_path.with_suffix(".png").write_bytes(b"".join((preamble, idat_chunk, end_chunk)))
        print("done")
    def get_colors(colormap: Colormap, num_colors: int = num_colors):
        return np.concatenate(([empty_color], ((colormap([0.0]) if num_colors == 1 else colormap.lut(num_colors))[:, 0:channels] * 255).astype(np.uint8)))

    def get_scan_data() -> tuple[NDArray[np.uint32], NDArray[np.uint32]]:
        print(f"reading scan data from file '{input_path}'...", end = " ", flush = True)
        data = np.fromfile(input_path, count = num_rows * 2 if num_rows else -1, dtype = np.uint32).reshape(-1, 2)
        ip_arr = np.copy(data.T[0])
        rtt_arr = np.copy(data.T[1])
        print("done")
        return (ip_arr, rtt_arr)

    def get_all_data() -> tuple[tuple[NDArray[np.uint16], NDArray[np.uint16]], NDArray[np.uint32]]:
        ip_arr, rtt_arr = get_scan_data()
        print(f"reading coordinates from file '{coords_path}'...", end = " ", flush = True)
        ip_coords = np.fromfile(coords_path, dtype = np.uint16).reshape(-1, 2)
        print("done")
        print("converting ip addresses to coordinates...", end = " ", flush = True)
        xs, ys = ip_coords[ip_arr].T
        print("done")
        return ((ys, xs), rtt_arr)

    coords, rtt_arr = get_all_data()
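
    # rescale raw values into lookup-table indices 1..num_colors, keeping 0 as
    # "no data": non-zero values are shifted down by one, scaled, and shifted
    # back up so the smallest real value never rounds down to the empty color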
    def normalize_data(data: NDArray[np.uint32], max_value: float) -> NDArray[np.uint16]:
        divisor = (max_value - 1) / num_colors
        print("normalizing data: getting non-zero...", end = " ", flush = True)
        non_zero = data != 0
        print("casting to floats...", end = " ", flush = True)
        data_f = data.astype(np.float32)
        print("decrementing...", end = " ", flush = True)
        data_f[non_zero] -= 1
        print("dividing...", end = " ", flush = True)
        data_f /= divisor
        print("incrementing...", end = " ", flush = True)
        data_f[non_zero] += 1
        del non_zero
        collect()
        print("clipping...", end = " ", flush = True)
        data_f.clip(0, num_colors, out = data_f)
        print("casting to ints...", end = " ", flush = True)
        with catch_warnings(action = "ignore"):
            data_norm = data_f.astype(np.uint16)
        print("done")
        return data_norm
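
    # density variant: each cell counts responsive IPs; every squish() sums
    # 2x2 blocks into one cell, so the maximum possible count per cell
    # (possible_overlaps) grows fourfold with each zoom-out step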
    def generate_density():
        possible_overlaps = 1
        variant_name = "density"

        print(f"allocating empty {num_ips_sqrt}x{num_ips_sqrt} array of density data...", end = " ", flush = True)
        density_data = np.zeros((num_ips_sqrt, num_ips_sqrt), dtype = np.uint32)
        print("done")
        print("assigning values to density data array...", end = " ", flush = True)
        density_data[coords] = possible_overlaps
        print("done")

        def squish():
            nonlocal density_data
            nonlocal possible_overlaps
            density_data = np.swapaxes(density_data.reshape(density_data.shape[0] >> 1, 2, density_data.shape[1] >> 1, 2), 1, 2)
            print("calculating density sum...", end = " ", flush = True)
            density_data[:, :, 0, 0] += density_data[:, :, 0, 1]
            density_data[:, :, 0, 0] += density_data[:, :, 1, 0]
            density_data[:, :, 0, 0] += density_data[:, :, 1, 1]
            print("done")
            print(f"shrinking density data from {density_data.shape[0] * 2}x{density_data.shape[1] * 2} to {density_data.shape[0]}x{density_data.shape[1]}...", end = " ", flush = True)
            density_data = np.copy(density_data[:, :, 0, 0])
            print("done")
            possible_overlaps *= 4

        if skip_iters is not None:
            for _ in range(skip_iters):
                squish()

        def write_all_colormaps():
            if raws_path is not None:
                create_tiles(raws_path / variant_name, density_data.view(np.uint8).reshape(density_data.shape[0], density_data.shape[1], 4))
            # skip the normalizing step and generate a color stop for each possible value when there are few enough possibilities
            if possible_overlaps <= num_colors + 1:
                for colormap_name, colormap in colormaps:
                    create_tiles(tiles_dir / variant_name / colormap_name, density_data, get_colors(colormap, possible_overlaps))
            else:
                density_data_norm = normalize_data(density_data, possible_overlaps)
                for colormap_name, colormap in colormaps:
                    create_tiles(tiles_dir / variant_name / colormap_name, density_data_norm, get_colors(colormap))

        write_all_colormaps()
        while density_data.shape[0] > final_size:
            squish()
            write_all_colormaps()
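
    # rtt variant: cells hold round-trip times clipped to an upper quantile to
    # tame outliers; each squish() replaces every 2x2 block with the median of
    # its populated cells, with 0 acting as the "no data" marker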
    def generate_rtt():
        nonlocal rtt_arr
        variant_name = "rtt"

        print(f"retrieving {quantile:.1%} quantile for rtt data...", end = " ", flush = True)
        rtt_quantile = int(np.quantile(rtt_arr, quantile))
        print("done")
        print("clipping rtt data between 0 and quantile...", end = " ", flush = True)
        rtt_arr.clip(0, rtt_quantile, out = rtt_arr)
        print("done")
        print(f"allocating empty {num_ips_sqrt}x{num_ips_sqrt} array for rtt data...", end = " ", flush = True)
        rtt_data = np.zeros((num_ips_sqrt, num_ips_sqrt), dtype = np.uint32)
        print("done")
        print("assigning values to rtt data array...", end = " ", flush = True)
        rtt_data[coords] = rtt_arr
        print("done")
        del rtt_arr
        collect()

        def squish():
            nonlocal rtt_data
            print("sorting rtt values for median calculation...", end = " ", flush = True)
            rtt_data = np.swapaxes(rtt_data.reshape(rtt_data.shape[0] >> 1, 2, rtt_data.shape[1] >> 1, 2), 1, 2)
            # vectorized sorting network: after these five compare-exchanges and
            # the final unconditional swap, every 2x2 block is sorted in
            # descending order, so zero ("no data") cells sink to the end
            mask = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_)
            np.less(rtt_data[:, :, 0, 0], rtt_data[:, :, 0, 1], out = mask) # sort first row
            rtt_data[mask, 0] = rtt_data[mask, 0, ::-1]
            np.less(rtt_data[:, :, 1, 0], rtt_data[:, :, 1, 1], out = mask) # sort second row
            rtt_data[mask, 1] = rtt_data[mask, 1, ::-1]
            np.less(rtt_data[:, :, 0, 1], rtt_data[:, :, 1, 1], out = mask) # sort second column
            rtt_data[mask, :, 1] = rtt_data[mask, ::-1, 1]
            np.greater(rtt_data[:, :, 0, 0], rtt_data[:, :, 1, 0], out = mask) # sort first column in reverse order
            rtt_data[mask, :, 0] = rtt_data[mask, ::-1, 0]
            np.greater(rtt_data[:, :, 0, 0], rtt_data[:, :, 0, 1], out = mask) # sort first row in reverse order
            rtt_data[mask, 0] = rtt_data[mask, 0, ::-1]
            rtt_data[:, :, :, 0] = rtt_data[:, :, ::-1, 0] # swap first column
            print("done")
            print("calculating median rtt values...", end = " ", flush = True)
            mask2 = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_) # need a second mask for binary ops
            np.not_equal(rtt_data[:, :, 1, 1], 0, out = mask) # four cells populated
            rtt_data[mask, 0, 0] = rtt_data[mask, 0, 1]
            rtt_data[mask, 0, 0] -= rtt_data[mask, 1, 0]
            rtt_data[mask, 0, 0] >>= 1
            rtt_data[mask, 0, 0] += rtt_data[mask, 1, 0] # take the average of the middle two values
            np.logical_and(np.not_equal(rtt_data[:, :, 1, 0], 0, out = mask), np.equal(rtt_data[:, :, 1, 1], 0, out = mask2), out = mask) # three cells populated
            rtt_data[mask, 0, 0] = rtt_data[mask, 0, 1] # take the middle of the three values
            np.logical_and(np.not_equal(rtt_data[:, :, 0, 1], 0, out = mask), np.equal(rtt_data[:, :, 1, 0], 0, out = mask2), out = mask) # two cells populated
            rtt_data[mask, 0, 0] -= rtt_data[mask, 0, 1]
            rtt_data[mask, 0, 0] >>= 1
            rtt_data[mask, 0, 0] += rtt_data[mask, 0, 1] # take the average of the two values
            # blocks with one or zero cells populated already hold their median at [0, 0]
            print("done")
            print(f"shrinking rtt data from {rtt_data.shape[0] * 2}x{rtt_data.shape[1] * 2} to {rtt_data.shape[0]}x{rtt_data.shape[1]}...", end = " ", flush = True)
            rtt_data = np.copy(rtt_data[:, :, 0, 0])
            print("done")

        if skip_iters is not None:
            for _ in range(skip_iters):
                squish()

        def write_all_colormaps():
            if raws_path is not None:
                create_tiles(raws_path / variant_name, rtt_data.view(np.uint8).reshape(rtt_data.shape[0], rtt_data.shape[1], 4))
            rtt_data_norm = normalize_data(rtt_data, rtt_quantile)
            for colormap_name, colormap in colormaps:
                create_tiles(tiles_dir / variant_name / colormap_name, rtt_data_norm, get_colors(colormap))

        write_all_colormaps()
        while rtt_data.shape[0] > final_size:
            squish()
            write_all_colormaps()

    if should_generate_rtt:
        generate_rtt()
    else:
        del rtt_arr
        collect()
    if should_generate_density:
        generate_density()
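
    # record which variant/colormap tile sets now exist under tiles_dir in the
    # json index, merging with any entries written by previous runs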
    if json_path is not None and tiles_dir_parts is not None:
        try:
            text = json_path.read_text(encoding = "UTF-8")
        except OSError:
            print("json file not found at provided path, so it will be created instead")
            tile_metadata = {}
        else:
            try:
                tile_metadata: dict = loads(text)
            except ValueError:
                print("invalid json found at provided path, so re-creating file")
                tile_metadata = {}
        tile_metadata_cur = tile_metadata
        for part in tiles_dir_parts:
            if part not in tile_metadata_cur:
                tile_metadata_cur[part] = {}
            tile_metadata_cur = tile_metadata_cur[part]
        for variant_name in variant_names:
            if variant_name not in tile_metadata_cur:
                tile_metadata_cur[variant_name] = dedup_preserving_order(colormap_names)
            else:
                tile_metadata_cur[variant_name] = dedup_preserving_order(tile_metadata_cur[variant_name] + colormap_names)
        print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
        json_path.write_text(dumps(tile_metadata, indent = 2), encoding = "UTF-8")
        print("done")
def remove_tiles(tiles_dir: Path, *, json_path: Path | None = None):
    if not tiles_dir.is_dir():
        raise ValueError(f"'{tiles_dir}' is not an existing directory")

    if json_path:
        if json_path.is_dir():
            raise ValueError("json path must not be a directory")
        try:
            *tiles_dir_parts, tiles_dir_final = tiles_dir.relative_to(json_path.parent).parts
        except ValueError:
            raise ValueError("tiles path must be relative to but not containing the json path")
        try:
            text = json_path.read_text(encoding = "UTF-8")
        except OSError:
            raise ValueError("json file not found at provided path")
        try:
            tile_metadata = loads(text)
        except ValueError:
            raise ValueError("invalid json found at provided path")
        tile_metadata_cur = tile_metadata
        try:
            for part in tiles_dir_parts:
                tile_metadata_cur = tile_metadata_cur[part]
            if isinstance(tile_metadata_cur, list):
                tile_metadata_cur.remove(tiles_dir_final)
            else:
                del tile_metadata_cur[tiles_dir_final]
        except (KeyError, TypeError, ValueError):
            raise ValueError(f"unable to find path '{'/'.join([*tiles_dir_parts, tiles_dir_final])}' within json file")
        print(f"writing metadata to json file at '{json_path}'...", end = " ", flush = True)
        json_path.write_text(dumps(tile_metadata, indent = 2), encoding = "UTF-8")
        print("done")

    print(f"removing files from '{tiles_dir}'...", end = " ", flush = True)
    rmtree(tiles_dir)
    print("done")

def parse_list_arg(arg: str):
    return [x.strip().lower() for x in arg.split(",") if x.strip()]
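
# illustrative end-to-end invocation (file and directory names are hypothetical):
#   ./ipmap.py mkcoords coords.bin
#   ./ipmap.py convert scan.csv scan.bin
#   ./ipmap.py mktiles --colormaps viridis,magma --json tiles/tiles.json coords.bin scan.bin tiles/scan
#   ./ipmap.py rmtiles --json tiles/tiles.json tiles/scan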
def main():
    parser = ArgumentParser("ipmap")
    parser.add_argument("-q", "--quiet", action = "store_true", help = "decrease output verbosity")
    subparsers = parser.add_subparsers(dest = "command", required = True, help = "the command to run")
    mkcoords_parser = subparsers.add_parser("mkcoords", help = "generate coordinates corresponding to each IP address")
    mkcoords_parser.add_argument("-b", "--batches", default = default_batches, type = int, help = "the number of batches to split the task into (default: %(default)s)")
    mkcoords_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "the number of processes to split the task across (default: %(default)s)")
    mkcoords_parser.add_argument("output", help = "the output path to save the generated coordinates to")
    convert_parser = subparsers.add_parser("convert", help = "convert scan data from csv to binary format")
    convert_parser.add_argument("input", help = "the input path of the csv file to read the scan data from")
    convert_parser.add_argument("output", help = "the output path of the binary file to save the converted scan data to")
    mktiles_parser = subparsers.add_parser("mktiles", help = "generate tile images from scan data in binary format")
    mktiles_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)")
    mktiles_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
    mktiles_parser.add_argument("-z", "--negative-zoom", action = "store_true", help = "generate negative zoom levels for tiles")
    mktiles_parser.add_argument("-v", "--variants", default = ",".join(default_variant_names), help = "a comma separated list of variants to generate (default: %(default)s)")
    mktiles_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormap_names), help = "a comma separated list of colormaps to generate (default: %(default)s)")
    mktiles_parser.add_argument("-r", "--raws", help = "generate images containing the raw data for each selected variant and save them to the provided path (default: none)")
    mktiles_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
    mktiles_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
    mktiles_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
    mktiles_parser.add_argument("-C", "--compression-level", default = default_compression_level, type = int, help = "the level of compression to use for the tile images (default: %(default)s)")
    mktiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
    mktiles_parser.add_argument("coords", help = "the path of the binary file containing the coords to map IP addresses to")
    mktiles_parser.add_argument("input", help = "the input path of the binary file to read the scan data from")
    mktiles_parser.add_argument("output", help = "the output path to save the generated tile images to")
    rmtiles_parser = subparsers.add_parser("rmtiles", help = "remove tile images")
    rmtiles_parser.add_argument("-j", "--json", help = "the path for the json file storing metadata about the tile images (default: none)")
    rmtiles_parser.add_argument("input", help = "the path containing tile images to remove")
    args = parser.parse_args()

    try:
        with redirect_stdout(open(devnull, "w") if args.quiet else stdout):
            match args.command:
                case "mkcoords":
                    make_coords(output_path = Path(args.output), batches = args.batches, processes = args.processes)
                case "convert":
                    convert(input_path = Path(args.input), output_path = Path(args.output))
                case "mktiles":
                    make_tiles(coords_path = Path(args.coords), input_path = Path(args.input), tiles_dir = Path(args.output),
                        tile_size = args.tile_size, alpha = args.alpha, negative_zoom = args.negative_zoom,
                        variant_names = parse_list_arg(args.variants), colormap_names = parse_list_arg(args.colormaps),
                        raws_path = Path(args.raws) if args.raws else None, quantile = args.quantile,
                        num_rows = args.num_rows, skip_iters = args.skip_iters, compression_level = args.compression_level,
                        json_path = Path(args.json) if args.json else None)
                case "rmtiles":
                    remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None)
                case _:
                    raise ValueError("invalid command")
    except ValueError as e:
        print(f"error: {e}", file = stderr)
        exit(1)

if __name__ == "__main__":
    main()