按部门 ID 计数员工并确定员工 ID 最多的两个部门

Posted

技术标签:

【中文标题】按部门 ID 计数员工并确定员工 ID 最多的两个部门【英文标题】:Count employee by department ID & identify top two departments with the most employee IDs 【发布时间】:2019-10-04 14:48:36 【问题描述】:

第一次使用 Spark 用户。我为两个 csv 文件(员工和部门)创建了 RDD。我想提供一个输出,按部门 ID 计算员工数量,并确定员工 ID 最多的前两个部门名称。 “deptno”是我的主键,但我不知道如何将两个文件连接在一起。

员工档案包含以下列:[empno、ename、job、mgr、hiredate、sal、comm、deptno]

dept 文件包含以下列:[deptno, dname, location]

这是我到目前为止所做的:

`employees_rdd = sc.textFile("/FileStore/tables/Employee.csv")
employees_rdd.take(3)
header_e = employees_rdd.first()
employees1 = employees_rdd.filter(lambda row : row != header_e)
employees1.take(1)`

`dept_rdd = sc.textFile("/FileStore/tables/Dept.csv")
dept_rdd.take(3)
header_d = dept_rdd.first()
dept1 = dept_rdd.filter(lambda row : row != header_d)
dept1.take(1)`

`employees2 = employees1.map(lambda row : row.split(","))
employees_kv = employees2.map(lambda row : (row[7],1))
employees_kv.take(3)`

在下面收到语法错误:

employees_kv.reduceByKey(lambda x,y : x+y).takeOrdered(2,lambda (x,y): -1*y)

非常感谢任何帮助。

【问题讨论】:

如果你使用数据框,那么它会容易得多。见***.com/questions/28782940/load-csv-file-with-spark 【参考方案1】:

这是我执行此操作的 pyspark 代码。我假设读取语句带有分隔符“|”。

from pyspark.sql.functions import *
from pyspark.sql.types import *

emp = spark.read.option("header","true") \
                .option("inferSchema","true") \
                .option("sep","|") \
                .csv("/FileStore/tables/employee.txt")

dept = spark.read.option("header","true") \
                 .option("inferSchema","true") \
                 .option("sep","|") \
                 .option("removeQuotes","true") \
                 .csv("/FileStore/tables/department.txt")

# Employee count by department
empCountByDept = emp.groupBy("deptno") \
                       .agg(count("empno").alias("no_of_employees"))

empCountByDept.show(20,False)

# Top two department names with the most employees 
topTwoDept = empCountByDept.join(dept, empCountByDept.deptno == dept.deptno, "inner") \
                           .orderBy(empCountByDept.no_of_employees.desc()).drop(dept.deptno) \
                           .select("dname","no_of_employees") \
                           .limit(2)
topTwoDept.show(20,False)

结果 ::

+------+---------------+
|deptno|no_of_employees|
+------+---------------+
|20    |5              |
|10    |3              |
|30    |6              |
+------+---------------+
+----------+---------------+
|dname     |no_of_employees|
+----------+---------------+
|'Sales'   |6              |
|'Research'|5              |
+----------+---------------+

【讨论】:

【参考方案2】:

我强烈建议使用 Dataframes/DataSets。不仅因为它们更易于使用和操作,而且它们提供了严重的性能改进。甚至 spark 文档也推荐相同。

这是您的数据框代码:-

val spark = SparkSession.builder.master("local[*]").getOrCreate

这会创建一个SparkSession,它是应用程序的入口点。

现在,让我们阅读您的员工和部门文件。

val employeeDF = spark.read.format("csv").option("header","true").load("/path/to/employee/file")
val deptDF = spark.read.format("csv").option("header","true").load("/path/to/dept/file")

现在,加入很容易。下面的语句将创建一个数据框,该数据框将是 employeeDFdeptDF 在列 deptno 上的内部连接的结果

val joinedDF = employeeDF.join(deptDF,Seq("deptno"))

现在,您可以使用joinedDF 获取结果。

val countByDept = joinedDF.groupBy($"deptno").count
//countByDept.show to display the results

val top2Dept = joinedDF.groupBy($"dname").count.orderBy($"count".desc).limit(2)
//top2Dept.show to display the results

希望这能让您开始使用 Spark DataFrames 和 DataSets。

【讨论】:

感谢您的帮助。我正在使用 Python 语言,所以我已经转换了你推荐的内容。我遇到了加入问题。我已将val joinedDF = employeeDF.join(deptDF,Seq("deptno")) 转换为join_df = dept_df.join(employee_df.select("deptno"), "deptno") 我收到以下消息:AnalysisException: "cannot resolve 'deptno' given input columns: [empno,ename,job,mgr,hiredate,sal,comm,deptno]; ;\n'项目 ['deptno]\n+- 关系[empno,ename,job,mgr,hiredate,sal,comm,deptno#120] csv\n"。这是因为架构类型是字符串还是整数? 我也尝试了这种略有不同的方法:joined_df = dept_df.join(employee_df, dept_df.deptno == employee_df.deptno) 但收到以下消息:AttributeError: 'DataFrame' object has no attribute 'deptno'

以上是关于按部门 ID 计数员工并确定员工 ID 最多的两个部门的主要内容,如果未能解决你的问题,请参考以下文章

当我使用特定部门 ID 时,我想要所有员工姓名

sql 按单位,评估状态,评估年份,评估级别和单位部门的所有人员计算员工ID数

查询各个部门中各职位的人数与平均工资? 查询工资,奖金与10号部门某员工工资,奖金都相同的员工? SQL

计数结果不正确

在部门的所有项目中工作的员工

返回组中最高总数的数据