Add raw scan data functionality

This commit is contained in:
LilyRose2798 2024-04-22 08:35:12 +10:00
parent eebcbae626
commit b06a53c6c6
4 changed files with 172 additions and 144 deletions

View file

@ -10,11 +10,11 @@ from shutil import rmtree
from gc import collect
from json import loads, dumps
from pathlib import Path
from dataclasses import dataclass
from typing import Literal, TypeVar
from png import Writer
from typing import TypeVar
from cmap import Colormap
from hilbert import decode
from zlib import compress, crc32
from struct import pack
from numpy.typing import NDArray
import numpy as np
@ -133,38 +133,43 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
else:
tiles_dir_parts = None
def create_tile_images(data: np.ndarray, colormap: Colormap, num_colors: int, path: Path):
print(f"creating {num_colors} color stop(s) of {colormap.name} colormap...", end = " ", flush = True)
colors = np.concatenate(([empty_color], ((colormap([0.0]) if num_colors == 1 else colormap.lut(num_colors))[:, 0:channels] * 255).astype(np.uint8)))
print("done")
print(f"creating {data.shape[1]}x{data.shape[0]} pixel image for {colormap.name} colormap...", end = " ", flush = True)
image_data = colors[data]
print("done")
del colors
collect()
tiles_per_side = image_data.shape[0] // tile_size
def get_chunk(tag: bytes, data = b""):
return b"".join((pack("!I", len(data)), tag, data, pack("!I", crc32(data, crc32(tag)) & (2 ** 32 - 1))))
signature = b"\x89PNG\r\n\x1a\n"
def get_preamble(alpha: bool):
return signature + get_chunk(b"IHDR", pack("!2I5B", tile_size, tile_size, 8, 6 if alpha else 2, 0, 0, 0))
rgb_preamble = get_preamble(False)
rgba_preamble = get_preamble(True)
end_chunk = get_chunk(b"IEND")
def create_tiles(path: Path, data: np.ndarray, colors: NDArray[np.uint8] | None = None):
tiles_per_side = data.shape[0] // tile_size
z = tiles_per_side.bit_length() - 1
z_path = path / f"{z}"
z_path.mkdir(exist_ok = True, parents = True)
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) images to '{path}'...", end = " ", flush = True)
for y in range(tiles_per_side):
y_path = z_path / f"{y}"
y_path.mkdir(exist_ok = True)
for x in range(tiles_per_side):
x_path = y_path / f"{x}.png"
rows = image_data[
y * tile_size : y * tile_size + tile_size,
x * tile_size : x * tile_size + tile_size,
]
Writer(tile_size, tile_size, greyscale = False, alpha = alpha).write_packed(x_path.open("wb"), rows)
def tile_generator():
for y in range(tiles_per_side):
y_path = z_path / f"{y}"
y_path.mkdir(exist_ok = True)
for x in range(tiles_per_side):
yield (y_path, x, data[
y * tile_size : y * tile_size + tile_size,
x * tile_size : x * tile_size + tile_size,
])
print(f"writing {tiles_per_side * tiles_per_side} ({tiles_per_side}x{tiles_per_side}) tiles to '{z_path}'...", end = " ", flush = True)
if colors is None:
for y_path, x, tile in tile_generator():
(y_path / f"{x}.bin").write_bytes(compress(tile.tobytes()))
else:
preamble = rgb_preamble if colors.shape[1] == 3 else rgba_preamble
for y_path, x, tile in tile_generator():
idat_chunk = get_chunk(b"IDAT", compress(np.insert(colors[tile].reshape(tile_size, -1), 0, 0, axis = 1).tobytes()))
(y_path / f"{x}.png").write_bytes(b"".join((preamble, idat_chunk, end_chunk)))
print("done")
def create_raw_image(data: np.ndarray, path: Path):
path.mkdir(exist_ok = True, parents = True)
z_path = path / f"{(data.shape[0] // tile_size).bit_length() - 1}.png"
print(f"writing {data.shape[1]}x{data.shape[0]} raw image to '{path}'...", end = " ", flush = True)
Writer(data.shape[1], data.shape[0], greyscale = False, alpha = True).write_packed(z_path.open("wb"), data)
print("done")
def get_colors(colormap: Colormap, num_colors: int):
return np.concatenate(([empty_color], ((colormap([0.0]) if num_colors == 1 else colormap.lut(num_colors))[:, 0:channels] * 255).astype(np.uint8)))
def get_scan_data() -> tuple[NDArray[np.uint32], NDArray[np.uint32]]:
print(f"reading scan data from file '{input_path}'...", end = " ", flush = True)
@ -205,8 +210,10 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
density_data[:, :, 0, 0] += density_data[:, :, 0, 1]
density_data[:, :, 0, 0] += density_data[:, :, 1, 0]
density_data[:, :, 0, 0] += density_data[:, :, 1, 1]
print(f"done (shrunk density data from {density_data.shape[0] * 2}x{density_data.shape[1] * 2} -> {density_data.shape[0]}x{density_data.shape[1]})")
density_data = density_data[:, :, 0, 0]
print("done")
print(f"shrinking density data from {density_data.shape[0]}x{density_data.shape[1]} to {density_data.shape[0] // 2}x{density_data.shape[1] // 2}...", end = " ", flush = True)
density_data = np.copy(density_data[:, :, 0, 0])
print("done")
possible_overlaps *= 4
if skip_iters is not None:
@ -214,10 +221,10 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
squish()
def write_all_colormaps():
for colormap_name, colormap in colormaps:
create_tile_images(density_data, colormap, possible_overlaps, tiles_dir / variant_name / colormap_name)
if raws_path is not None:
create_raw_image(density_data, raws_path / variant_name)
create_tiles(raws_path / variant_name, density_data.view(np.uint8).reshape(density_data.shape[0], density_data.shape[1], 4))
for colormap_name, colormap in colormaps:
create_tiles(tiles_dir / variant_name / colormap_name, density_data, get_colors(colormap, possible_overlaps))
write_all_colormaps()
while density_data.shape[0] > tile_size:
@ -276,14 +283,16 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
rtt_data[mask, 0, 1] //= 2
rtt_data[mask, 0, 0] += rtt_data[mask, 0, 1] # take average of first two nums
# everything else (1 or 0 nums populated) don't need any modifications
print(f"done (shrunk rtt data from {rtt_data.shape[0] * 2}x{rtt_data.shape[1] * 2} -> {rtt_data.shape[0]}x{rtt_data.shape[1]})")
rtt_data = rtt_data[:, :, 0, 0]
print("done")
print(f"shrinking rtt data from {rtt_data.shape[0]}x{rtt_data.shape[1]} to {rtt_data.shape[0] // 2}x{rtt_data.shape[1] // 2}...", end = " ", flush = True)
rtt_data = np.copy(rtt_data[:, :, 0, 0])
print("done")
if skip_iters is not None:
for _ in range(skip_iters):
squish()
def get_normalized_data():
def normalize():
print("normalizing rtt data: getting non-zero...", end = " ", flush = True)
non_zero = rtt_data != 0
print("converting to floating point...", end = " ", flush = True)
@ -304,10 +313,10 @@ def make_tiles(coords_path: Path, input_path: Path, tiles_dir: Path, *,
def write_all_colormaps():
if raws_path is not None:
create_raw_image(rtt_data, raws_path / variant_name)
rtt_data_norm = get_normalized_data()
create_tiles(raws_path / variant_name, rtt_data.view(np.uint8).reshape(rtt_data.shape[0], rtt_data.shape[1], 4))
rtt_data_norm = normalize()
for colormap_name, colormap in colormaps:
create_tile_images(rtt_data_norm, colormap, num_colors, tiles_dir / variant_name / colormap_name)
create_tiles(tiles_dir / variant_name / colormap_name, rtt_data_norm, get_colors(colormap, num_colors))
write_all_colormaps()
while rtt_data.shape[0] > tile_size:

13
poetry.lock generated
View file

@ -75,18 +75,7 @@ files = [
{file = "numpy-hilbert-curve-1.0.1.tar.gz", hash = "sha256:0745dbd4c16b258c180342d6df57dfa99110b9d98c86a84d920f29af5cc0707b"},
]
[[package]]
name = "pypng"
version = "0.20220715.0"
description = "Pure Python library for saving and loading PNG images"
optional = false
python-versions = "*"
files = [
{file = "pypng-0.20220715.0-py3-none-any.whl", hash = "sha256:4a43e969b8f5aaafb2a415536c1a8ec7e341cd6a3f957fd5b5f32a4cfeed902c"},
{file = "pypng-0.20220715.0.tar.gz", hash = "sha256:739c433ba96f078315de54c0db975aee537cbc3e1d0ae4ed9aab0ca1e427e2c1"},
]
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "6f18c3faab65fe3440461e20f9200f5665981d0ba9d69f1dc8a1740840108ab1"
content-hash = "7b7fb0cb9bc597ae838486c3f91be6d24679db0784b7d2813f1826bb305e9279"

View file

@ -73,6 +73,7 @@
font-size: 0.9rem;
padding: 0.3rem 2.2rem 0.3rem 0.6rem;
border-radius: 4px;
white-space: nowrap;
}
.maplibregl-popup-close-button {
height: 100%;
@ -546,108 +547,138 @@
dateControl.addControl()
variantControl.addControl()
colormapControl.addControl()
})
map.addControl(new maplibregl.NavigationControl({ showCompass: false }), "top-left")
map.addControl(new maplibregl.NavigationControl({ showCompass: false }), "top-left")
const hoverTextControl = document.createElement("div")
hoverTextControl.className = "maplibregl-ctrl maplibregl-ctrl-group"
const hoverTextP = document.createElement("p")
hoverTextControl.replaceChildren(hoverTextP)
const hoverTextControl = document.createElement("div")
hoverTextControl.className = "maplibregl-ctrl maplibregl-ctrl-group"
const hoverTextP = document.createElement("p")
hoverTextControl.replaceChildren(hoverTextP)
const ipFromMouseEvent = e => {
const { x, y } = maplibregl.MercatorCoordinate.fromLngLat(e.lngLat, 0)
const ip = coordsToHilbert({ x: Math.floor(0x10000 * x), y: Math.floor(0x10000 * y) })
const subnet = Math.min(32, Math.round(map.getZoom()) * 2 + 18)
return { ip, subnet }
}
map.on("mousemove", e => {
const { ip, subnet } = ipFromMouseEvent(e)
const ipStr = ipToString(ip, subnet)
hoverTextP.textContent = subnet < 32 ? `Range: ${ipStr}/${subnet}` : `IP: ${ipStr}`
})
map.addControl({
onAdd: () => hoverTextControl,
onRemove: () => hoverTextControl.parentNode.removeChild(hoverTextControl)
}, "bottom-left")
let curPopup
const setPopup = (pos, ip, subnet = 32) => {
const isRange = subnet < 32
const name = isRange ? "Range" : "IP"
const ipStr = ipToString(ip, subnet)
const ipText = isRange ? `${ipStr}/${subnet}` : ipStr
const ipLink = `<a href="https://bgp.tools/prefix/${ipStr}" target="_blank">${ipText}</a>`
const htmlBase = `${name}: ${ipLink}`
const privateRange = getPrivateRange(ip)
const html = privateRange ? `${htmlBase}<br>Part of private range ${privateRange.range}<br>Used for ${privateRange.description}` : htmlBase
curPopup?.remove()
const popup = new maplibregl.Popup({ focusAfterOpen: false }).setHTML(html).setLngLat(pos).addTo(map)
curPopup = popup
if (!isRange && !privateRange) {
fetch(`${apiUrl}/api/rdns/${ipStr}`).then(res => {
if (!res.ok)
throw new Error(`Error fetching rdns for ip ${ipStr}`)
return res.json()
}).then(data => {
const rdns = data?.rdns
if (rdns) popup.setHTML(`${html}<br>rDNS: ${rdns}`)
}).catch(_ => {})
const ipFromMouseEvent = e => {
const { x, y } = maplibregl.MercatorCoordinate.fromLngLat(e.lngLat, 0)
const ip = coordsToHilbert({ x: Math.floor(0x10000 * x), y: Math.floor(0x10000 * y) })
const subnet = Math.min(32, Math.round(map.getZoom()) * 2 + 18)
return { ip, subnet }
}
}
map.on("click", e => {
const { ip, subnet } = ipFromMouseEvent(e)
setPopup(e.lngLat, ip, subnet)
})
map.on("mousemove", e => {
const { ip, subnet } = ipFromMouseEvent(e)
const ipStr = ipToString(ip, subnet)
hoverTextP.textContent = subnet < 32 ? `Range: ${ipStr}/${subnet}` : `IP: ${ipStr}`
})
const main = document.getElementsByTagName("main")[0]
main.addEventListener("click", e => {
if (e.target === main && curPopup) curPopup.remove()
})
const rangeInput = document.getElementById("range-input")
const jumpParam = new URLSearchParams(location.search).get("jump")
map.addControl({
onAdd: () => hoverTextControl,
onRemove: () => hoverTextControl.parentNode.removeChild(hoverTextControl)
}, "bottom-left")
const jump = range => {
const { ip, subnet } = ipFromString(range)
const { x, y } = hilbertToCoords(ip + Math.floor((2 ** (32 - subnet)) / 2))
const center = new maplibregl.MercatorCoordinate((x + 0.5) / 0x10000, (y + 0.5) / 0x10000, 0).toLngLat()
const zoom = subnet / 2 - 0.501
map.jumpTo({ center, zoom })
if (subnet > 24) setPopup(center, ip, subnet)
}
let curPopup
const setPopup = (pos, ip, subnet = 32) => {
const isRange = subnet < 32
const name = isRange ? "Range" : "IP"
const ipStr = ipToString(ip, subnet)
const ipText = isRange ? `${ipStr}/${subnet}` : ipStr
const ipLink = `<a href="https://bgp.tools/prefix/${ipStr}" target="_blank">${ipText}</a>`
const htmlBase = `${name}: ${ipLink}`
const privateRange = getPrivateRange(ip)
let html = privateRange ? `${htmlBase}<br>Part of private range ${privateRange.range}<br>Used for ${privateRange.description}` : htmlBase
const setJumpParam = jump => {
const searchParams = new URLSearchParams(location.search)
if (jump) searchParams.set("jump", jump)
else searchParams.delete("jump")
history.pushState("", "", searchParams.size === 0 ? location.pathname : `?${searchParams}`)
}
try {
jump(jumpParam)
rangeInput.value = jumpParam
} catch(e) {
setJumpParam(undefined)
}
curPopup?.remove()
const popup = new maplibregl.Popup({ focusAfterOpen: false }).setHTML(html).setLngLat(pos).addTo(map)
curPopup = popup
const jumpToInput = () => {
if (!privateRange) {
fetch(`${apiUrl}/api/scandata/${curDate}/rtt/range/${encodeURIComponent(`${ipStr}/${subnet}`)}`).then(res => {
if (!res.ok)
throw new Error(`Error fetching scan data for range ${ipStr}`)
return res.json()
}).then(data => {
const rtt = data?.rtt
if (rtt) {
html = `${html}<br>RTT: ${(rtt / 1000).toFixed(2)}ms`
popup.setHTML(html)
}
}).catch(_ => {})
if (isRange)
fetch(`${apiUrl}/api/scandata/${curDate}/density/range/${encodeURIComponent(`${ipStr}/${subnet}`)}`).then(res => {
if (!res.ok)
throw new Error(`Error fetching scan data for range ${ipStr}`)
return res.json()
}).then(data => {
const density = data?.density
if (density !== undefined) {
const possibleOverlaps = 2 ** (32 - subnet)
const densityPct = (100 * (density / possibleOverlaps)).toFixed(2)
html = `${html}<br>Density: ${densityPct}% (${density}/${possibleOverlaps})`
popup.setHTML(html)
}
}).catch(_ => {})
else
fetch(`${apiUrl}/api/rdns/${encodeURIComponent(ipStr)}`).then(res => {
if (!res.ok)
throw new Error(`Error fetching rdns for ip ${ipStr}`)
return res.json()
}).then(data => {
const rdns = data?.rdns
if (rdns) {
html = `${html}<br>rDNS: ${rdns}`
popup.setHTML(html)
}
}).catch(_ => {})
}
}
map.on("click", e => {
const { ip, subnet } = ipFromMouseEvent(e)
setPopup(e.lngLat, ip, subnet)
})
const main = document.getElementsByTagName("main")[0]
main.addEventListener("click", e => {
if (e.target === main && curPopup) curPopup.remove()
})
const rangeInput = document.getElementById("range-input")
const jumpParam = new URLSearchParams(location.search).get("jump")
const jump = range => {
const { ip, subnet } = ipFromString(range)
const { x, y } = hilbertToCoords(ip + Math.floor((2 ** (32 - subnet)) / 2))
const center = new maplibregl.MercatorCoordinate((x + 0.5) / 0x10000, (y + 0.5) / 0x10000, 0).toLngLat()
const zoom = subnet / 2 - 0.501
map.jumpTo({ center, zoom })
if (subnet > 24) setPopup(center, ip, subnet)
}
const setJumpParam = jump => {
const searchParams = new URLSearchParams(location.search)
if (jump) searchParams.set("jump", jump)
else searchParams.delete("jump")
history.pushState("", "", searchParams.size === 0 ? location.pathname : `?${searchParams}`)
}
try {
if (rangeInput.value) jump(rangeInput.value)
setJumpParam(rangeInput.value)
rangeInput.setCustomValidity("")
} catch (e) {
rangeInput.setCustomValidity(e.message)
jump(jumpParam)
rangeInput.value = jumpParam
} catch(e) {
setJumpParam(undefined)
}
rangeInput.reportValidity()
}
rangeInput.addEventListener("change", jumpToInput)
document.getElementById("range-button").addEventListener("click", jumpToInput)
const jumpToInput = () => {
try {
if (rangeInput.value) jump(rangeInput.value)
setJumpParam(rangeInput.value)
rangeInput.setCustomValidity("")
} catch (e) {
rangeInput.setCustomValidity(e.message)
}
rangeInput.reportValidity()
}
rangeInput.addEventListener("change", jumpToInput)
document.getElementById("range-button").addEventListener("click", jumpToInput)
})
</script>
</body>
</html>

View file

@ -7,7 +7,6 @@ license = "AGPLv3"
[tool.poetry.dependencies]
python = "^3.11"
pypng = "^0.20220715.0"
numpy = "^1.26.4"
numpy-hilbert-curve = "^1.0.1"
cmap = "^0.1.3"