From f7638b09059efe303335f094169fd8e9a9e05b6f Mon Sep 17 00:00:00 2001 From: pixl Date: Sun, 12 Jul 2020 02:51:56 -0400 Subject: [PATCH] Properly output TimeoutError Fixed issue where receiver devices aren't detected (hopefully). --- src/logid/Configuration.h | 2 +- src/logid/DeviceManager.cpp | 4 +++ src/logid/backend/raw/DeviceMonitor.cpp | 2 +- src/logid/backend/raw/RawDevice.cpp | 41 ++++++++++++++++++++++--- src/logid/backend/raw/RawDevice.h | 1 + src/logid/logid.cpp | 7 ++--- src/logid/util/task.cpp | 13 ++++++++ src/logid/util/task.h | 5 +-- src/logid/util/workqueue.cpp | 24 ++------------- src/logid/util/workqueue.h | 3 +- 10 files changed, 64 insertions(+), 38 deletions(-) diff --git a/src/logid/Configuration.h b/src/logid/Configuration.h index b05da63..3666fbc 100644 --- a/src/logid/Configuration.h +++ b/src/logid/Configuration.h @@ -25,7 +25,7 @@ #include #define LOGID_DEFAULT_RAWDEVICE_TIMEOUT std::chrono::seconds(2) -#define LOGID_DEFAULT_WORKER_COUNT 2 +#define LOGID_DEFAULT_WORKER_COUNT 4 namespace logid { diff --git a/src/logid/DeviceManager.cpp b/src/logid/DeviceManager.cpp index 95448b7..da05400 100644 --- a/src/logid/DeviceManager.cpp +++ b/src/logid/DeviceManager.cpp @@ -24,6 +24,7 @@ #include "util/log.h" #include "backend/hidpp10/Error.h" #include "backend/dj/Receiver.h" +#include "backend/Error.h" #define NON_WIRELESS_DEV(index) (index) == HIDPP::DefaultDevice ? "default" : "corded" @@ -46,6 +47,9 @@ void DeviceManager::addDevice(std::string path) logPrintf(WARN, "I/O error on %s: %s, skipping device.", path.c_str(), e.what()); return; + } catch (TimeoutError &e) { + logPrintf(WARN, "Device %s timed out.", path.c_str()); + defaultExists = false; } if(isReceiver) { diff --git a/src/logid/backend/raw/DeviceMonitor.cpp b/src/logid/backend/raw/DeviceMonitor.cpp index 278f4ca..54dde67 100644 --- a/src/logid/backend/raw/DeviceMonitor.cpp +++ b/src/logid/backend/raw/DeviceMonitor.cpp @@ -160,7 +160,7 @@ void DeviceMonitor::enumerate() task::spawn([this, name=devnode]() { this->addDevice(name); }, [name=devnode](std::exception& e){ - logPrintf(ERROR, "Error adding device %s: %s", + logPrintf(WARN, "Error adding device %s: %s", name.c_str(), e.what()); }); } diff --git a/src/logid/backend/raw/RawDevice.cpp b/src/logid/backend/raw/RawDevice.cpp index 8019aa5..6859271 100644 --- a/src/logid/backend/raw/RawDevice.cpp +++ b/src/logid/backend/raw/RawDevice.cpp @@ -24,6 +24,8 @@ #include "../hidpp/Report.h" #include "../../Configuration.h" #include "../../util/thread.h" +#include "../../util/task.h" +#include "../../util/workqueue.h" #include #include @@ -63,7 +65,7 @@ bool RawDevice::supportedReport(uint8_t id, uint8_t length) } RawDevice::RawDevice(std::string path) : _path (std::move(path)), - _continue_listen (false) + _continue_listen (false), _continue_respond (false) { int ret; @@ -151,15 +153,41 @@ std::vector RawDevice::sendReport(const std::vector& report) /* If the listener will stop, handle I/O manually. * Otherwise, push to queue and wait for result. */ if(_continue_listen) { - std::packaged_task()> task( [this, report]() { + std::mutex send_report; + std::unique_lock lock(send_report); + std::condition_variable cv; + bool top_of_queue = false; + std::packaged_task()> task( [this, report, &cv, + &top_of_queue] () { + top_of_queue = true; + cv.notify_all(); return this->_respondToReport(report); }); auto f = task.get_future(); _io_queue.push(&task); + cv.wait(lock, [&top_of_queue]{ return top_of_queue; }); + auto status = f.wait_for(global_config->ioTimeout()); + if(status == std::future_status::timeout) { + _continue_respond = false; + throw TimeoutError(); + } return f.get(); } - else - return _respondToReport(report); + else { + std::vector response; + std::shared_ptr t = std::make_shared( + [this, report, &response]() { + response = _respondToReport(report); + }); + global_workqueue->queue(t); + t->waitStart(); + auto status = t->waitFor(global_config->ioTimeout()); + if(status == std::future_status::timeout) { + _continue_respond = false; + throw TimeoutError(); + } else + return response; + } } // DJ commands are not systematically acknowledged, do not expect a result. @@ -184,7 +212,8 @@ std::vector RawDevice::_respondToReport (const std::vector& request) { _sendReport(request); - while(true) { + _continue_respond = true; + while(_continue_respond) { std::vector response; _readReport(response, MAX_DATA_LENGTH); @@ -227,6 +256,8 @@ std::vector RawDevice::_respondToReport if(_continue_listen) this->_handleEvent(response); } + + return {}; } int RawDevice::_sendReport(const std::vector& report) diff --git a/src/logid/backend/raw/RawDevice.h b/src/logid/backend/raw/RawDevice.h index 4c7dec9..dca622e 100644 --- a/src/logid/backend/raw/RawDevice.h +++ b/src/logid/backend/raw/RawDevice.h @@ -75,6 +75,7 @@ namespace raw std::vector rdesc; std::atomic _continue_listen; + std::atomic _continue_respond; std::condition_variable _listen_condition; std::map> diff --git a/src/logid/logid.cpp b/src/logid/logid.cpp index 0d3bc34..42c75e4 100644 --- a/src/logid/logid.cpp +++ b/src/logid/logid.cpp @@ -43,7 +43,7 @@ LogLevel logid::global_loglevel = INFO; std::shared_ptr logid::global_config; std::unique_ptr logid::device_manager; std::unique_ptr logid::virtual_input; -std::unique_ptr logid::global_workqueue; +std::shared_ptr logid::global_workqueue; bool logid::kill_logid = false; std::mutex logid::device_manager_reload; @@ -157,8 +157,6 @@ Possible options are: int main(int argc, char** argv) { - global_workqueue = std::make_unique(LOGID_DEFAULT_WORKER_COUNT); - readCliOptions(argc, argv); // Read config @@ -168,8 +166,7 @@ int main(int argc, char** argv) catch (std::exception &e) { global_config = std::make_shared(); } - - global_workqueue->setThreadCount(global_config->workerCount()); + global_workqueue = std::make_shared(global_config->workerCount()); //Create a virtual input device try { diff --git a/src/logid/util/task.cpp b/src/logid/util/task.cpp index 021e0fb..79432d5 100644 --- a/src/logid/util/task.cpp +++ b/src/logid/util/task.cpp @@ -38,6 +38,7 @@ task::task(const std::function& function, void task::run() { _status = Running; + _status_cv.notify_all(); _task_pkg(); _status = Completed; } @@ -52,6 +53,18 @@ void task::wait() _task_pkg.get_future().wait(); } +void task::waitStart() +{ + std::mutex wait_start; + std::unique_lock lock(wait_start); + _status_cv.wait(lock, [this](){ return _status != Waiting; }); +} + +std::future_status task::waitFor(std::chrono::milliseconds ms) +{ + return _task_pkg.get_future().wait_for(ms); +} + void task::spawn(const std::function& function, const std::function& exception_handler) { diff --git a/src/logid/util/task.h b/src/logid/util/task.h index 100a938..91157a6 100644 --- a/src/logid/util/task.h +++ b/src/logid/util/task.h @@ -44,6 +44,8 @@ namespace logid void run(); // Runs synchronously void wait(); + void waitStart(); + std::future_status waitFor(std::chrono::milliseconds ms); /* This function spawns a new task into the least used worker queue * and forgets about it. @@ -53,13 +55,12 @@ namespace logid exception_handler={[](std::exception& e) {ExceptionHandler::Default(e);}}); - static void autoQueue(std::shared_ptr); - private: std::shared_ptr> _function; std::shared_ptr> _exception_handler; std::atomic _status; + std::condition_variable _status_cv; std::packaged_task _task_pkg; }; } diff --git a/src/logid/util/workqueue.cpp b/src/logid/util/workqueue.cpp index 14c1f79..a7e04f7 100644 --- a/src/logid/util/workqueue.cpp +++ b/src/logid/util/workqueue.cpp @@ -15,6 +15,7 @@ * along with this program. If not, see . * */ +#include #include "workqueue.h" #include "log.h" @@ -48,6 +49,7 @@ workqueue::~workqueue() void workqueue::queue(std::shared_ptr t) { + assert(t != nullptr); _queue.push(t); _queue_cv.notify_all(); } @@ -63,27 +65,6 @@ void workqueue::stop() std::unique_lock lock(_run_lock); } -void workqueue::setThreadCount(std::size_t count) -{ - while(_workers.size() < count) - _workers.push_back(std::make_unique(this, - _workers.size())); - - if(_workers.size() > count) { - // Restart manager thread - stop(); - while (_workers.size() > count) - _workers.pop_back(); - _manager_thread = std::make_unique( - [this](){ _run(); } - , [this](std::exception& e){ _exception_handler(e); } - ); - _manager_thread->run(); - } - - _worker_count = count; -} - std::size_t workqueue::threadCount() const { return _workers.size(); @@ -98,7 +79,6 @@ void workqueue::_run() while(_continue_run) { _queue_cv.wait(lock, [this]{ return !(_queue.empty()); }); while(!_queue.empty()) { - if(_workers.empty()) { if(_worker_count) logPrintf(DEBUG, "No workers were found, running task in" diff --git a/src/logid/util/workqueue.h b/src/logid/util/workqueue.h index 2234cf3..301b60b 100644 --- a/src/logid/util/workqueue.h +++ b/src/logid/util/workqueue.h @@ -35,7 +35,6 @@ namespace logid void stop(); - void setThreadCount(std::size_t count); std::size_t threadCount() const; private: void _run(); @@ -53,7 +52,7 @@ namespace logid std::size_t _worker_count; }; - extern std::unique_ptr global_workqueue; + extern std::shared_ptr global_workqueue; } #endif //LOGID_WORKQUEUE_H