Spark:单个应用程序中的两个 SparkContext 最佳实践
Posted
技术标签:
【中文标题】Spark:单个应用程序中的两个 SparkContext 最佳实践【英文标题】:Spark: Two SparkContexts in a single Application Best Practice 【发布时间】:2016-01-31 07:06:52 【问题描述】:我想我今天有一个有趣的问题要问大家。在下面的代码中,您会注意到我有两个 SparkContext,一个用于 SparkStreaming,另一个用于普通 SparkContext。根据最佳实践,您应该在 Spark 应用程序中只有一个 SparkContext,即使可以通过配置中的 allowMultipleContexts 来规避这一点。
问题是,我需要从 hive 和 Kafka 主题中检索数据以执行一些逻辑,并且每当我提交我的应用程序时,它显然会返回“不能在 JVM 上运行 2 个 Spark 上下文”。
我的问题是,有没有比我现在做的更正确的方法?
public class MainApp
private final String logFile= Properties.getString("SparkLogFileDir");
private static final String KAFKA_GROUPID = Properties.getString("KafkaGroupId");
private static final String ZOOKEEPER_URL = Properties.getString("ZookeeperURL");
private static final String KAFKA_BROKER = Properties.getString("KafkaBroker");
private static final String KAFKA_TOPIC = Properties.getString("KafkaTopic");
private static final String Database = Properties.getString("HiveDatabase");
private static final Integer KAFKA_PARA = Properties.getInt("KafkaParrallel");
public static void main(String[] args)
//set settings
String sql="";
//START APP
System.out.println("Starting NPI_TWITTERAPP...." + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
System.out.println("Configuring Settings...."+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
SparkConf conf = new SparkConf()
.setAppName(Properties.getString("SparkAppName"))
.setMaster(Properties.getString("SparkMasterUrl"));
//Set Spark/hive/sql Context
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));
JavaHiveContext HiveSqlContext = new JavaHiveContext(sc);
//Check if Twitter Hive Table Exists
try
HiveSqlContext.sql("DROP TABLE IF EXISTS "+Database+"TWITTERSTORE");
HiveSqlContext.sql("CREATE TABLE IF NOT EXISTS "+Database+".TWITTERSTORE "
+" (created_at String, id String, id_str String, text String, source String, truncated String, in_reply_to_user_id String, processed_at String, lon String, lat String)"
+" STORED AS TEXTFILE");
catch(Exception e)
System.out.println(e);
//Check if Ivapp Table Exists
sql ="CREATE TABLE IF NOT EXISTS "+Database+".IVAPPGEO AS SELECT DISTINCT a.LATITUDE, a.LONGITUDE, b.ODNCIRCUIT_OLT_CLLI, b.ODNCIRCUIT_OLT_TID, a.CITY, a.STATE, a.ZIP FROM "
+Database+".T_PONNMS_SERVICE B, "
+Database+".CLLI_LATLON_MSTR A WHERE a.BID_CLLI = substr(b.ODNCIRCUIT_OLT_CLLI,0,8)";
try
System.out.println(sql + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
HiveSqlContext.sql(sql);
sql = "SELECT LATITUDE, LONGITUDE, ODNCIRCUIT_OLT_CLLI, ODNCIRCUIT_OLT_TID, CITY, STATE, ZIP FROM "+Database+".IVAPPGEO";
JavaSchemaRDD RDD_IVAPPGEO = HiveSqlContext.sql(sql).cache();
catch(Exception e)
System.out.println(sql + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
//JavaHiveContext hc = new JavaHiveContext();
System.out.println("Retrieve Data from Kafka Topic: "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(KAFKA_TOPIC,KAFKA_PARA);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
jssc, KAFKA_GROUPID, ZOOKEEPER_URL, topicMap);
JavaDStream<String> json = messages.map(
new Function<Tuple2<String, String>, String>()
private static final long serialVersionUID = 42l;
@Override
public String call(Tuple2<String, String> message)
return message._2();
);
System.out.println("Completed Kafka Messages... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
System.out.println("Filtering Resultset... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
JavaPairDStream<Long, String> tweets = json.mapToPair(
new TwitterFilterFunction());
JavaPairDStream<Long, String> filtered = tweets.filter(
new Function<Tuple2<Long, String>, Boolean>()
private static final long serialVersionUID = 42l;
@Override
public Boolean call(Tuple2<Long, String> tweet)
return tweet != null;
);
JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
new TextFilterFunction());
tweetsFiltered = tweetsFiltered.map(
new StemmingFunction());
System.out.println("Finished Filtering Resultset... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
System.out.println("Processing Sentiment Data... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
//calculate postive tweets
JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
tweetsFiltered.mapToPair(new PositiveScoreFunction());
//calculate negative tweets
JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
tweetsFiltered.mapToPair(new NegativeScoreFunction());
JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
positiveTweets.join(negativeTweets);
//Score tweets
JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
joined.map(new Function<Tuple2<Tuple2<Long, String>,
Tuple2<Float, Float>>,
Tuple4<Long, String, Float, Float>>()
private static final long serialVersionUID = 42l;
@Override
public Tuple4<Long, String, Float, Float> call(
Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
return new Tuple4<Long, String, Float, Float>(
tweet._1()._1(),
tweet._1()._2(),
tweet._2()._1(),
tweet._2()._2());
);
System.out.println("Finished Processing Sentiment Data... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
System.out.println("Outputting Tweets Data to flat file "+Properties.getString("HdfsOutput")+" ... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
scoredTweets.map(new ScoreTweetsFunction());
result.foreachRDD(new FileWriter());
System.out.println("Outputting Sentiment Data to Hive... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
jssc.start();
jssc.awaitTermination();
【问题讨论】:
从您的代码中不清楚为什么需要 2 个 spark 上下文,您能否更好地解释一下?还有一个很好的做法是在将代码作为问题的一部分提交之前清理代码,因此它只包含理解您遇到的问题所需的必要部分。 【参考方案1】:创建 SparkContext
您可以先创建一个 SparkContext 实例,也可以先创建一个 SparkConf 对象。
获取现有的或创建新的 SparkContext(getOrCreate 方法)
getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext
SparkContext.getOrCreate 方法允许您获取现有的 SparkContext 或创建一个新的。
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()
// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkMe App")
val sc = SparkContext.getOrCreate(conf)
参考这里 - https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sparkcontext.html
【讨论】:
使用正确的格式并提供更多信息来改进您的回答【参考方案2】:显然,如果我在执行 JavaStreaming Context 之前使用 sc.close() 关闭原始 SparkContext,它可以完美运行,没有错误或问题。
【讨论】:
【参考方案3】:您可以使用单例对象ContextManager
来处理要提供的上下文。
public class ContextManager
private static JavaSparkContext context;
private static String currentType;
private ContextManager()
public static JavaSparkContext getContext(String type)
if(type == currentType && context != null)
return context;
else if (type == "streaming")
.. clean up the current context ..
.. initialize the context to streaming context ..
currentType = type;
else
..clean up the current context..
... initialize the context to normal context ..
currentType = type;
return context;
有些问题,例如在您快速切换上下文的项目中,开销会非常大。
【讨论】:
【参考方案4】:您可以从 JavaStreamingSparkContext 访问 SparkContext,并在创建其他上下文时使用 that 引用。
SparkConf sparkConfig = new SparkConf().setAppName("foo");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, Duration.seconds(30));
SqlContext sqlContext = new SqlContext(jssc.sparkContext());
【讨论】:
以上是关于Spark:单个应用程序中的两个 SparkContext 最佳实践的主要内容,如果未能解决你的问题,请参考以下文章