mirror of
https://github.com/stenzek/duckstation.git
synced 2026-02-11 00:44:32 +00:00
HTTPDownloaderCurl: Use worker thread for processing
Should significantly speed up update downloads.
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// SPDX-FileCopyrightText: 2019-2024 Connor McLaughlin <stenzek@gmail.com>
|
||||
// SPDX-FileCopyrightText: 2019-2025 Connor McLaughlin <stenzek@gmail.com>
|
||||
// SPDX-License-Identifier: CC-BY-NC-ND-4.0
|
||||
|
||||
#include "http_downloader.h"
|
||||
@@ -6,13 +6,16 @@
|
||||
#include "common/assert.h"
|
||||
#include "common/log.h"
|
||||
#include "common/string_util.h"
|
||||
#include "common/threading.h"
|
||||
#include "common/timer.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <curl/curl.h>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <pthread.h>
|
||||
#include <signal.h>
|
||||
#include <thread>
|
||||
|
||||
LOG_CHANNEL(HTTPDownloader);
|
||||
|
||||
@@ -32,6 +35,12 @@ protected:
|
||||
void CloseRequest(HTTPDownloader::Request* request) override;
|
||||
|
||||
private:
|
||||
enum class QueueAction
|
||||
{
|
||||
Add,
|
||||
RemoveAndDelete
|
||||
};
|
||||
|
||||
struct Request : HTTPDownloader::Request
|
||||
{
|
||||
CURL* handle = nullptr;
|
||||
@@ -39,8 +48,21 @@ private:
|
||||
|
||||
static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
|
||||
|
||||
void WorkerThreadEntryPoint();
|
||||
|
||||
void ProcessQueuedActions();
|
||||
void ReadMultiResults();
|
||||
|
||||
CURLM* m_multi_handle = nullptr;
|
||||
std::string m_user_agent;
|
||||
|
||||
// Background thread for curl operations
|
||||
std::thread m_worker_thread;
|
||||
std::atomic_bool m_worker_thread_shutdown{false};
|
||||
|
||||
// Worker thread queue
|
||||
std::deque<std::pair<QueueAction, Request*>> m_worker_queue;
|
||||
std::mutex m_worker_queue_mutex;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
@@ -50,6 +72,21 @@ HTTPDownloaderCurl::HTTPDownloaderCurl() : HTTPDownloader()
|
||||
|
||||
HTTPDownloaderCurl::~HTTPDownloaderCurl()
|
||||
{
|
||||
// Signal worker thread to shutdown
|
||||
if (m_worker_thread.joinable())
|
||||
{
|
||||
{
|
||||
const std::unique_lock lock(m_worker_queue_mutex);
|
||||
m_worker_thread_shutdown.store(true, std::memory_order_release);
|
||||
|
||||
// Should break the curl_multi_poll wait.
|
||||
if (m_multi_handle)
|
||||
curl_multi_wakeup(m_multi_handle);
|
||||
}
|
||||
|
||||
m_worker_thread.join();
|
||||
}
|
||||
|
||||
if (m_multi_handle)
|
||||
curl_multi_cleanup(m_multi_handle);
|
||||
}
|
||||
@@ -95,6 +132,9 @@ bool HTTPDownloaderCurl::Initialize(std::string user_agent, Error* error)
|
||||
}
|
||||
|
||||
m_user_agent = std::move(user_agent);
|
||||
|
||||
// Start the worker thread
|
||||
m_worker_thread = std::thread(&HTTPDownloaderCurl::WorkerThreadEntryPoint, this);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -131,22 +171,72 @@ HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest()
|
||||
return req;
|
||||
}
|
||||
|
||||
void HTTPDownloaderCurl::InternalPollRequests()
|
||||
void HTTPDownloaderCurl::WorkerThreadEntryPoint()
|
||||
{
|
||||
Threading::SetNameOfCurrentThread("HTTPDownloaderCurl Worker Thread");
|
||||
|
||||
// Apparently OpenSSL can fire SIGPIPE...
|
||||
sigset_t old_block_mask = {};
|
||||
sigset_t new_block_mask = {};
|
||||
sigemptyset(&old_block_mask);
|
||||
sigemptyset(&new_block_mask);
|
||||
sigaddset(&new_block_mask, SIGPIPE);
|
||||
if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0)
|
||||
sigset_t block_mask = {};
|
||||
sigemptyset(&block_mask);
|
||||
sigaddset(&block_mask, SIGPIPE);
|
||||
if (pthread_sigmask(SIG_BLOCK, &block_mask, nullptr) != 0)
|
||||
WARNING_LOG("Failed to block SIGPIPE");
|
||||
|
||||
int running_handles;
|
||||
const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles);
|
||||
if (err != CURLM_OK)
|
||||
ERROR_LOG("curl_multi_perform() returned {}", static_cast<int>(err));
|
||||
while (!m_worker_thread_shutdown.load(std::memory_order_acquire))
|
||||
{
|
||||
// Wait for activity with curl_multi_poll
|
||||
int numfds = 0;
|
||||
curl_multi_poll(m_multi_handle, nullptr, 0, 1000, &numfds);
|
||||
|
||||
// Process any queued actions
|
||||
ProcessQueuedActions();
|
||||
|
||||
// Perform curl operations
|
||||
int running_handles;
|
||||
const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles);
|
||||
if (err != CURLM_OK)
|
||||
ERROR_LOG("curl_multi_perform() returned {}", static_cast<int>(err));
|
||||
|
||||
// Read any results
|
||||
ReadMultiResults();
|
||||
}
|
||||
}
|
||||
|
||||
void HTTPDownloaderCurl::ProcessQueuedActions()
|
||||
{
|
||||
const std::unique_lock lock(m_worker_queue_mutex);
|
||||
while (!m_worker_queue.empty())
|
||||
{
|
||||
const auto& [action, request] = m_worker_queue.front();
|
||||
switch (action)
|
||||
{
|
||||
case QueueAction::Add:
|
||||
{
|
||||
const CURLMcode err = curl_multi_add_handle(m_multi_handle, request->handle);
|
||||
if (err != CURLM_OK)
|
||||
{
|
||||
ERROR_LOG("curl_multi_add_handle() returned {}", static_cast<int>(err));
|
||||
request->error.SetStringFmt("curl_multi_add_handle() failed: {}", curl_multi_strerror(err));
|
||||
request->state.store(Request::State::Complete, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case QueueAction::RemoveAndDelete:
|
||||
{
|
||||
curl_multi_remove_handle(m_multi_handle, request->handle);
|
||||
curl_easy_cleanup(request->handle);
|
||||
delete request;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
m_worker_queue.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
void HTTPDownloaderCurl::ReadMultiResults()
|
||||
{
|
||||
for (;;)
|
||||
{
|
||||
int msgq;
|
||||
@@ -187,9 +277,11 @@ void HTTPDownloaderCurl::InternalPollRequests()
|
||||
|
||||
req->state.store(Request::State::Complete, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
|
||||
if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0)
|
||||
WARNING_LOG("Failed to unblock SIGPIPE");
|
||||
void HTTPDownloaderCurl::InternalPollRequests()
|
||||
{
|
||||
// noop - all handled by worker thread
|
||||
}
|
||||
|
||||
bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
|
||||
@@ -213,16 +305,12 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
|
||||
req->state.store(Request::State::Started, std::memory_order_release);
|
||||
req->start_time = Timer::GetCurrentValue();
|
||||
|
||||
const CURLMcode err = curl_multi_add_handle(m_multi_handle, req->handle);
|
||||
if (err != CURLM_OK)
|
||||
{
|
||||
ERROR_LOG("curl_multi_add_handle() returned {}", static_cast<int>(err));
|
||||
req->error.SetStringFmt("curl_multi_add_handle() failed: {}", curl_multi_strerror(err));
|
||||
req->callback(HTTP_STATUS_ERROR, req->error, std::string(), req->data);
|
||||
curl_easy_cleanup(req->handle);
|
||||
delete req;
|
||||
return false;
|
||||
}
|
||||
// Add to action queue for worker thread to process
|
||||
const std::unique_lock lock(m_worker_queue_mutex);
|
||||
m_worker_queue.emplace_back(QueueAction::Add, req);
|
||||
|
||||
// Wake up worker thread
|
||||
curl_multi_wakeup(m_multi_handle);
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -231,7 +319,11 @@ void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request)
|
||||
{
|
||||
Request* req = static_cast<Request*>(request);
|
||||
DebugAssert(req->handle);
|
||||
curl_multi_remove_handle(m_multi_handle, req->handle);
|
||||
curl_easy_cleanup(req->handle);
|
||||
delete req;
|
||||
|
||||
// Add to action queue for worker thread to process
|
||||
const std::unique_lock lock(m_worker_queue_mutex);
|
||||
m_worker_queue.emplace_back(QueueAction::RemoveAndDelete, req);
|
||||
|
||||
// Wake up worker thread
|
||||
curl_multi_wakeup(m_multi_handle);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user