从存储中读取 JSON 数组并发送到 GCP PubSub
Posted
技术标签:
【中文标题】从存储中读取 JSON 数组并发送到 GCP PubSub【英文标题】:Reading JSON Array from Storage and send to GCP PubSub 【发布时间】:2020-05-15 17:21:56 【问题描述】:我在 Google Cloud Storage 中有多个文件,每个文件都包含如下 JSON 数组 -
"Data": [
"Country": "IN",
"Order": "1033616591",
"Method": "LCD zone E same day",
"WorkOrderNo": "1033616591",
"Zipcode": "6020",
"OriginalTimeSlot": "2019-05-29 14:00-18:00",
"CurrentTimeSlot": "2019-05-29 14:00-18:00",
"Shipment": "98:2",
"WOCreationDate": "2019-05-27T18:21:15Z",
"ModifactionDate": "2020-01-17T16:50:58Z",
"Dispatch":
"Status": "00",
"DispatchUnit": []
,
"Parcels":
"Parcel": [
"Active": true,
"Weight": 29.087833333333332,
"Volume": 0.27791899999999997,
"Trackingstatus": "",
"Number": "704648048"
,
"Active": true,
"Weight": 29.087833333333332,
"Volume": 0.27791899999999997,
"Trackingstatus": "",
"Number": "704648049"
,
"Active": true,
"Weight": 29.087833333333332,
"Volume": 0.27791899999999997,
"Trackingstatus": "",
"Number": "704648050"
,
"Active": true,
"Weight": 29.087833333333332,
"Volume": 0.27791899999999997,
"Trackingstatus": "",
"Number": "704648051"
,
"Active": true,
"Weight": 29.087833333333332,
"Volume": 0.27791899999999997,
"Trackingstatus": "",
"Number": "704648052"
,
"Active": true,
"Weight": 29.087833333333332,
"Volume": 0.27791899999999997,
"Trackingstatus": "",
"Number": "704648053"
]
,
"TimeSlotId": "d2916acd-1f36-4604-98dc-0d11014a045c"
,
"Country": "IN",
"Order": "1049968941",
"Method": "LCD zone A",
"WorkOrderNo": "1049968941",
"Zipcode": "6020",
"OriginalTimeSlot": "2019-09-26 06:00-10:00",
"CurrentTimeSlot": "2019-09-26 06:00-10:00",
"Shipment": "98:2",
"WOCreationDate": "2019-09-02T16:17:13Z",
"ModifactionDate": "2020-01-17T16:40:18Z",
"Dispatch":
"Status": "00",
"DispatchUnit": []
,
"Parcels":
"Parcel": [
"Active": true,
"Weight": 44.5,
"Volume": 1.147163,
"Trackingstatus": "",
"Number": "704987779"
]
,
"TimeSlotId": "3c3da1d2-000d-402a-856d-0d89013a6961"
]
现在我正在尝试从存储中读取每个文件,根据“国家/地区”字段分离每个 JSON,并将其发布到 Google PubSub。在我尝试过的下面 -
const express = require('express')
const app = express()
const port = 8080
const PubSub = require('@google-cloud/pubsub');
const projectId = 'my_project_id';
const keyFilename = 'myjson.json';
const pubsub = new PubSub( projectId, keyFilename );
const topicName = 'pubsub_topic_name';
const subscriptionName = 'pubsub_subscription_name';
const Storage = require('@google-cloud/storage');
const storage = new Storage();
const timeout = 60;
const subscription = pubsub.subscription(subscriptionName);
let messageCount = 0;
const bucketName = 'temp-shipment';
app.get('/', async function (req, res)
var messageIds = "";
console.log('Line 1');
const [files] = await storage.bucket('bucketname').getFiles();
console.log('Line 2');
var bkt = '';
var i = 0;
files.forEach(file =>
console.log('name of file' + file.name);
var archivo = file.createReadStream();
bkt = '';
console.log('---- bkt value:' + bkt);
archivo.on('data', async function (d)
console.log('---- bkt value 2:' + bkt);
bkt += d;
i = i + 1;
console.log('---- bkt value 3:' + bkt);
).on('end', async function ()
console.log('---- bkt value 4:' + bkt);
console.log(">>>>END CALLED i" + i)
console.log("bky:" + bkt.replace(/(?:\\[rn])+/g, '').trim().toString());
try
var kktrim = bkt.replace(/(?:\\[rn])+/g, '').trim();
var kk = JSON.parse(kktrim);
for (var v of kk.Data)
var myJsonObject = message: JSON.stringify(v)
const data = JSON.stringify(myJsonObject);
console.log("Data: " + data);
const dataBuffer = Buffer.from(data);
// console.log("buffer" + dataBuffer);
const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log("MessageId>>>>>>>>>>>>>>>>>>>" + messageId);
//messageIds += ":" + messageId;
//console.log(messageIds);
bkt = '';
catch (ex)
console.log('error' + ex + 'in ' + file.name);
)
);
res.send('Message successfully sent!\nTopic: messages' + messageIds);
);
app.listen(port, () => console.log(`Example app listening on port $port!`))
此代码读取文件,但代码实际转换一个文件并为其他文件提供错误。对于其他文件,它会抛出此错误 -
textPayload: "errorSyntaxError: Unexpected token in JSON at position 1696in IN.json"
需要一些帮助来解决问题。我无法理解我在这里缺少什么。
【问题讨论】:
您能否尝试使用 json validator 验证失败的 json 文件。我怀疑你的 json 文件格式不正确。 @marian.vladoi 我试过了。两个文件内容都表示有效的 JSON。 错误消息说在 IN.json 文件中发现了一个意外的令牌“”。此文件是您在 Google Cloud Storage 中的文件之一吗? @marian.vladoi.. 是的,我有 2 个文件。 IN.json 和 AS.json。根据这一行 -- console.log('name of file' + file.name); ,它读取两个文件。我到现在发现,它不是一个一个地读取文件。它读取两个文件。可能是这个问题,不确定。 【参考方案1】:问题是因为正在附加 JSON 文件。解析的第一个 JSON 是:
"Data":
...
但是,第二个尝试解析的 JSON 是:
"Data":
...
"Data":
...
因此“在...中找到了意外的令牌''”错误。
这是由变量 bkt
引起的,该变量用于解析 JSON:
var kktrim = bkt.replace(/(?:\\[rn])+/g, '').trim();
var kk = JSON.parse(kktrim);
但变量是在遍历文件之前声明的:
var bkt = '';
var i = 0;
files.forEach(file =>
// More code
);
您可以移动变量声明以保持 JSON 文件分离并解决问题:
var i = 0;
files.forEach(file =>
var bkt = '';
// More code
);
【讨论】:
以上是关于从存储中读取 JSON 数组并发送到 GCP PubSub的主要内容,如果未能解决你的问题,请参考以下文章
如何从 GCP 存储桶中读取 Apache Beam 中的多个文件
GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery
Apache Beam GCP 在动态创建的目录中上传 Avro