|
| 1 | +//! Slack Socket Mode API Client Library |
| 2 | +
|
| 3 | +use futures_util::{SinkExt, StreamExt}; |
| 4 | +use url::Url; |
| 5 | + |
| 6 | +pub mod protocol; |
| 7 | + |
| 8 | +#[allow(unused_variables)] |
| 9 | +pub trait EventHandler { |
| 10 | + fn on_hello( |
| 11 | + &mut self, |
| 12 | + connection_info: protocol::ConnectionInfo, |
| 13 | + num_connections: u32, |
| 14 | + debug_info: protocol::DebugInfo, |
| 15 | + ) { |
| 16 | + } |
| 17 | + |
| 18 | + fn on_events_api(&mut self) {} |
| 19 | +} |
| 20 | + |
| 21 | +#[derive(Debug, Clone)] |
| 22 | +pub enum DisconnectReason { |
| 23 | + RefreshRequested, |
| 24 | + Other(String), |
| 25 | + Unknown, |
| 26 | +} |
| 27 | + |
| 28 | +#[derive(Debug)] |
| 29 | +pub enum RunError { |
| 30 | + HttpClientError(HttpClientError), |
| 31 | + OpenConnectionApiError(Option<String>), |
| 32 | + UrlParseError(url::ParseError), |
| 33 | + #[cfg(feature = "runtime-async-std")] |
| 34 | + TcpStreamConnectionError(async_std::io::Error), |
| 35 | + TlsConnectionError(std::io::Error), |
| 36 | + WebSocketError(tungstenite::Error), |
| 37 | +} |
| 38 | +impl From<HttpClientError> for RunError { |
| 39 | + fn from(e: HttpClientError) -> Self { |
| 40 | + Self::HttpClientError(e) |
| 41 | + } |
| 42 | +} |
| 43 | +impl From<url::ParseError> for RunError { |
| 44 | + fn from(e: url::ParseError) -> Self { |
| 45 | + Self::UrlParseError(e) |
| 46 | + } |
| 47 | +} |
| 48 | +impl From<tungstenite::Error> for RunError { |
| 49 | + fn from(e: tungstenite::Error) -> Self { |
| 50 | + Self::WebSocketError(e) |
| 51 | + } |
| 52 | +} |
| 53 | + |
| 54 | +pub async fn run<H: EventHandler + ?Sized>(token: &str, handler: &mut H) -> Result<DisconnectReason, RunError> { |
| 55 | + let ws_url = open_connection(token) |
| 56 | + .await? |
| 57 | + .map_err(RunError::OpenConnectionApiError)?; |
| 58 | + let ws_parsed = Url::parse(&ws_url)?; |
| 59 | + let ws_domain = ws_parsed.domain().expect("WebSocket URL doesn't have domain"); |
| 60 | + |
| 61 | + #[cfg(feature = "runtime-async-std")] |
| 62 | + let tcp_stream = async_std::net::TcpStream::connect((ws_domain, 443)) |
| 63 | + .await |
| 64 | + .map_err(RunError::TcpStreamConnectionError)?; |
| 65 | + let enc_stream = async_tls::TlsConnector::default() |
| 66 | + .connect(ws_domain, tcp_stream) |
| 67 | + .await |
| 68 | + .map_err(RunError::TlsConnectionError)?; |
| 69 | + let (mut ws, _) = async_tungstenite::client_async(&ws_url, enc_stream).await?; |
| 70 | + |
| 71 | + while let Some(msg) = ws.next().await { |
| 72 | + match msg? { |
| 73 | + tungstenite::Message::Text(t) => match serde_json::from_str(&t) { |
| 74 | + Ok(protocol::Message::Hello { |
| 75 | + num_connections, |
| 76 | + connection_info, |
| 77 | + debug_info, |
| 78 | + }) => { |
| 79 | + handler.on_hello(connection_info, num_connections, debug_info); |
| 80 | + } |
| 81 | + Ok(protocol::Message::Disconnect { reason, .. }) => { |
| 82 | + return match reason { |
| 83 | + "refresh_requested" => Ok(DisconnectReason::RefreshRequested), |
| 84 | + s => Ok(DisconnectReason::Other(String::from(s))), |
| 85 | + } |
| 86 | + } |
| 87 | + Err(e) => { |
| 88 | + log::error!("Failed to parse incoming message: {}: {:?}", t, e); |
| 89 | + } |
| 90 | + }, |
| 91 | + tungstenite::Message::Ping(p) => { |
| 92 | + ws.send(tungstenite::Message::Pong(p)).await?; |
| 93 | + } |
| 94 | + tungstenite::Message::Close(_) => { |
| 95 | + break; |
| 96 | + } |
| 97 | + m => { |
| 98 | + log::warn!("Unsupported WebSocket Message: {:?}", m); |
| 99 | + } |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + Ok(DisconnectReason::Unknown) |
| 104 | +} |
| 105 | + |
| 106 | +#[cfg(feature = "runtime-async-std")] |
| 107 | +type HttpClientError = surf::Error; |
| 108 | + |
| 109 | +async fn open_connection(token: &str) -> Result<Result<String, Option<String>>, HttpClientError> { |
| 110 | + #[derive(serde::Deserialize)] |
| 111 | + pub struct ApiResponse { |
| 112 | + ok: bool, |
| 113 | + url: Option<String>, |
| 114 | + error: Option<String>, |
| 115 | + } |
| 116 | + let mut tok_bearer = String::with_capacity(token.len() + 7); |
| 117 | + tok_bearer.push_str("Bearer "); |
| 118 | + tok_bearer.push_str(token); |
| 119 | + |
| 120 | + #[cfg(feature = "runtime-async-std")] |
| 121 | + let r: ApiResponse = surf::post("https://slack.com/api/apps.connections.open") |
| 122 | + .header(surf::http::headers::AUTHORIZATION, tok_bearer) |
| 123 | + .recv_json() |
| 124 | + .await?; |
| 125 | + |
| 126 | + Ok(if r.ok { |
| 127 | + Ok(r.url.expect("no url returned from api?")) |
| 128 | + } else { |
| 129 | + Err(r.error) |
| 130 | + }) |
| 131 | +} |
0 commit comments