05 DataBricks遍历S3容器

Posted ζ漠小斌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了05 DataBricks遍历S3容器相关的知识,希望对你有一定的参考价值。

背景:为介绍实战项目,本文先介绍一个简单实例。目标:将csv数据文件同步到Databricks表中

通过flag文件获取

连接s3

当前s3目录

  • flag

    • 主题01_时间戳1.csv
    • 主题02_时间戳2.csv
  • request

    • file_时间戳1

      • data_file.csv
    • file_时间戳2

      • data_file.csv

falg文件中存储的是每个主题文件的csv,每个主题csv中有所需表的列表、以及当前增量行数。request中根据主题文件时间戳可以找到对于时间戳的文件夹。文件夹中包含真正的数据文件。

import boto3
import pandas as pd

#s3存储桶访问信息
key_id=\'key_id\'
secret_key=\'密钥\'
my_bucket=\'容器名\'

#建立s3临时客户端
client = boto3.client(
                       \'s3\',
                       aws_access_key_id=key_id,
                       aws_secret_access_key=secret_key,
                       region_name=\'cn-north-1\'
                      )

    
#调用函数列出文件信息
paginator = client.get_paginator(\'list_objects\')
# print(paginator)
page_iterator = paginator.paginate(
  Bucket=my_bucket
  , Delimiter=\'/\',
  Prefix=\'EDW_SHARE/Flag/\')

遍历s3,并做限制

上传的文件并非都获取,我们只获取当前有用到的。所以需要一个flag表做限制,在flag中的表获取出来。我们先将限制的列表做好。


# #定义空dataframe用于存放flag文件内容
df=pd.DataFrame(data=None, index=None, columns=(\'flag_file\',\'content\'), dtype=None, copy=False)

# #定义dataframe用于存放已读入的flag文件名,用于过于已经读入过的flag,并将sql查询结果从sparksql支持的dataframe转换为pandas支持的dataframe
df1 = spark.sql(\'select distinct flag_file from cfg.flag_file_info\').toPandas()

# #将flag依次插入列表
file_list=[i for i in df1[\'flag_file\']]

# file_list就是限制获取到s3中的文件列表

开始遍历,并在遍历时过滤不在falg表中的记录。

# 从s3返回的json数组中拆分flag文件名
for page in page_iterator:
      for key in page[\'Contents\']:

        if key[\'Key\'] not in file_list:
            
#             从返回的json数组中读取flag内容
            response1 = client.select_object_content(
                                                      Bucket=my_bucket,
                                                      Key=key[\'Key\'],
                                                      Expression=\'SELECT SOURCE_NAME FROM S3Object\',
                                                      ExpressionType=\'SQL\',
                                                      InputSerialization={
                                                          \'CSV\': {
                                                                  \'FileHeaderInfo\': \'USE\',
                                                                  \'QuoteCharacter\': \'"\'
                                                                  }

                                                      },
                                                      OutputSerialization={
                                                                            \'JSON\': {}
                                                                           }
                                                    )
            
            for i in response1[\'Payload\']:
                 if \'Records\' in i:
                    # print(key[\'Key\'])
                    for content in i[\'Records\'][\'Payload\'].decode(\'utf-8\').replace(\'{"SOURCE_NAME":\',\'\').replace(\'}\',\'\').replace(\'"\',\'\').split(\'\\n\'):
                        if content  :
                            df.loc[len(df)]=(key[\'Key\'],content)

# 将pandas的dataframe转换为sparksql支持的dataframe
df=spark.createDataFrame(df)
#将dataframe转换为临时表
df.createOrReplaceTempView("file_flag_file_info")

file_flag_file_info获取到的就是需要的文件名。获取到两列,如下

flag_filecontent
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csvDW_TEST01
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csvDW_TEST02

获取到的记录保存在一个表中

spark.sql("insert into cfg.flag_file_info select date_id,flag_file,content,status,create_dt from (select date_format(from_utc_timestamp(current_timestamp(),\'UTC+8\'),\'yyyyMMdd\') as date_id,flag_file,content,1 as status,from_utc_timestamp(current_timestamp(),\'UTC+8\') as create_dt,rank() over(partition by content order by flag_file desc) as rk from file_flag_file_info where  content in (select file_name from cfg.file_config_list where is_active=1))t where t.rk=1")

遍历s3文件目的是为了正确取到数据文件在s3的路径,为之后DataFactory从S3中拿文件做基础。

通过request文件直接获取

由于request的文件夹命名规则一致,所以我们可以通过截取文件夹的名字直接获取时间日期,其他的过程与上述一致

## 重写,可重复跑数据,可连续跑多天数据
import boto3
import pandas as pd
import time
import datetime
from dateutil.parser import parse

#s3存储桶访问信息

#PROD
key_id=\'key_id\'
secret_key=\'secret_key\'
my_bucket=\'my_bucket\'

#建立临时客户端
client = boto3.client(
                       \'s3\',
                       aws_access_key_id=key_id,
                       aws_secret_access_key=secret_key,
                       region_name=\'cn-north-1\'
                      )

# #定义空dataframe用于存放flag文件内容
df=pd.DataFrame(data=None, index=None, columns=(\'file_url\',\'date_id\',\'status\'), dtype=None, copy=False)

# #需要的文件
df1 = spark.sql("select distinct file_name from cfg.file_config_list where is_active=1").toPandas()
# #将flag依次插入列表
file_list=[i for i in df1[\'file_name\']]


# #需要的文件
df2 = spark.sql("select max(date_id) as date from cfg.file_url where status=0 ").toPandas()
## cfg.file_url 表中最大日期, 库中的数据截止日期
data_date = str(df2[\'date\'][0])

# 表中最大时间+1
c=parse(data_date) + datetime.timedelta(days=1)

# 时间转字符串
current_date = datetime.datetime.strftime(c, "%Y%m%d")


#调用函数列出文件信息
page_iterator02 = paginator.paginate( Bucket=\'az-cn-cloudbi-edwfile\' , Delimiter=\'/\',Prefix=\'EDW_SHARE/Request/\')

# 数据日期


b = []

# s3有数
if b :
  # 记录表中最大时间>=今天   正常跑数据或重跑数据
    if data_date < time.strftime(\'%Y%m%d\'):
        for page in page_iterator02:
            for key in page[\'CommonPrefixes\']:
                if key[\'Prefix\'][27:35] == current_date:
                    b.append(key[\'Prefix\'])
          
        for i in range(len(b)):
            page_iterator03 = paginator.paginate(Bucket=\'az-cn-cloudbi-edwfile\', Delimiter=\'/\',Prefix=b[i])
            for page in page_iterator03:
                for key in page[\'Contents\']: 
                    if key[\'Key\'][42:-4] in file_list:
                        df.loc[len(df)]=(key[\'Key\'],current_date,1)

# s3无数
else:
     # 记录表中最大时间大于今天 重跑今天数据
    if data_date >= time.strftime(\'%Y%m%d\'):
            sql = \'delete from cfg.file_url where date_id =\'+ data_date
            spark.sql(sql)
            for page in page_iterator02:
                for key in page[\'CommonPrefixes\']:
                    if key[\'Prefix\'][27:35] == data_date:
                        b.append(key[\'Prefix\'])
            for i in range(len(b)):
                page_iterator03 = paginator.paginate(Bucket=\'az-cn-cloudbi-edwfile\', Delimiter=\'/\',Prefix=b[i])
                for page in page_iterator03:
                    for key in page[\'Contents\']: 
                        if key[\'Key\'][42:-4] in file_list:
                            df.loc[len(df)]=(key[\'Key\'],data_date,1)
    else: #   记录表中最大时间小于今天,但没有取到数据。说明s3在这天少传了数据
        df.loc[len(df)]=(\'no data\',current_date,1)

#将pandas的dataframe转换为sparksql支持的dataframe
df=spark.createDataFrame(df)
#将dataframe转换为临时表
df.createOrReplaceTempView("file_url")
# 插入要跑的批次数据
spark.sql("insert into cfg.file_url select date_id,file_url,status,from_utc_timestamp(current_timestamp(),\'UTC+8\'),from_utc_timestamp(current_timestamp(),\'UTC+8\') from file_url")

# 把job lookup1 的查询改为: select file_url from cfg.file_url where status=1

以上是关于05 DataBricks遍历S3容器的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Databricks 中管理 S3 挂载的权限

将 AWS S3 连接到 Databricks PySpark

Azure - 为存储容器中的每个新 blob 触发 Databricks 笔记本

通过 Simba JDBC 的 Databricks Spark 连接问题

Databricks 上的 Spark - 缓存 Hive 表

将 DataBricks 连接到 Azure Blob 存储