Host: Move async tasks to host namespace

This allows the Qt host to run them on proper QThreads, which avoids lazily
creating the Qt per-thread data.
This commit is contained in:
Stenzek
2025-12-20 21:43:10 +10:00
parent 57adcddc29
commit f4ff36b565
14 changed files with 88 additions and 48 deletions

View File

@@ -3224,7 +3224,7 @@ void Achievements::UpdateProgressDatabase()
}
// done asynchronously so we don't hitch on disk I/O
System::QueueAsyncTask([game_id = s_state.game_id, achievements_unlocked, achievements_unlocked_hardcore]() {
Host::QueueAsyncTask([game_id = s_state.game_id, achievements_unlocked, achievements_unlocked_hardcore]() {
// no point storing it in memory, just write directly to the file
Error error;
FileSystem::ManagedCFilePtr fp = OpenProgressDatabase(true, false, &error);

View File

@@ -2526,7 +2526,7 @@ void FullscreenUI::DrawCoverDownloaderWindow()
{
// TODO: Remove release once using move_only_function
std::unique_ptr<ProgressCallback> progress = OpenModalProgressDialog(FSUI_STR("Cover Downloader"), 1000.0f);
System::QueueAsyncTask([progress = progress.release(), urls = StringUtil::SplitNewString(template_urls, '\n'),
Host::QueueAsyncTask([progress = progress.release(), urls = StringUtil::SplitNewString(template_urls, '\n'),
use_serial_names = use_serial_names]() {
Error error;
if (!GameList::DownloadCovers(
@@ -4943,7 +4943,7 @@ void FullscreenUI::StartAchievementsProgressRefresh()
{
auto progress = OpenModalProgressDialog(FSUI_STR("Refresh Achievement Progress"));
System::QueueAsyncTask([progress = progress.release()]() {
Host::QueueAsyncTask([progress = progress.release()]() {
Error error;
const bool result = Achievements::RefreshAllProgressDatabase(progress, &error);
Host::RunOnCoreThread([error = std::move(error), progress, result]() mutable {
@@ -4962,7 +4962,7 @@ void FullscreenUI::StartAchievementsGameIconDownload()
{
auto progress = OpenModalProgressDialog(FSUI_STR("Download Game Icons"));
System::QueueAsyncTask([progress = progress.release()]() {
Host::QueueAsyncTask([progress = progress.release()]() {
Error error;
const bool result = Achievements::DownloadGameIcons(progress, &error);
Host::RunOnCoreThread([error = std::move(error), progress, result]() mutable {

View File

@@ -621,7 +621,7 @@ GPUTexture* FullscreenUI::GetCachedTextureAsync(std::string_view name)
tex_ptr = s_state.texture_cache.Insert(std::string(name), s_state.placeholder_texture);
// queue the actual load
System::QueueAsyncTask([path = std::string(name)]() mutable {
Host::QueueAsyncTask([path = std::string(name)]() mutable {
std::optional<Image> image(LoadTextureImage(path.c_str(), 0, 0));
// don't bother queuing back if it doesn't exist
@@ -653,7 +653,7 @@ GPUTexture* FullscreenUI::GetCachedTextureAsync(std::string_view name, u32 svg_w
tex_ptr = s_state.texture_cache.Insert(std::string(wh_name), s_state.placeholder_texture);
// queue the actual load
System::QueueAsyncTask([path = std::string(name), wh_name = std::string(wh_name), svg_width, svg_height]() mutable {
Host::QueueAsyncTask([path = std::string(name), wh_name = std::string(wh_name), svg_width, svg_height]() mutable {
std::optional<Image> image(LoadTextureImage(path.c_str(), svg_width, svg_height));
// don't bother queuing back if it doesn't exist

View File

@@ -10,6 +10,7 @@
#include "gpu_shadergen.h"
#include "gpu_sw_rasterizer.h"
#include "gpu_thread.h"
#include "host.h"
#include "interrupt_controller.h"
#include "performance_counters.h"
#include "settings.h"
@@ -2314,7 +2315,7 @@ void GPU::StopRecordingGPUDump()
Host::AddIconOSDMessage(
OSDMessageType::Persistent, osd_key, ICON_EMOJI_CAMERA_WITH_FLASH,
fmt::format(TRANSLATE_FS("GPU", "Compressing GPU trace '{}'..."), Path::GetFileName(source_path)));
System::QueueAsyncTask([compress_mode, source_path = std::move(source_path), osd_key = std::move(osd_key)]() mutable {
Host::QueueAsyncTask([compress_mode, source_path = std::move(source_path), osd_key = std::move(osd_key)]() mutable {
Error error;
if (GPUDump::Recorder::Compress(source_path, compress_mode, &error))
{

View File

@@ -6,6 +6,7 @@
#include "gpu_presenter.h"
#include "gpu_sw_rasterizer.h"
#include "gpu_thread.h"
#include "host.h"
#include "performance_counters.h"
#include "save_state_version.h"
#include "settings.h"
@@ -777,9 +778,9 @@ void GPUBackend::RenderScreenshotToFile(const std::string_view path, DisplayScre
fmt::format(TRANSLATE_FS("GPU", "Saving screenshot to '{}'."), Path::GetFileName(path)));
}
System::QueueAsyncTask([path = std::move(path), fp = fp.release(), quality,
flip_y = g_gpu_device->UsesLowerLeftOrigin(), image = std::move(image),
osd_key = std::move(osd_key)]() mutable {
Host::QueueAsyncTask([path = std::move(path), fp = fp.release(), quality,
flip_y = g_gpu_device->UsesLowerLeftOrigin(), image = std::move(image),
osd_key = std::move(osd_key)]() mutable {
Error error;
if (flip_y)

View File

@@ -2886,7 +2886,7 @@ void GPUTextureCache::DumpTexture(TextureReplacementType type, u32 offset_x, u32
GPUTextureCache::DecodeTexture(mode, &g_vram[rect.top * VRAM_WIDTH + rect.left], palette_data, image.GetPixels(),
image.GetPitch(), width, height, GPUTexture::Format::RGBA8);
System::QueueAsyncTask([path = std::move(path), image = std::move(image), width, height, semitransparent]() mutable {
Host::QueueAsyncTask([path = std::move(path), image = std::move(image), width, height, semitransparent]() mutable {
// TODO: Vectorize this.
u32* image_pixels = reinterpret_cast<u32*>(image.GetPixels());
const u32* image_pixels_end = image_pixels + (width * height);

View File

@@ -70,6 +70,10 @@ void RunOnCoreThread(std::function<void()> function, bool block = false);
/// Safely executes a function on the main/UI thread.
void RunOnUIThread(std::function<void()> function, bool block = false);
/// Asynchronous work tasks, complete on worker thread.
/// Implemented by each host frontend, which forwards the callable to its own task queue.
void QueueAsyncTask(std::function<void()> function);
/// Implemented by each host frontend; forwards to WaitForAll() on that host's task queue.
void WaitForAllAsyncTasks();
/// Commits any changes made to the base settings layer to the host.
void CommitBaseSettingChanges();

View File

@@ -69,7 +69,6 @@
#include "common/path.h"
#include "common/ryml_helpers.h"
#include "common/string_util.h"
#include "common/task_queue.h"
#include "common/time_helpers.h"
#include "common/timer.h"
@@ -335,9 +334,6 @@ struct ALIGN_TO_CACHE_LINE StateVars
// internal async task counters
std::atomic_uint32_t outstanding_save_state_tasks{0};
// async task pool
TaskQueue async_task_queue;
#ifdef ENABLE_SOCKET_MULTIPLEXER
std::unique_ptr<SocketMultiplexer> socket_multiplexer;
#endif
@@ -519,7 +515,7 @@ void System::ProcessShutdown()
CPU::CodeCache::ProcessShutdown();
}
bool System::CoreThreadInitialize(Error* error, u32 async_worker_thread_count)
bool System::CoreThreadInitialize(Error* error)
{
#ifdef _WIN32
// On Win32, we have a bunch of things which use COM (e.g. SDL, Cubeb, etc).
@@ -534,7 +530,6 @@ bool System::CoreThreadInitialize(Error* error, u32 async_worker_thread_count)
#endif
s_state.core_thread_handle = Threading::ThreadHandle::GetForCallingThread();
s_state.async_task_queue.SetWorkerCount(async_worker_thread_count);
// This will call back to Host::LoadSettings() -> ReloadSources().
LoadSettings(false);
@@ -564,7 +559,6 @@ void System::CoreThreadShutdown()
InputManager::CloseSources();
s_state.async_task_queue.SetWorkerCount(0);
s_state.core_thread_handle = {};
#ifdef _WIN32
@@ -3265,9 +3259,9 @@ bool System::SaveState(std::string path, Error* error, bool backup_existing_save
FlushSaveStates();
s_state.outstanding_save_state_tasks.fetch_add(1, std::memory_order_acq_rel);
s_state.async_task_queue.SubmitTask([path = std::move(path), buffer = std::move(buffer),
completion_callback = std::move(completion_callback), backup_existing_save,
compression = g_settings.save_state_compression]() {
Host::QueueAsyncTask([path = std::move(path), buffer = std::move(buffer),
completion_callback = std::move(completion_callback), backup_existing_save,
compression = g_settings.save_state_compression]() {
INFO_LOG("Saving state to '{}'...", path);
Error lerror;
@@ -3417,7 +3411,7 @@ void System::SaveStateToSlot(bool global, s32 slot)
void System::FlushSaveStates()
{
while (s_state.outstanding_save_state_tasks.load(std::memory_order_acquire) > 0)
WaitForAllAsyncTasks();
Host::WaitForAllAsyncTasks();
}
bool System::SaveStateToBuffer(SaveStateBuffer* buffer, Error* error, u32 screenshot_size /* = 256 */)
@@ -6218,16 +6212,6 @@ u64 System::GetSessionPlayedTime()
return static_cast<u64>(std::round(Timer::ConvertValueToSeconds(ctime - s_state.session_start_time)));
}
void System::QueueAsyncTask(std::function<void()> function)
{
s_state.async_task_queue.SubmitTask(std::move(function));
}
void System::WaitForAllAsyncTasks()
{
s_state.async_task_queue.WaitForAll();
}
SocketMultiplexer* System::GetSocketMultiplexer()
{
#ifdef ENABLE_SOCKET_MULTIPLEXER

View File

@@ -454,10 +454,6 @@ void CalculateRewindMemoryUsage(u32 num_saves, u32 resolution_scale, u32 multisa
void ClearMemorySaveStates(bool reallocate_resources, bool recycle_textures);
void SetRunaheadReplayFlag(bool is_analog_input);
/// Asynchronous work tasks, complete on worker thread.
void QueueAsyncTask(std::function<void()> function);
void WaitForAllAsyncTasks();
/// Shared socket multiplexer.
SocketMultiplexer* GetSocketMultiplexer();
void ReleaseSocketMultiplexer();

View File

@@ -69,7 +69,7 @@ bool ProcessStartup(Error* error);
void ProcessShutdown();
/// Called on CPU thread initialization.
bool CoreThreadInitialize(Error* error, u32 async_worker_thread_count);
bool CoreThreadInitialize(Error* error);
/// Called on CPU thread shutdown.
void CoreThreadShutdown();

View File

@@ -39,6 +39,7 @@
#include "common/log.h"
#include "common/path.h"
#include "common/string_util.h"
#include "common/task_queue.h"
#include "common/threading.h"
#include "common/time_helpers.h"
@@ -131,6 +132,8 @@ struct SDLHostState
};
static SDLHostState s_state;
ALIGN_TO_CACHE_LINE static TaskQueue s_async_task_queue;
} // namespace MiniHost
//////////////////////////////////////////////////////////////////////////
@@ -962,12 +965,15 @@ void MiniHost::CoreThreadEntryPoint()
// input source setup must happen on emu thread
Error error;
if (!System::CoreThreadInitialize(&error, NUM_ASYNC_WORKER_THREADS))
if (!System::CoreThreadInitialize(&error))
{
Host::ReportFatalError("CPU Thread Initialization Failed", error.GetDescription());
return;
}
// startup worker threads
s_async_task_queue.SetWorkerCount(NUM_ASYNC_WORKER_THREADS);
// start up GPU thread
s_state.gpu_thread.Start(&GPUThreadEntryPoint);
@@ -999,6 +1005,9 @@ void MiniHost::CoreThreadEntryPoint()
GPUThread::Internal::RequestShutdown();
s_state.gpu_thread.Join();
// join worker threads
s_async_task_queue.SetWorkerCount(0);
System::CoreThreadShutdown();
// Tell the UI thread to shut down.
@@ -1197,6 +1206,16 @@ void Host::RunOnUIThread(std::function<void()> function, bool block /* = false *
SDL_PushEvent(&ev);
}
/// Submits `function` to the MiniHost async task queue.
void Host::QueueAsyncTask(std::function<void()> function)
{
  auto& task_queue = MiniHost::s_async_task_queue;
  task_queue.SubmitTask(std::move(function));
}
/// Forwards to WaitForAll() on the MiniHost async task queue.
void Host::WaitForAllAsyncTasks()
{
  auto& task_queue = MiniHost::s_async_task_queue;
  task_queue.WaitForAll();
}
void Host::RefreshGameListAsync(bool invalidate_cache)
{
using namespace MiniHost;
@@ -1211,7 +1230,7 @@ void Host::RefreshGameListAsync(bool invalidate_cache)
}
s_state.game_list_refresh_progress = new FullscreenUI::BackgroundProgressCallback("glrefresh");
System::QueueAsyncTask([invalidate_cache]() {
Host::QueueAsyncTask([invalidate_cache]() {
GameList::Refresh(invalidate_cache, false, s_state.game_list_refresh_progress);
std::unique_lock lock(s_state.state_mutex);
@@ -1232,7 +1251,7 @@ void Host::CancelGameListRefresh()
s_state.game_list_refresh_progress->SetCancelled();
}
System::WaitForAllAsyncTasks();
Host::WaitForAllAsyncTasks();
}
void Host::OnGameListEntriesChanged(std::span<const u32> changed_indices)
@@ -1407,7 +1426,7 @@ void Host::ConfirmMessageAsync(std::string_view title, std::string_view message,
std::string_view no_text /* = std::string_view() */)
{
Host::RunOnCoreThread([title = std::string(title), message = std::string(message), callback = std::move(callback),
yes_text = std::string(yes_text), no_text = std::move(no_text)]() mutable {
yes_text = std::string(yes_text), no_text = std::move(no_text)]() mutable {
// in case we haven't started yet...
if (!FullscreenUI::IsInitialized())
{

View File

@@ -44,6 +44,7 @@
#include "common/path.h"
#include "common/scoped_guard.h"
#include "common/string_util.h"
#include "common/task_queue.h"
#include "common/threading.h"
#include "util/audio_stream.h"
@@ -175,6 +176,7 @@ struct State
} // namespace
ALIGN_TO_CACHE_LINE static State s_state;
ALIGN_TO_CACHE_LINE static TaskQueue s_async_task_queue;
CoreThread* g_core_thread;
@@ -1419,6 +1421,16 @@ void Host::RunOnUIThread(std::function<void()> function, bool block /* = false*/
block ? Qt::BlockingQueuedConnection : Qt::QueuedConnection, std::move(function));
}
/// Submits `function` to the Qt host's file-level async task queue.
void Host::QueueAsyncTask(std::function<void()> function)
{
  auto& task_queue = s_async_task_queue;
  task_queue.SubmitTask(std::move(function));
}
/// Forwards to WaitForAll() on the Qt host's file-level async task queue.
void Host::WaitForAllAsyncTasks()
{
  auto& task_queue = s_async_task_queue;
  task_queue.WaitForAll();
}
QtAsyncTask::QtAsyncTask(WorkCallback callback)
{
m_callback = std::move(callback);
@@ -1431,7 +1443,7 @@ void QtAsyncTask::create(QObject* owner, WorkCallback callback)
// NOTE: Must get connected before queuing, because otherwise you risk a race.
QtAsyncTask* task = new QtAsyncTask(std::move(callback));
connect(task, &QtAsyncTask::completed, owner, [task]() { std::get<CompletionCallback>(task->m_callback)(); });
System::QueueAsyncTask([task]() {
Host::QueueAsyncTask([task]() {
task->m_callback = std::get<WorkCallback>(task->m_callback)();
Host::RunOnUIThread([task]() {
emit task->completed(task);
@@ -1971,7 +1983,7 @@ void CoreThread::run()
// input source setup must happen on emu thread
{
Error startup_error;
if (!System::CoreThreadInitialize(&startup_error, NUM_ASYNC_WORKER_THREADS))
if (!System::CoreThreadInitialize(&startup_error))
{
moveToThread(m_ui_thread);
Host::ReportFatalError("Fatal Startup Error", startup_error.GetDescription());
@@ -1979,6 +1991,10 @@ void CoreThread::run()
}
}
// start up worker threads
// TODO: Replace this with QThreads
s_async_task_queue.SetWorkerCount(NUM_ASYNC_WORKER_THREADS);
// enumerate all devices, even those which were added early
m_input_device_list_model->enumerateDevices();
@@ -2019,6 +2035,9 @@ void CoreThread::run()
GPUThread::Internal::RequestShutdown();
gpu_thread.Join();
// join worker threads
s_async_task_queue.SetWorkerCount(0);
// and tidy up everything left
System::CoreThreadShutdown();

View File

@@ -164,7 +164,7 @@ void QtAsyncTaskWithProgress::start()
// Disconnect from the calling thread, so it can be pulled by the async task.
moveToThread(nullptr);
System::QueueAsyncTask([this]() mutable {
Host::QueueAsyncTask([this]() mutable {
QThread* const worker_thread = QThread::currentThread();
moveToThread(worker_thread);
@@ -290,7 +290,7 @@ QtAsyncTaskWithProgressDialog* QtAsyncTaskWithProgressDialog::create(QWidget* pa
connect(task, &QtAsyncTaskWithProgressDialog::completed, parent,
[task]() { std::get<CompletionCallback>(task->m_callback)(); });
System::QueueAsyncTask([task]() {
Host::QueueAsyncTask([task]() {
task->m_callback = std::get<WorkCallback>(task->m_callback)(task);
Host::RunOnUIThread([task]() {
emit task->completed(task);

View File

@@ -35,6 +35,7 @@
#include "common/path.h"
#include "common/sha256_digest.h"
#include "common/string_util.h"
#include "common/task_queue.h"
#include "common/threading.h"
#include "common/time_helpers.h"
#include "common/timer.h"
@@ -71,6 +72,7 @@ struct RegTestHostState
};
static RegTestHostState s_state;
ALIGN_TO_CACHE_LINE static TaskQueue s_async_task_queue;
} // namespace RegTestHost
@@ -394,6 +396,16 @@ void Host::RunOnUIThread(std::function<void()> function, bool block /* = false *
RunOnCoreThread(std::move(function), block);
}
/// Submits `function` to the RegTestHost async task queue.
void Host::QueueAsyncTask(std::function<void()> function)
{
  auto& task_queue = RegTestHost::s_async_task_queue;
  task_queue.SubmitTask(std::move(function));
}
/// Forwards to WaitForAll() on the RegTestHost async task queue.
void Host::WaitForAllAsyncTasks()
{
  auto& task_queue = RegTestHost::s_async_task_queue;
  task_queue.WaitForAll();
}
void Host::RequestResizeHostDisplay(s32 width, s32 height)
{
//
@@ -512,7 +524,7 @@ void Host::FrameDoneOnGPUThread(GPUBackend* gpu_backend, u32 frame_number)
return;
}
System::QueueAsyncTask([path = std::move(path), fp = fp.release(), image = std::move(image)]() mutable {
Host::QueueAsyncTask([path = std::move(path), fp = fp.release(), image = std::move(image)]() mutable {
Error error;
if (image.GetFormat() != ImageFormat::RGBA8)
@@ -1022,13 +1034,15 @@ int main(int argc, char* argv[])
if (!RegTestHost::SetNewDataRoot(autoboot->path))
return EXIT_FAILURE;
// Only one async worker.
if (!System::CoreThreadInitialize(&startup_error, 1))
if (!System::CoreThreadInitialize(&startup_error))
{
ERROR_LOG("CoreThreadInitialize() failed: {}", startup_error.GetDescription());
return EXIT_FAILURE;
}
// Only one async worker, keep the CPU usage down so we can parallelize execution of regtest itself.
RegTestHost::s_async_task_queue.SetWorkerCount(1);
RegTestHost::HookSignals();
s_gpu_thread.Start(&RegTestHost::GPUThreadEntryPoint);
@@ -1084,6 +1098,8 @@ cleanup:
s_gpu_thread.Join();
}
RegTestHost::s_async_task_queue.SetWorkerCount(0);
RegTestHost::ProcessCoreThreadEvents();
System::CoreThreadShutdown();
System::ProcessShutdown();