大数据反爬日记01

Posted 大树的困惑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据反爬日记01相关的知识,希望对你有一定的参考价值。

大数据反爬日记01

记录自己的反爬日记

既然要做反爬,就肯定得有有爬虫来爬取页面,这里前面已经写好了一个简单的爬虫,将爬取的数据通过python+flask+gunicorn+nginx部署到linux上面了,接下来通过采集爬虫对页面的请求日志进行分析

1.环境准备

hadoop (因为是采用最近比较火的大数据技术,所以需要提前准备好大数据的相关环境)

hive(用于分析离线指标)

spark(用spark引擎来分析实时请求的日志)

Hbase(大数据的数据库)

filebeat+logstash(请求日志的采集工具)

kafka(高吞吐的实时的消息队列)

zookeeper

azkaban(用于任务的调度)

以上作为整个反爬的架构准备,本文主要讲逻辑开发,环境搭建可以参考网上的博文,后续可能会更新大数据框架的搭建

本次仅记录采集数据到kafka,通过spark进行数据etl+结构化输出

下个博客在进行爬虫分析

2.UML图

简单设计建模以下,后续可能会有改动,但是大致的方向就如上图所示,比较简单

补充:

关于数据采集方面,有很多选择,比如flume,或值filebeat可以直接发送到kafka,这里之所以这么选型主要是模拟的是一种大数据环境.

filebeat是logstash的一种轻量化实现,用于部署在承载着web程序的服务器上,减少服务器压力,将采集的日志转发到部署logstash的服务器上,由于lostash是重量级的java程序,所以应该部署在一个资源相对多的服务上专门作为处理数据转发的服务器(不参与业务服务)

3.开干

数据采集

​ 数据源

python爬虫日记01

Python爬虫日记02-数据可视化_

​ 采集工具

filebeat的搭建与配置

在程序根目录创建conf文件夹,并新建配置文件filebeat_logstash.yml

进行配置filebeat_logstash.yml

filebeat:
  prospectors:
    - input_type: log 
      paths:
        - /usr/local/nginx/logs/user_access.log 


output.logstash:
  hosts: ["192.168.100.230:5044"]

/usr/local/nginx/logs/user_access.log 是我们采集的日志路径

hosts: [“192.168.100.230:5044”] 是我们转发到对应logstash所监听的端口 这里为了方便调试就在同一台机器进行实现,如果要在不同的服务器部署,需要开通对应的防火墙和通信

在程序根目录启动filebeat测试

 ./filebeat -e -c conf/filebeat_logstash.yml

这里报错是因为我们还没有启动logstash监听5044端口

部署logstash

在根目录创建myconfs目录,并新建配置文件

filebeat-logstash-kafka.conf

input 
    beats 
        port => "5044"
    


output
    file
    path => ['/logall/data/log/filebeat/filebeat-output.log']  


启动logstash进行测试

bin/logstash -f myconfs/filebeat-logstash-kafka.conf 

成功启动了监听

然后启动filebeat

也启动成功了

查看我们logstash的落地文件/logall/data/log/filebeat/filebeat-output.log

cat /logall/data/log/filebeat/filebeat-output.log

可以看到我们的数据已经同步过来了,我们的请求数据是以#cs#作为分割符进行拼接的,这个在之前一篇博客有nginx的配置

这里数据仅仅是落地到另一个文件而已,我们期望的是转发到kafka中,所以我们需要准备好kafka

数据发送到消息队列

对接kafka

kafka依赖于zookeeper,所以我们需要先启动zookeeper

这里准备了三台虚拟机,分别启动zk,进入zookeeper的根目录

bin/zkServer.sh start

分别启动了三台之后,查看zk状态

bin/zkServer.sh status

这样就是启动成功了

然后分别启动kafka

进入kafka的目录,采用后台启动的方式

nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

这般则为启动成功

新建一个topic 在kafka的根目录下执行

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 1 --partitions 3 --topic spider_nginx_log

简单介绍以下这几个参数

kafka-topics.sh 这个是调用脚本,针对kafkatopic主题的

create 是告诉脚本创建topic

zookeeper 是指定我们kafka已经配置好的zookeeper,里面存放着一些元数据信息,包括历史的topic,一些offset等等…

replication-factor 指定我们数据的副本数量,这里测试,所以不需要太多副本

partitions 指定我们的分区,分区决定了我们数据的分散成都,也是kafka最大的特点之一,因为分区的存在,让kafka有很大的并行度支持大吞吐,当然支持大吞吐还有零拷贝这一大杀器

topic 指定我们创建的topic

修改logstash的配置,之前落地的是文件,现在修改成输出到kafka

input 
    beats 
        port => "5044"
    


