spark定制之六:sql版start.scala

Posted mqxnongmin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark定制之六:sql版start.scala相关的知识,希望对你有一定的参考价值。

上个版本的start.scala用的是HiveContext;这个版本改用SQLContext,不需要编译。

# cat testperson.txt #字段用tab键分隔

zs 10 30.0
li 12 32.0

# spark-shell -i:start.scala

scala> help

依据提示逐步执行

import org.apache.spark.sql.SchemaRDD  
  
// Separator placed between the columns of a row when results are printed or saved;
// it is also spliced into the generated split(...) call that parses input lines.
var FIELD_SEPERATOR = "\t"  
// Terminator appended after each row when results are printed or written to a local file.
var RECORD_SEPERATOR = "\n"  
// Most recent SchemaRDD produced by a translated SQL command; null until one has been run.
var lastrdd : SchemaRDD = null  
  
// Path helpers used by the shell commands: give user-supplied paths an explicit scheme,
// pick the matching Hadoop FileSystem, delete output paths and check whether a path holds data.
object MyFileUtil extends java.io.Serializable {
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.FileStatus
    import scala.collection.mutable.ListBuffer

    /**
     * Gives a user-supplied path an explicit scheme.
     *
     * Empty strings and paths already starting with "hdfs:" or "file:" are returned
     * unchanged; paths starting with "/" get a "file://" prefix; anything else is
     * resolved against the JVM working directory (the user.dir system property).
     *
     * @param filepath path as typed by the user
     * @return the path with a scheme, or the input itself when it is empty
     */
    def regularFile(filepath:String):String = {
        if(filepath == "" || filepath.startsWith("hdfs:") || filepath.startsWith("file:")) {
            filepath
        } else if(filepath.startsWith("/")) {
            "file://" + filepath
        } else {
            "file://" + System.getProperty("user.dir") + "/" + filepath
        }
    }

    // deletePath refuses any path shorter than this many characters
    // (presumably a guard against removing a short, high-level directory by mistake).
    var SAFEMINPATH_LENGTH : Int = 24

    /**
     * Returns the Hadoop FileSystem matching the scheme of `filepath`: the file system
     * of the default Configuration for "hdfs:" paths, the local one for "file:" paths.
     *
     * @throws Exception when the path starts with neither scheme; pass paths through
     *                   regularFile first
     */
    def getFileSystem(filepath:String): FileSystem = {
        val conf = new org.apache.hadoop.conf.Configuration()
        if(filepath.startsWith("hdfs:")) {
            FileSystem.get(conf)
        } else if(filepath.startsWith("file:")) {
            FileSystem.getLocal(conf)
        } else {
            throw new Exception("file path invalid")
        }
    }

    /**
     * Recursively deletes `filepath` if it exists.
     *
     * @throws Exception when the path is shorter than SAFEMINPATH_LENGTH or has no
     *                   supported scheme
     */
    def deletePath(filepath:String): Unit = {
        if(filepath.length < SAFEMINPATH_LENGTH)
            throw new Exception("file path is to short")
        val fs = getFileSystem(filepath)
        val target = new Path(filepath)
        if (fs.exists(target)) {
            fs.delete(target, true)
        }
    }

    /**
     * Walks `path` recursively, appending every non-directory path to `pathlist` and,
     * when `statuslist` is given, every visited FileStatus (directories included) to it.
     * Does nothing when `path` does not exist.
     */
    def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null): Unit = {
        if ( fs.exists(path) ) {
            for(substatus <- fs.listStatus(path)) {
                if(statuslist != null)
                    statuslist.append(substatus)
                if(substatus.isDir()) {
                    // Hand statuslist down too; without it the statuses of files inside
                    // sub-directories were never collected.
                    listFile(fs, substatus.getPath(), pathlist, statuslist)
                } else {
                    pathlist.append(substatus.getPath())
                }
            }
        }
    }

    /**
     * Tells whether the file or directory at `filepath` holds any bytes.
     *
     * @param filepath path as typed by the user; it is normalised with regularFile
     * @return true when the lengths of all entries found under the path sum to more than zero
     */
    def hasContext(filepath:String): Boolean = {
        val realpath = regularFile(filepath)
        val fs = getFileSystem(realpath)
        val pathlist = ListBuffer[Path]()
        val statuslist = ListBuffer[FileStatus]()
        // List the normalised path, i.e. the same one the file system was chosen for.
        listFile(fs, new Path(realpath), pathlist, statuslist)
        statuslist.map(_.getLen()).sum > 0
    }
}
  
