Skip to content

Commit 3be2ad8

Browse files
committed
Added Topic object and ability to subscribe to a topic.
1 parent e32717d commit 3be2ad8

File tree

9 files changed

+325
-96
lines changed

9 files changed

+325
-96
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "cloud-pubsub"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
authors = ["Kayle Gishen <k@bkdsw.com>"]
55
edition = "2018"
66
description = "Google Cloud PubSub Client"
@@ -24,3 +24,4 @@ serde_json = "1.0"
2424
base64 = "0.10.1"
2525
lazy_static = "1.2.0"
2626
envy = "0.4"
27+
rand = "0.6"

README.md

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,57 @@ fn main() {
6363
let sub = my_client.subscribe("subscription-name")
6464
```
6565

66-
Currently a subscription needs to be created using the gcloud CLI or the web interface. There are
67-
plans to support creating a subscription directly from a topic name.
66+
### Subscribing to a topic
67+
68+
When subscribing to a topic, a random subscription name will be generated. To prevent dangling
69+
subscriptions, you need to explicitly call `subscription.destroy()`.
70+
71+
```
72+
let topic = Arc::new(pubsub.topic("update-topic".to_owned()));
73+
tokio::run(lazy(move || {
74+
topic
75+
.subscribe()
76+
.map(|subscription| {
77+
println!("Subscribed to topic with: {}", subscription.name);
78+
let sub = Arc::new(subscription);
79+
let get = sub
80+
.clone()
81+
.get_messages::<UpdatePacket>()
82+
.map(move |(packets, acks)| {
83+
for packet in packets {
84+
println!("Received: {:?}", packet);
85+
}
86+
87+
if !acks.is_empty() {
88+
tokio::spawn(sub.acknowledge_messages(acks));
89+
} else {
90+
println!("Cleaning up");
91+
if let Ok(s) = Arc::try_unwrap(sub) {
92+
let destroy = s
93+
.destroy()
94+
.map(|_r| println!("Successfully deleted subscription"))
95+
.map_err(|e| eprintln!("Failed deleting subscription: {}", e))
96+
.then(move |_| {
97+
drop(pubsub);
98+
Ok(())
99+
});
100+
tokio::spawn(destroy);
101+
} else {
102+
eprintln!("Subscription is still owned");
103+
}
104+
}
105+
})
106+
.map_err(|e| println!("Error Checking PubSub: {}", e));
107+
tokio::spawn(get);
108+
})
109+
.map_err(|e| eprintln!("Failed to subscribe: {}", e))
110+
}));
111+
```
112+
113+
## Known Caveats
114+
115+
A Hyper Client is retained by each PubSub client for reuse. This can cause the tokio runtime to not exit until the Hyper Client keep-alive terminates. To shutdown the runtime immediately on complete, the PubSub client needs to be explicitly dropped.
116+
117+
```
118+
drop(my_client);
119+
```

examples/serde.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use cloud_pubsub::error;
2-
use cloud_pubsub::{BaseClient, Client, EncodedMessage, FromPubSubMessage};
2+
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
33
use futures::future::lazy;
44
use serde_derive::Deserialize;
55
use std::sync::Arc;
@@ -34,7 +34,7 @@ fn main() {
3434
}
3535
let config = parsed_env.unwrap();
3636

37-
let pubsub = match BaseClient::create(config.google_application_credentials) {
37+
let pubsub = match Client::new(config.google_application_credentials) {
3838
Err(e) => panic!("Failed to initialize pubsub: {}", e),
3939
Ok(p) => p,
4040
};
@@ -53,6 +53,7 @@ fn main() {
5353
if !acks.is_empty() {
5454
tokio::spawn(order_sub.acknowledge_messages(acks));
5555
}
56+
drop(pubsub);
5657
})
5758
.map_err(|e| println!("Error Checking PubSub: {}", e))
5859
}));

examples/singleshot.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use cloud_pubsub::error;
2-
use cloud_pubsub::{BaseClient, Client, EncodedMessage, FromPubSubMessage};
2+
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
33
use futures::future::lazy;
44
use serde_derive::Deserialize;
55
use std::sync::Arc;
@@ -31,7 +31,7 @@ fn main() {
3131
}
3232
let config = parsed_env.unwrap();
3333

34-
let pubsub = match BaseClient::create(config.google_application_credentials) {
34+
let pubsub = match Client::new(config.google_application_credentials) {
3535
Err(e) => panic!("Failed to initialize pubsub: {}", e),
3636
Ok(p) => p,
3737
};
@@ -50,6 +50,8 @@ fn main() {
5050
if !acks.is_empty() {
5151
tokio::spawn(order_sub.acknowledge_messages(acks));
5252
}
53+
54+
drop(pubsub);
5355
})
5456
.map_err(|e| println!("Error Checking PubSub: {}", e))
5557
}));

examples/subscribe_to_topic.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use cloud_pubsub::error;
2+
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
3+
use futures::future::lazy;
4+
use serde_derive::Deserialize;
5+
use std::sync::Arc;
6+
use tokio::prelude::*;
7+
8+
#[derive(Deserialize)]
9+
struct Config {
10+
topic: String,
11+
google_application_credentials: String,
12+
}
13+
14+
#[derive(Debug)]
15+
struct UpdatePacket(String);
16+
17+
impl FromPubSubMessage for UpdatePacket {
18+
fn from(message: EncodedMessage) -> Result<Self, error::Error> {
19+
match message.decode() {
20+
Ok(bytes) => Ok(UpdatePacket(String::from_utf8_lossy(&bytes).into_owned())),
21+
Err(e) => Err(error::Error::from(e)),
22+
}
23+
}
24+
}
25+
26+
fn main() {
27+
let parsed_env = envy::from_env::<Config>();
28+
if let Err(e) = parsed_env {
29+
eprintln!("ENV is not valid: {}", e);
30+
std::process::exit(1);
31+
}
32+
let config = parsed_env.unwrap();
33+
34+
let pubsub = match Client::new(config.google_application_credentials) {
35+
Err(e) => panic!("Failed to initialize pubsub: {}", e),
36+
Ok(p) => p,
37+
};
38+
39+
let topic = Arc::new(pubsub.topic(config.topic));
40+
41+
tokio::run(lazy(move || {
42+
topic
43+
.subscribe()
44+
.map(|subscription| {
45+
println!("Subscribed to topic with: {}", subscription.name);
46+
let sub = Arc::new(subscription);
47+
let get = sub
48+
.clone()
49+
.get_messages::<UpdatePacket>()
50+
.map(move |(packets, acks)| {
51+
for packet in packets {
52+
println!("Received: {:?}", packet);
53+
}
54+
55+
if !acks.is_empty() {
56+
tokio::spawn(sub.acknowledge_messages(acks));
57+
} else {
58+
println!("Cleaning up");
59+
if let Ok(s) = Arc::try_unwrap(sub) {
60+
let destroy = s
61+
.destroy()
62+
.map(|_r| println!("Successfully deleted subscription"))
63+
.map_err(|e| eprintln!("Failed deleting subscription: {}", e))
64+
.then(move |_| {
65+
drop(pubsub);
66+
Ok(())
67+
});
68+
tokio::spawn(destroy);
69+
} else {
70+
eprintln!("Subscription is still owned");
71+
}
72+
}
73+
})
74+
.map_err(|e| println!("Error Checking PubSub: {}", e));
75+
tokio::spawn(get);
76+
})
77+
.map_err(|e| eprintln!("Failed to subscribe: {}", e))
78+
}));
79+
}

src/client.rs

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
use crate::error;
22
use crate::subscription::Subscription;
3+
use crate::topic::Topic;
34
use futures::prelude::*;
45
use goauth::auth::JwtClaims;
56
use goauth::scopes::Scope;
7+
use hyper::client::HttpConnector;
8+
use hyper_tls::HttpsConnector;
69
use smpl_jwt::Jwt;
710
use std::sync::{Arc, RwLock};
811
use std::time::{Duration, Instant};
912
use tokio::timer::Interval;
1013

14+
type HyperClient = Arc<hyper::Client<HttpsConnector<HttpConnector>, hyper::Body>>;
15+
1116
pub struct State {
1217
token: Option<goauth::auth::Token>,
1318
credentials_path: String,
1419
project: Option<String>,
20+
hyper_client: HyperClient,
1521
}
1622

1723
impl State {
@@ -28,53 +34,53 @@ impl State {
2834
}
2935
}
3036

31-
pub trait Client
32-
where
33-
Self: std::marker::Sized,
34-
{
35-
fn create(credentials_path: String) -> Result<Self, error::Error>;
36-
fn subscribe(&self, name: String) -> Subscription;
37-
fn set_project(&mut self, project: String);
38-
fn project(&self) -> String;
39-
40-
fn spawn_token_renew(&self);
41-
fn refresh_token(&mut self) -> Result<(), error::Error>;
42-
fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::error::GOErr>;
43-
}
44-
45-
pub type BaseClient = Arc<RwLock<State>>;
37+
pub struct Client(Arc<RwLock<State>>);
4638

47-
impl Client for BaseClient {
48-
fn subscribe(&self, name: String) -> Subscription {
49-
Subscription {
50-
client: self.clone(),
51-
canonical_name: format!("projects/{}/subscriptions/{}", self.project(), name),
52-
name,
53-
}
39+
impl Clone for Client {
40+
fn clone(&self) -> Self {
41+
Client(self.0.clone())
5442
}
43+
}
5544

56-
fn create(credentials_path: String) -> Result<Self, error::Error> {
57-
let mut client = Arc::new(RwLock::new(State {
45+
impl Client {
46+
pub fn new(credentials_path: String) -> Result<Self, error::Error> {
47+
let mut client = Client(Arc::new(RwLock::new(State {
5848
token: None,
5949
credentials_path,
6050
project: None,
61-
}));
51+
hyper_client: setup_hyper(),
52+
})));
6253

6354
match client.refresh_token() {
6455
Ok(_) => Ok(client),
6556
Err(e) => Err(e),
6657
}
6758
}
6859

69-
fn set_project(&mut self, project: String) {
70-
self.write().unwrap().project = Some(project);
60+
pub fn subscribe(&self, name: String) -> Subscription {
61+
Subscription {
62+
client: Some(self.clone()),
63+
name: format!("projects/{}/subscriptions/{}", self.project(), name),
64+
topic: None,
65+
}
7166
}
7267

73-
fn project(&self) -> String {
74-
self.read().unwrap().project().to_string()
68+
pub fn set_project(&mut self, project: String) {
69+
self.0.write().unwrap().project = Some(project);
7570
}
7671

77-
fn spawn_token_renew(&self) {
72+
pub fn project(&self) -> String {
73+
self.0.read().unwrap().project().to_string()
74+
}
75+
76+
pub fn topic(&self, name: String) -> Topic {
77+
Topic {
78+
client: Some(Client(self.0.clone())),
79+
name: format!("projects/{}/topics/{}", self.project(), name),
80+
}
81+
}
82+
83+
pub fn spawn_token_renew(&self) {
7884
let mut client = self.clone();
7985
let renew_token_task = Interval::new(Instant::now(), Duration::from_secs(15 * 60))
8086
.for_each(move |_instant| {
@@ -89,10 +95,10 @@ impl Client for BaseClient {
8995
tokio::spawn(renew_token_task);
9096
}
9197

92-
fn refresh_token(&mut self) -> Result<(), error::Error> {
98+
pub fn refresh_token(&mut self) -> Result<(), error::Error> {
9399
match self.get_token() {
94100
Ok(token) => {
95-
self.write().unwrap().token = Some(token);
101+
self.0.write().unwrap().token = Some(token);
96102
Ok(())
97103
}
98104
Err(e) => Err(error::Error::from(e)),
@@ -101,7 +107,7 @@ impl Client for BaseClient {
101107

102108
fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::error::GOErr> {
103109
let credentials =
104-
goauth::credentials::Credentials::from_file(&self.read().unwrap().credentials_path)
110+
goauth::credentials::Credentials::from_file(&self.0.read().unwrap().credentials_path)
105111
.unwrap();
106112

107113
self.set_project(credentials.project());
@@ -116,4 +122,41 @@ impl Client for BaseClient {
116122
let jwt = Jwt::new(claims, credentials.rsa_key().unwrap(), None);
117123
goauth::get_token_with_creds(&jwt, &credentials)
118124
}
125+
126+
pub(crate) fn request<T: Into<hyper::Body>>(
127+
&self,
128+
method: hyper::Method,
129+
data: T,
130+
) -> hyper::Request<hyper::Body>
131+
where
132+
hyper::Body: std::convert::From<T>,
133+
{
134+
let mut req = hyper::Request::new(hyper::Body::from(data));
135+
*req.method_mut() = method;
136+
req.headers_mut().insert(
137+
hyper::header::CONTENT_TYPE,
138+
hyper::header::HeaderValue::from_static("application/json"),
139+
);
140+
let readable = self.0.read().unwrap();
141+
req.headers_mut().insert(
142+
hyper::header::AUTHORIZATION,
143+
hyper::header::HeaderValue::from_str(&format!(
144+
"{} {}",
145+
readable.token_type(),
146+
readable.access_token()
147+
))
148+
.unwrap(),
149+
);
150+
req
151+
}
152+
153+
pub fn hyper_client(&self) -> HyperClient {
154+
self.0.read().unwrap().hyper_client.clone()
155+
}
156+
}
157+
158+
fn setup_hyper() -> HyperClient {
159+
// 4 is number of blocking DNS threads
160+
let https = HttpsConnector::new(4).unwrap();
161+
Arc::new(hyper::Client::builder().build::<_, hyper::Body>(https))
119162
}

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ pub mod client;
22
pub mod error;
33
pub mod message;
44
pub mod subscription;
5+
pub mod topic;
56

6-
pub use client::{BaseClient, Client};
7+
pub use client::Client;
78
pub use message::{EncodedMessage, FromPubSubMessage};
89
pub use subscription::Subscription;
10+
pub use topic::Topic;

0 commit comments

Comments
 (0)