PySpark-Based Flight Log Analysis (SQL Analysis)

Posted by SongpingWang





PySpark-based flight log analysis — runtime environment: `jupyter notebook`

1. Import the PySpark packages

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Set environment variables
os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
# Hadoop installation directory
os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
# Spark installation directory
os.environ['SPARK_HOME'] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6'

2. Create a SparkSession instance

# Create a SparkSession; local[2] means local mode with two threads.
spark = SparkSession.builder \
    .appName("Python_Spark_test") \
    .master("local[2]") \
    .getOrCreate()

# Inspect the session object
print(type(spark))
print(spark)
Output:
<class 'pyspark.sql.session.SparkSession'>
<pyspark.sql.session.SparkSession object at 0x000000000670BD68>

3. Read the data (and inspect the schema)

Method 1: read the data without schema inference
# Read a CSV file
df = spark.read.csv("flights.csv", header=True)
df.head(5)
Output:
[Row(year=u'2014', month=u'1', day=u'1', dep_time=u'1', dep_delay=u'96', arr_time=u'235', arr_delay=u'70', carrier=u'AS', tailnum=u'N508AS', flight=u'145', origin=u'PDX', dest=u'ANC', air_time=u'194', distance=u'1542', hour=u'0', minute=u'1'),
 Row(year=u'2014', month=u'1', day=u'1', dep_time=u'4', dep_delay=u'-6', arr_time=u'738', arr_delay=u'-23', carrier=u'US', tailnum=u'N195UW', flight=u'1830', origin=u'SEA', dest=u'CLT', air_time=u'252', distance=u'2279', hour=u'0', minute=u'4'),
 Row(year=u'2014', month=u'1', day=u'1', dep_time=u'8', dep_delay=u'13', arr_time=u'548', arr_delay=u'-4', carrier=u'UA', tailnum=u'N37422', flight=u'1609', origin=u'PDX', dest=u'IAH', air_time=u'201', distance=u'1825', hour=u'0', minute=u'8'),
 Row(year=u'2014', month=u'1', day=u'1', dep_time=u'28', dep_delay=u'-2', arr_time=u'800', arr_delay=u'-23', carrier=u'US', tailnum=u'N547UW', flight=u'466', origin=u'PDX', dest=u'CLT', air_time=u'251', distance=u'2282', hour=u'0', minute=u'28'),
 Row(year=u'2014', month=u'1', day=u'1', dep_time=u'34', dep_delay=u'44', arr_time=u'325', arr_delay=u'43', carrier=u'AS', tailnum=u'N762AS', flight=u'121', origin=u'SEA', dest=u'ANC', air_time=u'201', distance=u'1448', hour=u'0', minute=u'34')]
# View the DataFrame's schema
df.printSchema()

Output:
root
|-- year: string (nullable = true)
|-- month: string (nullable = true)
|-- day: string (nullable = true)
|-- dep_time: string (nullable = true)
|-- dep_delay: string (nullable = true)
|-- arr_time: string (nullable = true)
|-- arr_delay: string (nullable = true)
|-- carrier: string (nullable = true)
|-- tailnum: string (nullable = true)
|-- flight: string (nullable = true)
|-- origin: string (nullable = true)
|-- dest: string (nullable = true)
|-- air_time: string (nullable = true)
|-- distance: string (nullable = true)
|-- hour: string (nullable = true)
|-- minute: string (nullable = true)
Method 2: let Spark infer the schema automatically
# Infer column types from the data (this takes an extra pass over the file)
df2 = spark.read.csv("flights.csv", header=True, inferSchema=True)
df2.printSchema()

Output:
root
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- dep_time: string (nullable = true)
|-- dep_delay: string (nullable = true)
|-- arr_time: string (nullable = true)
|-- arr_delay: string (nullable = true)
|-- carrier: string (nullable = true)
|-- tailnum: string (nullable = true)
|-- flight: integer (nullable = true)
|-- origin: string (nullable = true)
|-- dest: string (nullable = true)
|-- air_time: string (nullable = true)
|-- distance: integer (nullable = true)
|-- hour: string (nullable = true)
|-- minute: string (nullable = true)

4. Inspect the DataFrame (displaying full column values without truncation)

df.printSchema()         # View the schema
Output:
root
|-- year: string (nullable = true)
|-- month: string (nullable = true)
|-- day: string (nullable = true)
|-- dep_time: string (nullable = true)
|-- dep_delay: string (nullable = true)
|-- arr_time: string (nullable = true)
|-- arr_delay: string (nullable = true)
|-- carrier: string (nullable = true)
|-- tailnum: string (nullable = true)
|-- flight: string (nullable = true)
|-- origin: string (nullable = true)
|-- dest: string (nullable = true)
|-- air_time: string (nullable = true)
|-- distance: string (nullable = true)
|-- hour: string (nullable = true)
|-- minute: string (nullable = true)
df.schema
Output:
StructType(List(StructField(year,StringType,true),StructField(month,StringType,true),
StructField(day,StringType,true),StructField(dep_time,StringType,true),
StructField(dep_delay,StringType,true),StructField(arr_time,StringType,true),
StructField(arr_delay,StringType,true),StructField(carrier,StringType,true),
StructField(tailnum,StringType,true),StructField(flight,StringType,true),
StructField(origin,StringType,true),StructField(dest,StringType,true),
StructField(air_time,StringType,true),StructField(distance,StringType,true),
StructField(hour,StringType,true),StructField(minute,StringType,true)))
df.show()   # Show the first N rows; the default is 20
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
|2014|    1|  1|      37|       82|     747|       88|     DL| N806DN|  1823|   SEA| DTW|     224|    1927|   0|    37|
|2014|    1|  1|     346|      227|     936|      219|     UA| N14219|  1481|   SEA| ORD|     202|    1721|   3|    46|
|2014|    1|  1|     526|       -4|    1148|       15|     UA| N813UA|   229|   PDX| IAH|     217|    1825|   5|    26|
|2014|    1|  1|     527|        7|     917|       24|     UA| N75433|  1576|   SEA| DEN|     136|    1024|   5|    27|
|2014|    1|  1|     536|        1|    1334|       -6|     UA| N574UA|   478|   SEA| EWR|     268|    2402|   5|    36|
|2014|    1|  1|     541|        1|     911|        4|     UA| N36476|  1569|   PDX| DEN|     130|     991|   5|    41|
|2014|    1|  1|     549|       24|     907|       12|     US| N548UW|   649|   PDX| PHX|     122|    1009|   5|    49|
|2014|    1|  1|     550|        0|     837|      -12|     DL| N660DL|  1634|   SEA| SLC|      82|     689|   5|    50|
|2014|    1|  1|     557|       -3|    1134|      -16|     AA| N3JLAA|  1094|   SEA| DFW|     184|    1660|   5|    57|
|2014|    1|  1|     557|       -3|     825|      -25|     AS| N562AS|    81|   SEA| ANC|     188|    1448|   5|    57|
|2014|    1|  1|     558|       -2|     801|       -2|     AS| N402AS|   200|   SEA| SJC|     100|     697|   5|    58|
|2014|    1|  1|     559|       -1|     916|       -9|     F9| N210FR|   796| ... (remaining output truncated in the source)