// Submits the MySchemaRDD wrapper class and its companion object to the REPL interpreter
// as a single source string. The wrapper adds go / result / saveto to a SchemaRDD.
org.apache.spark.repl.Main.interp.command("""
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {

    /** Collects the rows and prints each one, columns joined by FIELD_SEPERATOR and ended by RECORD_SEPERATOR. */
    def go() = {
        val result = rdd.collect
        result.foreach( x =>
            print(x.mkString("",FIELD_SEPERATOR,RECORD_SEPERATOR))
          )
    }

    /** Collects the rows and returns them. */
    def result() = {
        rdd.collect
    }

    /**
     * Writes the rows to `output`, deleting whatever is already stored at that path.
     *
     * Paths starting with "hdfs:" are written through Hadoop's TextOutputFormat, one row
     * per record. Any other path is treated as a local file: the rows are collected and
     * written with a FileWriter, each row ended by RECORD_SEPERATOR.
     */
    def saveto(output: String) = {
        import org.apache.hadoop.io.{NullWritable,Text}
        val outputpath = MyFileUtil.regularFile(output)
        MyFileUtil.deletePath(outputpath)
        if(output.startsWith("hdfs:")) {
            rdd.map(x =>
                  (NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
                ).saveAsHadoopFile[
                  org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
                ](outputpath)
        } else {
            val result = rdd.collect()
            val writer = new java.io.FileWriter(output)
            // Close the writer even when a write fails, so the file handle is not leaked.
            try {
                result.foreach(x =>
                    writer.write(x.mkString("",FIELD_SEPERATOR,RECORD_SEPERATOR))
                  )
            } finally {
                writer.close()
            }
        }
    }
}
object MySchemaRDD {
    /** Implicit view that makes go / result / saveto available on any SchemaRDD. */
    implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD): MySchemaRDD = new MySchemaRDD(rdd)
}
""")
  
// SQLContext built on the shell's SparkContext `sc`; tables are registered on it below.
val ssc = new org.apache.spark.sql.SQLContext(sc)  
// Bring the context's members (such as sql) and the SchemaRDD => MySchemaRDD view into scope.
import ssc._  
import MySchemaRDD._  

// Builds the REPL source that turns a text RDD into a registered SQL table.
//
// rddname   - name of an existing RDD of text lines in the REPL
// classname - name of the case class to generate for one row
// tablename - name under which the resulting RDD is registered on ssc
// tabledef  - comma-separated "name type" pairs, e.g. "name string,age int,weight double"
//
// Returns source code that declares the case class, splits every line on FIELD_SEPERATOR,
// converts column i with the matching toXxx method and registers the result as a table.
// Throws IllegalArgumentException when a field definition lacks a name or a type.
def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
    // Any run of whitespace may separate a field name from its type.
    val members = tabledef.trim.split(",").map(_.trim.split("\\s+")).map { parts =>
        if (parts.length < 2)
            throw new IllegalArgumentException(
                "invalid field definition '" + parts.mkString(" ") + "', expected \"name type\"")
        // Capitalise the type so that e.g. "int" names the Scala type Int.
        (parts(0), parts(1).capitalize)
    }
    val classmemberdef = members.map { case (name, tpe) => name + ":" + tpe }.mkString(",")
    val convertstr = members.zipWithIndex.map { case ((_, tpe), idx) => "t(" + idx + ").to" + tpe }.mkString(",")
    // The split limit of -1 keeps trailing empty columns, so a row that ends with an
    // empty field still has one array element per declared column.
    s"""
        case class ${classname}(${classmemberdef})
        val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}", -1)).map(t=>${classname}(${convertstr}))
        ssc.registerRDDAsTable(schemardd,"${tablename}")
    """
}

