Skip to content

Commit e40eb16

Browse files
authored
Merge branch 'master' into 17-logging
2 parents 6d13fed + 83b6cc4 commit e40eb16

File tree

5 files changed

+47
-44
lines changed

5 files changed

+47
-44
lines changed

examples/long_lived.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@ fn schedule_pubsub_pull(subscription: Arc<Subscription>) {
2727
task::spawn(async move {
2828
while subscription.client().is_running() {
2929
match subscription.get_messages::<UpdatePacket>().await {
30-
Ok((packets, acks)) => {
31-
for packet in packets {
32-
println!("Received: {:?}", packet);
33-
}
34-
35-
if !acks.is_empty() {
36-
let s = Arc::clone(&subscription);
37-
task::spawn(async move {
38-
s.acknowledge_messages(acks).await;
39-
});
30+
Ok(messages) => {
31+
for (result, ack_id) in messages {
32+
match result {
33+
Ok(message) => {
34+
println!("Received: {:?}", message);
35+
let subscription = Arc::clone(&subscription);
36+
task::spawn(async move {
37+
subscription.acknowledge_messages(vec![ack_id]).await;
38+
});
39+
}
40+
Err(e) => log::error!("Failed converting to UpdatePacket: {}", e),
41+
}
4042
}
4143
}
4244
Err(e) => eprintln!("Failed to pull PubSub messages: {}", e),

examples/serde.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,15 @@ async fn main() {
4141

4242
let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
4343
match order_sub.clone().get_messages::<UpdatePacket>().await {
44-
Ok((packets, acks)) => {
45-
for packet in packets {
44+
Ok(packets) => {
45+
for packet in &packets {
4646
println!("Received: {:?}", packet);
4747
}
4848

49+
let acks: Vec<String> = packets
50+
.into_iter()
51+
.map(|packet| packet.1)
52+
.collect::<Vec<_>>();
4953
if !acks.is_empty() {
5054
task::spawn(async move { order_sub.acknowledge_messages(acks).await })
5155
.await // This will block until acknowledgement is complete

examples/singleshot.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,22 @@ async fn main() {
3636
Ok(p) => p,
3737
};
3838

39-
let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
40-
match order_sub.clone().get_messages::<UpdatePacket>().await {
41-
Ok((packets, acks)) => {
42-
for packet in packets {
43-
println!("Received: {:?}", packet);
44-
}
45-
46-
if !acks.is_empty() {
47-
task::spawn(async move { order_sub.acknowledge_messages(acks).await })
48-
.await // This will block until acknowledgement is complete
49-
.expect("Failed to acknoweldge messages");
39+
let subscription = Arc::new(pubsub.subscribe(config.pubsub_subscription));
40+
match subscription.get_messages::<UpdatePacket>().await {
41+
Ok(messages) => {
42+
for (result, ack_id) in messages {
43+
match result {
44+
Ok(message) => {
45+
println!("Received: {:?}", message);
46+
let subscription = Arc::clone(&subscription);
47+
task::spawn(async move {
48+
subscription.acknowledge_messages(vec![ack_id]).await;
49+
});
50+
}
51+
Err(e) => log::error!("Failed converting to UpdatePacket: {}", e),
52+
}
5053
}
5154
}
52-
Err(e) => println!("Error Checking PubSub: {}", e),
55+
Err(e) => eprintln!("Failed to pull PubSub messages: {}", e),
5356
}
5457
}

examples/subscribe_to_topic.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,21 @@ async fn main() {
3333
let sub = topic.subscribe().await.expect("Failed to subscribe");
3434

3535
println!("Subscribed to topic with: {}", sub.name);
36-
let (packets, acks) = sub
36+
let packets = sub
3737
.clone()
3838
.get_messages::<UpdatePacket>()
3939
.await
4040
.expect("Error Checking PubSub");
4141

42-
for packet in packets {
42+
for packet in &packets {
4343
println!("Received: {:?}", packet);
4444
}
4545

46-
if !acks.is_empty() {
46+
if !packets.is_empty() {
47+
let acks = packets
48+
.into_iter()
49+
.map(|packet| packet.1)
50+
.collect::<Vec<_>>();
4751
sub.acknowledge_messages(acks).await;
4852
} else {
4953
println!("Cleaning up");

src/subscription.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl Subscription {
5959

6060
pub async fn get_messages<T: FromPubSubMessage>(
6161
&self,
62-
) -> Result<(Vec<T>, Vec<String>), error::Error> {
62+
) -> Result<Vec<(Result<T, error::Error>, String)>, error::Error> {
6363
let client = self
6464
.client
6565
.as_ref()
@@ -75,6 +75,7 @@ impl Subscription {
7575
*req.uri_mut() = uri.clone();
7676

7777
let response = client.hyper_client().request(req).await?;
78+
7879
if response.status() == StatusCode::NOT_FOUND {
7980
return Err(error::Error::PubSub {
8081
code: 404,
@@ -87,24 +88,13 @@ impl Subscription {
8788
if let Some(e) = response.error {
8889
return Err(e);
8990
}
90-
let messages = response.received_messages.unwrap_or_default();
91-
let ack_ids: Vec<String> = messages
92-
.as_slice()
93-
.iter()
94-
.map(|packet| packet.ack_id.clone())
95-
.collect();
96-
let packets = messages
91+
let messages = response
92+
.received_messages
93+
.unwrap_or_default()
9794
.into_iter()
98-
.filter_map(|packet| match T::from(packet.message) {
99-
Ok(o) => Some(o),
100-
Err(e) => {
101-
log::error!("Failed converting pubsub {}", e);
102-
None
103-
}
104-
})
95+
.map(|m| (T::from(m.message), m.ack_id))
10596
.collect();
106-
107-
Ok((packets, ack_ids))
97+
Ok(messages)
10898
}
10999

110100
pub async fn destroy(self) -> Result<(), error::Error> {

0 commit comments

Comments
 (0)