Properly output TimeoutError

Fixed issue where receiver devices aren't detected (hopefully).
This commit is contained in:
pixl 2020-07-12 02:51:56 -04:00
parent 9ad8ce0fea
commit f7638b0905
No known key found for this signature in database
GPG Key ID: 1866C148CD593B6E
10 changed files with 64 additions and 38 deletions

View File

@ -25,7 +25,7 @@
#include <chrono> #include <chrono>
#define LOGID_DEFAULT_RAWDEVICE_TIMEOUT std::chrono::seconds(2) #define LOGID_DEFAULT_RAWDEVICE_TIMEOUT std::chrono::seconds(2)
#define LOGID_DEFAULT_WORKER_COUNT 2 #define LOGID_DEFAULT_WORKER_COUNT 4
namespace logid namespace logid
{ {

View File

@ -24,6 +24,7 @@
#include "util/log.h" #include "util/log.h"
#include "backend/hidpp10/Error.h" #include "backend/hidpp10/Error.h"
#include "backend/dj/Receiver.h" #include "backend/dj/Receiver.h"
#include "backend/Error.h"
#define NON_WIRELESS_DEV(index) (index) == HIDPP::DefaultDevice ? "default" : "corded" #define NON_WIRELESS_DEV(index) (index) == HIDPP::DefaultDevice ? "default" : "corded"
@ -46,6 +47,9 @@ void DeviceManager::addDevice(std::string path)
logPrintf(WARN, "I/O error on %s: %s, skipping device.", logPrintf(WARN, "I/O error on %s: %s, skipping device.",
path.c_str(), e.what()); path.c_str(), e.what());
return; return;
} catch (TimeoutError &e) {
logPrintf(WARN, "Device %s timed out.", path.c_str());
defaultExists = false;
} }
if(isReceiver) { if(isReceiver) {

View File

@ -160,7 +160,7 @@ void DeviceMonitor::enumerate()
task::spawn([this, name=devnode]() { task::spawn([this, name=devnode]() {
this->addDevice(name); this->addDevice(name);
}, [name=devnode](std::exception& e){ }, [name=devnode](std::exception& e){
logPrintf(ERROR, "Error adding device %s: %s", logPrintf(WARN, "Error adding device %s: %s",
name.c_str(), e.what()); name.c_str(), e.what());
}); });
} }

View File

@ -24,6 +24,8 @@
#include "../hidpp/Report.h" #include "../hidpp/Report.h"
#include "../../Configuration.h" #include "../../Configuration.h"
#include "../../util/thread.h" #include "../../util/thread.h"
#include "../../util/task.h"
#include "../../util/workqueue.h"
#include <string> #include <string>
#include <system_error> #include <system_error>
@ -63,7 +65,7 @@ bool RawDevice::supportedReport(uint8_t id, uint8_t length)
} }
RawDevice::RawDevice(std::string path) : _path (std::move(path)), RawDevice::RawDevice(std::string path) : _path (std::move(path)),
_continue_listen (false) _continue_listen (false), _continue_respond (false)
{ {
int ret; int ret;
@ -151,15 +153,41 @@ std::vector<uint8_t> RawDevice::sendReport(const std::vector<uint8_t>& report)
/* If the listener will stop, handle I/O manually. /* If the listener will stop, handle I/O manually.
* Otherwise, push to queue and wait for result. */ * Otherwise, push to queue and wait for result. */
if(_continue_listen) { if(_continue_listen) {
std::packaged_task<std::vector<uint8_t>()> task( [this, report]() { std::mutex send_report;
std::unique_lock<std::mutex> lock(send_report);
std::condition_variable cv;
bool top_of_queue = false;
std::packaged_task<std::vector<uint8_t>()> task( [this, report, &cv,
&top_of_queue] () {
top_of_queue = true;
cv.notify_all();
return this->_respondToReport(report); return this->_respondToReport(report);
}); });
auto f = task.get_future(); auto f = task.get_future();
_io_queue.push(&task); _io_queue.push(&task);
cv.wait(lock, [&top_of_queue]{ return top_of_queue; });
auto status = f.wait_for(global_config->ioTimeout());
if(status == std::future_status::timeout) {
_continue_respond = false;
throw TimeoutError();
}
return f.get(); return f.get();
} }
else else {
return _respondToReport(report); std::vector<uint8_t> response;
std::shared_ptr<task> t = std::make_shared<task>(
[this, report, &response]() {
response = _respondToReport(report);
});
global_workqueue->queue(t);
t->waitStart();
auto status = t->waitFor(global_config->ioTimeout());
if(status == std::future_status::timeout) {
_continue_respond = false;
throw TimeoutError();
} else
return response;
}
} }
// DJ commands are not systematically acknowledged, do not expect a result. // DJ commands are not systematically acknowledged, do not expect a result.
@ -184,7 +212,8 @@ std::vector<uint8_t> RawDevice::_respondToReport
(const std::vector<uint8_t>& request) (const std::vector<uint8_t>& request)
{ {
_sendReport(request); _sendReport(request);
while(true) { _continue_respond = true;
while(_continue_respond) {
std::vector<uint8_t> response; std::vector<uint8_t> response;
_readReport(response, MAX_DATA_LENGTH); _readReport(response, MAX_DATA_LENGTH);
@ -227,6 +256,8 @@ std::vector<uint8_t> RawDevice::_respondToReport
if(_continue_listen) if(_continue_listen)
this->_handleEvent(response); this->_handleEvent(response);
} }
return {};
} }
int RawDevice::_sendReport(const std::vector<uint8_t>& report) int RawDevice::_sendReport(const std::vector<uint8_t>& report)

