melib/nntp: accept invalid (non-ascii) address comment text #270

Merged
Manos Pitsidianakis merged 3 commits from fix/269-invalid-ctext-loop into master 2023-08-10 18:38:48 +03:00
3 changed files with 136 additions and 77 deletions

View File

@ -781,6 +781,11 @@ pub mod generic {
))(input)
}
/// Invalid version of [`ctext`] that accepts non-ascii characters.
fn ctext_invalid(input: &[u8]) -> IResult<&[u8], ()> {
map(is_not("()\\"), |_| ())(input)
}
///```text
/// ctext = %d33-39 / ; Printable US-ASCII
/// %d42-91 / ; characters not including
@ -804,8 +809,10 @@ pub mod generic {
));
}
input = context("comment()", opt(fws))(input)?.0;
while let Ok((_input, _)) =
context("comment()", alt((ctext, map(quoted_pair, |_| ()))))(input)
while let Ok((_input, _)) = context(
"comment()",
alt((ctext, ctext_invalid, map(quoted_pair, |_| ()))),
)(input)
{
input = _input;
}

View File

@ -296,10 +296,9 @@ impl NntpStream {
ret: &mut String,
is_multiline: bool,
expected_reply_code: &[&str],
) -> Result<()> {
) -> Result<u32> {
self.read_lines(ret, is_multiline, expected_reply_code)
.await?;
Ok(())
.await
}
pub async fn read_lines(
@ -307,7 +306,7 @@ impl NntpStream {
ret: &mut String,
is_multiline: bool,
expected_reply_code: &[&str],
) -> Result<()> {
) -> Result<u32> {
let mut buf: Vec<u8> = vec![0; Connection::IO_BUF_SIZE];
ret.clear();
let mut last_line_idx: usize = 0;
@ -337,7 +336,14 @@ impl NntpStream {
if let Some(mut pos) = ret[last_line_idx..].rfind("\r\n") {
if !is_multiline {
break;
} else if let Some(pos) = ret.find("\r\n.\r\n") {
}
if !matches!(
expected_reply_code.iter().position(|r| ret.starts_with(r)),
Some(0) | None
) {
break;
}
if let Some(pos) = ret.find("\r\n.\r\n") {
ret.replace_range(pos + "\r\n".len()..pos + "\r\n.\r\n".len(), "");
break;
}
@ -355,27 +361,24 @@ impl NntpStream {
}
}
}
//debug!("returning nntp response:\n{:?}", &ret);
Ok(())
ret.split_whitespace()
.next()
.map(str::parse)
.and_then(std::result::Result::ok)
.ok_or_else(|| Error::new(format!("Internal error: {}", ret)))
}
pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
debug!("sending: {}", unsafe {
std::str::from_utf8_unchecked(command)
});
if let Err(err) = try_await(async move {
let command = command.trim();
self.stream.write_all(command).await?;
self.stream.write_all(b"\r\n").await?;
self.stream.flush().await?;
debug!("sent: {}", unsafe {
std::str::from_utf8_unchecked(command)
});
Ok(())
})
.await
{
debug!("stream send_command err {:?}", err);
log::debug!("stream send_command err {:?}", err);
Err(err)
} else {
Ok(())
@ -404,12 +407,11 @@ impl NntpStream {
}
self.stream.write_all(b".\r\n").await?;
self.stream.flush().await?;
debug!("sent data block {} bytes", data.len());
Ok(())
})
.await
{
debug!("stream send_multiline_data_block err {:?}", err);
log::debug!("stream send_multiline_data_block err {:?}", err);
Err(err)
} else {
Ok(())
@ -456,7 +458,7 @@ impl NntpConnection {
ret: &'a mut String,
is_multiline: bool,
expected_reply_code: &'static [&str],
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
) -> Pin<Box<dyn Future<Output = Result<u32>> + Send + 'a>> {
Box::pin(async move {
ret.clear();
self.stream
@ -480,13 +482,27 @@ impl NntpConnection {
}
pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
// RFC 3977
// 3. Basic Concepts
// 3.1.
// Commands and Responses Command lines MUST NOT exceed 512 octets, which
// includes the terminating CRLF pair.
if command.len() + b"\r\n".len() >= 512 {
log::error!(
"{}: Sending a command to the NNTP server that is over 511 bytes: this is invalid \
and should be fixed. Please report this as a bug to the melib bugtracker. The \
command line is `{:?}\\r\\n`",
&self.uid_store.account_name,
command
);
}
if let Err(err) =
try_await(async { self.stream.as_mut()?.send_command(command).await }).await
{
self.stream = Err(err.clone());
debug!(err.kind);
log::debug!("NNTP send command error {:?} {}", err.kind, err);
if err.kind.is_network() {
debug!(self.connect().await)?;
self.connect().await?;
}
Err(err)
} else {
@ -540,7 +556,7 @@ impl NntpConnection {
pub fn command_to_replycodes(c: &str) -> &'static [&'static str] {
if c.starts_with("OVER") {
&["224 "]
&["224 ", "423 "]
} else if c.starts_with("LIST") {
&["215 "]
} else if c.starts_with("POST") {

View File

@ -241,7 +241,7 @@ impl MailBackend for NntpType {
mailbox_hash,
uid_store: self.uid_store.clone(),
connection: self.connection.clone(),
high_low_total: None,
total_low_high: None,
};
Ok(Box::pin(async_stream::try_stream! {
{
@ -731,39 +731,65 @@ impl NntpType {
pub async fn nntp_mailboxes(connection: &Arc<FutureMutex<NntpConnection>>) -> Result<()> {
let mut res = String::with_capacity(8 * 1024);
let mut conn = connection.lock().await;
let command = {
let mut mailboxes = {
let mailboxes_lck = conn.uid_store.mailboxes.lock().await;
mailboxes_lck
.values()
.fold("LIST ACTIVE ".to_string(), |mut acc, x| {
if acc.len() != "LIST ACTIVE ".len() {
acc.push(',');
}
acc.push_str(x.name());
acc
})
.map(|m| m.name().to_string())
.collect::<SmallVec<[String; 16]>>()
};
conn.send_command(command.as_bytes()).await?;
conn.read_response(&mut res, true, &["215 "])
.await
.chain_err_summary(|| {
format!(
"Could not get newsgroups {}: expected LIST ACTIVE response but got: {}",
&conn.uid_store.account_name, res
)
})?;
debug!(&res);
let mut mailboxes_lck = conn.uid_store.mailboxes.lock().await;
for l in res.split_rn().skip(1) {
let s = l.split_whitespace().collect::<SmallVec<[&str; 4]>>();
if s.len() != 3 {
continue;
mailboxes.reverse();
while !mailboxes.is_empty() {
let mut command = "LIST ACTIVE ".to_string();
'batch: while let Some(m) = mailboxes.pop() {
/* first check if the group name itself is too big for `LIST ACTIVE`. */
if "LIST ACTIVE ".len() + m.len() + "\r\n".len() >= 512 {
log::warn!(
"{}: Newsgroup named {} has a name that exceeds RFC 3977 limits of \
maximum command lines (512 octets) with LIST ACTIVE. Skipping it.",
&conn.uid_store.account_name,
m
);
continue 'batch;
}
if command.len() != "LIST ACTIVE ".len() {
command.push(',');
}
// RFC 3977
// 3. Basic Concepts
// 3.1. Commands and Responses
// Command lines MUST NOT exceed 512 octets, which includes the terminating CRLF
// pair.
if command.len() + m.len() + "\r\n".len() >= 512 {
mailboxes.push(m);
if command.ends_with(',') {
command.pop();
}
break 'batch;
}
command.push_str(&m);
}
conn.send_command(command.as_bytes()).await?;
conn.read_response(&mut res, true, &["215 "])
.await
.chain_err_summary(|| {
format!(
"Could not get newsgroups {}: expected LIST ACTIVE response but got: {}",
&conn.uid_store.account_name, res
)
})?;
let mut mailboxes_lck = conn.uid_store.mailboxes.lock().await;
for l in res.split_rn().skip(1) {
let s = l.split_whitespace().collect::<SmallVec<[&str; 4]>>();
if s.len() != 3 {
continue;
}
let mailbox_hash = MailboxHash(get_path_hash!(&s[0]));
mailboxes_lck.entry(mailbox_hash).and_modify(|m| {
*m.high_watermark.lock().unwrap() = usize::from_str(s[1]).unwrap_or(0);
*m.low_watermark.lock().unwrap() = usize::from_str(s[2]).unwrap_or(0);
});
}
let mailbox_hash = MailboxHash(get_path_hash!(&s[0]));
mailboxes_lck.entry(mailbox_hash).and_modify(|m| {
*m.high_watermark.lock().unwrap() = usize::from_str(s[1]).unwrap_or(0);
*m.low_watermark.lock().unwrap() = usize::from_str(s[2]).unwrap_or(0);
});
}
Ok(())
}
@ -876,7 +902,7 @@ struct FetchState {
mailbox_hash: MailboxHash,
connection: Arc<FutureMutex<NntpConnection>>,
uid_store: Arc<UIDStore>,
high_low_total: Option<(usize, usize, usize)>,
total_low_high: Option<(usize, usize, usize)>,
}
impl FetchState {
@ -885,13 +911,14 @@ impl FetchState {
mailbox_hash,
ref connection,
ref uid_store,
ref mut high_low_total,
ref mut total_low_high,
} = self;
let mailbox_hash = *mailbox_hash;
let mut res = String::with_capacity(8 * 1024);
let mut conn = connection.lock().await;
let mut unseen = LazyCountSet::new();
if high_low_total.is_none() {
if total_low_high.is_none() {
conn.select_group(mailbox_hash, true, &mut res).await?;
/*
* Parameters
@ -911,37 +938,46 @@ impl FetchState {
)));
}
let total = usize::from_str(s[1]).unwrap_or(0);
let _low = usize::from_str(s[2]).unwrap_or(0);
let low = usize::from_str(s[2]).unwrap_or(0);
let high = usize::from_str(s[3]).unwrap_or(0);
*high_low_total = Some((high, _low, total));
*total_low_high = Some((total, low, high));
{
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
f.exists.lock().unwrap().set_not_yet_seen(total);
};
}
let (high, low, _) = high_low_total.unwrap();
if high <= low {
return Ok(None);
}
const CHUNK_SIZE: usize = 50000;
let new_low = std::cmp::max(low, high.saturating_sub(CHUNK_SIZE));
high_low_total.as_mut().unwrap().0 = new_low;
// [ref:FIXME]: server might not implement OVER capability
conn.send_command(format!("OVER {}-{}", new_low, high).as_bytes())
.await?;
conn.read_response(&mut res, true, command_to_replycodes("OVER"))
.await
.chain_err_summary(|| {
format!(
"{} Could not select newsgroup: expected OVER response but got: {}",
&uid_store.account_name, res
)
})?;
let mut ret = Vec::with_capacity(high - new_low);
//hash_index: Arc<Mutex<HashMap<EnvelopeHash, (UID, MailboxHash)>>>,
//uid_index: Arc<Mutex<HashMap<(MailboxHash, UID), EnvelopeHash>>>,
let (low, new_low) = loop {
let (_, low, high) = total_low_high.unwrap();
if high <= low {
return Ok(None);
}
const CHUNK_SIZE: usize = 50000;
let new_low = std::cmp::max(low, std::cmp::min(high, low.saturating_add(CHUNK_SIZE)));
total_low_high.as_mut().unwrap().1 = new_low;
// [ref:FIXME]: server might not implement OVER capability
conn.send_command(format!("OVER {}-{}", low, new_low).as_bytes())
.await?;
let reply_code = conn
.read_response(&mut res, true, command_to_replycodes("OVER"))
.await
.chain_err_summary(|| {
format!(
"{} Could not select newsgroup: expected OVER response but got: {}",
&uid_store.account_name, res
)
})?;
if reply_code == 423 {
// No articles in this range, so move on to next chunk.
continue;
}
break (low, new_low);
};
let mut ret = Vec::with_capacity(new_low - low);
let mut latest_article: Option<crate::UnixTimestamp> = None;
{
let mut message_id_lck = uid_store.message_id_index.lock().await;
let mut uid_index_lck = uid_store.uid_index.lock().await;