Skip to content

Commit 01c46d2

Browse files
committed
Update to a newer diesel version
* This fixes the multiconnection issues * This also adds support for connection instrumentation
1 parent af8b67a commit 01c46d2

File tree

3 files changed

+179
-101
lines changed

3 files changed

+179
-101
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ dynamic-schema = ["diesel-dynamic-schema"]
5454
gst = []
5555

5656
[patch.crates-io]
57-
diesel = { git = "https://github.com/weiznich/diesel", rev = "e632a7ca4fa12b76d7638392aeaff7522f57adef" }
57+
diesel = { git = "https://github.com/weiznich/diesel", rev = "2d7031b0b0e7d59ba901de8e86f31d4f3d0ae597" }

src/oracle/connection/mod.rs

Lines changed: 163 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use super::backend::Oracle;
99
use super::query_builder::OciQueryBuilder;
1010
use super::OciDataType;
1111
use crate::oracle::connection::stmt_iter::RowIter;
12+
use diesel::connection::Instrumentation;
13+
use diesel::connection::InstrumentationEvent;
1214
use diesel::connection::{Connection, SimpleConnection, TransactionManager};
1315
use diesel::connection::{LoadConnection, MultiConnectionHelper};
1416
use diesel::deserialize::FromSql;
@@ -146,6 +148,7 @@ mod transaction;
146148
pub struct OciConnection {
147149
raw: oracle::Connection,
148150
transaction_manager: OCITransactionManager,
151+
instrumentation: Option<Box<dyn Instrumentation>>,
149152
}
150153

151154
struct ErrorHelper(oracle::Error);
@@ -251,54 +254,20 @@ impl Connection for OciConnection {
251254
/// should be a valid connection string for a given backend. See the
252255
/// documentation for the specific backend for specifics.
253256
fn establish(database_url: &str) -> ConnectionResult<Self> {
254-
let url = url::Url::parse(database_url)
255-
.map_err(|_| ConnectionError::InvalidConnectionUrl("Invalid url".into()))?;
256-
if url.scheme() != "oracle" {
257-
return Err(ConnectionError::InvalidConnectionUrl(format!(
258-
"Got a unsupported url scheme: {}",
259-
url.scheme()
260-
)));
261-
}
262-
let user = url.username();
263-
264-
if user.is_empty() {
265-
return Err(ConnectionError::InvalidConnectionUrl(
266-
"Username not set".into(),
267-
));
268-
}
269-
let user = match percent_encoding::percent_decode_str(url.username()).decode_utf8() {
270-
Ok(username) => username,
271-
Err(_e) => {
272-
return Err(ConnectionError::InvalidConnectionUrl(
273-
"Username could not be percent decoded".into(),
274-
))
275-
}
276-
};
277-
let password = url
278-
.password()
279-
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Password not set".into()))?;
280-
281-
let host = url
282-
.host_str()
283-
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Hostname not set".into()))?;
284-
let port = url.port();
285-
let path = url.path();
286-
287-
let mut url = host.to_owned();
288-
if let Some(port) = port {
289-
write!(url, ":{}", port).expect("Write to string does not fail");
290-
}
291-
url += path;
292-
293-
let mut raw = oracle::Connection::connect(user, password, url)
294-
.map_err(ErrorHelper::from)
295-
.map_err(|e| ConnectionError::CouldntSetupConfiguration(e.into()))?;
296-
297-
raw.set_autocommit(true);
257+
let mut instrumentation = diesel::connection::get_default_instrumentation();
258+
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
259+
database_url,
260+
));
261+
let raw = Self::inner_establish(database_url);
262+
instrumentation.on_connection_event(InstrumentationEvent::finish_establish_connection(
263+
database_url,
264+
raw.as_ref().err(),
265+
));
298266

299267
Ok(Self {
300-
raw,
268+
raw: raw?,
301269
transaction_manager: OCITransactionManager::new(),
270+
instrumentation,
302271
})
303272
}
304273

