一个异步日志IO的demo
Posted sesiria
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个异步日志IO的demo相关的知识,希望对你有一定的参考价值。
整理了一下项目中用到的一个异步日志IO的实现,基于以下设计:
1. 使用了一个push队列和一个write队列。主线程往push队列中加数据,写线程从write队列中拿数据写到磁盘
2. 默认情况下write线程处于睡眠
3. 有两种情况会唤醒写线程。
1) push队列满后,会调用force_write将push队列和write队列交换并唤醒写线程将日志写入磁盘
2) 如果push队列长时间未满,待定时器触发后,也会交换push队列和write队列并唤醒写线程。
4. 每次写线程工作结束前会重置定时器,避免写线程刚写完数据定时器又触发,导致无效的切换。
/*
* asynchronous log implementation
* FileName: async_log.cpp
* author: sesiria 2021-05-17
*/
#include <signal.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <queue>
#include <string>
#include <algorithm>
#define TIMER_EVENT 3 // flush timer period in seconds (used for both it_value and it_interval)
#define MAX_BUFFER 1000000 // max entries in the push queue before log_push forces a swap
//============================
// Arguments handed to write_thread through pthread_create's void* parameter.
struct timer_data {
timer_t* tid; // timer that write_thread re-arms after each flush
struct itimerspec* ts; // expiry/interval settings passed to timer_settime
};
//=============================
static pthread_mutex_t mtx_push = PTHREAD_MUTEX_INITIALIZER; // mutex guarding buffer_push
static pthread_mutex_t mtx_switch = PTHREAD_MUTEX_INITIALIZER; // mutex guarding buffer_write and the queue swap
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // signalled by force_write, waited on by write_thread
static int terminate = 0; // non-zero makes write_thread exit; not set anywhere in the visible code
std::queue<std::string> buffer_push; // the push queue: log_push appends here
std::queue<std::string> buffer_write; // the write queue: write_thread drains it to disk
// Hand the current push queue over to the write thread.
//
// Takes mtx_push and then mtx_switch (always in that order), swaps the two
// queues and wakes the writer. The swap happens only when the write queue is
// empty: if a previous batch is still pending (the writer has not woken up
// yet), swapping again would move that batch back into the push queue and
// the logs would reach the file late and out of order. In that case the
// function just re-signals the writer and leaves both queues untouched.
void force_write()
{
    pthread_mutex_lock(&mtx_push);
    pthread_mutex_lock(&mtx_switch);

    if (buffer_write.empty()) {
        std::swap(buffer_push, buffer_write);
    }

    // Wake the writer only when it actually has something to flush.
    if (!buffer_write.empty()) {
        pthread_cond_signal(&cond);
    }

    pthread_mutex_unlock(&mtx_switch);
    pthread_mutex_unlock(&mtx_push);
}
// this function will be called by the SIGEV_THREAD
static void
timer_event(union sigval sv) {
//printf("event_trigger!\\n");
printf("Trigger time event! Force to write the logs into disk!\\n");
}
// Entry point of the write thread (started with pthread_create).
//
// arg: pointer to a struct timer_data describing the flush timer; when NULL
//      the thread returns immediately.
// Returns NULL in every case.
//
// Sleeps on `cond` until buffer_write has entries or `terminate` is set.
// Each wake-up appends every queued message to out.log, re-arms the flush
// timer and goes back to sleep. mtx_switch is held for the whole flush, so a
// concurrent force_write() blocks until the batch is on disk. Any file or
// timer failure terminates the process.
void* write_thread(void* arg)
{
    if (arg == NULL)
        return NULL;
    struct timer_data* tdata = (struct timer_data*)arg;

    while (1) {
        pthread_mutex_lock(&mtx_switch);
        while (buffer_write.empty() && !terminate) {
            pthread_cond_wait(&cond, &mtx_switch);
        }
        if (terminate) {
            pthread_mutex_unlock(&mtx_switch);
            break;
        }

        // Open in append mode: a truncating mode would throw away every
        // previously written batch each time the thread wakes up.
        FILE* fp = fopen("out.log", "a");
        if (fp == NULL) {
            perror("failed to open the log file");
            exit(EXIT_FAILURE);
        }

        int cnt = 0;
        while (!buffer_write.empty()) {
            const std::string& entry = buffer_write.front();
            // Count bytes rather than one item of entry.size() bytes, so an
            // empty message (size 0) is not mistaken for a write failure.
            if (fwrite(entry.data(), 1, entry.size(), fp) != entry.size()) {
                perror("write error!");
                exit(EXIT_FAILURE);
            }
            cnt++;
            buffer_write.pop();
        }

        // fclose flushes the stdio buffer; a failure here means data was lost.
        if (fclose(fp) != 0) {
            perror("fclose");
            exit(EXIT_FAILURE);
        }

        // Re-arm the timer so it does not fire right after this flush.
        if (timer_settime(*tdata->tid, 0, tdata->ts, NULL) == -1) {
            perror("timer_settime");
            exit(EXIT_FAILURE);
        }

        printf("write buffer with %d to disk success!\n", cnt);
        pthread_mutex_unlock(&mtx_switch);
    }
    return NULL;
}
// Queue one log message for asynchronous writing.
//
// msg: text to append to the push queue (copied).
//
// Tries to append under mtx_push. When the push queue already holds
// MAX_BUFFER entries, the lock is released, force_write() is called to hand
// the queue to the write thread, and the attempt is repeated until the
// message has been stored.
void log_push(const std::string& msg) {
    for (;;) {
        pthread_mutex_lock(&mtx_push);
        const bool has_room = buffer_push.size() < MAX_BUFFER;
        if (has_room) {
            buffer_push.push(msg);
        }
        pthread_mutex_unlock(&mtx_push);

        if (has_room) {
            return;
        }
        // Queue is full: swap it out, then retry the push.
        force_write();
    }
}
// Demo driver: creates the periodic flush timer, starts the write thread and
// then pushes log lines forever.
//
// The timer delivers its expiry by running timer_event on a new thread
// (SIGEV_THREAD). The loop below never terminates, so the trailing return is
// unreachable; setup failures end the process with EXIT_FAILURE.
int main()
{
    // Zero-initialise so fields that are not set explicitly hold no garbage.
    struct sigevent sev = {};
    struct itimerspec ts = {};
    timer_t tid;

    // init the sigevent
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = timer_event;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = NULL;

    // init the itimerspec: first expiry and period are both TIMER_EVENT seconds
    ts.it_value.tv_sec = TIMER_EVENT;
    ts.it_value.tv_nsec = 0;
    ts.it_interval.tv_sec = TIMER_EVENT;
    ts.it_interval.tv_nsec = 0;

    // create timer
    if (timer_create(CLOCK_REALTIME, &sev, &tid) == -1) {
        perror("timer_create");
        exit(EXIT_FAILURE);
    }
    // set the timer
    if (timer_settime(tid, 0, &ts, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }

    // create the write thread; tdata lives on this stack frame, which stays
    // valid because main never returns while the thread is running
    pthread_t thread_id;
    struct timer_data tdata;
    tdata.tid = &tid;
    tdata.ts = &ts;
    int ret = pthread_create(&thread_id, NULL, write_thread, &tdata);
    if (ret) {
        // pthread_create reports failure through its return value and leaves
        // errno untouched, so copy the code over before calling perror.
        errno = ret;
        perror("pthread_create");
        exit(EXIT_FAILURE);
    }

    long cnt = 0;
    // push logs forever
    while (1) {
        cnt++;
        log_push("Unable to Connect to the Microsoft Visual Studio Remote https://docs.microsoft.com visualstudio > debugger\n");
        if (cnt % 10000 == 0)
            printf("append %ld logs\n", cnt); // report the real running total
    }
    return 0;
}
测试性能能轻松达到100W QPS
以上是关于一个异步日志IO的demo的主要内容,如果未能解决你的问题,请参考以下文章