基于Vert.x和RxJava 2构建通用的爬虫框架

Posted Java与Android技术栈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Vert.x和RxJava 2构建通用的爬虫框架相关的知识,希望对你有一定的参考价值。

最近由于业务需要监控一些数据,虽然市面上有很多优秀的爬虫框架,但是我仍然打算从头开始实现一套完整的爬虫框架。

在技术选型上,我没有选择Spring来搭建项目,而是选择了更轻量级的Vert.x。一方面感觉Spring太重了,而Vert.x是一个基于JVM、轻量级、高性能的框架。它基于事件和异步,依托于全异步Java服务器Netty,并扩展了很多其他特性。

一. 爬虫框架的功能

爬虫框架包含爬虫引擎(SpiderEngine)和爬虫(Spider)。SpiderEngine可以管理多个Spider。

1.1 Spider

在Spider中,主要包含几个组件:downloader、queue、parser、pipeline以及代理池IP(proxypool),代理池是一个单独的项目,我前段时间写的,在使用爬虫框架时经常需要切换代理IP,所以把它引入进来。


其余四个组件都是接口,在爬虫框架中内置了一些实现,例如内置了多个下载器(downloader)包括vertx的webclient、http client、okhttp3、selenium实现的下载器。开发者可以根据自身情况来选择使用或者自己开发全新的downloader。

Downloader的download方法会返回一个Maybe

 
   
   
 
  1. package com.cv4j.netdiscovery.core.downloader;

  2. import com.cv4j.netdiscovery.core.domain.Request;

  3. import com.cv4j.netdiscovery.core.domain.Response;

  4. import io.reactivex.Maybe;

  5. /**

  6. * Created by tony on 2017/12/23.

  7. */

  8. public interface Downloader {

  9.    Maybe<Response> download(Request request);

  10.    void close();

  11. }

在Spider中,通过Maybe 对象来实现后续的一系列的链式调用,比如将Response转换成Page对象,再对Page对象进行解析,Page解析完毕之后做一系列的pipeline操作。

 
   
   
 
  1.                  downloader.download(request)

  2.                            .observeOn(Schedulers.io())

  3.                            .map(new Function<Response, Page>() {

  4.                                @Override

  5.                                public Page apply(Response response) throws Exception {

  6.                                    Page page = new Page();

  7.                                    page.sethtml(new Html(response.getContent()));

  8.                                    page.setRequest(request);

  9.                                    page.setUrl(request.getUrl());

  10.                                    page.setStatusCode(response.getStatusCode());

  11.                                    return page;

  12.                                }

  13.                            })

  14.                            .map(new Function<Page, Page>() {

  15.                                @Override

  16.                                public Page apply(Page page) throws Exception {

  17.                                    if (parser != null) {

  18.                                        parser.process(page);

  19.                                    }

  20.                                    return page;

  21.                                }

  22.                            })

  23.                            .map(new Function<Page, Page>() {

  24.                                @Override

  25.                                public Page apply(Page page) throws Exception {

  26.                                    if (Preconditions.isNotBlank(pipelines)) {

  27.                                        pipelines.stream()

  28.                                                .forEach(pipeline -> pipeline.process(page.getResultItems()));

  29.                                    }

  30.                                    return page;

  31.                                }

  32.                            })

  33.                            .subscribe(new Consumer<Page>() {

  34.                                @Override

  35.                                public void accept(Page page) throws Exception {

  36.                                    log.info(page.getUrl());

  37.                                    if (request.getAfterRequest()!=null) {

  38.                                        request.getAfterRequest().process(page);

  39.                                    }

  40.                                }

  41.                            }, new Consumer<Throwable>() {

  42.                                @Override

  43.                                public void accept(Throwable throwable) throws Exception {

  44.                                    log.error(throwable.getMessage());

  45.                                }

  46.                            });

在这里使用RxJava 2可以让整个爬虫框架看起来更加响应式:)

基于Vert.x和RxJava 2构建通用的爬虫框架



1.2 SpiderEngine

