melib/jmap: Use Url instead of String in deserializing
Cargo manifest lints / Lint Cargo manifests on ${{ matrix.build }} (linux-amd64, ubuntu-latest, stable, x86_64-unknown-linux-gnu) (pull_request) Successful in 13m55s Details
Run Tests / Test on ${{ matrix.build }} (linux-amd64, ubuntu-latest, stable, x86_64-unknown-linux-gnu) (pull_request) Successful in 18m21s Details
Run cargo lints / Lint on ${{ matrix.build }} (linux-amd64, ubuntu-latest, stable, x86_64-unknown-linux-gnu) (pull_request) Successful in 8m7s Details

Catch invalid URLs at the parsing stage.

Signed-off-by: Manos Pitsidianakis <manos@pitsidianak.is>
pull/353/head
Manos Pitsidianakis 2024-02-11 17:10:00 +02:00
parent 417b24cd84
commit 51e3f163d4
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
17 changed files with 901 additions and 682 deletions

2
Cargo.lock generated
View File

@ -1350,6 +1350,7 @@ dependencies = [
"socket2 0.5.5",
"stderrlog",
"unicode-segmentation",
"url",
"uuid",
"xdg",
]
@ -2457,6 +2458,7 @@ dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
"serde",
]
[[package]]

View File

@ -50,8 +50,8 @@ serde_path_to_error = { version = "0.1" }
smallvec = { version = "^1.5.0", features = ["serde"] }
smol = "1.0.0"
socket2 = { version = "0.5", features = [] }
unicode-segmentation = { version = "1.2.1", default-features = false, optional = true }
url = { version = "2.4", optional = true }
uuid = { version = "^1", features = ["serde", "v4", "v5"] }
xdg = "2.1.0"
@ -64,7 +64,7 @@ http = ["isahc"]
http-static = ["isahc", "isahc/static-curl"]
imap = ["imap-codec", "tls"]
imap-trace = ["imap"]
jmap = ["http"]
jmap = ["http", "url/serde"]
jmap-trace = ["jmap"]
nntp = ["tls"]
nntp-trace = ["nntp"]

View File

@ -19,6 +19,9 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
// In case we forget to wait some future.
#![deny(unused_must_use)]
use smallvec::SmallVec;
#[macro_use]
mod protocol_parser;

View File