output
    kafka 
        bootstrap_servers => "192.168.100.210:9092" 
        topic_id => "spider_nginx_log"
        batch_size => 5
      

重新启动

bin/logstash -f myconfs/filebeat-logstash-kafka.conf 

启动之前部署好的web程序Python爬虫日记02

监听kafka,消费一下topic,然后访问页面观察是否有数据

bin/kafka-console-consumer.sh --from-beginning --topic spider_nginx_log --zookeeper node01:2181,node02:2181,node03:2181

这边看到数据已经传入了,接下来配置spark对接kafka

Spark接入kafka

1.在本地创建maven项目

使用的工具是IDEA

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>it.luke</groupId>
    <artifactId>Spider_spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>

        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version> 0.10.0.0</version>
        </dependency>

        <!--scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <!--spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>$project.build.directory/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

新建scala object

package it.luke.spark.spider_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.Seconds, StreamingContext

/**
  * @author luke
  * @date 2021/5/1521:17
  */
object KafkaStream 

  def main(args: Array[String]): Unit = 
    /*
 *  1.创建配置对象,生成sparkStream对象
 *  2.配置kafka参数,连接kafka
 *  3.读取kafka,解析数据,结构化,输出
 *
 * */
    //sparkconf
    var conf = new SparkConf().setMaster("local").setAppName("my_spiderapp")
    //本地测试
    //创建stream对象
    val scc = new StreamingContext(conf, Seconds(2))
    scc.sparkContext.setLogLevel("WARN") //提高日志输出级别,方便调试
    //kafka 配置
    //kafka topic
    val topics = Array("spider_nginx_log")
    //kafka 其它参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "mygroup_spider_nginx_log",
      "auto.offset.reset" -> "latest", //可以指定读取kafka的进度
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    /*createDirectStream(
        ssc: StreamingContext,
        locationStrategy: LocationStrategy,
        consumerStrategy: ConsumerStrategy[K, V])


    * */
    val source = KafkaUtils.createDirectStream(scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    /*
    * LocationStrategies.PreferConsistent,是一种分配策略,让每个executor参与分区
    * ConsumerStrategies.Subscribe 是一种消费策略
    *   Subscribe为consumer自动分配partition,有内部算法保证topic-partitions以最优的方式均匀分配给同group下的不同consumer
    * */
    
    source.foreachRDD(rdd => 
      val asd: RDD[ConsumerRecord[String, String]] = rdd
      asd.map(_.value()).foreach(x=>
        
          var strings = x.split("#csc#")
          if(strings.length >2)
          
            var obj= req_obj(strings(0),
              strings(2),
              strings(3),
              strings(4),
              strings(5),
              strings(6),
              strings(7),
              strings(8)
            )
            println(obj)
        )

//      println(req_obj.toString())
      //提交offset
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      source.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
    )

    //启动streaming
    scc.start()
    //阻塞
    scc.awaitTermination()

  



/*
* '$remote_addr"#csc#"-"#csc#"$remote_user"#csc#"[$time_local]"#csc#""$request""#csc#"'
                      '$status"#csc#"$body_bytes_sent"#csc#""$http_referer""#csc#"'
                      '"$http_user_agent""#csc#""$http_x_forwarded_for"';
* */
case class req_obj(remote_addr: String,
                   remote_user: String,
                   time_local: String,
                   request: String,
                   body_bytes_sent: String,
                   http_referer: String,
                   http_user_agent: String,
                   http_x_forwarded_for: String
                  )


这里看到,数据已经从kafka对接过来了,spark是一种伪流式的引擎,因为我们需要指定每一批的接受间隔,

val scc = new StreamingContext(conf, Seconds(2))

这里是每两秒处理一批,将数据封装到rdd里面进行处理

4.总结

接下来要根据采集到的数据,分为两个方向,一个是离线分析,一个是实时分析

离线分析需要主要是展示一种未来或者过去的一种趋势作为分析,所以需要有一定的分析数据.

实时的话,是根据统计短时间内抓到的几批数据中,最有可能是爬虫的数据,进行打标,加入黑名单,并入库

后续就可以根据对接的数据,进行实时分析,根据自定义的策略抓爬虫了^

欢迎分享交流~

以上是关于大数据反爬日记01的主要内容,如果未能解决你的问题,请参考以下文章

大数据反爬日记01

大数据反爬日记01

爬虫日记(71):用OCR来对抗字体反爬

爬虫日记1——百度口碑医学教育网

黑马程序员-大数据反爬项目Lua+Spark+Redis+Hadoop框...高清完整资源

hm0024 - 大数据反爬项目Lua+Spark+Redis+Hadoop框架搭建