从存储中读取 JSON 数组并发送到 GCP PubSub

Posted

技术标签:

【中文标题】从存储中读取 JSON 数组并发送到 GCP PubSub【英文标题】:Reading JSON Array from Storage and send to GCP PubSub 【发布时间】:2020-05-15 17:21:56 【问题描述】:

我在 Google Cloud Storage 中有多个文件,每个文件都包含如下 JSON 数组 -


  "Data": [
    
      "Country": "IN",
      "Order": "1033616591",
      "Method": "LCD zone E same day",
      "WorkOrderNo": "1033616591",
      "Zipcode": "6020",
      "OriginalTimeSlot": "2019-05-29 14:00-18:00",
      "CurrentTimeSlot": "2019-05-29 14:00-18:00",
      "Shipment": "98:2",
      "WOCreationDate": "2019-05-27T18:21:15Z",
      "ModifactionDate": "2020-01-17T16:50:58Z",
      "Dispatch": 
        "Status": "00",
        "DispatchUnit": []
      ,
      "Parcels": 
        "Parcel": [
          
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648048"
          ,
          
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648049"
          ,
          
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648050"
          ,
          
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648051"
          ,
          
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648052"
          ,
          
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648053"
          
        ]
      ,
      "TimeSlotId": "d2916acd-1f36-4604-98dc-0d11014a045c"
    ,
    
      "Country": "IN",
      "Order": "1049968941",
      "Method": "LCD zone A",
      "WorkOrderNo": "1049968941",
      "Zipcode": "6020",
      "OriginalTimeSlot": "2019-09-26 06:00-10:00",
      "CurrentTimeSlot": "2019-09-26 06:00-10:00",
      "Shipment": "98:2",
      "WOCreationDate": "2019-09-02T16:17:13Z",
      "ModifactionDate": "2020-01-17T16:40:18Z",
      "Dispatch": 
        "Status": "00",
        "DispatchUnit": []
      ,
      "Parcels": 
        "Parcel": [
          
            "Active": true,
            "Weight": 44.5,
            "Volume": 1.147163,
            "Trackingstatus": "",
            "Number": "704987779"
          
        ]
      ,
      "TimeSlotId": "3c3da1d2-000d-402a-856d-0d89013a6961"
    
  ]

现在我正在尝试从存储中读取每个文件,根据“国家/地区”字段分离每个 JSON,并将其发布到 Google PubSub。在我尝试过的下面 -

const express = require('express')
const app = express()
const port = 8080
const  PubSub  = require('@google-cloud/pubsub');

const projectId = 'my_project_id';
const keyFilename = 'myjson.json';
const pubsub = new PubSub( projectId, keyFilename );
const topicName = 'pubsub_topic_name';
const subscriptionName = 'pubsub_subscription_name';
const  Storage  = require('@google-cloud/storage');
const storage = new Storage();
const timeout = 60;

const subscription = pubsub.subscription(subscriptionName);
let messageCount = 0;
const bucketName = 'temp-shipment';

app.get('/', async function (req, res) 
    var messageIds = "";
    console.log('Line 1');
    const [files] = await storage.bucket('bucketname').getFiles();
    console.log('Line 2');
    var bkt = '';
    var i = 0;
    files.forEach(file => 
        console.log('name of file' + file.name);
        var archivo = file.createReadStream();
        bkt = '';
        console.log('---- bkt value:' + bkt);
        archivo.on('data', async function (d) 
            console.log('---- bkt value 2:' + bkt);
            bkt += d;
            i = i + 1;
            console.log('---- bkt value 3:' + bkt);
        ).on('end', async function () 

            console.log('---- bkt value 4:' + bkt);
            console.log(">>>>END CALLED i" + i)
            console.log("bky:" + bkt.replace(/(?:\\[rn])+/g, '').trim().toString());

            try 
                var kktrim = bkt.replace(/(?:\\[rn])+/g, '').trim();
                var kk = JSON.parse(kktrim);

                for (var v of kk.Data) 

                    var myJsonObject =  message: JSON.stringify(v) 
                    const data = JSON.stringify(myJsonObject);
                    console.log("Data: " + data);
                    const dataBuffer = Buffer.from(data);
                   // console.log("buffer" + dataBuffer);
                    const messageId = await pubsub.topic(topicName).publish(dataBuffer);
                    console.log("MessageId>>>>>>>>>>>>>>>>>>>" + messageId);
                    //messageIds += ":" + messageId;
                    //console.log(messageIds);

                
                bkt = '';
             catch (ex) 
                console.log('error' + ex + 'in ' + file.name);
            


        )


    );

    res.send('Message successfully sent!\nTopic: messages' + messageIds);
);
app.listen(port, () => console.log(`Example app listening on port $port!`))

此代码读取文件,但代码实际转换一个文件并为其他文件提供错误。对于其他文件,它会抛出此错误 -

textPayload: "errorSyntaxError: Unexpected token  in JSON at position 1696in IN.json" 

需要一些帮助来解决问题。我无法理解我在这里缺少什么。

【问题讨论】:

您能否尝试使用 json validator 验证失败的 json 文件。我怀疑你的 json 文件格式不正确。 @marian.vladoi 我试过了。两个文件内容都表示有效的 JSON。 错误消息说在 IN.json 文件中发现了一个意外的令牌“”。此文件是您在 Google Cloud Storage 中的文件之一吗? @marian.vladoi.. 是的,我有 2 个文件。 IN.json 和 AS.json。根据这一行 -- console.log('name of file' + file.name); ,它读取两个文件。我到现在发现,它不是一个一个地读取文件。它读取两个文件。可能是这个问题,不确定。 【参考方案1】:

问题是因为正在附加 JSON 文件。解析的第一个 JSON 是:


  "Data": 
    ...

但是,第二个尝试解析的 JSON 是:


  "Data": 
    ...


  "Data": 
    ...

因此“在...中找到了意外的令牌''”错误。

这是由变量 bkt 引起的,该变量用于解析 JSON:

var kktrim = bkt.replace(/(?:\\[rn])+/g, '').trim();
var kk = JSON.parse(kktrim);

但变量是在遍历文件之前声明的:

var bkt = '';
var i = 0;
files.forEach(file => 
    // More code
);

您可以移动变量声明以保持 JSON 文件分离并解决问题:

var i = 0;
files.forEach(file => 
    var bkt = '';
    // More code
);

【讨论】:

以上是关于从存储中读取 JSON 数组并发送到 GCP PubSub的主要内容,如果未能解决你的问题,请参考以下文章

如何从 GCP 存储桶中读取 Apache Beam 中的多个文件

GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery

将一串数字拆分为数字并发送到数组

Apache Beam GCP 在动态创建的目录中上传 Avro

GCP - 谷歌云平台:有没有任何方法可以在没有任何版本控制系统的情况下将代码从 eclipse 推送到云端?

如何从GPS获取数据并发送到服务器以及如何保存到数据库