Spark + Kafka大数据环境的搭建和示例的简单运行

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark + Kafka大数据环境的搭建和示例的简单运行相关的知识,希望对你有一定的参考价值。

参考技术A 搭建Hadoop集群环境一般建议三个节点以上,一个作为Hadoop的NameNode节点。另外两个作为DataNode节点。在本次实验中,采用了三台CentOS 7.5作为实验环境。

将所需要的java 文件解压到合适的目录,并将整个java 目录添加进 /etc/profile ,并 source /etc/profile

需要说明的是ssh免密登录的配置不是双向的,是单向的。也就是说,每个节点都需要和另外两个节点进行ssh的免密配置。

此时会在用户目录的 .ssh 下,生成秘钥文件。现在需要将此验证文件拷贝至slave1节点,

在 /etc/profile 目录下追加:

vim /home/postgres/hadoop/hadoop-3.3.0/etc/hadoop/hadoop-env.sh 修改配置文件java路径

vim /home/postgres/hadoop/hadoop-3.3.0/etc/hadoop/core-site.xml 修改core-site文件

vim /home/postgres/hadoop/hadoop-3.3.0/etc/hadoop/hdfs-site.xml 修改hdfs-site文件

vim /home/postgres/hadoop/hadoop-3.3.0/etc/hadoop/yarn-site.xml 修改yarn-site文件

vim /home/postgres/hadoop/hadoop-3.3.0/etc/hadoop/mapred-site.xml 修改mapred-site文件

vim /home/postgres/hadoop/hadoop-3.3.0/etc/hadoop/slaves 修改slaves文件

手动创建文件夹:/home/postgres/hadoop/hdfs/ logs 和 data 目录,并分配777权限。

在hadoop初始化启动后,在master上xxx/name/namesecondary/下会自动创建./current/VERSION文件路径。

在master运行: hadoop namenode -format

如果有必要,运行DataNode命令: hadoop datanode -format

master+slave1+slave2启动集群: start-all.sh

master : jps

slave1 : jps

slave2 : jps

hadoop dfsadmin -report

按照上述的配置情况:一个namenode节点,两个datanode节点,整个集群监控情况如下:

输入 http://[master ip]:9870 就可以在浏览器看到hdfs集群的监控情况。

输入 http://[master ip]:8001就可以在浏览器看到hadoop集群的监控情况。

可以尝试向Hadoop中插入第一个mapreduce任务:

hdfs dfs -mkdir /HadoopTest 在文件系统中创建一个目录

hdfs dfs -put a.txt /HadoopTest 向创建的目录中存放第一个a.txt文件

hdfs dfs -ls /HadoopTest 查看文件系统的情况

hdfs dfs -text /HadoopTest/a.txt 查看所需要查询文件的内容

检验是否搭建成功的方法就是,运行一个官方的demo查看是否可以运行。

最终运行后,发现可以得出正确的结果,虽然不太准确。。。。

下面运行另一个demo:统计某个文件中的所有单词。

之前的运行结果会显示失败的个数等信息,如上述显示:没有任何报错信息。接下来查看输出文件夹下有多少个文件。一个是 _SUCCESS , part-r-00000 。

然后查看每个文件的内容,因为上述运行的结果是统计单词的个数(估计是数空格)。所以 part-r-00000 就是将a.txt文件中的内容进行单词的统计,并将结果按照英文字母排序。

准备一个scala的安装包 scala-2.13.4.rpm

解压缩maven压缩包 apache-maven-3.6.3-bin.tar.gz

将maven的路径配置进/etc/profile的PATH路径,并检查安装是否生效。

首先配置maven的环境变量,在 /etc/profile 下添加如下信息:

修改scala的版本信息为2.13

解压spark安装包

由于spark和hadoop里面关于启动集群的指令是一样的,所以这里就不打算配置spark的环境变量了。想用直接去目录直接运行即可。

进入spark的安装目录,即可查看所有的文件如下:

修改 conf/spark-env.sh 文件

修改 conf/spark-defaults.conf 文件

修改 slaves 文件

将上述文件分别拷贝至slave1,slave2中。

在master节点启动spark

首先验证其在master上运行spark业务的正确性。 --master spark://192.168.65.140:7077

接下来验证其在yarn架构下程序运行的正确性 --master yarn

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码

参考《在Kafka中使用Avro编码消息:Consumer篇》、《在Kafka中使用Avro编码消息:Producter篇

pom.xml

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.4</version>
        </dependency>

需要依赖于avro的包,同时这里是需要使用kafka api。

在使用 Avro 之前,我们需要先定义模式(schemas)。模式通常使用 JSON 来编写,我们不需要再定义相关的类,这篇文章中,我们将使用如下的模式:

{
    "fields": [
        { "name": "str1", "type": "string" },
        { "name": "str2", "type": "string" },
        { "name": "int1", "type": "int" }
    ],
    "name": "Iteblog",
    "type": "record"
}

