From 8111b77e95b083137faf888aeb5892073adf7ab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Frings-F=C3=BCrst?= Date: Wed, 27 Jun 2018 16:59:37 +0200 Subject: New upstream version 2.0.0 --- lib/spdlog/details/async_log_helper.h | 333 ++++++++++++++++++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100644 lib/spdlog/details/async_log_helper.h (limited to 'lib/spdlog/details/async_log_helper.h') diff --git a/lib/spdlog/details/async_log_helper.h b/lib/spdlog/details/async_log_helper.h new file mode 100644 index 0000000..49789b9 --- /dev/null +++ b/lib/spdlog/details/async_log_helper.h @@ -0,0 +1,333 @@ +// +// Copyright(c) 2015 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +// async log helper : +// Process logs asynchronously using a back thread. +// +// If the internal queue of log messages reaches its max size, +// then the client call will block until there is more room. +// + +#pragma once + +#include "../common.h" +#include "../details/log_msg.h" +#include "../details/mpmc_blocking_q.h" +#include "../details/os.h" +#include "../formatter.h" +#include "../sinks/sink.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace spdlog { +namespace details { + +class async_log_helper +{ + // Async msg to move to/from the queue + // Movable only. should never be copied + enum class async_msg_type + { + log, + flush, + terminate + }; + + struct async_msg + { + level::level_enum level; + log_clock::time_point time; + size_t thread_id; + std::string txt; + async_msg_type msg_type; + size_t msg_id; + + async_msg() = default; + ~async_msg() = default; + + explicit async_msg(async_msg_type m_type) + : level(level::info) + , thread_id(0) + , msg_type(m_type) + , msg_id(0) + { + } + + async_msg(async_msg &&other) = default; + async_msg &operator=(async_msg &&other) = default; + + // never copy or assign. should only be moved.. + async_msg(const async_msg &) = delete; + async_msg &operator=(const async_msg &other) = delete; + + // construct from log_msg + explicit async_msg(const details::log_msg &m) + : level(m.level) + , time(m.time) + , thread_id(m.thread_id) + , txt(m.raw.data(), m.raw.size()) + , msg_type(async_msg_type::log) + , msg_id(m.msg_id) + { + } + + // copy into log_msg + void fill_log_msg(log_msg &msg, std::string *logger_name) + { + msg.logger_name = logger_name; + msg.level = level; + msg.time = time; + msg.thread_id = thread_id; + msg.raw.clear(); + msg.raw << txt; + msg.msg_id = msg_id; + } + }; + +public: + using item_type = async_msg; + using q_type = details::mpmc_bounded_queue; + + using clock = std::chrono::steady_clock; + + async_log_helper(std::string logger_name, formatter_ptr formatter, std::vector sinks, size_t queue_size, + const log_err_handler err_handler, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, + std::function worker_warmup_cb = nullptr, + const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), + std::function worker_teardown_cb = nullptr); + + void log(const details::log_msg &msg); + + // stop logging and join the back thread + ~async_log_helper(); + + async_log_helper(const async_log_helper &) = delete; + async_log_helper &operator=(const async_log_helper &) = delete; + + void set_formatter(formatter_ptr msg_formatter); + + void flush(); + + void set_error_handler(spdlog::log_err_handler err_handler); + +private: + std::string _logger_name; + formatter_ptr _formatter; + std::vector> _sinks; + + // queue of messages to log + q_type _q; + + log_err_handler _err_handler; + + std::chrono::time_point _last_flush; + + // overflow policy + const async_overflow_policy _overflow_policy; + + // worker thread warmup callback - one can set thread priority, affinity, etc + const std::function _worker_warmup_cb; + + // auto periodic sink flush parameter + const std::chrono::milliseconds _flush_interval_ms; + + // worker thread teardown callback + const std::function _worker_teardown_cb; + + std::mutex null_mutex_; + // null_mutex null_mutex_; + std::condition_variable_any not_empty_cv_; + std::condition_variable_any not_full_cv_; + + // worker thread + std::thread _worker_thread; + + void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy); + + // worker thread main loop + void worker_loop(); + + // dequeue next message from the queue and process it. + // return false if termination of the queue is required + bool process_next_msg(); + + void handle_flush_interval(); + + void flush_sinks(); +}; +} // namespace details +} // namespace spdlog + +/////////////////////////////////////////////////////////////////////////////// +// async_sink class implementation +/////////////////////////////////////////////////////////////////////////////// +inline spdlog::details::async_log_helper::async_log_helper(std::string logger_name, formatter_ptr formatter, std::vector sinks, + size_t queue_size, log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, + const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) + : _logger_name(std::move(logger_name)) + , _formatter(std::move(formatter)) + , _sinks(std::move(sinks)) + , _q(queue_size) + , _err_handler(std::move(err_handler)) + , _last_flush(os::now()) + , _overflow_policy(overflow_policy) + , _worker_warmup_cb(std::move(worker_warmup_cb)) + , _flush_interval_ms(flush_interval_ms) + , _worker_teardown_cb(std::move(worker_teardown_cb)) +{ + _worker_thread = std::thread(&async_log_helper::worker_loop, this); +} + +// send to the worker thread terminate message, and join it. +inline spdlog::details::async_log_helper::~async_log_helper() +{ + try + { + enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); + _worker_thread.join(); + } + catch (...) // don't crash in destructor + { + } +} + +// try to push and block until succeeded (if the policy is not to discard when the queue is full) +inline void spdlog::details::async_log_helper::log(const details::log_msg &msg) +{ + enqueue_msg(async_msg(msg), _overflow_policy); +} + +inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy) +{ + + // block until succeeded pushing to the queue + if (policy == async_overflow_policy::block_retry) + { + _q.enqueue(std::move(new_msg)); + } + else + { + _q.enqueue_nowait(std::move(new_msg)); + } +} + +// optionally wait for the queue be empty and request flush from the sinks +inline void spdlog::details::async_log_helper::flush() +{ + enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy); +} + +inline void spdlog::details::async_log_helper::worker_loop() +{ + if (_worker_warmup_cb) + { + _worker_warmup_cb(); + } + auto active = true; + while (active) + { + try + { + active = process_next_msg(); + } + SPDLOG_CATCH_AND_HANDLE + } + if (_worker_teardown_cb) + { + _worker_teardown_cb(); + } +} + +// process next message in the queue +// return true if this thread should still be active (while no terminate msg was received) +inline bool spdlog::details::async_log_helper::process_next_msg() +{ + async_msg incoming_async_msg; + bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(2)); + if (!dequeued) + { + handle_flush_interval(); + return true; + } + + switch (incoming_async_msg.msg_type) + { + case async_msg_type::flush: + flush_sinks(); + return true; + + case async_msg_type::terminate: + flush_sinks(); + return false; + + default: + log_msg incoming_log_msg; + incoming_async_msg.fill_log_msg(incoming_log_msg, &_logger_name); + _formatter->format(incoming_log_msg); + for (auto &s : _sinks) + { + if (s->should_log(incoming_log_msg.level)) + { + try + { + s->log(incoming_log_msg); + } + SPDLOG_CATCH_AND_HANDLE + } + } + handle_flush_interval(); + return true; + } + assert(false); + return true; // should not be reached +} + +inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter) +{ + _formatter = std::move(msg_formatter); +} + +inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler) +{ + _err_handler = std::move(err_handler); +} + +// flush all sinks if _flush_interval_ms has expired. +inline void spdlog::details::async_log_helper::handle_flush_interval() +{ + if (_flush_interval_ms == std::chrono::milliseconds::zero()) + { + return; + } + auto delta = details::os::now() - _last_flush; + ; + if (delta >= _flush_interval_ms) + { + flush_sinks(); + } +} + +// flush all sinks if _flush_interval_ms has expired. only called if queue is empty +inline void spdlog::details::async_log_helper::flush_sinks() +{ + + for (auto &s : _sinks) + { + try + { + s->flush(); + } + SPDLOG_CATCH_AND_HANDLE + } + _last_flush = os::now(); +} -- cgit v1.2.3