重新部署时 Google Cloud Pub Sub 内存泄漏(基于 Netty)

Posted

技术标签:

【中文标题】重新部署时 Google Cloud Pub Sub 内存泄漏(基于 Netty)【英文标题】:Google Cloud Pub Sub memory leak on re-deploy (Netty based) 【发布时间】:2020-03-11 14:00:36 【问题描述】:

我的 tomcat 网络服务使用 realtime developer notifications for android,这需要 Google Cloud Pub Sub。它工作完美,所有通知都会立即收到。唯一的问题是它使用了太多的 RAM,导致机器的响应速度比预期的要慢,并且在取消部署应用程序后没有释放它。它使用 HttpServlet(特别是 Jersey,它提供了 contextInitializedcontextDestroyed 方法来设置和清除引用)并且注释 pub-sub 代码实际上减少了很多内存使用。

这是订阅-取消订阅Android订阅通知的代码。

package com.example.webservice;

import com.example.webservice.Log;
import com.google.api.core.ApiService;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.collect.Lists;
import com.google.pubsub.v1.ProjectSubscriptionName;

import java.io.FileInputStream;

public class SubscriptionTest

    // for hiding purposes
    private static final String projectId1 = "api-000000000000000-000000";
    private static final String subscriptionId1 = "realtime_notifications_subscription";
    private static final String TAG = "SubscriptionTest";

    private ApiService subscriberService;
    private MessageReceiver receiver;

    // Called when "contextInitialized" is called.
    public void initializeSubscription()
    
        Log.w(TAG, "Initializing subscriptions...");
        try
        
            GoogleCredentials credentials1 = GoogleCredentials.fromStream(new FileInputStream("googlekeys/apikey.json"))
                    .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
            ProjectSubscriptionName subscriptionName1 = ProjectSubscriptionName.of(projectId1, subscriptionId1);

            // Instantiate an asynchronous message receiver
            receiver =
                    (message, consumer) ->
                    
                        consumer.ack();

                        // do processing
                    ;

            // Create a subscriber for "my-subscription-id" bound to the message receiver
            Subscriber subscriber1 = Subscriber.newBuilder(subscriptionName1, receiver)
                    .setCredentialsProvider(FixedCredentialsProvider.create(credentials1))
                    .build();

            subscriberService = subscriber1.startAsync();
        
        catch (Throwable e)
        
            Log.e(TAG, "Exception while initializing async message receiver.", e);
            return;
        
        Log.w(TAG, "Subscription initialized. Messages should come now.");
    

    // Called when "contextDestroyed" is called.
    public void removeSubscription()
    
        if (subscriberService != null)
        
            subscriberService.stopAsync();
            Log.i(TAG, "Awaiting subscriber termination...");
            subscriberService.awaitTerminated();
            Log.i(TAG, "Subscriber termination done.");
        

        subscriberService = null;
        receiver = null;
    

这是应用程序卸载后的语句。 (名称可能不匹配但并不重要)

org.apache.catalina.loader.WebappClassLoaderBase.checkThreadLocalMapForLeaks The web application 
[example] created a ThreadLocal with key of type [java.lang.ThreadLocal] 
(value [java.lang.ThreadLocal@2cb2fc20]) and a value of type 
[io.grpc.netty.shaded.io.netty.util.internal.InternalThreadLocalMap] 
(value [io.grpc.netty.shaded.io.netty.util.internal.InternalThreadLocalMap@4f4c4b1a]) 
but failed to remove it when the web application was stopped. 
Threads are going to be renewed over time to try and avoid a probable memory leak.

据我观察,Netty 正在创建一个静态 ThreadLocal,它强烈引用值 InternalThreadLocalMap,这似乎导致了此消息的出现。我试图通过使用类似这样的某种代码来删除它(可能这是矫枉过正,但到目前为止没有一个答案对我有用,而且这似乎也不起作用)

    InternalThreadLocalMap.destroy();
    FastThreadLocal.destroy();
    for (Thread thread : Thread.getAllStackTraces().keySet())
    
        if (thread instanceof FastThreadLocalThread)
        
            // Handle the memory leak that netty causes.
            InternalThreadLocalMap map = ((FastThreadLocalThread) thread).threadLocalMap();
            if (map == null)
                continue;

            for (int i = 0; i < map.size(); i++)
                map.setIndexedVariable(i, null);
            ((FastThreadLocalThread) thread).setThreadLocalMap(null);
        
    

如果我单击Find leaks(显然),在取消部署(或停止启动)tomcat 检测到内存泄漏之后。问题是,已经使用的 RAM 和 CPU 没有释放,因为显然订阅没有正确关闭。重新部署应用程序会导致使用的 RAM 在每个操作中进一步增加,例如,如果它首先使用 200 MB 内存,那么在第二次部署后它会增加到 400、600、800,直到机器减速到足以死机为止。

这是一个严重的问题,我不知道如何解决它,停止方法按定义调用,awaitTerminated 也被调用并立即执行(意味着接口实际上已停止侦听)但它没有释放它背后的 RAM。

到目前为止,我只看到有关 python 客户端的问题(ref 1、ref 2),但似乎没有人提到 Java 客户端,我对使用这种结构有点失去希望了。

我也针对这个问题打开了issue。

我应该怎么做才能解决这个问题?感谢您的帮助,非常感谢。

【问题讨论】:

【参考方案1】:

我不知道它是否能完全解决您的问题,但您似乎因为不关闭 FileInputStream 而泄漏了一些内存。

第一种选择是将 FileInputStream 提取到一个变量中,并在读取完内容后对其调用 close() 方法。

使用这类流的第二个(更好的)选择是使用 try-with-resources。由于FileInputStream实现了AutoCloseable接口,退出try-with-resources时会自动关闭。

例子:

try (FileInputStream stream = new FileInputStream("googlekeys/apikey.json")) 
    GoogleCredentials credentials1 = GoogleCredentials.fromStream(stream)
            .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
    // ...
 catch (Exception e) 
    Log.e(TAG, "Exception while initializing async message receiver.", e);
    return;

【讨论】:

这可能会有所帮助,但我认为 Google 的 SDK 会自行处理关闭流,我实际上并没有检查代码。尽管如此,它不应该在每次重新部署时造成近 100 MB 的内存泄漏。不过,感谢您的帮助。 评论更新:它确实帮助了 3 MB :) 但问题仍然存在。

以上是关于重新部署时 Google Cloud Pub Sub 内存泄漏(基于 Netty)的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud - Pub Sub Push Subscription 调用云函数重试

Google Cloud Pub/Sub 确认

Google Cloud Functions 仅在成功时确认 Pub/Sub(GCP 解决的问题)

每当在 Google Cloud SQL 中插入或更新数据时,是不是可以向 Google Pub/Sub 发布消息?

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

Google Cloud Pub/Sub 中的积压工作