SpiderEngine可以包含多个Spider,可以通过addSpider()、createSpider()来将爬虫添加到SpiderEngine和创建新的Spider并添加到SpiderEngine。

基于Vert.x和RxJava 2构建通用的爬虫框架


在SpiderEngine中,如果调用了httpd(port)方法,还可以监控SpiderEngine中各个Spider。

1.2.1 获取某个爬虫的状态

http://localhost:{port}/netdiscovery/spider/{spiderName}

类型:GET

1.2.2 获取SpiderEngine中所有爬虫的状态

http://localhost:{port}/netdiscovery/spiders/

类型:GET

1.2.3 修改某个爬虫的状态

http://localhost:{port}/netdiscovery/spider/{spiderName}/status

类型:POST

参数说明:

 
   
   
 
  1. {

  2.    "status":2   //让爬虫暂停

  3. }

status 作用
2 让爬虫暂停
3 让爬虫从暂停中恢复
4 让爬虫停止

使用框架的例子

创建一个SpiderEngine,然后创建三个Spider,每个爬虫每隔一定的时间去爬取一个页面。

 
   
   
 
  1.        SpiderEngine engine = SpiderEngine.create();

  2.        Spider spider = Spider.create()

  3.                .name("tony1")

  4.                .repeatRequest(10000,"http://www.163.com")

  5.                .initialDelay(10000);

  6.        engine.addSpider(spider);

  7.        Spider spider2 = Spider.create()

  8.                .name("tony2")

  9.                .repeatRequest(10000,"http://www.baidu.com")

  10.                .initialDelay(10000);

  11.        engine.addSpider(spider2);

  12.        Spider spider3 = Spider.create()

  13.                .name("tony3")

  14.                .repeatRequest(10000,"http://www.126.com")

  15.                .initialDelay(10000);

  16.        engine.addSpider(spider3);

  17.        engine.httpd(8080);

  18.        engine.run();

上述程序运行一段时间之后,在浏览器中输入:http://localhost:8080/netdiscovery/spiders

我们能看到三个爬虫运行的结果。

基于Vert.x和RxJava 2构建通用的爬虫框架


将json格式化一下

 
   
   
 
  1. {

  2.    "code": 200,

  3.    "data": [{

  4.        "downloaderType": "VertxDownloader",

  5.        "leftRequestSize": 0,

  6.        "queueType": "DefaultQueue",

  7.        "spiderName": "tony2",

  8.        "spiderStatus": 1,

  9.        "totalRequestSize": 7

  10.    }, {

  11.        "downloaderType": "VertxDownloader",

  12.        "leftRequestSize": 0,

  13.        "queueType": "DefaultQueue",

  14.        "spiderName": "tony3",

  15.        "spiderStatus": 1,

  16.        "totalRequestSize": 7

  17.    }, {

  18.        "downloaderType": "VertxDownloader",

  19.        "leftRequestSize": 0,

  20.        "queueType": "DefaultQueue",

  21.        "spiderName": "tony1",

  22.        "spiderStatus": 1,

  23.        "totalRequestSize": 7

  24.    }],

  25.    "message": "success"

  26. }

案例


TODO

  1. 增加对登录验证码的识别

  2. 增加elasticsearch的支持

总结

这个爬虫框架才刚刚起步,我也参考了很多优秀的爬虫框架。未来我会在框架中考虑增加通过截屏图片来分析图片中的数据。甚至会结合cv4j(https://github.com/imageprocessor/cv4j)框架。过年前,在爬虫框架中会优先实现对登录验证码的识别。



关注【Java与Android技术栈】

更多精彩内容请关注扫码


以上是关于基于Vert.x和RxJava 2构建通用的爬虫框架的主要内容,如果未能解决你的问题,请参考以下文章

为爬虫框架构建Selenium模块DSL模块(Kotlin实现)

设置 Hystrix 的请求上下文与在 Vert.X 中运行的 RxJava 崩溃

Vert.x-Web的讲解和使用

Vert.x-Web的讲解和使用

使用Vert.x构建Web服务器和消息系统

从API到DSL —— 使用 Kotlin 特性为爬虫框架进一步封装