nodejs fork 子进程创建任务以及简单的prometheus 监控

Posted rongfengliang-荣锋亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nodejs fork 子进程创建任务以及简单的prometheus 监控相关的知识,希望对你有一定的参考价值。

以下是一个简单的基于nodejs 的fork 子进程创建子任务,同时使用prometheus 暴露一些简单的metrics
使用express 框架

环境准备

  • 项目结构
 
├── Dockerfile
├── README.md
├── app.js
├── docker-compose.yaml
├── grafana
└── metrics.json
├── metrics.js
├── package.json
├── prometheus.yml
├── send_mail.js
├── taskstatus.js
├── utils.js
└── yarn.lock
  • 结构说明
    send_mail.js 为简单的fork 子进程的代码
    taskstatus.js 为存储子进程任务状态的
    utils.js 为一个简单的工具模块,主要对于已经完成的子任务的清理
    metrics.js 为prometheus metrics 定义
    app.js express rest 接口操作以及任务处理的核心代码
    docker-compose.yaml docker-compose 全家桶运行
    Dockerfile nodejs 应用容器化
    grafana/metrics.json 为应用grafana 对于 prometheus 的dashboard 配置
  • 代码说明
    app.js 核心处理
 
const { fork } = require(\'child_process\')
const express = require("express")
const util = require("./utils")
const uuid = require("uuid/v4")
const { child_process_status_all, child_process_status_pending, child_process_status_ok } = require("./taskstatus")
const { child_process_status_all_counter, child_process_status_pending_gauge, child_process_status_ok_counter, initMetrics, initGcStats, process_ok_clean_timer__status, up } = require("./metrics")
const app = express();
const main_process_id = process.pid;
let interval = false;
/**
 * metrics route register  注册prometheus metrcis 路由
 */
app.get(\'/metrics\', (req, res) => {
  initMetrics(req, res);
})
/**
 * disable process clean timer 禁用定时任务清理
 */
app.get("/disable_timer", (req, res) => {
  if (interval) {
    interval = false;
  }
  process_ok_clean_timer__status.set(0)
  res.send({ timer_statuss: false })
})
/**
 * enable process clean timer 启用定时任务清理子进程
 */
app.get("/enable_timer", (req, res) => {
  if (interval == false) {
    interval = true;
  }
  process_ok_clean_timer__status.set(1)
  res.send({ timer_statuss: true })
})
/**
 * for create process workers
 */
app.get(\'/endpoint\', (req, res) => {
  // fork another process  子进程的创建以及消息的通信(状态以及部分prometheus metrics 维护)
  const myprocess = fork(\'./send_mail.js\');
  child_process_status_pending[myprocess.pid] = {
    status: "pending"
  }
  child_process_status_all[myprocess.pid] = {
    status: "pending"
  }
  child_process_status_all_counter.inc(1)
  child_process_status_pending_gauge.inc(1)
  console.log(`fork process pid:${myprocess.pid}`)
  const mails = uuid();
  // send list of e-mails to forked process
  myprocess.send({ mails });
  // listen for messages from forked process
  myprocess.on(\'message\', (message) => {
    console.log(`Number of mails sent ${message.counter}`);
    child_process_status_ok[myprocess.pid] = {
      status: "ok"
    }
    child_process_status_ok_counter.inc(1)
    child_process_status_pending_gauge.dec(1)
    child_process_status_all[myprocess.pid] = {
      status: "ok"
    }
    delete child_process_status_pending[myprocess.pid]
  });
  return res.json({ status: true, sent: true });
});
/**
 * call api for stop processed workers   删除任务完成的子进程
 */
app.get("/stop", (req, res) => {
  util.stopProcess(main_process_id, (err, data) => {
    if (err == null) {
      res.send({ timer_clean_status: "ok" })
    }
  })
})
// init gc metrics   gc metrcis 暴露
initGcStats()
// clean ok process timer 定时任务清理完成的进程
setInterval(function () {
  if (interval) {
    util.stopProcess(main_process_id, (err, data) => {
      if (err == null) {
        console.log({ timer_clean_status: "ok" })
      } else {
        process_ok_clean_timer__status.set(0)
      }
    })
  }
}, 10000)
// set metric status to up 
up.set(1)
app.listen(8080, "0.0.0.0", () => {
  console.log(`go to http://localhost:8080/ to generate traffic`)
}).on("error", () => {
  up.set(0)
})

utils.js 定时任务清理模块

