KafkaUtils.createDirectStream() Parameters Explained
KafkaUtils.createDirectStream creates a Kafka-backed DStream. It takes three arguments: the StreamingContext (ssc), a LocationStrategy, and a ConsumerStrategy. A typical call is sketched below.
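A minimal sketch of such a call, assuming the spark-streaming-kafka-0-10 integration; the topic name, group id, and broker addresses are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Minimal sketch: broker list, group id and topic name are placeholders.
val conf = new SparkConf().setAppName("DirectStreamDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "demo-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                                        // 1) StreamingContext
  PreferConsistent,                                           // 2) LocationStrategy
  Subscribe[String, String](Seq("demo-topic"), kafkaParams)   // 3) ConsumerStrategy
)

stream.map(record => (record.key, record.value)).print()
ssc.start()
ssc.awaitTermination()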
LocationStrategies offers three placement strategies: PreferBrokers, PreferConsistent, and PreferFixed. See the source excerpt below (from org.apache.spark.streaming.kafka010.LocationStrategies) for details on each.
/**
 * :: Experimental ::
 * object to obtain instances of [[LocationStrategy]]
 */
@Experimental
object LocationStrategies {

  /**
   * :: Experimental ::
   * Use this only if your executors are on the same nodes as your Kafka brokers.
   * (Only use this when the executors run on the same nodes as the Kafka brokers.)
   */
  @Experimental
  def PreferBrokers: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferBrokers

  /**
   * :: Experimental ::
   * Use this in most cases, it will consistently distribute partitions across all executors.
   * (Use this in most cases: partitions are distributed evenly across all executors.)
   */
  @Experimental
  def PreferConsistent: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferConsistent

  /**
   * :: Experimental ::
   * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   * Any TopicPartition not specified in the map will use a consistent location.
   * (If the load is uneven, pin specific TopicPartitions to specific hosts; any TopicPartition
   * not listed in the map falls back to the PreferConsistent strategy.)
   */
  @Experimental
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

  /**
   * :: Experimental ::
   * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   * Any TopicPartition not specified in the map will use a consistent location.
   */
  @Experimental
  def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(hostMap)
}
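For example, a minimal sketch of PreferFixed, assuming hypothetical executor host names and a placeholder topic; partitions not listed in the map are placed as under PreferConsistent:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Hypothetical: pin two hot partitions of "demo-topic" to specific executor hosts.
// Any TopicPartition not listed here falls back to consistent placement.
val hostMap = Map(
  new TopicPartition("demo-topic", 0) -> "executor-host-1",
  new TopicPartition("demo-topic", 1) -> "executor-host-2"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)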
ConsumerStrategies provides the subscription/assignment strategies: Subscribe, SubscribePattern, and Assign (SubscribePattern works like Subscribe but matches topics by regex pattern).
With Subscribe, partitions are assigned to consumers automatically: Kafka's internal rebalancing algorithm distributes topic-partitions evenly across the consumers in the same group.
With Assign, the consumer manually and explicitly specifies the topic-partitions to consume; assignment is not governed by group.id, so the specified group effectively plays no role in partition assignment.
/**
 * :: Experimental ::
 * Subscribe to a collection of topics.
 * @param topics collection of topics to subscribe
 * @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
 * configuration parameters</a> to be used on driver. The same params will be used on executors,
 * with minor automatic modifications applied.
 * Requires "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets: offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
@Experimental
def Subscribe[K, V](
    topics: Iterable[jl.String],
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
  new Subscribe[K, V](
    new ju.ArrayList(topics.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
 * :: Experimental ::
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topics existing at the time of check.
 * @param pattern pattern to subscribe to
 * @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
 * configuration parameters</a> to be used on driver. The same params will be used on executors,
 * with minor automatic modifications applied.
 * Requires "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets: offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
@Experimental
def SubscribePattern[K, V](
    pattern: ju.regex.Pattern,
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
  new SubscribePattern[K, V](
    pattern,
    new ju.HashMap[String, Object](kafkaParams.asJava),
    new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
 * :: Experimental ::
 * Assign a fixed collection of TopicPartitions
 * @param topicPartitions collection of TopicPartitions to assign
 * @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
 * configuration parameters</a> to be used on driver. The same params will be used on executors,
 * with minor automatic modifications applied.
 * Requires "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets: offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
@Experimental
def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
  new Assign[K, V](
    new ju.ArrayList(topicPartitions.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
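A minimal sketch of Assign with explicit starting offsets, reusing the ssc and kafkaParams from the first example; the topic name, partition numbers, and offsets are placeholders:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical: consume only partitions 0 and 1 of "demo-topic", starting from explicit offsets.
// group.id may still appear in kafkaParams, but it does not drive partition assignment here.
val topicPartitions = Seq(
  new TopicPartition("demo-topic", 0),
  new TopicPartition("demo-topic", 1)
)
val fromOffsets = Map(
  new TopicPartition("demo-topic", 0) -> 100L,
  new TopicPartition("demo-topic", 1) -> 200L
)
val assignedStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams, fromOffsets)
)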
Cannot resolve overloaded method:
Cause: the arguments passed to the method do not match the parameter types of any overload. Check each argument's type against the signatures above.
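As a hypothetical illustration (reusing ConsumerStrategies and the placeholder kafkaParams from the sketches above), passing a bare String where Subscribe expects a collection of topics produces exactly this kind of overload mismatch:

// Does not compile: a single String does not match Iterable[String] in any Subscribe overload.
// val bad = ConsumerStrategies.Subscribe[String, String]("demo-topic", kafkaParams)

// Compiles: wrap the topic name in a collection.
val good = ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams)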