Apache Spark + Ignite Cluster Thin Client
Posted: 2020-02-14 16:26:21
I am trying to read from and write to an Ignite cluster using Apache Spark.
Right now all Spark + Ignite examples launch a local Ignite cluster, but I want my code to connect to an already existing cluster as a client.
Question:-
How do I pass the Ignite connection IP and port (10800) in example-default.xml?
Error:- I currently get the following error:
TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2000 ms; change 'reconnectDelay' to configure the frequency of retries): [/3.88.248.113:10800]
Working (Spark + Ignite using JDBC):-
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize",100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person").load()
df.printSchema()
df.createOrReplaceTempView("test")
spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)
import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
  "Person",connectionProperties)
spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize",100)
  .option("dbtable", "Person").load().show(10,false)
Not working (requires a CONFIG file, i.e. example-default.xml):-
val igniteDF = spark.read
  .format(FORMAT_IGNITE) //Data source type.
  .option(OPTION_TABLE, "person") //Table to read.
  .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
  .load()
  .filter(col("id") >= 2) //Filter clause.
  .filter(col("name") like "%J%") //Another filter clause.
Full code:- the sparkDSLExample function fails to connect to the remote Ignite cluster using the thin connection
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object SparkClientConnectionTest {
  private val CACHE_NAME = "SparkCache"
  private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

  // Creates and fills the Person table through the Ignite thin client (port 10800).
  def setupExampleData = {
    val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
    val igniteClient:IgniteClient = Ignition.startClient(cfg2)
    System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
    val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)
    cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
      .setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
      .setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll
    System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
    val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
    println(data.toString)
  }

  // Reads through the Ignite data source, which needs the XML config file.
  def sparkDSLExample(implicit spark: SparkSession): Unit = {
    println("Querying using Spark DSL.")
    println
    val igniteDF = spark.read
      .format(FORMAT_IGNITE) //Data source type.
      .option(OPTION_TABLE, "person") //Table to read.
      .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
      .load()
      .filter(col("id") >= 2) //Filter clause.
      .filter(col("name") like "%J%") //Another filter clause. Note: the Person table created above has no name column.
    println("Data frame schema:")
    igniteDF.printSchema() //Printing query schema to console.
    println("Data frame content:")
    igniteDF.show() //Printing query results to console.
  }

  def main(args: Array[String]): Unit = {
    setupExampleData
    //Creating spark session.
    implicit val spark = SparkSession.builder()
      .appName("Spark Ignite data sources example")
      .master("local")
      .config("spark.executor.instances", "2")
      .getOrCreate()
    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
    //sparkDSLExample
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:ignite:thin://3.88.248.113")
      .option("fetchsize",100)
      //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
      .option("dbtable", "Person").load()
    df.printSchema()
    df.createOrReplaceTempView("test")
    spark.sql("select * from test where id=1").show(10)
    spark.sql("select 4,'blah',124232").show(10)
    import java.sql.DriverManager
    val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
    import java.util.Properties
    val connectionProperties = new Properties()
    connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
    spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
      "Person",connectionProperties)
    spark.read
      .format("jdbc")
      .option("url", "jdbc:ignite:thin://3.88.248.113")
      .option("fetchsize",100)
      .option("dbtable", "Person").load().show(10,false)
  }
}
example-default.xml:-
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!--
  Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd">
  <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- Set to true to enable distributed class loading for examples, default is false. -->
    <property name="peerClassLoadingEnabled" value="true"/>
    <!-- Enable task execution events for examples. -->
    <property name="includeEventTypes">
      <list>
        <!--Task execution events-->
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
        <!--Cache events-->
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
        <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
      </list>
    </property>
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
    <property name="discoverySpi">
      <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
          <!--
            Ignite provides several options for automatic discovery that can be used
            instead of static IP based discovery. For information on all options refer
            to our documentation: http://apacheignite.readme.io/docs/cluster-config
          -->
          <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
          <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
          <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
            <property name="addresses">
              <list>
                <!-- In distributed environment, replace with actual host IP address. -->
                <value>3.88.248.113:10800</value>
              </list>
            </property>
          </bean>
        </property>
      </bean>
    </property>
  </bean>
</beans>
Answer 1:
As answered in http://apache-ignite-users.70518.x6.nabble.com/Apache-Spark-Ignite-Connection-Issue-td29797.html, you are trying to connect a thick client to the thin client port.
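To make the distinction concrete, here is a minimal sketch of the two kinds of address involved. Port 10800 is Ignite's default thin client port (used by the JDBC thin driver and by Ignition.startClient), while a thick client node, such as the one the Ignite Spark data source starts from the XML file, joins through TcpDiscoverySpi, whose default discovery port is 47500. The object and method names below are hypothetical helpers, not part of Ignite, and the port range is an assumption that has to match the discovery settings of the server nodes.

```scala
// Hypothetical helper: names are illustrative and not part of the Ignite API.
object IgniteEndpoints {
  // Default port of the thin client connector (JDBC thin driver, Ignition.startClient).
  val ThinClientPort: Int = 10800

  // Discovery port range commonly used in Ignite example configs (assumption:
  // the server nodes were started with matching TcpDiscoverySpi settings).
  val DiscoveryPortRange: String = "47500..47509"

  // URL for the JDBC thin driver, e.g. for Spark's generic "jdbc" data source.
  def jdbcThinUrl(host: String): String =
    s"jdbc:ignite:thin://$host:$ThinClientPort"

  // Address string for the ipFinder "addresses" list of a thick client config.
  def discoveryAddress(host: String): String =
    s"$host:$DiscoveryPortRange"
}
```

Under these assumptions, the `<value>` entry in the ipFinder list would hold the result of `IgniteEndpoints.discoveryAddress("3.88.248.113")` rather than an address ending in 10800. The discovery and communication ports must also be reachable from the Spark machines for a thick client to join.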
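Comments:
Sorry, I did not get an answer from the forum. Can I get a direct answer? I tried the EC2 public address with ports 47500..47509 in example-default.xml but still had no luck, and ec2_public_ip:10800 did not work either. Tables created through the Spark DataFrame API do not show up in the Ignite sqlline shell; I only see the tables created through the Spark JDBC thin client. None of the Ignite documentation makes sense.
Answer 2:
The JDBC thin client is the only way for Spark to connect to Ignite on AWS from outside the Ignite cluster. This was answered in the GridGain forum thread below.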
https://forums.gridgain.com/community-home/digestviewer/viewthread?MessageKey=13f5e836-1569-486a-8475-84c70fc141e0&CommunityKey=3b551477-7e2d-462f-bc5f-d6d10ccbbe35&tab=digestviewer&SuccessMsg=Thank+you+for+submitting+your+message.
Spark + Ignite example:- https://github.com/kali786516/ApacheIgnitePoc/blob/master/src/main/scala/com/ignite/examples/spark/SparkIgniteCleanCode.scala
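As a hedged sketch of the JDBC thin approach, the options for Spark's generic "jdbc" source can be collected in one place. The object and method names are hypothetical. org.apache.ignite.IgniteJdbcThinDriver is the thin driver class shipped with ignite-core (the commented-out org.apache.ignite.IgniteJdbcDriver in the question is the older client-node driver), and port 10800 is assumed to be the cluster's thin client port.

```scala
// Hypothetical helper: builds the option map for Spark's "jdbc" data source.
object IgniteThinJdbc {
  def thinJdbcOptions(host: String, table: String, port: Int = 10800): Map[String, String] =
    Map(
      // JDBC thin URL; the port is written explicitly rather than relying on the default.
      "url" -> s"jdbc:ignite:thin://$host:$port",
      // Thin driver class from ignite-core.
      "driver" -> "org.apache.ignite.IgniteJdbcThinDriver",
      // Table to read, as in the question.
      "dbtable" -> table,
      // Same fetch size the question uses.
      "fetchsize" -> "100"
    )
}
```

With a SparkSession named spark in scope, this could be used as `spark.read.format("jdbc").options(IgniteThinJdbc.thinJdbcOptions("3.88.248.113", "Person")).load()`.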