Spark customization, part 6: a SQL version of start.scala
Posted by mqxnongmin
The previous version of start.scala used HiveContext. This one is based on SQLContext, so it needs no compilation.
# cat testperson.txt    # fields are separated by a tab
zs	10	30.0
li	12	32.0
# spark-shell -i start.scala
scala> help
Then follow the printed prompts step by step.
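Once the script has loaded, a typical session (following the built-in help shown at the end of the script, and using the sample testperson.txt above) looks roughly like this:

scala> "create table testperson (name string,age int,weight double)" from "testperson.txt"
scala> "select * from testperson" go

The second command should print the two sample rows, tab separated. The full script follows.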
import org.apache.spark.sql.SchemaRDD
var FIELD_SEPERATOR = "\t"
var RECORD_SEPERATOR = "\n"
var lastrdd : SchemaRDD = null
object MyFileUtil extends java.io.Serializable {
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.fs.FileSystem
  import org.apache.hadoop.fs.FileStatus
  import scala.collection.mutable.ListBuffer

  // Normalize a path into a full URI: keep hdfs:/file: paths as-is, prefix absolute
  // local paths with file://, and resolve relative paths against the working directory.
  def regularFile(filepath:String):String = {
    if(filepath == "") {
      filepath
    } else if(filepath.startsWith("hdfs:")) {
      filepath
    } else if(filepath.startsWith("file:")) {
      filepath
    } else if(filepath.startsWith("/")) {
      "file://" + filepath
    } else {
      val workdir = System.getProperty("user.dir")
      "file://" + workdir + "/" + filepath
    }
  }

  var SAFEMINPATH_LENGTH : Int = 24

  def getFileSystem(filepath:String) = {
    if(filepath.startsWith("hdfs:")) {
      FileSystem.get(new org.apache.hadoop.conf.Configuration())
    } else if(filepath.startsWith("file:")) {
      FileSystem.getLocal(new org.apache.hadoop.conf.Configuration())
    } else {
      throw new Exception("file path invalid")
    }
  }

  // Recursively delete a path; refuse suspiciously short paths as a safety guard.
  def deletePath(filepath:String) = {
    if(filepath.length < SAFEMINPATH_LENGTH)
      throw new Exception("file path is too short")
    var fs : FileSystem = getFileSystem(filepath)
    if (fs.exists(new Path(filepath))) {
      fs.delete(new Path(filepath), true)
    }
  }

  // Recursively collect all files under a path (and optionally their FileStatus objects).
  def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) {
    if ( fs.exists(path) ) {
      val substatuslist = fs.listStatus(path)
      for(substatus <- substatuslist){
        if(statuslist != null)
          statuslist.append(substatus)
        if(substatus.isDir()){
          listFile(fs,substatus.getPath(),pathlist)
        }else{
          pathlist.append(substatus.getPath())
        }
      }
    }
  }

  // Return true if the path exists and its files contain any data.
  def hasContext(filepath:String) = {
    val realpath = regularFile(filepath)
    val fs = getFileSystem(realpath)
    val pathlist = ListBuffer[Path]()
    val statuslist = ListBuffer[FileStatus]()
    listFile(fs,new Path(realpath),pathlist,statuslist)
    var length:Long = 0
    for( status <- statuslist )
      length += status.getLen()
    length > 0
  }
}
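// Usage sketch (assuming the shell was started in /home/someuser):
//   MyFileUtil.regularFile("testperson.txt")   // -> "file:///home/someuser/testperson.txt"
//   MyFileUtil.regularFile("hdfs:/tmp/out")    // returned unchanged
//   MyFileUtil.hasContext("testperson.txt")    // true if the file exists and has data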
org.apache.spark.repl.Main.interp.command("""
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {
def go() = {
var startstr = ""
var endstr = RECORD_SEPERATOR
val result = rdd.collect
result.foreach( x =>
print(x.mkString(startstr,FIELD_SEPERATOR,endstr))
)
}
def result() = {
rdd.collect
}
def saveto(output: String) = {
import org.apache.hadoop.io.{NullWritable,Text}
var startstr = ""
var endstr = RECORD_SEPERATOR
if(output.startsWith("hdfs:")) {
val outputpath = MyFileUtil.regularFile(output)
MyFileUtil.deletePath(outputpath)
rdd.map(x =>
(NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
).saveAsHadoopFile[
org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
](outputpath)
} else {
val outputpath = MyFileUtil.regularFile(output)
MyFileUtil.deletePath(outputpath)
val result = rdd.collect()
val writer = new java.io.FileWriter(output)
result.foreach(x =>
writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr))
)
writer.close()
}
}
}
object MySchemaRDD {
implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd)
}
""")
val ssc = new org.apache.spark.sql.SQLContext(sc)
import ssc._
import MySchemaRDD._
def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
  val members = tabledef.trim.split(",").map(_.trim.split(" ").filter(""!=)).map(
    x => (x(0).trim, x(1).trim.head.toString.toUpperCase + x(1).trim.tail)
  )
  val classmemberdef = members.map(x => (x._1+":"+x._2)).mkString(",")
  val convertstr = members.map(x => x._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",")
  return s"""
case class ${classname}(${classmemberdef})
val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr}))
ssc.registerRDDAsTable(schemardd,"${tablename}")
"""
}
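// Example of the generated code (a sketch): for
//   getRegisterString("testperson", "TESTPERSON", "testperson", "name string,age int,weight double")
// the returned string is roughly (FIELD_SEPERATOR is substituted as a literal tab):
//   case class TESTPERSON(name:String,age:Int,weight:Double)
//   val schemardd = testperson.map(_.split("\t")).map(t=>TESTPERSON(t(0).toString,t(1).toInt,t(2).toDouble))
//   ssc.registerRDDAsTable(schemardd,"testperson")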
org.apache.spark.repl.Main.interp.command("""
class MyCommandTranslator(cmd:String) extends java.io.Serializable {
def go()(implicit f: SchemaRDD => MySchemaRDD) = {
lastrdd = sql(cmd)
lastrdd.go()
}
def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
lastrdd = sql(cmd)
lastrdd.saveto(output)
}
def result()(implicit f: SchemaRDD => MySchemaRDD) = {
lastrdd = sql(cmd)
lastrdd.result()
}
// def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
// lastrdd = hql(cmd)
// lastrdd.go()
// }
//
// def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
// lastrdd = hql(cmd)
// lastrdd.saveto(output)
// }
//
// def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
// lastrdd = hql(cmd)
// lastrdd.result()
// }
def defineas(tabledef:String) = {
if( tabledef != "" ) {
org.apache.spark.repl.Main.interp.command(
getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
)
} else {
org.apache.spark.repl.Main.interp.command(
"ssc.registerRDDAsTable(${cmd},\"${cmd}\")"
)
}
}
def from(filepath:String) {
if( cmd.trim.startsWith("create table ") ) {
val tablename = cmd.trim.substring(13).trim().split(" ")(0)
val leftstr = cmd.substring(13).trim().substring(tablename.length).trim()
val tabledef = leftstr.substring(1,leftstr.length-1).trim()
val realfile = MyFileUtil.regularFile(filepath)
org.apache.spark.repl.Main.interp.command(
"val "+tablename+" = sc.textFile(\""+realfile+"\")"
)
new MyCommandTranslator(tablename).defineas(tabledef)
} else {
println("usage:")
println("\"create table sometablename (field1 string,field2 int...)\" from \"somefile or hdfs:somepath\"")
}
}
def isok() = {
if(cmd.contains(".") || cmd.contains("/")) {
MyFileUtil.hasContext(cmd)
} else {
val res = sql(s"select count(*) from ${cmd}").result()
val count = res(0).getLong(0)
count > 0
}
}
}
object MyCommandTranslator {
implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd)
def show(tabledata:Array[org.apache.spark.sql.Row]) = {
tabledata.foreach( x => println(x.mkString("\t")))
}
}
""")
def to = MyCommandTranslator
import MyCommandTranslator._
val onetable = sql("select 1 as id")
ssc.registerRDDAsTable(onetable,"onetable")
def help = {
println("""example:
"create table testperson (name string,age int,weight double)" from "testperson.txt"
"select * from testperson" go
"select * from testperson" saveto "somelocalfile.txt"
"select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"
"testperson" isok
"somelocalfile.txt" isok
"hdfs:/basedir/parentdir/testperson" isok
val data = "select * from testperson" result
to show data
val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson")
"somerdddata" defineas "name string,age int,weight double"
"select * from somerdddata" go
if you want to see the help of the shell environment, please type :help
""")
}