View File

@ -75,6 +75,7 @@ namespace raw
std::vector<uint8_t> rdesc; std::vector<uint8_t> rdesc;
std::atomic<bool> _continue_listen; std::atomic<bool> _continue_listen;
std::atomic<bool> _continue_respond;
std::condition_variable _listen_condition; std::condition_variable _listen_condition;
std::map<std::string, std::shared_ptr<RawEventHandler>> std::map<std::string, std::shared_ptr<RawEventHandler>>

View File

@ -43,7 +43,7 @@ LogLevel logid::global_loglevel = INFO;
std::shared_ptr<Configuration> logid::global_config; std::shared_ptr<Configuration> logid::global_config;
std::unique_ptr<DeviceManager> logid::device_manager; std::unique_ptr<DeviceManager> logid::device_manager;
std::unique_ptr<InputDevice> logid::virtual_input; std::unique_ptr<InputDevice> logid::virtual_input;
std::unique_ptr<workqueue> logid::global_workqueue; std::shared_ptr<workqueue> logid::global_workqueue;
bool logid::kill_logid = false; bool logid::kill_logid = false;
std::mutex logid::device_manager_reload; std::mutex logid::device_manager_reload;
@ -157,8 +157,6 @@ Possible options are:
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
global_workqueue = std::make_unique<workqueue>(LOGID_DEFAULT_WORKER_COUNT);
readCliOptions(argc, argv); readCliOptions(argc, argv);
// Read config // Read config
@ -168,8 +166,7 @@ int main(int argc, char** argv)
catch (std::exception &e) { catch (std::exception &e) {
global_config = std::make_shared<Configuration>(); global_config = std::make_shared<Configuration>();
} }
global_workqueue = std::make_shared<workqueue>(global_config->workerCount());
global_workqueue->setThreadCount(global_config->workerCount());
//Create a virtual input device //Create a virtual input device
try { try {

View File

@ -38,6 +38,7 @@ task::task(const std::function<void()>& function,
void task::run() void task::run()
{ {
_status = Running; _status = Running;
_status_cv.notify_all();
_task_pkg(); _task_pkg();
_status = Completed; _status = Completed;
} }
@ -52,6 +53,18 @@ void task::wait()
_task_pkg.get_future().wait(); _task_pkg.get_future().wait();
} }
void task::waitStart()
{
std::mutex wait_start;
std::unique_lock<std::mutex> lock(wait_start);
_status_cv.wait(lock, [this](){ return _status != Waiting; });
}
std::future_status task::waitFor(std::chrono::milliseconds ms)
{
return _task_pkg.get_future().wait_for(ms);
}
void task::spawn(const std::function<void ()>& function, void task::spawn(const std::function<void ()>& function,
const std::function<void (std::exception &)>& exception_handler) const std::function<void (std::exception &)>& exception_handler)
{ {

View File

@ -44,6 +44,8 @@ namespace logid
void run(); // Runs synchronously void run(); // Runs synchronously
void wait(); void wait();
void waitStart();
std::future_status waitFor(std::chrono::milliseconds ms);
/* This function spawns a new task into the least used worker queue /* This function spawns a new task into the least used worker queue
* and forgets about it. * and forgets about it.
@ -53,13 +55,12 @@ namespace logid
exception_handler={[](std::exception& e) exception_handler={[](std::exception& e)
{ExceptionHandler::Default(e);}}); {ExceptionHandler::Default(e);}});
static void autoQueue(std::shared_ptr<task>);
private: private:
std::shared_ptr<std::function<void()>> _function; std::shared_ptr<std::function<void()>> _function;
std::shared_ptr<std::function<void(std::exception&)>> std::shared_ptr<std::function<void(std::exception&)>>
_exception_handler; _exception_handler;
std::atomic<Status> _status; std::atomic<Status> _status;
std::condition_variable _status_cv;
std::packaged_task<void()> _task_pkg; std::packaged_task<void()> _task_pkg;
}; };
} }

View File

@ -15,6 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
* *
*/ */
#include <cassert>
#include "workqueue.h" #include "workqueue.h"
#include "log.h" #include "log.h"
@ -48,6 +49,7 @@ workqueue::~workqueue()
void workqueue::queue(std::shared_ptr<task> t) void workqueue::queue(std::shared_ptr<task> t)
{ {
assert(t != nullptr);
_queue.push(t); _queue.push(t);
_queue_cv.notify_all(); _queue_cv.notify_all();
} }
@ -63,27 +65,6 @@ void workqueue::stop()
std::unique_lock<std::mutex> lock(_run_lock); std::unique_lock<std::mutex> lock(_run_lock);
} }
void workqueue::setThreadCount(std::size_t count)
{
while(_workers.size() < count)
_workers.push_back(std::make_unique<worker_thread>(this,
_workers.size()));
if(_workers.size() > count) {
// Restart manager thread
stop();
while (_workers.size() > count)
_workers.pop_back();
_manager_thread = std::make_unique<thread>(
[this](){ _run(); }
, [this](std::exception& e){ _exception_handler(e); }
);
_manager_thread->run();
}
_worker_count = count;
}
std::size_t workqueue::threadCount() const std::size_t workqueue::threadCount() const
{ {
return _workers.size(); return _workers.size();
@ -98,7 +79,6 @@ void workqueue::_run()
while(_continue_run) { while(_continue_run) {
_queue_cv.wait(lock, [this]{ return !(_queue.empty()); }); _queue_cv.wait(lock, [this]{ return !(_queue.empty()); });
while(!_queue.empty()) { while(!_queue.empty()) {
if(_workers.empty()) { if(_workers.empty()) {
if(_worker_count) if(_worker_count)
logPrintf(DEBUG, "No workers were found, running task in" logPrintf(DEBUG, "No workers were found, running task in"

View File

@ -35,7 +35,6 @@ namespace logid
void stop(); void stop();
void setThreadCount(std::size_t count);
std::size_t threadCount() const; std::size_t threadCount() const;
private: private:
void _run(); void _run();
@ -53,7 +52,7 @@ namespace logid
std::size_t _worker_count; std::size_t _worker_count;
}; };
extern std::unique_ptr<workqueue> global_workqueue; extern std::shared_ptr<workqueue> global_workqueue;
} }
#endif //LOGID_WORKQUEUE_H #endif //LOGID_WORKQUEUE_H