@@ -307,35 +276,17 @@ impl Connection for OciConnection {
307276
where
308277
T: QueryFragment<Self::Backend> + QueryId,
309278
{
310-
let mut qb = OciQueryBuilder::default();
311-
312-
source.to_sql(&mut qb, &Oracle)?;
313-
314-
let conn = &self.raw;
315-
let sql = qb.finish();
316-
let mut stmt = conn.statement(&sql);
317-
if !source.is_safe_to_cache_prepared(&Oracle)? {
318-
stmt.exclude_from_cache();
319-
}
320-
let mut stmt = stmt.build().map_err(ErrorHelper::from)?;
321-
let mut bind_collector = OracleBindCollector::default();
322-
323-
source.collect_binds(&mut bind_collector, &mut (), &Oracle)?;
324-
let binds = bind_collector
325-
.binds
326-
.iter()
327-
.map(|(n, b)| -> (&str, &dyn oracle::sql_type::ToSql) {
328-
(n as &str, std::ops::Deref::deref(b))
329-
})
330-
.collect::<Vec<_>>();
331-
332-
if stmt.is_query() {
333-
stmt.query_named(&binds).map_err(ErrorHelper::from)?;
334-
} else {
335-
stmt.execute_named(&binds).map_err(ErrorHelper::from)?;
336-
}
337-
338-
Ok(stmt.row_count().map_err(ErrorHelper::from)? as usize)
279+
self.instrumentation
280+
.on_connection_event(InstrumentationEvent::start_query(&diesel::debug_query(
281+
source,
282+
)));
283+
let res = self.inner_executing_returning_count(source);
284+
self.instrumentation
285+
.on_connection_event(InstrumentationEvent::finish_query(
286+
&diesel::debug_query(source),
287+
res.as_ref().err(),
288+
));
289+
res
339290
}
340291

341292
fn transaction_state(
@@ -357,6 +308,14 @@ impl Connection for OciConnection {
357308
self.transaction_manager.is_test_transaction = true;
358309
Ok(())
359310
}
311+
312+
fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation {
313+
&mut self.instrumentation
314+
}
315+
316+
fn set_instrumentation(&mut self, instrumentation: impl diesel::connection::Instrumentation) {
317+
self.instrumentation = Some(Box::new(instrumentation));
318+
}
360319
}
361320

362321
impl LoadConnection for OciConnection {
@@ -370,8 +329,11 @@ impl LoadConnection for OciConnection {
370329
Self::Backend: QueryMetadata<T::SqlType>,
371330
{
372331
let query = source.as_query();
373-
374-
self.with_prepared_statement(query, |mut stmt, bind_collector| {
332+
self.instrumentation
333+
.on_connection_event(InstrumentationEvent::start_query(&diesel::debug_query(
334+
&query,
335+
)));
336+
let res = self.with_prepared_statement(&query, |mut stmt, bind_collector| {
375337
if stmt.is_query() {
376338
let binds = bind_collector
377339
.binds
@@ -394,7 +356,13 @@ impl LoadConnection for OciConnection {
394356
} else {
395357
unreachable!()
396358
}
397-
})
359+
});
360+
self.instrumentation
361+
.on_connection_event(InstrumentationEvent::finish_query(
362+
&diesel::debug_query(&query),
363+
res.as_ref().err(),
364+
));
365+
res
398366
}
399367
}
400368

@@ -421,7 +389,7 @@ where
421389
impl OciConnection {
422390
fn with_prepared_statement<'conn, 'query, T, R>(
423391
&'conn mut self,
424-
query: T,
392+
query: &T,
425393
callback: impl FnOnce(oracle::Statement<'conn>, OracleBindCollector) -> QueryResult<R>,
426394
) -> Result<R, Error>
427395
where
@@ -632,36 +600,131 @@ impl OciConnection {
632600
});
633601

634602
if let Some(first_record) = record_iter.next() {
635-
let mut qb = OciQueryBuilder::default();
636-
first_record.to_sql(&mut qb, &Oracle)?;
637-
let query_string = qb.finish();
638-
let mut batch = self
639-
.raw
640-
.batch(&query_string, record_count)
641-
.build()
642-
.map_err(ErrorHelper::from)?;
603+
self.instrumentation
604+
.on_connection_event(InstrumentationEvent::start_query(&diesel::debug_query(
605+
&first_record,
606+
)));
607+
let res = self.inner_batch_insert(&first_record, record_count, record_iter);
608+
self.instrumentation
609+
.on_connection_event(InstrumentationEvent::finish_query(
610+
&diesel::debug_query(&first_record),
611+
res.as_ref().err(),
612+
));
613+
res
614+
} else {
615+
Ok(0)
616+
}
617+
}
643618

644-
bind_params_to_batch(first_record, &mut batch)?;
645-
for record in record_iter {
646-
bind_params_to_batch(record, &mut batch)?;
619+
fn inner_batch_insert<Q>(
620+
&mut self,
621+
first_record: &Q,
622+
record_count: usize,
623+
record_iter: impl Iterator<Item = Q>,
624+
) -> Result<usize, Error>
625+
where
626+
Q: QueryFragment<Oracle>,
627+
{
628+
let mut qb = OciQueryBuilder::default();
629+
first_record.to_sql(&mut qb, &Oracle)?;
630+
let query_string = qb.finish();
631+
let mut batch = self
632+
.raw
633+
.batch(&query_string, record_count)
634+
.build()
635+
.map_err(ErrorHelper::from)?;
636+
637+
bind_params_to_batch(first_record, &mut batch)?;
638+
for record in record_iter {
639+
bind_params_to_batch(&record, &mut batch)?;
640+
}
641+
batch.execute().map_err(ErrorHelper::from)?;
642+
Ok(record_count)
643+
}
644+
645+
fn inner_establish(database_url: &str) -> Result<oracle::Connection, ConnectionError> {
646+
let url = url::Url::parse(database_url)
647+
.map_err(|_| ConnectionError::InvalidConnectionUrl("Invalid url".into()))?;
648+
if url.scheme() != "oracle" {
649+
return Err(ConnectionError::InvalidConnectionUrl(format!(
650+
"Got a unsupported url scheme: {}",
651+
url.scheme()
652+
)));
653+
}
654+
let user = url.username();
655+
if user.is_empty() {
656+
return Err(ConnectionError::InvalidConnectionUrl(
657+
"Username not set".into(),
658+
));
659+
}
660+
let user = match percent_encoding::percent_decode_str(url.username()).decode_utf8() {
661+
Ok(username) => username,
662+
Err(_e) => {
663+
return Err(ConnectionError::InvalidConnectionUrl(
664+
"Username could not be percent decoded".into(),
665+
))
647666
}
648-
batch.execute().map_err(ErrorHelper::from)?;
649-
Ok(record_count)
667+
};
668+
let password = url
669+
.password()
670+
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Password not set".into()))?;
671+
let host = url
672+
.host_str()
673+
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Hostname not set".into()))?;
674+
let port = url.port();
675+
let path = url.path();
676+
let mut url = host.to_owned();
677+
if let Some(port) = port {
678+
write!(url, ":{}", port).expect("Write to string does not fail");
679+
}
680+
url += path;
681+
let mut raw = oracle::Connection::connect(user, password, url)
682+
.map_err(ErrorHelper::from)
683+
.map_err(|e| ConnectionError::CouldntSetupConfiguration(e.into()))?;
684+
raw.set_autocommit(true);
685+
Ok(raw)
686+
}
687+
688+
fn inner_executing_returning_count<T>(&mut self, source: &T) -> Result<usize, Error>
689+
where
690+
T: QueryFragment<Oracle> + QueryId,
691+
{
692+
let mut qb = OciQueryBuilder::default();
693+
694+
source.to_sql(&mut qb, &Oracle)?;
695+
696+
let conn = &self.raw;
697+
let sql = qb.finish();
698+
let mut stmt = conn.statement(&sql);
699+
if !source.is_safe_to_cache_prepared(&Oracle)? {
700+
stmt.exclude_from_cache();
701+
}
702+
let mut stmt = stmt.build().map_err(ErrorHelper::from)?;
703+
let mut bind_collector = OracleBindCollector::default();
704+
705+
source.collect_binds(&mut bind_collector, &mut (), &Oracle)?;
706+
let binds = bind_collector
707+
.binds
708+
.iter()
709+
.map(|(n, b)| -> (&str, &dyn oracle::sql_type::ToSql) {
710+
(n as &str, std::ops::Deref::deref(b))
711+
})
712+
.collect::<Vec<_>>();
713+
714+
if stmt.is_query() {
715+
stmt.query_named(&binds).map_err(ErrorHelper::from)?;
650716
} else {
651-
Ok(0)
717+
stmt.execute_named(&binds).map_err(ErrorHelper::from)?;
652718
}
719+
720+
Ok(stmt.row_count().map_err(ErrorHelper::from)? as usize)
653721
}
654722
}
655723

656-
fn bind_params_to_batch<'a, T, V, Op>(
657-
record: InsertStatement<T, &'a ValuesClause<V, T>, Op>,
724+
fn bind_params_to_batch(
725+
record: &impl QueryFragment<Oracle>,
658726
batch: &mut oracle::Batch,
659-
) -> Result<(), Error>
660-
where
661-
T: Table + 'a,
662-
V: 'a,
663-
InsertStatement<T, &'a ValuesClause<V, T>, Op>: QueryFragment<Oracle>,
664-
{
727+
) -> Result<(), Error> {
665728
let mut bind_collector = OracleBindCollector::default();
666729
record.collect_binds(&mut bind_collector, &mut (), &Oracle)?;
667730
let binds = bind_collector

src/oracle/connection/transaction.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::num::NonZeroU32;
22

33
use super::ErrorHelper;
44
use super::OciConnection;
5+
use diesel::connection::Instrumentation;
6+
use diesel::connection::InstrumentationEvent;
57
use diesel::connection::SimpleConnection;
68
use diesel::connection::TransactionDepthChange;
79
use diesel::connection::TransactionManager;
@@ -61,6 +63,11 @@ impl TransactionManager<OciConnection> for OCITransactionManager {
6163

6264
fn begin_transaction(conn: &mut OciConnection) -> QueryResult<()> {
6365
let transaction_depth = Self::get_transaction_depth(conn)?;
66+
conn.instrumentation
67+
.on_connection_event(InstrumentationEvent::begin_transaction(
68+
NonZeroU32::new(transaction_depth.map_or(0, NonZeroU32::get).wrapping_add(1))
69+
.expect("Transaction depth is too large"),
70+
));
6471
match transaction_depth {
6572
None => {
6673
conn.raw.set_autocommit(false);
@@ -77,6 +84,10 @@ impl TransactionManager<OciConnection> for OCITransactionManager {
7784
// c.f. https://docs.oracle.com/cd/E25054_01/server.1111/e25789/transact.htm#sthref1318
7885
let transaction_depth = Self::get_transaction_depth(conn)?;
7986
let mut mark_as_broken = false;
87+
if let Some(depth) = transaction_depth {
88+
conn.instrumentation
89+
.on_connection_event(InstrumentationEvent::rollback_transaction(depth));
90+
}
8091
match transaction_depth.map(|d| d.into()) {
8192
Some(1) => {
8293
let res = conn
@@ -106,6 +117,10 @@ impl TransactionManager<OciConnection> for OCITransactionManager {
106117

107118
fn commit_transaction(conn: &mut OciConnection) -> QueryResult<()> {
108119
let transaction_depth = Self::get_transaction_depth(conn)?;
120+
if let Some(depth) = transaction_depth {
121+
conn.instrumentation
122+
.on_connection_event(InstrumentationEvent::commit_transaction(depth));
123+
}
109124
// since oracle doesn't support nested transactions we only commit the outmost transaction
110125
// and not every inner transaction; if the outmost transaction fails everything will be
111126
// rolled back, every inner transaction can fail, but no be committed since it doesn't make

0 commit comments

Comments
 (0)