Skip to content

Commit e32717d

Browse files
committed
Determine project from credentials. Only require subscription short name
1 parent 5cc6cb3 commit e32717d

File tree

5 files changed

+63
-16
lines changed

5 files changed

+63
-16
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "cloud-pubsub"
3-
version = "0.1.0"
3+
version = "0.2.0"
44
authors = ["Kayle Gishen <k@bkdsw.com>"]
55
edition = "2018"
66
description = "Google Cloud PubSub Client"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ fn main() {
5757

5858
## Subscribing
5959

60-
The full canonical subscription name should be used when subscribing:
60+
### Connecting to existing subscription
6161

6262
```
63-
let sub = my_client.subscribe("projects/{google-project-id}/subscriptions/{subscription-name}")
63+
let sub = my_client.subscribe("subscription-name")
6464
```
6565

6666
Currently a subscription needs to be created using the gcloud CLI or the web interface. There are

src/client.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tokio::timer::Interval;
1111
pub struct State {
1212
token: Option<goauth::auth::Token>,
1313
credentials_path: String,
14+
project: Option<String>,
1415
}
1516

1617
impl State {
@@ -21,20 +22,24 @@ impl State {
2122
pub fn access_token(&self) -> &str {
2223
self.token.as_ref().unwrap().access_token()
2324
}
25+
26+
pub fn project(&self) -> &str {
27+
&(self.project.as_ref().expect("Google Cloud Project has not been set. If it is not in your credential file, call set_project to set it manually."))
28+
}
2429
}
2530

2631
pub trait Client
2732
where
28-
Self: std::marker::Sized + TokenAuthenticated,
33+
Self: std::marker::Sized,
2934
{
3035
fn create(credentials_path: String) -> Result<Self, error::Error>;
3136
fn subscribe(&self, name: String) -> Subscription;
32-
}
37+
fn set_project(&mut self, project: String);
38+
fn project(&self) -> String;
3339

34-
pub trait TokenAuthenticated {
3540
fn spawn_token_renew(&self);
3641
fn refresh_token(&mut self) -> Result<(), error::Error>;
37-
fn get_token(&self) -> Result<goauth::auth::Token, goauth::error::GOErr>;
42+
fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::error::GOErr>;
3843
}
3944

4045
pub type BaseClient = Arc<RwLock<State>>;
@@ -43,6 +48,7 @@ impl Client for BaseClient {
4348
fn subscribe(&self, name: String) -> Subscription {
4449
Subscription {
4550
client: self.clone(),
51+
canonical_name: format!("projects/{}/subscriptions/{}", self.project(), name),
4652
name,
4753
}
4854
}
@@ -51,16 +57,23 @@ impl Client for BaseClient {
5157
let mut client = Arc::new(RwLock::new(State {
5258
token: None,
5359
credentials_path,
60+
project: None,
5461
}));
5562

5663
match client.refresh_token() {
5764
Ok(_) => Ok(client),
5865
Err(e) => Err(e),
5966
}
6067
}
61-
}
6268

63-
impl TokenAuthenticated for BaseClient {
69+
fn set_project(&mut self, project: String) {
70+
self.write().unwrap().project = Some(project);
71+
}
72+
73+
fn project(&self) -> String {
74+
self.read().unwrap().project().to_string()
75+
}
76+
6477
fn spawn_token_renew(&self) {
6578
let mut client = self.clone();
6679
let renew_token_task = Interval::new(Instant::now(), Duration::from_secs(15 * 60))
@@ -86,11 +99,13 @@ impl TokenAuthenticated for BaseClient {
8699
}
87100
}
88101

89-
fn get_token(&self) -> Result<goauth::auth::Token, goauth::error::GOErr> {
102+
fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::error::GOErr> {
90103
let credentials =
91104
goauth::credentials::Credentials::from_file(&self.read().unwrap().credentials_path)
92105
.unwrap();
93106

107+
self.set_project(credentials.project());
108+
94109
let claims = JwtClaims::new(
95110
credentials.iss(),
96111
&Scope::PubSub,

src/error.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1+
use serde_derive::Deserialize;
12
use std::fmt;
23

4+
#[derive(Deserialize)]
5+
#[serde(untagged)]
36
pub enum Error {
7+
#[serde(skip_deserializing)]
48
PubSubAuth(goauth::error::GOErr),
9+
#[serde(skip_deserializing)]
510
Http(hyper::Error),
11+
#[serde(skip_deserializing)]
612
Json(serde_json::Error),
13+
#[serde(skip_deserializing)]
714
Base64(base64::DecodeError),
15+
PubSub {
16+
code: i32,
17+
message: String,
18+
status: String,
19+
},
820
}
921

1022
impl fmt::Display for Error {
@@ -14,6 +26,15 @@ impl fmt::Display for Error {
1426
Error::Http(e) => write!(f, "Hyper({})", e),
1527
Error::Json(e) => write!(f, "Json({})", e),
1628
Error::Base64(e) => write!(f, "Base64({})", e),
29+
Error::PubSub {
30+
code,
31+
message,
32+
status,
33+
} => write!(
34+
f,
35+
"PubSub(code: {}, status: {}, message: {})",
36+
code, status, message
37+
),
1738
}
1839
}
1940
}

src/subscription.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::sync::{Arc, RwLock};
1111
struct Response {
1212
#[serde(alias = "receivedMessages")]
1313
received_messages: Option<Vec<Message>>,
14+
error: Option<error::Error>,
1415
}
1516

1617
#[derive(Serialize)]
@@ -22,16 +23,20 @@ struct AckRequest {
2223
pub struct Subscription {
2324
pub(crate) client: Arc<RwLock<State>>,
2425
pub name: String,
26+
pub(crate) canonical_name: String,
2527
}
2628

2729
impl Subscription {
2830
pub fn acknowledge_messages(&self, ids: Vec<String>) -> impl Future<Item = (), Error = ()> {
2931
let https = HttpsConnector::new(4).unwrap();
3032
let client = hyper::Client::builder().build::<_, hyper::Body>(https);
3133

32-
let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}:acknowledge", self.name)
33-
.parse()
34-
.unwrap();
34+
let uri: hyper::Uri = format!(
35+
"https://pubsub.googleapis.com/v1/{}:acknowledge",
36+
self.canonical_name
37+
)
38+
.parse()
39+
.unwrap();
3540

3641
let json = serde_json::to_string(&AckRequest { ack_ids: ids }).unwrap();
3742

@@ -66,9 +71,12 @@ impl Subscription {
6671
let https = HttpsConnector::new(4).unwrap();
6772
let client = hyper::Client::builder().build::<_, hyper::Body>(https);
6873

69-
let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}:pull", self.name)
70-
.parse()
71-
.unwrap();
74+
let uri: hyper::Uri = format!(
75+
"https://pubsub.googleapis.com/v1/{}:pull",
76+
self.canonical_name
77+
)
78+
.parse()
79+
.unwrap();
7280

7381
let json = r#"{"maxMessages": 100}"#;
7482

@@ -96,6 +104,9 @@ impl Subscription {
96104
.from_err::<error::Error>()
97105
.and_then(|body| {
98106
let response: Response = serde_json::from_slice(&body)?;
107+
if let Some(e) = response.error {
108+
return Err(e);
109+
}
99110
let messages = response.received_messages.unwrap_or_default();
100111
let ack_ids: Vec<String> = messages
101112
.as_slice()

0 commit comments

Comments
 (0)