File tree Expand file tree Collapse file tree 2 files changed +21
-2
lines changed
Expand file tree Collapse file tree 2 files changed +21
-2
lines changed Original file line number Diff line number Diff line change @@ -3,6 +3,8 @@ use base64::{self, Engine};
33use serde_derive:: { Deserialize , Serialize } ;
44use std:: collections:: HashMap ;
55
6+ pub ( crate ) const PING : & str = "ping" ;
7+
68#[ derive( Deserialize , Clone , Serialize ) ]
79pub struct EncodedMessage {
810 data : String ,
@@ -38,6 +40,10 @@ impl EncodedMessage {
3840 let data = base64:: engine:: general_purpose:: STANDARD . encode ( & incoming) ;
3941 EncodedMessage { data, attributes }
4042 }
43+
44+ pub fn ping ( ) -> Self {
45+ Self :: new_binary ( & PING . to_string ( ) , None )
46+ }
4147}
4248
4349#[ derive( Deserialize ) ]
Original file line number Diff line number Diff line change 11use crate :: client:: Client ;
22use crate :: error;
3- use crate :: message:: { FromPubSubMessage , Message } ;
3+ use crate :: message:: { FromPubSubMessage , Message , PING } ;
44use hyper:: body:: Buf ;
55use hyper:: { Method , StatusCode } ;
66use lazy_static:: lazy_static;
@@ -84,6 +84,7 @@ impl Subscription {
8484 message : self . name . clone ( ) ,
8585 } ) ;
8686 }
87+
8788 let body = hyper:: body:: aggregate ( response) . await ?;
8889 let response: Response = serde_json:: from_reader ( body. reader ( ) ) ?;
8990 if let Some ( e) = response. error {
@@ -93,7 +94,19 @@ impl Subscription {
9394 . received_messages
9495 . unwrap_or_default ( )
9596 . into_iter ( )
96- . map ( |m| ( T :: from ( m. message ) , m. ack_id ) )
97+ . filter_map ( |m| {
98+ if let Ok ( data) = m. message . decode ( ) {
99+ if data == PING . as_bytes ( ) {
100+ log:: debug!( "{}" , PING ) ;
101+ return None ;
102+ } else {
103+ // Echo as info, in case we want to check the queue status
104+ log:: info!( "echo - {}" , String :: from_utf8_lossy( & data) ) ;
105+ return None ;
106+ }
107+ }
108+ Some ( ( T :: from ( m. message ) , m. ack_id ) )
109+ } )
97110 . collect ( ) ;
98111 Ok ( messages)
99112 }
You can’t perform that action at this time.
0 commit comments