Skip to content

Commit c2c44e7

Browse files
author
Carlos Cabanero
committed
ACK Ping messages
1 parent c41ae41 commit c2c44e7

File tree

1 file changed

+21
-18
lines changed

1 file changed

+21
-18
lines changed

src/subscription.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,28 @@ impl Subscription {
9090
if let Some(e) = response.error {
9191
return Err(e);
9292
}
93-
let messages = response
94-
.received_messages
95-
.unwrap_or_default()
96-
.into_iter()
97-
.filter_map(|m| {
98-
if let Ok(data) = m.message.decode() {
99-
if data == PING.as_bytes() {
100-
log::debug!("{}", PING);
101-
return None;
102-
} else {
103-
// Echo as info, in case we want to check the queue status
104-
log::info!("echo - {}", String::from_utf8_lossy(&data));
105-
return None;
106-
}
93+
94+
let messages = response.received_messages.unwrap_or_default();
95+
96+
let mut batch = Vec::new();
97+
98+
for m in messages {
99+
if let Ok(data) = m.message.decode() {
100+
if data == PING.as_bytes() {
101+
log::debug!("{}", PING);
102+
self.acknowledge_messages(vec![m.ack_id]).await;
103+
continue;
104+
} else {
105+
// Echo as info, in case we want to check the queue status
106+
log::info!("echo - {}", String::from_utf8_lossy(&data));
107+
self.acknowledge_messages(vec![m.ack_id]).await;
108+
continue;
107109
}
108-
Some((T::from(m.message), m.ack_id))
109-
})
110-
.collect();
111-
Ok(messages)
110+
}
111+
batch.push((T::from(m.message), m.ack_id));
112+
}
113+
114+
Ok(batch)
112115
}
113116

114117
pub async fn destroy(self) -> Result<(), error::Error> {

0 commit comments

Comments
 (0)