大数据采集中的异步处理问题

Posted jingyingggong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据采集中的异步处理问题相关的知识,希望对你有一定的参考价值。

这段时间在学习nodejs,用jsdom采集了一些数据,也遇到了一些问题,贴出来征求一下大家的解决方案。

首先说一下目的,有几十万条图片数据,把这些图片抓取到本地文件夹中,采集完成后把成功数据归档为done.json,失败数据归档为undone.json,如下:

1 const IMGS = [
2     "http://xxx.com/1.jpg",
3     "http://xxx.com/2.jpg",
4     "http://xxx.com/3.jpg",
5     "http://xxx.com/4.jpg",
6     ...//此处省略无数的数据
7 ];

采集的心路历程:

方法1 forEach循环直接请求(失败

1 IMGS.forEach(v=>{
2      http.get(v, (res)=> {
3          //blablabla
4      });
5 }); 

这种方法一下子就把所有数据都异步的请求出去了,超出了网络链接数,直接就把服务爆掉了。

方法2 采用递归处理完一条自动处理下一条。(成功,但是有问题

 1 const DONE = [],
 2       UNDONE = [],
 3       FILEDONE = ‘./done.json‘,
 4       FILEUNDONE = ‘./undone.json‘;
 5 let counter = 0;
 6 const writeResult() =>{
 7     fs.writeFile(FILEDONE, JSON.stringify(DONE), "utf8", ()=>console.log(‘done saved!‘));
 8     fs.writeFile(FILEUNDONE , JSON.stringify(UNDONE), "utf8", ()=>console.log(‘undone saved!‘));
 9 }
10 const saveImage = (data, fn)=> {
11     fs.writeFile(counter++ + ‘.jpg‘, data, fn);
12 }
13 const downloadImage = (i)=> {
14     if(i >= IMGS.length) {
15         writeResult();
16         return;
17     }
18     http.get(IMGS[i], (res)=> {
19         //此处略去了异常处理的内容
20         res.setEncoding="binary";
21         let dt = ‘‘;
22         res.on(‘data‘, (ck)=> dt+=ck;)
23         res.on(‘end‘,()=> {
24             saveImage(dt, ()=> {
25                 DONE.push(IMGS[i]);
26                 downloadImag(i++);
27             });
28         })
29         
30     }).on(‘error‘,()=>UNDONE.push(IMGS[i])) ;
31 }
32 
33 downloadImage(0);

这种方法的确是能成功抓取到了数据,但是有一个最大的弊端就是只能一条一条顺序抓取,没办法批量并发一起抓取,这简直就是浪费了nodejs非阻塞机制。

好,变通处理添加循环多处理机制。

方法3 采用添加间隔器的方式进行批量处理。

const DONE = [],
    UNDONE = [],
    FILEDONE = ‘./done.json‘,
    FILEUNDONE = ‘./undone.json‘,
    STEP = 20;
let counter = 0, stepi = 0;
const writeResult() =>{
    if(stepi < STEP) return; //只有全部都请求完后才真正执行归档功能
    fs.writeFile(FILEDONE, JSON.stringify(DONE), "utf8", ()=>console.log(‘done saved!‘));
    fs.writeFile(FILEUNDONE , JSON.stringify(UNDONE), "utf8", ()=>console.log(‘undone saved!‘));
}
const saveImage = (data, fn)=> {
    fs.writeFile(counter++ + ‘.jpg‘, data, fn);
}
const downloadImage = (i)=> {
    if(i >= IMGS.length) {
        stepi++;
        writeResult();
        return;
    }
    http.get(IMGS[i], (res)=> {
            //此处略去了异常处理的内容
         res.setEncoding="binary";
            let dt = ‘‘;
            res.on(‘data‘, (ck)=> dt+=ck;)
            res.on(‘end‘,()=> {
                saveImage(dt, ()=> {
                    DONE.push(IMGS[i]);
                    downloadImag(i + STEP);
                });
            })
        }).on(‘error‘,()=>{
            downloadImag(i + STEP);
            UNDONE.push(IMGS[i])
        });
}

for(let i = 0; i < STEP; i++) {
    downloadImage(i); 
}//循环step个请求

 

方法4. 利用promise对象(暂时还未成功,还在想办法,有想法的同学可以贴出来大家借鉴一下)

let promise_imgs = IMGS.map(v => {
    new Promise((resolve, reject)=> {
        setTimeout(()=>{
            http.get(v, (res)=> {
                //blablabla
                res.on(‘end‘, ()=>
                    saveImage(dt, ()=>resolve())
                })
            })
        }, 0);
    });
});

Promise.all(promise_imgs).then(writeResult);//这里还是会发生和方法1一样的问题,多回调异步并发造成网络链接数的用尽,

大家有什么好的方法快快贴出来吧!

 

以上是关于大数据采集中的异步处理问题的主要内容,如果未能解决你的问题,请参考以下文章

WPF使用异步+绑定的方式处理大数据量

当活动中的异步任务完成时如何在片段中重新加载ui?

Oracle 数据库 - 使用UEStudio修改dmp文件版本号,解决imp命令恢复的数据库与dmp本地文件版本号不匹配导致的导入失败问题,“ORACLE error 12547”问题处理(代码片段

使用带有 viewpager 的异步任务时的竞争条件

在哪里以及如何使用片段填充我的标签

异步任务片段背景数据