上面的模式中,我们定义了一种 record 类型的对象,名字为 Iteblog,这个对象包含了两个字符串和一个 int 类型的fields。定义好模式之后,我们可以使用 avro 提供的相应方法来解析这个模式:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

这里的 USER_SCHEMA 变量存储的就是上面定义好的模式。

解析好模式定义的对象之后,我们需要将这个对象序列化成字节数组,或者将字节数组转换成对象。Avro 提供的 API 不太易于使用,所以本文使用 twitter 开源的 Bijection 库来方便地实现这些操作。我们先创建 Injection 对象来讲对象转换成字节数组:

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

现在我们可以根据之前定义好的模式来创建相关的 Record,并使用 recordInjection 来序列化这个 Record :

GenericData.Record record = new GenericData.Record(schema);
avroRecord.put("str1", "My first string");
avroRecord.put("str2", "My second string");
avroRecord.put("int1", 42);
 
byte[] bytes = recordInjection.apply(record);

Producter实现

有了上面的介绍之后,我们现在就可以在 Kafka 中使用 Avro 来序列化我们需要发送的消息了:

package example.avro;

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroKafkaProducter {
    public static final String USER_SCHEMA = 
             "{" 
            + "\"type\":\"record\"," 
            + "\"name\":\"Iteblog\"," 
            + "\"fields\":[" 
            + "  { \"name\":\"str1\", \"type\":\"string\" }," 
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }" 
            + "]}";
    public static final String TOPIC = "t-testavro";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        
        for (int i = 0; i < 1000; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("str1", "Str 1-" + i);
            avroRecord.put("str2", "Str 2-" + i);
            avroRecord.put("int1", i);

            byte[] bytes = recordInjection.apply(avroRecord);

            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "" + i, bytes);
            producer.send(record);
            System.out.println(">>>>>>>>>>>>>>>>>>" + i);
        }

        producer.close();
        System.out.println("complete...");
    }
}

因为我们使用到 Avro 和 Bijection 类库,所有我们需要在 pom.xml 文件里面引入以下依赖:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.0</version>
</dependency>
 
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>bijection-avro_2.10</artifactId>
  <version>0.9.2</version>
</dependency>

从 Kafka 中读取 Avro 格式的消息

从 Kafka 中读取 Avro 格式的消息和读取其他类型的类型一样,都是创建相关的流,然后迭代:

ConsumerConnector consumer = ...;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
    ....
}

关键在于如何将读出来的 Avro 类型字节数组转换成我们要的数据。这里还是使用到我们之前介绍的模式解释器:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

上面的 USER_SCHEMA 就是上边介绍的消息模式,我们创建了一个 recordInjection 对象,这个对象就可以利用刚刚解析好的模式将读出来的字节数组反序列化成我们写入的数据:

GenericRecord record = recordInjection.invert(message).get();

然后我们就可以通过下面方法获取写入的数据:

record.get("str1")
record.get("str2")
record.get("int1")

Kafka 0.9.x 版本Consumer实现

package example.avro;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroKafkaConsumer {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

        consumer.subscribe(Collections.singletonList(AvroKafkaProducter.TOPIC));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    String info = String.format(String.format("topic = %s, partition = %s, offset = %d, customer = %s,country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), genericRecord.get("str1")));
                    logger.info(info);
                }
            }
        } finally {
            consumer.close();
        }
    }

}

测试:

producer:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
>>>>>>>>>>>>>>>>>>0
>>>>>>>>>>>>>>>>>>1
>>>>>>>>>>>>>>>>>>2
>>>>>>>>>>>>>>>>>>3
>>>>>>>>>>>>>>>>>>4
>>>>>>>>>>>>>>>>>>5
>>>>>>>>>>>>>>>>>>6
>>>>>>>>>>>>>>>>>>7
>>>>>>>>>>>>>>>>>>8
>>>>>>>>>>>>>>>>>>9
>>>>>>>>>>>>>>>>>>10
...
>>>>>>>>>>>>>>>>>>997
>>>>>>>>>>>>>>>>>>998
>>>>>>>>>>>>>>>>>>999
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
complete...

consumer:

[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4321, customer = 165,country = Str 1-165
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4322, customer = 166,country = Str 1-166
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4323, customer = 167,country = Str 1-167
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4324, customer = 168,country = Str 1-168
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4325, customer = 169,country = Str 1-169
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4326, customer = 170,country = Str 1-170
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4327, customer = 171,country = Str 1-171

 

以上是关于Spark + Kafka大数据环境的搭建和示例的简单运行的主要内容,如果未能解决你的问题,请参考以下文章

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

Kafka:ZK+Kafka+Spark Streaming集群环境搭建定制一个arvo格式文件发送到kafka的topic,通过sparkstreaming读取kafka的数据

Kafka:ZK+Kafka+Spark Streaming集群环境搭建安装spark2.2.1

「大数据」(七十四)Spark之应用案例编程

Kafka:ZK+Kafka+Spark Streaming集群环境搭建安装zookeeper-3.4.12