从零系列--node爬虫利用进程池写数据

Posted sjptech

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从零系列--node爬虫利用进程池写数据相关的知识,希望对你有一定的参考价值。

1、主进程

const http = require(‘http‘);
const fs = require(‘fs‘);
const cheerio = require(‘cheerio‘);
const request = require(‘request‘);
const makePool = require(‘./pooler‘)
const runJob = makePool(‘./worker‘)
var i = 0;
var url = "http://xxx.com/articles/"; 
//初始url 
let g = ‘‘;
function fetchPage(x) {     //封装了一层函数
  console.log(x)
  if(!x || x==‘‘){
    g.next()
    return
  }
    startRequest(x); 
}


function startRequest(x) {
     //采用http模块向服务器发起一次get请求      
    return http.get(x, function (res) {     
        var html = ‘‘;        //用来存储请求网页的整个html内容
        var titles = [];        
        res.setEncoding(‘utf-8‘); //防止中文乱码
     //监听data事件,每次取一块数据
        res.on(‘data‘, function (chunk) {   
            html += chunk;
        });
     //监听end事件,如果整个网页内容的html都获取完毕,就执行回调函数
        res.on(‘end‘, function () {
          var $ = cheerio.load(html); //采用cheerio模块解析html

          var time = new Date();
          var p =  $(‘.content p‘)
          p.each((index,item)=>{
                if($(item).find(‘strong‘).length) {
                  var fex_item = {
                    //获取文章的标题
                      title: $(item).find(‘strong‘).text().trim(),
                  //获取文章发布的时间
                      time: time,   
                  //获取当前文章的url
                      link: $($(item).children(‘a‘).get(0)).attr(‘href‘),
                      des:$(item).children().remove()&&$(item).text(),
                  //i是用来判断获取了多少篇文章
                      i: index+1     
      
                  };
                  runJob(fex_item,(err,data)=>{
                    if(err) console.error(‘get link error‘)
                    console.log(‘get link ok‘)
                  })
                }
                
          })
          g.next()
        })         

    }).on(‘error‘, function (err) {
        console.log(err);
        g.next()
    });

}
function* gen(urls){
  let len = urls.length;
  for(var i=0;i<len;i++){
    yield fetchPage(urls[i])
  }
}

function getUrl(x){
    //采用http模块向服务器发起一次get请求      
    http.get(x, function (res) {     
      var html = ‘‘;        //用来存储请求网页的整个html内容
      var titles = [];        
      res.setEncoding(‘utf-8‘); //防止中文乱码
   //监听data事件,每次取一块数据
      res.on(‘data‘, function (chunk) {   
          html += chunk;
      });
   //监听end事件,如果整个网页内容的html都获取完毕,就执行回调函数
      res.on(‘end‘, function () {
        var $ = cheerio.load(html); //采用cheerio模块解析html

        var time = new Date();
        var lists =  $(‘.articles .post-list li‘)
        var urls = [];
        lists.each(function(index,item){
          if($(item).find(‘a‘).length) {
              var url = ‘http://xxxx.com‘+$($(item).children(‘a‘).get(0)).attr(‘href‘);
              if(url)
              urls.push(url);      //主程序开始运行
          }
       })
        g = gen(urls)
        g.next()
      })         

  }).on(‘error‘, function (err) {
      console.log(err);
  });
}

getUrl(url)

2、创建进程池

const cp = require(‘child_process‘)
const cpus = require(‘os‘).cpus().length;

module.exports =  function pooler(workModule){
  let awaiting = [],readyPool = [],poolSize = 0;
  return function doWork(job,cb){
    if(!readyPool.length&&poolSize>cpus)
      return awaiting.push([doWork,job,cb])

    let child = readyPool.length ? readyPool.shift():(poolSize++,cp.fork(workModule))
    let cbTriggered = false;
    child.removeAllListeners()
    .once(‘error‘,function(err){
      if(!cbTriggered){
        cb(err)
        cbTriggered = true
      }
      child.kill()
    })
    .once(‘eixt‘,function(){
      if(!cbTriggered)
      cb(new Error(‘childe exited with code:‘+code))
      poolSize--;
      let childIdx = readyPool.indexOf(child)
      if(childIdx > -1)readyPool.splice(childIdx,1)
    })
    .once(‘message‘,function(msg){
      cb(null,msg)
      cbTriggered = true
      readyPool.push(child)
      if(awaiting.length)setImmediate.apply(null,awaiting.shift())
    })
    .send(job)
  }
}

3、工作进程接受消息并处理内容

const fs = require(‘fs‘)
process.on(‘message‘,function(job){
  let _job = job
  let x = ‘TITLE:‘+_job.title+‘
‘ + ‘LINK:‘+_job.link + ‘
 DES:‘+_job.des+‘
 SAVE-TIME:‘+_job.time
  
  fs.writeFile(‘../xx/data/‘ + _job.title + ‘.txt‘, x, ‘utf-8‘, function (err) {
      if (err) {
          console.log(err);
      }
  });
  process.send(‘finish‘)
})

 

以上是关于从零系列--node爬虫利用进程池写数据的主要内容,如果未能解决你的问题,请参考以下文章

Python爬虫系列:从零开始,安装环境

从零开始学python爬虫(八):selenium提取数据和其他使用方法

系列3|走进Node.js之多进程模型

《从零开始,学会Python爬虫不再难!!!》系列导航(持续更新中)

《从零开始,学会Python爬虫不再难!!!》系列导航(持续更新中)

学习《从零开始学Python网络爬虫》PDF+源代码+《精通Scrapy网络爬虫》PDF