Fix rtt colors once and for all

This commit is contained in:
LilyRose2798 2024-04-12 23:04:55 +10:00
parent 8675ce6c08
commit d87ed2b1c7

View file

@ -51,22 +51,29 @@ def write_tile(path: Path, rows: np.ndarray, *, alpha = False):
png.Writer(rows.shape[1], rows.shape[0], greyscale = False, alpha = alpha).write_packed(path.open("wb"), rows)
default_tile_size = 256
default_colormaps = ["viridis"]
default_variants = ["density", "rtt"]
default_colormaps = ["viridis"]
default_quantile = 0.995
default_processes = 16
def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_tile_size, alpha = False,
variants: list[str] = default_variants, colormaps: list[str] = default_colormaps,
processes = default_processes, num_rows: int | None = None,
quantile = default_quantile, processes = default_processes, num_rows: int | None = None,
skip_iters: int | None = None, json_path: Path | None = None, quiet = False):
if tile_size < 1 or tile_size > 0x10000 or tile_size & (tile_size - 1) != 0:
if not 1 <= tile_size <= 0x10000 or tile_size & (tile_size - 1) != 0:
raise ValueError(f"tile size must be a power of 2 between 1 and {0x10000}")
tiles_per_side = int(math.sqrt(0x100000000)) // tile_size
if len(variants) == 0:
raise ValueError("must specify at least one variant")
if len(colormaps) == 0:
raise ValueError("must specify at least one colormap")
if not 0 <= quantile <= 1:
raise ValueError(f"quantile must be between 0 and 1")
colormaps = dedup_preserving_order(colormaps)
channels = 4 if alpha else 3
colormaps_by_name = { colormap: [bytes(c) for c in (Colormap(colormap).lut()[:,0:channels] * (256.0 - np.finfo(np.float32).eps)).astype(np.uint8)] for colormap in colormaps }
@ -81,6 +88,12 @@ def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_t
else:
raise ValueError(f"unknown variant '{variant}'")
if skip_iters is not None:
if skip_iters <= 0:
raise ValueError("cannot skip negative iterations")
elif skip_iters >= tiles_per_side.bit_length():
raise ValueError("cannot skip all iterations")
if json_path is not None:
if json_path.is_dir():
raise ValueError("json path must not be a directory")
@ -93,23 +106,23 @@ def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_t
if not quiet:
print(f"reading parquet '{parquet_path}'...", end = " ", flush = True)
df = pl.read_parquet(parquet_path, columns = ["x", "y", "rtt_us"], n_rows=num_rows).with_columns(count = pl.lit(1, pl.UInt32))
df = pl.read_parquet(parquet_path, columns = ["x", "y", "rtt_us"], n_rows=num_rows)
if not quiet:
print("done")
tiles_per_side = int(math.sqrt(0x100000000)) // tile_size
rtt_div: float = df.get_column("rtt_us").std() / 6
possible_overlaps = 1
rtt_quantile = df.get_column("rtt_us").quantile(quantile) or 1.0
df = df.with_columns(count = pl.lit(possible_overlaps, pl.UInt32), rtt_us = pl.col("rtt_us").clip(0, rtt_quantile))
write_tile_p = functools.partial(write_tile, alpha = alpha)
def generate_images(colormap: str, type_name: str, col_name: str, divisor: int | float):
def generate_images(colormap: str, type_name: str, series: pl.Series):
nonlocal df
if not quiet:
print(f"creating {type_name} image data with {colormap} colormap...", end = " ", flush = True)
image_data = np.zeros((tiles_per_side * tile_size, tiles_per_side * tile_size), dtype = f"S{channels}")
image_data[(df.get_column("y"), df.get_column("x"))] = (df.get_column(col_name) / divisor * 255.9999).clip(0, 255).cast(pl.UInt8).replace(pl.int_range(256), colormaps_by_name[colormap], return_dtype = pl.Binary)
image_data[(df.get_column("y"), df.get_column("x"))] = (series * 255.9999).clip(0, 255).cast(pl.UInt8).replace(pl.int_range(256), colormaps_by_name[colormap], return_dtype = pl.Binary)
if not quiet:
print("done")
@ -146,20 +159,18 @@ def generate_tiles(parquet_path: Path, tiles_dir: Path, *, tile_size = default_t
if not quiet:
print(f"done with {len(df)} coords remaining")
if skip_iters and skip_iters > 0:
remaining_iters = tiles_per_side.bit_length() - skip_iters
if remaining_iters <= 0:
if not quiet:
print("skipping all iters")
return
if skip_iters is not None and skip_iters > 0:
scale_down_coords(1 << skip_iters)
while True:
for colormap in colormaps:
if generate_density:
generate_images(colormap, "density", "count", 256 if possible_overlaps == 1 else possible_overlaps)
divisor = 256 if possible_overlaps == 1 else possible_overlaps
series = df.get_column("count") / divisor
generate_images(colormap, "density", series)
if generate_rtt:
generate_images(colormap, "rtt", "rtt_us", rtt_div)
series = df.get_column("rtt_us") / rtt_quantile
generate_images(colormap, "rtt", series)
if tiles_per_side == 1:
break
scale_down_coords()
@ -243,8 +254,9 @@ class IpMapArgs:
output: str
tile_size: int
alpha: bool
colormaps: str
variants: str
colormaps: str
quantile: float
processes: int
num_rows: int | None
skip_iters: int | None
@ -265,6 +277,7 @@ def main():
generate_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
generate_parser.add_argument("-v", "--variants", default = ",".join(default_variants), help = "a comma separated list of variants to generate (default: %(default)s)")
generate_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormaps), help = "a comma separated list of colormaps to generate (default: %(default)s)")
generate_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
generate_parser.add_argument("-p", "--processes", default = default_processes, type = int, help = "how many processes to spawn for saving images (default: %(default)s)")
generate_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
generate_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
@ -281,8 +294,8 @@ def main():
convert_to_parquet(csv_path = Path(args.input), parquet_path = Path(args.output), quiet = args.quiet)
elif args.command == "generate":
generate_tiles(parquet_path = Path(args.input), tiles_dir = Path(args.output),
tile_size = args.tile_size, alpha = args.alpha,
variants = parse_list_arg(args.variants), colormaps = parse_list_arg(args.colormaps),
tile_size = args.tile_size, alpha = args.alpha, variants = parse_list_arg(args.variants),
colormaps = parse_list_arg(args.colormaps), quantile = args.quantile,
processes = args.processes, num_rows = args.num_rows, skip_iters = args.skip_iters,
json_path = Path(args.json) if args.json else None, quiet = args.quiet)
elif args.command == "remove":