大数据之获取数据

Posted 潇洒哥浩浩

tags:

篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了在 Flink 中获取数据（创建数据源）的几种方式——从集合、文件、元素、Kafka 以及自定义数据源中读取，希望对你有一定的参考价值。

package com.sjw.flink

import java.util.{Properties, Random}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011


/**
 * Demo job showing several ways to create a Flink DataStream source:
 * an in-memory collection, a text file, literal elements, a Kafka topic,
 * and the custom SensorSource.
 *
 * Only the Kafka stream (stream4) is printed; uncomment the other `print`
 * calls to inspect those streams.
 */
object SourceTest {

  /** Kafka broker list used when no first command-line argument is given. */
  private val DefaultBootstrapServers = "sunjunwei1.com:9092"

  /** Kafka topic used when no second command-line argument is given. */
  private val DefaultTopic = "flink_kafka"

  /**
   * Builds the demo sources and runs the job.
   *
   * @param args optional overrides: `args(0)` = Kafka bootstrap servers,
   *             `args(1)` = Kafka topic; missing entries fall back to the
   *             defaults defined above, so running with no arguments behaves
   *             exactly as before
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).getOrElse(DefaultTopic)

    // Set up the streaming execution environment.
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read from an in-memory collection.
    val stream1 = environment
      .fromCollection(List(
        SensorReading("sensor_1", 1547718199, 35.80018327300259),
        SensorReading("sensor_6", 1547718201, 15.402984393403084),
        SensorReading("sensor_7", 1547718202, 6.720945201171228),
        SensorReading("sensor_10", 1547718205, 38.101067604893444)
      ))
    // stream1.print("stream1:").setParallelism(1)

    // 2. Read from a text file (path is relative to the working directory).
    // NOTE(review): the file-reading operator behind readTextFile appears to stay in the job
    // graph even though stream2 has no sink, so a missing file may still fail the job — confirm.
    val stream2 = environment.readTextFile("src/main/resources/words.txt")
    // stream2.print("stream2").setParallelism(1)

    // 3. Read from a fixed set of elements.
    // environment.fromElements(1,2.0,"string").print("stream3")

    // 4. Read from Kafka.
    val properties = new Properties()
    // Broker host(s) and port(s).
    properties.setProperty("bootstrap.servers", bootstrapServers)
    // Consumer group id.
    properties.setProperty("group.id", "consumer-group")
    // Key / value deserializers.
    // NOTE(review): the Flink Kafka consumer is believed to replace these with its own byte-array
    // deserializers (records are decoded by the SimpleStringSchema below) — confirm for this connector version.
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Kafka offset auto-reset policy.
    properties.setProperty("auto.offset.reset", "latest")

    val stream4 = environment.addSource(
      new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), properties))
    stream4.print("stream4").setParallelism(1)

    // 5. Read from the custom source.
    val stream5 = environment.addSource(new SensorSource())
    // stream5.print("stream5").setParallelism(1)

    // Launch the job.
    environment.execute("source test")
  }
}

/**
 * A single temperature sample reported by one sensor.
 *
 * @param id          sensor identifier, e.g. "sensor_1"
 * @param timestamp   time of the reading
 *                    (NOTE(review): unit is not consistent in this file — SensorSource writes
 *                    System.currentTimeMillis, while the sample collection in SourceTest uses
 *                    10-digit values that look like epoch seconds; confirm the intended unit)
 * @param temperature measured temperature value
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)

/**
 * Custom Flink source that emits simulated temperature readings for ten sensors
 * (`sensor_1` .. `sensor_10`) until it is cancelled.
 *
 * Each sensor starts at `60 + 20 * g`, where g is a standard-Gaussian draw. On every round:
 *  - each temperature takes one standard-Gaussian random-walk step;
 *  - all ten readings are emitted with the same `System.currentTimeMillis` timestamp;
 *  - the source then sleeps for 500 ms.
 */
class SensorSource() extends SourceFunction[SensorReading] {

  // Whether the emission loop in run() should keep going.
  // Flink calls cancel() from a different thread than the one executing run().
  // Without @volatile the loop is not guaranteed to ever observe the write,
  // so cancellation could hang.
  @volatile var flag: Boolean = true

  /** Requests that run() stop; the loop exits after finishing its current round. */
  override def cancel(): Unit = {
    flag = false
  }

  /**
   * Emits readings into `sourceContext` until cancel() is called.
   *
   * @param sourceContext Flink context used to emit each SensorReading
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new Random()

    // Initial (sensorId, temperature) pairs for sensor_1 .. sensor_10.
    var curTemp = (1 to 10).map(i => (s"sensor_$i", 60 + random.nextGaussian() * 20))

    while (flag) {
      // Random-walk each sensor's temperature by one standard-Gaussian step.
      curTemp = curTemp.map { case (id, temp) => (id, temp + random.nextGaussian()) }

      // All readings emitted in one round share the same timestamp.
      val curTime = System.currentTimeMillis()
      curTemp.foreach { case (id, temp) =>
        sourceContext.collect(SensorReading(id, curTime, temp))
      }

      // Pause between rounds.
      Thread.sleep(500)
    }
  }
}

以上是关于大数据之获取数据的主要内容，如果未能解决你的问题，请参考以下文章。

大数据课程重大福利,终身学习权限参与就有机会获取

大数据之高级分析如何从天气中获取洞察力

大数据之Hbase:HBase之读写数据流程

Python教程之获取网络数据!

2022年最新Python大数据之Python基础面向对象与继承

2022年最新Python大数据之Python基础面向对象与继承