HTTPDownloaderCurl: Use worker thread for processing

Should significantly speed up update downloads.
This commit is contained in:
Stenzek
2025-11-29 15:37:52 +10:00
parent d6f7084309
commit e2b3f65134

View File

@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: 2019-2024 Connor McLaughlin <stenzek@gmail.com>
// SPDX-FileCopyrightText: 2019-2025 Connor McLaughlin <stenzek@gmail.com>
// SPDX-License-Identifier: CC-BY-NC-ND-4.0
#include "http_downloader.h"
@@ -6,13 +6,16 @@
#include "common/assert.h"
#include "common/log.h"
#include "common/string_util.h"
#include "common/threading.h"
#include "common/timer.h"
#include <algorithm>
#include <curl/curl.h>
#include <deque>
#include <functional>
#include <pthread.h>
#include <signal.h>
#include <thread>
LOG_CHANNEL(HTTPDownloader);
@@ -32,6 +35,12 @@ protected:
void CloseRequest(HTTPDownloader::Request* request) override;
private:
enum class QueueAction
{
Add,
RemoveAndDelete
};
struct Request : HTTPDownloader::Request
{
CURL* handle = nullptr;
@@ -39,8 +48,21 @@ private:
static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
void WorkerThreadEntryPoint();
void ProcessQueuedActions();
void ReadMultiResults();
CURLM* m_multi_handle = nullptr;
std::string m_user_agent;
// Background thread for curl operations
std::thread m_worker_thread;
std::atomic_bool m_worker_thread_shutdown{false};
// Worker thread queue
std::deque<std::pair<QueueAction, Request*>> m_worker_queue;
std::mutex m_worker_queue_mutex;
};
} // namespace
@@ -50,6 +72,21 @@ HTTPDownloaderCurl::HTTPDownloaderCurl() : HTTPDownloader()
HTTPDownloaderCurl::~HTTPDownloaderCurl()
{
// Signal worker thread to shutdown
if (m_worker_thread.joinable())
{
{
const std::unique_lock lock(m_worker_queue_mutex);
m_worker_thread_shutdown.store(true, std::memory_order_release);
// Should break the curl_multi_poll wait.
if (m_multi_handle)
curl_multi_wakeup(m_multi_handle);
}
m_worker_thread.join();
}
if (m_multi_handle)
curl_multi_cleanup(m_multi_handle);
}
@@ -95,6 +132,9 @@ bool HTTPDownloaderCurl::Initialize(std::string user_agent, Error* error)
}
m_user_agent = std::move(user_agent);
// Start the worker thread
m_worker_thread = std::thread(&HTTPDownloaderCurl::WorkerThreadEntryPoint, this);
return true;
}
@@ -131,22 +171,72 @@ HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest()
return req;
}
void HTTPDownloaderCurl::InternalPollRequests()
void HTTPDownloaderCurl::WorkerThreadEntryPoint()
{
Threading::SetNameOfCurrentThread("HTTPDownloaderCurl Worker Thread");
// Apparently OpenSSL can fire SIGPIPE...
sigset_t old_block_mask = {};
sigset_t new_block_mask = {};
sigemptyset(&old_block_mask);
sigemptyset(&new_block_mask);
sigaddset(&new_block_mask, SIGPIPE);
if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0)
sigset_t block_mask = {};
sigemptyset(&block_mask);
sigaddset(&block_mask, SIGPIPE);
if (pthread_sigmask(SIG_BLOCK, &block_mask, nullptr) != 0)
WARNING_LOG("Failed to block SIGPIPE");
int running_handles;
const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles);
if (err != CURLM_OK)
ERROR_LOG("curl_multi_perform() returned {}", static_cast<int>(err));
while (!m_worker_thread_shutdown.load(std::memory_order_acquire))
{
// Wait for activity with curl_multi_poll
int numfds = 0;
curl_multi_poll(m_multi_handle, nullptr, 0, 1000, &numfds);
// Process any queued actions
ProcessQueuedActions();
// Perform curl operations
int running_handles;
const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles);
if (err != CURLM_OK)
ERROR_LOG("curl_multi_perform() returned {}", static_cast<int>(err));
// Read any results
ReadMultiResults();
}
}
void HTTPDownloaderCurl::ProcessQueuedActions()
{
const std::unique_lock lock(m_worker_queue_mutex);
while (!m_worker_queue.empty())
{
const auto& [action, request] = m_worker_queue.front();
switch (action)
{
case QueueAction::Add:
{
const CURLMcode err = curl_multi_add_handle(m_multi_handle, request->handle);
if (err != CURLM_OK)
{
ERROR_LOG("curl_multi_add_handle() returned {}", static_cast<int>(err));
request->error.SetStringFmt("curl_multi_add_handle() failed: {}", curl_multi_strerror(err));
request->state.store(Request::State::Complete, std::memory_order_release);
}
}
break;
case QueueAction::RemoveAndDelete:
{
curl_multi_remove_handle(m_multi_handle, request->handle);
curl_easy_cleanup(request->handle);
delete request;
}
break;
}
m_worker_queue.pop_front();
}
}
void HTTPDownloaderCurl::ReadMultiResults()
{
for (;;)
{
int msgq;
@@ -187,9 +277,11 @@ void HTTPDownloaderCurl::InternalPollRequests()
req->state.store(Request::State::Complete, std::memory_order_release);
}
}
if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0)
WARNING_LOG("Failed to unblock SIGPIPE");
void HTTPDownloaderCurl::InternalPollRequests()
{
// noop - all handled by worker thread
}
bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
@@ -213,16 +305,12 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
req->state.store(Request::State::Started, std::memory_order_release);
req->start_time = Timer::GetCurrentValue();
const CURLMcode err = curl_multi_add_handle(m_multi_handle, req->handle);
if (err != CURLM_OK)
{
ERROR_LOG("curl_multi_add_handle() returned {}", static_cast<int>(err));
req->error.SetStringFmt("curl_multi_add_handle() failed: {}", curl_multi_strerror(err));
req->callback(HTTP_STATUS_ERROR, req->error, std::string(), req->data);
curl_easy_cleanup(req->handle);
delete req;
return false;
}
// Add to action queue for worker thread to process
const std::unique_lock lock(m_worker_queue_mutex);
m_worker_queue.emplace_back(QueueAction::Add, req);
// Wake up worker thread
curl_multi_wakeup(m_multi_handle);
return true;
}
@@ -231,7 +319,11 @@ void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request)
{
Request* req = static_cast<Request*>(request);
DebugAssert(req->handle);
curl_multi_remove_handle(m_multi_handle, req->handle);
curl_easy_cleanup(req->handle);
delete req;
// Add to action queue for worker thread to process
const std::unique_lock lock(m_worker_queue_mutex);
m_worker_queue.emplace_back(QueueAction::RemoveAndDelete, req);
// Wake up worker thread
curl_multi_wakeup(m_multi_handle);
}