@ -19,7 +19,7 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use std::{convert::TryFrom, sync::MutexGuard};
use std::convert::TryFrom;
use isahc::config::Configurable;
@ -28,8 +28,7 @@ use crate::error::NetworkErrorKind;
#[derive(Debug)]
pub struct JmapConnection {
pub session: Arc<Mutex<Session>>,
pub request_no: Arc<Mutex<usize>>,
pub request_no: Arc<FutureMutex<usize>>,
pub client: Arc<HttpClient>,
pub server_conf: JmapServerConf,
pub store: Arc<Store>,
@ -69,8 +68,7 @@ impl JmapConnection {
let client = client.build()?;
let server_conf = server_conf.clone();
Ok(Self {
session: Arc::new(Mutex::new(Default::default())),
request_no: Arc::new(Mutex::new(0)),
request_no: Arc::new(FutureMutex::new(0)),
client: Arc::new(client),
server_conf,
store,
@ -79,28 +77,37 @@ impl JmapConnection {
}
pub async fn connect(&mut self) -> Result<()> {
if self.store.online_status.lock().await.1.is_ok() {
if self.store.online_status.is_ok().await {
return Ok(());
}
fn to_well_known(uri: &str) -> String {
let uri = uri.trim_start_matches('/');
format!("{uri}/.well-known/jmap")
fn to_well_known(uri: &Url) -> Url {
let mut uri = uri.clone();
uri.set_path(".well-known/jmap");
uri
}
let mut jmap_session_resource_url = to_well_known(&self.server_conf.server_url);
let mut req = match self.client.get_async(&jmap_session_resource_url).await {
let mut req = match self
.client
.get_async(jmap_session_resource_url.as_str())
.await
{
Err(err) => 'block: {
if matches!(NetworkErrorKind::from(err.kind()), NetworkErrorKind::ProtocolViolation if self.server_conf.server_url.starts_with("http://"))
if matches!(NetworkErrorKind::from(err.kind()), NetworkErrorKind::ProtocolViolation if self.server_conf.server_url.scheme() == "http")
{
// attempt recovery by trying https://
self.server_conf.server_url = format!(
"https{}",
self.server_conf.server_url.trim_start_matches("http")
self.server_conf.server_url.set_scheme("https").expect(
"set_scheme to https must succeed here because we checked earlier that \
current scheme is http",
);
jmap_session_resource_url = to_well_known(&self.server_conf.server_url);
if let Ok(s) = self.client.get_async(&jmap_session_resource_url).await {
if let Ok(s) = self
.client
.get_async(jmap_session_resource_url.as_str())
.await
{
log::error!(
"Account {} server URL should start with `https`. Please correct your \
configuration value. Its current value is `{}`.",
@ -119,11 +126,12 @@ impl JmapConnection {
&self.server_conf.server_url, &err
))
.set_source(Some(Arc::new(err)));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
};
let req_instant = Instant::now();
if !req.status().is_success() {
let kind: crate::error::NetworkErrorKind = req.status().into();
@ -133,7 +141,11 @@ impl JmapConnection {
&self.server_conf.server_url, res_text
))
.set_kind(kind.into());
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
@ -147,7 +159,11 @@ impl JmapConnection {
&self.server_conf.server_url, &err
))
.set_source(Some(Arc::new(err)));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
Ok(s) => s,
@ -163,7 +179,11 @@ impl JmapConnection {
&self.server_conf.server_url, &res_text
))
.set_source(Some(Arc::new(err)));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
Ok(s) => s,
@ -181,7 +201,11 @@ impl JmapConnection {
.join(", "),
core_capability = JMAP_CORE_CAPABILITY
));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
if !session.capabilities.contains_key(JMAP_MAIL_CAPABILITY) {
@ -197,24 +221,37 @@ impl JmapConnection {
.join(", "),
mail_capability = JMAP_MAIL_CAPABILITY
));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
*self.store.core_capabilities.lock().unwrap() = session.capabilities.clone();
*self.store.online_status.lock().await = (Instant::now(), Ok(()));
*self.session.lock().unwrap() = session;
*self.store.core_capabilities.lock().unwrap() = session.capabilities.clone();
let mail_account_id = session.mail_account_id();
_ = self
.store
.online_status
.set(Some(req_instant), Ok(session))
.await;
/* Fetch account identities. */
let mut id_list = {
let mut req = Request::new(self.request_no.clone());
let identity_get = IdentityGet::new().account_id(self.mail_account_id());
req.add_call(&identity_get);
let identity_get = IdentityGet::new().account_id(mail_account_id.clone());
req.add_call(&identity_get).await;
let mut res_text = self.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res_text.text().await?;
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
Ok(s) => s,
@ -227,7 +264,7 @@ impl JmapConnection {
let mut req = Request::new(self.request_no.clone());
let identity_set = IdentitySet(
Set::<IdentityObject>::new()
.account_id(self.mail_account_id())
.account_id(mail_account_id.clone())
.create(Some({
let address =
crate::email::Address::try_from(self.store.main_identity.as_str())
@ -258,24 +295,32 @@ impl JmapConnection {
}
})),
);
req.add_call(&identity_set);
req.add_call(&identity_set).await;
let mut res_text = self.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res_text.text().await?;
let _: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
Ok(s) => s,
};
let mut req = Request::new(self.request_no.clone());
let identity_get = IdentityGet::new().account_id(self.mail_account_id());
req.add_call(&identity_get);
let identity_get = IdentityGet::new().account_id(mail_account_id.clone());
req.add_call(&identity_get).await;
let mut res_text = self.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res_text.text().await?;
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self
.store
.online_status
.set(Some(req_instant), Err(err.clone()))
.await;
return Err(err);
}
Ok(s) => s,
@ -284,28 +329,17 @@ impl JmapConnection {
GetResponse::<IdentityObject>::try_from(v.method_responses.remove(0))?;
id_list = list;
}
self.session.lock().unwrap().identities =
self.session_guard().await?.identities =
id_list.into_iter().map(|id| (id.id.clone(), id)).collect();
Ok(())
}
pub fn mail_account_id(&self) -> Id<Account> {
self.session.lock().unwrap().primary_accounts[JMAP_MAIL_CAPABILITY].clone()
}
pub fn mail_identity_id(&self) -> Option<Id<IdentityObject>> {
self.session
.lock()
.unwrap()
.identities
.keys()
.next()
.cloned()
}
pub fn session_guard(&'_ self) -> MutexGuard<'_, Session> {
self.session.lock().unwrap()
#[inline]
pub async fn session_guard(
&'_ self,
) -> Result<FutureMappedMutexGuard<'_, (Instant, Result<Session>), Session>> {
self.store.online_status.session_guard().await
}
pub fn add_refresh_event(&self, event: RefreshEvent) {
@ -325,15 +359,16 @@ impl JmapConnection {
} else {
return Ok(());
};
let mail_account_id = self.session_guard().await?.mail_account_id();
loop {
let email_changes_call: EmailChanges = EmailChanges::new(
Changes::<EmailObject>::new()
.account_id(self.mail_account_id().clone())
.account_id(mail_account_id.clone())
.since_state(current_state.clone()),
);
let mut req = Request::new(self.request_no.clone());
let prev_seq = req.add_call(&email_changes_call);
let prev_seq = req.add_call(&email_changes_call).await;
let email_get_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(Argument::reference::<
@ -344,43 +379,46 @@ impl JmapConnection {
prev_seq,
ResultField::<EmailChanges, EmailObject>::new("/created"),
)))
.account_id(self.mail_account_id().clone()),
.account_id(mail_account_id.clone()),
);
req.add_call(&email_get_call);
let mailbox_id: Id<MailboxObject>;
if let Some(mailbox) = self.store.mailboxes.read().unwrap().get(&mailbox_hash) {
if let Some(email_query_state) = mailbox.email_query_state.lock().unwrap().clone() {
mailbox_id = mailbox.id.clone();
let email_query_changes_call = EmailQueryChanges::new(
QueryChanges::new(self.mail_account_id().clone(), email_query_state)
.filter(Some(Filter::Condition(
EmailFilterCondition::new()
.in_mailbox(Some(mailbox_id.clone()))
.into(),
))),
);
let seq_no = req.add_call(&email_query_changes_call);
let email_get_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(Argument::reference::<
EmailQueryChanges,
EmailObject,
EmailObject,
>(
seq_no,
ResultField::<EmailQueryChanges, EmailObject>::new("/removed"),
)))
.account_id(self.mail_account_id().clone())
.properties(Some(vec![
"keywords".to_string(),
"mailboxIds".to_string(),
])),
);
req.add_call(&email_get_call);
} else {
return Ok(());
}
req.add_call(&email_get_call).await;
let mailbox = self
.store
.mailboxes
.read()
.unwrap()
.get(&mailbox_hash)
.map(|m| {
let email_query_state = m.email_query_state.lock().unwrap().clone();
let mailbox_id: Id<MailboxObject> = m.id.clone();
(email_query_state, mailbox_id)
});
if let Some((Some(email_query_state), mailbox_id)) = mailbox {
let email_query_changes_call = EmailQueryChanges::new(
QueryChanges::new(mail_account_id.clone(), email_query_state).filter(Some(
Filter::Condition(
EmailFilterCondition::new()
.in_mailbox(Some(mailbox_id.clone()))
.into(),
),
)),
);
let seq_no = req.add_call(&email_query_changes_call).await;
let email_get_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(Argument::reference::<
EmailQueryChanges,
EmailObject,
EmailObject,
>(
seq_no,
ResultField::<EmailQueryChanges, EmailObject>::new("/removed"),
)))
.account_id(mail_account_id.clone())
.properties(Some(vec!["keywords".to_string(), "mailboxIds".to_string()])),
);
req.add_call(&email_get_call).await;
} else {
return Ok(());
}
@ -395,7 +433,7 @@ impl JmapConnection {
}
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -425,11 +463,8 @@ impl JmapConnection {
.collect::<SmallVec<[MailboxHash; 8]>>();
mailbox_hashes.push(v);
}
for (env, mailbox_hashes) in list
.into_iter()
.map(|obj| self.store.add_envelope(obj))
.zip(mailbox_hashes)
{
for (obj, mailbox_hashes) in list.into_iter().zip(mailbox_hashes) {
let env = self.store.add_envelope(obj).await;
for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() {
let mut mailboxes_lck = self.store.mailboxes.write().unwrap();
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
@ -460,7 +495,7 @@ impl JmapConnection {
}
}
}
let reverse_id_store_lck = self.store.reverse_id_store.lock().unwrap();
let reverse_id_store_lck = self.store.reverse_id_store.lock().await;
let response = v.method_responses.remove(0);
match EmailQueryChangesResponse::try_from(response) {
Ok(EmailQueryChangesResponse {
@ -581,7 +616,7 @@ impl JmapConnection {
let _: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
log::error!("{}", &err);
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = self.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -589,19 +624,19 @@ impl JmapConnection {
Ok(res_text)
}
pub async fn get_async(&self, url: &str) -> Result<isahc::Response<isahc::AsyncBody>> {
pub async fn get_async(&self, url: &Url) -> Result<isahc::Response<isahc::AsyncBody>> {
if cfg!(feature = "jmap-trace") {
let res = self.client.get_async(url).await;
let res = self.client.get_async(url.as_str()).await;
log::trace!("get_async(): url `{}` response {:?}", url, res);
Ok(res?)
} else {
Ok(self.client.get_async(url).await?)
Ok(self.client.get_async(url.as_str()).await?)
}
}
pub async fn post_async<T: Into<Vec<u8>> + Send + Sync>(
&self,
api_url: Option<&str>,
api_url: Option<&Url>,
request: T,
) -> Result<isahc::Response<isahc::AsyncBody>> {
let request: Vec<u8> = request.into();
@ -612,9 +647,9 @@ impl JmapConnection {
);
}
if let Some(api_url) = api_url {
Ok(self.client.post_async(api_url, request).await?)
Ok(self.client.post_async(api_url.as_str(), request).await?)
} else {
let api_url = self.session.lock().unwrap().api_url.clone();
let api_url = self.session_guard().await?.api_url.clone();
Ok(self.client.post_async(api_url.as_str(), request).await?)
}
}

View File

@ -19,6 +19,9 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
// In case we forget to wait some future.
#![deny(unused_must_use)]
use std::{
collections::{BTreeSet, HashMap, HashSet},
convert::TryFrom,
@ -28,11 +31,18 @@ use std::{
time::{Duration, Instant},
};
use futures::{lock::Mutex as FutureMutex, Stream};
use futures::{
lock::{
MappedMutexGuard as FutureMappedMutexGuard, Mutex as FutureMutex,
MutexGuard as FutureMutexGuard,
},
Stream,
};
use indexmap::{IndexMap, IndexSet};
use isahc::{config::RedirectPolicy, AsyncReadResponseExt, HttpClient};
use serde_json::{json, Value};
use smallvec::SmallVec;
use url::Url;
use crate::{
backends::*,
@ -117,7 +127,7 @@ pub struct EnvelopeCache {
#[derive(Clone, Debug)]
pub struct JmapServerConf {
pub server_url: String,
pub server_url: Url,
pub server_username: String,
pub server_password: String,
pub use_token: bool,
@ -135,6 +145,19 @@ macro_rules! get_conf_val {
))
})
};
($s:ident[$var:literal], $t:ty) => {
get_conf_val!($s[$var]).and_then(|v| {
<$t>::from_str(&v).map_err(|e| {
Error::new(format!(
"Configuration error ({}): Invalid value for field `{}`: {}\n{}",
$s.name.as_str(),
$var,
v,
e
))
})
})
};
($s:ident[$var:literal], $default:expr) => {
$s.extra
.get($var)
@ -169,8 +192,8 @@ impl JmapServerConf {
)));
}
Ok(Self {
server_url: get_conf_val!(s["server_url"])?.to_string(),
server_username: get_conf_val!(s["server_username"])?.to_string(),
server_url: get_conf_val!(s["server_url"], Url)?,
server_username: get_conf_val!(s["server_username"], String)?,
server_password: s.server_password()?,
use_token,
danger_accept_invalid_certs: get_conf_val!(s["danger_accept_invalid_certs"], false)?,
@ -185,66 +208,114 @@ impl JmapServerConf {
}
}
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct OnlineStatus(pub Arc<FutureMutex<(Instant, Result<Session>)>>);
impl OnlineStatus {
/// Returns if session value is `Ok(_)`.
pub async fn is_ok(&self) -> bool {
self.0.lock().await.1.is_ok()
}
/// Get timestamp of last update.
pub async fn timestamp(&self) -> Instant {
self.0.lock().await.0
}
/// Get timestamp of last update.
pub async fn update_timestamp(&self, value: Option<Instant>) {
self.0.lock().await.0 = value.unwrap_or_else(Instant::now);
}
/// Set inner value.
pub async fn set(&self, t: Option<Instant>, value: Result<Session>) -> Result<Session> {
std::mem::replace(
&mut (*self.0.lock().await),
(t.unwrap_or_else(Instant::now), value),
)
.1
}
pub async fn session_guard(
&'_ self,
) -> Result<FutureMappedMutexGuard<'_, (Instant, Result<Session>), Session>> {
let guard = self.0.lock().await;
if let Err(ref err) = guard.1 {
return Err(err.clone());
}
Ok(FutureMutexGuard::map(guard, |status| {
// SAFETY: we checked if it's an Err() in the previous line, but we cannot do it
// in here since it's a closure. So unwrap unchecked for API
// convenience.
unsafe { status.1.as_mut().unwrap_unchecked() }
}))
}
}
#[derive(Debug)]
pub struct Store {
pub account_name: Arc<String>,
pub account_hash: AccountHash,
pub main_identity: String,
pub extra_identities: Vec<String>,
pub account_id: Arc<Mutex<Id<Account>>>,
pub byte_cache: Arc<Mutex<HashMap<EnvelopeHash, EnvelopeCache>>>,
pub id_store: Arc<Mutex<HashMap<EnvelopeHash, Id<EmailObject>>>>,
pub reverse_id_store: Arc<Mutex<HashMap<Id<EmailObject>, EnvelopeHash>>>,
pub blob_id_store: Arc<Mutex<HashMap<EnvelopeHash, Id<BlobObject>>>>,
pub byte_cache: Arc<FutureMutex<HashMap<EnvelopeHash, EnvelopeCache>>>,
pub id_store: Arc<FutureMutex<HashMap<EnvelopeHash, Id<EmailObject>>>>,
pub reverse_id_store: Arc<FutureMutex<HashMap<Id<EmailObject>, EnvelopeHash>>>,
pub blob_id_store: Arc<FutureMutex<HashMap<EnvelopeHash, Id<BlobObject>>>>,
pub collection: Collection,
pub mailboxes: Arc<RwLock<HashMap<MailboxHash, JmapMailbox>>>,
pub mailboxes_index: Arc<RwLock<HashMap<MailboxHash, HashSet<EnvelopeHash>>>>,
pub mailbox_state: Arc<Mutex<State<MailboxObject>>>,
pub online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
pub mailbox_state: Arc<FutureMutex<State<MailboxObject>>>,
pub online_status: OnlineStatus,
pub is_subscribed: Arc<IsSubscribedFn>,
pub core_capabilities: Arc<Mutex<IndexMap<String, CapabilitiesObject>>>,
pub event_consumer: BackendEventConsumer,
}
impl Store {
pub fn add_envelope(&self, obj: EmailObject) -> Envelope {
pub async fn add_envelope(&self, obj: EmailObject) -> Envelope {
let mut flags = Flag::default();
let mut labels: IndexSet<TagHash> = IndexSet::new();
let mut tag_lck = self.collection.tag_index.write().unwrap();
for t in obj.keywords().keys() {
match t.as_str() {
"$draft" => {
flags |= Flag::DRAFT;
}
"$seen" => {
flags |= Flag::SEEN;
}
"$flagged" => {
flags |= Flag::FLAGGED;
}
"$answered" => {
flags |= Flag::REPLIED;
}
"$junk" | "$notjunk" => { /* ignore */ }
_ => {
let tag_hash = TagHash::from_bytes(t.as_bytes());
tag_lck.entry(tag_hash).or_insert_with(|| t.to_string());
labels.insert(tag_hash);
let id;
let mailbox_ids;
let blob_id;
{
let mut tag_lck = self.collection.tag_index.write().unwrap();
for t in obj.keywords().keys() {
match t.as_str() {
"$draft" => {
flags |= Flag::DRAFT;
}
"$seen" => {
flags |= Flag::SEEN;
}
"$flagged" => {
flags |= Flag::FLAGGED;
}
"$answered" => {
flags |= Flag::REPLIED;
}
"$junk" | "$notjunk" => { /* ignore */ }
_ => {
let tag_hash = TagHash::from_bytes(t.as_bytes());
tag_lck.entry(tag_hash).or_insert_with(|| t.to_string());
labels.insert(tag_hash);
}
}
}
}
let id = obj.id.clone();
let mailbox_ids = obj.mailbox_ids.clone();
let blob_id = obj.blob_id.clone();
drop(tag_lck);
id = obj.id.clone();
mailbox_ids = obj.mailbox_ids.clone();
blob_id = obj.blob_id.clone();
}
let mut ret: Envelope = obj.into();
ret.set_flags(flags);
ret.tags_mut().extend(labels);
let mut id_store_lck = self.id_store.lock().unwrap();
let mut reverse_id_store_lck = self.reverse_id_store.lock().unwrap();
let mut blob_id_store_lck = self.blob_id_store.lock().unwrap();
let mut id_store_lck = self.id_store.lock().await;
let mut reverse_id_store_lck = self.reverse_id_store.lock().await;
let mut blob_id_store_lck = self.blob_id_store.lock().await;
let mailboxes_lck = self.mailboxes.read().unwrap();
let mut mailboxes_index_lck = self.mailboxes_index.write().unwrap();
for (mailbox_id, _) in mailbox_ids {
@ -262,14 +333,14 @@ impl Store {
ret
}
pub fn remove_envelope(
pub async fn remove_envelope(
&self,
obj_id: Id<EmailObject>,
) -> Option<(EnvelopeHash, SmallVec<[MailboxHash; 8]>)> {
let env_hash = self.reverse_id_store.lock().unwrap().remove(&obj_id)?;
self.id_store.lock().unwrap().remove(&env_hash);
self.blob_id_store.lock().unwrap().remove(&env_hash);
self.byte_cache.lock().unwrap().remove(&env_hash);
let env_hash = self.reverse_id_store.lock().await.remove(&obj_id)?;
self.id_store.lock().await.remove(&env_hash);
self.blob_id_store.lock().await.remove(&env_hash);
self.byte_cache.lock().await.remove(&env_hash);
let mut mailbox_hashes = SmallVec::new();
{
let mut mailboxes_lck = self.mailboxes_index.write().unwrap();
@ -318,14 +389,9 @@ impl MailBackend for JmapType {
let connection = self.connection.clone();
let timeout_dur = self.server_conf.timeout;
Ok(Box::pin(async move {
match timeout(timeout_dur, connection.lock()).await {
Ok(_conn) => match timeout(timeout_dur, online.lock()).await {
Err(err) => Err(err),
Ok(lck) if lck.1.is_err() => lck.1.clone(),
_ => Ok(()),
},
Err(err) => Err(err),
}
let _conn = timeout(timeout_dur, connection.lock()).await?;
let _session = timeout(timeout_dur, online.session_guard()).await??;
Ok(())
}))
}
@ -440,13 +506,13 @@ impl MailBackend for JmapType {
* 1. upload binary blob, get blobId
* 2. Email/import
*/
let upload_url = { conn.session.lock().unwrap().upload_url.clone() };
let (upload_url, mail_account_id) = {
let g = conn.session_guard().await?;
(g.upload_url.clone(), g.mail_account_id())
};
let mut res = conn
.post_async(
Some(&upload_request_format(
upload_url.as_str(),
&conn.mail_account_id(),
)),
Some(&upload_request_format(&upload_url, &mail_account_id)?),
bytes,
)
.await?;
@ -466,7 +532,7 @@ impl MailBackend for JmapType {
let upload_response: UploadResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -474,23 +540,24 @@ impl MailBackend for JmapType {
let mut req = Request::new(conn.request_no.clone());
let creation_id: Id<EmailObject> = "1".to_string().into();
let import_call: EmailImport = EmailImport::new()
.account_id(conn.mail_account_id())
.emails(indexmap! {
creation_id.clone() => EmailImportObject::new()
.blob_id(upload_response.blob_id)
.mailbox_ids(indexmap! {
mailbox_id => true
})
});
let import_call: EmailImport =
EmailImport::new()
.account_id(mail_account_id)
.emails(indexmap! {
creation_id.clone() => EmailImportObject::new()
.blob_id(upload_response.blob_id)
.mailbox_ids(indexmap! {
mailbox_id => true
})
});
req.add_call(&import_call);
req.add_call(&import_call).await;
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res.text().await?;
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -549,28 +616,29 @@ impl MailBackend for JmapType {
Ok(Box::pin(async move {
let mut conn = connection.lock().await;
conn.connect().await?;
let mail_account_id = conn.session_guard().await?.mail_account_id();
let email_call: EmailQuery = EmailQuery::new(
Query::new()
.account_id(conn.mail_account_id())
.account_id(mail_account_id)
.filter(Some(filter))
.position(0),
)
.collapse_threads(false);
let mut req = Request::new(conn.request_no.clone());
req.add_call(&email_call);
req.add_call(&email_call).await;
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res.text().await?;
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
};
*store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
store.online_status.update_timestamp(None).await;
let m = QueryResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
let QueryResponse::<EmailObject> { ids, .. } = m;
let ret = ids.into_iter().map(|id| id.into_hash()).collect();
@ -596,9 +664,10 @@ impl MailBackend for JmapType {
let connection = self.connection.clone();
Ok(Box::pin(async move {
let mut conn = connection.lock().await;
let mail_account_id = conn.session_guard().await?.mail_account_id();
let mailbox_set_call: MailboxSet = MailboxSet::new(
Set::<MailboxObject>::new()
.account_id(conn.mail_account_id())
.account_id(mail_account_id)
.create(Some({
let id: Id<MailboxObject> = path.as_str().into();
indexmap! {
@ -612,7 +681,7 @@ impl MailBackend for JmapType {
);
let mut req = Request::new(conn.request_no.clone());
let _prev_seq = req.add_call(&mailbox_set_call);
let _prev_seq = req.add_call(&mailbox_set_call).await;
let new_mailboxes = protocol::get_mailboxes(&mut conn, Some(req)).await?;
*store.mailboxes.write().unwrap() = new_mailboxes;
@ -707,7 +776,7 @@ impl MailBackend for JmapType {
}
{
for env_hash in env_hashes.iter() {
if let Some(id) = store.id_store.lock().unwrap().get(&env_hash) {
if let Some(id) = store.id_store.lock().await.get(&env_hash) {
// ids.push(id.clone());
// id_map.insert(id.clone(), env_hash);
update_map.insert(
@ -718,15 +787,16 @@ impl MailBackend for JmapType {
}
}
let conn = connection.lock().await;
let mail_account_id = conn.session_guard().await?.mail_account_id();
let email_set_call: EmailSet = EmailSet::new(
Set::<EmailObject>::new()
.account_id(conn.mail_account_id())
.account_id(mail_account_id)
.update(Some(update_map)),
);
let mut req = Request::new(conn.request_no.clone());
let _prev_seq = req.add_call(&email_set_call);
let _prev_seq = req.add_call(&email_set_call).await;
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
@ -734,12 +804,12 @@ impl MailBackend for JmapType {
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
};
*store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
store.online_status.update_timestamp(None).await;
let m = SetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
if let Some(ids) = m.not_updated {
if !ids.is_empty() {
@ -815,7 +885,7 @@ impl MailBackend for JmapType {
}
{
for hash in env_hashes.iter() {
if let Some(id) = store.id_store.lock().unwrap().get(&hash) {
if let Some(id) = store.id_store.lock().await.get(&hash) {
ids.push(id.clone());
id_map.insert(id.clone(), hash);
update_map.insert(
@ -826,23 +896,24 @@ impl MailBackend for JmapType {
}
}
let conn = connection.lock().await;
let mail_account_id = conn.session_guard().await?.mail_account_id();
let email_set_call: EmailSet = EmailSet::new(
Set::<EmailObject>::new()
.account_id(conn.mail_account_id())
.account_id(mail_account_id.clone())
.update(Some(update_map)),
);
let mut req = Request::new(conn.request_no.clone());
req.add_call(&email_set_call);
req.add_call(&email_set_call).await;
let email_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(Argument::Value(ids)))
.account_id(conn.mail_account_id())
.account_id(mail_account_id)
.properties(Some(vec!["keywords".to_string()])),
);
req.add_call(&email_call);
req.add_call(&email_call).await;
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
@ -857,12 +928,12 @@ impl MailBackend for JmapType {
//debug!("res_text = {}", &res_text);
let mut v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
};
*store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
store.online_status.update_timestamp(None).await;
let m = SetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
if let Some(ids) = m.not_updated {
return Err(Error::new(
@ -992,19 +1063,18 @@ impl MailBackend for JmapType {
})
};
let conn = connection.lock().await;
let mail_account_id = conn.session_guard().await?.mail_account_id();
// [ref:TODO] smarter identity detection based on From: ?
let Some(identity_id) = conn.mail_identity_id() else {
let Some(identity_id) = conn.session_guard().await?.mail_identity_id() else {
return Err(Error::new(
"You need to setup an Identity in the JMAP server.",
));
};
let upload_url = { conn.session.lock().unwrap().upload_url.clone() };
let upload_url = { conn.session_guard().await?.upload_url.clone() };
let mut res = conn
.post_async(
Some(&upload_request_format(
upload_url.as_str(),
&conn.mail_account_id(),
)),
Some(&upload_request_format(&upload_url, &mail_account_id)?),
bytes,
)
.await?;
@ -1012,7 +1082,7 @@ impl MailBackend for JmapType {
let upload_response: UploadResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -1021,7 +1091,7 @@ impl MailBackend for JmapType {
let mut req = Request::new(conn.request_no.clone());
let creation_id: Id<EmailObject> = "newid".into();
let import_call: EmailImport = EmailImport::new()
.account_id(conn.mail_account_id())
.account_id(mail_account_id.clone())
.emails(indexmap! {
creation_id => EmailImportObject::new()
.blob_id(upload_response.blob_id)
@ -1034,13 +1104,13 @@ impl MailBackend for JmapType {
}),
});
req.add_call(&import_call);
req.add_call(&import_call).await;
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res.text().await?;
let v: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -1061,10 +1131,10 @@ impl MailBackend for JmapType {
let mut req = Request::new(conn.request_no.clone());
let subm_set_call: EmailSubmissionSet = EmailSubmissionSet::new(
Set::<EmailSubmissionObject>::new()
.account_id(conn.mail_account_id())
.account_id(mail_account_id.clone())
.create(Some(indexmap! {
Argument::from(Id::from("k1490")) => EmailSubmissionObject::new(
/* account_id: */ conn.mail_account_id(),
/* account_id: */ mail_account_id,
/* identity_id: */ identity_id,
/* email_id: */ email_id,
/* envelope: */ None,
@ -1080,15 +1150,15 @@ impl MailBackend for JmapType {
})
}));
req.add_call(&subm_set_call);
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
req.add_call(&subm_set_call).await;
let mut res = conn.post_async(None, serde_json::to_string(&req)?).await?;
let res_text = res.text().await?;
// [ref:TODO] parse/return any error.
let _: MethodResponse = match deserialize_from_str(&res_text) {
Err(err) => {
*conn.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
_ = conn.store.online_status.set(None, Err(err.clone())).await;
return Err(err);
}
Ok(s) => s,
@ -1106,10 +1176,10 @@ impl JmapType {
is_subscribed: Box<dyn Fn(&str) -> bool + Send + Sync>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let online_status = Arc::new(FutureMutex::new((
let online_status = OnlineStatus(Arc::new(FutureMutex::new((
std::time::Instant::now(),
Err(Error::new("Account is uninitialised.")),
)));
))));
let server_conf = JmapServerConf::new(s)?;
let account_hash = AccountHash::from_bytes(s.name.as_bytes());
@ -1118,7 +1188,6 @@ impl JmapType {
account_hash,
main_identity: s.make_display_name(),
extra_identities: s.extra_identities.clone(),
account_id: Arc::new(Mutex::new(Id::empty())),
online_status,
event_consumer,
is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)),
@ -1154,6 +1223,19 @@ impl JmapType {
))
})
};
($s:ident[$var:literal], $t:ty) => {
get_conf_val!($s[$var]).and_then(|v| {
<$t>::from_str(&v).map_err(|e| {
Error::new(format!(
"Configuration error ({}): Invalid value for field `{}`: {}\n{}",
$s.name.as_str(),
$var,
v,
e
))
})
})
};
($s:ident[$var:literal], $default:expr) => {
$s.extra
.remove($var)
@ -1171,7 +1253,7 @@ impl JmapType {
.unwrap_or_else(|| Ok($default))
};
}
get_conf_val!(s["server_url"])?;
get_conf_val!(s["server_url"], Url)?;
get_conf_val!(s["server_username"])?;
get_conf_val!(s["use_token"], false)?;

View File

@ -35,3 +35,6 @@ pub use identity::*;
mod submission;
pub use submission::*;
#[cfg(test)]
mod tests;

View File

@ -747,49 +747,6 @@ impl From<crate::search::Query> for Filter<EmailFilterCondition, EmailObject> {
}
}
#[test]
fn test_jmap_query() {
use std::sync::{Arc, Mutex};
let q: crate::search::Query = crate::search::Query::try_from(
"subject:wah or (from:Manos and (subject:foo or subject:bar))",
)
.unwrap();
let f: Filter<EmailFilterCondition, EmailObject> = Filter::from(q);
assert_eq!(
r#"{"operator":"OR","conditions":[{"subject":"wah"},{"operator":"AND","conditions":[{"from":"Manos"},{"operator":"OR","conditions":[{"subject":"foo"},{"subject":"bar"}]}]}]}"#,
serde_json::to_string(&f).unwrap().as_str()
);
let filter = {
let mailbox_id = "mailbox_id".to_string();
let mut r = Filter::Condition(
EmailFilterCondition::new()
.in_mailbox(Some(mailbox_id.into()))
.into(),
);
r &= f;
r
};
let email_call: EmailQuery = EmailQuery::new(
Query::new()
.account_id("account_id".to_string().into())
.filter(Some(filter))
.position(0),
)
.collapse_threads(false);
let request_no = Arc::new(Mutex::new(0));
let mut req = Request::new(request_no.clone());
req.add_call(&email_call);
assert_eq!(
r#"{"using":["urn:ietf:params:jmap:core","urn:ietf:params:jmap:mail"],"methodCalls":[["Email/query",{"accountId":"account_id","calculateTotal":false,"collapseThreads":false,"filter":{"conditions":[{"inMailbox":"mailbox_id"},{"conditions":[{"subject":"wah"},{"conditions":[{"from":"Manos"},{"conditions":[{"subject":"foo"},{"subject":"bar"}],"operator":"OR"}],"operator":"AND"}],"operator":"OR"}],"operator":"AND"},"position":0,"sort":null},"m0"]]}"#,
serde_json::to_string(&req).unwrap().as_str()
);