Skip to content

Commit d3d7e83

Browse files
committed
chore: upgrade lapin to v4
Signed-off-by: William Hankins <william@sundae.fi>
1 parent f3eada9 commit d3d7e83

File tree

2 files changed

+20
-20
lines changed

2 files changed

+20
-20
lines changed

process/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Caraytid process - loads and runs modules
22
[package]
33
name = "caryatid_process"
4-
version = "0.14.1"
4+
version = "0.14.2"
55
edition = "2021"
66
authors = ["Paul Clark <paul.clark@iohk.io>"]
77
description = "Library for building a Caryatid process"
@@ -19,7 +19,7 @@ tracing = "0.1.40"
1919
serde = "1.0.210"
2020
serde_json = "1.0"
2121
tracing-subscriber = "0.3.18"
22-
lapin = "3.7.1"
22+
lapin = "4.3"
2323
tokio-executor-trait = "3.1.0"
2424
async-trait = "0.1.88"
2525

process/src/rabbit_mq_bus.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use lapin::{
1010
BasicConsumeOptions, BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions,
1111
QueueDeclareOptions,
1212
},
13-
types::FieldTable,
13+
types::{FieldTable, ShortString},
1414
BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
1515
};
1616
use std::marker::PhantomData;
@@ -60,7 +60,7 @@ impl<M: MessageBounds> Subscription<M> for RabbitMQSubscription<M> {
6060
pub struct RabbitMQBus<M: MessageBounds> {
6161
connection: Arc<Mutex<Connection>>, // RabbitMQ connection
6262
channel: Arc<Mutex<Channel>>, // RabbitMQ outgoing channel
63-
exchange: String, // Exchange name
63+
exchange: ShortString, // Exchange name
6464
request_timeout: Duration, // Handle request timeout
6565
_phantom: PhantomData<M>, // Required to associate with <M> (eww)
6666
}
@@ -74,18 +74,18 @@ impl<M: MessageBounds> RabbitMQBus<M> {
7474
.unwrap_or("amqp://127.0.0.1:5672/%2f".to_string());
7575
info!("Connecting to RabbitMQ at {}", url);
7676

77-
let props =
78-
ConnectionProperties::default().with_executor(tokio_executor_trait::Tokio::current());
77+
let props = ConnectionProperties::default();
7978
let connection = Connection::connect(&url, props)
8079
.await
8180
.with_context(|| "Can't create RabbitMQ connection")?;
8281

8382
info!("RabbitMQ connected");
8483

8584
// Get exchange name
86-
let exchange_name = config
85+
let exchange_name: ShortString = config
8786
.get_string("exchange")
88-
.unwrap_or("caryatid".to_string());
87+
.unwrap_or("caryatid".to_string())
88+
.into();
8989

9090
// Create a channel for outgoing messages
9191
let channel = connection
@@ -96,7 +96,7 @@ impl<M: MessageBounds> RabbitMQBus<M> {
9696
// Declare the topic exchange
9797
channel
9898
.exchange_declare(
99-
&exchange_name,
99+
exchange_name.clone(),
100100
lapin::ExchangeKind::Topic,
101101
ExchangeDeclareOptions::default(),
102102
FieldTable::default(),
@@ -126,7 +126,7 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
126126
{
127127
/// Publish a message on a topic
128128
async fn publish(&self, topic: &str, message: Arc<M>) -> Result<()> {
129-
let topic = topic.to_string();
129+
let topic: ShortString = topic.into();
130130
let channel = self.channel.lock().await;
131131

132132
// Serialise the message
@@ -135,8 +135,8 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
135135
// Publish the message to the queue
136136
channel
137137
.basic_publish(
138-
&self.exchange,
139-
&topic,
138+
self.exchange.clone(),
139+
topic,
140140
BasicPublishOptions::default(),
141141
&payload,
142142
BasicProperties::default(),
@@ -156,7 +156,7 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
156156
async fn subscribe(&self, topic: &str) -> Result<Box<dyn Subscription<M>>> {
157157
// Clone over async boundary
158158
let connection = self.connection.clone();
159-
let topic = topic.to_string();
159+
let topic: ShortString = topic.into();
160160
let exchange = self.exchange.clone();
161161

162162
// Create a new channel for this subscriber
@@ -170,7 +170,7 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
170170
// Declare the queue
171171
let queue = channel
172172
.queue_declare(
173-
&topic,
173+
topic.clone(),
174174
QueueDeclareOptions::default(),
175175
FieldTable::default(),
176176
)
@@ -180,9 +180,9 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
180180
// Bind the queue to the exchange with the specified pattern
181181
channel
182182
.queue_bind(
183-
queue.name().as_str(),
184-
&exchange,
185-
&topic,
183+
queue.name().clone(),
184+
exchange,
185+
topic,
186186
QueueBindOptions::default(),
187187
FieldTable::default(),
188188
)
@@ -192,8 +192,8 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
192192
// Start consuming messages from the queue
193193
let consumer = channel
194194
.basic_consume(
195-
queue.name().as_str(),
196-
"",
195+
queue.name().clone(),
196+
"".into(),
197197
BasicConsumeOptions::default(),
198198
FieldTable::default(),
199199
)
@@ -212,7 +212,7 @@ impl<M: MessageBounds + serde::Serialize + serde::de::DeserializeOwned> MessageB
212212

213213
// Close the connection
214214
let connection = self.connection.lock().await;
215-
connection.close(200, "Goodbye").await?;
215+
connection.close(200, "Goodbye".into()).await?;
216216
Ok(())
217217
}
218218
}

0 commit comments

Comments
 (0)