每日一题 为了工作 2020 0510 第六十八题

Posted walxt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了每日一题 为了工作 2020 0510 第六十八题相关的知识,希望对你有一定的参考价值。

package com.swust.action;

import com.alibaba.fastjson.JSONObject;
import com.swust.constant.Constants;
import com.swust.skynet.SelfDefineAccumulator;
import com.swust.utils.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 *
 * @author 雪瞳
 * @Slogan 时钟尚且前行,人怎能就此止步!
 * @Function 检测路口状态
 *
 */
public class CheckMonitorState {
    public static JavaPairRDD<Integer,String> checkMonitorState(JavaSparkContext jsc,
                                                                SparkSession session,
                                                                JavaPairRDD<String,String> startMonitorInfos,
                                                                final long taskId,
                                                                JSONObject taskParamsJsonObject,
                                                                SelfDefineAccumulator  accumulator){
        //从monitor_camera_info标准表中查询出来每一个卡口对应的camera的数量
        String sqlText = "select * from monitor_camera_info";
        Dataset<Row> standDataFrame = session.sql(sqlText);
        JavaRDD<Row> standRdd = standDataFrame.toJavaRDD();

        //转换成k-v格式RDD
        JavaPairRDD<String, String> monitorRdd = standRdd.mapToPair(new PairFunction<Row, String, String>() {
            @Override
            public Tuple2<String, String> call(Row row) throws Exception {

                String key = row.getString(0);
                String value = row.getString(1);
                Tuple2<String, String> tp = new Tuple2<>(key, value);
                return tp;
            }
        });
        /**
         * 对每一个卡扣下面的信息进行统计,统计出来camera_count(这个卡扣下一共有多少个摄像头),camera_ids(这个卡扣下,所有的摄像头编号拼接成的字符串)
         * 返回:
         * 	("monitorId","cameraIds=xxx|cameraCount=xxx")
         * 例如:
         * 	("0008","cameraIds=02322,01213,03442|cameraCount=3")
         * 如何来统计?
         * 	1、按照monitor_id分组
         * 	2、使用mapToPair遍历,遍历的过程可以统计
         */
        JavaPairRDD<String, Iterable<String>> monitorGroupRdd = monitorRdd.groupByKey();
        JavaPairRDD<String, String> monitorGroupInfosRdd = monitorGroupRdd.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                String monitorId = tuple._1();
                Iterator<String> cameraIterator = tuple._2().iterator();
                int count = 0;
                StringBuilder cameraIds = new StringBuilder();

                while (cameraIterator.hasNext()) {
                    String next = cameraIterator.next();
                    count++;
                    cameraIds.append(next);
                }
                String cameraInfos = Constants.FIELD_CAMERA_IDS + "=" + cameraIds.toString().substring(1) + "|"
                        + Constants.FIELD_CAR_COUNT + "=" + count;

                return new Tuple2<>(monitorId, cameraInfos);
            }
        });
        //左外连接两个RDD
        JavaPairRDD<String, Tuple2<String, Optional<String>>> resultRdd = monitorGroupInfosRdd.leftOuterJoin(startMonitorInfos);
        JavaPairRDD<Integer, String> res = resultRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Tuple2<String, Optional<String>>>>, Integer, String>() {
            @Override
            public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> iterator) throws Exception {
                List<Tuple2<Integer, String>> list = new ArrayList<>();
                while (iterator.hasNext()) {
                    Tuple2<String, Tuple2<String, Optional<String>>> tuple2 = iterator.next();
                    String monitorId = tuple2._1();
                    String standCameraInfos = tuple2._2()._1();
                    Optional<String> factCameraInfosOptional = tuple2._2()._2();
                    String factCameraInfo = "";
                    if (factCameraInfosOptional.isPresent()) {
                        factCameraInfo = factCameraInfosOptional.get();
                    } else {
                        String standCameraIds = StringUtils.
                                getFieldFromConcatString(
                                        standCameraInfos, "\|", Constants.FIELD_CAMERA_IDS);
                        String abnoramlCameraCount = StringUtils.
                                getFieldFromConcatString(
                                        standCameraInfos,"\|", Constants.FIELD_CAMERA_COUNT);
                        accumulator.add(
                                Constants.FIELD_ABNORMAL_MONITOR_COUNT + "=1|"
                                        + Constants.FIELD_ABNORMAL_CAMERA_COUNT + "=" + abnoramlCameraCount + "|"
                                        + Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS + "=" + monitorId + ":" + standCameraIds
                        );
                        continue;
                    }
                    //实际摄像头个数
                    int factCameraCount = Integer.parseInt(StringUtils.
                            getFieldFromConcatString(
                                    factCameraInfo,"\|", Constants.FIELD_CAMERA_COUNT));
                    //标准摄像头个数
                    int standCameraCount = Integer.parseInt(StringUtils.
                            getFieldFromConcatString(
                                    standCameraInfos, "\|", Constants.FIELD_CAMERA_COUNT));
                    if (standCameraCount == factCameraCount) {
                        /**
                         * 1 正常路口的数量
                         * 2 异常路口的数量
                         * 3 正常通道个数
                         * 4 摄像头异常的个数
                         */
                        accumulator.add(Constants.FIELD_NORMAL_MONITOR_COUNT + "=1|" + Constants.FIELD_NORMAL_CAMERA_COUNT + "=" + factCameraCount);
                    } else {
                        //获取实际摄像头编号
                        String factCameraIds = StringUtils.getFieldFromConcatString(
                                factCameraInfo, "\|", Constants.FIELD_CAMERA_IDS);
                        //获取标准摄像头编号
                        String standCameraIds = StringUtils.getFieldFromConcatString(
                                standCameraInfos, "\|", Constants.FIELD_CAMERA_IDS);

                        List<String> factCameraIdList = Arrays.asList(factCameraIds.split(","));
                        List<String> standCameraIdList = Arrays.asList(standCameraIds.split(","));
                        StringBuilder abnormalCameraInfos = new StringBuilder();
                        int abnormalCameraCount = 0;
                        int normalCameraCount = 0;
                        for (String cameraId : standCameraIdList) {
                            if (!factCameraIdList.contains(cameraId)) {
                                abnormalCameraCount++;
                                abnormalCameraInfos.append("," + cameraId);
                            }
                        }
                        normalCameraCount = standCameraIdList.size() - abnormalCameraCount;
                        //往累加器中更新状态
                        accumulator.add(
                                Constants.FIELD_ABNORMAL_MONITOR_COUNT + "=1|"
                                        + Constants.FIELD_NORMAL_CAMERA_COUNT + "=" + normalCameraCount + "|"
                                        + Constants.FIELD_ABNORMAL_CAMERA_COUNT + "=" + abnormalCameraCount + "|"
                                        + Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS + "=" + monitorId + ":" + abnormalCameraInfos.toString().substring(1));
                    }
                    //从实际数据拼接到字符串中获取车流量
                    int carCount = Integer.parseInt(
                            StringUtils.getFieldFromConcatString(
                                    factCameraInfo, "\|", Constants.FIELD_CAMERA_COUNT));
                    list.add(new Tuple2<>(carCount, monitorId));
                }
                return list.iterator();
            }
        });
    return res;
    }
}

  

以上是关于每日一题 为了工作 2020 0510 第六十八题的主要内容,如果未能解决你的问题,请参考以下文章:

每日一题 为了工作 2020 0502 第六十一题

每日一题 为了工作 2020 0508 第六十六题

每日一题 为了工作 2020 0508 第六十六题

每日一题 为了工作 2020 056 第六十四题

每日一题 为了工作 2020 056 第六十四题

每日一题 为了工作 2020 0501 第六十题