Skip to content

Commit 623d9f1

Browse files
committed
Upgrade tokio to v1
1 parent 40b1fcc commit 623d9f1

File tree

10 files changed

+59
-80
lines changed

10 files changed

+59
-80
lines changed

Cargo.toml

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,20 @@ license = "MIT"
1111
readme = "README.md"
1212

1313
[dependencies]
14-
bytes = "0.5"
15-
hyper = "0.13"
16-
hyper-tls = "0.4.1"
17-
tokio = { version = "0.2", features=["full"] }
18-
goauth = "0.6"
19-
smpl_jwt = "0.4.0"
14+
bytes = "1"
15+
hyper = "0.14"
16+
hyper-tls = "0.5"
17+
tokio = "1"
18+
goauth = "0.9"
19+
smpl_jwt = "0.6"
2020
serde = "1.0"
2121
serde_derive = "1.0"
2222
serde_json = "1.0"
23-
base64 = "0.11"
23+
base64 = "0.13"
2424
lazy_static = "1.4.0"
25-
envy = "0.4"
26-
rand = "0.7"
25+
rand = "0.8"
2726
log = "0.4"
27+
28+
[dev-dependencies]
29+
envy = "0.4"
30+
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"]}

examples/long_lived.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ async fn main() -> Result<(), error::Error> {
5555
}
5656
let config = parsed_env.unwrap();
5757

