在StormCrawler上获取拓扑以正确编写warc文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在StormCrawler上获取拓扑以正确编写warc文件相关的知识,希望对你有一定的参考价值。

在我的项目中,stormcrawler maven archetype似乎与warc模块不太搭配。目前,它只创建名为“crawl-20180802121925-00000.warc.gz”的空0字节文件。我在这里错过了什么吗?

我尝试通过创建一个默认项目来启用warc写入:

mvn archetype:generate -DarchetypeGroupId=com.digitalpebble.stormcrawler -DarchetypeArtifactId=storm-crawler-archetype -DarchetypeVersion=1.10

然后将依赖项添加到pom.xml中的warc模块,就像这样

    <dependency>
        <groupId>com.digitalpebble.stormcrawler</groupId>
        <artifactId>storm-crawler-warc</artifactId>
        <version>1.10</version>
    </dependency>

然后我将WARCHdfsBolt添加到fetch分组,同时尝试写入本地文件系统目录。

public class CrawlTopology extends ConfigurableTopology {

    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new CrawlTopology(), args);
    }

    @Override
    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        String[] testURLs = new String[] { "http://www.lequipe.fr/",
                "http://www.lemonde.fr/", "http://www.bbc.co.uk/",
                "http://storm.apache.org/", "http://digitalpebble.com/" };

        builder.setSpout("spout", new MemorySpout(testURLs));

        builder.setBolt("partitioner", new URLPartitionerBolt())
                .shuffleGrouping("spout");

        builder.setBolt("fetch", new FetcherBolt())
                .fieldsGrouping("partitioner", new Fields("key"));

        builder.setBolt("warc", getWarcBolt())
                .localOrShuffleGrouping("fetch");

        builder.setBolt("sitemap", new SiteMapParserBolt())
                .localOrShuffleGrouping("fetch");

        builder.setBolt("feeds", new FeedParserBolt())
                .localOrShuffleGrouping("sitemap");

        builder.setBolt("parse", new JSoupParserBolt())
                .localOrShuffleGrouping("feeds");

        builder.setBolt("index", new StdOutIndexer())
                .localOrShuffleGrouping("parse");

        Fields furl = new Fields("url");

        // can also use MemoryStatusUpdater for simple recursive crawls
        builder.setBolt("status", new StdOutStatusUpdater())
                .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
                .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
                .fieldsGrouping("feeds", Constants.StatusStreamName, furl)
                .fieldsGrouping("parse", Constants.StatusStreamName, furl)
                .fieldsGrouping("index", Constants.StatusStreamName, furl);

        return submit("crawl", conf, builder);
    }

    private WARCHdfsBolt getWarcBolt() {
        String warcFilePath = "/Users/user/Documents/workspace/test/warc";

        FileNameFormat fileNameFormat = new WARCFileNameFormat()
                .withPath(warcFilePath);

        Map<String,String> fields = new HashMap<>();
        fields.put("software:", "StormCrawler 1.0 http://stormcrawler.net/");
        fields.put("conformsTo:", "http://www.archive.org/documents/WarcFileFormat-1.0.html");

        WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt()
                .withFileNameFormat(fileNameFormat);
        warcbolt.withHeader(fields);

        // can specify the filesystem - will use the local FS by default
//        String fsURL = "hdfs://localhost:9000";
//        warcbolt.withFsUrl(fsURL);

        // a custom max length can be specified - 1 GB will be used as a default
        FileSizeRotationPolicy rotpol = new FileSizeRotationPolicy(50.0f,
                FileSizeRotationPolicy.Units.MB);
        warcbolt.withRotationPolicy(rotpol);
        return warcbolt;
    }
}

无论我是否使用助焊剂在本地运行,似乎没有任何区别。您可以在这里查看演示回购:https://github.com/keyboardsamurai/storm-test-warc

答案

谢谢你问这个。理论上,内容会在写入WARC文件时写入

  1. 在同步策略which we have by default at 10 tuples中设置了显式同步
  2. 有一个自动的,通过滴答元组every 15 secs by default发生
  3. 文件被旋转 - 在你的情况下,这应该发生在内容达到50MB时

由于您用作起点的拓扑不是递归的,并且不处理超过5个URL,因此永远不会满足条件1和3。

你可以通过使用来改变它

builder.setBolt("status", new MemoryStatusUpdater())

代替。这样就可以连续处理新的URL。或者,您可以添加

warcbolt.withSyncPolicy(new CountSyncPolicy(1));

到您的代码,以便在每个元组后触发同步。实际上,您不需要在URL不断出现的真实爬网中执行此操作。

现在奇怪的是,无论同步是由条件1还是2触发,我都看不到文件的任何变化,它仍然保持在0字节。版本1.8不是这种情况

<dependency>
    <groupId>com.digitalpebble.stormcrawler</groupId>
    <artifactId>storm-crawler-warc</artifactId>
    <version>1.8</version>
</dependency> 

所以这可能是由于之后代码的变化。

我知道有些用户一直依赖FileTimeSizeRotationPolicy,它可以根据时间触发上面的条件3。

随意在Github上打开一个问题,我会仔细看看它(当我下个月回来的时候)。

编辑:有一个错误的条目压缩现已修复,将成为下一个SC版本的一部分。

查看关于OP发布的issue的评论。

以上是关于在StormCrawler上获取拓扑以正确编写warc文件的主要内容,如果未能解决你的问题,请参考以下文章

StormCrawler的default-regex-filters.txt

Tika Parser放慢了StormCrawler的速度

在没有zookeeper,nimbus依赖的情况下以本地模式运行storm crawler

Stormcrawler的ContentParseFilter

StormCrawler maven包装错误

如何在StormCrawler中使用快速URL过滤器?