-
Notifications
You must be signed in to change notification settings - Fork 101
Implement websocket support in Dropshot as an Extractor and #[channel] macro #403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| // Copyright 2022 Oxide Computer Company | ||
| /*! | ||
| * Example use of Dropshot with a websocket endpoint. | ||
| */ | ||
|
|
||
| use dropshot::channel; | ||
| use dropshot::ApiDescription; | ||
| use dropshot::ConfigDropshot; | ||
| use dropshot::ConfigLogging; | ||
| use dropshot::ConfigLoggingLevel; | ||
| use dropshot::HttpServerStarter; | ||
| use dropshot::Query; | ||
| use dropshot::RequestContext; | ||
| use dropshot::WebsocketConnection; | ||
| use futures_util::SinkExt; | ||
| use schemars::JsonSchema; | ||
| use serde::Deserialize; | ||
| use std::sync::Arc; | ||
| use tungstenite::protocol::Role; | ||
| use tungstenite::Message; | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), String> { | ||
| /* | ||
| * We must specify a configuration with a bind address. We'll use 127.0.0.1 | ||
| * since it's available and won't expose this server outside the host. We | ||
| * request port 0, which allows the operating system to pick any available | ||
| * port. | ||
| */ | ||
| let config_dropshot: ConfigDropshot = Default::default(); | ||
|
|
||
| /* | ||
| * For simplicity, we'll configure an "info"-level logger that writes to | ||
| * stderr assuming that it's a terminal. | ||
| */ | ||
| let config_logging = | ||
| ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Info }; | ||
| let log = config_logging | ||
| .to_logger("example-basic") | ||
| .map_err(|error| format!("failed to create logger: {}", error))?; | ||
|
|
||
| /* | ||
| * Build a description of the API. | ||
| */ | ||
| let mut api = ApiDescription::new(); | ||
| api.register(example_api_websocket_counter).unwrap(); | ||
|
|
||
| /* | ||
| * Set up the server. | ||
| */ | ||
| let server = HttpServerStarter::new(&config_dropshot, api, (), &log) | ||
| .map_err(|error| format!("failed to create server: {}", error))? | ||
| .start(); | ||
|
|
||
| /* | ||
| * Wait for the server to stop. Note that there's not any code to shut down | ||
| * this server, so we should never get past this point. | ||
| */ | ||
| server.await | ||
| } | ||
|
|
||
| /* | ||
| * HTTP API interface | ||
| */ | ||
|
|
||
| #[derive(Deserialize, JsonSchema)] | ||
| struct QueryParams { | ||
| start: Option<u8>, | ||
| } | ||
|
|
||
| /** | ||
| * An eternally-increasing sequence of bytes, wrapping on overflow, starting | ||
| * from the value given for the query parameter "start." | ||
| */ | ||
| #[channel { | ||
| protocol = WEBSOCKETS, | ||
| path = "/counter", | ||
| }] | ||
| async fn example_api_websocket_counter( | ||
| _rqctx: Arc<RequestContext<()>>, | ||
| upgraded: WebsocketConnection, | ||
| qp: Query<QueryParams>, | ||
| ) -> dropshot::WebsocketChannelResult { | ||
| let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( | ||
| upgraded.into_inner(), | ||
| Role::Server, | ||
| None, | ||
| ) | ||
| .await; | ||
| let mut count = qp.into_inner().start.unwrap_or(0); | ||
| while ws.send(Message::Binary(vec![count])).await.is_ok() { | ||
| count = count.wrapping_add(1); | ||
| } | ||
| Ok(()) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,7 +45,7 @@ pub struct ApiEndpoint<Context: ServerContext> { | |
| pub summary: Option<String>, | ||
| pub description: Option<String>, | ||
| pub tags: Vec<String>, | ||
| pub paginated: bool, | ||
| pub extension_mode: ExtensionMode, | ||
| pub visible: bool, | ||
| } | ||
|
|
||
|
|
@@ -78,7 +78,7 @@ impl<'a, Context: ServerContext> ApiEndpoint<Context> { | |
| summary: None, | ||
| description: None, | ||
| tags: vec![], | ||
| paginated: func_parameters.paginated, | ||
| extension_mode: func_parameters.extension_mode, | ||
| visible: true, | ||
| } | ||
| } | ||
|
|
@@ -688,11 +688,20 @@ impl<Context: ServerContext> ApiDescription<Context> { | |
| }) | ||
| .next(); | ||
|
|
||
| if endpoint.paginated { | ||
| operation.extensions.insert( | ||
| crate::pagination::PAGINATION_EXTENSION.to_string(), | ||
| serde_json::json! {true}, | ||
| ); | ||
| match endpoint.extension_mode { | ||
| ExtensionMode::None => {} | ||
| ExtensionMode::Paginated => { | ||
| operation.extensions.insert( | ||
| crate::pagination::PAGINATION_EXTENSION.to_string(), | ||
| serde_json::json! {true}, | ||
| ); | ||
| } | ||
| ExtensionMode::Websocket => { | ||
| operation.extensions.insert( | ||
| crate::websocket::WEBSOCKET_EXTENSION.to_string(), | ||
| serde_json::json!({}), | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| let response = if let Some(schema) = &endpoint.response.schema { | ||
|
|
@@ -1579,6 +1588,22 @@ pub struct TagExternalDocs { | |
| pub url: String, | ||
| } | ||
|
|
||
| /** | ||
| * Dropshot/Progenitor features used by endpoints which are not a part of the base OpenAPI spec. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the plan here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per our discussion on matrix, i'll file some issues after this merges about turning this AsyncAPI support (and perhaps coming up with a general.. philosophy? on how to approach other extensions, like pagination or anything we come up with in the future that's outside of the scope of AsyncAPI in particular) |
||
| */ | ||
| #[derive(Copy, Clone, Debug, Eq, PartialEq)] | ||
| pub enum ExtensionMode { | ||
| None, | ||
| Paginated, | ||
| Websocket, | ||
| } | ||
|
|
||
| impl Default for ExtensionMode { | ||
|
lifning marked this conversation as resolved.
|
||
| fn default() -> Self { | ||
| ExtensionMode::None | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod test { | ||
| use super::j2oas_schema; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this means a given endpoint can only be in one mode at a time, right? That makes sense for pagination and websockets, since they're mutually exclusive. I have no idea what other kinds of extensions we might dream up, but is it possible we could want to use more than one at a time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah - i suppose if such a thing comes to pass, we could change to be something more sophisticated than a bare enum, but i didn't want to try to anticipate unknown unknowns yet, just represent that pagination and websockets are mutually exclusive extensions