Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func main() {
mux.HandleFunc("PUT /collections/{cid}", putCollection)
mux.HandleFunc("DELETE /collections/{cid}", deleteCollection)
mux.HandleFunc("GET /collections/{cid}/trajectories", trajectories)
mux.HandleFunc("GET /collections/{cid}/timeseries", timeseries)
mux.HandleFunc("GET /collections/{cid}/items", streamItems)
mux.HandleFunc("GET /collections/{cid}/items/{fid}", getItem)
mux.HandleFunc("POST /collections/{cid}/items", postItem)
Expand Down
181 changes: 181 additions & 0 deletions timeseries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Aggregate time series for the animated tutorial's evolving chart: the average
// of a temporal property across the fleet, grouped by ship type, over uniform
// time buckets. It is the animated sibling of the static per-category charts —
// the map clock sweeps it in lockstep. MEOS does the work: each vessel's stored
// property is instant-sampled (tsample) and the samples are averaged per
// (ship type, bucket). Instant sampling is exact at the sampled instants.
package main

import (
"compress/gzip"
"encoding/json"
"io"
"net/http"
"sort"
"strconv"
"strings"
"time"
)

// parseFloatSeries reads (timestamp, value) pairs from a MovingFloat MF-JSON
// document, handling both the flat form and a sequence set.
func parseFloatSeries(b []byte) []struct {
t int64
v float64
} {
var doc map[string]any
if json.Unmarshal(b, &doc) != nil {
return nil
}
var out []struct {
t int64
v float64
}
collect := func(m map[string]any) {
dts, _ := m["datetimes"].([]any)
vals, _ := m["values"].([]any)
n := len(dts)
if len(vals) < n {
n = len(vals)
}
for i := 0; i < n; i++ {
ts, _ := dts[i].(string)
tm, err := time.Parse(time.RFC3339, rfc3339Tz(ts))
if err != nil {
continue
}
v, ok := vals[i].(float64)
if !ok {
continue
}
out = append(out, struct {
t int64
v float64
}{tm.Unix(), v})
}
}
if seqs, ok := doc["sequences"].([]any); ok {
for _, s := range seqs {
if m, ok := s.(map[string]any); ok {
collect(m)
}
}
} else {
collect(doc)
}
return out
}

// timeseries serves the fleet-wide average of the `speed` property, grouped by
// ship type, over `step`-second buckets: {buckets:[epoch…], uom, series:{type:[avg|null…]}}.
func timeseries(w http.ResponseWriter, r *http.Request) {
cid := r.PathValue("cid")
tbl, _, ok := collectionMeta(r.Context(), cid)
if !ok {
httpErr(w, 404, "collection not found")
return
}
step := 600
if s := r.URL.Query().Get("step"); s != "" {
if n, err := strconv.Atoi(s); err == nil && n >= 60 {
step = n
}
}

rows, err := db.Query(r.Context(),
"SELECT COALESCE(s.ship_type, 'Other'), "+
"asMFJSON(tsample(p.vfloat, make_interval(secs => $2))) "+
"FROM "+ident(tbl)+" s JOIN mf_tproperty p ON p.cid = $1 AND p.fid = s.id AND p.name = 'speed'",
cid, step)
if err != nil {
httpErr(w, 500, err.Error())
return
}
defer rows.Close()

type acc struct {
sum float64
n int
}
data := map[string]map[int64]*acc{} // ship type -> bucket start (epoch) -> accumulator
var tmin int64 = 1<<62 - 1
var tmax int64 = -(1 << 62)
for rows.Next() {
var typ, mf *string
if rows.Scan(&typ, &mf) != nil || mf == nil {
continue
}
t := "Other"
if typ != nil && *typ != "" {
t = *typ
}
bk := data[t]
if bk == nil {
bk = map[int64]*acc{}
data[t] = bk
}
for _, p := range parseFloatSeries([]byte(*mf)) {
b := p.t - (p.t % int64(step))
a := bk[b]
if a == nil {
a = &acc{}
bk[b] = a
}
a.sum += p.v
a.n++
if b < tmin {
tmin = b
}
if b > tmax {
tmax = b
}
}
}
if err := rows.Err(); err != nil {
httpErr(w, 500, err.Error())
return
}

out := map[string]any{"uom": "kn", "stepSeconds": step}
if tmax < tmin {
out["buckets"] = []int64{}
out["series"] = map[string][]any{}
} else {
var buckets []int64
for b := tmin; b <= tmax; b += int64(step) {
buckets = append(buckets, b)
}
idx := map[int64]int{}
for i, b := range buckets {
idx[b] = i
}
series := map[string][]*float64{}
types := make([]string, 0, len(data))
for t := range data {
types = append(types, t)
}
sort.Strings(types)
for _, t := range types {
row := make([]*float64, len(buckets))
for b, a := range data[t] {
if a.n > 0 {
avg := a.sum / float64(a.n)
row[idx[b]] = &avg
}
}
series[t] = row
}
out["buckets"] = buckets
out["series"] = series
}

w.Header().Set("Content-Type", "application/json")
var wr io.Writer = w
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip")
gz := gzip.NewWriter(w)
defer gz.Close()
wr = gz
}
json.NewEncoder(wr).Encode(out)
}
14 changes: 13 additions & 1 deletion tutorial-stream/map/fleet.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@
<title>MF Stream — animated fleet</title>
<style>
html, body { margin: 0; height: 100%; font-family: system-ui, sans-serif; }
#map { position: absolute; top: 0; left: 0; right: 0; bottom: 40px; }
#map { position: absolute; top: 0; left: 0; right: 0; bottom: 184px; }
#chartwrap {
position: absolute; left: 0; right: 0; bottom: 40px; height: 144px;
background: #11161c; z-index: 1;
}
#chart { display: block; width: 100%; height: 100%; }
#chartlabel {
position: absolute; left: 12px; top: 6px; z-index: 2;
color: #8aa0b2; font-size: 11px; pointer-events: none;
}
#timebar {
position: absolute; left: 0; right: 0; bottom: 0; height: 40px;
background: #11161c; color: #cdd6e0; z-index: 1;
Expand Down Expand Up @@ -34,6 +43,9 @@
</head>
<body>
<div id="map"></div>
<div id="chartwrap">
<canvas id="chart"></canvas>
</div>
<div id="timebar">
<button id="playpause" title="Play / pause">⏸</button>
<span id="tstart" class="tlabel"></span>
Expand Down
82 changes: 82 additions & 0 deletions tutorial-stream/map/src/fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const playBtn = document.getElementById('playpause')!;
const tnowEl = document.getElementById('tnow')!;
const tstartEl = document.getElementById('tstart')!;
const tendEl = document.getElementById('tend')!;
const chartCanvas = document.getElementById('chart') as HTMLCanvasElement;

