Add metadata to Jobs, and add JobManager tab

Opened with command `manage-jobs`
async-cursors
Manos Pitsidianakis 2023-07-14 00:23:24 +03:00
parent 369c1dbdac
commit cf9a04a591
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
17 changed files with 703 additions and 96 deletions

View File

@ -876,6 +876,17 @@ Alternatives(&[to_stream!(One(Literal("add-attachment")), One(Filepath)), to_str
}
)
},
{ tags: ["manage-jobs"],
desc: "view and manage jobs",
tokens: &[One(Literal("manage-jobs"))],
parser:(
fn manage_jobs(input: &[u8]) -> IResult<&[u8], Action> {
let (input, _) = tag("manage-jobs")(input.trim())?;
let (input, _) = eof(input)?;
Ok((input, Tab(ManageJobs)))
}
)
},
{ tags: ["quit"],
desc: "quit meli",
tokens: &[One(Literal("quit"))],
@ -978,6 +989,10 @@ fn view(input: &[u8]) -> IResult<&[u8], Action> {
))(input)
}
fn new_tab(input: &[u8]) -> IResult<&[u8], Action> {
alt((manage_mailboxes, manage_jobs, compose_action))(input)
}
pub fn parse_command(input: &[u8]) -> Result<Action, Error> {
alt((
goto,
@ -989,13 +1004,12 @@ pub fn parse_command(input: &[u8]) -> Result<Action, Error> {
setenv,
printenv,
view,
compose_action,
create_mailbox,
sub_mailbox,
unsub_mailbox,
delete_mailbox,
rename_mailbox,
manage_mailboxes,
new_tab,
account_action,
print_setting,
toggle_mouse,

View File

@ -62,6 +62,7 @@ pub enum TabAction {
Kill(ComponentId),
New(Option<Box<dyn Component>>),
ManageMailboxes,
ManageJobs,
}
#[derive(Debug)]

View File

@ -34,18 +34,21 @@ use crate::{
};
pub mod mail;
pub use crate::mail::*;
pub use mail::*;
pub mod notifications;
pub mod utilities;
pub use self::utilities::*;
pub use utilities::*;
pub mod contacts;
pub use crate::contacts::*;
pub use contacts::*;
pub mod mailbox_management;
pub use self::mailbox_management::*;
pub use mailbox_management::*;
pub mod jobs_view;
pub use jobs_view::*;
#[cfg(feature = "svgscreenshot")]
pub mod svg;

View File

@ -0,0 +1,446 @@
/*
* meli
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use std::{borrow::Cow, cmp};
use super::*;
use crate::{
jobs::{JobId, JobMetadata},
melib::utils::datetime::{self, formats::RFC3339_DATETIME_AND_SPACE},
};
#[derive(Debug)]
pub struct JobManager {
cursor_pos: usize,
new_cursor_pos: usize,
length: usize,
data_columns: DataColumns<5>,
entries: IndexMap<JobId, JobMetadata>,
initialized: bool,
theme_default: ThemeAttribute,
highlight_theme: ThemeAttribute,
dirty: bool,
movement: Option<PageMovement>,
id: ComponentId,
}
impl fmt::Display for JobManager {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "jobs")
}
}
impl JobManager {
pub fn new(context: &Context) -> Self {
let theme_default = crate::conf::value(context, "theme_default");
let mut data_columns = DataColumns::default();
data_columns.theme_config.set_single_theme(theme_default);
Self {
cursor_pos: 0,
new_cursor_pos: 0,
entries: IndexMap::default(),
length: 0,
data_columns,
theme_default,
highlight_theme: crate::conf::value(context, "highlight"),
initialized: false,
dirty: true,
movement: None,
id: ComponentId::default(),
}
}
fn initialize(&mut self, context: &mut Context) {
self.entries = (*context.main_loop_handler.job_executor.jobs.lock().unwrap()).clone();
self.length = self.entries.len();
self.entries.sort_by(|_, a, _, b| a.started.cmp(&b.started));
self.set_dirty(true);
let mut min_width = (
"id".len(),
"desc".len(),
"started".len(),
"finished".len(),
"succeeded".len(),
0,
);
for c in self.entries.values() {
/* title */
min_width.0 = cmp::max(min_width.0, c.id.to_string().len());
/* desc */
min_width.1 = cmp::max(min_width.1, c.desc.len());
}
min_width.2 = "1970-01-01 00:00:00".len();
min_width.3 = min_width.2;
/* name column */
self.data_columns.columns[0] =
CellBuffer::new_with_context(min_width.0, self.length, None, context);
/* path column */
self.data_columns.columns[1] =
CellBuffer::new_with_context(min_width.1, self.length, None, context);
/* size column */
self.data_columns.columns[2] =
CellBuffer::new_with_context(min_width.2, self.length, None, context);
/* subscribed column */
self.data_columns.columns[3] =
CellBuffer::new_with_context(min_width.3, self.length, None, context);
self.data_columns.columns[4] =
CellBuffer::new_with_context(min_width.4, self.length, None, context);
for (idx, e) in self.entries.values().enumerate() {
write_string_to_grid(
&e.id.to_string(),
&mut self.data_columns.columns[0],
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
((0, idx), (min_width.0, idx)),
None,
);
write_string_to_grid(
&e.desc,
&mut self.data_columns.columns[1],
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
((0, idx), (min_width.1, idx)),
None,
);
write_string_to_grid(
&datetime::timestamp_to_string(e.started, Some(RFC3339_DATETIME_AND_SPACE), true),
&mut self.data_columns.columns[2],
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
((0, idx), (min_width.2, idx)),
None,
);
write_string_to_grid(
&if let Some(t) = e.finished {
Cow::Owned(datetime::timestamp_to_string(
t,
Some(RFC3339_DATETIME_AND_SPACE),
true,
))
} else {
Cow::Borrowed("null")
},
&mut self.data_columns.columns[3],
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
((0, idx), (min_width.3, idx)),
None,
);
write_string_to_grid(
&if e.finished.is_some() {
Cow::Owned(format!("{:?}", e.succeeded))
} else {
Cow::Borrowed("-")
},
&mut self.data_columns.columns[4],
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
((0, idx), (min_width.4, idx)),
None,
);
}
if self.length == 0 {
let message = "No mailboxes.".to_string();
self.data_columns.columns[0] =
CellBuffer::new_with_context(message.len(), self.length, None, context);
write_string_to_grid(
&message,
&mut self.data_columns.columns[0],
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
((0, 0), (message.len() - 1, 0)),
None,
);
}
}
fn draw_list(&mut self, grid: &mut CellBuffer, area: Area, context: &mut Context) {
let (upper_left, bottom_right) = area;
if self.length == 0 {
clear_area(grid, area, self.theme_default);
copy_area(
grid,
&self.data_columns.columns[0],
area,
((0, 0), pos_dec(self.data_columns.columns[0].size(), (1, 1))),
);
context.dirty_areas.push_back(area);
return;
}
let rows = get_y(bottom_right) - get_y(upper_left) + 1;
if let Some(mvm) = self.movement.take() {
match mvm {
PageMovement::Up(amount) => {
self.new_cursor_pos = self.new_cursor_pos.saturating_sub(amount);
}
PageMovement::PageUp(multiplier) => {
self.new_cursor_pos = self.new_cursor_pos.saturating_sub(rows * multiplier);
}
PageMovement::Down(amount) => {
if self.new_cursor_pos + amount < self.length {
self.new_cursor_pos += amount;
} else {
self.new_cursor_pos = self.length - 1;
}
}
PageMovement::PageDown(multiplier) => {
#[allow(clippy::comparison_chain)]
if self.new_cursor_pos + rows * multiplier < self.length {
self.new_cursor_pos += rows * multiplier;
} else if self.new_cursor_pos + rows * multiplier > self.length {
self.new_cursor_pos = self.length - 1;
} else {
self.new_cursor_pos = (self.length / rows) * rows;
}
}
PageMovement::Right(_) | PageMovement::Left(_) => {}
PageMovement::Home => {
self.new_cursor_pos = 0;
}
PageMovement::End => {
self.new_cursor_pos = self.length - 1;
}
}
}
let prev_page_no = (self.cursor_pos).wrapping_div(rows);
let page_no = (self.new_cursor_pos).wrapping_div(rows);
let top_idx = page_no * rows;
if self.length >= rows {
context
.replies
.push_back(UIEvent::StatusEvent(StatusEvent::ScrollUpdate(
ScrollUpdate::Update {
id: self.id,
context: ScrollContext {
shown_lines: top_idx + rows,
total_lines: self.length,
has_more_lines: false,
},
},
)));
} else {
context
.replies
.push_back(UIEvent::StatusEvent(StatusEvent::ScrollUpdate(
ScrollUpdate::End(self.id),
)));
}
/* If cursor position has changed, remove the highlight from the previous
* position and apply it in the new one. */
if self.cursor_pos != self.new_cursor_pos && prev_page_no == page_no {
let old_cursor_pos = self.cursor_pos;
self.cursor_pos = self.new_cursor_pos;
for &(idx, highlight) in &[(old_cursor_pos, false), (self.new_cursor_pos, true)] {
if idx >= self.length {
continue; //bounds check
}
let new_area = nth_row_area(area, idx % rows);
self.data_columns
.draw(grid, idx, self.cursor_pos, grid.bounds_iter(new_area));
let row_attr = if highlight {
self.highlight_theme
} else {
self.theme_default
};
change_colors(grid, new_area, row_attr.fg, row_attr.bg);
context.dirty_areas.push_back(new_area);
}
return;
} else if self.cursor_pos != self.new_cursor_pos {
self.cursor_pos = self.new_cursor_pos;
}
if self.new_cursor_pos >= self.length {
self.new_cursor_pos = self.length - 1;
self.cursor_pos = self.new_cursor_pos;
}
/* Page_no has changed, so draw new page */
_ = self
.data_columns
.recalc_widths((width!(area), height!(area)), top_idx);
clear_area(grid, area, self.theme_default);
/* copy table columns */
self.data_columns
.draw(grid, top_idx, self.cursor_pos, grid.bounds_iter(area));
/* highlight cursor */
change_colors(
grid,
nth_row_area(area, self.cursor_pos % rows),
self.highlight_theme.fg,
self.highlight_theme.bg,
);
/* clear gap if available height is more than count of entries */
if top_idx + rows > self.length {
clear_area(
grid,
(
pos_inc(upper_left, (0, self.length - top_idx)),
bottom_right,
),
self.theme_default,
);
}
context.dirty_areas.push_back(area);
}
}
impl Component for JobManager {
fn draw(&mut self, grid: &mut CellBuffer, area: Area, context: &mut Context) {
if !self.is_dirty() {
return;
}
if !self.initialized {
self.initialize(context);
}
self.draw_list(grid, area, context);
self.dirty = false;
}
fn process_event(&mut self, event: &mut UIEvent, context: &mut Context) -> bool {
if let UIEvent::ConfigReload { old_settings: _ } = event {
self.theme_default = crate::conf::value(context, "theme_default");
self.initialized = false;
self.set_dirty(true);
}
let shortcuts = self.shortcuts(context);
match event {
UIEvent::StatusEvent(
StatusEvent::JobFinished(_) | StatusEvent::JobCanceled(_) | StatusEvent::NewJob(_),
) => {
self.initialized = false;
self.set_dirty(true);
return false;
}
UIEvent::Input(ref key)
if shortcut!(key == shortcuts[Shortcuts::GENERAL]["scroll_up"]) =>
{
let amount = 1;
self.movement = Some(PageMovement::Up(amount));
self.set_dirty(true);
return true;
}
UIEvent::Input(ref key)
if shortcut!(key == shortcuts[Shortcuts::GENERAL]["scroll_down"])
&& self.cursor_pos < self.length.saturating_sub(1) =>
{
let amount = 1;
self.set_dirty(true);
self.movement = Some(PageMovement::Down(amount));
return true;
}
UIEvent::Input(ref key)
if shortcut!(key == shortcuts[Shortcuts::GENERAL]["prev_page"]) =>
{
let mult = 1;
self.set_dirty(true);
self.movement = Some(PageMovement::PageUp(mult));
return true;
}
UIEvent::Input(ref key)
if shortcut!(key == shortcuts[Shortcuts::GENERAL]["next_page"]) =>
{
let mult = 1;
self.set_dirty(true);
self.movement = Some(PageMovement::PageDown(mult));
return true;
}
UIEvent::Input(ref key)
if shortcut!(key == shortcuts[Shortcuts::GENERAL]["home_page"]) =>
{
self.set_dirty(true);
self.movement = Some(PageMovement::Home);
return true;
}
UIEvent::Input(ref key)
if shortcut!(key == shortcuts[Shortcuts::GENERAL]["end_page"]) =>
{
self.set_dirty(true);
self.movement = Some(PageMovement::End);
return true;
}
_ => {}
}
false
}
fn is_dirty(&self) -> bool {
self.dirty
}
fn set_dirty(&mut self, value: bool) {
self.dirty = value;
}
fn kill(&mut self, uuid: ComponentId, context: &mut Context) {
debug_assert!(uuid == self.id);
context.replies.push_back(UIEvent::Action(Tab(Kill(uuid))));
}
fn shortcuts(&self, context: &Context) -> ShortcutMaps {
let mut map = ShortcutMaps::default();
map.insert(
Shortcuts::GENERAL,
context.settings.shortcuts.general.key_values(),
);
map
}
fn id(&self) -> ComponentId {
self.id
}
fn can_quit_cleanly(&mut self, _context: &Context) -> bool {
true
}
fn status(&self, _context: &Context) -> String {
format!("{} entries", self.entries.len())
}
}

View File

@ -1233,7 +1233,10 @@ impl Component for Composer {
Flag::SEEN,
) {
Ok(job) => {
let handle = context.main_loop_handler.job_executor.spawn_blocking(job);
let handle = context
.main_loop_handler
.job_executor
.spawn_blocking("compose::submit".into(), job);
context
.replies
.push_back(UIEvent::StatusEvent(StatusEvent::NewJob(

View File

@ -66,7 +66,7 @@ impl KeySelection {
let handle = context
.main_loop_handler
.job_executor
.spawn_specialized(job);
.spawn_specialized("gpg::keylist".into(), job);
let mut progress_spinner = ProgressSpinner::new(8, context);
progress_spinner.start();
Ok(KeySelection::LoadingKeys {

View File

@ -479,7 +479,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("set_seen".into(), fut);
account
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
@ -501,7 +501,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("set_unseen".into(), fut);
account
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
@ -523,7 +523,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("remove_tag".into(), fut);
account
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
@ -545,7 +545,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("add_tag".into(), fut);
account
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
@ -567,7 +567,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("delete".into(), fut);
account.insert_job(
handle.job_id,
JobRequest::DeleteMessages { env_hashes, handle },
@ -595,7 +595,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("copy_to_mailbox".into(), fut);
account.insert_job(
handle.job_id,
JobRequest::Generic {
@ -635,7 +635,7 @@ pub trait MailListingTrait: ListingTrait {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("move_to_mailbox".into(), fut);
account.insert_job(
handle.job_id,
JobRequest::Generic {
@ -714,7 +714,10 @@ pub trait MailListingTrait: ListingTrait {
let _ = sender.send(r);
Ok(())
});
let handle = account.main_loop_handler.job_executor.spawn_blocking(fut);
let handle = account
.main_loop_handler
.job_executor
.spawn_blocking("export_to_mbox".into(), fut);
let path = path.to_path_buf();
account.insert_job(
handle.job_id,
@ -1230,6 +1233,7 @@ impl Component for Listing {
));
}
}
return true;
}
#[cfg(feature = "debug-tracing")]
UIEvent::IntraComm {
@ -2026,6 +2030,13 @@ impl Component for Listing {
.push_back(UIEvent::Action(Tab(New(Some(Box::new(mgr))))));
return true;
}
UIEvent::Action(Action::Tab(ManageJobs)) => {
let mgr = JobManager::new(context);
context
.replies
.push_back(UIEvent::Action(Tab(New(Some(Box::new(mgr))))));
return true;
}
UIEvent::Action(Action::Compose(ComposeAction::Mailto(ref mailto))) => {
let account_hash = context.accounts[self.cursor_pos.0].hash();
let mut composer = Composer::with_account(account_hash, context);

View File

@ -1984,7 +1984,7 @@ impl Component for CompactListing {
let handle = context.accounts[&self.cursor_pos.0]
.main_loop_handler
.job_executor
.spawn_specialized(job);
.spawn_specialized("search".into(), job);
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {
@ -2007,7 +2007,7 @@ impl Component for CompactListing {
let mut handle = context.accounts[&self.cursor_pos.0]
.main_loop_handler
.job_executor
.spawn_specialized(job);
.spawn_specialized("select_by_search".into(), job);
if let Ok(Some(search_result)) = try_recv_timeout!(&mut handle.chan) {
self.select(search_term, search_result, context);
} else {

View File

@ -1508,7 +1508,7 @@ impl Component for ConversationsListing {
let handle = context.accounts[&self.cursor_pos.0]
.main_loop_handler
.job_executor
.spawn_specialized(job);
.spawn_specialized("search".into(), job);
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {

View File

@ -1581,7 +1581,7 @@ impl Component for PlainListing {
let handle = context.accounts[&self.cursor_pos.0]
.main_loop_handler
.job_executor
.spawn_specialized(job);
.spawn_specialized("search".into(), job);
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {

View File

@ -1508,7 +1508,7 @@ impl Component for ThreadListing {
let handle = context.accounts[&self.cursor_pos.0]
.main_loop_handler
.job_executor
.spawn_specialized(job);
.spawn_specialized("search".into(), job);
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {

View File

@ -121,7 +121,7 @@ impl MailView {
let mut handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("fetch_envelopes".into(), fut);
let job_id = handle.job_id;
pending_action = if let MailViewState::Init {
ref mut pending_action,
@ -302,7 +302,7 @@ impl Component for MailView {
let handle = account
.main_loop_handler
.job_executor
.spawn_specialized(fut);
.spawn_specialized("set_flags".into(), fut);
account.insert_job(
handle.job_id,
JobRequest::SetFlags {
@ -532,12 +532,12 @@ impl Component for MailView {
context
.main_loop_handler
.job_executor
.spawn_specialized(bytes_job)
.spawn_specialized("fetch_envelope".into(), bytes_job)
} else {
context
.main_loop_handler
.job_executor
.spawn_blocking(bytes_job)
.spawn_blocking("fetch_envelope".into(), bytes_job)
};
context.accounts[&account_hash].insert_job(
handle.job_id,

View File

@ -255,8 +255,9 @@ impl EnvelopeView {
{
if view_settings.auto_verify_signatures {
let verify_fut = crate::components::mail::pgp::verify(a.clone());
let handle =
main_loop_handler.job_executor.spawn_specialized(verify_fut);
let handle = main_loop_handler
.job_executor
.spawn_specialized("gpg::verify_sig".into(), verify_fut);
active_jobs.insert(handle.job_id);
main_loop_handler.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(handle.job_id),
@ -317,7 +318,7 @@ impl EnvelopeView {
crate::components::mail::pgp::decrypt(a.raw().to_vec());
let handle = main_loop_handler
.job_executor
.spawn_specialized(decrypt_fut);
.spawn_specialized("gpg::decrypt".into(), decrypt_fut);
active_jobs.insert(handle.job_id);
main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::StatusEvent(StatusEvent::NewJob(handle.job_id)),

View File

@ -515,11 +515,11 @@ impl Account {
let handle = if backend.capabilities().is_async {
main_loop_handler
.job_executor
.spawn_specialized(online_job.then(|_| mailboxes_job))
.spawn_specialized("mailboxes".into(), online_job.then(|_| mailboxes_job))
} else {
main_loop_handler
.job_executor
.spawn_blocking(online_job.then(|_| mailboxes_job))
.spawn_blocking("mailboxes".into(), online_job.then(|_| mailboxes_job))
};
let job_id = handle.job_id;
active_jobs.insert(job_id, JobRequest::Mailboxes { handle });
@ -716,11 +716,11 @@ impl Account {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(mailbox_job)
.spawn_specialized("fetch-mailbox".into(), mailbox_job)
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(mailbox_job)
.spawn_blocking("fetch-mailbox".into(), mailbox_job)
};
let job_id = handle.job_id;
self.main_loop_handler
@ -790,8 +790,10 @@ impl Account {
);
}
Ok(job) => {
let handle =
self.main_loop_handler.job_executor.spawn_blocking(job);
let handle = self
.main_loop_handler
.job_executor
.spawn_blocking("sqlite3::update".into(), job);
self.insert_job(
handle.job_id,
JobRequest::Generic {
@ -837,8 +839,10 @@ impl Account {
)
}) {
Ok(job) => {
let handle =
self.main_loop_handler.job_executor.spawn_blocking(job);
let handle = self
.main_loop_handler
.job_executor
.spawn_blocking("sqlite3::remove".into(), job);
self.insert_job(
handle.job_id,
JobRequest::Generic {
@ -890,8 +894,10 @@ impl Account {
);
}
Ok(job) => {
let handle =
self.main_loop_handler.job_executor.spawn_blocking(job);
let handle = self
.main_loop_handler
.job_executor
.spawn_blocking("sqlite3::rename".into(), job);
self.insert_job(
handle.job_id,
JobRequest::Generic {
@ -932,6 +938,7 @@ impl Account {
#[cfg(feature = "sqlite3")]
if self.settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 {
let handle = self.main_loop_handler.job_executor.spawn_blocking(
"sqlite3::insert".into(),
crate::sqlite3::insert(
(*envelope).clone(),
self.backend.clone(),
@ -1096,11 +1103,11 @@ impl Account {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(refresh_job)
.spawn_specialized("refresh".into(), refresh_job)
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(refresh_job)
.spawn_blocking("refresh".into(), refresh_job)
};
self.insert_job(
handle.job_id,
@ -1122,9 +1129,13 @@ impl Account {
match self.backend.read().unwrap().watch() {
Ok(fut) => {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(fut)
self.main_loop_handler
.job_executor
.spawn_specialized("watch".into(), fut)
} else {
self.main_loop_handler.job_executor.spawn_blocking(fut)
self.main_loop_handler
.job_executor
.spawn_blocking("watch".into(), fut)
};
self.active_jobs
.insert(handle.job_id, JobRequest::Watch { handle });
@ -1202,11 +1213,11 @@ impl Account {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(mailbox_job)
.spawn_specialized("mailbox_fetch".into(), mailbox_job)
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(mailbox_job)
.spawn_blocking("mailbox_fetch".into(), mailbox_job)
};
self.insert_job(
handle.job_id,
@ -1294,9 +1305,13 @@ impl Account {
.save(bytes.to_vec(), mailbox_hash, flags)?;
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(job)
self.main_loop_handler
.job_executor
.spawn_specialized("save".into(), job)
} else {
self.main_loop_handler.job_executor.spawn_blocking(job)
self.main_loop_handler
.job_executor
.spawn_blocking("save".into(), job)
};
self.insert_job(
handle.job_id,
@ -1362,14 +1377,14 @@ impl Account {
}
#[cfg(feature = "smtp")]
SendMail::Smtp(conf) => {
let handle = self
.main_loop_handler
.job_executor
.spawn_specialized(async move {
let handle = self.main_loop_handler.job_executor.spawn_specialized(
"smtp".into(),
async move {
let mut smtp_connection =
melib::smtp::SmtpConnection::new_connection(conf).await?;
smtp_connection.mail_transaction(&message, None).await
});
},
);
if complete_in_background {
self.insert_job(handle.job_id, JobRequest::SendMessageBackground { handle });
return Ok(None);
@ -1387,9 +1402,13 @@ impl Account {
.submit(message.into_bytes(), None, None)?;
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(job)
self.main_loop_handler
.job_executor
.spawn_specialized("server_submission".into(), job)
} else {
self.main_loop_handler.job_executor.spawn_blocking(job)
self.main_loop_handler
.job_executor
.spawn_blocking("server_submission".into(), job)
};
self.insert_job(handle.job_id, JobRequest::SendMessageBackground { handle });
return Ok(None);
@ -1508,9 +1527,13 @@ impl Account {
.unwrap()
.create_mailbox(path.to_string())?;
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(job)
self.main_loop_handler
.job_executor
.spawn_specialized("create_mailbox".into(), job)
} else {
self.main_loop_handler.job_executor.spawn_blocking(job)
self.main_loop_handler
.job_executor
.spawn_blocking("create_mailbox".into(), job)
};
self.insert_job(handle.job_id, JobRequest::CreateMailbox { path, handle });
Ok(())
@ -1523,9 +1546,13 @@ impl Account {
let mailbox_hash = self.mailbox_by_path(&path)?;
let job = self.backend.write().unwrap().delete_mailbox(mailbox_hash)?;
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(job)
self.main_loop_handler
.job_executor
.spawn_specialized("delete_mailbox".into(), job)
} else {
self.main_loop_handler.job_executor.spawn_blocking(job)
self.main_loop_handler
.job_executor
.spawn_blocking("delete_mailbox".into(), job)
};
self.insert_job(
handle.job_id,
@ -1544,9 +1571,13 @@ impl Account {
.unwrap()
.set_mailbox_subscription(mailbox_hash, true)?;
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(job)
self.main_loop_handler
.job_executor
.spawn_specialized("subscribe_mailbox".into(), job)
} else {
self.main_loop_handler.job_executor.spawn_blocking(job)
self.main_loop_handler
.job_executor
.spawn_blocking("subscribe_mailbox".into(), job)
};
self.insert_job(
handle.job_id,
@ -1566,9 +1597,13 @@ impl Account {
.unwrap()
.set_mailbox_subscription(mailbox_hash, false)?;
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler.job_executor.spawn_specialized(job)
self.main_loop_handler
.job_executor
.spawn_specialized("unsubscribe_mailbox".into(), job)
} else {
self.main_loop_handler.job_executor.spawn_blocking(job)
self.main_loop_handler
.job_executor
.spawn_blocking("unsubscribe_mailbox".into(), job)
};
self.insert_job(
handle.job_id,
@ -1641,30 +1676,28 @@ impl Account {
use melib::utils::futures::sleep;
let handle = match (wait, self.backend_capabilities.is_async) {
(Some(wait), true) => {
self.main_loop_handler
.job_executor
.spawn_specialized(async move {
sleep(wait).await;
online_fut.await
})
}
(Some(wait), true) => self.main_loop_handler.job_executor.spawn_specialized(
"is_online".into(),
async move {
sleep(wait).await;
online_fut.await
},
),
(None, true) => self
.main_loop_handler
.job_executor
.spawn_specialized(online_fut),
(Some(wait), false) => {
self.main_loop_handler
.job_executor
.spawn_blocking(async move {
sleep(wait).await;
online_fut.await
})
}
.spawn_specialized("is_online".into(), online_fut),
(Some(wait), false) => self.main_loop_handler.job_executor.spawn_blocking(
"is_online".into(),
async move {
sleep(wait).await;
online_fut.await
},
),
(None, false) => self
.main_loop_handler
.job_executor
.spawn_blocking(online_fut),
.spawn_blocking("is_online".into(), online_fut),
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
}
@ -1724,6 +1757,7 @@ impl Account {
)));
if let Some(mut job) = self.active_jobs.remove(job_id) {
let job_id = *job_id;
match job {
JobRequest::Mailboxes { ref mut handle } => {
if let Ok(Some(mailboxes)) = handle.chan.try_recv() {
@ -1740,6 +1774,9 @@ impl Account {
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
));
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
return true;
}
let mailboxes_job = self.backend.read().unwrap().mailboxes();
@ -1747,11 +1784,11 @@ impl Account {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(mailboxes_job)
.spawn_specialized("mailboxes_list".into(), mailboxes_job)
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(mailboxes_job)
.spawn_blocking("mailboxes_list".into(), mailboxes_job)
};
self.insert_job(handle.job_id, JobRequest::Mailboxes { handle });
};
@ -1773,6 +1810,9 @@ impl Account {
log::trace!("got payload in status for {}", mailbox_hash);
match handle.chan.try_recv() {
Err(_) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
/* canceled */
return true;
}
@ -1792,6 +1832,9 @@ impl Account {
return true;
}
Ok(Some((Some(Err(err)), _))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
Some(format!("{}: could not fetch mailbox", &self.name)),
@ -1813,11 +1856,11 @@ impl Account {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(rest.into_future())
.spawn_specialized("rest_fetch".into(), rest.into_future())
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(rest.into_future())
.spawn_blocking("rest_fetch".into(), rest.into_future())
};
self.insert_job(
handle.job_id,
@ -1870,11 +1913,11 @@ impl Account {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(online_job)
.spawn_specialized("is_online".into(), online_job)
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(online_job)
.spawn_blocking("is_online".into(), online_job)
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
};
@ -1894,6 +1937,9 @@ impl Account {
));
}
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.is_online.set_err(err);
_ = self.is_online();
self.main_loop_handler.send(ThreadEvent::UIEvent(
@ -1904,6 +1950,9 @@ impl Account {
}
JobRequest::SetFlags { ref mut handle, .. } => {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not set flag", &self.name)),
@ -1918,6 +1967,9 @@ impl Account {
..
} => {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
log::error!("Could not save message: {err}");
let file =
crate::types::create_temp_file(bytes, None, None, Some("eml"), false);
@ -1940,6 +1992,9 @@ impl Account {
JobRequest::SendMessage => {}
JobRequest::SendMessageBackground { ref mut handle, .. } => {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some("Could not send message".to_string()),
@ -1950,6 +2005,9 @@ impl Account {
}
JobRequest::DeleteMessages { ref mut handle, .. } => {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not delete message", &self.name)),
@ -1966,6 +2024,9 @@ impl Account {
if let Ok(Some(r)) = handle.chan.try_recv() {
match r {
Err(err) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
Some(format!(
@ -2051,6 +2112,9 @@ impl Account {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
Some(format!("{}: could not delete mailbox", &self.name)),
@ -2120,6 +2184,9 @@ impl Account {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
Some(format!(
@ -2154,6 +2221,9 @@ impl Account {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
Some(format!(
@ -2196,6 +2266,9 @@ impl Account {
if err.kind.is_timeout() {
self.watch();
} else {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
// [ref:TODO]: relaunch watch job with ratelimit for failure
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
@ -2215,6 +2288,9 @@ impl Account {
} => {
match handle.chan.try_recv() {
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification(
Some(format!("{}: {} failed", &self.name, name,)),

View File

@ -22,7 +22,7 @@
//! Async job executor thread pool
use std::{
collections::HashMap,
borrow::Cow,
future::Future,
iter,
panic::catch_unwind,
@ -37,7 +37,8 @@ use crossbeam::{
sync::{Parker, Unparker},
};
pub use futures::channel::oneshot;
use melib::{log, smol, uuid::Uuid};
use indexmap::IndexMap;
use melib::{log, smol, utils::datetime, uuid::Uuid, UnixTimestamp};
use crate::types::{ThreadEvent, UIEvent};
@ -101,16 +102,29 @@ uuid_hash_type!(TimerId);
pub struct MeliTask {
task: AsyncTask,
id: JobId,
desc: Cow<'static, str>,
timer: bool,
}
#[derive(Debug, Clone)]
/// A spawned future's metadata for book-keeping.
pub struct JobMetadata {
pub id: JobId,
pub desc: Cow<'static, str>,
pub timer: bool,
pub started: UnixTimestamp,
pub finished: Option<UnixTimestamp>,
pub succeeded: bool,
}
#[derive(Debug)]
pub struct JobExecutor {
global_queue: Arc<Injector<MeliTask>>,
workers: Vec<Stealer<MeliTask>>,
sender: Sender<ThreadEvent>,
parkers: Vec<Unparker>,
timers: Arc<Mutex<HashMap<TimerId, TimerPrivate>>>,
timers: Arc<Mutex<IndexMap<TimerId, TimerPrivate>>>,
pub jobs: Arc<Mutex<IndexMap<JobId, JobMetadata>>>,
}
#[derive(Debug, Default)]
@ -163,7 +177,8 @@ impl JobExecutor {
workers: vec![],
parkers: vec![],
sender,
timers: Arc::new(Mutex::new(HashMap::default())),
timers: Arc::new(Mutex::new(IndexMap::default())),
jobs: Arc::new(Mutex::new(IndexMap::default())),
};
let mut workers = vec![];
for _ in 0..num_cpus::get().max(1) {
@ -194,13 +209,18 @@ impl JobExecutor {
parker.park_timeout(Duration::from_millis(100));
let task = find_task(&local, &global, stealers.as_slice());
if let Some(meli_task) = task {
let MeliTask { task, id, timer } = meli_task;
let MeliTask {
task,
id,
timer,
desc,
} = meli_task;
if !timer {
log::trace!("Worker {} got task {:?}", i, id);
log::trace!("Worker {} got task {:?} {:?}", i, desc, id);
}
let _ = catch_unwind(|| task.run());
if !timer {
log::trace!("Worker {} returned after {:?}", i, id);
log::trace!("Worker {} returned after {:?} {:?}", i, desc, id);
}
}
})
@ -210,7 +230,7 @@ impl JobExecutor {
}
/// Spawns a future with a generic return value `R`
pub fn spawn_specialized<F, R>(&self, future: F) -> JoinHandle<R>
pub fn spawn_specialized<F, R>(&self, desc: Cow<'static, str>, future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
@ -221,6 +241,19 @@ impl JobExecutor {
let injector = self.global_queue.clone();
let cancel = Arc::new(Mutex::new(false));
let cancel2 = cancel.clone();
self.jobs.lock().unwrap().insert(
job_id,
JobMetadata {
id: job_id,
desc: desc.clone(),
started: datetime::now(),
finished: None,
succeeded: true,
timer: false,
},
);
// Create a task and schedule it for execution.
let (handle, task) = async_task::spawn(
async move {
@ -234,9 +267,11 @@ impl JobExecutor {
if *cancel.lock().unwrap() {
return;
}
let desc = desc.clone();
injector.push(MeliTask {
task,
id: job_id,
desc,
timer: false,
})
},
@ -256,12 +291,15 @@ impl JobExecutor {
/// Spawns a future with a generic return value `R` that might block on a
/// new thread
pub fn spawn_blocking<F, R>(&self, future: F) -> JoinHandle<R>
pub fn spawn_blocking<F, R>(&self, desc: Cow<'static, str>, future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
self.spawn_specialized(smol::unblock(move || futures::executor::block_on(future)))
self.spawn_specialized(
desc,
smol::unblock(move || futures::executor::block_on(future)),
)
}
pub fn create_timer(self: Arc<JobExecutor>, interval: Duration, value: Duration) -> Timer {
@ -327,6 +365,7 @@ impl JobExecutor {
injector.push(MeliTask {
task,
id: job_id,
desc: "timer".into(),
timer: true,
})
},
@ -356,6 +395,18 @@ impl JobExecutor {
timer.interval = new_val;
}
}
pub fn set_job_finished(&self, id: JobId) {
self.jobs.lock().unwrap().entry(id).and_modify(|entry| {
entry.finished = Some(datetime::now());
});
}
pub fn set_job_success(&self, id: JobId, value: bool) {
self.jobs.lock().unwrap().entry(id).and_modify(|entry| {
entry.succeeded = value;
});
}
}
pub type JobChannel<T> = oneshot::Receiver<T>;

View File

@ -294,6 +294,7 @@ fn run_app(opt: Opt) -> Result<()> {
},
ThreadEvent::JobFinished(id) => {
log::trace!("Job finished {}", id);
state.context.main_loop_handler.job_executor.set_job_finished(id);
for account in state.context.accounts.values_mut() {
if account.process_event(&id) {
break;

View File

@ -923,7 +923,7 @@ impl State {
.context
.main_loop_handler
.job_executor
.spawn_blocking(job);
.spawn_blocking("sqlite3::index".into(), job);
self.context.accounts[account_index].active_jobs.insert(
handle.job_id,
crate::conf::accounts::JobRequest::Generic {