melib/imap: add connection instance id string for debugging in logs

- Add an ID field in ImapConnection and ImapStream that records where
  each instance was created. This is useful for differentiating main
  backend connections from watching thread connections (the ones that
  listen to updates from the IMAP server with IDLE or polling).
- Add an imap_trace! macro that uses log::trace! internally but also
  prepends the connection's ID string to each log line.
pull/223/head
Manos Pitsidianakis 2023-06-17 19:30:57 +03:00
parent 8f14a2373e
commit fd0faade06
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
3 changed files with 166 additions and 54 deletions

View File

@ -27,6 +27,7 @@ mod mailbox;
pub use mailbox::*;
mod operations;
pub use operations::*;
#[macro_use]
mod connection;
pub use connection::*;
mod watch;
@ -368,6 +369,8 @@ impl MailBackend for ImapType {
}
};
Ok(Box::pin(async_stream::try_stream! {
#[cfg(debug_assertions)]
let id = state.connection.lock().await.id.clone();
{
let f = &state.uid_store.mailboxes.lock().await[&mailbox_hash];
prepare_cl(f);
@ -378,8 +381,10 @@ impl MailBackend for ImapType {
};
loop {
let res = fetch_hlpr(&mut state).await.map_err(|err| {
debug!("fetch_hlpr err {:?}", &err);
err})?;
#[cfg(debug_assertions)]
log::trace!("{} fetch_hlpr err {:?}", id, &err);
err
})?;
yield res;
if state.stage == FetchStage::Finished {
return;
@ -459,7 +464,7 @@ impl MailBackend for ImapType {
Ok(Box::pin(async move {
match timeout(timeout_dur, connection.lock()).await {
Ok(mut conn) => {
debug!("is_online");
imap_trace!(conn, "is_online");
match timeout(timeout_dur, conn.connect()).await {
Ok(Ok(())) => Ok(()),
Err(err) | Ok(Err(err)) => {
@ -493,14 +498,24 @@ impl MailBackend for ImapType {
};
while let Err(err) = if has_idle {
idle(ImapWatchKit {
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
conn: ImapConnection::new_connection(
&server_conf,
#[cfg(debug_assertions)]
"watch()::idle".into(),
uid_store.clone(),
),
main_conn: main_conn.clone(),
uid_store: uid_store.clone(),
})
.await
} else {
poll_with_examine(ImapWatchKit {
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
conn: ImapConnection::new_connection(
&server_conf,
#[cfg(debug_assertions)]
"watch()::poll_with_examine".into(),
uid_store.clone(),
),
main_conn: main_conn.clone(),
uid_store: uid_store.clone(),
})
@ -512,16 +527,27 @@ impl MailBackend for ImapType {
} else {
return Err(err);
}
debug!("Watch failure: {}", err.to_string());
log::trace!(
"{} Watch failure: {}",
uid_store.account_name,
err.to_string()
);
match timeout(uid_store.timeout, main_conn_lck.connect())
.await
.and_then(|res| res)
{
Err(err2) => {
debug!("Watch reconnect attempt failed: {}", err2.to_string());
log::trace!(
"{} Watch reconnect attempt failed: {}",
uid_store.account_name,
err2.to_string()
);
}
Ok(()) => {
debug!("Watch reconnect attempt succesful");
log::trace!(
"{} Watch reconnect attempt succesful",
uid_store.account_name
);
continue;
}
}
@ -533,7 +559,7 @@ impl MailBackend for ImapType {
});
return Err(err);
}
debug!("watch future returning");
log::trace!("{} watch future returning", uid_store.account_name);
Ok(())
}))
}
@ -862,7 +888,11 @@ impl MailBackend for ImapType {
conn.send_command(CommandBody::Expunge).await?;
conn.read_response(&mut response, RequiredResponses::empty())
.await?;
debug!("EXPUNGE response: {}", &String::from_utf8_lossy(&response));
imap_trace!(
conn,
"EXPUNGE response: {}",
&String::from_utf8_lossy(&response)
);
Ok(())
}))
}
@ -913,7 +943,11 @@ impl MailBackend for ImapType {
}
for root_mailbox in mailboxes.values().filter(|f| f.parent.is_none()) {
if path.starts_with(&root_mailbox.name) {
debug!("path starts with {:?}", &root_mailbox);
log::trace!(
"{} path starts with {:?}",
uid_store.account_name,
&root_mailbox
);
path = path.replace(
'/',
(root_mailbox.separator as char).encode_utf8(&mut [0; 4]),
@ -1002,7 +1036,7 @@ impl MailBackend for ImapType {
}
conn_lck
.send_command(debug!(CommandBody::delete(imap_path.as_str())?))
.send_command(CommandBody::delete(imap_path.as_str())?)
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
@ -1107,9 +1141,7 @@ impl MailBackend for ImapType {
}
{
let mut conn_lck = connection.lock().await;
conn_lck
.send_command_raw(debug!(command).as_bytes())
.await?;
conn_lck.send_command_raw(command.as_bytes()).await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
@ -1181,7 +1213,8 @@ impl MailBackend for ImapType {
.await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
debug!(
imap_trace!(
conn,
"searching for {} returned: {}",
query_str,
String::from_utf8_lossy(&response)
@ -1277,7 +1310,12 @@ impl ImapType {
server_conf.timeout,
)
});
let connection = ImapConnection::new_connection(&server_conf, uid_store.clone());
let connection = ImapConnection::new_connection(
&server_conf,
#[cfg(debug_assertions)]
"ImapType::new".into(),
uid_store.clone(),
);
Ok(Box::new(ImapType {
server_conf,
@ -1288,7 +1326,12 @@ impl ImapType {
}
pub fn shell(&mut self) {
let mut conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone());
let mut conn = ImapConnection::new_connection(
&self.server_conf,
#[cfg(debug_assertions)]
"ImapType::shell".into(),
self.uid_store.clone(),
);
futures::executor::block_on(timeout(self.server_conf.timeout, conn.connect()))
.unwrap()
@ -1333,7 +1376,7 @@ impl ImapType {
if input.trim() == "IDLE" {
let mut iter = ImapBlockingConnection::from(conn);
while let Some(line) = iter.next() {
debug!("out: {}", unsafe { std::str::from_utf8_unchecked(&line) });
imap_trace!("out: {}", unsafe { std::str::from_utf8_unchecked(&line) });
}
conn = iter.into_conn();
}
@ -1371,7 +1414,7 @@ impl ImapType {
conn.read_response(&mut res, RequiredResponses::LIST_REQUIRED)
.await?;
}
debug!("LIST reply: {}", String::from_utf8_lossy(&res));
imap_trace!(conn, "LIST reply: {}", String::from_utf8_lossy(&res));
for l in res.split_rn() {
if !l.starts_with(b"*") {
continue;
@ -1415,14 +1458,14 @@ impl ImapType {
}
}
} else {
debug!("parse error for {:?}", l);
imap_trace!(conn, "parse error for {:?}", l);
}
}
mailboxes.retain(|_, v| !v.hash.is_null());
conn.send_command(CommandBody::lsub("", "*")?).await?;
conn.read_response(&mut res, RequiredResponses::LSUB_REQUIRED)
.await?;
debug!("LSUB reply: {}", String::from_utf8_lossy(&res));
imap_trace!(conn, "LSUB reply: {}", String::from_utf8_lossy(&res));
for l in res.split_rn() {
if !l.starts_with(b"*") {
continue;
@ -1437,7 +1480,7 @@ impl ImapType {
f.is_subscribed = true;
}
} else {
debug!("parse error for {:?}", l);
imap_trace!(conn, "parse error for {:?}", l);
}
}
Ok(mailboxes)
@ -1578,7 +1621,12 @@ struct FetchState {
}
async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
debug!((state.mailbox_hash, &state.stage));
imap_trace!(
state.connection.lock().await,
"fetch_hlpr mailbox: {:?} stage: {:?}",
state.mailbox_hash,
&state.stage
);
loop {
match state.stage {
FetchStage::InitialFresh => {
@ -1635,7 +1683,8 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
}
Ok(Some(cached_payload)) => {
state.stage = FetchStage::ResyncCache;
debug!(
imap_trace!(
state.connection.lock().await,
"fetch_hlpr fetch_cached_envs payload {} len for mailbox_hash {}",
cached_payload.len(),
state.mailbox_hash
@ -1706,7 +1755,7 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
if max_uid_left > 0 {
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
imap_trace!(conn, "{} max_uid_left= {}", mailbox_hash, max_uid_left);
let sequence_set = if max_uid_left == 1 {
SequenceSet::from(ONE)
} else {
@ -1730,7 +1779,8 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
)
})?;
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!(
imap_trace!(
conn,
"fetch response is {} bytes and {} lines and has {} parsed Envelopes",
response.len(),
String::from_utf8_lossy(&response).lines().count(),
@ -1746,12 +1796,20 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
} in v.iter_mut()
{
if uid.is_none() || envelope.is_none() || flags.is_none() {
debug!("BUG? in fetch is none");
debug!(uid);
debug!(envelope);
debug!(flags);
debug!("response was: {}", String::from_utf8_lossy(&response));
debug!(conn.process_untagged(raw_fetch_value).await)?;
imap_trace!(
conn,
"BUG? something in fetch is none. UID: {:?}, envelope: {:?} \
flags: {:?}",
uid,
envelope,
flags
);
imap_trace!(
conn,
"response was: {}",
String::from_utf8_lossy(&response)
);
conn.process_untagged(raw_fetch_value).await?;
continue;
}
let uid = uid.unwrap();
@ -1802,7 +1860,7 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
let uid = uid.unwrap();
let env = envelope.unwrap();
/*
debug!(
imap_trace!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),

View File

@ -28,6 +28,8 @@ use crate::{
LogLevel,
};
extern crate native_tls;
#[cfg(debug_assertions)]
use std::borrow::Cow;
use std::{
collections::HashSet,
convert::TryFrom,
@ -58,14 +60,25 @@ pub use smol::Async as AsyncWrapper;
const IMAP_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(60 * 28);
macro_rules! imap_trace {
($conn:expr, $fmt:literal, $($t:tt)*) => {
#[cfg(debug_assertions)]
log::trace!(std::concat!("{} ", $fmt), $conn.id, $($t)*);
};
($conn:expr, $fmt:literal) => {
#[cfg(debug_assertions)]
log::trace!(std::concat!("{} ", $fmt), $conn.id);
};
}
use super::{protocol_parser, Capabilities, ImapServerConf, UIDStore};
#[derive(Debug, Clone, Copy)]
pub enum SyncPolicy {
None,
///rfc4549 `Synch Ops for Disconnected IMAP4 Clients` <https://tools.ietf.org/html/rfc4549>
/// RFC4549 `Synch Ops for Disconnected IMAP4 Clients` <https://tools.ietf.org/html/rfc4549>
Basic,
///rfc7162 `IMAP Extensions: Quick Flag Changes Resynchronization
/// RFC7162 `IMAP Extensions: Quick Flag Changes Resynchronization
/// (CONDSTORE) and Quick Mailbox Resynchronization (QRESYNC)`
Condstore,
CondstoreQresync,
@ -101,6 +114,8 @@ impl Default for ImapExtensionUse {
#[derive(Debug)]
pub struct ImapStream {
pub cmd_id: usize,
#[cfg(debug_assertions)]
pub id: Cow<'static, str>,
pub stream: AsyncWrapper<Connection>,
pub protocol: ImapProtocol,
pub current_mailbox: MailboxSelection,
@ -126,6 +141,8 @@ async fn try_await(cl: impl Future<Output = Result<()>> + Send) -> Result<()> {
#[derive(Debug)]
pub struct ImapConnection {
#[cfg(debug_assertions)]
pub id: Cow<'static, str>,
pub stream: Result<ImapStream>,
pub server_conf: ImapServerConf,
pub sync_policy: SyncPolicy,
@ -135,6 +152,7 @@ pub struct ImapConnection {
impl ImapStream {
pub async fn new_connection(
server_conf: &ImapServerConf,
#[cfg(debug_assertions)] id: Cow<'static, str>,
uid_store: &UIDStore,
) -> Result<(Capabilities, ImapStream)> {
use std::net::TcpStream;
@ -280,6 +298,8 @@ impl ImapStream {
let mut res = Vec::with_capacity(8 * 1024);
let mut ret = ImapStream {
cmd_id,
#[cfg(debug_assertions)]
id,
stream,
protocol: server_conf.protocol,
current_mailbox: MailboxSelection::None,
@ -493,7 +513,7 @@ impl ImapStream {
}
}
}
//debug!("returning IMAP response:\n{:?}", &ret);
//imap_trace!(self, "returning IMAP response:\n{:?}", &ret);
Ok(())
}
@ -511,6 +531,16 @@ impl ImapStream {
Command { tag, body }
};
match self.protocol {
ImapProtocol::IMAP { .. } => {
if matches!(command.body, CommandBody::Login { .. }) {
imap_trace!(self, "sent: M{} LOGIN ..", self.cmd_id - 1);
} else {
imap_trace!(self, "sent: M{} {:?}", self.cmd_id - 1, command.body);
}
}
ImapProtocol::ManageSieve => {}
}
for action in command.encode() {
match action {
@ -560,11 +590,11 @@ impl ImapStream {
match self.protocol {
ImapProtocol::IMAP { .. } => {
if !command.starts_with(b"LOGIN") {
debug!("sent: M{} {}", self.cmd_id - 1, unsafe {
imap_trace!(self, "sent: M{} {}", self.cmd_id - 1, unsafe {
std::str::from_utf8_unchecked(command)
});
} else {
debug!("sent: M{} LOGIN ..", self.cmd_id - 1);
imap_trace!(self, "sent: M{} LOGIN ..", self.cmd_id - 1);
}
}
ImapProtocol::ManageSieve => {}
@ -594,10 +624,13 @@ impl ImapStream {
impl ImapConnection {
pub fn new_connection(
server_conf: &ImapServerConf,
#[cfg(debug_assertions)] id: Cow<'static, str>,
uid_store: Arc<UIDStore>,
) -> ImapConnection {
ImapConnection {
stream: Err(Error::new("Offline".to_string())),
#[cfg(debug_assertions)]
id,
server_conf: server_conf.clone(),
sync_policy: if uid_store.keep_offline_cache {
SyncPolicy::Basic
@ -625,23 +658,30 @@ impl ImapConnection {
}
if self.stream.is_ok() {
let mut ret = Vec::new();
if let Err(err) = try_await(async {
if let Err(_err) = try_await(async {
self.send_command(CommandBody::Noop).await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await
})
.await
{
debug!("connect(): connection is probably dead: {:?}", &err);
imap_trace!(self, "connect(): connection is probably dead: {:?}", &_err);
} else {
debug!(
imap_trace!(
self,
"connect(): connection is probably alive, NOOP returned {:?}",
&String::from_utf8_lossy(&ret)
);
return Ok(());
}
}
let new_stream = ImapStream::new_connection(&self.server_conf, &self.uid_store).await;
let new_stream = ImapStream::new_connection(
&self.server_conf,
#[cfg(debug_assertions)]
self.id.clone(),
&self.uid_store,
)
.await;
if let Err(err) = new_stream.as_ref() {
self.uid_store.is_online.lock().unwrap().1 = Err(err.clone());
} else {
@ -717,6 +757,8 @@ impl ImapConnection {
ImapResponse::Ok(_) => {
let ImapStream {
cmd_id,
#[cfg(debug_assertions)]
id,
stream,
protocol,
current_mailbox,
@ -725,6 +767,8 @@ impl ImapConnection {
let stream = stream.into_inner()?;
self.stream = Ok(ImapStream {
cmd_id,
#[cfg(debug_assertions)]
id,
stream: AsyncWrapper::new(stream.deflate())?,
protocol,
current_mailbox,
@ -764,17 +808,19 @@ impl ImapConnection {
ret.extend_from_slice(&response);
return r.into();
}
ImapResponse::No(ref response_code)
ImapResponse::No(ref _response_code)
if required_responses.intersects(RequiredResponses::NO_REQUIRED) =>
{
debug!(
imap_trace!(
self,
"Received expected NO response: {:?} {:?}",
response_code,
_response_code,
String::from_utf8_lossy(&response)
);
}
ImapResponse::No(ref response_code) => {
debug!(
imap_trace!(
self,
"Received NO response: {:?} {:?}",
response_code,
String::from_utf8_lossy(&response)
@ -791,7 +837,8 @@ impl ImapConnection {
return r.into();
}
ImapResponse::Bad(ref response_code) => {
debug!(
imap_trace!(
self,
"Received BAD response: {:?} {:?}",
response_code,
String::from_utf8_lossy(&response)
@ -809,12 +856,12 @@ impl ImapConnection {
}
_ => {}
}
/*debug!(
/* imap_trace!(self,
"check every line for required_responses: {:#?}",
&required_responses
);*/
for l in response.split_rn() {
/* debug!("check line: {}", &l); */
/* imap_trace!(self, "check line: {}", &l); */
if required_responses.check(l) || !self.process_untagged(l).await? {
ret.extend_from_slice(l);
}
@ -933,8 +980,9 @@ impl ImapConnection {
.await?;
self.read_response(ret, RequiredResponses::SELECT_REQUIRED)
.await?;
debug!(
"{} select response {}",
imap_trace!(
self,
"{} SELECT response {}",
imap_path,
String::from_utf8_lossy(ret)
);
@ -1019,7 +1067,8 @@ impl ImapConnection {
.await?;
self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED)
.await?;
debug!("examine response {}", String::from_utf8_lossy(ret));
imap_trace!(self, "EXAMINE response {}", String::from_utf8_lossy(ret));
let select_response = protocol_parser::select_response(ret).chain_err_summary(|| {
format!("Could not parse select response for mailbox {}", imap_path)
})?;

View File

@ -334,7 +334,12 @@ impl ManageSieveConnection {
)
});
Ok(Self {
inner: ImapConnection::new_connection(&server_conf, uid_store),
inner: ImapConnection::new_connection(
&server_conf,
#[cfg(debug_assertions)]
"ManageSieveConnection::new()".into(),
uid_store,
),
})
}