Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import au.org.aodn.ogcapi.server.core.exception.GeoserverFieldsNotFoundException;
import au.org.aodn.ogcapi.server.core.exception.UnauthorizedServerException;
import au.org.aodn.ogcapi.server.core.model.enumeration.SseEventName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

Expand Down Expand Up @@ -50,7 +51,7 @@ public static void handleError(Exception e, String uuid, SseEmitter emitter, Run
case VALIDATION_ERROR -> {
log.warn("Invalid parameter error for UUID {}: {}", uuid, e.getMessage());
emitter.send(SseEmitter.event()
.name("error")
.name(SseEventName.ERROR.getValue())
.data(Map.of(
"message", "Invalid parameter error",
"timestamp", System.currentTimeMillis()
Expand All @@ -61,7 +62,7 @@ public static void handleError(Exception e, String uuid, SseEmitter emitter, Run
case UNAUTHORIZED_SERVER_ERROR -> {
log.warn("Unauthorized wfs server for UUID {}", uuid, e);
emitter.send(SseEmitter.event()
.name("error")
.name(SseEventName.ERROR.getValue())
.data(Map.of(
"message", "Unauthorized wfs server",
"timestamp", System.currentTimeMillis()
Expand All @@ -72,7 +73,7 @@ public static void handleError(Exception e, String uuid, SseEmitter emitter, Run
case DOWNLOADABLE_FIELDS_ERROR -> {
log.warn("No downloadable fields found for UUID {}", uuid, e);
emitter.send(SseEmitter.event()
.name("error")
.name(SseEventName.ERROR.getValue())
.data(Map.of(
"message", "No downloadable fields found",
"timestamp", System.currentTimeMillis()
Expand All @@ -83,7 +84,7 @@ public static void handleError(Exception e, String uuid, SseEmitter emitter, Run
case UNKNOWN_ERROR -> {
log.warn("Unknown error for UUID {}", uuid, e);
emitter.send(SseEmitter.event()
.name("error")
.name(SseEventName.ERROR.getValue())
.data(Map.of(
"message", "Unknown error: " + e.getMessage(),
"timestamp", System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public enum ProcessIdEnum {
DOWNLOAD_DATASET("download"),
DOWNLOAD_WFS_SSE("downloadWfs"),
DOWNLOAD_WFS_ESTIMATE("estimateWfsDownload"),
DOWNLOAD_CO_ESTIMATE("estimateCOdownload"),
UNKNOWN("");

private final String value;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package au.org.aodn.ogcapi.server.core.model.enumeration;

import lombok.Getter;

/**
* Names of the SSE events sent by the download / size-estimate streams.
* <p>
* The frontend identifies events by these literal names, so they form a wire
* contract — change a value only together with the portal (and keep them
* aligned with the data-access-service sse_wrapper vocabulary).
*/
@Getter
public enum SseEventName {
CONNECTION_ESTABLISHED("connection-established"),
KEEP_ALIVE("keep-alive"),
DOWNLOAD_STARTED("download-started"),
FILE_CHUNK("file-chunk"),
DOWNLOAD_COMPLETE("download-complete"),
ESTIMATE_COMPLETE("estimate-complete"),
ESTIMATE_FAILED("estimate-failed"),
ERROR("error");

private final String value;

SseEventName(String value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service("DataAccessService")
Expand All @@ -35,51 +36,97 @@ public void init() {
httpEntity = new HttpEntity<>(headers);
}

public byte[] getWaveBuoys(String from, String to){
public byte[] getWaveBuoys(String from, String to) {
String waveBuoysUrlTemplate = UriComponentsBuilder.fromUriString(dasConfig.host + "/api/v1/das/data/feature-collection/wave-buoy")
.queryParam("start_date","{start_date}")
.queryParam("end_date","{end_date}")
.queryParam("start_date", "{start_date}")
.queryParam("end_date", "{end_date}")
.encode()
.toUriString();
Map<String,String> params = new HashMap<>();
Map<String, String> params = new HashMap<>();
params.put("start_date", from);
params.put("end_date",to);
params.put("end_date", to);

return httpClient.exchange(waveBuoysUrlTemplate, HttpMethod.GET,httpEntity,byte[].class,params).getBody();
return httpClient.exchange(waveBuoysUrlTemplate, HttpMethod.GET, httpEntity, byte[].class, params).getBody();
}

public byte[] getWaveBuoysLatestDate(){
public byte[] getWaveBuoysLatestDate() {
String waveBuoysUrlTemplate = UriComponentsBuilder.fromUriString(dasConfig.host + "/api/v1/das/data/feature-collection/wave-buoy/latest")
.encode()
.toUriString();

return httpClient.exchange(waveBuoysUrlTemplate, HttpMethod.GET,httpEntity,byte[].class).getBody();
return httpClient.exchange(waveBuoysUrlTemplate, HttpMethod.GET, httpEntity, byte[].class).getBody();
}

public byte[] getWaveBuoyData(String from, String to, String buoy){
public byte[] getWaveBuoyData(String from, String to, String buoy) {
String encodedBuoy = URLEncoder.encode(buoy, java.nio.charset.StandardCharsets.UTF_8);

String waveBuoyDataUrlTemplate = UriComponentsBuilder.fromUriString(dasConfig.host + "/api/v1/das/data/feature-collection/wave-buoy/" + encodedBuoy)
.queryParam("start_date","{start_date}")
.queryParam("end_date","{end_date}")
.queryParam("start_date", "{start_date}")
.queryParam("end_date", "{end_date}")
.encode()
.toUriString();
Map<String,String> params = new HashMap<>();
Map<String, String> params = new HashMap<>();
params.put("start_date", from);
params.put("end_date",to);
params.put("end_date", to);

return httpClient.exchange(waveBuoyDataUrlTemplate, HttpMethod.GET,httpEntity,byte[].class,params).getBody();
return httpClient.exchange(waveBuoyDataUrlTemplate, HttpMethod.GET, httpEntity, byte[].class, params).getBody();
}

public byte[] getLatestWaveBuoySites(){
public byte[] getLatestWaveBuoySites() {
String waveBuoysUrlTemplate = UriComponentsBuilder.fromUriString(dasConfig.host + "/api/v1/das/data/feature-collection/wave-buoy/all")
.encode()
.toUriString();

return httpClient.exchange(waveBuoysUrlTemplate, HttpMethod.GET,httpEntity,byte[].class).getBody();
return httpClient.exchange(waveBuoysUrlTemplate, HttpMethod.GET, httpEntity, byte[].class).getBody();
}

public boolean isCollectionSupported(String collectionId){
/**
* Call the data-access-service cloud-optimised size estimate endpoint.
* Returns the raw JSON response body so the SSE layer can forward
* it to the frontend unchanged.
*/
public String estimateCloudOptimisedDownloadSize(
String uuid,
List<String> keys,
String startDate,
String endDate,
Object multiPolygon,
List<String> columns,
String outputFormat) {

String url = UriComponentsBuilder.fromUriString(dasConfig.host + "/api/v1/das/data/{uuid}/estimate_size")
.encode()
.toUriString();

Map<String, Object> body = new HashMap<>();
body.put("keys", keys);
body.put("start_date", startDate != null ? startDate : "non-specified");
body.put("end_date", endDate != null ? endDate : "non-specified");
body.put("output_format", outputFormat);
// multi_polygon is accepted as a GeoJSON object or string; forward as-is.
if (multiPolygon != null) {
body.put("multi_polygon", multiPolygon);
}
// columns is not sent today (frontend doesn't subset columns yet, and the
// batch download grabs all variables), keeping the estimate aligned.
if (columns != null) {
body.put("columns", columns);
}

HttpHeaders headers = new HttpHeaders();
headers.set(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
headers.set(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
headers.set("X-API-KEY", dasConfig.secret);

HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);

Map<String, String> uriVars = new HashMap<>();
uriVars.put("uuid", uuid);

return httpClient.exchange(url, HttpMethod.POST, entity, String.class, uriVars).getBody();
}

public boolean isCollectionSupported(String collectionId) {
final String waveBuoyRealtimeCollectionID = "b299cdcd-3dee-48aa-abdd-e0fcdbb9cadc";
return waveBuoyRealtimeCollectionID.contentEquals(collectionId);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package au.org.aodn.ogcapi.server.core.service.geoserver.wfs;

import au.org.aodn.ogcapi.server.core.configuration.CacheConfig;
import au.org.aodn.ogcapi.server.core.model.enumeration.SseEventName;
import au.org.aodn.ogcapi.server.core.model.ogc.FeatureRequest;
import au.org.aodn.ogcapi.server.core.util.DatetimeUtils;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -191,9 +192,9 @@ public BigInteger estimateDownloadSize(
}

/**
* Execute WFS request with SSE support
* Call the WFS server and stream the downloaded data to the client over SSE
*/
public void executeWfsRequestWithSse(
public void streamWfsDataWithSse(
String wfsRequestUrl,
String uuid,
String layerName,
Expand All @@ -213,7 +214,7 @@ public void executeWfsRequestWithSse(

// Send download started confirmation
emitter.send(SseEmitter.event()
.name("download-started")
.name(SseEventName.DOWNLOAD_STARTED.getValue())
.data(Map.of(
"message", "WFS server responded, starting data stream...",
"timestamp", System.currentTimeMillis()
Expand All @@ -236,7 +237,7 @@ public void executeWfsRequestWithSse(
String encodedData = Base64.getEncoder().encodeToString(chunkBytes);

emitter.send(SseEmitter.event()
.name("file-chunk")
.name(SseEventName.FILE_CHUNK.getValue())
.data(Map.of(
"chunkNumber", ++chunkNumber,
"data", encodedData,
Expand All @@ -254,7 +255,7 @@ public void executeWfsRequestWithSse(
if (chunkBuffer.size() > 0) {
String encodedData = Base64.getEncoder().encodeToString(chunkBuffer.toByteArray());
emitter.send(SseEmitter.event()
.name("file-chunk")
.name(SseEventName.FILE_CHUNK.getValue())
.data(Map.of(
"chunkNumber", ++chunkNumber,
"data", encodedData,
Expand All @@ -266,7 +267,7 @@ public void executeWfsRequestWithSse(

// Send completion event
emitter.send(SseEmitter.event()
.name("download-complete")
.name(SseEventName.DOWNLOAD_COMPLETE.getValue())
.data(Map.of(
"totalBytes", totalBytes,
"totalChunks", chunkNumber,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package au.org.aodn.ogcapi.server.core.service.sse;

import au.org.aodn.ogcapi.server.core.exception.wfs.WfsErrorHandler;
import au.org.aodn.ogcapi.server.core.model.enumeration.SseEventName;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* A single SSE stream's runtime state: the underlying {@link SseEmitter}, an
* optional keep-alive ticker, and the cleanup of those resources.
* <p>
* Created and managed by {@link SseStreamHandler}; the work lambda receives one
* to send events and (optionally) start a keep-alive.
*/
@Slf4j
public class SseSession {

private final String contextId;

@Getter
private final SseEmitter emitter;

private final AtomicReference<ScheduledFuture<?>> keepAliveTaskRef = new AtomicReference<>();
private final AtomicReference<ScheduledExecutorService> keepAliveExecutorRef = new AtomicReference<>();

public SseSession(String contextId, SseEmitter emitter) {
this.contextId = contextId;
this.emitter = emitter;
}

/**
* Send a named SSE event with the given payload.
*/
public void send(SseEventName eventName, Object data) throws IOException {
emitter.send(SseEmitter.event().name(eventName.getValue()).data(data));
}

/**
* Start sending a {@code keep-alive} event every {@code intervalSeconds}. The
* payload is recomputed each tick by {@code payloadSupplier} so callers can
* reflect changing state (e.g. whether an upstream server has responded yet).
*/
public void startKeepAlive(long intervalSeconds, Supplier<Object> payloadSupplier) {
ScheduledExecutorService keepAliveExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> keepAliveTask = keepAliveExecutor.scheduleAtFixedRate(() -> {
try {
send(SseEventName.KEEP_ALIVE, payloadSupplier.get());
} catch (Exception e) {
WfsErrorHandler.handleError(e, contextId, emitter, this::cleanup);
}
}, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);

keepAliveTaskRef.set(keepAliveTask);
keepAliveExecutorRef.set(keepAliveExecutor);
}

/**
* Complete the stream, closing the connection to the client.
*/
public void complete() {
emitter.complete();
}

/**
* Cancel the keep-alive task and shut down its executor. Idempotent.
*/
public void cleanup() {
try {
ScheduledFuture<?> keepAliveTask = keepAliveTaskRef.get();
if (keepAliveTask != null && !keepAliveTask.isCancelled()) {
keepAliveTask.cancel(false);
}

ScheduledExecutorService keepAliveExecutor = keepAliveExecutorRef.get();
if (keepAliveExecutor != null && !keepAliveExecutor.isShutdown()) {
keepAliveExecutor.shutdown();
}
} catch (Exception e) {
log.error("Error during cleanup for SSE stream: {}", contextId, e);
}
}
}
Loading
Loading