const psTree = require("ps-tree")
const {spawn } = require(\'child_process\')
const {child_process_status_ok} = require("./taskstatus")
const {process_ok_clean_timer__status} = require("./metrics")
/**
 * 
 * @param {mainProcessID} mainProcessID 
 * @param {cb} callback for check status
 */
function stopProcess(mainProcessID,cb){
    psTree(mainProcessID, function (err, children) {
        if (err){
          process_ok_clean_timer__status.set(0)
        }
        let pids = [];
        for (const key in child_process_status_ok) {
          if (child_process_status_ok.hasOwnProperty(key)) {
            pids.push(key)
            delete child_process_status_ok[key]
          }
        }
        let info = children.filter(item => item.COMM == "ps" || item.COMMAND == "ps").map(function (p) { return p.PID })
        pids.push(...info)
        console.log(`stop child process ids: ${JSON.stringify(pids)}`)
        spawn(\'kill\', [\'-9\'].concat(pids));
        cb(null,"ok")
      })
}
module.exports = {
    stopProcess
};
  

metrics.js prometheus metrics 模块
主要是metrics 定义,以及一个初始化的方法

 
const Prometheus = require("prom-client")
const gcStats = require(\'prometheus-gc-stats\')
module.exports = {
    child_process_status_all_counter:new Prometheus.Counter({
        name: \'child_process_status_all_total\',
        help: \'all running process\',
        labelNames: [\'process_all\']
      }),
      child_process_status_pending_gauge:new Prometheus.Gauge({
        name: \'current_child_process_status_pending\',
        help: \'current pending process\',
        labelNames: [\'process_pending\']
      }),
      child_process_status_ok_counter:new Prometheus.Counter({
        name: \'child_process_status_ok_total\',
        help: \'all ok process\',
        labelNames: [\'process_ok\']
      }),
      process_ok_clean_timer__status:new Prometheus.Gauge({
        name: \'process_ok_clean_timer_status\',
        help: \'process_ok_clean_timer_status\',
        labelNames: [\'process_timer\']
      }),
      up:new Prometheus.Gauge({
        name: \'up\',
        help: \'metrics_status\',
        labelNames: [\'metrics_status\']
      }),
      initGcStats: function(){
        const startGcStats = gcStats(Prometheus.register)
        startGcStats()
      },
      initMetrics:function(req,res){
        res.set(\'Content-Type\', Prometheus.register.contentType)
        res.end(Prometheus.register.metrics())
      }
}

taskstatus.js 状态存储(很简单,就是一个json 对象)

module.exports = {
    child_process_status_all:{},
    child_process_status_pending:{},
    child_process_status_ok:{}
}

send_mail.js 子进程任务处理

async function sendMultipleMails(mails) {
    let sendMails = 0;
    // logic for
    // sending multiple mails
    return sendMails;
 }
 // receive message from master process
 process.on(\'message\', async (message) => {
   console.log("get messageId",message)
   const numberOfMailsSend = await sendMultipleMails(message.mailsid); 
   setTimeout(()=>{
    process.send({ counter: numberOfMailsSend });
   },Number.parseInt(Math.random()*10000))
   // send response to master process
 });

Dockerfile 为了方便使用docker 运行

FROM node:12.14.0-alpine3.9
WORKDIR /app
COPY app.js /app/app.js
COPY package.json /app/package.json
COPY yarn.lock /app/yarn.lock
COPY metrics.js /app/metrics.js
COPY utils.js /app/utils.js
COPY taskstatus.js /app/taskstatus.js
COPY send_mail.js /app/send_mail.js
#CMD [ "yarn","app"]
EXPOSE 8080
RUN yarn
ENTRYPOINT [ "yarn","app" ]

docker-compose 文件主要是包含prometheus 以及grafana还有nodejs 应用

version: "3"
services:
  app:
    build: ./
    ports: 
    - "8080:8080"
    image: dalongrong/node-process
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
  prometheus:
    image: prom/prometheus
    volumes:
      - "./prometheus.yml:/etc/prometheus/prometheus.yml"
    ports:
      - "9090:9090"

启动&&效果

  • 启动
docker-compose up -d
  • 简单压测
ab -n 200 -c 20 http://localhost:8080/endpoint
  • grafana 效果

 

 

说明

以上就是一个简单的基于fork 以及prometheus 的nodejs 子任务创建

参考资料

https://github.com/rongfengliang/node-process-fork-learning
https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options

以上是关于nodejs fork 子进程创建任务以及简单的prometheus 监控的主要内容,如果未能解决你的问题,请参考以下文章

golang 进程创建,fork,以及热重启(无缝升级)

fork/Join 线程可以创建多个吗?

进程管理

一次java进程fork大量子进程导致OOM的解决方案

fork()、pipe() 和 exec() 进程创建和通信

nodejs 怎样检测子进程执行完成