读取 Amazon Kinesis Firehose 流写入 s3 的数据
Posted
技术标签:
【中文标题】读取 Amazon Kinesis Firehose 流写入 s3 的数据【英文标题】:Reading the data written to s3 by Amazon Kinesis Firehose stream 【发布时间】:2016-03-31 19:33:55 【问题描述】:我正在将记录写入 Kinesis Firehose 流,该流最终由 Amazon Kinesis Firehose 写入 S3 文件。
我的记录对象看起来像
ItemPurchase
String personId,
String itemId
写入S3的数据如下:
"personId":"p-111","itemId":"i-111""personId":"p-222","itemId":"i-222""personId":"p-333","itemId":"i-333"
没有逗号分隔。
Json 数组中没有起始括号
[
Json 数组中没有结束括号
]
我想读取这个数据获取 ItemPurchase 对象的列表。
List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))
读取这些数据的正确方法是什么?
【问题讨论】:
【参考方案1】:令我惊讶的是,Amazon Firehose 以这种方式将 JSON 消息转储到 S3,并且不允许您设置分隔符或任何东西。
最终,我发现解决问题的技巧是使用 JSON raw_decode 方法处理文本文件
这将允许您读取一堆连接的 JSON 记录,它们之间没有任何分隔符。
Python 代码:
import json
decoder = json.JSONDecoder()
with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
content = content_file.read()
content_length = len(content)
decode_index = 0
while decode_index < content_length:
try:
obj, decode_index = decoder.raw_decode(content, decode_index)
print("File index:", decode_index)
print(obj)
except JSONDecodeError as e:
print("JSONDecodeError:", e)
# Scan forward and keep trying to decode
decode_index += 1
【讨论】:
【参考方案2】:我也遇到了同样的问题,我是这样解决的。
-
将“”替换为“\n”
行被“\n”分割。
input_json_rdd.map(lambda x : re.sub("", "\n", x, flags=re.UNICODE))
.flatMap(lambda line: line.split("\n"))
一个嵌套的 json 对象有几个“”,所以用“”分割行并不能解决问题。
【讨论】:
我考虑过做这样的事情,但我认为如果 JSON 对象中的一个字符串碰巧包含一个 那么这种技术就会失败。也许如果你遍历每个字符,如果你点击一个“”(表示输入或离开一个字符串)切换一个布尔值,计算你所在的对象的级别(在看到 字符串之外增加,在看到 之外减少string),然后将对象的结尾视为您的级别计数器再次达到 0 时。 分隔符
是有问题的,因为内部字符串可以在其中包含像这样的json:\"
(带有转义引号),因此使用"
作为分隔符会更好一些,因为内部字符串不能有引号
为了以 Eran 的回答为基础,我使用否定的前瞻来解释
出现在字符串末尾的情况:re.sub('"(?![,])', '\n"', string)
【参考方案3】:
我也遇到了同样的问题。
如果AWS
允许我们设置分隔符会更好,但我们可以自己做。
在我的用例中,我一直在收听推文流,一旦收到一条新推文,我立即将其发送至Firehose
。
这当然会导致无法解析的 1 行文件。
所以,为了解决这个问题,我将推文的 JSON 与 \n
连接起来。
这反过来又让我使用一些可以在读取流内容时输出行的包,并轻松解析文件。
希望对你有所帮助。
【讨论】:
【参考方案4】:我认为解决这个问题的最佳方法是首先创建一个格式正确的 json 文件,其中包含良好分离的 json 对象。就我而言,我在被推入消防软管的事件中添加了“,”。然后在s3中保存一个文件后,所有文件都将包含由一些分隔符分隔的json对象(在我们的例子中是逗号)。必须添加的另一件事是文件开头和结尾的“[”和“]”。然后你有一个包含多个 json 对象的正确 json 文件。现在可以解析它们了。
【讨论】:
这适用于 JSON,但不适用于更复杂的标记,例如 XML。如果每条记录都是 XML 文档,则需要对其进行解析并将根元素包装到新的 XML 文档和某种封闭元素中(我使用过<array></array>
)。我目前正在尝试弄清楚如何以这种方式从 S3 中读取数据。【参考方案5】:
如果 firehose 的输入源是 Analytics 应用程序,则此不带分隔符的串联 JSON 是引用 here 的已知问题。你应该有一个 here 的 lambda 函数,它在多行中输出 JSON 对象。
【讨论】:
【参考方案6】:使用这个简单的 Python 代码。
input_str = '''"personId":"p-111","itemId":"i-111""personId":"p-222","itemId":"i-222""personId":"p-333","itemId":"i-333"'''
data_str = "[]".format(input_str.replace("",","))
data_json = json.loads(data_str)
然后(如果需要)转换为 Pandas。
import pandas as pd
df = pd.DataFrame().from_records(data_json)
print(df)
这就是结果
itemId personId
0 i-111 p-111
1 i-222 p-222
2 i-333 p-333
【讨论】:
【参考方案7】:我使用转换 Lambda 在每条记录的末尾添加换行符
def lambda_handler(event, context):
output = []
for record in event['records']:
# Decode from base64 (Firehose records are base64 encoded)
payload = base64.b64decode(record['data'])
# Read json as utf-8
json_string = payload.decode("utf-8")
# Add a line break
output_json_with_line_break = json_string + "\n"
# Encode the data
encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
encoded_string = str(encoded_bytes, 'utf-8')
# Create a deep copy of the record and append to output with transformed data
output_record = copy.deepcopy(record)
output_record['data'] = encoded_string
output_record['result'] = 'Ok'
output.append(output_record)
print('Successfully processed records.'.format(len(event['records'])))
return 'records': output
【讨论】:
【参考方案8】:您可以通过计算括号找到每个有效的 JSON。假设文件以 开头,这个 python sn-p 应该可以工作:
import json
def read_block(stream):
open_brackets = 0
block = ''
while True:
c = stream.read(1)
if not c:
break
if c == '':
open_brackets += 1
elif c == '':
open_brackets -= 1
block += c
if open_brackets == 0:
yield block
block = ''
if __name__ == "__main__":
c = 0
with open('firehose_json_blob', 'r') as f:
for block in read_block(f):
record = json.loads(block)
print(record)
【讨论】:
警告:这只是一个盲流阅读器,因此如果任何 JSON blob 包含恰好在其中包含转义括号的字符串,它将中断。【参考方案9】:如果有办法改变数据的写入方式,请用一行分隔所有记录。这样您就可以简单地逐行读取数据。如果没有,那么只需构建一个以“”作为分隔符的扫描仪对象并使用扫描仪进行读取。这样就可以了。
【讨论】:
【参考方案10】:在 Spark 中,我们遇到了同样的问题。我们正在使用以下内容:
from pyspark.sql.functions import *
@udf
def concatenated_json_to_array(text):
final = "["
separator = ""
for part in text.split(""):
final += separator + part
separator = "" if re.search(r':\s*"([^"]|(\\"))*$', final) else ","
return final + "]"
def read_concatenated_json(path, schema):
return (spark.read
.option("lineSep", None)
.text(path)
.withColumn("value", concatenated_json_to_array("value"))
.withColumn("value", from_json("value", schema))
.withColumn("value", explode("value"))
.select("value.*"))
它的工作原理如下:
-
将数据读取为每个文件一个字符串(无分隔符!)
使用 UDF 引入 JSON 数组,并通过引入逗号分割 JSON 对象。 注意:注意不要破坏任何带有
的字符串!
将带有架构的 JSON 解析为 DataFrame 字段。
将数组分解为单独的行
将值对象展开到列中。
像这样使用它:
from pyspark.sql.types import *
schema = ArrayType(
StructType([
StructField("type", StringType(), True),
StructField("value", StructType([
StructField("id", IntegerType(), True),
StructField("joke", StringType(), True),
StructField("categories", ArrayType(StringType()), True)
]), True)
])
)
path = '/mnt/my_bucket_name/messages/*/*/*/*/'
df = read_concatenated_json(path, schema)
我在这里写了更多细节和注意事项:Parsing JSON data from S3 (Kinesis) with Spark。不要只用 分割,因为它会弄乱你的字符串数据!例如:
"line": "a\"rt"
。
【讨论】:
【参考方案11】:你可以使用下面的脚本。
如果流数据大小不超过您设置的缓冲区大小,则 s3 的每个文件都有一对括号([])和逗号。
import base64
print('Loading function')
def lambda_handler(event, context):
output = []
for record in event['records']:
print(record['recordId'])
payload = base64.b64decode(record['data']).decode('utf-8')+',\n'
# Do custom processing on the payload here
output_record =
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(payload.encode('utf-8'))
output.append(output_record)
last = len(event['records'])-1
print('Successfully processed records.'.format(len(event['records'])))
start = '['+base64.b64decode(output[0]['data']).decode('utf-8')
end = base64.b64decode(output[last]['data']).decode('utf-8')+']'
output[0]['data'] = base64.b64encode(start.encode('utf-8'))
output[last]['data'] = base64.b64encode(end.encode('utf-8'))
return 'records': output
【讨论】:
【参考方案12】:使用 javascript 正则表达式。
JSON.parse(`[$item.replace(/\s*/g, ',')]`);
【讨论】:
以上是关于读取 Amazon Kinesis Firehose 流写入 s3 的数据的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Amazon Kinesis 服务上部署和运行 Amazon Kinesis 应用程序
Amazon Kinesis:在同步 Kinesis 分片和租约时捕获异常
为 amazon-kinesis-video-streams-producer-sdk-cpp 构建依赖项时出错 [重复]