Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ pub enum Config {
/// storing the same token multiple times on the server.
EncryptedDeviceToken,

/// Make `TestContext::pop_sent_msg_opt()` and related functions pop messages from the `smtp`
/// head, i.e. make it a queue. For historical reasons the default is a stack.
PopSentMsgFromHead,

/// Enables running test hooks, e.g. see `InnerContext::pre_encrypt_mime_hook`.
/// This way is better than conditional compilation, i.e. `#[cfg(test)]`, because tests not
/// using this still run unmodified code.
Expand Down
7 changes: 7 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,13 @@ impl Context {
.await?
.to_string(),
);
res.insert(
"pop_sent_msg_from_head",
self.sql
.get_raw_config("pop_sent_msg_from_head")
.await?
.unwrap_or_default(),
);
res.insert(
"test_hooks",
self.sql
Expand Down
8 changes: 7 additions & 1 deletion src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1901,9 +1901,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
//
// We also don't send read receipts for contact requests.
// Read receipts will not be sent even after accepting the chat.
let wants_mdn = curr_param.get_bool(Param::WantsMdn).unwrap_or_default();
let to_id = if curr_blocked == Blocked::Not
&& !curr_hidden
&& curr_param.get_bool(Param::WantsMdn).unwrap_or_default()
&& wants_mdn
&& curr_param.get_cmd() == SystemMessage::Unknown
&& context.should_send_mdns().await?
{
Expand All @@ -1925,6 +1926,11 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
None
};
if let Some(to_id) = to_id {
info!(
context,
"Queuing MDN to {to_id} for {id} from {curr_from_id}, wants_mdn={wants_mdn}, cmd={}.",
curr_param.get_cmd()
);
context
.sql
.execute(
Expand Down
10 changes: 8 additions & 2 deletions src/mimeparser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,14 +951,16 @@ impl MimeMessage {
self.parse_attachments();

// See if an MDN is requested from the other side
let mut wants_mdn = false;
if self.decryption_error.is_none()
&& !self.parts.is_empty()
&& (!self.parts.is_empty() || matches!(&self.pre_message, PreMessageMode::Pre { .. }))
&& let Some(ref dn_to) = self.chat_disposition_notification_to
{
// Check that the message is not outgoing.
let from = &self.from.addr;
if !context.is_self_addr(from).await? {
if from.to_lowercase() == dn_to.addr.to_lowercase() {
wants_mdn = true;
if let Some(part) = self.parts.last_mut() {
part.param.set_int(Param::WantsMdn, 1);
}
Expand All @@ -980,7 +982,9 @@ impl MimeMessage {
typ: Viewtype::Text,
..Default::default()
};

if wants_mdn {
part.param.set_int(Param::WantsMdn, 1);
}
if let Some(ref subject) = self.get_subject()
&& !self.has_chat_version()
&& self.webxdc_status_update.is_none()
Expand Down Expand Up @@ -2492,6 +2496,8 @@ async fn handle_mdn(
let Some((msg_id, chat_id, has_mdns, is_dup)) = context
.sql
.query_row_optional(
// MDN on a pre-message references the post-message, see `receive_imf`. So we can't tell
// which one was seen, but this is on purpose.
"SELECT
m.id AS msg_id,
c.id AS chat_id,
Expand Down
105 changes: 69 additions & 36 deletions src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::config::Config;
use crate::contact::{Contact, ContactId};
use crate::context::Context;
use crate::events::EventType;
use crate::log::{LogExt, warn};
use crate::log::warn;
use crate::message::Message;
use crate::message::{self, MsgId};
use crate::mimefactory::MimeFactory;
Expand Down Expand Up @@ -590,44 +590,77 @@ async fn send_mdn_rfc724_mid(
if context.get_config_bool(Config::BccSelf).await? {
add_self_recipients(context, &mut recipients, encrypted).await?;
}
let recipients: Vec<_> = recipients
.into_iter()
.filter_map(|addr| {
async_smtp::EmailAddress::new(addr.clone())
.with_context(|| format!("Invalid recipient: {addr}"))
.log_err(context)
.ok()
})
.collect();

match smtp_send(context, &recipients, &body, smtp, None).await {
SendResult::Success => {
if !recipients.is_empty() {
info!(context, "Successfully sent MDN for {rfc724_mid}.");
#[cfg(not(test))]
{
use crate::log::LogExt;

let recipients: Vec<_> = recipients
.into_iter()
.filter_map(|addr| {
async_smtp::EmailAddress::new(addr.clone())
.with_context(|| format!("Invalid recipient: {addr}"))
.log_err(context)
.ok()
})
.collect();

match smtp_send(context, &recipients, &body, smtp, None).await {
SendResult::Success => {
if !recipients.is_empty() {
info!(context, "Successfully sent MDN for {rfc724_mid}.");
}
context
.sql
.transaction(|transaction| {
let mut stmt =
transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
Ok(true)
}
context
.sql
.transaction(|transaction| {
let mut stmt =
transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
Ok(true)
}
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {rfc724_mid}."
);
Ok(false)
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {rfc724_mid}."
);
Ok(false)
}
SendResult::Failure(err) => Err(err),
}
SendResult::Failure(err) => Err(err),
}
#[cfg(test)]
{
let _ = smtp;
context
.sql
.transaction(|t| {
t.execute(
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
VALUES (?, ?, ?, ?)",
(rfc724_mid, recipients.join(" "), body, u32::MAX),
)?;
let mut stmt = t.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
Ok(true)
}
}

#[cfg(test)]
pub(crate) async fn queue_mdn(context: &Context) -> Result<()> {
let queued = send_mdn(context, &mut Smtp::new()).await?;
assert!(queued);
Ok(())
}

/// Tries to send a single MDN. Returns true if more MDNs should be sent.
Expand Down
5 changes: 4 additions & 1 deletion src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,10 @@ impl TestContext {
}

pub async fn pop_sent_msg_opt(&self, timeout: Duration) -> Option<SentMessage<'_>> {
let rev_order = true;
let rev_order = !self
.get_config_bool(Config::PopSentMsgFromHead)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we instead have a new function shift_sent_msg_opt that does this and not a global config that is only used for tests? Otherwise we will have to add this config to all tests over time.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise we will have to add this config to all tests over time.

Why? Only to those where sending messages from head is needed to reproduce some regression. Anyway, do you mean adding a function that moves an smtp job from head to tail?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean creating a function shift_sent_msg_opt that takes message from the beginning of the queue rather than the end, so we can use it in new tests.

Copy link
Copy Markdown
Collaborator Author

@iequidoo iequidoo Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first i wanted to do is to create unqueue_sent_msg_opt() which does the same as pop_sent_msg_opt(), but takes the first queued message. But there are other functions calling pop_sent_msg_opt(), so we should forward some flag everywhere. E.g. in the new test it's send_msg() returning SentMessage. Note sure you mean the same, what does shift_ mean? Anyway, i'd avoid adding a flag everywhere, better we add a new Config key or some flag (maybe one-shot) to Context.

I also don't expect many tests needing this, so a config key doesn't look like a problem.

Copy link
Copy Markdown
Collaborator Author

@iequidoo iequidoo Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it appeared that not the sending order is a problem, but absence of message text, so i can drop this part at all. But as it's already done, maybe let it remain?

EDIT: The discussion here is a bit outdated because pop_sent_msg_ex() was added meanwhile, but we still need a way to revert the order for messages popped from the SMTP queue indirectly.

.await
.unwrap();
self.pop_sent_msg_ex(rev_order, timeout).await
}

Expand Down
86 changes: 80 additions & 6 deletions src/tests/pre_messages/receiving.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
//! Tests about receiving Pre-Messages and Post-Message
use std::time::Duration;

use anyhow::{Context as _, Result};
use pretty_assertions::assert_eq;

use crate::EventType;
use crate::chat;
use crate::chat::send_msg;
use crate::config::Config;
use crate::contact;
use crate::download::{DownloadState, PRE_MSG_ATTACHMENT_SIZE_THRESHOLD, PostMsgMetadata};
use crate::message::{Message, MessageState, Viewtype, delete_msgs, markseen_msgs};
use crate::message::{self, Message, MessageState, Viewtype, delete_msgs, markseen_msgs};
use crate::mimeparser::MimeMessage;
use crate::param::Param;
use crate::reaction::{get_msg_reactions, send_reaction};
use crate::receive_imf::receive_imf;
use crate::smtp;
use crate::summary::assert_summary_texts;
use crate::test_utils::TestContextManager;
use crate::tests::pre_messages::util::{
big_webxdc_app, send_large_file_message, send_large_image_message, send_large_webxdc_message,
};
use crate::tools::{SystemTime, time};
use crate::webxdc::StatusUpdateSerial;

/// Test that mimeparser can correctly detect and parse pre-messages and Post-Messages
Expand Down Expand Up @@ -261,6 +264,64 @@ async fn test_lost_pre_msg() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_pre_msg_mdn_before_sending_full() -> Result<()> {
pre_msg_mdn_before_sending_full("").await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_pre_msg_mdn_before_sending_full_with_text() -> Result<()> {
pre_msg_mdn_before_sending_full("text").await
}

async fn pre_msg_mdn_before_sending_full(text: &str) -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = &tcm.alice().await;
alice
.set_config_bool(Config::PopSentMsgFromHead, true)
.await?;
let bob = &tcm.bob().await;
let alice_chat_id = alice.create_group_with_members("", &[bob]).await;

let file_bytes = include_bytes!("../../../test-data/image/screenshot.gif");
let mut msg = Message::new(Viewtype::Image);
msg.set_file_from_bytes(alice, "a.jpg", file_bytes, None)?;
msg.set_text(text.to_string());
let pre_msg = alice.send_msg(alice_chat_id, &mut msg).await;
let alice_msg_id = msg.id;

let msg = bob.recv_msg(&pre_msg).await;
assert_eq!(msg.download_state, DownloadState::Available);
assert_eq!(msg.id.get_state(bob).await?, MessageState::InFresh);
assert_eq!(msg.text, text);
assert!(msg.param.get_bool(Param::WantsMdn).unwrap_or_default());
msg.chat_id.accept(bob).await?;
markseen_msgs(bob, vec![msg.id]).await?;
assert_eq!(msg.id.get_state(bob).await?, MessageState::InSeen);
assert_eq!(
bob.sql
.count(
"SELECT COUNT(*) FROM smtp_mdns WHERE from_id=?",
(msg.from_id,)
)
.await?,
1
);
smtp::queue_mdn(bob).await?;
alice.recv_msg_trash(&bob.pop_sent_msg().await).await;
assert_eq!(
alice_msg_id.get_state(alice).await?,
MessageState::OutPending
);

let _full_msg = alice.pop_sent_msg().await;
assert_eq!(
alice_msg_id.get_state(alice).await?,
MessageState::OutMdnRcvd
);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_post_msg_bad_sender() -> Result<()> {
let mut tcm = TestContextManager::new();
Expand Down Expand Up @@ -546,7 +607,7 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
.send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#)
.await?;

send_msg(alice, alice_chat_id, &mut alice_instance).await?;
chat::send_msg(alice, alice_chat_id, &mut alice_instance).await?;
let post_message = alice.pop_sent_msg().await;
let pre_message = alice.pop_sent_msg().await;

Expand Down Expand Up @@ -587,7 +648,7 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()>
.send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#)
.await?;

send_msg(alice, alice_chat_id, &mut alice_instance).await?;
chat::send_msg(alice, alice_chat_id, &mut alice_instance).await?;
let post_message = alice.pop_sent_msg().await;
let pre_message = alice.pop_sent_msg().await;

Expand Down Expand Up @@ -625,7 +686,7 @@ async fn test_markseen_pre_msg() -> Result<()> {
alice.create_chat(bob).await; // Make sure the chat is accepted.

tcm.section("Bob sends a large message to Alice");
let (pre_message, post_message, _bob_msg_id) =
let (pre_message, post_message, bob_msg_id) =
send_large_file_message(bob, bob_chat_id, Viewtype::File, &vec![0u8; 1_000_000]).await?;

tcm.section("Alice receives a pre-message message from Bob");
Expand All @@ -640,10 +701,23 @@ async fn test_markseen_pre_msg() -> Result<()> {
assert_eq!(
alice
.sql
.count("SELECT COUNT(*) FROM smtp_mdns", ())
.count(
"SELECT COUNT(*) FROM smtp_mdns WHERE from_id=?",
(msg.from_id,)
)
.await?,
1
);
smtp::queue_mdn(alice).await?;
SystemTime::shift(Duration::from_secs(3600));
bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
let bob_msg = Message::load_from_db(bob, bob_msg_id).await?;
assert_eq!(bob_msg.get_state(), MessageState::OutMdnRcvd);
let read_receipts = message::get_msg_read_receipts(bob, bob_msg_id).await?;
assert_eq!(read_receipts.len(), 1);
let (contact_id, ts) = read_receipts[0];
assert_eq!(contact_id, bob.add_or_lookup_contact_id(alice).await);
assert!(ts + 3600 - 60 < time());

tcm.section("Alice downloads message");
alice.recv_msg_trash(&post_message).await;
Expand Down