Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions examples/long_lived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ fn schedule_pubsub_pull(subscription: Arc<Subscription>) {
task::spawn(async move {
while subscription.client().is_running() {
match subscription.get_messages::<UpdatePacket>().await {
Ok((packets, acks)) => {
for packet in packets {
println!("Received: {:?}", packet);
}

if !acks.is_empty() {
let s = Arc::clone(&subscription);
task::spawn(async move {
s.acknowledge_messages(acks).await;
});
Ok(messages) => {
for (result, ack_id) in messages {
match result {
Ok(message) => {
println!("Received: {:?}", message);
let subscription = Arc::clone(&subscription);
task::spawn(async move {
subscription.acknowledge_messages(vec![ack_id]).await;
});
}
Err(e) => log::error!("Failed converting to UpdatePacket: {}", e),
}
}
}
Err(e) => eprintln!("Failed to pull PubSub messages: {}", e),
Expand Down
8 changes: 6 additions & 2 deletions examples/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ async fn main() {

let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
match order_sub.clone().get_messages::<UpdatePacket>().await {
Ok((packets, acks)) => {
for packet in packets {
Ok(packets) => {
for packet in &packets {
println!("Received: {:?}", packet);
}

let acks: Vec<String> = packets
.into_iter()
.map(|packet| packet.1)
.collect::<Vec<_>>();
if !acks.is_empty() {
task::spawn(async move { order_sub.acknowledge_messages(acks).await })
.await // This will block until acknowledgement is complete
Expand Down
27 changes: 15 additions & 12 deletions examples/singleshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,22 @@ async fn main() {
Ok(p) => p,
};

let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
match order_sub.clone().get_messages::<UpdatePacket>().await {
Ok((packets, acks)) => {
for packet in packets {
println!("Received: {:?}", packet);
}

if !acks.is_empty() {
task::spawn(async move { order_sub.acknowledge_messages(acks).await })
.await // This will block until acknowledgement is complete
.expect("Failed to acknoweldge messages");
let subscription = Arc::new(pubsub.subscribe(config.pubsub_subscription));
match subscription.get_messages::<UpdatePacket>().await {
Ok(messages) => {
for (result, ack_id) in messages {
match result {
Ok(message) => {
println!("Received: {:?}", message);
let subscription = Arc::clone(&subscription);
task::spawn(async move {
subscription.acknowledge_messages(vec![ack_id]).await;
});
}
Err(e) => log::error!("Failed converting to UpdatePacket: {}", e),
}
}
}
Err(e) => println!("Error Checking PubSub: {}", e),
Err(e) => eprintln!("Failed to pull PubSub messages: {}", e),
}
}
10 changes: 7 additions & 3 deletions examples/subscribe_to_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@ async fn main() {
let sub = topic.subscribe().await.expect("Failed to subscribe");

println!("Subscribed to topic with: {}", sub.name);
let (packets, acks) = sub
let packets = sub
.clone()
.get_messages::<UpdatePacket>()
.await
.expect("Error Checking PubSub");

for packet in packets {
for packet in &packets {
println!("Received: {:?}", packet);
}

if !acks.is_empty() {
if !packets.is_empty() {
let acks = packets
.into_iter()
.map(|packet| packet.1)
.collect::<Vec<_>>();
sub.acknowledge_messages(acks).await;
} else {
println!("Cleaning up");
Expand Down
24 changes: 7 additions & 17 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Subscription {

pub async fn get_messages<T: FromPubSubMessage>(
&self,
) -> Result<(Vec<T>, Vec<String>), error::Error> {
) -> Result<Vec<(Result<T, error::Error>, String)>, error::Error> {
let client = self
.client
.as_ref()
Expand All @@ -76,6 +76,7 @@ impl Subscription {
*req.uri_mut() = uri.clone();

let response = client.hyper_client().request(req).await?;

if response.status() == StatusCode::NOT_FOUND {
return Err(error::Error::PubSub {
code: 404,
Expand All @@ -88,24 +89,13 @@ impl Subscription {
if let Some(e) = response.error {
return Err(e);
}
let messages = response.received_messages.unwrap_or_default();
let ack_ids: Vec<String> = messages
.as_slice()
.iter()
.map(|packet| packet.ack_id.clone())
.collect();
let packets = messages
let messages = response
.received_messages
.unwrap_or_default()
.into_iter()
.filter_map(|packet| match T::from(packet.message) {
Ok(o) => Some(o),
Err(e) => {
error!("Failed converting pubsub {}", e,);
None
}
})
.map(|m| (T::from(m.message), m.ack_id))
.collect();

Ok((packets, ack_ids))
Ok(messages)
}

pub async fn destroy(self) -> Result<(), error::Error> {
Expand Down