// Submits the MyCommandTranslator class and its companion object to the REPL interpreter
// as a single source string. The class wraps a command string (a SQL statement, a table
// name or a path) and provides the verbs listed by help: go, saveto, result, defineas,
// from and isok.
org.apache.spark.repl.Main.interp.command("""
class MyCommandTranslator(cmd:String) extends java.io.Serializable {

    /** Runs cmd as SQL, remembers the result in lastrdd and prints its rows. */
    def go()(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.go()
    }

    /** Runs cmd as SQL, remembers the result in lastrdd and saves its rows to output. */
    def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.saveto(output)
    }

    /** Runs cmd as SQL, remembers the result in lastrdd and returns the collected rows. */
    def result()(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.result()
    }

    // Disabled variants that call hql instead of sql (presumably for a HiveContext set-up).
//    def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.go()
//    }
//
//    def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.saveto(output)
//    }
//
//    def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.result()
//    }

    /**
     * Registers the REPL value named by cmd as a table of the same name.
     *
     * With a non-empty tabledef the value is treated as an RDD of text lines and is
     * converted through the code produced by getRegisterString. With an empty tabledef
     * the value is handed to ssc.registerRDDAsTable unchanged.
     */
    def defineas(tabledef:String) = {
        if( tabledef != "" ) {
            org.apache.spark.repl.Main.interp.command(
                getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
            )
        } else {
            // Built by concatenation: a plain string literal does not expand ${cmd}, so the
            // literal text would otherwise be sent to the interpreter.
            org.apache.spark.repl.Main.interp.command(
                "ssc.registerRDDAsTable(" + cmd + ",\"" + cmd + "\")"
            )
        }
    }

    /** Prints the expected form of a create-table command. */
    private def printFromUsage(): Unit = {
        println("usage:")
        println("\"create table sometablename (field1 string,field2 int...)\" from \"somefile or hdfs:somepath\"")
    }

    /**
     * Handles a command of the form: create table NAME (field type, ...).
     * Loads filepath as a text RDD bound to NAME in the REPL, then registers it as a
     * table through defineas. Prints the usage text when cmd does not have that form.
     */
    def from(filepath:String): Unit = {
        val prefix = "create table "
        // Work on the trimmed command throughout so that leading blanks cannot shift the offsets.
        val statement = cmd.trim
        if( statement.startsWith(prefix) ) {
            val rest = statement.substring(prefix.length).trim()
            val tablename = rest.split(" ")(0)
            val leftstr = rest.substring(tablename.length).trim()
            if( tablename != "" && leftstr.length >= 2 && leftstr.startsWith("(") && leftstr.endsWith(")") ) {
                val tabledef = leftstr.substring(1,leftstr.length-1).trim()
                val realfile = MyFileUtil.regularFile(filepath)
                org.apache.spark.repl.Main.interp.command(
                    "val "+tablename+" = sc.textFile(\""+realfile+"\")"
                )
                new MyCommandTranslator(tablename).defineas(tabledef)
            } else {
                printFromUsage()
            }
        } else {
            printFromUsage()
        }
    }

    /**
     * Checks that cmd refers to something non-empty. A cmd containing "." or "/" is
     * treated as a path and checked with MyFileUtil.hasContext; anything else is treated
     * as a table name and must contain at least one row.
     */
    def isok() = {
        if(cmd.contains(".") || cmd.contains("/")) {
            MyFileUtil.hasContext(cmd)
        } else {
            val res = sql(s"select count(*) from ${cmd}").result()
            val count = res(0).getLong(0)
            count > 0
        }
    }
}
object MyCommandTranslator {
    /** Implicit view that makes the command verbs available on plain strings. */
    implicit def stringToTranslator(cmd:String): MyCommandTranslator = new MyCommandTranslator(cmd)

    /** Prints every row of tabledata with its columns joined by a tab. */
    def show(tabledata:Array[org.apache.spark.sql.Row]) = {
        tabledata.foreach( x => println(x.mkString("\t")))
    }
}
""")
  
// `to` returns the MyCommandTranslator companion, which allows the `to show data` form listed in help.
def to = MyCommandTranslator  
// Bring the String => MyCommandTranslator view and show into scope.
import MyCommandTranslator._  
  
// Result of "select 1 as id", registered on ssc under the table name "onetable".
val onetable = sql("select 1 as id")  
ssc.registerRDDAsTable(onetable,"onetable")  
  
// Prints example commands for this shell environment to standard output.
// The stray HTML span that had leaked into the sc.textFile example line is removed,
// so the printed example can be copied into the shell as-is.
def help = {
    println("""example:  
        "create table testperson (name string,age int,weight double)" from "testperson.txt"  
        "select * from testperson" go  
        "select * from testperson" saveto "somelocalfile.txt"  
        "select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"  
        "testperson" isok  
        "somelocalfile.txt" isok  
        "hdfs:/basedir/parentdir/testperson" isok  
        val data = "select * from testperson" result  
        to show data  
        val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson")  
        "somerdddata" defineas "name string,age int,weight double"  
        "select * from somerdddata" go  
        if you want to see the help of enveronment, please type :help  
        """)
}


以上是关于spark定制之六:sql版start.scala的主要内容,如果未能解决你的问题,请参考以下文章

Spark 定制版:016~Spark Streaming源码解读之数据清理内幕彻底解密

Spark 定制版:008~Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

Spark源码分析之六:Task调度

定制Spark SQL: 一种轻量级实现方案

Spark源代码分析之六:Task调度