Skip to content

Commit 8d3923b

Browse files
change get_messages signature as discussed (kayleg#2)
* This fix changes the returned values from `get_messages` from a tuple of vectors with ack_ids and message, to a HashMap where ack_id is stored with a Result that contains either the successfully deserialized message or the error that occurred. Co-authored-by: Robert Walker <robert@walker.st>
1 parent 0f2568b commit 8d3923b

File tree

5 files changed

+34
-150
lines changed

5 files changed

+34
-150
lines changed

examples/long_lived.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@ fn schedule_pubsub_pull(subscription: Arc<Subscription>) {
2727
task::spawn(async move {
2828
while subscription.client().is_running() {
2929
match subscription.get_messages::<UpdatePacket>().await {
30-
Ok((packets, acks)) => {
31-
for packet in packets {
32-
println!("Received: {:?}", packet);
33-
}
34-
35-
if !acks.is_empty() {
36-
let s = Arc::clone(&subscription);
37-
task::spawn(async move {
38-
s.acknowledge_messages(acks).await;
39-
});
30+
Ok(messages) => {
31+
for (result, ack_id) in messages {
32+
match result {
33+
Ok(message) => {
34+
println!("Received: {:?}", message);
35+
let subscription = Arc::clone(&subscription);
36+
task::spawn(async move {
37+
subscription.acknowledge_messages(vec![ack_id]).await;
38+
});
39+
}
40+
Err(e) => log::error!("Failed converting to UpdatePacket: {}", e),
41+
}
4042
}
4143
}
4244
Err(e) => eprintln!("Failed to pull PubSub messages: {}", e),

examples/serde.rs

Lines changed: 0 additions & 57 deletions
This file was deleted.

examples/singleshot.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,22 @@ async fn main() {
3636
Ok(p) => p,
3737
};
3838

39-
let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
40-
match order_sub.clone().get_messages::<UpdatePacket>().await {
41-
Ok((packets, acks)) => {
42-
for packet in packets {
43-
println!("Received: {:?}", packet);
44-
}
45-
46-
if !acks.is_empty() {
47-
task::spawn(async move { order_sub.acknowledge_messages(acks).await })
48-
.await // This will block until acknowledgement is complete
49-
.expect("Failed to acknoweldge messages");
39+
let subscription = Arc::new(pubsub.subscribe(config.pubsub_subscription));
40+
match subscription.get_messages::<UpdatePacket>().await {
41+
Ok(messages) => {
42+
for (result, ack_id) in messages {
43+
match result {
44+
Ok(message) => {
45+
println!("Received: {:?}", message);
46+
let subscription = Arc::clone(&subscription);
47+
task::spawn(async move {
48+
subscription.acknowledge_messages(vec![ack_id]).await;
49+
});
50+
}
51+
Err(e) => log::error!("Failed converting to UpdatePacket: {}", e),
52+
}
5053
}
5154
}
52-
Err(e) => println!("Error Checking PubSub: {}", e),
55+
Err(e) => eprintln!("Failed to pull PubSub messages: {}", e),
5356
}
5457
}

examples/subscribe_to_topic.rs

Lines changed: 0 additions & 54 deletions
This file was deleted.

src/subscription.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Subscription {
6060

6161
pub async fn get_messages<T: FromPubSubMessage>(
6262
&self,
63-
) -> Result<(Vec<T>, Vec<String>), error::Error> {
63+
) -> Result<Vec<(Result<T, error::Error>, String)>, error::Error> {
6464
let client = self
6565
.client
6666
.as_ref()
@@ -76,6 +76,7 @@ impl Subscription {
7676
*req.uri_mut() = uri.clone();
7777

7878
let response = client.hyper_client().request(req).await?;
79+
7980
if response.status() == StatusCode::NOT_FOUND {
8081
return Err(error::Error::PubSub {
8182
code: 404,
@@ -88,24 +89,13 @@ impl Subscription {
8889
if let Some(e) = response.error {
8990
return Err(e);
9091
}
91-
let messages = response.received_messages.unwrap_or_default();
92-
let ack_ids: Vec<String> = messages
93-
.as_slice()
94-
.iter()
95-
.map(|packet| packet.ack_id.clone())
96-
.collect();
97-
let packets = messages
92+
let messages = response
93+
.received_messages
94+
.unwrap_or_default()
9895
.into_iter()
99-
.filter_map(|packet| match T::from(packet.message) {
100-
Ok(o) => Some(o),
101-
Err(e) => {
102-
error!("Failed converting pubsub {}", e,);
103-
None
104-
}
105-
})
96+
.map(|m| (T::from(m.message), m.ack_id))
10697
.collect();
107-
108-
Ok((packets, ack_ids))
98+
Ok(messages)
10999
}
110100

111101
pub async fn destroy(self) -> Result<(), error::Error> {

0 commit comments

Comments
 (0)