一个异步日志IO的demo

Posted sesiria

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个异步日志IO的demo相关的知识,希望对你有一定的参考价值。

整理了以下项目中用到的一个异步日志IO的实现,基于以下设计:

1.  使用了一个push队列和一个write队列。主线程往push队列中加数据,写线程从write队列中拿数据写到磁盘

2.  默认情况下write线程处于睡眠

3. 有两种情况会唤醒写线程。

    1) push队列满后,会调用force_write将push队列和write队列交换并唤醒写线程将日志写入磁盘 

    2)如果push队列长时间未满,待定时器触发后,也会切换push和write并唤醒写线程。

    3)每次写线程工作结束前将重置定时器,避免写线程刚刚写完数据定时器又触发导致无效的切换。

 

 

/*
 *  asynchronous log impelmentation
 *  FileName: async_log.cpp
 *  author: sesiria    2021-05-17
 */
#include <signal.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <queue>
#include <string>
#include <algorithm>


#define TIMER_EVENT	3 // 3second
#define MAX_BUFFER	1000000 // 100

//============================
// data will be transfer to the thread function.
struct timer_data {
	timer_t* tid;
	struct itimerspec* ts;
};
//=============================

static pthread_mutex_t mtx_push = PTHREAD_MUTEX_INITIALIZER;		// mutex for push queue
static pthread_mutex_t mtx_switch = PTHREAD_MUTEX_INITIALIZER;		// mutes for the write queue
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;				// condition variable for the write thread

static int terminate = 0;											// indicate whether the write thread should be exit.

std::queue<std::string> buffer_push;								// the push queue
std::queue<std::string> buffer_write;								// the write queue

// force write the push buffer into hard disk.
void force_write()
{
	pthread_mutex_lock(&mtx_push);
	pthread_mutex_lock(&mtx_switch);

	// swap the buffer
	std::swap(buffer_push, buffer_write);

	// signal the write thread.
	pthread_cond_signal(&cond);

	// release the lock
	pthread_mutex_unlock(&mtx_switch);
	pthread_mutex_unlock(&mtx_push);
}

// this function will be called by the SIGEV_THREAD
static void
timer_event(union sigval sv) {
	//printf("event_trigger!\\n");
	printf("Trigger time event! Force to write the logs into disk!\\n");
}

// the call back for the write thread
void* write_thread(void* arg)
{
	if (arg == NULL)
		return NULL;
	struct timer_data* tdata = (struct timer_data*)arg;
	while (1) {
		pthread_mutex_lock(&mtx_switch);
		while (buffer_write.empty() && !terminate) {
			pthread_cond_wait(&cond, &mtx_switch);
		}

		if (terminate) {
			pthread_mutex_unlock(&mtx_switch);
			break;
		}
		int cnt = 0;

		FILE* fp = fopen("out.log", "w+");
		if (fp == NULL) {
			perror("failed to open the log file\\n");
			exit(EXIT_FAILURE);
		}

		while (!buffer_write.empty()) {
			cnt++;
			auto & a = buffer_write.front();
			if (fwrite(a.c_str(), a.size(),1, fp) != 1) {
				perror("write error!");
				exit(EXIT_FAILURE);
			}
			buffer_write.pop();
		}

		// reset timer
		if (timer_settime(*tdata->tid, 0, tdata->ts, NULL) == -1) {
			perror("timer_settime");
			exit(EXIT_FAILURE);
		}

		fclose(fp);
		printf("write buffer with %d to disk success!\\n", cnt);

		pthread_mutex_unlock(&mtx_switch);
	}
}

// the log push function will be called by the main thread.
void log_push(const std::string& msg) {
	bool isPushed = false;
	while(!isPushed)
	{
		pthread_mutex_lock(&mtx_push);
		if (buffer_push.size() < MAX_BUFFER) {
			buffer_push.push(msg);
			isPushed = true;
		}
		pthread_mutex_unlock(&mtx_push);

		if (!isPushed) {
			// union sigval sv
			force_write();
		}

	} 
}

// the main thread
int main()
{
	// init timer
	struct sigevent sev;
	struct itimerspec ts;
	timer_t tid;

	// init the sigevent
	sev.sigev_notify = SIGEV_THREAD;
	sev.sigev_notify_function = timer_event;
	sev.sigev_notify_attributes = NULL;
	sev.sigev_value.sival_ptr = NULL;

	// init the itimerspec
	ts.it_value.tv_sec = TIMER_EVENT;
	ts.it_value.tv_nsec = 0;
	ts.it_interval.tv_sec = TIMER_EVENT;
	ts.it_interval.tv_nsec = 0;

	// create timer
	if (timer_create(CLOCK_REALTIME, &sev, &tid) == -1) {
		perror("timer_create");
		exit(EXIT_FAILURE);
	}

	// set the timer
	if (timer_settime(tid, 0, &ts, NULL) == -1) {
		perror("timer_settime");
		exit(EXIT_FAILURE);
	}

	// create the write thread
	pthread_t thread_id;
	struct timer_data tdata;
	tdata.tid = &tid;
	tdata.ts = &ts;
	int ret = pthread_create(&thread_id, NULL, write_thread, &tdata);
	if (ret) {
		perror("pthread_Create");
		exit(EXIT_FAILURE);
	}

	long cnt = 0;
	// infinity to push some logs
	while (1) {
		cnt++;
		log_push("Unable to Connect to the Microsoft Visual Studio Remote https://docs.microsoft.com visualstudio > debugger\\n");
		if (cnt % 10000  == 0)
			printf("append 500000 logs\\n");

	}

	return 0;
}

 

测试性能能轻松达到100W QPS

 

 

 

 

以上是关于一个异步日志IO的demo的主要内容,如果未能解决你的问题,请参考以下文章

Python--Demo18--异步IO之协程

socket-demo的实现

协程demo,1异步爬网页 2异步socket请求

python基础学习日志day10-SelectPollEpoll异步IO

异步函数Demo

argparse 代码片段只打印部分日志