diff --git a/src/logid/CMakeLists.txt b/src/logid/CMakeLists.txt index 052ba5c..6d93d7b 100644 --- a/src/logid/CMakeLists.txt +++ b/src/logid/CMakeLists.txt @@ -48,6 +48,9 @@ add_executable(logid backend/hidpp20/features/HiresScroll.cpp backend/dj/Report.cpp util/mutex_queue.h + util/workqueue.cpp + util/worker_thread.cpp + util/task.cpp util/thread.cpp util/ExceptionHandler.cpp) diff --git a/src/logid/Configuration.cpp b/src/logid/Configuration.cpp index a94265a..0826c0f 100644 --- a/src/logid/Configuration.cpp +++ b/src/logid/Configuration.cpp @@ -50,12 +50,28 @@ Configuration::Configuration(const std::string& config_file) return; } + _worker_threads = LOGID_DEFAULT_WORKER_COUNT; + try { + auto& worker_count = root["workers"]; + if(worker_count.getType() == Setting::TypeInt) { + _worker_threads = worker_count; + if(_worker_threads < 0) + logPrintf(WARN, "Line %d: workers cannot be negative.", + worker_count.getSourceLine()); + } else { + logPrintf(WARN, "Line %d: workers must be an integer.", + worker_count.getSourceLine()); + } + } catch(const SettingNotFoundException& e) { + // Ignore + } + _io_timeout = LOGID_DEFAULT_RAWDEVICE_TIMEOUT; try { auto& timeout = root["io_timeout"]; if(timeout.isNumber()) { auto t = timeout.getType(); - if(timeout.getType() == libconfig::Setting::TypeFloat) + if(timeout.getType() == Setting::TypeFloat) _io_timeout = duration_cast( duration(timeout)); else @@ -109,6 +125,11 @@ const char * Configuration::DeviceNotFound::what() const noexcept return _name.c_str(); } +int Configuration::workerCount() const +{ + return _worker_threads; +} + std::chrono::milliseconds Configuration::ioTimeout() const { return _io_timeout; diff --git a/src/logid/Configuration.h b/src/logid/Configuration.h index d083ef9..ed10893 100644 --- a/src/logid/Configuration.h +++ b/src/logid/Configuration.h @@ -25,6 +25,7 @@ #include #define LOGID_DEFAULT_RAWDEVICE_TIMEOUT std::chrono::seconds(2) +#define LOGID_DEFAULT_WORKER_COUNT 2 namespace logid { @@ -46,9 +47,11 @@ namespace logid }; std::chrono::milliseconds ioTimeout() const; + int workerCount() const; private: std::map _device_paths; std::chrono::milliseconds _io_timeout; + int _worker_threads; libconfig::Config _config; }; diff --git a/src/logid/actions/ToggleHiresScroll.cpp b/src/logid/actions/ToggleHiresScroll.cpp index 6dc9703..7fbf5ac 100644 --- a/src/logid/actions/ToggleHiresScroll.cpp +++ b/src/logid/actions/ToggleHiresScroll.cpp @@ -18,7 +18,7 @@ #include "ToggleHiresScroll.h" #include "../Device.h" #include "../util/log.h" -#include "../util/thread.h" +#include "../util/task.h" #include "../backend/hidpp20/features/ReprogControls.h" using namespace logid::actions; @@ -40,7 +40,7 @@ void ToggleHiresScroll::press() _pressed = true; if(_hires_scroll) { - thread::spawn([hires=this->_hires_scroll](){ + task::spawn([hires=this->_hires_scroll](){ auto mode = hires->getMode(); mode ^= backend::hidpp20::HiresScroll::HiRes; hires->setMode(mode); diff --git a/src/logid/actions/ToggleSmartShift.cpp b/src/logid/actions/ToggleSmartShift.cpp index 9713c16..dfe86a9 100644 --- a/src/logid/actions/ToggleSmartShift.cpp +++ b/src/logid/actions/ToggleSmartShift.cpp @@ -18,7 +18,7 @@ #include "ToggleSmartShift.h" #include "../Device.h" #include "../backend/hidpp20/features/ReprogControls.h" -#include "../util/thread.h" +#include "../util/task.h" using namespace logid::actions; using namespace logid::backend; @@ -39,7 +39,7 @@ void ToggleSmartShift::press() _pressed = true; if(_smartshift) { - thread::spawn([ss=this->_smartshift](){ + task::spawn([ss=this->_smartshift](){ auto status = ss->getStatus(); status.setActive = true; status.active = !status.active; diff --git a/src/logid/backend/dj/ReceiverMonitor.cpp b/src/logid/backend/dj/ReceiverMonitor.cpp index a983171..d483475 100644 --- a/src/logid/backend/dj/ReceiverMonitor.cpp +++ b/src/logid/backend/dj/ReceiverMonitor.cpp @@ -17,7 +17,7 @@ */ #include "ReceiverMonitor.h" -#include "../../util/thread.h" +#include "../../util/task.h" #include "../../util/log.h" #include @@ -57,7 +57,7 @@ void ReceiverMonitor::run() /* Running in a new thread prevents deadlocks since the * receiver may be enumerating. */ - thread::spawn({[this, report]() { + task::spawn({[this, report]() { if (report.subId() == Receiver::DeviceConnection) this->addDevice(this->_receiver->deviceConnectionEvent (report)); diff --git a/src/logid/backend/raw/DeviceMonitor.cpp b/src/logid/backend/raw/DeviceMonitor.cpp index fcbc6db..278f4ca 100644 --- a/src/logid/backend/raw/DeviceMonitor.cpp +++ b/src/logid/backend/raw/DeviceMonitor.cpp @@ -17,7 +17,7 @@ */ #include "DeviceMonitor.h" -#include "../../util/thread.h" +#include "../../util/task.h" #include "../../util/log.h" #include @@ -98,14 +98,14 @@ void DeviceMonitor::run() std::string devnode = udev_device_get_devnode(device); if (action == "add") - thread::spawn([this, name=devnode]() { + task::spawn([this, name=devnode]() { this->addDevice(name); }, [name=devnode](std::exception& e){ logPrintf(WARN, "Error adding device %s: %s", name.c_str(), e.what()); }); else if (action == "remove") - thread::spawn([this, name=devnode]() { + task::spawn([this, name=devnode]() { this->removeDevice(name); }, [name=devnode](std::exception& e){ logPrintf(WARN, "Error removing device %s: %s", @@ -157,7 +157,7 @@ void DeviceMonitor::enumerate() std::string devnode = udev_device_get_devnode(device); udev_device_unref(device); - thread::spawn([this, name=devnode]() { + task::spawn([this, name=devnode]() { this->addDevice(name); }, [name=devnode](std::exception& e){ logPrintf(ERROR, "Error adding device %s: %s", diff --git a/src/logid/logid.cpp b/src/logid/logid.cpp index 6fc1197..0d3bc34 100644 --- a/src/logid/logid.cpp +++ b/src/logid/logid.cpp @@ -25,6 +25,7 @@ #include "DeviceManager.h" #include "logid.h" #include "InputDevice.h" +#include "util/workqueue.h" #define LOGID_VIRTUAL_INPUT_NAME "LogiOps Virtual Input" #define DEFAULT_CONFIG_FILE "/etc/logid.cfg" @@ -42,6 +43,7 @@ LogLevel logid::global_loglevel = INFO; std::shared_ptr logid::global_config; std::unique_ptr logid::device_manager; std::unique_ptr logid::virtual_input; +std::unique_ptr logid::global_workqueue; bool logid::kill_logid = false; std::mutex logid::device_manager_reload; @@ -155,6 +157,8 @@ Possible options are: int main(int argc, char** argv) { + global_workqueue = std::make_unique(LOGID_DEFAULT_WORKER_COUNT); + readCliOptions(argc, argv); // Read config @@ -165,6 +169,8 @@ int main(int argc, char** argv) global_config = std::make_shared(); } + global_workqueue->setThreadCount(global_config->workerCount()); + //Create a virtual input device try { virtual_input = std::make_unique(LOGID_VIRTUAL_INPUT_NAME); diff --git a/src/logid/util/ExceptionHandler.cpp b/src/logid/util/ExceptionHandler.cpp index 859783f..48c723f 100644 --- a/src/logid/util/ExceptionHandler.cpp +++ b/src/logid/util/ExceptionHandler.cpp @@ -28,16 +28,16 @@ void ExceptionHandler::Default(std::exception& error) try { throw error; } catch(backend::hidpp10::Error& e) { - logPrintf(WARN, "HID++ 1.0 error ignored on detached thread: %s", + logPrintf(WARN, "HID++ 1.0 error ignored on detached thread/task: %s", error.what()); } catch(backend::hidpp20::Error& e) { - logPrintf(WARN, "HID++ 2.0 error ignored on detached thread: %s", + logPrintf(WARN, "HID++ 2.0 error ignored on detached thread/task: %s", error.what()); } catch(std::system_error& e) { - logPrintf(WARN, "System error ignored on detached thread: %s", + logPrintf(WARN, "System error ignored on detached thread/task: %s", error.what()); } catch(std::exception& e) { - logPrintf(WARN, "Error ignored on detached thread: %s", + logPrintf(WARN, "Error ignored on detached thread/task: %s", error.what()); } } \ No newline at end of file diff --git a/src/logid/util/task.cpp b/src/logid/util/task.cpp new file mode 100644 index 0000000..021e0fb --- /dev/null +++ b/src/logid/util/task.cpp @@ -0,0 +1,60 @@ +/* + * Copyright 2019-2020 PixlOne + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +#include "task.h" +#include "workqueue.h" + +using namespace logid; + +task::task(const std::function& function, + const std::function& exception_handler) : + _function (std::make_shared>(function)), + _exception_handler (std::make_shared> + (exception_handler)), _status (Waiting), + _task_pkg ([this](){ + try { + (*_function)(); + } catch(std::exception& e) { + (*_exception_handler)(e); + } + }) +{ +} + +void task::run() +{ + _status = Running; + _task_pkg(); + _status = Completed; +} + +task::Status task::getStatus() +{ + return _status; +} + +void task::wait() +{ + _task_pkg.get_future().wait(); +} + +void task::spawn(const std::function& function, + const std::function& exception_handler) +{ + auto t = std::make_shared(function, exception_handler); + global_workqueue->queue(t); +} \ No newline at end of file diff --git a/src/logid/util/task.h b/src/logid/util/task.h new file mode 100644 index 0000000..100a938 --- /dev/null +++ b/src/logid/util/task.h @@ -0,0 +1,67 @@ +/* + * Copyright 2019-2020 PixlOne + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +#ifndef LOGID_TASK_H +#define LOGID_TASK_H + +#include +#include +#include +#include "ExceptionHandler.h" + +namespace logid +{ + class task + { + public: + enum Status + { + Waiting, + Running, + Completed + }; + + explicit task(const std::function& function, + const std::function& + exception_handler={[](std::exception& e) + {ExceptionHandler::Default(e);}}); + + Status getStatus(); + + void run(); // Runs synchronously + void wait(); + + /* This function spawns a new task into the least used worker queue + * and forgets about it. + */ + static void spawn(const std::function& function, + const std::function& + exception_handler={[](std::exception& e) + {ExceptionHandler::Default(e);}}); + + static void autoQueue(std::shared_ptr); + + private: + std::shared_ptr> _function; + std::shared_ptr> + _exception_handler; + std::atomic _status; + std::packaged_task _task_pkg; + }; +} + +#endif //LOGID_TASK_H diff --git a/src/logid/util/worker_thread.cpp b/src/logid/util/worker_thread.cpp new file mode 100644 index 0000000..116e991 --- /dev/null +++ b/src/logid/util/worker_thread.cpp @@ -0,0 +1,88 @@ +/* + * Copyright 2019-2020 PixlOne + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +#include +#include "worker_thread.h" +#include "log.h" +#include "workqueue.h" + +using namespace logid; + +worker_thread::worker_thread(workqueue* parent, std::size_t worker_number) : +_parent (parent), _worker_number (worker_number), _continue_run (false), +_thread (std::make_unique ([this](){ + _run(); }, [this](std::exception& e){ _exception_handler(e); })) +{ + _thread->run(); +} + +worker_thread::~worker_thread() +{ + _continue_run = false; + _queue_cv.notify_all(); + // Block until task is complete + std::unique_lock lock(_busy); + + while(!_queue.empty()) { + _parent->queue(_queue.front()); + _queue.pop(); + } +} + +void worker_thread::queue(std::shared_ptr t) +{ + _queue.push(t); + _queue_cv.notify_all(); +} + +bool worker_thread::busy() +{ + bool not_busy = _busy.try_lock(); + + if(not_busy) + _busy.unlock(); + + return !not_busy; +} + +void worker_thread::_run() +{ + std::unique_lock lock(_run_lock); + _continue_run = true; + while(_continue_run) { + _parent->busyUpdate(); + _queue_cv.wait(lock, [this]{ return !_queue.empty() || + !_continue_run; }); + if(!_continue_run) + return; + std::unique_lock busy_lock(_busy); + while(!_queue.empty()) { + _queue.front()->run(); + _queue.pop(); + } + } +} + +void worker_thread::_exception_handler(std::exception &e) +{ + logPrintf(WARN, "Exception caught on worker thread %d, restarting: %s", + _worker_number, e.what()); + // This action destroys the logid::thread, std::thread should detach safely. + _thread = std::make_unique([this](){ _run(); }, + [this](std::exception& e) { _exception_handler(e); }); + _thread->run(); +} \ No newline at end of file diff --git a/src/logid/util/worker_thread.h b/src/logid/util/worker_thread.h new file mode 100644 index 0000000..c4305e5 --- /dev/null +++ b/src/logid/util/worker_thread.h @@ -0,0 +1,56 @@ +/* + * Copyright 2019-2020 PixlOne + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +#ifndef LOGID_WORKER_THREAD_H +#define LOGID_WORKER_THREAD_H + +#include "mutex_queue.h" +#include "task.h" +#include "thread.h" + +namespace logid +{ + class workqueue; + + class worker_thread + { + public: + worker_thread(workqueue* parent, std::size_t worker_number); + ~worker_thread(); + + void queue(std::shared_ptr t); + + bool busy(); + private: + void _run(); + void _exception_handler(std::exception& e); + + workqueue* _parent; + std::size_t _worker_number; + + std::unique_ptr _thread; + std::mutex _busy; + + std::mutex _run_lock; + std::atomic _continue_run; + std::condition_variable _queue_cv; + + mutex_queue> _queue; + }; +} + +#endif //LOGID_WORKER_THREAD_H diff --git a/src/logid/util/workqueue.cpp b/src/logid/util/workqueue.cpp new file mode 100644 index 0000000..14c1f79 --- /dev/null +++ b/src/logid/util/workqueue.cpp @@ -0,0 +1,151 @@ +/* + * Copyright 2019-2020 PixlOne + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +#include "workqueue.h" +#include "log.h" + +using namespace logid; + +workqueue::workqueue(std::size_t thread_count) : _manager_thread ( + std::make_unique( + [this](){ _run(); } + , [this](std::exception& e){ _exception_handler(e); } + )), _continue_run (false), _worker_count (thread_count) +{ + _workers.reserve(_worker_count); + for(std::size_t i = 0; i < thread_count; i++) + _workers.push_back(std::make_unique(this, i)); + _manager_thread->run(); +} + +workqueue::~workqueue() +{ + stop(); + + while(_workers.empty()) + _workers.pop_back(); + + // Queue should have been empty before, but just confirm here. + while(!_queue.empty()) { + thread::spawn([t=_queue.front()](){ t->run(); }); + _queue.pop(); + } +} + +void workqueue::queue(std::shared_ptr t) +{ + _queue.push(t); + _queue_cv.notify_all(); +} + +void workqueue::busyUpdate() +{ + _busy_cv.notify_all(); +} + +void workqueue::stop() +{ + _continue_run = false; + std::unique_lock lock(_run_lock); +} + +void workqueue::setThreadCount(std::size_t count) +{ + while(_workers.size() < count) + _workers.push_back(std::make_unique(this, + _workers.size())); + + if(_workers.size() > count) { + // Restart manager thread + stop(); + while (_workers.size() > count) + _workers.pop_back(); + _manager_thread = std::make_unique( + [this](){ _run(); } + , [this](std::exception& e){ _exception_handler(e); } + ); + _manager_thread->run(); + } + + _worker_count = count; +} + +std::size_t workqueue::threadCount() const +{ + return _workers.size(); +} + +void workqueue::_run() +{ + using namespace std::chrono_literals; + + std::unique_lock lock(_run_lock); + _continue_run = true; + while(_continue_run) { + _queue_cv.wait(lock, [this]{ return !(_queue.empty()); }); + while(!_queue.empty()) { + + if(_workers.empty()) { + if(_worker_count) + logPrintf(DEBUG, "No workers were found, running task in" + " a new thread."); + thread::spawn([t=_queue.front()](){ t->run(); }); + _queue.pop(); + continue; + } + + auto worker = _workers.begin(); + for(; worker != _workers.end(); worker++) { + if(!(*worker)->busy()) + break; + } + if(worker != _workers.end()) + (*worker)->queue(_queue.front()); + else { + _busy_cv.wait_for(lock, 500ms, [this, &worker]{ + for(worker = _workers.begin(); worker != _workers.end(); + worker++) { + if (!(*worker)->busy()) { + return true; + } + } + return false; + }); + + if(worker != _workers.end()) + (*worker)->queue(_queue.front()); + else{ + // Workers busy, launch in new thread + logPrintf(DEBUG, "All workers were busy for 500ms, " + "running task in new thread."); + thread::spawn([t = _queue.front()]() { t->run(); }); + } + } + _queue.pop(); + } + } +} + +void workqueue::_exception_handler(std::exception &e) +{ + logPrintf(WARN, "Exception caught on workqueue manager thread, " + "restarting: %s" , e.what()); + // This action destroys the logid::thread, std::thread should detach safely. + _manager_thread = std::make_unique([this](){ _run(); }, + [this](std::exception& e) { _exception_handler(e); }); + _manager_thread->run(); +} \ No newline at end of file diff --git a/src/logid/util/workqueue.h b/src/logid/util/workqueue.h new file mode 100644 index 0000000..2234cf3 --- /dev/null +++ b/src/logid/util/workqueue.h @@ -0,0 +1,59 @@ +/* + * Copyright 2019-2020 PixlOne + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +#ifndef LOGID_WORKQUEUE_H +#define LOGID_WORKQUEUE_H + +#include "worker_thread.h" +#include "thread.h" + +namespace logid +{ + class workqueue + { + public: + explicit workqueue(std::size_t thread_count); + ~workqueue(); + + void queue(std::shared_ptr t); + + void busyUpdate(); + + void stop(); + + void setThreadCount(std::size_t count); + std::size_t threadCount() const; + private: + void _run(); + + void _exception_handler(std::exception& e); + std::unique_ptr _manager_thread; + + mutex_queue> _queue; + std::condition_variable _queue_cv; + std::condition_variable _busy_cv; + std::mutex _run_lock; + std::atomic _continue_run; + + std::vector> _workers; + std::size_t _worker_count; + }; + + extern std::unique_ptr global_workqueue; +} + +#endif //LOGID_WORKQUEUE_H