Skip to content

Commit 33f2af8

Browse files
committed
This fix changes the returned values from get_messages from a tuple of
vectors with ack_ids and message, to a HashMap where ack_id is stored with a Result that contains either the successfully deserialized message or the error that occured.
1 parent 8d3923b commit 33f2af8

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

examples/serde.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use cloud_pubsub::error;
2+
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
3+
use serde_derive::Deserialize;
4+
use std::sync::Arc;
5+
use tokio::task;
6+
7+
#[derive(Deserialize)]
8+
struct Config {
9+
pubsub_subscription: String,
10+
google_application_credentials: String,
11+
}
12+
13+
#[derive(Debug, Deserialize)]
14+
struct UpdatePacket {
15+
id: u64,
16+
name: String,
17+
}
18+
19+
impl FromPubSubMessage for UpdatePacket {
20+
fn from(message: EncodedMessage) -> Result<Self, error::Error> {
21+
match message.decode() {
22+
Ok(bytes) => serde_json::from_slice::<UpdatePacket>(&bytes).map_err(error::Error::from),
23+
Err(e) => Err(error::Error::from(e)),
24+
}
25+
}
26+
}
27+
28+
#[tokio::main]
29+
async fn main() {
30+
let parsed_env = envy::from_env::<Config>();
31+
if let Err(e) = parsed_env {
32+
eprintln!("ENV is not valid: {}", e);
33+
std::process::exit(1);
34+
}
35+
let config = parsed_env.unwrap();
36+
37+
let pubsub = match Client::new(config.google_application_credentials).await {
38+
Err(e) => panic!("Failed to initialize pubsub: {}", e),
39+
Ok(p) => p,
40+
};
41+
42+
let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
43+
match order_sub.clone().get_messages::<UpdatePacket>().await {
44+
Ok(packets) => {
45+
for packet in &packets {
46+
println!("Received: {:?}", packet);
47+
}
48+
49+
let acks: Vec<String> = packets.into_iter().map(|packet| packet.0).collect();
50+
if !acks.is_empty() {
51+
task::spawn(async move { order_sub.acknowledge_messages(acks).await })
52+
.await // This will block until acknowledgement is complete
53+
.expect("Failed to acknoweldge messages");
54+
}
55+
}
56+
Err(e) => println!("Error Checking PubSub: {}", e),
57+
}
58+
}

examples/subscribe_to_topic.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use cloud_pubsub::error;
2+
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
3+
use serde_derive::Deserialize;
4+
5+
#[derive(Deserialize)]
6+
struct Config {
7+
topic: String,
8+
google_application_credentials: String,
9+
}
10+
11+
#[derive(Debug)]
12+
struct UpdatePacket(String);
13+
14+
impl FromPubSubMessage for UpdatePacket {
15+
fn from(message: EncodedMessage) -> Result<Self, error::Error> {
16+
match message.decode() {
17+
Ok(bytes) => Ok(UpdatePacket(String::from_utf8_lossy(&bytes).into_owned())),
18+
Err(e) => Err(error::Error::from(e)),
19+
}
20+
}
21+
}
22+
23+
#[tokio::main]
24+
async fn main() {
25+
let config: Config = envy::from_env().expect("ENV is not valid");
26+
27+
let pubsub = Client::new(config.google_application_credentials)
28+
.await
29+
.expect("Failed to initialize pubsub");
30+
31+
let topic = pubsub.topic(config.topic);
32+
33+
let sub = topic.subscribe().await.expect("Failed to subscribe");
34+
35+
println!("Subscribed to topic with: {}", sub.name);
36+
let packets = sub
37+
.clone()
38+
.get_messages::<UpdatePacket>()
39+
.await
40+
.expect("Error Checking PubSub");
41+
42+
for packet in &packets {
43+
println!("Received: {:?}", packet);
44+
}
45+
46+
if !packets.is_empty() {
47+
let acks = packets.into_iter().map(|packet| packet.0).collect();
48+
sub.acknowledge_messages(acks).await;
49+
} else {
50+
println!("Cleaning up");
51+
drop(pubsub);
52+
sub.destroy().await.expect("Failed deleting subscription");
53+
println!("Successfully deleted subscription");
54+
}
55+
}

src/subscription.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use hyper::{Method, StatusCode};
66
use lazy_static::lazy_static;
77
use log::error;
88
use serde_derive::{Deserialize, Serialize};
9+
use std::collections::HashMap;
910
use std::env;
1011

1112
lazy_static! {

0 commit comments

Comments
 (0)