Skip to content

Commit 0366bc8

Browse files
committed
Added example of long lived process
1 parent 3be2ad8 commit 0366bc8

File tree

4 files changed

+134
-3
lines changed

4 files changed

+134
-3
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "cloud-pubsub"
3-
version = "0.3.0"
3+
version = "0.3.1"
44
authors = ["Kayle Gishen <k@bkdsw.com>"]
55
edition = "2018"
66
description = "Google Cloud PubSub Client"
@@ -25,3 +25,4 @@ base64 = "0.10.1"
2525
lazy_static = "1.2.0"
2626
envy = "0.4"
2727
rand = "0.6"
28+
ctrlc = { git = "https://github.com/kayleg/rust-ctrlc" }

examples/long_lived.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use cloud_pubsub::error;
2+
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage, Subscription};
3+
use ctrlc;
4+
use futures::future::lazy;
5+
use serde_derive::Deserialize;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
use tokio::prelude::*;
9+
10+
#[derive(Deserialize)]
11+
struct Config {
12+
topic: String,
13+
google_application_credentials: String,
14+
}
15+
16+
#[derive(Debug)]
17+
struct UpdatePacket(String);
18+
19+
impl FromPubSubMessage for UpdatePacket {
20+
fn from(message: EncodedMessage) -> Result<Self, error::Error> {
21+
match message.decode() {
22+
Ok(bytes) => Ok(UpdatePacket(String::from_utf8_lossy(&bytes).into_owned())),
23+
Err(e) => Err(error::Error::from(e)),
24+
}
25+
}
26+
}
27+
28+
fn schedule_pubsub_pull(subscription: Arc<Subscription>) {
29+
let sub = Arc::clone(&subscription);
30+
let get = subscription
31+
.clone()
32+
.get_messages::<UpdatePacket>()
33+
.map(move |(packets, acks)| {
34+
for packet in packets {
35+
println!("Received: {:?}", packet);
36+
}
37+
38+
if !acks.is_empty() {
39+
tokio::spawn(subscription.acknowledge_messages(acks));
40+
}
41+
})
42+
.map_err(|e| println!("Error Checking PubSub: {}", e))
43+
.and_then(|_| {
44+
if sub.client().is_running() {
45+
schedule_pubsub_pull(sub);
46+
} else {
47+
println!("No longer pulling");
48+
}
49+
50+
Ok(())
51+
});
52+
tokio::spawn(get);
53+
}
54+
55+
fn main() {
56+
let parsed_env = envy::from_env::<Config>();
57+
if let Err(e) = parsed_env {
58+
eprintln!("ENV is not valid: {}", e);
59+
std::process::exit(1);
60+
}
61+
let config = parsed_env.unwrap();
62+
63+
let pubsub = match Client::new(config.google_application_credentials) {
64+
Err(e) => panic!("Failed to initialize pubsub: {}", e),
65+
Ok(p) => Arc::new(p),
66+
};
67+
68+
let topic = Arc::new(pubsub.topic(config.topic));
69+
70+
tokio::run(lazy(move || {
71+
pubsub.spawn_token_renew(Duration::from_secs(2));
72+
73+
topic
74+
.subscribe()
75+
.map(move |subscription| {
76+
println!("Subscribed to topic with: {}", subscription.name);
77+
let sub = Arc::new(subscription);
78+
79+
schedule_pubsub_pull(Arc::clone(&sub));
80+
81+
ctrlc::set_once_handler(move || {
82+
println!("Cleaning up");
83+
pubsub.stop();
84+
let client = Arc::clone(&pubsub);
85+
println!("Waiting for current Pull to finish....");
86+
while Arc::strong_count(&sub) > 1 {}
87+
println!("Deleting subscription");
88+
if let Ok(s) = Arc::try_unwrap(sub) {
89+
let destroy = s
90+
.destroy()
91+
.map(|_r| println!("Successfully deleted subscription"))
92+
.map_err(|e| eprintln!("Failed deleting subscription: {}", e))
93+
.then(move |_| {
94+
if let Ok(c) = Arc::try_unwrap(client) {
95+
drop(c);
96+
}
97+
Ok(())
98+
});
99+
tokio::run(destroy);
100+
} else {
101+
eprintln!("Subscription was still ownded");
102+
}
103+
})
104+
.expect("Error setting Ctrl-C handler");
105+
})
106+
.map_err(|e| eprintln!("Failed to subscribe: {}", e))
107+
}));
108+
}

src/client.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use crate::error;
22
use crate::subscription::Subscription;
33
use crate::topic::Topic;
4+
use futures::future;
45
use futures::prelude::*;
56
use goauth::auth::JwtClaims;
67
use goauth::scopes::Scope;
78
use hyper::client::HttpConnector;
89
use hyper_tls::HttpsConnector;
910
use smpl_jwt::Jwt;
11+
use std::sync::atomic::{AtomicBool, Ordering};
1012
use std::sync::{Arc, RwLock};
1113
use std::time::{Duration, Instant};
1214
use tokio::timer::Interval;
@@ -18,6 +20,7 @@ pub struct State {
1820
credentials_path: String,
1921
project: Option<String>,
2022
hyper_client: HyperClient,
23+
running: Arc<AtomicBool>,
2124
}
2225

2326
impl State {
@@ -49,6 +52,7 @@ impl Client {
4952
credentials_path,
5053
project: None,
5154
hyper_client: setup_hyper(),
55+
running: Arc::new(AtomicBool::new(true)),
5256
})));
5357

5458
match client.refresh_token() {
@@ -80,9 +84,23 @@ impl Client {
8084
}
8185
}
8286

83-
pub fn spawn_token_renew(&self) {
87+
pub fn is_running(&self) -> bool {
88+
self.0.read().unwrap().running.load(Ordering::SeqCst)
89+
}
90+
91+
pub fn stop(&self) {
92+
self.0
93+
.write()
94+
.unwrap()
95+
.running
96+
.store(false, Ordering::SeqCst)
97+
}
98+
99+
pub fn spawn_token_renew(&self, interval: Duration) {
84100
let mut client = self.clone();
85-
let renew_token_task = Interval::new(Instant::now(), Duration::from_secs(15 * 60))
101+
let c = self.clone();
102+
let renew_token_task = Interval::new(Instant::now(), interval)
103+
.take_while(move |_| future::ok(c.is_running()))
86104
.for_each(move |_instant| {
87105
println!("Renewing pubsub token");
88106
if let Err(e) = client.refresh_token() {

src/subscription.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,8 @@ impl Subscription {
118118
.and_then(|_res| Ok(()))
119119
.from_err::<error::Error>()
120120
}
121+
122+
pub fn client(&self) -> &Client {
123+
self.client.as_ref().unwrap()
124+
}
121125
}

0 commit comments

Comments
 (0)