Add negative zoom levels
This commit is contained in:
parent
27edd7d58f
commit
cd3cb3c393
1 changed files with 82 additions and 60 deletions
142
ipmap.py
142
ipmap.py
|
@ -81,10 +81,10 @@ default_colormap_names = ["viridis"]
|
|||
default_quantile = 0.995
|
||||
|
||||
def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
||||
tile_size = default_tile_size, alpha = False, variant_names: list[str] = default_variant_names,
|
||||
colormap_names: list[str] = default_colormap_names, raws_path: Path | None = None,
|
||||
quantile = default_quantile, num_rows: int | None = None, skip_iters: int | None = None,
|
||||
json_path: Path | None = None):
|
||||
tile_size = default_tile_size, alpha = False, negative_zoom = False,
|
||||
variant_names: list[str] = default_variant_names, colormap_names: list[str] = default_colormap_names,
|
||||
raws_path: Path | None = None, quantile = default_quantile, num_rows: int | None = None,
|
||||
skip_iters: int | None = None, json_path: Path | None = None):
|
||||
|
||||
if not 64 <= tile_size <= num_ips_sqrt or tile_size & (tile_size - 1) != 0:
|
||||
raise ValueError(f"tile size must be a power of 2 between 64 and {num_ips_sqrt}")
|
||||
|
@ -108,6 +108,9 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
colormaps = [(colormap_name, Colormap(colormap_name)) for colormap_name in dedup_preserving_order(colormap_names)]
|
||||
channels = 4 if alpha else 3
|
||||
empty_color = np.zeros(channels, dtype = np.uint8)
|
||||
num_colors = (1 << 16) - 1
|
||||
|
||||
final_size = 1 if negative_zoom else tile_size
|
||||
|
||||
should_generate_density = False
|
||||
should_generate_rtt = False
|
||||
|
@ -120,7 +123,8 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
raise ValueError(f"unknown variant '{variant_name}'")
|
||||
|
||||
if skip_iters is not None:
|
||||
if not 0 <= skip_iters < (num_ips_sqrt // tile_size).bit_length():
|
||||
num_iters = num_ips_sqrt.bit_length() if negative_zoom else (num_ips_sqrt // tile_size).bit_length()
|
||||
if not 0 <= skip_iters < num_iters:
|
||||
raise ValueError("must skip zero or more but not all iterations")
|
||||
|
||||
if json_path is not None:
|
||||
|
@ -137,38 +141,46 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
return b"".join((pack("!I", len(data)), tag, data, pack("!I", crc32(data, crc32(tag)) & (2 ** 32 - 1))))
|
||||
|
||||
signature = b"\x89PNG\r\n\x1a\n"
|
||||
def get_preamble(alpha: bool):
|
||||
return signature + get_chunk(b"IHDR", pack("!2I5B", tile_size, tile_size, 8, 6 if alpha else 2, 0, 0, 0))
|
||||
rgb_preamble = get_preamble(False)
|
||||
rgba_preamble = get_preamble(True)
|
||||
end_chunk = get_chunk(b"IEND")
|
||||
|
||||
def create_tiles(path: Path, data: np.ndarray, colors: NDArray[np.uint8] | None = None):
|
||||
tiles_per_side = data.shape[0] // tile_size
|
||||
z = tiles_per_side.bit_length() - 1
|
||||
z_path = path / f"{z}"
|
||||
z_path.mkdir(exist_ok = True, parents = True)
|
||||
def tile_generator():
|
||||
for y in range(tiles_per_side):
|
||||
y_path = z_path / f"{y}"
|
||||
if data.shape[0] > tile_size:
|
||||
tiles_per_side = data.shape[0] // tile_size
|
||||
z = tiles_per_side.bit_length() - 1
|
||||
z_path = path / f"{z}"
|
||||
z_path.mkdir(exist_ok = True, parents = True)
|
||||
def tile_generator():
|
||||
for y in range(tiles_per_side):
|
||||
y_path = z_path / f"{y}"
|
||||
y_path.mkdir(exist_ok = True)
|
||||
for x in range(tiles_per_side):
|
||||
yield (y_path / f"{x}", data[
|
||||
y * tile_size : y * tile_size + tile_size,
|
||||
x * tile_size : x * tile_size + tile_size,
|
||||
])
|
||||
else:
|
||||
tiles_per_side = 1
|
||||
z = -((tile_size // data.shape[0]).bit_length() - 1)
|
||||
z_path = path / f"{z}"
|
||||
z_path.mkdir(exist_ok = True, parents = True)
|
||||
def tile_generator():
|
||||
y_path = z_path / "0"
|
||||
y_path.mkdir(exist_ok = True)
|
||||
for x in range(tiles_per_side):
|
||||
yield (y_path, x, data[
|
||||
y * tile_size : y * tile_size + tile_size,
|
||||
x * tile_size : x * tile_size + tile_size,
|
||||
])
|
||||
yield (y_path / "0", data)
|
||||
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) tiles to '{z_path}'...", end = " ", flush = True)
|
||||
if colors is None:
|
||||
for y_path, x, tile in tile_generator():
|
||||
(y_path / f"{x}.bin").write_bytes(compress(tile.tobytes()))
|
||||
for x_path, tile in tile_generator():
|
||||
x_path.with_suffix(".bin").write_bytes(compress(tile.tobytes()))
|
||||
else:
|
||||
preamble = rgb_preamble if colors.shape[1] == 3 else rgba_preamble
|
||||
for y_path, x, tile in tile_generator():
|
||||
idat_chunk = get_chunk(b"IDAT", compress(np.insert(colors[tile].reshape(tile_size, -1), 0, 0, axis = 1).tobytes()))
|
||||
(y_path / f"{x}.png").write_bytes(b"".join((preamble, idat_chunk, end_chunk)))
|
||||
img_size = tile_size if data.shape[0] > tile_size else data.shape[0]
|
||||
ihdr_chunk = get_chunk(b"IHDR", pack("!2I5B", img_size, img_size, 8, 2 if colors.shape[1] == 3 else 6, 0, 0, 0))
|
||||
preamble = signature + ihdr_chunk
|
||||
for x_path, tile in tile_generator():
|
||||
idat_chunk = get_chunk(b"IDAT", compress(np.insert(colors[tile].reshape(img_size, -1), 0, 0, axis = 1).tobytes()))
|
||||
x_path.with_suffix(".png").write_bytes(b"".join((preamble, idat_chunk, end_chunk)))
|
||||
print("done")
|
||||
|
||||
def get_colors(colormap: Colormap, num_colors: int):
|
||||
def get_colors(colormap: Colormap, num_colors: int = num_colors):
|
||||
return np.concatenate(([empty_color], ((colormap([0.0]) if num_colors == 1 else colormap.lut(num_colors))[:, 0:channels] * 255).astype(np.uint8)))
|
||||
|
||||
def get_scan_data() -> tuple[NDArray[np.uint32], NDArray[np.uint32]]:
|
||||
|
@ -190,6 +202,28 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
return ((ys, xs), rtt_arr)
|
||||
|
||||
coords, rtt_arr = get_all_data()
|
||||
|
||||
def normalize_data(data: NDArray[np.uint32], max_value: float) -> NDArray[np.uint16]:
|
||||
divisor = (max_value - 1) / num_colors
|
||||
print("normalizing data: getting non-zero...", end = " ", flush = True)
|
||||
non_zero = data != 0
|
||||
print("casting to floats...", end = " ", flush = True)
|
||||
data_f = data.astype(np.float32)
|
||||
print("decrementing...", end = " ", flush = True)
|
||||
data_f[non_zero] -= 1
|
||||
print("dividing...", end = " ", flush = True)
|
||||
data_f /= divisor
|
||||
print("incrementing...", end = " ", flush = True)
|
||||
data_f[non_zero] += 1
|
||||
del non_zero
|
||||
collect()
|
||||
print("clipping...", end = " ", flush = True)
|
||||
data_f.clip(0, num_colors, out = data_f)
|
||||
print("casting to ints...", end = " ", flush = True)
|
||||
with catch_warnings(action = "ignore"):
|
||||
data_norm = data_f.astype(np.uint16)
|
||||
print("done")
|
||||
return data_norm
|
||||
|
||||
def generate_density():
|
||||
possible_overlaps = 1
|
||||
|
@ -211,7 +245,7 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
density_data[:, :, 0, 0] += density_data[:, :, 1, 0]
|
||||
density_data[:, :, 0, 0] += density_data[:, :, 1, 1]
|
||||
print("done")
|
||||
print(f"shrinking density data from {density_data.shape[0]}x{density_data.shape[1]} to {density_data.shape[0] // 2}x{density_data.shape[1] // 2}...", end = " ", flush = True)
|
||||
print(f"shrinking density data from {density_data.shape[0] * 2}x{density_data.shape[1] * 2} to {density_data.shape[0]}x{density_data.shape[1]}...", end = " ", flush = True)
|
||||
density_data = np.copy(density_data[:, :, 0, 0])
|
||||
print("done")
|
||||
possible_overlaps *= 4
|
||||
|
@ -223,22 +257,26 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
def write_all_colormaps():
|
||||
if raws_path is not None:
|
||||
create_tiles(raws_path / variant_name, density_data.view(np.uint8).reshape(density_data.shape[0], density_data.shape[1], 4))
|
||||
for colormap_name, colormap in colormaps:
|
||||
create_tiles(tiles_dir / variant_name / colormap_name, density_data, get_colors(colormap, possible_overlaps))
|
||||
# skip normalizing step and just generate color stop for each possible value if not too many possibilities
|
||||
if possible_overlaps <= num_colors + 1:
|
||||
for colormap_name, colormap in colormaps:
|
||||
create_tiles(tiles_dir / variant_name / colormap_name, density_data, get_colors(colormap, possible_overlaps))
|
||||
else:
|
||||
density_data_norm = normalize_data(density_data, possible_overlaps)
|
||||
for colormap_name, colormap in colormaps:
|
||||
create_tiles(tiles_dir / variant_name / colormap_name, density_data_norm, get_colors(colormap))
|
||||
|
||||
write_all_colormaps()
|
||||
while density_data.shape[0] > tile_size:
|
||||
while density_data.shape[0] > final_size:
|
||||
squish()
|
||||
write_all_colormaps()
|
||||
|
||||
def generate_rtt():
|
||||
nonlocal rtt_arr
|
||||
num_colors = (1 << 16) - 1
|
||||
variant_name = "rtt"
|
||||
|
||||
print(f"retrieving {quantile:.1%} quantile for rtt data...", end = " ", flush = True)
|
||||
rtt_quantile = int(np.quantile(rtt_arr, quantile))
|
||||
divisor = (rtt_quantile - 1) / (num_colors - 1)
|
||||
print("done")
|
||||
print("clipping rtt data between 0 and quantile...", end = " ", flush = True)
|
||||
rtt_arr.clip(0, rtt_quantile, out = rtt_arr)
|
||||
|
@ -284,7 +322,7 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
rtt_data[mask, 0, 0] += rtt_data[mask, 0, 1] # take average of first two nums
|
||||
# everything else (1 or 0 nums populated) don't need any modifications
|
||||
print("done")
|
||||
print(f"shrinking rtt data from {rtt_data.shape[0]}x{rtt_data.shape[1]} to {rtt_data.shape[0] // 2}x{rtt_data.shape[1] // 2}...", end = " ", flush = True)
|
||||
print(f"shrinking rtt data from {rtt_data.shape[0] * 2}x{rtt_data.shape[1] * 2} to {rtt_data.shape[0]}x{rtt_data.shape[1]}...", end = " ", flush = True)
|
||||
rtt_data = np.copy(rtt_data[:, :, 0, 0])
|
||||
print("done")
|
||||
|
||||
|
@ -292,34 +330,17 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
|||
for _ in range(skip_iters):
|
||||
squish()
|
||||
|
||||
def normalize():
|
||||
print("normalizing rtt data: getting non-zero...", end = " ", flush = True)
|
||||
non_zero = rtt_data != 0
|
||||
print("converting to floating point...", end = " ", flush = True)
|
||||
rtt_data_f = rtt_data.astype(np.float32)
|
||||
print("decrementing non-zero...", end = " ", flush = True)
|
||||
rtt_data_f[non_zero] -= 1
|
||||
print("dividing...", end = " ", flush = True)
|
||||
rtt_data_f /= divisor
|
||||
print("incrementing non-zero...", end = " ", flush = True)
|
||||
rtt_data_f[non_zero] += 1
|
||||
del non_zero
|
||||
collect()
|
||||
print("converting to ints...", end = " ", flush = True)
|
||||
with catch_warnings(action = "ignore"):
|
||||
rtt_data_norm = rtt_data_f.astype(np.uint16)
|
||||
print("done")
|
||||
return rtt_data_norm
|
||||
|
||||
|
||||
def write_all_colormaps():
|
||||
if raws_path is not None:
|
||||
create_tiles(raws_path / variant_name, rtt_data.view(np.uint8).reshape(rtt_data.shape[0], rtt_data.shape[1], 4))
|
||||
rtt_data_norm = normalize()
|
||||
rtt_data_norm = normalize_data(rtt_data, rtt_quantile)
|
||||
for colormap_name, colormap in colormaps:
|
||||
create_tiles(tiles_dir / variant_name / colormap_name, rtt_data_norm, get_colors(colormap, num_colors))
|
||||
create_tiles(tiles_dir / variant_name / colormap_name, rtt_data_norm, get_colors(colormap))
|
||||
|
||||
write_all_colormaps()
|
||||
while rtt_data.shape[0] > tile_size:
|
||||
while rtt_data.shape[0] > final_size:
|
||||
squish()
|
||||
write_all_colormaps()
|
||||
|
||||
|
@ -411,6 +432,7 @@ def main():
|
|||
mktiles_parser = subparsers.add_parser("mktiles", help = "generate tile images from scan data in parquet format")
|
||||
mktiles_parser.add_argument("-t", "--tile-size", default = default_tile_size, type = int, help = "the tile size to use (default: %(default)s)")
|
||||
mktiles_parser.add_argument("-a", "--alpha", action = "store_true", help = "use alpha channel instead of black")
|
||||
mktiles_parser.add_argument("-z", "--negative-zoom", action = "store_true", help = "generate negative zoom levels for tiles")
|
||||
mktiles_parser.add_argument("-v", "--variants", default = ",".join(default_variant_names), help = "a comma separated list of variants to generate (default: %(default)s)")
|
||||
mktiles_parser.add_argument("-c", "--colormaps", default = ",".join(default_colormap_names), help = "a comma separated list of colormaps to generate (default: %(default)s)")
|
||||
mktiles_parser.add_argument("-r", "--raws", help = "generate images containing the raw data for each selected variant and save them to the provided path (default: none)")
|
||||
|
@ -435,10 +457,10 @@ def main():
|
|||
convert(input_path = Path(args.input), output_path = Path(args.output))
|
||||
case "mktiles":
|
||||
make_tiles(coords_path = Path(args.coords), input_path = Path(args.input), tiles_dir = Path(args.output),
|
||||
tile_size = args.tile_size, alpha = args.alpha, variant_names = parse_list_arg(args.variants),
|
||||
colormap_names = parse_list_arg(args.colormaps), raws_path = Path(args.raws) if args.raws else None,
|
||||
quantile = args.quantile, num_rows = args.num_rows, skip_iters = args.skip_iters,
|
||||
json_path = Path(args.json) if args.json else None)
|
||||
tile_size = args.tile_size, alpha = args.alpha, negative_zoom = args.negative_zoom,
|
||||
variant_names = parse_list_arg(args.variants), colormap_names = parse_list_arg(args.colormaps),
|
||||
raws_path = Path(args.raws) if args.raws else None, quantile = args.quantile, num_rows = args.num_rows,
|
||||
skip_iters = args.skip_iters, json_path = Path(args.json) if args.json else None)
|
||||
case "rmtiles":
|
||||
remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None)
|
||||
case _:
|
||||
|
|
Loading…
Reference in a new issue