Python Google Cloud Firestore 错误 504 Deadline Exceeded

Posted

技术标签:

【中文标题】Python Google Cloud Firestore 错误 504 Deadline Exceeded【英文标题】:Python Google cloud firestore error 504 Deadline Exceeded 【发布时间】:2020-02-07 07:46:51 【问题描述】:

我在 python 中有一个 fore firestore 函数,我在其中为一个集合的所有用户执行 for 循环,然后我进入另一个集合以获取一些指标,并在第一个集合中更新这些指标。

我运行了这个函数,但是在执行的某个时刻,函数中断给我这个错误:

_Rendezvous                               Traceback (most recent call last)
~\Anaconda3\envs\work\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
     78         try:
---> 79             return six.next(self._wrapped)
     80         except grpc.RpcError as exc:

~\Anaconda3\envs\work\lib\site-packages\grpc\_channel.py in __next__(self)
    363     def __next__(self):
--> 364         return self._next()
    365 

~\Anaconda3\envs\work\lib\site-packages\grpc\_channel.py in _next(self)
    346             else:
--> 347                 raise self
    348             while True:

_Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = ""created":"@1570660422.708000000","description":"Error received from peer ipv4:216.58.202.234:443","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Deadline Exceeded","grpc_status":4"
>

The above exception was the direct cause of the following exception:

DeadlineExceeded                          Traceback (most recent call last)
<ipython-input-20-05c9cefdafb4> in <module>
----> 1 update_collection__persons()

<ipython-input-19-6e2bdd597a6e> in update_collection__persons()
     10     counter_secs = 0
     11 
---> 12     for person_doc in person_docs:
     13         person_dict = person_doc.to_dict()
     14         last_updated = person_dict['last_updated']

~\Anaconda3\envs\work\lib\site-packages\google\cloud\firestore_v1\query.py in stream(self, transaction)
    766         )
    767 
--> 768         for response in response_iterator:
    769             if self._all_descendants:
    770                 snapshot = _collection_group_query_response_to_snapshot(

~\Anaconda3\envs\work\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
     79             return six.next(self._wrapped)
     80         except grpc.RpcError as exc:
---> 81             six.raise_from(exceptions.from_grpc_error(exc), exc)
     82 
     83     # Alias needed for Python 2/3 support.

~\Anaconda3\envs\work\lib\site-packages\six.py in raise_from(value, from_value)

DeadlineExceeded: 504 Deadline Exceeded

一直在找解决办法,资料不多,在这里发现了一个类似的问题:https://github.com/googleapis/google-cloud-python/issues/8933

所以我尝试使用此代码但无法正常工作。这是我的功能:

def update_collection__persons():   
    persons = db.collection(u'collections__persons')
    person_docs = persons.stream()


    counter_secs = 0

    for person_doc in person_docs:
        person_dict = person_doc.to_dict()
        last_updated = person_dict['last_updated']
        last_processed = person_dict['last_processed']
        dt_last_updated = datetime(1, 1, 1) + timedelta(microseconds=last_updated/10)
        dt_last_processed = datetime(1, 1, 1) + timedelta(microseconds=last_processed/10)

        if dt_last_processed < dt_last_updated:
            orders = db.collection(u'collection__orders').where(u'email', u'==', person_dict['email'])
            orders_docs = orders.stream()

            sum_price = 0
            count = 0
            date_add_list = []

            for order_doc in orders_docs:
                order_dict = order_doc.to_dict() 
                sum_price += order_dict['total_price']
                count +=1
                date_add_list.append(order_dict['dateAdded'])
            if count > 0:
                data = 'metrics': 'LTV': sum_price,
                                    'AOV': sum_price/count,
                                    'Quantity_orders': count,
                                    'first_order_date': min(date_add_list),
                                    'last_order_date': max(date_add_list),
                         'last_processed': int((datetime.utcnow() - datetime(1, 1, 1)).total_seconds() * 10000000)

                 db.collection(u'collection__persons').document(person_dict['email']).set(data, merge = True)

我创建了一个 counter_secs 只是为了查看函数是否总是在同一个查询中中断,但事实并非如此。

此外,如果我看到其中一些随机用户在运行该函数后,我已经更新了他们的数据,所以它正在工作,但在某些时候会中断

【问题讨论】:

【参考方案1】:

persons.stream() 有 60 秒的超时时间。不要在流式传输时处理每个文档,而是尝试预先获取所有文档:

person_docs = [snapshot for snapshot in persons.stream()]

如果您的文档多于 60 秒内无法获取的文件,请尝试递归函数 like in this answer。

订单也一样:

orders_docs = [snapshot for snapshot in orders.stream()]

【讨论】:

你能解释一下这个 start_after(cursor) 是什么吗?他将光标初始化为无,那么它什么时候将具有与无不同的值? start_after 和 no use this start_after 有什么区别? 当然,这是query cursor。该函数一次将一个大型查询拆分为 1000 个文档的批次。获取一批 1000 个文档后,该函数将第 1000 个文档作为查询游标传递,以获取接下来的 1000 个文档,count_collection(coll_ref, count, docs[999].get())【参考方案2】:

我在获取所有文档以将其转换为 JSON 时遇到了确切的问题。

我是按照下面的方法做的。

 def load_documents(self, path):
    collection = self.db
    nodes = path.split("/")

    for i, node in enumerate(nodes):
        if i % 2 == 0:
            collection = collection.collection(node) 
        else:
            collection = collection.document(node)
    stream = collection.stream()

    for doc in stream:
        print("* Fetching document: ".format(doc.get("filename")))
        self.memes.append(self._fetch_doc(doc))
def _fetch_doc(self, doc):
    try:
        return 
            "caption": doc.get("caption"),
            "filename": doc.get("filename"),
            "url": doc.get("url")
        
    except:
        self._fetch_doc(doc)

如果遇到异常,我会递归获取。

【讨论】:

实际的异常是DeadlineExceeded,它发生在for doc in stream行而不是doc.get(...),所以它仍然会崩溃。 我在这里回答了这个问题。 ***.com/a/61663938/3778645。希望它可以帮助某人。【参考方案3】:

在遵循@juan-lara 的解决方案后,我仍然面临这个问题,将文档转换为 dict 终于为我工作了。

person_docs = [snapshot.to_dict() for snapshot in persons.stream()]

【讨论】:

以上是关于Python Google Cloud Firestore 错误 504 Deadline Exceeded的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Google Python Client for Cloud Functions 获取 Google Cloud Functions 列表?

python Google Cloud Python - 示例

Google App Engine - 大查询 - Python 找不到库 google.cloud

无法在 python 脚本中导入 google.cloud 模块

从 Cloud Function (python) 写入 Google Cloud Storage

使用 google-cloud-python API 访问 Dataproc 时出现无效区域错误