PySpark:从 Oracle 表中选择一个值,然后添加到它
Posted
技术标签:
【中文标题】PySpark:从 Oracle 表中选择一个值,然后添加到它【英文标题】:PySpark: Select a value from an Oracle table then add to it 【发布时间】:2020-10-13 15:32:21 【问题描述】:我使用 PySpark 将行从 Oracle 加载到 AWS。 我一次抓取 10000 行,然后存储加载的最大 seq_id 并将其用于下一个范围。
我正在尝试在 PySpark 中执行此操作,但我无法弄清楚。任何人都可以帮助或指出一个有用的培训资源吗?我尝试将输出转换为 Int。我尝试了 select.collect[0][0] 但也遇到了错误。我是 PySpark 的新手,非常感谢任何帮助。
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
def oracle_read(user,pwd,hostname,port,service_name,table_name):
url = 'jdbc:oracle:thin:'+user+'/'+pwd+'@//'+hostname+':'+port+'/'+service_name
result = spark.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable",table_name) \
.option("user", user) \
.option("password", pwd) \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.load()
result = result.toDF(* [c.lower() for c in result.columns])
return result
max_seq_qry = """(SELECT max_val FROM data_owner.tbl_max_seq_load WHERE table_name = 'TBL_A')"""
max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
min_seq = max_seq + 1
max_seq = max_seq + 10000
我收到以下错误:
TypeError: unsupported operand type(s) for +: 'DataFrame' and 'int'
NameError: name 'IntegerType' is not defined
TypeError: 'instancemethod' object has no attribute '__getitem__'
【问题讨论】:
print(max_seq.head())
的输出是什么?
【参考方案1】:
您的函数oracle_read
返回一个数据帧(结果),而您正试图增加它(向其添加一个),这是不可能的,因此会出现错误。
在您的情况下,您只从数据库中获取一列“max_val”,您可能是第一个匹配项,因此您可以选择此列并将第一个值作为max_seq['max_val'].values[0]
所以你可以重写你的代码
max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
max_seq = int(max_seq['max_val'].values[0]) + 1
【讨论】:
当我尝试 TypeError: int() argument must be a string or a number, not 'Column' 时得到了这个错误 你也是用 .values[0] 做的吗? 是的max_seq = int(max_seq['max_val'].values[0]) + 1
我试图将其拆分并首先获取值和转换,但无论哪种方法都得到了 TypeError: int() argument must be a string or a number, not 'Column'
。我可以将函数的结果收集到数据框以外的任何内容中吗?特定的 sql 只会带回一个值,我可以创建一个新函数,将结果发送到 int 或 string
您能否粘贴产生错误的确切行? oracle_read
行之后 print(max_seq) 的输出是什么?
这是打印显示的内容 ``` >>> print(max_txn_seq) DataFrame[max_val: string] ``` 这是我得到的行和错误 ``` >>> min_seq = int (max_txn_seq['max_val'].values[0]) + 1 Traceback(最近一次调用最后一次):文件“您能否检查是否有 table_name = 'TBL_A' 的行,如果没有行,请尝试添加 NVL(max_val, 0 ) 并查看是否有效。
与其将序列存储在表中,不如使用 Oracle 序列,因为它更具可扩展性并有助于多用户环境。
谢谢
【讨论】:
以上是关于PySpark:从 Oracle 表中选择一个值,然后添加到它的主要内容,如果未能解决你的问题,请参考以下文章
从 AWS Glue/PySpark 中的 100 个表中选择数据
使用 Oracle PLSQL 从动态选择的表中合并唯一值的更有效方法