-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathsubscribe_to_topic.rs
More file actions
58 lines (48 loc) · 1.56 KB
/
subscribe_to_topic.rs
File metadata and controls
58 lines (48 loc) · 1.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use cloud_pubsub::error;
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
use serde_derive::Deserialize;
/// Runtime settings for this example.
///
/// `main` fills this in from the process environment via `envy::from_env`.
#[derive(Deserialize)]
struct Config {
// Topic identifier handed to `Client::topic` in `main`.
topic: String,
// Passed unchanged to `Client::new`.
// NOTE(review): presumably a path to a service-account credentials file — confirm
// against the `cloud_pubsub` documentation.
google_application_credentials: String,
}
/// Text payload of one received message, wrapped in a newtype so that
/// `FromPubSubMessage` can be implemented for it.
#[derive(Debug)]
struct UpdatePacket(String);
impl FromPubSubMessage for UpdatePacket {
fn from(message: EncodedMessage) -> Result<Self, error::Error> {
match message.decode() {
Ok(bytes) => Ok(UpdatePacket(String::from_utf8_lossy(&bytes).into_owned())),
Err(e) => Err(error::Error::from(e)),
}
}
}
/// Subscribes to the configured topic, fetches one batch of messages and prints it.
///
/// A non-empty batch is acknowledged. An empty batch causes the subscription to be
/// deleted instead. Every failure along the way aborts the program through `expect`.
#[tokio::main]
async fn main() {
    let settings: Config = envy::from_env().expect("ENV is not valid");

    let client = Client::new(settings.google_application_credentials)
        .await
        .expect("Failed to initialize pubsub");
    let topic = client.topic(settings.topic);
    let subscription = topic.subscribe().await.expect("Failed to subscribe");
    println!("Subscribed to topic with: {}", subscription.name);

    // The clone is consumed by the fetch call; `subscription` itself is still needed below.
    let received = subscription
        .clone()
        .get_messages::<UpdatePacket>()
        .await
        .expect("Error Checking PubSub");
    for entry in &received {
        println!("Received: {:?}", entry);
    }

    if received.is_empty() {
        // Nothing arrived: release the client, then delete the subscription.
        println!("Cleaning up");
        drop(client);
        subscription
            .destroy()
            .await
            .expect("Failed deleting subscription");
        println!("Successfully deleted subscription");
    } else {
        // Collect the second element of every entry and hand the list to
        // `acknowledge_messages`.
        let ack_ids: Vec<_> = received.into_iter().map(|(_, ack_id)| ack_id).collect();
        subscription.acknowledge_messages(ack_ids).await;
    }
}