diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index fe329b4bf7..d742a90b69 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -348,7 +348,7 @@ def test_receive_imf_failure(acfactory) -> None: snapshot.text == "❌ Failed to receive a message:" " Condition failed: `!context.get_config_bool(Config::SimulateReceiveImfError).await?`." f" Core version {version}." - " Please report this bug to delta@merlinux.eu or https://support.delta.chat/." + " Please report this bug to delta@merlinux.eu or https://support.delta.chat/" ) # The failed message doesn't break the IMAP loop. diff --git a/src/imap.rs b/src/imap.rs index 892253bc2e..17439321c9 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1383,13 +1383,15 @@ impl Session { let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await; let received_msg = match res { Err(err) => { - warn!(context, "receive_imf error: {err:#}."); - - let text = format!( - "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.", - ); - let mut msg = Message::new_text(text); - add_device_msg(context, None, Some(&mut msg)).await?; + let err = format!("{err:#}"); + warn!(context, "receive_imf error: {err}."); + if !err.contains("(SKIP_DEVICE_MSG)") { + let text = format!( + "❌ Failed to receive a message: {err}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/", + ); + let mut msg = Message::new_text(text); + add_device_msg(context, None, Some(&mut msg)).await?; + } None } Ok(msg) => msg, diff --git a/src/reaction.rs b/src/reaction.rs index b459a132e5..3d2f3d2c17 100644 --- a/src/reaction.rs +++ b/src/reaction.rs @@ -18,7 +18,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt; -use anyhow::Result; +use anyhow::{Result, bail}; use serde::{Deserialize, Serialize}; use crate::chat::{Chat, ChatId, send_msg}; @@ -259,9 +259,8 @@ pub(crate) async fn set_msg_reaction( }); } } else { - info!( - context, - "Can't assign reaction to unknown message with Message-ID {}", in_reply_to + bail!( + "Can't assign reaction to unknown message with Message-ID {in_reply_to} (SKIP_DEVICE_MSG)" ); } Ok(()) @@ -519,6 +518,54 @@ Content-Disposition: reaction\n\ Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_reaction_and_multitransport() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let device_chat_id = ChatId::get_for_contact(alice, ContactId::DEVICE).await?; + let n_device_msgs = get_chat_msgs(alice, device_chat_id).await?.len(); + + let reaction_bytes = "To: alice@example.org, claire@example.org\n\ +From: bob@example.net\n\ +Date: Today, 29 February 2021 00:00:10 -800\n\ +Message-ID: 56789@example.net\n\ +In-Reply-To: 12345@example.org\n\ +Content-Type: text/plain; charset=utf-8\n\ +Content-Disposition: reaction\n\ +\n\ +\u{1F44D}" + .as_bytes(); + // Alice receives a reaction to Claire's message from Bob earler than the message itself + // because Bob knows about Alice's new transport. + assert!(receive_imf(alice, reaction_bytes, false).await.is_err()); + + let msg_id = receive_imf( + alice, + "To: alice@example.org, bob@example.net\n\ +From: claire@example.org\n\ +Date: Today, 29 February 2021 00:00:00 -800\n\ +Message-ID: 12345@example.org\n\ +\n\ +Can we chat at 1pm pacific, today?" + .as_bytes(), + false, + ) + .await? + .unwrap() + .msg_ids[0]; + + // Finally the reaction arrives on Alice's older transport. + receive_imf(alice, reaction_bytes, false).await?; + let reactions = get_msg_reactions(alice, msg_id).await?; + assert_eq!(reactions.to_string(), "👍1"); + + assert_eq!( + get_chat_msgs(alice, device_chat_id).await?.len(), + n_device_msgs + ); + Ok(()) + } + async fn expect_reactions_changed_event( t: &TestContext, expected_chat_id: ChatId, diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 35b60002ea..87773ddf78 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -6,7 +6,7 @@ use std::iter; use std::str::FromStr as _; use std::sync::LazyLock; -use anyhow::{Context as _, Result, ensure}; +use anyhow::{Context as _, Result, bail, ensure}; use deltachat_contact_tools::{ ContactAddress, addr_cmp, addr_normalize, may_be_valid_addr, sanitize_bidi_characters, sanitize_single_line, @@ -935,75 +935,6 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1 // This is a Delta Chat MDN. Mark as read. markseen_on_imap_table(context, rfc724_mid_orig).await?; } - if !mime_parser.incoming && !context.get_config_bool(Config::TeamProfile).await? { - let mut updated_chats = BTreeMap::new(); - let mut archived_chats_maybe_noticed = false; - for report in &mime_parser.mdn_reports { - for msg_rfc724_mid in report - .original_message_id - .iter() - .chain(&report.additional_message_ids) - { - let Some(msg_id) = rfc724_mid_exists(context, msg_rfc724_mid).await? else { - continue; - }; - let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else { - continue; - }; - if msg.state < MessageState::InFresh || msg.state >= MessageState::InSeen { - continue; - } - if !mime_parser.was_encrypted() && msg.get_showpadlock() { - warn!(context, "MDN: Not encrypted. Ignoring."); - continue; - } - message::update_msg_state(context, msg_id, MessageState::InSeen).await?; - if let Err(e) = msg_id.start_ephemeral_timer(context).await { - error!(context, "start_ephemeral_timer for {msg_id}: {e:#}."); - } - if !mime_parser.has_chat_version() { - continue; - } - archived_chats_maybe_noticed |= msg.state < MessageState::InNoticed - && msg.chat_visibility == ChatVisibility::Archived; - updated_chats - .entry(msg.chat_id) - .and_modify(|pos| *pos = cmp::max(*pos, (msg.timestamp_sort, msg.id))) - .or_insert((msg.timestamp_sort, msg.id)); - } - } - for (chat_id, (timestamp_sort, msg_id)) in updated_chats { - context - .sql - .execute( - " -UPDATE msgs SET state=? WHERE - state=? AND - hidden=0 AND - chat_id=? AND - (timestamp,id)<(?,?)", - ( - MessageState::InNoticed, - MessageState::InFresh, - chat_id, - timestamp_sort, - msg_id, - ), - ) - .await - .context("UPDATE msgs.state")?; - if chat_id.get_fresh_msg_cnt(context).await? == 0 { - // Removes all notifications for the chat in UIs. - context.emit_event(EventType::MsgsNoticed(chat_id)); - } else { - context.emit_msgs_changed_without_msg_id(chat_id); - } - chatlist_events::emit_chatlist_item_changed(context, chat_id); - } - if archived_chats_maybe_noticed { - context.on_archived_chats_maybe_noticed(); - } - } } if mime_parser.is_call() { @@ -2065,9 +1996,8 @@ async fn add_parts( } } None => { - warn!( - context, - "Cannot add iroh peer because WebXDC instance does not exist." + bail!( + "Cannot add iroh peer because WebXDC instance {in_reply_to} does not exist (SKIP_DEVICE_MSG)" ); } }, @@ -2100,6 +2030,82 @@ async fn add_parts( warn!(context, "Call: Not a reply.") } } + if !mime_parser.incoming && !context.get_config_bool(Config::TeamProfile).await? { + let mut missing_rfc724_mid = None; + let mut updated_chats = BTreeMap::new(); + let mut archived_chats_maybe_noticed = false; + for report in &mime_parser.mdn_reports { + for msg_rfc724_mid in report + .original_message_id + .iter() + .chain(&report.additional_message_ids) + { + let Some(msg_id) = rfc724_mid_exists(context, msg_rfc724_mid).await? else { + missing_rfc724_mid.get_or_insert(msg_rfc724_mid.as_str()); + continue; + }; + let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else { + continue; + }; + if msg.state < MessageState::InFresh || msg.state >= MessageState::InSeen { + continue; + } + if !mime_parser.was_encrypted() && msg.get_showpadlock() { + warn!(context, "MDN: Not encrypted. Ignoring."); + continue; + } + message::update_msg_state(context, msg_id, MessageState::InSeen).await?; + if let Err(e) = msg_id.start_ephemeral_timer(context).await { + error!(context, "start_ephemeral_timer for {msg_id}: {e:#}."); + } + if !mime_parser.has_chat_version() { + continue; + } + archived_chats_maybe_noticed |= msg.state < MessageState::InNoticed + && msg.chat_visibility == ChatVisibility::Archived; + updated_chats + .entry(msg.chat_id) + .and_modify(|pos| *pos = cmp::max(*pos, (msg.timestamp_sort, msg.id))) + .or_insert((msg.timestamp_sort, msg.id)); + } + } + for (chat_id, (timestamp_sort, msg_id)) in updated_chats { + context + .sql + .execute( + " +UPDATE msgs SET state=? WHERE +state=? AND +hidden=0 AND +chat_id=? AND +(timestamp,id)<(?,?)", + ( + MessageState::InNoticed, + MessageState::InFresh, + chat_id, + timestamp_sort, + msg_id, + ), + ) + .await + .context("UPDATE msgs.state")?; + if chat_id.get_fresh_msg_cnt(context).await? == 0 { + // Removes all notifications for the chat in UIs. + context.emit_event(EventType::MsgsNoticed(chat_id)); + } else { + context.emit_msgs_changed_without_msg_id(chat_id); + } + chatlist_events::emit_chatlist_item_changed(context, chat_id); + } + if archived_chats_maybe_noticed { + context.on_archived_chats_maybe_noticed(); + } + ensure!( + missing_rfc724_mid.is_none(), + "Self-MDN: {} not found (SKIP_DEVICE_MSG)", + missing_rfc724_mid.unwrap_or(""), + ); + } let hidden = mime_parser.parts.iter().all(|part| part.is_reaction); let mut parts = mime_parser.parts.iter().peekable(); @@ -2380,10 +2386,7 @@ async fn handle_edit_delete( warn!(context, "Edit message: Database entry does not exist."); } } else { - warn!( - context, - "Edit message: rfc724_mid {rfc724_mid:?} not found." - ); + bail!("Edit message: rfc724_mid {rfc724_mid:?} not found (SKIP_DEVICE_MSG)"); } } else if let Some(rfc724_mid_list) = mime_parser.get_header(HeaderDef::ChatDelete) && let Some(part) = mime_parser.parts.first() diff --git a/src/receive_imf/receive_imf_tests.rs b/src/receive_imf/receive_imf_tests.rs index f36295717d..eccf4d7450 100644 --- a/src/receive_imf/receive_imf_tests.rs +++ b/src/receive_imf/receive_imf_tests.rs @@ -14,7 +14,9 @@ use crate::contact; use crate::imap::prefetch_should_download; use crate::imex::{ImexMode, imex}; use crate::key; +use crate::message::markseen_msgs; use crate::securejoin::get_securejoin_qr; +use crate::smtp; use crate::test_utils::{ TestContext, TestContextManager, alice_keypair, get_chat_msg, mark_as_verified, }; @@ -2655,6 +2657,32 @@ async fn test_read_receipts_dont_unmark_bots() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_self_mdn_before_msg() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let bob = &tcm.bob().await; + let bob2 = &tcm.bob().await; + let alice_chat = alice.create_chat(bob).await; + + let sent = alice.send_text(alice_chat.id, "hi").await; + let msg = bob.recv_msg(&sent).await; + msg.chat_id.accept(bob).await?; + markseen_msgs(bob, vec![msg.id]).await?; + smtp::queue_mdn(bob).await?; + let sent_mdn = bob.pop_sent_msg().await; + + let Err(err) = receive_imf(bob2, sent_mdn.payload().as_bytes(), false).await else { + unreachable!(); + }; + assert!(format!("{err:#}").contains("(SKIP_DEVICE_MSG)")); + let msg = bob2.recv_msg(&sent).await; + assert_eq!(msg.get_state(), MessageState::InFresh); + bob2.recv_msg_trash(&sent_mdn).await; + assert_eq!(msg.id.get_state(bob2).await?, MessageState::InSeen); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_gmx_forwarded_msg() -> Result<()> { let t = TestContext::new_alice().await; diff --git a/src/smtp.rs b/src/smtp.rs index 020c575fd3..76deb73f87 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -13,7 +13,7 @@ use crate::config::Config; use crate::contact::{Contact, ContactId}; use crate::context::Context; use crate::events::EventType; -use crate::log::{LogExt, warn}; +use crate::log::warn; use crate::message::Message; use crate::message::{self, MsgId}; use crate::mimefactory::MimeFactory; @@ -590,44 +590,77 @@ async fn send_mdn_rfc724_mid( if context.get_config_bool(Config::BccSelf).await? { add_self_recipients(context, &mut recipients, encrypted).await?; } - let recipients: Vec<_> = recipients - .into_iter() - .filter_map(|addr| { - async_smtp::EmailAddress::new(addr.clone()) - .with_context(|| format!("Invalid recipient: {addr}")) - .log_err(context) - .ok() - }) - .collect(); - - match smtp_send(context, &recipients, &body, smtp, None).await { - SendResult::Success => { - if !recipients.is_empty() { - info!(context, "Successfully sent MDN for {rfc724_mid}."); + #[cfg(not(test))] + { + use crate::log::LogExt; + + let recipients: Vec<_> = recipients + .into_iter() + .filter_map(|addr| { + async_smtp::EmailAddress::new(addr.clone()) + .with_context(|| format!("Invalid recipient: {addr}")) + .log_err(context) + .ok() + }) + .collect(); + + match smtp_send(context, &recipients, &body, smtp, None).await { + SendResult::Success => { + if !recipients.is_empty() { + info!(context, "Successfully sent MDN for {rfc724_mid}."); + } + context + .sql + .transaction(|transaction| { + let mut stmt = + transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; + stmt.execute((rfc724_mid,))?; + for additional_rfc724_mid in additional_rfc724_mids { + stmt.execute((additional_rfc724_mid,))?; + } + Ok(()) + }) + .await?; + Ok(true) } - context - .sql - .transaction(|transaction| { - let mut stmt = - transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; - stmt.execute((rfc724_mid,))?; - for additional_rfc724_mid in additional_rfc724_mids { - stmt.execute((additional_rfc724_mid,))?; - } - Ok(()) - }) - .await?; - Ok(true) - } - SendResult::Retry => { - info!( - context, - "Temporary SMTP failure while sending an MDN for {rfc724_mid}." - ); - Ok(false) + SendResult::Retry => { + info!( + context, + "Temporary SMTP failure while sending an MDN for {rfc724_mid}." + ); + Ok(false) + } + SendResult::Failure(err) => Err(err), } - SendResult::Failure(err) => Err(err), } + #[cfg(test)] + { + let _ = smtp; + context + .sql + .transaction(|t| { + t.execute( + "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) + VALUES (?, ?, ?, ?)", + (rfc724_mid, recipients.join(" "), body, u32::MAX), + )?; + let mut stmt = t.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; + stmt.execute((rfc724_mid,))?; + for additional_rfc724_mid in additional_rfc724_mids { + stmt.execute((additional_rfc724_mid,))?; + } + Ok(()) + }) + .await?; + Ok(true) + } +} + +#[cfg(test)] +pub(crate) async fn queue_mdn(context: &Context) -> Result<()> { + let queued = send_mdn(context, &mut Smtp::new()).await?; + assert!(queued); + Ok(()) } /// Tries to send a single MDN. Returns true if more MDNs should be sent.