add configurable compression level argument
This commit is contained in:
parent
9c16be8bf3
commit
024111185f
18
ipmap.py
18
ipmap.py
|
@ -79,12 +79,14 @@ default_tile_size = 1 << ip_bits // 4
|
||||||
default_variant_names = ["density", "rtt"]
|
default_variant_names = ["density", "rtt"]
|
||||||
default_colormap_names = ["viridis"]
|
default_colormap_names = ["viridis"]
|
||||||
default_quantile = 0.995
|
default_quantile = 0.995
|
||||||
|
default_compression_level = -1
|
||||||
|
|
||||||
def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
||||||
tile_size = default_tile_size, alpha = False, negative_zoom = False,
|
tile_size = default_tile_size, alpha = False, negative_zoom = False,
|
||||||
variant_names: list[str] = default_variant_names, colormap_names: list[str] = default_colormap_names,
|
variant_names: list[str] = default_variant_names, colormap_names: list[str] = default_colormap_names,
|
||||||
raws_path: Path | None = None, quantile = default_quantile, num_rows: int | None = None,
|
raws_path: Path | None = None, quantile = default_quantile, num_rows: int | None = None,
|
||||||
skip_iters: int | None = None, json_path: Path | None = None):
|
skip_iters: int | None = None, compression_level : int = default_compression_level,
|
||||||
|
json_path: Path | None = None):
|
||||||
|
|
||||||
if not 64 <= tile_size <= num_ips_sqrt or tile_size & (tile_size - 1) != 0:
|
if not 64 <= tile_size <= num_ips_sqrt or tile_size & (tile_size - 1) != 0:
|
||||||
raise ValueError(f"tile size must be a power of 2 between 64 and {num_ips_sqrt}")
|
raise ValueError(f"tile size must be a power of 2 between 64 and {num_ips_sqrt}")
|
||||||
|
@ -94,6 +96,8 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
||||||
raise ValueError("must specify at least one colormap or a path to save raws to")
|
raise ValueError("must specify at least one colormap or a path to save raws to")
|
||||||
if not 0 <= quantile <= 1:
|
if not 0 <= quantile <= 1:
|
||||||
raise ValueError(f"quantile must be between 0 and 1")
|
raise ValueError(f"quantile must be between 0 and 1")
|
||||||
|
if not -1 <= compression_level <= 9:
|
||||||
|
raise ValueError("compression level must be between 0-9 or -1 for automatic")
|
||||||
|
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
def dedup_preserving_order(vals: list[T]) -> list[T]:
|
def dedup_preserving_order(vals: list[T]) -> list[T]:
|
||||||
|
@ -170,13 +174,13 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
||||||
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) tiles to '{z_path}'...", end = " ", flush = True)
|
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) tiles to '{z_path}'...", end = " ", flush = True)
|
||||||
if colors is None:
|
if colors is None:
|
||||||
for x_path, tile in tile_generator():
|
for x_path, tile in tile_generator():
|
||||||
x_path.with_suffix(".bin").write_bytes(compress(tile.tobytes()))
|
x_path.with_suffix(".bin").write_bytes(compress(tile.tobytes(), level = compression_level))
|
||||||
else:
|
else:
|
||||||
img_size = tile_size if data.shape[0] > tile_size else data.shape[0]
|
img_size = tile_size if data.shape[0] > tile_size else data.shape[0]
|
||||||
ihdr_chunk = get_chunk(b"IHDR", pack("!2I5B", img_size, img_size, 8, 2 if colors.shape[1] == 3 else 6, 0, 0, 0))
|
ihdr_chunk = get_chunk(b"IHDR", pack("!2I5B", img_size, img_size, 8, 2 if colors.shape[1] == 3 else 6, 0, 0, 0))
|
||||||
preamble = signature + ihdr_chunk
|
preamble = signature + ihdr_chunk
|
||||||
for x_path, tile in tile_generator():
|
for x_path, tile in tile_generator():
|
||||||
idat_chunk = get_chunk(b"IDAT", compress(np.insert(colors[tile].reshape(img_size, -1), 0, 0, axis = 1).tobytes()))
|
idat_chunk = get_chunk(b"IDAT", compress(np.insert(colors[tile].reshape(img_size, -1), 0, 0, axis = 1).tobytes(), level = compression_level))
|
||||||
x_path.with_suffix(".png").write_bytes(b"".join((preamble, idat_chunk, end_chunk)))
|
x_path.with_suffix(".png").write_bytes(b"".join((preamble, idat_chunk, end_chunk)))
|
||||||
print("done")
|
print("done")
|
||||||
|
|
||||||
|
@ -292,7 +296,7 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
|
||||||
|
|
||||||
def squish():
|
def squish():
|
||||||
nonlocal rtt_data
|
nonlocal rtt_data
|
||||||
print(f"sorting rtt values for median calculation...", end = " ", flush = True)
|
print("sorting rtt values for median calculation...", end = " ", flush = True)
|
||||||
rtt_data = np.swapaxes(rtt_data.reshape(rtt_data.shape[0] >> 1, 2, rtt_data.shape[1] >> 1, 2), 1, 2)
|
rtt_data = np.swapaxes(rtt_data.reshape(rtt_data.shape[0] >> 1, 2, rtt_data.shape[1] >> 1, 2), 1, 2)
|
||||||
mask = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_)
|
mask = np.empty((rtt_data.shape[0], rtt_data.shape[1]), dtype = np.bool_)
|
||||||
np.less(rtt_data[:, :, 0, 0], rtt_data[:, :, 0, 1], out = mask) # sort first row
|
np.less(rtt_data[:, :, 0, 0], rtt_data[:, :, 0, 1], out = mask) # sort first row
|
||||||
|
@ -437,6 +441,7 @@ def main():
|
||||||
mktiles_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
|
mktiles_parser.add_argument("-q", "--quantile", type = float, default = default_quantile, help = "the quantile to use for scaling data such as rtt (default: %(default)s)")
|
||||||
mktiles_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
|
mktiles_parser.add_argument("-n", "--num-rows", type = int, help = "how many rows to read from the scan data (default: all)")
|
||||||
mktiles_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
|
mktiles_parser.add_argument("-s", "--skip-iters", type = int, help = "how many iterations to skip generating images for (default: none)")
|
||||||
|
mktiles_parser.add_argument("-C", "--compression-level", default = default_compression_level, type = int, help = "the level of compression to use for the tile images (default: %(default)s)")
|
||||||
mktiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
|
mktiles_parser.add_argument("-j", "--json", help = "the path for the json file to store metadata about the tile images (default: none)")
|
||||||
mktiles_parser.add_argument("coords", help = "the path of the binary file containing the coords to map IP addresses to")
|
mktiles_parser.add_argument("coords", help = "the path of the binary file containing the coords to map IP addresses to")
|
||||||
mktiles_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from")
|
mktiles_parser.add_argument("input", help = "the input path of the parquet file to read the scan data from")
|
||||||
|
@ -457,8 +462,9 @@ def main():
|
||||||
make_tiles(coords_path = Path(args.coords), input_path = Path(args.input), tiles_dir = Path(args.output),
|
make_tiles(coords_path = Path(args.coords), input_path = Path(args.input), tiles_dir = Path(args.output),
|
||||||
tile_size = args.tile_size, alpha = args.alpha, negative_zoom = args.negative_zoom,
|
tile_size = args.tile_size, alpha = args.alpha, negative_zoom = args.negative_zoom,
|
||||||
variant_names = parse_list_arg(args.variants), colormap_names = parse_list_arg(args.colormaps),
|
variant_names = parse_list_arg(args.variants), colormap_names = parse_list_arg(args.colormaps),
|
||||||
raws_path = Path(args.raws) if args.raws else None, quantile = args.quantile, num_rows = args.num_rows,
|
raws_path = Path(args.raws) if args.raws else None, quantile = args.quantile,
|
||||||
skip_iters = args.skip_iters, json_path = Path(args.json) if args.json else None)
|
num_rows = args.num_rows, skip_iters = args.skip_iters, compression_level = args.compression_level,
|
||||||
|
json_path = Path(args.json) if args.json else None)
|
||||||
case "rmtiles":
|
case "rmtiles":
|
||||||
remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None)
|
remove_tiles(tiles_dir = Path(args.input), json_path = Path(args.json) if args.json else None)
|
||||||
case _:
|
case _:
|
||||||
|
|
Loading…
Reference in New Issue