C++回调定时器实现

Posted

技术标签:

【中文标题】C++回调定时器实现【英文标题】:C++ callback timer implementation 【发布时间】:2018-09-07 01:17:30 【问题描述】:

我找到了在我的 c++ 应用程序中使用的回调计时器的以下实现。然而,这个实现需要我从start调用者“加入”线程,这有效地阻塞了start函数的调用者。

我真正喜欢做的是以下。

    有人可以多次调用 foo(data) 并将它们存储在数据库中。 每当调用 foo(data) 时,它都会启动一个计时器几秒钟。 当计时器倒计时,foo(data) 可以被多次调用 可以存储时间和多个项目,但在计时器完成之前不会调用擦除 每当计时器到时, "remove" 函数被调用一次以从 分贝。

基本上我希望能够执行一项任务,然后等待几秒钟,然后在几秒钟后批量执行单个批处理任务 B。

class CallBackTimer 

public:

    /**
     * Constructor of the CallBackTimer
     */
    CallBackTimer() :_execute(false)  

    /**
     * Destructor
     */
    ~CallBackTimer() 
        if (_execute.load(std::memory_order_acquire)) 
            stop();
        ;
    

    /**
     * Stops the timer
     */
    void stop() 
        _execute.store(false, std::memory_order_release);
        if (_thd.joinable()) 
            _thd.join();
        
    

    /**
     * Start the timer function
     * @param interval Repeating duration in milliseconds, 0 indicates the @func will run only once
     * @param delay Time in milliseconds to wait before the first callback
     * @param func Callback function
     */
    void start(int interval, int delay, std::function<void(void)> func) 
        if(_execute.load(std::memory_order_acquire)) 
            stop();
        ;
        _execute.store(true, std::memory_order_release);


        _thd = std::thread([this, interval, delay, func]() 
            std::this_thread::sleep_for(std::chrono::milliseconds(delay));
            if (interval == 0) 
                func();
                stop();
             else 
                while (_execute.load(std::memory_order_acquire)) 
                    func();
                    std::this_thread::sleep_for(std::chrono::milliseconds(interval));
                
            
        );

    

    /**
     * Check if the timer is currently running
     * @return bool, true if timer is running, false otherwise.
     */
    bool is_running() const noexcept 
        return ( _execute.load(std::memory_order_acquire) && _thd.joinable() );
    


private:
    std::atomic<bool> _execute;
    std::thread _thd;

;

我已经尝试使用thread.detach() 修改上面的代码。但是,我在无法从数据库中写入(擦除)的分离线程中运行问题..

感谢任何帮助和建议!

【问题讨论】:

你能添加一个你是如何使用这个类的示例吗?您所要求的似乎有风险且不必要。 也许你可以在构造函数中构造线程并在析构函数中加入它?然后您只需要一个 std::queue 即可通过 start() 添加新项目(回调)并通过 stop() 刷新所有元素。一旦超出范围,析构函数将处理线程。然后线程中会有一个while循环,直到队列有任何项目。然后开始处理它。下一个 while 循环将检查计时器和项目数。在内部,它会一个接一个地处理项目。然后让它重复直到停止。析构函数将进行清理。我猜项目之间的间隔是等待,延迟只是它的触发时间。 【参考方案1】:

您可以使用std::async,而不是使用线程。以下类将在添加最后一个字符串后 4 秒按顺序处理排队的字符串。一次只会启动 1 个异步任务,std::aysnc 会为您处理所有线程。

如果在销毁类时队列中有未处理的项目,则异步任务会停止而无需等待,并且不会处理这些项目(但如果不是您想要的行为,这很容易更改)。

#include <iostream>
#include <string>
#include <future>
#include <mutex>
#include <chrono>
#include <queue>

class Batcher

public:
  Batcher()
    : taskDelay( 4 ),
      startTime( std::chrono::steady_clock::now() ) // only used for debugging
  
  

  void queue( const std::string& value )
  
    std::unique_lock< std::mutex > lock( mutex );
    std::cout << "queuing '" << value << " at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms\n";
    work.push( value );
    // increase the time to process the queue to "now + 4 seconds"
    timeout = std::chrono::steady_clock::now() + taskDelay;
    if ( !running )
    
      // launch a new asynchronous task which will process the queue
      task = std::async( std::launch::async, [this] processWork();  );
      running = true;
    
  

  ~Batcher()
  
    std::unique_lock< std::mutex > lock( mutex );
    // stop processing the queue
    closing = true;
    bool wasRunning = running;
    condition.notify_all();
    lock.unlock();
    if ( wasRunning )
    
      // wait for the async task to complete
      task.wait();
    
  

private:
  std::mutex mutex;
  std::condition_variable condition;
  std::chrono::seconds taskDelay;
  std::chrono::steady_clock::time_point timeout;
  std::queue< std::string > work;
  std::future< void > task;
  bool closing = false;
  bool running = false;
  std::chrono::steady_clock::time_point startTime;

  void processWork()
  
    std::unique_lock< std::mutex > lock( mutex );
    // loop until std::chrono::steady_clock::now() > timeout
    auto wait = timeout - std::chrono::steady_clock::now();
    while ( !closing && wait > std::chrono::seconds( 0 ) )
    
      condition.wait_for( lock, wait );
      wait = timeout - std::chrono::steady_clock::now();
    
    if ( !closing )
    
      std::cout << "processing queue at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms\n";
      while ( !work.empty() )
      
        std::cout << work.front() << "\n";
        work.pop();
      
      std::cout << std::flush;
    
    else
    
      std::cout << "aborting queue processing at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms with " << work.size() << " remaining items\n";
    
    running = false;
  
;

int main()

  Batcher batcher;
  batcher.queue( "test 1" );
  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  batcher.queue( "test 2" );
  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  batcher.queue( "test 3" );
  std::this_thread::sleep_for( std::chrono::seconds( 2 ) );
  batcher.queue( "test 4" );
  std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
  batcher.queue( "test 5" );

【讨论】:

感谢您的回答,实际上我发现根本问题不是因为 CallbackTimer / Batcher。出于某种原因,回调中的 sqlite 语句(在您的情况下,从表中删除行的 auto function = work.front() &amp;&amp; function() 被执行,但实际上并未删除该行。

以上是关于C++回调定时器实现的主要内容,如果未能解决你的问题,请参考以下文章

C ++回调计时器实现

使用 Windows 消息循环的回调实现

思考5 定时器和callback回调函数

C++ 实现的定时器

Boost asio 截止时间计时器立即完成(C++)

基于libco的c++协程实现(时间轮定时器)