大数据信号分析:存储和查询信号数据的更好方法

Posted

技术标签:

【中文标题】大数据信号分析:存储和查询信号数据的更好方法【英文标题】:Big data signal analysis: better way to store and query signal data 【发布时间】:2016-04-24 10:24:49 【问题描述】:

我正在使用 Hadoop/Spark 进行一些信号分析,我需要有关如何构建整个过程的帮助。

信号现在存储在数据库中,我们将使用 Sqoop 读取它,并将其转换为 HDFS 上的文件,其架构类似于:

<Measure ID> <Source ID> <Measure timestamp> <Signal values>

其中信号值只是由浮点逗号分隔的数字组成的字符串。

000123  S001  2015/04/22T10:00:00.000Z  0.0,1.0,200.0,30.0 ... 100.0
000124  S001  2015/04/22T10:05:23.245Z  0.0,4.0,250.0,35.0 ... 10.0
...
000126  S003  2015/04/22T16:00:00.034Z  0.0,0.0,200.0,00.0 ... 600.0

我们想编写交互式/批量查询到:

对信号值应用聚合函数

SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0

选择峰值超过 1000.0 的信号。

在聚合上应用聚合

SELECT SOURCEID, MAX(VALUES) 
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0

选择至少有一个信号超过 1500.0 的信号源。

对样本应用用户定义的函数

SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0)

选择经过 5.0 KHz 滤波后的值至少超过 100.0 的信号。

我们需要一些帮助才能:

    找到正确的文件格式以将信号数据写入 HDFS。我想到了 Apache Parquet。您将如何构建数据? 了解正确的数据分析方法:最好是创建不同的数据集(例如使用 Spark 处理数据并将结果保存在 HDFS 上)还是尝试在查询时从原始数据集执行所有操作? Hive 是一个很好的工具来进行我写的查询吗?我们在 Cloudera Enterprise Hadoop 上运行,因此我们也可以使用 Impala。 如果我们生成与原始数据不同的派生数据集,我们如何跟踪数据的沿袭,即知道数据是如何从原始版本生成的?

非常感谢!

【问题讨论】:

【参考方案1】:

1) Parquet 作为列格式适用于 OLAP。 Parquet 的 Spark 支持对于生产使用来说已经足够成熟了。我建议将表示信号值的字符串解析为以下数据结构(简化):

 case class Data(id: Long, signals: Array[Double])
 val df = sqlContext.createDataFrame(Seq(Data(1L, Array(1.0, 1.0, 2.0)), Data(2L, Array(3.0, 5.0)), Data(2L, Array(1.5, 7.0, 8.0))))

保留双精度数组允许像这样定义和使用 UDF:

def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")

sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id|        signals|
+---+---------------+
|  2|     [3.0, 5.0]|
|  2|[1.5, 7.0, 8.0]|
+---+---------------+

sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
|  1|      2.0|
|  2|      8.0|
+---+---------+

或者,如果您想要一些类型安全,请使用 Scala DSL:

import org.apache.spark.sql.functions._
val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
|  2|           8.0|
+---+--------------+

2) 这取决于:您的数据大小是否允许在查询时间内以合理的延迟进行所有处理 - 去吧。您可以从这种方法开始,然后为慢速/流行查询构建优化的结构

3) Hive 很慢,它被 Impala 和 Spark SQL 过时了。有时选择并不容易,我们使用经验法则:如果所有数据都存储在 HDFS/Hive 中,Impala 适合无连接查询,Spark 延迟较大但连接可靠,它支持更多数据源并具有丰富的非 SQL 处理能力(如 MLlib 和 GraphX)

4) 保持简单:将原始数据(主数据集)存储为去重和分区(我们使用基于时间的分区)。如果新数据到达分区并且您已经生成了下游数据集 - 重新启动此分区的管道。

希望对你有帮助

【讨论】:

谢谢,我将尝试您建议的方法。 今天从头开始,您是否建议跳过 Hive 转而使用 Spark SQL? 当然,我会选择 Spark SQL、Impala 和刚刚发布的 Hive-on-Spark。您可以混合技术:使用 Spark 进行批处理准备数据,然后使用 Impala 或 Presto 查询准备好的数据【参考方案2】:

首先,我相信 Vitaliy 的方法在各个方面都非常出色。 (我完全支持 Spark)

不过,我想提出另一种方法。原因是:

    我们想做交互式查询(+ 我们有 CDH) 数据已经结构化 需要对数据进行“分析”而不是“处理”。如果 (a) 数据是结构化的,我们可以更快地形成 sql 查询,并且 (b) 我们不想在每次运行查询时都编写程序,则 Spark 可能是一种过度杀伤

以下是我想要执行的步骤:

    使用 sqoop 将数据摄取到 HDFS:[可选] 使用 --as-parquetfile 根据需要创建外部 Impala 表或内部表。如果您尚未将文件作为 parquet 文件传输,则可以在此步骤中执行此操作。最好按 Source ID 进行分区,因为我们的分组将发生在该列上。

因此,基本上,一旦我们传输了数据,我们需要做的就是创建一个 Impala 表,最好是 parquet 格式,并按我们将用于分组的列进行分区。请记住在加载后进行计算统计以帮助 Impala 更快地运行它。

移动数据: - 如果我们需要从结果中生成提要,请创建一个单独的文件 - 如果另一个系统要更新现有数据,则在创建时将数据移动到不同位置->加载表 - 如果只是关于查询和分析以及获取报告(即外部表就足够了),我们不需要不必要地移动数据 - 我们可以在相同数据之上创建一个外部配置单元表。如果我们需要运行长时间运行的批处理查询,我们可以使用 Hive。但是,对于交互式查询来说,这是一个禁忌。如果我们从查询中创建任何派生表并希望通过 Impala 使用,请记住在对 hive 生成​​的表运行 impala 查询之前运行“无效元数据”

血统-我没有深入了解,这是link on Impala lineage using Cloudera Navigator

【讨论】:

以上是关于大数据信号分析:存储和查询信号数据的更好方法的主要内容,如果未能解决你的问题,请参考以下文章

图信号处理进行大数据分析

6.4 访存指令的控制信号

群体选择信号分析

数字电路

信号与系统--信号以及系统的介绍

信号与线性系统综合实验 一、实验目的 1、掌握连续时间信号与系统的时域、频域综合分析方法;