How to use the receive clause efficiently in an Erlang gen_server to resolve a timeout error?

Posted: 2021-04-14 03:18:54

Question:

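Sometimes my loop returns ok because of a timeout. How do I write this code properly? When there is a timeout it just returns ok instead of the actual value I expect. In handle_call I call the function loop(), and inside loop() I receive a message with a receive clause. I then send this data to my database through loop2, which returns the database response (whether the data was saved successfully or not) and hands that response back to loop(). But if there is a timeout, my loop function returns ok and not the actual value.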

% @Author: ZEESHAN AHMAD
% @Date:   2020-12-22 05:06:12
% @Last Modified by:   ZEESHAN AHMAD
% @Last Modified time: 2021-01-10 04:42:59


-module(getAccDataCons).

-behaviour(gen_server).

-include_lib("deps/amqp_client/include/amqp_client.hrl").

-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
         terminate/2]).
-export([get_account/0]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

stop() ->
    gen_server:cast(?MODULE, stop).

get_account() ->
    gen_server:call(?MODULE, get_account).

init(_Args) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    {ok, Channel}.

handle_call(get_account, _From, State) ->
    amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
    amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
    Binding =
        #'queue.bind'{exchange = <<"get">>,
                      routing_key = <<"get.account">>,
                      queue = <<"get_account">>},
    #'queue.bind_ok'{} = amqp_channel:call(State, Binding),
    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
    amqp_channel:call(State, #'basic.consume'{queue = <<"get_account">>, no_ack = true}),
    Returned = loop(),
    io:format("~nReti=~p", [Returned]),
    {reply, Returned, State};

handle_call(Message, _From, State) ->
    io:format("received other handle_call message: ~p~n", [Message]),
    {reply, ok, State}.

handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(Message, State) ->
    io:format("received other handle_cast call : ~p~n", [Message]),
    {noreply, State}.

handle_info(Message, State) ->
    io:format("received handle_info message : ~p~n", [Message]),
    {noreply, State}.

code_change(_OldVer, State, _Extra) ->
    {ok, State}.

terminate(Reason, _State) ->
    io:format("server is terminating with reason :~p~n", [Reason]).


loop()->
    receive
        #'basic.consume_ok'{} -> ok
    end,
    receive
        {#'basic.deliver'{}, Msg} ->
            #amqp_msg{payload = Payload} = Msg,
            Value=loop2(Payload),
            Value
    after 2000->
        io:format("Server timeout")
    end.


loop2(Payload)->
    Result = jiffy:decode(Payload),
    {[{<<"account_id">>, AccountId}]} = Result,
    Doc = {[{<<"account_id">>, AccountId}]},
    getAccDataDb:create_AccountId_view(),
    Returned=case getAccDataDb:getAccountNameDetails(Doc) of
        success ->
            Respo = getAccDataDb:getAccountNameDetails1(Doc),
            Respo;
        details_not_matched ->
            user_not_exist
    end,
    Returned.

Answer 1:

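Without the loop and loop2 code it is hard to give an answer. If the timeout is detected by one of these two functions, you must first change their behavior to avoid any timeout, or increase it to a value that works. If a timeout is necessary, make sure the return value is explicit, for example {error,RequestRef,timeout} rather than ok.

Nevertheless, the gen_server should not wait too long for an answer. You can modify your code as follows.

Instead of using gen_server:call(ServerRef,Request) in the client process, you could use:

RequestId = gen_server:send_request(ServerRef, Request),
Result = gen_server:wait_response(RequestId, Timeout),

and remove the timeout in loop and/or loop2. This way you control the timeout on the client side, and you can even set it to infinity (not a good idea!).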

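Or you can split your function into two parts:

gen_server:cast(ServerRef,{Request,RequestRef}),
% this will not wait for any answer, RequestRef is a tag to identify later
% if the request was fulfilled, you can use make_ref() to generate it

Later, or in another client process (which requires passing at least the RequestRef to that process), check the result of the request:

Answer = gen_server:call(ServerRef,{get_answer,RequestRef}),
case Answer of
    no_reply -> ... % no answer yet
    {ok,Reply} -> ... % handle the answer
end,

Finally, you must modify the loop code to handle the RequestRef, send a message back to the server (again using gen_server:cast) with the result and the RequestRef, and store this result in the server state.

I don't think this second solution is worth it: it is more or less the same as the first one, but hand made, and it leaves you to manage many cases yourself that could end up as a kind of memory leak.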
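To make the second approach concrete, here is a minimal sketch of the cast/poll pattern. The module name request_store, the {double, N} request and the 50 ms sleep are all made up for illustration; the slow part stands in for the AMQP and database work.

```erlang
-module(request_store).
-behaviour(gen_server).

-export([start_link/0, submit/2, get_answer/2]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Fire-and-forget: returns the reference that identifies the request later.
submit(Server, Request) ->
    RequestRef = make_ref(),
    gen_server:cast(Server, {Request, RequestRef}),
    RequestRef.

%% Poll for the result: no_reply as long as no result has been stored.
get_answer(Server, RequestRef) ->
    gen_server:call(Server, {get_answer, RequestRef}).

init([]) ->
    {ok, #{}}.   % state is a map: RequestRef => Result

handle_call({get_answer, RequestRef}, _From, Results) ->
    case maps:take(RequestRef, Results) of
        {Result, Rest} -> {reply, {ok, Result}, Rest};   % hand out once, then forget
        error          -> {reply, no_reply, Results}
    end.

%% Hypothetical request: double N after a short delay, in a worker process.
handle_cast({{double, N}, RequestRef}, Results) ->
    Server = self(),
    spawn(fun() ->
        timer:sleep(50),
        gen_server:cast(Server, {done, RequestRef, N * 2})
    end),
    {noreply, Results};
%% The worker reports back; store the result under its reference.
handle_cast({done, RequestRef, Result}, Results) ->
    {noreply, Results#{RequestRef => Result}}.
```

A result that nobody ever fetches stays in the map forever, which is the kind of leak mentioned above; a real server would need some expiry policy.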

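Comments:

- I have updated my code with loop() and loop2().
- @Coderops see my new answer.

Answer 2:

This is too long for an edit, so I am putting it in a new answer.

The reason you receive ok when a timeout occurs is in the loop() code. In the second receive block, after 2000 ms, the function returns right after the io:format/1 call.

io:format/1 returns ok, and that is what ends up in your Returned variable. You should change this code to:
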
loop()->
    ok = receive
        #'basic.consume_ok'{} -> ok
    end,
    receive
        {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> {ok,loop2(Payload)}
    after 2000 ->
        io:format("Server timeout"),
        {error,timeout}
    end.

With this code, your client receives either {ok,Value} or {error,timeout} and can react accordingly.
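As a small stand-alone illustration of the same idea (no AMQP involved), the sketch below waits for a plain {value, V} message and tags both outcomes. The module tagged_receive, the message shape and the describe/1 helper are invented for the example.

```erlang
-module(tagged_receive).
-export([wait_value/1, describe/1]).

%% Wait for a {value, V} message. The timeout branch ends with an explicit
%% tagged tuple, so the caller never sees the bare ok returned by io:format/1.
wait_value(Timeout) ->
    receive
        {value, V} -> {ok, V}
    after Timeout ->
        io:format("Server timeout~n"),
        {error, timeout}
    end.

%% Hypothetical caller-side handling: one clause per tagged result.
describe({ok, Value})      -> {found, Value};
describe({error, timeout}) -> retry_later.
```

Because the two results have different shapes, the caller can pattern match on them instead of guessing what a bare ok means.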

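But this version still has problems:

- the 2-second timeout may be too short, so you miss valid answers;
- because you pattern match in the receive blocks and do not check the result of each amqp_channel:call, many different problems can occur and all show up as a timeout.

First, let's look at the timeout. The 4 calls to amqp_channel may take more than 2 seconds in total to complete successfully. The simple fix is to increase the timeout, changing after 2000 to after 3000 or more. But then you run into two issues:

- Your gen_server is blocked for that whole time. Unless it is dedicated to a single client, it cannot serve any other request while it waits for the answer.
- If you need to raise the timeout above 5 seconds, you hit another timeout, managed by gen_server itself: with the default gen_server:call/2, a request must be answered within 5 seconds.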
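The second point can be reproduced with short delays. This is only a sketch: slow_server and its {sleep, Delay} request are made up, and gen_server:call/3 is given an explicit timeout so there is no need to wait for the 5000 ms default.

```erlang
-module(slow_server).
-behaviour(gen_server).

-export([start_link/0, ask/3]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Ask the server to sleep for Delay ms, giving up after Timeout ms.
%% gen_server:call/3 exits with {timeout, _} when no reply arrives in time;
%% the exit is turned into a tagged tuple here.
ask(Server, Delay, Timeout) ->
    try
        gen_server:call(Server, {sleep, Delay}, Timeout)
    catch
        exit:{timeout, _} -> {error, timeout}
    end.

init([]) ->
    {ok, nostate}.

%% The server is blocked for the whole sleep, like the blocking loop().
handle_call({sleep, Delay}, _From, State) ->
    timer:sleep(Delay),
    {reply, {ok, Delay}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

Note that the server keeps working on the request after the caller has given up, and as far as I know a late reply can still land in the caller's mailbox on older OTP releases.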

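gen_server provides some interface functions for this kind of problem: send_request, wait_response and reply. Here is a basic gen_server that can handle 3 kinds of request:

- stop: stops the server, useful when updating the code.
- {blocking,Time,Value}: the server sleeps for Time ms and then returns {ok,Value}. This simulates your case, and you can tune how long it takes to get an answer.
- {non_blocking,Time,Value}: the server delegates the job to another process and returns immediately without answering (so it is available for another request). The new process sleeps for Time ms and then returns Value using gen_server:reply.

The server module implements several user interfaces:

- the standard start() and stop();
- blocking(Time,Value) calls the server with the request {blocking,Time,Value} using gen_server:call;
- blocking_catch(Time,Value) does the same as the previous one, but catches the result of gen_server:call to reveal the hidden timeout;
- non_blocking(Time,Value,Wait) calls the server with the request {non_blocking,Time,Value} using gen_server:send_request and waits at most Wait ms for the answer.

Finally, it includes 2 test functions:

- test([Type,Time,Value,OptionalWait]) spawns a process that sends a request of the given type with the corresponding parameters. The answer is sent back to the calling process and can be retrieved with flush() in the shell.
- parallel_test([Type,Time,NbRequests,OptionalWait]) calls test NbRequests times with the corresponding parameters. It collects all the answers and prints them using the local function collect_answers(NbRequests,Timeout).

Here is the code: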

-module (server_test).

-behaviour(gen_server).

%% API
-export([start/0,stop/0,blocking/2,blocking_catch/2,non_blocking/3,test/1,parallel_test/1]).


%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(SERVER, ?MODULE). 

%%%===================================================================
%%% API
%%%===================================================================
start() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

stop() ->
    gen_server:cast(?SERVER, stop).

blocking(Time,Value) ->
    gen_server:call(?SERVER, {blocking,Time,Value}).

blocking_catch(Time,Value) ->
    catch {ok,gen_server:call(?SERVER, {blocking,Time,Value})}.

non_blocking(Time,Value,Wait) ->
    ReqId = gen_server:send_request(?SERVER,{non_blocking,Time,Value}),
    gen_server:wait_response(ReqId,Wait).

test([Type,Time,Value]) -> test([Type,Time,Value,5000]);
test([Type,Time,Value,Wait]) ->
    % ask for microseconds explicitly so the value matches the after_microsec label
    Start = erlang:monotonic_time(microsecond),
    From = self(),
    F = fun() -> 
        R = case Type of 
            non_blocking -> ?MODULE:Type(Time,Value,Wait);
            _ -> ?MODULE:Type(Time,Value)
        end,
        From ! {request,Type,Time,Value,got_answer,R,after_microsec,erlang:monotonic_time(microsecond) - Start}
    end,
    spawn(F).

parallel_test([Type,Time,NbRequests]) -> parallel_test([Type,Time,NbRequests,5000]);
parallel_test([Type,Time,NbRequests,Wait]) ->
    case Type of
        non_blocking -> [server_test:test([Type,Time,X,Wait]) || X <- lists:seq(1,NbRequests)];
        _ -> [server_test:test([Type,Time,X]) || X <- lists:seq(1,NbRequests)]
    end,
    collect_answers(NbRequests,Time + 1000).


%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
    {ok, #{}}.

handle_call({blocking,Time,Value}, _From, State) ->
    timer:sleep(Time),
    Reply = {ok,Value},
    {reply, Reply, State};
handle_call({non_blocking,Time,Value}, From, State) ->
    F = fun() ->
        do_answer(From,Time,Value)
    end,
    spawn(F),
    {noreply, State};
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

handle_cast(stop, State) ->
    {stop,stopped, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(OldVsn, State, _Extra) ->
    io:format("changing code replacing version ~p~n",[OldVsn]),
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

do_answer(From,Time,Value) ->
    timer:sleep(Time),
    gen_server:reply(From, Value).

collect_answers(0,_Timeout) ->
    got_all_answers;
collect_answers(NbRequests,Timeout) ->
    receive 
        A -> io:format("~p~n",[A]),
            collect_answers(NbRequests - 1, Timeout)
    after Timeout ->
        missing_answers
    end.

A session in the shell:

44> c(server_test).
{ok,server_test}
45> server_test:start().
{ok,<0.338.0>}
46> server_test:parallel_test([blocking,200,3]).
{request,blocking,200,1,got_answer,{ok,1},after_microsec,207872}
{request,blocking,200,2,got_answer,{ok,2},after_microsec,415743}
{request,blocking,200,3,got_answer,{ok,3},after_microsec,623615}
got_all_answers
47> % 3 blocking requests in parallel, each lasting 200ms, they are executed in sequence but no timeout is reached
47> % All the clients get their answers
47> server_test:parallel_test([blocking,2000,3]).
{request,blocking,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking,2000,2,got_answer,{ok,2},after_microsec,4127740}
missing_answers
48> % 3 blocking requests in parallel, each lasting 2000ms, they are executed in sequence and the last answer exceeds the gen_server timeout.
48> % The client for this request doesn't receive an answer. The client should also manage its own timeout to handle this case
48> server_test:parallel_test([blocking_catch,2000,3]).
{request,blocking_catch,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking_catch,2000,2,got_answer,{ok,2},after_microsec,4127740}
{request,blocking_catch,2000,3,got_answer,
         {'EXIT',{timeout,{gen_server,call,[server_test,{blocking,2000,3}]}}},
         after_microsec,5135355}
got_all_answers
49> % same thing but catching the exception. After 5 seconds the gen_server call throws a timeout exception.
49> % The information can be forwarded to the client
49> server_test:parallel_test([non_blocking,200,3]).
{request,non_blocking,200,1,got_answer,{reply,1},after_microsec,207872}
{request,non_blocking,200,2,got_answer,{reply,2},after_microsec,207872}
{request,non_blocking,200,3,got_answer,{reply,3},after_microsec,207872}
got_all_answers
50> % using the non-blocking mechanism, we can see that all the requests were managed in parallel
50> server_test:parallel_test([non_blocking,5100,3]).
{request,non_blocking,5100,1,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,2,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,3,got_answer,timeout,after_microsec,5136379}
got_all_answers
51> % if we increase the answer delay above 5000ms, all requests fail with the default timeout
51> server_test:parallel_test([non_blocking,5100,3,6000]).
{request,non_blocking,5100,1,got_answer,{reply,1},after_microsec,5231611}
{request,non_blocking,5100,2,got_answer,{reply,2},after_microsec,5231611}
{request,non_blocking,5100,3,got_answer,{reply,3},after_microsec,5231611}
got_all_answers
52> % but thanks to the send_request/wait_response/reply interfaces, the client can adjust the timeout to an accurate value
52> % for each request

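The next reason why the request might not complete is that one of the amqp_channel:call invocations fails. Depending on what you want to do, there are several possibilities: do nothing, let it crash, catch the exception, or handle every case. The following proposal uses a global catch: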

handle_call({get_account,Timeout}, From, State) ->
    F = fun() ->
        do_get_account(From,State,Timeout)
    end,
    spawn(F), % delegate the job to another process and free the server
    {noreply, State}; % I don't see any change of State in your code, this should be enough

...

do_get_account(From,State,Timeout) ->
    % this block of code asserts all positive return values from amqp_channel calls. It will catch any error
    % and return it as {error,...}. If everything goes well it returns {ok,Answer}.
    % amqp_channel:call/2 answers each method with its matching *_ok record, so the assertions match on those.
    Reply = try
        #'exchange.declare_ok'{} = amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
        #'queue.declare_ok'{} = amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
        Binding = #'queue.bind'{exchange = <<"get">>,
                                routing_key = <<"get.account">>,
                                queue = <<"get_account">>},
        #'queue.bind_ok'{} = amqp_channel:call(State, Binding),
        #'basic.consume_ok'{} = amqp_channel:call(State, #'basic.consume'{queue = <<"get_account">>, no_ack = true}),
        {ok,wait_account_reply(Timeout)}
    catch
        Class:Exception -> {error,Class,Exception}
    end,
    gen_server:reply(From, Reply).

wait_account_reply(Timeout) ->
    receive
    % #'basic.consume_ok'{} -> ok % you do not handle this message, ignore it since it will be garbage collected when the process dies
        {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> extract_account(Payload)
    after Timeout->
       server_timeout
    end.


extract_account(Payload)->
        {[{<<"account_id">>, AccountId}]} = jiffy:decode(Payload),
        Doc = {[{<<"account_id">>, AccountId}]},
        getAccDataDb:create_AccountId_view(), % What is the effect of this function, what is the return value?
        case getAccDataDb:getAccountNameDetails(Doc) of
            success ->
                getAccDataDb:getAccountNameDetails1(Doc);
            details_not_matched ->
                user_not_exist
        end.

The client should look like this:

get_account() ->
    ReqId = gen_server:send_request(server_name,{get_account,2000}),
    gen_server:wait_response(ReqId,2200).