const params = new URLSearchParams(location.search);
const CID = params.get('cid') ?? 'ships';
Expand Down Expand Up @@ -67,6 +68,70 @@ function parsePoint(wkt: string): [number, number] | null {
const sliderToPlayback = (v: number) => 180000 - (v / 1000) * 165000; // 180 s (slow) … 15 s (fast)
const yieldToBrowser = () => new Promise(r => setTimeout(r, 0));

// --- evolving chart: average speed over ground by ship type, swept by the clock ---
const SHIPTYPE_COLOR: Record<string, [number, number, number]> = {
Cargo: [44, 123, 182], Tanker: [215, 25, 28], Passenger: [77, 175, 74],
Fishing: [255, 160, 40], Tug: [152, 78, 163], Pilot: [120, 190, 210],
HSC: [240, 110, 200], Dredging: [160, 140, 90], SAR: [230, 200, 50],
Military: [110, 120, 130], Pleasure: [90, 200, 160], Sailing: [200, 180, 90],
Towing: [180, 120, 70], Other: [120, 130, 140],
};
interface ChartSeries { type: string; color: [number, number, number]; pts: { ms: number; v: number }[]; }
let chartSeries: ChartSeries[] = [];
let chartMaxV = 1;

// draw the chart each frame: full curves faint, the portion up to `clockMs` bold,
// a playhead at the current time — the animated sibling of the static category chart
function drawChart(clockMs: number, tminMs: number, tmaxMs: number): void {
const dpr = window.devicePixelRatio || 1;
const W = chartCanvas.clientWidth, H = chartCanvas.clientHeight;
if (chartCanvas.width !== W * dpr || chartCanvas.height !== H * dpr) {
chartCanvas.width = W * dpr; chartCanvas.height = H * dpr;
}
const ctx = chartCanvas.getContext('2d')!;
ctx.setTransform(dpr, 0, 0, dpr, 0, 0);
ctx.clearRect(0, 0, W, H);
if (!chartSeries.length || tmaxMs <= tminMs) return;
const padL = 34, padR = 8, padT = 20, padB = 14;
const xOf = (ms: number) => padL + ((ms - tminMs) / (tmaxMs - tminMs)) * (W - padL - padR);
const yOf = (v: number) => padT + (1 - v / chartMaxV) * (H - padT - padB);
// gridlines + y labels
ctx.strokeStyle = '#26303a'; ctx.fillStyle = '#5b6b78'; ctx.font = '10px system-ui'; ctx.lineWidth = 1;
for (let k = 0; k <= 2; k++) {
const v = (chartMaxV / 2) * k, y = yOf(v);
ctx.beginPath(); ctx.moveTo(padL, y); ctx.lineTo(W - padR, y); ctx.stroke();
ctx.fillText(v.toFixed(0), 6, y + 3);
}
const px = xOf(clockMs);
for (const s of chartSeries) {
const col = `rgb(${s.color[0]},${s.color[1]},${s.color[2]})`;
// faint full curve
ctx.strokeStyle = `rgba(${s.color[0]},${s.color[1]},${s.color[2]},0.25)`; ctx.lineWidth = 1;
ctx.beginPath();
s.pts.forEach((p, i) => (i ? ctx.lineTo(xOf(p.ms), yOf(p.v)) : ctx.moveTo(xOf(p.ms), yOf(p.v))));
ctx.stroke();
// bold up to the clock
ctx.strokeStyle = col; ctx.lineWidth = 1.8; ctx.beginPath();
let started = false;
for (const p of s.pts) {
if (p.ms > clockMs) break;
const x = xOf(p.ms), y = yOf(p.v);
started ? ctx.lineTo(x, y) : ctx.moveTo(x, y); started = true;
}
ctx.stroke();
}
// playhead
ctx.strokeStyle = '#ffc83c'; ctx.lineWidth = 1; ctx.beginPath();
ctx.moveTo(px, padT - 4); ctx.lineTo(px, H - padB); ctx.stroke();
// legend (top, left-to-right)
let lx = padL + 4; ctx.font = '11px system-ui'; ctx.textBaseline = 'middle';
for (const s of chartSeries) {
ctx.fillStyle = `rgb(${s.color[0]},${s.color[1]},${s.color[2]})`;
ctx.fillRect(lx, 8, 9, 9); lx += 13;
ctx.fillText(s.type, lx, 13); lx += ctx.measureText(s.type).width + 14;
}
}

async function main(): Promise<void> {
statusEl.textContent = 'initialising MEOS (WebAssembly)…';
await initMeos();
Expand All @@ -88,6 +153,22 @@ async function main(): Promise<void> {
let ids = [...byId.keys()];
if (MAX_SHIPS > 0 && ids.length > MAX_SHIPS) ids = ids.slice(0, MAX_SHIPS);

// fetch the aggregate time series (avg SOG per ship type) for the evolving chart
fetch(`/collections/${CID}/timeseries?step=600`).then(async tr => {
if (!tr.ok) return;
const ts = (await tr.json()) as { buckets: number[]; series: Record<string, (number | null)[]> };
const out: ChartSeries[] = [];
for (const [type, vals] of Object.entries(ts.series)) {
const pts = ts.buckets.map((b, i) => ({ ms: b * 1000, v: vals[i] })).filter(p => p.v != null) as { ms: number; v: number }[];
if (pts.length < 2) continue;
out.push({ type, color: SHIPTYPE_COLOR[type] ?? SHIPTYPE_COLOR.Other, pts });
}
// keep the most-populated types so the chart stays legible
out.sort((a, b) => b.pts.length - a.pts.length);
chartSeries = out.slice(0, 7);
chartMaxV = Math.max(1, Math.ceil(Math.max(...chartSeries.flatMap(s => s.pts.map(p => p.v))) / 5) * 5);
}).catch(() => { /* chart is optional */ });

// --- map + basemap + controls FIRST so the view shows immediately ---
const mapOpts: maplibregl.MapOptions = {
container: 'map',
Expand Down Expand Up @@ -200,6 +281,7 @@ async function main(): Promise<void> {
});
scrub.value = String(Math.round(((clockMs - tminMs) / spanMs) * 1000));
tnowEl.textContent = fmtClock(clockMs);
drawChart(clockMs, tminMs, tmaxMs);
requestAnimationFrame(frame);
}
requestAnimationFrame(frame);
Expand Down
5 changes: 3 additions & 2 deletions tutorial/setup/load_ships.sql
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ WHERE Latitude BETWEEN 40.18 AND 84.73
-- --- 3. One report per (vessel, instant) -----------------------------------
DROP TABLE IF EXISTS AISClean;
CREATE TABLE AISClean AS
SELECT DISTINCT ON (MMSI, T) MMSI, T, Name, Geom, SOG
SELECT DISTINCT ON (MMSI, T) MMSI, T, Name, Geom, SOG, ShipType
FROM AISInput
WHERE Geom IS NOT NULL
ORDER BY MMSI, T;
Expand All @@ -80,10 +80,11 @@ ORDER BY MMSI, T;
-- selection of both tutorials) are vessels that actually move.
DROP TABLE IF EXISTS ships;
CREATE TABLE ships AS
SELECT row_number() OVER (ORDER BY length(trip) DESC, mmsi)::int AS id, mmsi, name, trip
SELECT row_number() OVER (ORDER BY length(trip) DESC, mmsi)::int AS id, mmsi, name, ship_type, trip
FROM (
SELECT MMSI AS mmsi,
MIN(Name) FILTER (WHERE Name IS NOT NULL) AS name,
COALESCE(MIN(ShipType) FILTER (WHERE ShipType IS NOT NULL AND ShipType <> 'Undefined'), 'Other') AS ship_type,
tgeompointSeqSetGaps(array_agg(tgeompoint(Geom, T) ORDER BY T),
maxt := (:'gap')::interval) AS trip
FROM AISClean
Expand Down