58-
let pubsub = match Client::new(config.google_application_credentials) {
58+
let pubsub = match Client::new(config.google_application_credentials).await {
5959
Err(e) => panic!("Failed to initialize pubsub: {}", e),
6060
Ok(mut client) => {
61-
if let Err(e) = client.refresh_token() {
61+
if let Err(e) = client.refresh_token().await {
6262
eprintln!("Failed to get token: {}", e);
6363
} else {
6464
println!("Got fresh token");

examples/send_message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn main() {
1717
}
1818
let config = parsed_env.unwrap();
1919

20-
let pubsub = match Client::new(config.google_application_credentials) {
20+
let pubsub = match Client::new(config.google_application_credentials).await {
2121
Err(e) => panic!("Failed to initialize pubsub: {}", e),
2222
Ok(p) => Arc::new(p),
2323
};

examples/serde.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn main() {
3434
}
3535
let config = parsed_env.unwrap();
3636

37-
let pubsub = match Client::new(config.google_application_credentials) {
37+
let pubsub = match Client::new(config.google_application_credentials).await {
3838
Err(e) => panic!("Failed to initialize pubsub: {}", e),
3939
Ok(p) => p,
4040
};

examples/singleshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn main() {
3131
}
3232
let config = parsed_env.unwrap();
3333

34-
let pubsub = match Client::new(config.google_application_credentials) {
34+
let pubsub = match Client::new(config.google_application_credentials).await {
3535
Err(e) => panic!("Failed to initialize pubsub: {}", e),
3636
Ok(p) => p,
3737
};

examples/subscribe_to_topic.rs

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use cloud_pubsub::error;
22
use cloud_pubsub::{Client, EncodedMessage, FromPubSubMessage};
3-
use futures::future::lazy;
43
use serde_derive::Deserialize;
5-
use std::sync::Arc;
6-
use tokio::prelude::*;
74

85
#[derive(Deserialize)]
96
struct Config {
@@ -23,57 +20,35 @@ impl FromPubSubMessage for UpdatePacket {
2320
}
2421
}
2522

26-
fn main() {
27-
let parsed_env = envy::from_env::<Config>();
28-
if let Err(e) = parsed_env {
29-
eprintln!("ENV is not valid: {}", e);
30-
std::process::exit(1);
31-
}
32-
let config = parsed_env.unwrap();
23+
#[tokio::main]
24+
async fn main() {
25+
let config: Config = envy::from_env().expect("ENV is not valid");
26+
27+
let pubsub = Client::new(config.google_application_credentials)
28+
.await
29+
.expect("Failed to initialize pubsub");
3330

34-
let pubsub = match Client::new(config.google_application_credentials) {
35-
Err(e) => panic!("Failed to initialize pubsub: {}", e),
36-
Ok(p) => p,
37-
};
31+
let topic = pubsub.topic(config.topic);
3832

39-
let topic = Arc::new(pubsub.topic(config.topic));
33+
let sub = topic.subscribe().await.expect("Failed to subscribe");
4034

41-
tokio::run(lazy(move || {
42-
topic
43-
.subscribe()
44-
.map(|subscription| {
45-
println!("Subscribed to topic with: {}", subscription.name);
46-
let sub = Arc::new(subscription);
47-
let get = sub
48-
.clone()
49-
.get_messages::<UpdatePacket>()
50-
.map(move |(packets, acks)| {
51-
for packet in packets {
52-
println!("Received: {:?}", packet);
53-
}
35+
println!("Subscribed to topic with: {}", sub.name);
36+
let (packets, acks) = sub
37+
.clone()
38+
.get_messages::<UpdatePacket>()
39+
.await
40+
.expect("Error Checking PubSub");
5441

55-
if !acks.is_empty() {
56-
tokio::spawn(sub.acknowledge_messages(acks));
57-
} else {
58-
println!("Cleaning up");
59-
if let Ok(s) = Arc::try_unwrap(sub) {
60-
let destroy = s
61-
.destroy()
62-
.map(|_r| println!("Successfully deleted subscription"))
63-
.map_err(|e| eprintln!("Failed deleting subscription: {}", e))
64-
.then(move |_| {
65-
drop(pubsub);
66-
Ok(())
67-
});
68-
tokio::spawn(destroy);
69-
} else {
70-
eprintln!("Subscription is still owned");
71-
}
72-
}
73-
})
74-
.map_err(|e| println!("Error Checking PubSub: {}", e));
75-
tokio::spawn(get);
76-
})
77-
.map_err(|e| eprintln!("Failed to subscribe: {}", e))
78-
}));
42+
for packet in packets {
43+
println!("Received: {:?}", packet);
44+
}
45+
46+
if !acks.is_empty() {
47+
sub.acknowledge_messages(acks).await;
48+
} else {
49+
println!("Cleaning up");
50+
drop(pubsub);
51+
sub.destroy().await.expect("Failed deleting subscription");
52+
println!("Successfully deleted subscription");
53+
}
7954
}

src/client.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl Clone for Client {
4848
}
4949

5050
impl Client {
51-
pub fn from_string(credentials_string: String) -> Result<Self, error::Error> {
51+
pub async fn from_string(credentials_string: String) -> Result<Self, error::Error> {
5252
let mut client = Client(Arc::new(RwLock::new(State {
5353
token: None,
5454
credentials_string,
@@ -57,15 +57,15 @@ impl Client {
5757
running: Arc::new(AtomicBool::new(true)),
5858
})));
5959

60-
match client.refresh_token() {
60+
match client.refresh_token().await {
6161
Ok(_) => Ok(client),
6262
Err(e) => Err(e),
6363
}
6464
}
6565

66-
pub fn new(credentials_path: String) -> Result<Self, error::Error> {
66+
pub async fn new(credentials_path: String) -> Result<Self, error::Error> {
6767
let credentials_string = fs::read_to_string(credentials_path).unwrap();
68-
Self::from_string(credentials_string)
68+
Self::from_string(credentials_string).await
6969
}
7070

7171
pub fn subscribe(&self, name: String) -> Subscription {
@@ -112,7 +112,7 @@ impl Client {
112112
if c.is_running() {
113113
int.tick().await;
114114
println!("Renewing pubsub token");
115-
if let Err(e) = client.refresh_token() {
115+
if let Err(e) = client.refresh_token().await {
116116
error!("Failed to update token: {}", e);
117117
}
118118
}
@@ -122,8 +122,8 @@ impl Client {
122122
task::spawn(renew_token_task);
123123
}
124124

125-
pub fn refresh_token(&mut self) -> Result<(), error::Error> {
126-
match self.get_token() {
125+
pub async fn refresh_token(&mut self) -> Result<(), error::Error> {
126+
match self.get_token().await {
127127
Ok(token) => {
128128
self.0.write().unwrap().token = Some(token);
129129
Ok(())
@@ -132,7 +132,7 @@ impl Client {
132132
}
133133
}
134134

135-
fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::error::GOErr> {
135+
async fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::GoErr> {
136136
let credentials =
137137
goauth::credentials::Credentials::from_str(&self.0.read().unwrap().credentials_string)
138138
.unwrap();
@@ -147,7 +147,7 @@ impl Client {
147147
None,
148148
);
149149
let jwt = Jwt::new(claims, credentials.rsa_key().unwrap(), None);
150-
goauth::get_token_with_creds(&jwt, &credentials)
150+
goauth::get_token(&jwt, &credentials).await
151151
}
152152

153153
pub(crate) fn request<T: Into<hyper::Body>>(

src/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::io;
66
#[serde(untagged)]
77
pub enum Error {
88
#[serde(skip_deserializing)]
9-
PubSubAuth(goauth::error::GOErr),
9+
PubSubAuth(goauth::GoErr),
1010
#[serde(skip_deserializing)]
1111
Http(hyper::Error),
1212
#[serde(skip_deserializing)]
@@ -45,8 +45,8 @@ impl fmt::Display for Error {
4545

4646
impl std::error::Error for Error {}
4747

48-
impl From<goauth::error::GOErr> for Error {
49-
fn from(err: goauth::error::GOErr) -> Error {
48+
impl From<goauth::GoErr> for Error {
49+
fn from(err: goauth::GoErr) -> Error {
5050
Error::PubSubAuth(err)
5151
}
5252
}

src/subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::client::Client;
22
use crate::error;
33
use crate::message::{FromPubSubMessage, Message};
4-
use bytes::buf::BufExt as _;
4+
use hyper::body::Buf;
55
use hyper::{Method, StatusCode};
66
use lazy_static::lazy_static;
77
use log::error;

src/topic.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::client::Client;
22
use crate::error;
33
use crate::subscription::*;
44
use crate::EncodedMessage;
5-
use bytes::buf::BufExt as _;
5+
use hyper::body::Buf;
66
use hyper::{Method, StatusCode};
77
use lazy_static::lazy_static;
88
use rand::distributions::Alphanumeric;
@@ -124,6 +124,7 @@ impl Topic {
124124
let slug = thread_rng()
125125
.sample_iter(&Alphanumeric)
126126
.take(30)
127+
.map(char::from)
127128
.collect::<String>();
128129

129130
format!("projects/{}/subscriptions/RST{}", project, slug)

0 commit comments

Comments
 (0)