Mobile Track Collection and Upload -> AMap Falcon (高德猎鹰) Track Service
Requirement: truck drivers deliver vegetables to schools, companies, hotels, and so on. We need to display each driver's delivery track on a map and calculate the track mileage.
Supported platforms: Android, iOS, Web service
Falcon is a track management service that provides mobile SDKs and server-side APIs for developers. It offers track correction, mileage calculation, real-time monitoring, and other rich interfaces and cloud services, so developers can quickly build their own accurate and efficient track management system on top of Falcon for fleet management, personnel management, and similar scenarios.
1.1 Track upload
Falcon provides Android and iOS SDKs that can be embedded in smart devices, plus a Web service API, for uploading tracks; both single-point and multi-point upload are supported.
1.2 Track storage
Falcon stores the most recent year of device track data for developers free of charge.
1.3 Track query
Falcon provides a high-performance track query service: once a device track has been uploaded, developers can query the track for any time range in real time.
1.4 Mileage calculation
Falcon provides accurate track mileage calculation; the computed mileage can be used for driving-mileage accounting, ride-hailing fare calculation, and similar scenarios.
1.5 Track correction
Falcon provides professional track correction and road binding. Developers can use it to correct tracks whose positioning has drifted and display a smooth, continuous track.
1.6 Real-time monitoring
Falcon provides real-time monitoring of terminal positions. Developers can query a terminal's position in real time through the API and monitor devices from a monitoring console.
1.7 Terminal search
Falcon can find terminal positions within an area. Developers can look up terminals by keyword search, nearby search, polygon search, administrative-district search, and so on.
In short: the server side calls the AMap Falcon Web API to create a ServiceId, a TerminalId and a TrackId; the mobile side is configured with these parameters and then calls the SDK to start the track service and begin collecting data.
Service management: to manage tracks with Falcon you first create a track service (Service). Each Key can register at most 15 Services, and each Service supports creating up to 100,000 terminals by default.
Terminal management: any mobile device or vehicle can be treated as a terminal.
The terminal management APIs cover creating, updating, deleting, and querying terminals, for example adding a vehicle, deleting a vehicle, or updating vehicle attributes.
Real-time terminal monitoring: given a service and a terminal, return the terminal's last position on a specified track; both real-time and historical queries of the last position are supported.
Track management and upload: create a track; up to 500,000 tracks can be created under one terminal.
Track correction and mileage query: the track query API returns track information, performs track correction, and computes mileage.
The key steps are: create a service, create and manage terminals, create a track, and finally query the terminal's track and mileage. A rough sketch of these Web API calls follows.
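The sketch below uses curl. The base URL, endpoint paths (service/add, terminal/add, trace/add) and parameter names are written from memory of the Falcon Web API and should be verified against the official documentation; YOUR_KEY, SERVICE_ID and TERMINAL_ID are placeholders for your own values.

#!/bin/bash
# Assumed Falcon (猎鹰) Web API base URL and endpoints; verify against the official docs.
BASE=https://tsapi.amap.com/v1/track
KEY=YOUR_KEY        # placeholder: your AMap web-service key

# 1. Create a track service; the response contains the sid (ServiceId).
curl -s -X POST "$BASE/service/add" -d "key=$KEY" -d "name=vegetable-delivery"

# 2. Create a terminal under that service; the response contains the tid (TerminalId).
curl -s -X POST "$BASE/terminal/add" -d "key=$KEY" -d "sid=SERVICE_ID" -d "name=truck-001"

# 3. Create a track (trace) under that terminal; the response contains the trid (TrackId).
curl -s -X POST "$BASE/trace/add" -d "key=$KEY" -d "sid=SERVICE_ID" -d "tid=TERMINAL_ID"

The returned sid/tid/trid are the ServiceId, TerminalId and TrackId that the mobile SDK is configured with before it starts the track service and begins gathering points.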
2.1 Track reporting
There are two ways to report track points:
First: when starting track reporting you only supply the service (service id) and the terminal (terminal id) without specifying which track the points should go to. This is called scattered-point reporting; all points are associated directly with the terminal and do not belong to any track (track id).
Second: you specify which track (track id) the points should be reported to. This is called reporting to a specified track, and it is the mode used here.
Query the terminal's real-time position: use the service id and terminal id to query the last position the terminal reported.
Query the terminal's mileage: query how far a terminal has driven in the last n hours.
Query the terminal's historical track: there are two ways. One returns all of the terminal's track points, including scattered points that belong to no track, sorted by upload time;
the other returns points grouped by track, either for every track or for a specific track, with scattered points excluded. A minimal sketch of point upload in both modes is shown below.
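The point/upload endpoint and the points JSON fields (location as "lng,lat", locatetime in milliseconds) used here are assumptions based on memory of the Falcon Web API; check them against the official reference before relying on them.

# Scattered-point reporting: no trid, so the points attach directly to the terminal (assumed endpoint).
curl -s -X POST "https://tsapi.amap.com/v1/track/point/upload" \
     -d "key=YOUR_KEY" -d "sid=SERVICE_ID" -d "tid=TERMINAL_ID" \
     --data-urlencode 'points=[{"location":"104.07513,30.72724","locatetime":1477969147000}]'

# Reporting to a specified track: the same call with an additional trid parameter.
curl -s -X POST "https://tsapi.amap.com/v1/track/point/upload" \
     -d "key=YOUR_KEY" -d "sid=SERVICE_ID" -d "tid=TERMINAL_ID" -d "trid=TRACK_ID" \
     --data-urlencode 'points=[{"location":"104.07513,30.72724","locatetime":1477969147000}]'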
2.2 What the mobile side needs to do
I reworked the official Demo; trying it out is the only way to know whether it gives the result you want.
Test screenshots:
3. Summary
Integrating Flume, Kafka and Spark Streaming
This part covers the integration of Flume, Kafka and Spark Streaming. The flow is: a shell script replays test track data into a specified track file, Flume monitors that file and sends the track data to Kafka in real time, and Spark Streaming then does a simple count of the track records.
A quick introduction to Flume:
Flume's core role is the agent; each agent acts as a data courier and has three main components:
Source: collects data from the source side; Flume ships with many built-in sources and also supports custom sources.
Channel: the data transfer channel; the memory channel and the file channel are the most commonly used.
Sink: moves data to the destination, such as HDFS, Kafka, a database, or a custom sink.
Kafka and Spark are not introduced here; later articles will analyze them in detail.
Integration steps:
1. Download the Flume package to a data node, extract it, go to Flume's conf directory, and edit gps.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# Flume tails the track file for new content (create the file first, e.g. with touch /data/gps/gps)
a1.sources.r1.command = tail -F /data/gps/gps
a1.sources.r1.fileHeader = true
#a1.sources.r1.ignorePattern=(^gps_)
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# data read from the monitored file is sent to this Kafka topic
a1.sinks.k1.topic = gps_topic
a1.sinks.k1.brokerList= cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.producer.linger.ms = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Create the replay script play_gps.sh:
#!/bin/bash
# Replay the test track data line by line into the file that Flume tails.
srcfile=/data/gps/2020-9-13/testGps.log
outputpath=/data/gps/gps
echo "srcfile:$srcfile"
echo "outputpath:$outputpath"
while read -r line
do
    # sleep 0.01
    sleep 0.5
    echo "$line"
    echo "$line" >> "$outputpath"
done < "$srcfile"
echo "completed!"
3. Create the track file that the Flume agent will monitor.
Create the data directory and put the prepared track data under it, as sketched below.
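A minimal sketch of this step; the paths match gps.conf and play_gps.sh above, and testGps.log stands for whatever prepared track data you have.

mkdir -p /data/gps/2020-9-13          # directory for the prepared source track data
cp testGps.log /data/gps/2020-9-13/   # the prepared track data file read by play_gps.sh
touch /data/gps/gps                   # empty file that Flume tails and the replay script appends to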
4. Start the Flume agent:
[root bin]# nohup flume-ng agent --name a1 --conf-file ../conf/gps.conf &
[1] 976
[root bin]# nohup: ignoring input and appending output to ‘nohup.out’
[root bin]# tail -f nohup.out
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
20/09/13 01:36:54 INFO utils.AppInfoParser: Kafka version : 2.0.1
20/09/13 01:36:54 INFO utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/09/13 01:36:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/09/13 01:36:54 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/09/13 01:36:57 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
20/09/13 01:36:57 WARN clients.NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 2 : {gps_topic=LEADER_NOT_AVAILABLE}
20/09/13 01:36:57 INFO clients.Metadata: Cluster ID: 7Sq6gNsRQtW4c9eosNA6Nw
5. Replay the track data:
[root@cdh3 ~]# sh play_gps.sh
srcfile:/data/gps/2020-9-13/testGps.log
outputpath:/data/gps/gps
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969147,104.07513,30.72724
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969150,104.07513,30.72702
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969154,104.07504,30.72672
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969156,104.07497,30.72630
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969159,104.07497,30.72582
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969162,104.07496,30.72544
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969168,104.07489,30.72487
6. After replaying the data, check whether the corresponding topic has been created in Kafka.
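The topics can be listed with kafka-topics against the cluster's ZooKeeper; a sketch is shown below (note the /kafka chroot, which matches the connect string in the log output that follows).

kafka-topics --list --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka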
20/09/13 01:39:34 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Initializing a new session to cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka.
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.5.5-100-51e233a47ddbcf45f5aa690243bc31b25eded2a2, built on 08/05/2020 09:41 GMT
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:host.name=cdh3.macro.com
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_171
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.version=3.10.0-327.el7.x86_64
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:user.name=root
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:user.home=/root
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:user.dir=/root
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.memory.free=231MB
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.memory.max=256MB
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Client environment:os.memory.total=250MB
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka sessionTimeout=30000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@4efbca5a
20/09/13 01:39:34 INFO common.X509Util: Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
20/09/13 01:39:34 INFO zookeeper.ClientCnxnSocket: jute.maxbuffer value is 4194304 Bytes
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: zookeeper.request.timeout value is 0. feature enabled=
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Waiting until connected.
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Opening socket connection to server cdh2.macro.com/192.168.0.207:2181. Will not attempt to authenticate using SASL (unknown error)
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Socket error occurred: cdh2.macro.com/192.168.0.207:2181: Connection refused
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Opening socket connection to server cdh3.macro.com/192.168.0.208:2181. Will not attempt to authenticate using SASL (unknown error)
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /192.168.0.208:50761, server: cdh3.macro.com/192.168.0.208:2181
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh3.macro.com/192.168.0.208:2181, sessionid = 0x10014dbded001e2, negotiated timeout = 30000
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Connected.
ATLAS_HOOK
ATLAS_SPARK_HOOK
__consumer_offsets
gps_topic
pos
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closing.
20/09/13 01:39:34 INFO zookeeper.ZooKeeper: Session: 0x10014dbded001e2 closed
20/09/13 01:39:34 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x10014dbded001e2
20/09/13 01:39:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closed.
7. Kafka has generated the corresponding topic; now consume some of the data:
[root ~]# kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --topic gps_topic
20/09/13 01:40:00 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
20/09/13 01:40:00 INFO consumer.ConsumerConfig: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [cdh1.macro.com:9092, cdh2.macro.com:9092, cdh3.macro.com:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-31005
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/09/13 01:40:01 INFO utils.AppInfoParser: Kafka version: 2.4.1.7.1.3.0-100
20/09/13 01:40:01 INFO utils.AppInfoParser: Kafka commitId: 0cfbf9b7ef3ca50d
20/09/13 01:40:01 INFO utils.AppInfoParser: Kafka startTimeMs: 1599932401067
20/09/13 01:40:01 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Subscribed to topic(s): gps_topic
20/09/13 01:40:01 INFO clients.Metadata: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Cluster ID: 7Sq6gNsRQtW4c9eosNA6Nw
20/09/13 01:40:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Discovered group coordinator cdh3.macro.com:9092 (id: 2147483539 rack: null)
20/09/13 01:40:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] (Re-)joining group
20/09/13 01:40:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] (Re-)joining group
20/09/13 01:40:04 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Finished assignment for group at generation 1: {consumer-console-consumer-31005-1-d8761451-a2c6-4b40-a18e-ba4b27ee4315=Assignment(partitions=[gps_topic-0])}
20/09/13 01:40:04 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Successfully joined group with generation 1
20/09/13 01:40:04 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Adding newly assigned partitions:gps_topic-0
20/09/13 01:40:04 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Found no committed offset for partition gps_topic-0
20/09/13 01:40:04 INFO internals.SubscriptionState: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Resetting offset for partition gps_topic-0 to offset 185.
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969700,104.10055,30.70650
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969702,104.10064,30.70634
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969706,104.10083,30.70603
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969711,104.10114,30.70549
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969714,104.10130,30.70521
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969717,104.10148,30.70491
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969724,104.10179,30.70438
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969727,104.10195,30.70411
8f20c9188561b796ef8e26196de30be4,39a096b71376b82f35732eff6d95779b,1477969730,104.10205,30.70393
^C20/09/13 01:40:21 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Revoke previously assigned partitions gps_topic-0
20/09/13 01:40:21 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-console-consumer-31005-1, groupId=console-consumer-31005] Member consumer-console-consumer-31005-1-d8761451-a2c6-4b40-a18e-ba4b27ee4315 sending LeaveGroup request to coordinator cdh3.macro.com:9092 (id: 2147483539 rack: null) due to the consumer is being closed
Processed a total of 36 messages
Consume the Kafka data with Spark Streaming:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
object SparkStreamingAndKafka {
def main(args: Array[String]): Unit = {
import org.apache.spark._
import org.apache.spark.streaming._
Logger.getLogger("org").setLevel(Level.WARN)
//use two local threads to simulate a Spark cluster
val conf = new SparkConf().setAppName("SparkStreamingAndKafka").setMaster("local[2]")
//initialize the Spark Streaming context with a 1-second batch interval
val streamingContext = new StreamingContext(conf, Seconds(1))
//set the checkpoint directory
streamingContext.checkpoint("/sparkapp/tmp")
//consumer parameters; auto.offset.reset = earliest so the replayed data is read from the beginning
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.0.171:9092,192.168.0.207:9092,192.168.0.208:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test0002",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("gps_topic") //must match the topic that the Flume Kafka sink writes to
topics.foreach(println(_))
println("topics:" + topics)
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.count().print();
//start the Spark Streaming job
streamingContext.start();
streamingContext.awaitTermination();
streamingContext.stop()
}
}
8. Output:
gps_topic
topics:[Ljava.lang.String;@54a3ab8f
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-test0002
20/09/13 03:25:39 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
-------------------------------------------
Time: 1599938740000 ms
-------------------------------------------
466
-------------------------------------------
Time: 1599938741000 ms
-------------------------------------------
0
This completes the integration of Flume, Kafka and Spark Streaming.