Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ impl Coordinator {
.retire(Err(AdapterError::Unstructured(anyhow::anyhow!("{err}"))));
}
}
mz_storage_types::oneshot_sources::ContentSource::Http { url }
mz_storage_types::oneshot_sources::ContentSource::Http {
url: mz_ore::url::SensitiveUrl(url),
}
}
CopyFromSource::AwsS3 {
uri,
Expand Down
1 change: 1 addition & 0 deletions src/storage-operators/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["io"] }
tracing.workspace = true
url.workspace = true
urlencoding.workspace = true
uuid = { workspace = true, features = ["v4"] }

[features]
Expand Down
49 changes: 37 additions & 12 deletions src/storage-operators/src/oneshot_source/http_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use bytes::Bytes;
use derivative::Derivative;
use futures::TryStreamExt;
use futures::stream::{BoxStream, StreamExt};
use mz_ore::url::SensitiveUrl;
use reqwest::Client;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use serde::{Deserialize, Serialize};
use url::Url;

use crate::oneshot_source::util::IntoRangeHeaderValue;
use crate::oneshot_source::{
Expand Down Expand Up @@ -87,20 +87,49 @@ fn check_not_redirect(response: &reqwest::Response) -> Result<(), StorageErrorX>
pub struct HttpOneshotSource {
#[derivative(Debug = "ignore")]
client: Client,
origin: Url,
origin: SensitiveUrl,
}

impl HttpOneshotSource {
pub fn new(client: Client, origin: Url) -> Self {
pub fn new(client: Client, origin: SensitiveUrl) -> Self {
HttpOneshotSource { client, origin }
}
}

/// Build a request, transferring any userinfo on `url` to a `Basic` auth header
/// so the password does not surface via `reqwest::Error::Display` (which
/// renders the URL using `Url::Display` and would otherwise leak it on
/// connect/DNS failures).
///
/// `Url::username` and `Url::password` return values in their
/// percent-encoded form, but `RequestBuilder::basic_auth` base64-encodes its
/// arguments verbatim. Decode here so credentials containing characters that
/// must be percent-encoded in a URL (`@`, spaces, `:`, ...) authenticate
/// correctly.
fn build_request(
client: &Client,
method: reqwest::Method,
url: &SensitiveUrl,
) -> reqwest::RequestBuilder {
let mut url = url.0.clone();
let user = String::from_utf8_lossy(&urlencoding::decode_binary(url.username().as_bytes()))
.into_owned();
let pass = url
.password()
.map(|p| String::from_utf8_lossy(&urlencoding::decode_binary(p.as_bytes())).into_owned());
if user.is_empty() && pass.is_none() {
return client.request(method, url);
}
let _ = url.set_password(None);
let _ = url.set_username("");
client.request(method, url).basic_auth(user, pass)
}

/// Object returned from an [`HttpOneshotSource`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpObject {
/// [`Url`] to access the file.
url: Url,
/// [`SensitiveUrl`] to access the file.
url: SensitiveUrl,
/// Name of the file.
filename: String,
/// Size of this file reported by the [`Content-Length`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) header
Expand Down Expand Up @@ -207,9 +236,7 @@ impl OneshotSource for HttpOneshotSource {

// To get metadata about a file we'll first try issuing a `HEAD` request, which
// canonically is the right thing do.
let response = self
.client
.head(self.origin.clone())
let response = build_request(&self.client, reqwest::Method::HEAD, &self.origin)
.send()
.await
.context("HEAD request")?;
Expand All @@ -223,9 +250,7 @@ impl OneshotSource for HttpOneshotSource {
Err(err) => {
tracing::warn!(status = ?err.status(), "HEAD request failed");

let response = self
.client
.get(self.origin.clone())
let response = build_request(&self.client, reqwest::Method::GET, &self.origin)
.send()
.await
.context("GET request")?;
Expand Down Expand Up @@ -296,7 +321,7 @@ impl OneshotSource for HttpOneshotSource {
// TODO(cf1): Validate our checksum.

let initial_response = async move {
let mut request = self.client.get(object.url);
let mut request = build_request(&self.client, reqwest::Method::GET, &object.url);

if let Some(range) = &range {
let value = range.into_range_header_value();
Expand Down
9 changes: 6 additions & 3 deletions src/storage-types/src/oneshot_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@

//! Types for oneshot sources.

use derivative::Derivative;
use mz_expr::SafeMfpPlan;
use mz_ore::url::SensitiveUrl;
use mz_pgcopy::CopyCsvFormatParams;
use mz_repr::{CatalogItemId, RelationDesc};
use mz_timely_util::builder_async::PressOnDropButton;

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedReceiver;
use url::Url;

use crate::connections::aws::AwsConnection;

Expand All @@ -39,12 +40,14 @@ pub struct OneshotIngestionRequest {
pub shape: ContentShape,
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[derive(Derivative, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[derivative(Debug)]
pub enum ContentSource {
Http {
url: Url,
url: SensitiveUrl,
},
AwsS3 {
#[derivative(Debug = "ignore")]
connection: AwsConnection,
connection_id: CatalogItemId,
uri: String,
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,12 @@ impl<'w> Worker<'w> {
}
}
StorageCommand::RunOneshotIngestion(ingestion) => {
info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
info!(
%worker_id,
ingestion_id = %ingestion.ingestion_id,
collection_id = %ingestion.collection_id,
"reconcile: received RunOneshotIngestion command",
);
create_oneshot_ingestions.insert(ingestion.ingestion_id);
}
StorageCommand::CancelOneshotIngestion(uuid) => {
Expand Down
Loading