Add workqueue system
This commit is contained in:
		@@ -48,6 +48,9 @@ add_executable(logid
 | 
			
		||||
        backend/hidpp20/features/HiresScroll.cpp
 | 
			
		||||
        backend/dj/Report.cpp
 | 
			
		||||
        util/mutex_queue.h
 | 
			
		||||
        util/workqueue.cpp
 | 
			
		||||
        util/worker_thread.cpp
 | 
			
		||||
        util/task.cpp
 | 
			
		||||
        util/thread.cpp
 | 
			
		||||
        util/ExceptionHandler.cpp)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -50,12 +50,28 @@ Configuration::Configuration(const std::string& config_file)
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    _worker_threads = LOGID_DEFAULT_WORKER_COUNT;
 | 
			
		||||
    try {
 | 
			
		||||
        auto& worker_count = root["workers"];
 | 
			
		||||
        if(worker_count.getType() == Setting::TypeInt) {
 | 
			
		||||
            _worker_threads = worker_count;
 | 
			
		||||
            if(_worker_threads < 0)
 | 
			
		||||
                logPrintf(WARN, "Line %d: workers cannot be negative.",
 | 
			
		||||
                        worker_count.getSourceLine());
 | 
			
		||||
        } else {
 | 
			
		||||
            logPrintf(WARN, "Line %d: workers must be an integer.",
 | 
			
		||||
                    worker_count.getSourceLine());
 | 
			
		||||
        }
 | 
			
		||||
    } catch(const SettingNotFoundException& e) {
 | 
			
		||||
        // Ignore
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    _io_timeout = LOGID_DEFAULT_RAWDEVICE_TIMEOUT;
 | 
			
		||||
    try {
 | 
			
		||||
        auto& timeout = root["io_timeout"];
 | 
			
		||||
        if(timeout.isNumber()) {
 | 
			
		||||
            auto t = timeout.getType();
 | 
			
		||||
            if(timeout.getType() == libconfig::Setting::TypeFloat)
 | 
			
		||||
            if(timeout.getType() == Setting::TypeFloat)
 | 
			
		||||
                _io_timeout = duration_cast<milliseconds>(
 | 
			
		||||
                        duration<double, std::milli>(timeout));
 | 
			
		||||
            else
 | 
			
		||||
@@ -109,6 +125,11 @@ const char * Configuration::DeviceNotFound::what() const noexcept
 | 
			
		||||
    return _name.c_str();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int Configuration::workerCount() const
 | 
			
		||||
{
 | 
			
		||||
    return _worker_threads;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::chrono::milliseconds Configuration::ioTimeout() const
 | 
			
		||||
{
 | 
			
		||||
    return _io_timeout;
 | 
			
		||||
 
 | 
			
		||||
@@ -25,6 +25,7 @@
 | 
			
		||||
#include <chrono>
 | 
			
		||||
 | 
			
		||||
#define LOGID_DEFAULT_RAWDEVICE_TIMEOUT std::chrono::seconds(2)
 | 
			
		||||
#define LOGID_DEFAULT_WORKER_COUNT 2
 | 
			
		||||
 | 
			
		||||
namespace logid
 | 
			
		||||
{
 | 
			
		||||
@@ -46,9 +47,11 @@ namespace logid
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        std::chrono::milliseconds ioTimeout() const;
 | 
			
		||||
        int workerCount() const;
 | 
			
		||||
    private:
 | 
			
		||||
        std::map<std::string, std::string> _device_paths;
 | 
			
		||||
        std::chrono::milliseconds _io_timeout;
 | 
			
		||||
        int _worker_threads;
 | 
			
		||||
        libconfig::Config _config;
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -18,7 +18,7 @@
 | 
			
		||||
#include "ToggleHiresScroll.h"
 | 
			
		||||
#include "../Device.h"
 | 
			
		||||
#include "../util/log.h"
 | 
			
		||||
#include "../util/thread.h"
 | 
			
		||||
#include "../util/task.h"
 | 
			
		||||
#include "../backend/hidpp20/features/ReprogControls.h"
 | 
			
		||||
 | 
			
		||||
using namespace logid::actions;
 | 
			
		||||
@@ -40,7 +40,7 @@ void ToggleHiresScroll::press()
 | 
			
		||||
    _pressed = true;
 | 
			
		||||
    if(_hires_scroll)
 | 
			
		||||
    {
 | 
			
		||||
        thread::spawn([hires=this->_hires_scroll](){
 | 
			
		||||
        task::spawn([hires=this->_hires_scroll](){
 | 
			
		||||
            auto mode = hires->getMode();
 | 
			
		||||
            mode ^= backend::hidpp20::HiresScroll::HiRes;
 | 
			
		||||
            hires->setMode(mode);
 | 
			
		||||
 
 | 
			
		||||
@@ -18,7 +18,7 @@
 | 
			
		||||
#include "ToggleSmartShift.h"
 | 
			
		||||
#include "../Device.h"
 | 
			
		||||
#include "../backend/hidpp20/features/ReprogControls.h"
 | 
			
		||||
#include "../util/thread.h"
 | 
			
		||||
#include "../util/task.h"
 | 
			
		||||
 | 
			
		||||
using namespace logid::actions;
 | 
			
		||||
using namespace logid::backend;
 | 
			
		||||
@@ -39,7 +39,7 @@ void ToggleSmartShift::press()
 | 
			
		||||
    _pressed = true;
 | 
			
		||||
    if(_smartshift)
 | 
			
		||||
    {
 | 
			
		||||
        thread::spawn([ss=this->_smartshift](){
 | 
			
		||||
        task::spawn([ss=this->_smartshift](){
 | 
			
		||||
            auto status = ss->getStatus();
 | 
			
		||||
            status.setActive = true;
 | 
			
		||||
            status.active = !status.active;
 | 
			
		||||
 
 | 
			
		||||
@@ -17,7 +17,7 @@
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "ReceiverMonitor.h"
 | 
			
		||||
#include "../../util/thread.h"
 | 
			
		||||
#include "../../util/task.h"
 | 
			
		||||
#include "../../util/log.h"
 | 
			
		||||
 | 
			
		||||
#include <utility>
 | 
			
		||||
@@ -57,7 +57,7 @@ void ReceiverMonitor::run()
 | 
			
		||||
            /* Running in a new thread prevents deadlocks since the
 | 
			
		||||
             * receiver may be enumerating.
 | 
			
		||||
             */
 | 
			
		||||
            thread::spawn({[this, report]() {
 | 
			
		||||
            task::spawn({[this, report]() {
 | 
			
		||||
                if (report.subId() == Receiver::DeviceConnection)
 | 
			
		||||
                    this->addDevice(this->_receiver->deviceConnectionEvent
 | 
			
		||||
                    (report));
 | 
			
		||||
 
 | 
			
		||||
@@ -17,7 +17,7 @@
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "DeviceMonitor.h"
 | 
			
		||||
#include "../../util/thread.h"
 | 
			
		||||
#include "../../util/task.h"
 | 
			
		||||
#include "../../util/log.h"
 | 
			
		||||
 | 
			
		||||
#include <thread>
 | 
			
		||||
@@ -98,14 +98,14 @@ void DeviceMonitor::run()
 | 
			
		||||
            std::string devnode = udev_device_get_devnode(device);
 | 
			
		||||
 | 
			
		||||
            if (action == "add")
 | 
			
		||||
                thread::spawn([this, name=devnode]() {
 | 
			
		||||
                task::spawn([this, name=devnode]() {
 | 
			
		||||
                    this->addDevice(name);
 | 
			
		||||
                }, [name=devnode](std::exception& e){
 | 
			
		||||
                    logPrintf(WARN, "Error adding device %s: %s",
 | 
			
		||||
                            name.c_str(), e.what());
 | 
			
		||||
                });
 | 
			
		||||
            else if (action == "remove")
 | 
			
		||||
                thread::spawn([this, name=devnode]() {
 | 
			
		||||
                task::spawn([this, name=devnode]() {
 | 
			
		||||
                    this->removeDevice(name);
 | 
			
		||||
                }, [name=devnode](std::exception& e){
 | 
			
		||||
                    logPrintf(WARN, "Error removing device %s: %s",
 | 
			
		||||
@@ -157,7 +157,7 @@ void DeviceMonitor::enumerate()
 | 
			
		||||
        std::string devnode = udev_device_get_devnode(device);
 | 
			
		||||
        udev_device_unref(device);
 | 
			
		||||
 | 
			
		||||
        thread::spawn([this, name=devnode]() {
 | 
			
		||||
        task::spawn([this, name=devnode]() {
 | 
			
		||||
            this->addDevice(name);
 | 
			
		||||
        }, [name=devnode](std::exception& e){
 | 
			
		||||
            logPrintf(ERROR, "Error adding device %s: %s",
 | 
			
		||||
 
 | 
			
		||||
@@ -25,6 +25,7 @@
 | 
			
		||||
#include "DeviceManager.h"
 | 
			
		||||
#include "logid.h"
 | 
			
		||||
#include "InputDevice.h"
 | 
			
		||||
#include "util/workqueue.h"
 | 
			
		||||
 | 
			
		||||
#define LOGID_VIRTUAL_INPUT_NAME "LogiOps Virtual Input"
 | 
			
		||||
#define DEFAULT_CONFIG_FILE "/etc/logid.cfg"
 | 
			
		||||
@@ -42,6 +43,7 @@ LogLevel logid::global_loglevel = INFO;
 | 
			
		||||
std::shared_ptr<Configuration> logid::global_config;
 | 
			
		||||
std::unique_ptr<DeviceManager> logid::device_manager;
 | 
			
		||||
std::unique_ptr<InputDevice> logid::virtual_input;
 | 
			
		||||
std::unique_ptr<workqueue> logid::global_workqueue;
 | 
			
		||||
 | 
			
		||||
bool logid::kill_logid = false;
 | 
			
		||||
std::mutex logid::device_manager_reload;
 | 
			
		||||
@@ -155,6 +157,8 @@ Possible options are:
 | 
			
		||||
 | 
			
		||||
int main(int argc, char** argv)
 | 
			
		||||
{
 | 
			
		||||
    global_workqueue = std::make_unique<workqueue>(LOGID_DEFAULT_WORKER_COUNT);
 | 
			
		||||
 | 
			
		||||
    readCliOptions(argc, argv);
 | 
			
		||||
 | 
			
		||||
    // Read config
 | 
			
		||||
@@ -165,6 +169,8 @@ int main(int argc, char** argv)
 | 
			
		||||
        global_config = std::make_shared<Configuration>();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    global_workqueue->setThreadCount(global_config->workerCount());
 | 
			
		||||
 | 
			
		||||
    //Create a virtual input device
 | 
			
		||||
    try {
 | 
			
		||||
        virtual_input = std::make_unique<InputDevice>(LOGID_VIRTUAL_INPUT_NAME);
 | 
			
		||||
 
 | 
			
		||||
@@ -28,16 +28,16 @@ void ExceptionHandler::Default(std::exception& error)
 | 
			
		||||
    try {
 | 
			
		||||
        throw error;
 | 
			
		||||
    } catch(backend::hidpp10::Error& e) {
 | 
			
		||||
        logPrintf(WARN, "HID++ 1.0 error ignored on detached thread: %s",
 | 
			
		||||
        logPrintf(WARN, "HID++ 1.0 error ignored on detached thread/task: %s",
 | 
			
		||||
                error.what());
 | 
			
		||||
    } catch(backend::hidpp20::Error& e) {
 | 
			
		||||
        logPrintf(WARN, "HID++ 2.0 error ignored on detached thread: %s",
 | 
			
		||||
        logPrintf(WARN, "HID++ 2.0 error ignored on detached thread/task: %s",
 | 
			
		||||
                   error.what());
 | 
			
		||||
    } catch(std::system_error& e) {
 | 
			
		||||
        logPrintf(WARN, "System error ignored on detached thread: %s",
 | 
			
		||||
        logPrintf(WARN, "System error ignored on detached thread/task: %s",
 | 
			
		||||
                   error.what());
 | 
			
		||||
    } catch(std::exception& e) {
 | 
			
		||||
        logPrintf(WARN, "Error ignored on detached thread: %s",
 | 
			
		||||
        logPrintf(WARN, "Error ignored on detached thread/task: %s",
 | 
			
		||||
                   error.what());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										60
									
								
								src/logid/util/task.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										60
									
								
								src/logid/util/task.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,60 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2019-2020 PixlOne
 | 
			
		||||
 *
 | 
			
		||||
 * This program is free software: you can redistribute it and/or modify
 | 
			
		||||
 * it under the terms of the GNU General Public License as published by
 | 
			
		||||
 * the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
 * (at your option) any later version.
 | 
			
		||||
 *
 | 
			
		||||
 * This program is distributed in the hope that it will be useful,
 | 
			
		||||
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
 * GNU General Public License for more details.
 | 
			
		||||
 *
 | 
			
		||||
 * You should have received a copy of the GNU General Public License
 | 
			
		||||
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
#include "task.h"
 | 
			
		||||
#include "workqueue.h"
 | 
			
		||||
 | 
			
		||||
using namespace logid;
 | 
			
		||||
 | 
			
		||||
task::task(const std::function<void()>& function,
 | 
			
		||||
     const std::function<void(std::exception&)>& exception_handler) :
 | 
			
		||||
     _function (std::make_shared<std::function<void()>>(function)),
 | 
			
		||||
     _exception_handler (std::make_shared<std::function<void(std::exception&)>>
 | 
			
		||||
             (exception_handler)), _status (Waiting),
 | 
			
		||||
     _task_pkg ([this](){
 | 
			
		||||
         try {
 | 
			
		||||
             (*_function)();
 | 
			
		||||
         } catch(std::exception& e) {
 | 
			
		||||
             (*_exception_handler)(e);
 | 
			
		||||
         }
 | 
			
		||||
     })
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void task::run()
 | 
			
		||||
{
 | 
			
		||||
    _status = Running;
 | 
			
		||||
    _task_pkg();
 | 
			
		||||
    _status = Completed;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
task::Status task::getStatus()
 | 
			
		||||
{
 | 
			
		||||
    return _status;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void task::wait()
 | 
			
		||||
{
 | 
			
		||||
    _task_pkg.get_future().wait();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void task::spawn(const std::function<void ()>& function,
 | 
			
		||||
        const std::function<void (std::exception &)>& exception_handler)
 | 
			
		||||
{
 | 
			
		||||
    auto t = std::make_shared<task>(function, exception_handler);
 | 
			
		||||
    global_workqueue->queue(t);
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										67
									
								
								src/logid/util/task.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										67
									
								
								src/logid/util/task.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,67 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2019-2020 PixlOne
 | 
			
		||||
 *
 | 
			
		||||
 * This program is free software: you can redistribute it and/or modify
 | 
			
		||||
 * it under the terms of the GNU General Public License as published by
 | 
			
		||||
 * the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
 * (at your option) any later version.
 | 
			
		||||
 *
 | 
			
		||||
 * This program is distributed in the hope that it will be useful,
 | 
			
		||||
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
 * GNU General Public License for more details.
 | 
			
		||||
 *
 | 
			
		||||
 * You should have received a copy of the GNU General Public License
 | 
			
		||||
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
#ifndef LOGID_TASK_H
 | 
			
		||||
#define LOGID_TASK_H
 | 
			
		||||
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <future>
 | 
			
		||||
#include "ExceptionHandler.h"
 | 
			
		||||
 | 
			
		||||
namespace logid
 | 
			
		||||
{
 | 
			
		||||
    class task
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        enum Status
 | 
			
		||||
        {
 | 
			
		||||
            Waiting,
 | 
			
		||||
            Running,
 | 
			
		||||
            Completed
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        explicit task(const std::function<void()>& function,
 | 
			
		||||
                        const std::function<void(std::exception&)>&
 | 
			
		||||
                        exception_handler={[](std::exception& e)
 | 
			
		||||
                                           {ExceptionHandler::Default(e);}});
 | 
			
		||||
 | 
			
		||||
        Status getStatus();
 | 
			
		||||
 | 
			
		||||
        void run(); // Runs synchronously
 | 
			
		||||
        void wait();
 | 
			
		||||
 | 
			
		||||
        /* This function spawns a new task into the least used worker queue
 | 
			
		||||
         * and forgets about it.
 | 
			
		||||
         */
 | 
			
		||||
        static void spawn(const std::function<void()>& function,
 | 
			
		||||
                          const std::function<void(std::exception&)>&
 | 
			
		||||
                          exception_handler={[](std::exception& e)
 | 
			
		||||
                                             {ExceptionHandler::Default(e);}});
 | 
			
		||||
 | 
			
		||||
        static void autoQueue(std::shared_ptr<task>);
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        std::shared_ptr<std::function<void()>> _function;
 | 
			
		||||
        std::shared_ptr<std::function<void(std::exception&)>>
 | 
			
		||||
                _exception_handler;
 | 
			
		||||
        std::atomic<Status> _status;
 | 
			
		||||
        std::packaged_task<void()> _task_pkg;
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#endif //LOGID_TASK_H
 | 
			
		||||
							
								
								
									
										88
									
								
								src/logid/util/worker_thread.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										88
									
								
								src/logid/util/worker_thread.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,88 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2019-2020 PixlOne
 | 
			
		||||
 *
 | 
			
		||||
 * This program is free software: you can redistribute it and/or modify
 | 
			
		||||
 * it under the terms of the GNU General Public License as published by
 | 
			
		||||
 * the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
 * (at your option) any later version.
 | 
			
		||||
 *
 | 
			
		||||
 * This program is distributed in the hope that it will be useful,
 | 
			
		||||
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
 * GNU General Public License for more details.
 | 
			
		||||
 *
 | 
			
		||||
 * You should have received a copy of the GNU General Public License
 | 
			
		||||
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
#include <vector>
 | 
			
		||||
#include "worker_thread.h"
 | 
			
		||||
#include "log.h"
 | 
			
		||||
#include "workqueue.h"
 | 
			
		||||
 | 
			
		||||
using namespace logid;
 | 
			
		||||
 | 
			
		||||
worker_thread::worker_thread(workqueue* parent, std::size_t worker_number) :
 | 
			
		||||
_parent (parent), _worker_number (worker_number), _continue_run (false),
 | 
			
		||||
_thread (std::make_unique<thread> ([this](){
 | 
			
		||||
    _run(); }, [this](std::exception& e){ _exception_handler(e); }))
 | 
			
		||||
{
 | 
			
		||||
    _thread->run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
worker_thread::~worker_thread()
 | 
			
		||||
{
 | 
			
		||||
    _continue_run = false;
 | 
			
		||||
    _queue_cv.notify_all();
 | 
			
		||||
    // Block until task is complete
 | 
			
		||||
    std::unique_lock<std::mutex> lock(_busy);
 | 
			
		||||
 | 
			
		||||
    while(!_queue.empty()) {
 | 
			
		||||
        _parent->queue(_queue.front());
 | 
			
		||||
        _queue.pop();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void worker_thread::queue(std::shared_ptr<task> t)
 | 
			
		||||
{
 | 
			
		||||
    _queue.push(t);
 | 
			
		||||
    _queue_cv.notify_all();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool worker_thread::busy()
 | 
			
		||||
{
 | 
			
		||||
    bool not_busy = _busy.try_lock();
 | 
			
		||||
 | 
			
		||||
    if(not_busy)
 | 
			
		||||
        _busy.unlock();
 | 
			
		||||
 | 
			
		||||
    return !not_busy;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void worker_thread::_run()
 | 
			
		||||
{
 | 
			
		||||
    std::unique_lock<std::mutex> lock(_run_lock);
 | 
			
		||||
    _continue_run = true;
 | 
			
		||||
    while(_continue_run) {
 | 
			
		||||
        _parent->busyUpdate();
 | 
			
		||||
        _queue_cv.wait(lock, [this]{ return !_queue.empty() ||
 | 
			
		||||
            !_continue_run; });
 | 
			
		||||
        if(!_continue_run)
 | 
			
		||||
            return;
 | 
			
		||||
        std::unique_lock<std::mutex> busy_lock(_busy);
 | 
			
		||||
        while(!_queue.empty()) {
 | 
			
		||||
            _queue.front()->run();
 | 
			
		||||
            _queue.pop();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void worker_thread::_exception_handler(std::exception &e)
 | 
			
		||||
{
 | 
			
		||||
    logPrintf(WARN, "Exception caught on worker thread %d, restarting: %s",
 | 
			
		||||
            _worker_number, e.what());
 | 
			
		||||
    // This action destroys the logid::thread, std::thread should detach safely.
 | 
			
		||||
    _thread = std::make_unique<thread>([this](){ _run(); },
 | 
			
		||||
            [this](std::exception& e) { _exception_handler(e); });
 | 
			
		||||
    _thread->run();
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										56
									
								
								src/logid/util/worker_thread.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								src/logid/util/worker_thread.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,56 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2019-2020 PixlOne
 | 
			
		||||
 *
 | 
			
		||||
 * This program is free software: you can redistribute it and/or modify
 | 
			
		||||
 * it under the terms of the GNU General Public License as published by
 | 
			
		||||
 * the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
 * (at your option) any later version.
 | 
			
		||||
 *
 | 
			
		||||
 * This program is distributed in the hope that it will be useful,
 | 
			
		||||
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
 * GNU General Public License for more details.
 | 
			
		||||
 *
 | 
			
		||||
 * You should have received a copy of the GNU General Public License
 | 
			
		||||
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
#ifndef LOGID_WORKER_THREAD_H
 | 
			
		||||
#define LOGID_WORKER_THREAD_H
 | 
			
		||||
 | 
			
		||||
#include "mutex_queue.h"
 | 
			
		||||
#include "task.h"
 | 
			
		||||
#include "thread.h"
 | 
			
		||||
 | 
			
		||||
namespace logid
 | 
			
		||||
{
 | 
			
		||||
    class workqueue;
 | 
			
		||||
 | 
			
		||||
    class worker_thread
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        worker_thread(workqueue* parent, std::size_t worker_number);
 | 
			
		||||
        ~worker_thread();
 | 
			
		||||
 | 
			
		||||
        void queue(std::shared_ptr<task> t);
 | 
			
		||||
 | 
			
		||||
        bool busy();
 | 
			
		||||
    private:
 | 
			
		||||
        void _run();
 | 
			
		||||
        void _exception_handler(std::exception& e);
 | 
			
		||||
 | 
			
		||||
        workqueue* _parent;
 | 
			
		||||
        std::size_t _worker_number;
 | 
			
		||||
 | 
			
		||||
        std::unique_ptr<thread> _thread;
 | 
			
		||||
        std::mutex _busy;
 | 
			
		||||
 | 
			
		||||
        std::mutex _run_lock;
 | 
			
		||||
        std::atomic<bool> _continue_run;
 | 
			
		||||
        std::condition_variable _queue_cv;
 | 
			
		||||
 | 
			
		||||
        mutex_queue<std::shared_ptr<task>> _queue;
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#endif //LOGID_WORKER_THREAD_H
 | 
			
		||||
							
								
								
									
										151
									
								
								src/logid/util/workqueue.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										151
									
								
								src/logid/util/workqueue.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,151 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2019-2020 PixlOne
 | 
			
		||||
 *
 | 
			
		||||
 * This program is free software: you can redistribute it and/or modify
 | 
			
		||||
 * it under the terms of the GNU General Public License as published by
 | 
			
		||||
 * the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
 * (at your option) any later version.
 | 
			
		||||
 *
 | 
			
		||||
 * This program is distributed in the hope that it will be useful,
 | 
			
		||||
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
 * GNU General Public License for more details.
 | 
			
		||||
 *
 | 
			
		||||
 * You should have received a copy of the GNU General Public License
 | 
			
		||||
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
#include "workqueue.h"
 | 
			
		||||
#include "log.h"
 | 
			
		||||
 | 
			
		||||
using namespace logid;
 | 
			
		||||
 | 
			
		||||
workqueue::workqueue(std::size_t thread_count) : _manager_thread (
 | 
			
		||||
        std::make_unique<thread>(
 | 
			
		||||
                [this](){ _run(); }
 | 
			
		||||
                , [this](std::exception& e){ _exception_handler(e); }
 | 
			
		||||
        )), _continue_run (false), _worker_count (thread_count)
 | 
			
		||||
{
 | 
			
		||||
    _workers.reserve(_worker_count);
 | 
			
		||||
    for(std::size_t i = 0; i < thread_count; i++)
 | 
			
		||||
        _workers.push_back(std::make_unique<worker_thread>(this, i));
 | 
			
		||||
    _manager_thread->run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
workqueue::~workqueue()
 | 
			
		||||
{
 | 
			
		||||
    stop();
 | 
			
		||||
 | 
			
		||||
    while(_workers.empty())
 | 
			
		||||
        _workers.pop_back();
 | 
			
		||||
 | 
			
		||||
    // Queue should have been empty before, but just confirm here.
 | 
			
		||||
    while(!_queue.empty()) {
 | 
			
		||||
        thread::spawn([t=_queue.front()](){ t->run(); });
 | 
			
		||||
        _queue.pop();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void workqueue::queue(std::shared_ptr<task> t)
 | 
			
		||||
{
 | 
			
		||||
    _queue.push(t);
 | 
			
		||||
    _queue_cv.notify_all();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void workqueue::busyUpdate()
 | 
			
		||||
{
 | 
			
		||||
    _busy_cv.notify_all();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void workqueue::stop()
 | 
			
		||||
{
 | 
			
		||||
    _continue_run = false;
 | 
			
		||||
    std::unique_lock<std::mutex> lock(_run_lock);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void workqueue::setThreadCount(std::size_t count)
 | 
			
		||||
{
 | 
			
		||||
    while(_workers.size() < count)
 | 
			
		||||
        _workers.push_back(std::make_unique<worker_thread>(this,
 | 
			
		||||
            _workers.size()));
 | 
			
		||||
 | 
			
		||||
    if(_workers.size() > count) {
 | 
			
		||||
        // Restart manager thread
 | 
			
		||||
        stop();
 | 
			
		||||
        while (_workers.size() > count)
 | 
			
		||||
            _workers.pop_back();
 | 
			
		||||
        _manager_thread = std::make_unique<thread>(
 | 
			
		||||
                [this](){ _run(); }
 | 
			
		||||
                , [this](std::exception& e){ _exception_handler(e); }
 | 
			
		||||
        );
 | 
			
		||||
        _manager_thread->run();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    _worker_count = count;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::size_t workqueue::threadCount() const
 | 
			
		||||
{
 | 
			
		||||
    return _workers.size();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void workqueue::_run()
 | 
			
		||||
{
 | 
			
		||||
    using namespace std::chrono_literals;
 | 
			
		||||
 | 
			
		||||
    std::unique_lock<std::mutex> lock(_run_lock);
 | 
			
		||||
    _continue_run = true;
 | 
			
		||||
    while(_continue_run) {
 | 
			
		||||
        _queue_cv.wait(lock, [this]{ return !(_queue.empty()); });
 | 
			
		||||
        while(!_queue.empty()) {
 | 
			
		||||
 | 
			
		||||
            if(_workers.empty()) {
 | 
			
		||||
                if(_worker_count)
 | 
			
		||||
                    logPrintf(DEBUG, "No workers were found, running task in"
 | 
			
		||||
                                 " a new thread.");
 | 
			
		||||
                thread::spawn([t=_queue.front()](){ t->run(); });
 | 
			
		||||
                _queue.pop();
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            auto worker = _workers.begin();
 | 
			
		||||
            for(; worker != _workers.end(); worker++) {
 | 
			
		||||
                if(!(*worker)->busy())
 | 
			
		||||
                    break;
 | 
			
		||||
            }
 | 
			
		||||
            if(worker != _workers.end())
 | 
			
		||||
                (*worker)->queue(_queue.front());
 | 
			
		||||
            else {
 | 
			
		||||
                _busy_cv.wait_for(lock, 500ms, [this, &worker]{
 | 
			
		||||
                    for(worker = _workers.begin(); worker != _workers.end();
 | 
			
		||||
                        worker++) {
 | 
			
		||||
                        if (!(*worker)->busy()) {
 | 
			
		||||
                            return true;
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    return false;
 | 
			
		||||
                });
 | 
			
		||||
 | 
			
		||||
                if(worker != _workers.end())
 | 
			
		||||
                    (*worker)->queue(_queue.front());
 | 
			
		||||
                else{
 | 
			
		||||
                    // Workers busy, launch in new thread
 | 
			
		||||
                    logPrintf(DEBUG, "All workers were busy for 500ms, "
 | 
			
		||||
                                     "running task in new thread.");
 | 
			
		||||
                    thread::spawn([t = _queue.front()]() { t->run(); });
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            _queue.pop();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void workqueue::_exception_handler(std::exception &e)
 | 
			
		||||
{
 | 
			
		||||
    logPrintf(WARN, "Exception caught on workqueue manager thread, "
 | 
			
		||||
                    "restarting: %s" , e.what());
 | 
			
		||||
    // This action destroys the logid::thread, std::thread should detach safely.
 | 
			
		||||
    _manager_thread = std::make_unique<thread>([this](){ _run(); },
 | 
			
		||||
            [this](std::exception& e) { _exception_handler(e); });
 | 
			
		||||
    _manager_thread->run();
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										59
									
								
								src/logid/util/workqueue.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								src/logid/util/workqueue.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,59 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2019-2020 PixlOne
 | 
			
		||||
 *
 | 
			
		||||
 * This program is free software: you can redistribute it and/or modify
 | 
			
		||||
 * it under the terms of the GNU General Public License as published by
 | 
			
		||||
 * the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
 * (at your option) any later version.
 | 
			
		||||
 *
 | 
			
		||||
 * This program is distributed in the hope that it will be useful,
 | 
			
		||||
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
 * GNU General Public License for more details.
 | 
			
		||||
 *
 | 
			
		||||
 * You should have received a copy of the GNU General Public License
 | 
			
		||||
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
#ifndef LOGID_WORKQUEUE_H
 | 
			
		||||
#define LOGID_WORKQUEUE_H
 | 
			
		||||
 | 
			
		||||
#include "worker_thread.h"
 | 
			
		||||
#include "thread.h"
 | 
			
		||||
 | 
			
		||||
namespace logid
 | 
			
		||||
{
 | 
			
		||||
    class workqueue
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        explicit workqueue(std::size_t thread_count);
 | 
			
		||||
        ~workqueue();
 | 
			
		||||
 | 
			
		||||
        void queue(std::shared_ptr<task> t);
 | 
			
		||||
 | 
			
		||||
        void busyUpdate();
 | 
			
		||||
 | 
			
		||||
        void stop();
 | 
			
		||||
 | 
			
		||||
        void setThreadCount(std::size_t count);
 | 
			
		||||
        std::size_t threadCount() const;
 | 
			
		||||
    private:
 | 
			
		||||
        void _run();
 | 
			
		||||
 | 
			
		||||
        void _exception_handler(std::exception& e);
 | 
			
		||||
        std::unique_ptr<thread> _manager_thread;
 | 
			
		||||
 | 
			
		||||
        mutex_queue<std::shared_ptr<task>> _queue;
 | 
			
		||||
        std::condition_variable _queue_cv;
 | 
			
		||||
        std::condition_variable _busy_cv;
 | 
			
		||||
        std::mutex _run_lock;
 | 
			
		||||
        std::atomic<bool> _continue_run;
 | 
			
		||||
 | 
			
		||||
        std::vector<std::unique_ptr<worker_thread>> _workers;
 | 
			
		||||
        std::size_t _worker_count;
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    extern std::unique_ptr<workqueue> global_workqueue;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#endif //LOGID_WORKQUEUE_H
 | 
			
		||||
		Reference in New Issue
	
	Block a user