Data Processing and Visualization


Table of Contents

1. Data collection [collect YARN info via the YARN API]

1. Create the IDEA project

2. Add resources

3. Start spark-sql

4. Code

5. Send the data to Kafka

2. Real-time data processing

Code workflow

1. Add dependencies

2. Add the corresponding utility classes (ContextUtils.scala)

3. Initial App code

4. Making the code robust

ClickHouse

1. Installation and deployment

2. Usage

3. Remote connection with DBeaver

4. Using it from IDEA

3. Data visualization

1. Native deployment

1. Start Superset

2. Connect Superset to ClickHouse

2. Docker-based deployment


  • Project overview

    • Project workflow:

      • 1. Data collection: collect YARN metrics => YARN API => user-written collector packaged as a jar

      • 2. Data processing: real-time processing => Spark Streaming

      • 3. Data output: MySQL or an OLAP store => ClickHouse

      • 4. Data visualization: Superset or DataEase

        OLAP vs. OLTP (millisecond-level responses)
        OLAP: ClickHouse, Doris, TiDB, Phoenix
        OLTP: transactional workloads, e.g. MySQL

    • Pipeline:
          YARN:
              app.jar [collects YARN data] => Kafka => Spark Streaming => ClickHouse => Superset

    • Data formats:

      • 1. Send the data line by line as delimited text, e.g.
                        1 2 3 4 5   (space-delimited)
                        1,2,3,4,5   (comma-delimited)

      • 2. JSON data, e.g.
                        {"applicationid":1,"mem":"4G","name":"xxx asd x"}

                Performance trade-off:
                    JSON costs more IO; the benefit is that it is easy to parse. (A short sketch comparing the two formats follows this list.)
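
      A minimal sketch of the two encodings (the field names and values here are illustrative only, not taken from the project code):

        // Delimited text vs. JSON for the same record
        object FormatSketch {
          def main(args: Array[String]): Unit = {
            val fields = Seq("application_1675650000000_0001", "spark-sql", "SPARK", "4096", "2")

            // 1) delimited text: compact and cheap on IO, but the consumer must know the field order
            val delimited = fields.mkString("&&")
            println(delimited)  // application_1675650000000_0001&&spark-sql&&SPARK&&4096&&2

            // 2) JSON: self-describing and easy to parse, at the cost of extra bytes per message
            val json =
              s"""{"applicationid":"${fields(0)}","name":"${fields(1)}","type":"${fields(2)}","mem":"${fields(3)}","cores":"${fields(4)}"}"""
            println(json)
          }
        }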

1. Data collection [collect YARN info via the YARN API]

  • 1. Collect the YARN application states of interest:

    • 1.ACCEPTED

    • 2.RUNNING 

  • 2. How do you use the YARN API?

    • 1.YarnClient 

    • 2.YarnApplicationState

    • 3.ApplicationReport:
         contains the detailed information for each application on YARN:
         application id, execution engine, start time, finish time, memory, CPU, queue, name

Workflow

  • 1. Create the IDEA project

    • 0. Select the project type (screenshot omitted)

    • 1. Add dependencies

       <!-- YARN client dependencies -->
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
          </dependency>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>3.3.4</version>
          </dependency>

       Scala version: 2.12.14

  • 2. Add resources

    • 1. Add yarn-site.xml

      • Copy it from the cluster path /home/hadoop/app/hadoop/etc/hadoop (a quick classpath check follows below)
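
      A small sanity check that the file is actually picked up (this helper is my addition, not part of the original post):

        import org.apache.hadoop.yarn.conf.YarnConfiguration

        // Hypothetical check: confirm yarn-site.xml is on the classpath and that
        // YarnConfiguration reads the ResourceManager address from it instead of
        // falling back to the 0.0.0.0:8032 default.
        object YarnConfCheck {
          def main(args: Array[String]): Unit = {
            val resource = getClass.getClassLoader.getResource("yarn-site.xml")
            println(s"yarn-site.xml on classpath: $resource")  // null means the resources folder is not configured

            val conf = new YarnConfiguration()
            println(s"yarn.resourcemanager.address = ${conf.get(YarnConfiguration.RM_ADDRESS)}")
          }
        }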

  • 3. Start spark-sql (so there are applications running on YARN to collect)

    • spark-shell --master yarn --jars /home/hadoop/software/mysql-connector-java-5.1.28.jar --driver-class-path /home/hadoop/software/mysql-connector-java-5.1.28.jar --name spark-sql

    • // start a second one (not needed for now)
      spark-shell --master yarn --jars /home/hadoop/software/mysql-connector-java-5.1.28.jar --driver-class-path /home/hadoop/software/mysql-connector-java-5.1.28.jar --name spark-sql01

    • Check the YARN web UI at bigdata:8088

  • 4. Code

    • YarnInfo (trait)

      package com.bigdata.task

      import java.util
      import java.util.Date

      import org.apache.commons.lang3.time.FastDateFormat
      import org.apache.hadoop.yarn.api.records.YarnApplicationState
      import org.apache.hadoop.yarn.client.api.YarnClient
      import org.apache.hadoop.yarn.conf.YarnConfiguration

      trait YarnInfo {

        // fetch information about applications currently on YARN
        def getYarnInfo = {

          /**
           * 1.YarnClient
           * 2.YarnApplicationState
           * 3.ApplicationReport
           */
          val yarnClient = YarnClient.createYarnClient()

          // initialize and start the client
          val yarnConf = new YarnConfiguration()
          yarnClient.init(yarnConf)
          yarnClient.start()

          // only query applications in the ACCEPTED and RUNNING states
          val appStates = util.EnumSet.noneOf(classOf[YarnApplicationState])
          appStates.add(YarnApplicationState.ACCEPTED)
          appStates.add(YarnApplicationState.RUNNING)

          val applicationReports = yarnClient.getApplications(appStates).iterator()

          val data = StringBuilder.newBuilder  // accumulates one line per application

          // iterate over the application reports
          while (applicationReports.hasNext) {

            val applicationReport = applicationReports.next()

            val applicationId = applicationReport.getApplicationId        // application id
            val applicationType = applicationReport.getApplicationType    // execution engine
            val user = applicationReport.getUser
            val startTime = applicationReport.getStartTime
            val finishTime = applicationReport.getFinishTime
            val memorySize = applicationReport.getApplicationResourceUsageReport.getUsedResources.getMemorySize      // memory in use
            val virtualCores = applicationReport.getApplicationResourceUsageReport.getUsedResources.getVirtualCores  // vcores in use
            val containers = applicationReport.getApplicationResourceUsageReport.getNumUsedContainers                // containers in use
            val state = applicationReport.getYarnApplicationState
            val url = applicationReport.getTrackingUrl
            val name = applicationReport.getName
            val queue = applicationReport.getQueue
            val todayTime = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(new Date().getTime)

            data.append(applicationId).append("&&")
              .append(name).append("&&")
              .append(applicationType).append("&&")
              .append(user).append("&&")
              .append(startTime).append("&&")
              .append(finishTime).append("&&")
              .append(memorySize).append("&&")
              .append(virtualCores).append("&&")
              .append(containers).append("&&")
              .append(state).append("&&")
              .append(queue).append("&&")
              .append(todayTime).append("&&")
              .append(url).append("\r\n")
          }

          data.toString()

          //println(data.toString())
        }
      }
      
    • YarnLogTask

      package com.bigdata.task

      object YarnLogTask extends YarnInfo {

        def main(args: Array[String]): Unit = {
          println(getYarnInfo)  // print the collected application info
        }
      }

       Run result: with two spark-sql sessions started, both applications appear in the output (screenshots omitted).

  • 5. Send the data to Kafka

    • 1. Start ZooKeeper

      • Path: /home/hadoop/shell

      • [hadoop@bigdata13 shell]$ zkServer.sh start

    • 2. Start Kafka

      • Path: /home/hadoop/app/kafka/bin

      • [hadoop@bigdata13 bin]$
        kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

      • 1. List the current topics

        kafka-topics.sh \
        --list \
        --zookeeper bigdata13:2181,bigdata14:2181,bigdata15:2181/kafka
      • 2. Create a topic

        kafka-topics.sh \
        --create \
        --zookeeper bigdata13:2181,bigdata14:2181,bigdata15:2181/kafka \
        --topic yarn-info \
        --partitions 3 \
        --replication-factor 1
      • 3. Monitor with a console consumer

        kafka-console-consumer.sh \
        --bootstrap-server bigdata13:9092,bigdata14:9092,bigdata15:9092 \
        --topic yarn-info
    • 3. Code: YarnLogTask

      package com.bigdata.task

      import java.util.Properties

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

      import scala.util.Random

      object YarnLogTask extends YarnInfo {

        def main(args: Array[String]): Unit = {

          /**
           * Collect the YARN log data and sink it to Kafka:
           *   1. create a producer
           *   2. send the data:
           *       1. collect
           *       2. send
           */
          val props = new Properties()
          props.put("bootstrap.servers", "bigdata13:9092,bigdata14:9092,bigdata15:9092")
          props.put("acks", "all")
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)

          val topic = "yarn-info"

          while (true) {
            val data = getYarnInfo

            Thread.sleep(1000)

            val applicationinfos = data.split("\r\n")

            for (applicationinfo <- applicationinfos) {
              val partitionId = new Random().nextInt(10) % 3   // spread records across the 3 partitions
              println(applicationinfo)
              producer.send(new ProducerRecord[String, String](topic, partitionId, "", applicationinfo))
            }
          }
        }
      }

      The producer starts printing records (screenshot omitted).

      The console consumer shows the same records (screenshot omitted).

2. Real-time data processing

Create a new project (reusing the original one also works).

  • Code workflow

  • 1. Add dependencies

        <!-- Hadoop dependency -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>3.3.4</version>
        </dependency>
        <!-- spark-core dependency -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.12</artifactId>
          <version>3.2.1</version>
        </dependency>
        <!-- spark-sql dependency -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.12</artifactId>
          <version>3.2.1</version>
        </dependency>
    
        <!-- spark-hive dependency -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.12</artifactId>
          <version>3.2.1</version>
        </dependency>
    
    
        <!-- MySQL JDBC driver -->
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.49</version>
        </dependency>
        <!-- spark-streaming dependency -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.12</artifactId>
          <version>3.2.1</version>
        </dependency>
    
        <!-- Spark / Kafka integration dependency -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
          <version>3.2.1</version>
        </dependency>
    
  • 2. Add the corresponding utility classes
       
        ContextUtils.scala

    package com.bigdata.util

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    object ContextUtils {

      /**
       * Get a SparkContext
       * @param appName
       * @param master
       * @return
       */
      def getSparkContext(appName: String, master: String = "local[2]") = {
        val conf: SparkConf = new SparkConf()
          .setAppName(appName).setMaster(master)
        //    conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        //    conf.registerKryoClasses(Array(classOf[Info]))

        new SparkContext(conf)
      }

      /**
       * Get a SparkSession
       * @param appName
       * @param master
       */
      def getSparkSession(appName: String, master: String = "local[2]") = {
        SparkSession.builder()
          .appName(appName).master(master)
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .enableHiveSupport().getOrCreate()
      }

      def getStreamingContext(appName: String, batch: Int, master: String = "local[2]") = {
        val conf = new SparkConf().setMaster(master).setAppName(appName)
        new StreamingContext(conf, Seconds(batch))
      }
    }
  • 3. Initial App code

    package com.bigdata

    import com.bigdata.util.ContextUtils
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object App {
      def main(args: Array[String]): Unit = {

        val ssc = ContextUtils.getStreamingContext(this.getClass.getSimpleName, 10)

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "bigdata13:9092,bigdata14:9092,bigdata15:9092", // Kafka brokers
          "key.deserializer" -> classOf[StringDeserializer],    // key deserializer
          "value.deserializer" -> classOf[StringDeserializer],  // value deserializer
          "group.id" -> "yarn01",                               // consumer group id
          "auto.offset.reset" -> "latest",                      // where to start when there is no committed offset
          "enable.auto.commit" -> (false: java.lang.Boolean)    // do not auto-commit offsets; manage them manually
        )

        val topics = Array("yarn-info")

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        stream.map(_.value()).print()

        stream.foreachRDD(rdd => {
          /**
           * 1. get the offsets
           * 2. business logic (none yet)
           * 3. sink: ClickHouse via the Spark SQL JDBC API (plain JDBC code is only for reference)
           * 4. commit the offsets
           */
          println(s"number of RDD partitions: ${rdd.partitions.size}")
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          /**
           * Business logic: Spark SQL
           */
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._

          val df = rdd.map(_.value()).toDF("data")
          df.show()

          // commit the offsets
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

     Result when App runs alone (screenshot omitted).

     Result when App runs together with YarnLogTask from the yarn-log project (screenshot omitted).

  • 4. Making the code robust (a further hardening sketch follows the code below)

    package com.bigdata

    import java.util.Properties

    import com.bigdata.util.ContextUtils
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object App {

      def main(args: Array[String]): Unit = {

        val ssc = ContextUtils.getStreamingContext(this.getClass.getSimpleName, 10)

        val url = "jdbc:clickhouse://bigdata15:8123/bigdata"
        val table = "yarn_info02"

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "bigdata13:9092,bigdata14:9092,bigdata15:9092", // Kafka brokers
          "key.deserializer" -> classOf[StringDeserializer],    // key deserializer
          "value.deserializer" -> classOf[StringDeserializer],  // value deserializer
          "group.id" -> "yarn02",                               // consumer group id
          "auto.offset.reset" -> "latest",                      // where to start when there is no committed offset
          "enable.auto.commit" -> (false: java.lang.Boolean)    // do not auto-commit offsets; manage them manually
        )

        val topics = Array("yarn-info02")

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        stream.map(_.value()).print()

        stream.foreachRDD(rdd => {
          /**
           * 1. get the offsets
           * 2. business logic
           * 3. sink: ClickHouse via the Spark SQL JDBC API
           * 4. commit the offsets
           */
          println(s"number of RDD partitions: ${rdd.partitions.size}")
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          /**
           * Business logic: Spark SQL
           */
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._

          // robustness: drop empty lines before splitting
          val df = rdd.map(_.value()).filter(_.nonEmpty)
            .map(line => {
              val columns = line.split("&&")

              // could also be mapped via reflection (array => case class)
              (columns(0),
                columns(1),
                columns(2),
                columns(3),
                columns(4),
                columns(5),
                columns(6),
                columns(7),
                columns(8),
                columns(9),
                columns(10),
                columns(11),
                columns(12))
            }).toDF("applicationid", "name", "applicationtype", "user", "starttime",
              "finishtime", "memorysize", "virtualcores", "containers", "state",
              "queue", "todaytime", "url")

          df.show()
          df.printSchema()

          // write to ClickHouse through JDBC
          val properties = new Properties()
          properties.setProperty("user", "default")
          df.write.mode(SaveMode.Append)
            .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
            .jdbc(url, table, properties)

          // commit the offsets
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

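    The filter above only drops empty lines. As a further hedged hardening (my addition, not the original code), records that do not split into the expected 13 columns can be skipped as well, so a single malformed Kafka message cannot fail the whole batch:

    // Hypothetical helper: parse one "&&"-delimited line defensively, returning None
    // when the record does not have exactly 13 columns (the layout produced above).
    def parseYarnLine(line: String, expectedColumns: Int = 13): Option[Array[String]] = {
      val cols = line.split("&&", -1)  // -1 keeps trailing empty fields
      if (cols.length == expectedColumns) Some(cols) else None
    }

    // Usage inside the foreachRDD body shown above (flatMap drops the None records):
    // val rows = rdd.map(_.value()).flatMap(line => parseYarnLine(line))
    //   .map(c => (c(0), c(1), c(2), c(3), c(4), c(5), c(6), c(7), c(8), c(9), c(10), c(11), c(12)))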
     

  • ClickHouse

    • 1. Installation and deployment

    • 1. Official docs: https://clickhouse.com/docs/en/install/#from-rpm-packages
    • 2. Install from RPM packages
      • Installed on bigdata15 (run from the home directory)

        sudo yum install -y yum-utils
        sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
        sudo yum install -y clickhouse-server clickhouse-client

        sudo /etc/init.d/clickhouse-server start
        clickhouse-client # or "clickhouse-client --password" if you set up a password.

          Log in with: clickhouse-client
          Restart with:
                          clickhouse stop
                          clickhouse start
    • 2. Usage

      • 1. Create a database: create database bigdata;
      • 2. Switch to it: use bigdata
      • 3. Create a table
        CREATE TABLE bigdata.user_info (
            id String,
            name String,
            age String,
            dt String
        ) ENGINE = MergeTree()
        PARTITION BY dt
        PRIMARY KEY (dt, id, name, age)
        ORDER BY (dt, id, name, age)
        SETTINGS index_granularity = 8192
      • 4. Insert rows
        insert into bigdata.user_info values('1','zhangsan','30','20230206'),('2','lisi','30','20230206');
        

          

    • 3. Remote connection with DBeaver

      • Method 1 [not used]

      • Create a new connection (remote access not yet enabled):
           ① SSH tab
           ② General tab
           (screenshots omitted)

        • Check port 8123: [root@bigdata15 ~]# netstat -nlp | grep 8123

          The port is not occupied by another process.
        • Verify the connection (screenshot omitted)
      • Method 2 [used]

      • Modify the config file (a quick reachability check follows this list)

        • Open the config file: vim /etc/clickhouse-server/config.xml
        • Search for it in vim's last-line mode: /listen
        • Insert:
           <listen_host>::</listen_host>
        • Restart ClickHouse
        • Then create the connection directly in DBeaver
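
        A quick reachability check from the development machine (my addition; it only relies on ClickHouse's standard /ping endpoint on the HTTP port 8123):

          import scala.io.Source

          object ClickHousePing {
            def main(args: Array[String]): Unit = {
              // ClickHouse answers "Ok." over HTTP once listen_host is opened and the server restarted
              val response = Source.fromURL("http://bigdata15:8123/ping").mkString
              println(response.trim)  // expected: Ok.
            }
          }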
    • 4. Using it from IDEA

      • 1. Add the ClickHouse dependency
          (Spark also needs the Jackson/XML dependencies at runtime)
            <!-- ClickHouse JDBC driver -->
            <dependency>
              <groupId>ru.yandex.clickhouse</groupId>
              <artifactId>clickhouse-jdbc</artifactId>
              <version>0.2.4</version>
            </dependency>
        
            <!-- Jackson dependencies to fix the XML/serialization issue -->
            <dependency>
              <groupId>com.fasterxml.jackson.module</groupId>
              <artifactId>jackson-module-scala_2.12</artifactId>
              <version>2.12.4</version>
            </dependency>
            <dependency>
              <groupId>com.fasterxml.jackson.module</groupId>
              <artifactId>jackson-module-jaxb-annotations</artifactId>
              <version>2.12.4</version>
            </dependency>
            <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-databind</artifactId>
              <version>2.12.4</version>
            </dependency>
            <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-annotations</artifactId>
              <version>2.12.4</version>
            </dependency>
            <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-core</artifactId>
              <version>2.12.4</version>
            </dependency>
      • 2. Add the write code
              val url = "jdbc:clickhouse://bigdata15:8123/bigdata"
              val table = "yarn_info"
        
              val properties = new Properties()
              properties.setProperty("user", "default")
              df.write.mode(SaveMode.Append)
                .option("driver","ru.yandex.clickhouse.ClickHouseDriver")
                .jdbc(url,table,properties)
      • 3. Create the table
        CREATE TABLE bigdata.yarn_info (
        applicationid String,
        name String,
        applicationtype String,
        user String,
        starttime String,
        finishtime String,
        memorysize String,
        virtualcores String,
        containers String,
        state String,
        queue String,
        todaytime String,
        url String
        ) ENGINE = MergeTree()
        PARTITION BY queue
        PRIMARY KEY (applicationid,name,applicationtype,user,starttime,finishtime,memorysize,virtualcores,containers,state,queue,todaytime,url)
        ORDER BY (applicationid,name,applicationtype,user,starttime,finishtime,memorysize,virtualcores,containers,state,queue,todaytime,url) SETTINGS index_granularity = 8192
        
        

         
        Run result (screenshot omitted); a small read-back check follows below.
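
        A small read-back sketch (my addition, not part of the original post) to confirm that rows really land in ClickHouse through the same JDBC driver:

          import java.util.Properties

          import org.apache.spark.sql.SparkSession

          object ClickHouseReadCheck {
            def main(args: Array[String]): Unit = {
              val spark = SparkSession.builder().appName("ClickHouseReadCheck").master("local[2]").getOrCreate()

              val url = "jdbc:clickhouse://bigdata15:8123/bigdata"
              val properties = new Properties()
              properties.setProperty("user", "default")
              properties.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")

              // read back the table written by the streaming job and show a few rows
              val df = spark.read.jdbc(url, "yarn_info", properties)
              println(s"rows in bigdata.yarn_info: ${df.count()}")
              df.show(10, truncate = false)

              spark.stop()
            }
          }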

3. Data visualization

  • 1. Native deployment

  • 1. Start Superset

    • 1. [root@bigdata15 hadoop]# cd /usr/local/src
    • 2. [root@bigdata15 src]# source superset-py3/bin/activate   # activate the superset venv
    • 3. (superset-py3) [root@bigdata15 src]# superset run -h bigdata15 -p 8889

    • 4. Open http://bigdata15:8889/
                user: admin
                password: admin
          
  • 2. Connect Superset to ClickHouse

    • 1. Stop Superset first, then install the ClickHouse driver:
      pip3 install sqlalchemy-clickhouse -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host  mirrors.aliyun.com
    • 2. Start Superset again:
         (superset-py3) [root@bigdata15 src]# superset run -h bigdata15 -p 8889
    • 3. Create a user in ClickHouse
      • Enable access management in the users config
        • [root@bigdata15 clickhouse-client]# vim /etc/clickhouse-server/users.xml
        • Uncomment: <access_management>1</access_management>
      • Create the user in the ClickHouse client
        bigdata15 :) create user root identified by '123456';
      • Grant privileges to the user:
        bigdata15 :) GRANT ALTER on *.* TO root;
    • 4. Add a new Database in the Superset UI (screenshots omitted) with the SQLAlchemy URI:

        clickhouse://root:123456@bigdata15/bigdata
    • 5. Test the connection (screenshot omitted)
  • 2. Docker-based deployment

    • Video: E:\视频\yarnmonitor\day02\14

4. Building the visualizations

  • Requirements
    • 1. Detail table of running YARN applications

    • 2. Memory usage of running YARN applications

    • 3. Number of running YARN applications

    • 4. Number of YARN applications waiting to run (ACCEPTED)

    • 5. Top 10 running applications by memory usage and by running time

Process
Because of some earlier mistakes, yarn-info02 is recreated here (yarn-info02 is equivalent to the original yarn-info).

  • 1. Detail table of running YARN applications

    • 1. SQL query

      select
      applicationid,
      name,
      applicationtype,
      user,
      toDateTime(intDivOrZero(toInt64OrZero(starttime),1000)) as s_time,
      dateDiff('minute', toDateTime(intDivOrZero(toInt64OrZero(starttime),1000)),toDateTime(todaytime) ) as diff,
      intDivOrZero(toInt64OrZero(memorysize),1024) as user_mem,
      virtualcores,
      containers,
      state,
      queue,
      todaytime,
      url
      from bigdata.yarn_info02
      where
      state='RUNNING'
      and
      todaytime =(
      	select
      	max(todaytime) as t
      	from bigdata.yarn_info02
      	where
      	formatDateTime(toDateTime(todaytime),'%Y-%m-%d')=formatDateTime(now(),'%Y-%m-%d')
      )
      • Test it in DBeaver (screenshot omitted)

    • 2. Build the chart in Superset (screenshot omitted)

  • 2. Memory usage of running YARN applications

    • 1. SQL query

      select
      cast(todaytime_alias,'String') as dt,
      sum(user_mem) as all_g
      from
      (
      	select
      	toDateTime(todaytime) as todaytime_alias,
      	intDivOrZero(toInt64OrZero(memorysize),1024) as user_mem,
      	name
      	from  bigdata.yarn_info02
      	where
      	state='RUNNING'
      	group by
      	todaytime_alias,
      	user_mem,
      	name
      ) as a
      group by
      dt;
      
    • 2.superset

      • 1. Choose the chart type (screenshot omitted)

      • 2. Choose the time field (screenshot omitted)

      • 3. Adjust the settings

        • x-axis time (screenshots omitted)

        • the chart data

      • Result while YarnLogTask is running (screenshot omitted)
