gen_server:使用新状态调用

Posted

技术标签:

【中文标题】gen_server:使用新状态调用【英文标题】:gen_server:call with new State 【发布时间】:2019-07-10 00:16:53 【问题描述】:

模块调用gen_server 来处理流,该流使用记录作为状态。

handle_call 使用来自State 的函数处理流,该函数将完成的数据和尾部分开,

现在下一次,在模块发送更多数据之前,应先馈送尾部但更新State

handle_call(stream, Data, _From, State = #mystatemyfun=Fun) ->
    case Fun(Data) of
       completed piece,tail -> 
           dosomethingwithpieace,
           NewState = State##mystatemyfun=resetfun();
           % How do i call this again to feed Tail first with new state?
       stillstreaming, Currentstate ->
           NewState = State##mystatemyfun=CurrentState;

我无法致电gen_server:call(self(),stream, Tail),因为需要先更新State。 而且我无法回复新的State,因为模块会发送更多数据并且尾部会消失。

有没有办法用更新的State 再次调用它而不用尾部回复并从模块反馈尾部??

更新,代码:

% caller module
case gen_tcp:recv(Socket, 0) of % cannot set Length as it will block untill it is done reading Length number of bytes 
    ok, Data ->
        Response = gen_server:call(Pid, handle_init,Socket,Data),
        case Response of
            ok, continue ->
                pre_loop(Socket, Pid);
            ok, logged_in ->
                UserId, UserName = get_user_data(), % ignore it for now.
                receiver_loop(UserId, UserName, Socket, Pid);
            stop, Reason ->
                io:format("Error at pre_loop: ~p~n", [Reason]);
            _ ->
                io:format("Unknown response from call in pre-loop: ~p~n", [Response])
        end;
    error, closed -> % done here as no data was stored in mnesia yet.
        gen_server:stop(Pid),
        io:format("Client died in pre_loop~n")
end.

和 gen_server 模块:

% gen_server module
handle_call(handle_init, _Socket, Data, _From, State = #server_statedata_fun =  incomplete, Fun) ->
    case catch Fun(Data) of
        incomplete, F ->
            NewState = State#server_statedata_fun = incomplete, F,
            reply, ok, continue, NewState;
        with_tail, Term, Tail ->
            % handle Term login/register only
            case handle_input_init(Term, Tail) of
                incomplete, Fn, logged_in ->
                    NewState = State#server_statedata_fun = incomplete, Fn,
                    reply, ok, logged_in, NewState;
                incomplete, Fn ->
                    NewState = State#server_statedata_fun = incomplete, Fn,
                    reply, ok, continue, NewState;
                stop, malformed_data ->
                    reply, stop, malformed_data, State
            end;
        _ ->
            reply, stop, malformed_data, State
    end;

handle_call(_Message, _From, State = #server_state) ->
reply, stop , unknown_call, State.

handle_input_init(Term, Tail) ->
case handle_term_init(Term) of
    ok, login_passed ->
        io:format("send user a login pass msg"),
        handle_tail_init(Tail, logged_in);
    error, login_failed ->
        io:format("send user a login failed error~n"),
        handle_tail_init(Tail);
    ok, registration_passed ->
        io:format("send user a registeration passed msg"),
        handle_tail_init(Tail);
    error, registration_failed ->
        io:format("send user a registeration failed error"),
        handle_tail_init(Tail);
    error, invalidreq ->
        io:format("send user an invalid requst error~n"),
        handle_tail_init(Tail)
end.

handle_tail_init(Tail) ->
case catch jsx:decode(Tail, [stream, return_tail, return_maps]) of
    incomplete, F ->
        incomplete, F;
    with_tail, Term, Tail2 ->
        handle_input_init(Term, Tail2);
    _ ->
        stop, malformed_data
end.

handle_tail_init(Tail, logged_in) -> % because it was logged in already, any further requests should be ignored
case catch jsx:decode(Tail, [stream, return_tail, return_maps]) of
    incomplete, F ->
        incomplete, F, logged_in;
    with_tail, _Term, Tail2 ->
        io:format("send user an invalid requst error~n"),
        handle_tail_init(Tail2, logged_in);
    _ ->
        stop, malformed_data
end.

handle_term_init(Term) ->
case Term of
    #<<"Login">> := [UserName,Password] ->
        login_user(UserName,Password);
    #<<"Register">> := [UserName,Password] ->
        register_user(UserName,Password);
    _ ->
        error, invalidreq
end.

它按预期工作,但这是我的第一个 Erlang 代码,我很肯定它可以简化为单个 recursive handle_call,保持 OTP 风格,这是我选择 Erlang 的原因.

【问题讨论】:

在写任何东西时都不要使用代词——谁知道itthis 指的是什么。好吧,你知道——但你是唯一的一个。你可以在 State 中保存你想要的任何东西:如果需要,将它扩展为一个 1,000 个元素的元组,其中一个元素是你的记录,然后你可以使用其他 999 个插槽来存储你想要的任何东西——包括你消失的尾巴。另外,我认为您的问题与流没有任何关系,因此请创建一个简单的示例来演示您的问题而不使用流,例如handle_call(go, Data, _From, Func) -&gt; case Func(Data) of X, Y 它不是我的尾巴 xD,是的,我知道我可以,但这并不能解决问题,我可以使用数千个元组或编写多个函数来解决它,但它会产生更多问题,即如果尾部包含完整的数据还是完整的数据本身?除了编写复杂的代码之外,我还必须在任何地方处理已完成的部分。 【参考方案1】:

我不能调用 gen_server:call(self(),stream, Tail) 因为 State 需要先更新。

我不太明白你想说什么,但如果你的意思是:

我不能递归地调用gen_server:call(self(),stream, Tail),即我不能在handle:call()中编写递归调用handle_call()的代码。

那么您当然可以将handle:call() 中的所有数据发送到另一个递归调用自身的函数:

handle_call(...) ->

    ...
    NewState = ....
    Result = ...
    FinalResult, FinalState = helper_func(Result, NewState, Tail)
    reply, FinalResult, FinalState

helper_func(Result, State, []) ->
    Result, State;
helper_func(Result, State, [Piece|Tail]) ->
    ...
    NewState = ...
    NewResult = ...
    helper_func(NewResult, NewState, Tail).  %% Recursive call here

【讨论】:

调用者需要知道每次调用的结果来决定它是否应该继续,我尝试使用handle_continue,它可以帮助我使用更新的状态递归调用handle_call,但它也会更新状态作为回复,有回复(发件人,回复),但只有在我没有错的情况下,呼叫者才会处理一个回复。 这里的问题不是如何让它工作,而是如何让它以它应该的方式工作(otp 风格)。如果没有任何效果,我将限制读取缓冲区以确保 tail 不会包含已完成的部分,并且只需使用 handle_continue(tail, Tail) 调用者需要知道每次调用的结果来决定它是否应该继续,--这是你唯一能做的写清楚地说明了您要做什么。 这里的问题不是如何使它工作,问题是如何使它按应有的方式工作--不幸的是,您没有写作技巧来沟通如何它应该可以工作。我收集到您希望在调用 gen_server:call() 之间保留一些数据,但这正是 gen_server 的 LoopData 所做的。 调用者发送数据,handle_call 将块分开,剩下的任何额外数据是 return = tail,对那个块做任何需要做的事情并回复结果,现在 tail 可能还包含有用的块,所以在回复之前,tail 也需要处理,这是无法完成的,因为无法递归调用 handle_call 并且调用者只会处理一个回复。 Tail 可能包含超过 1 个有用的块,如果没有它们,输入数据将因为丢失数据(即尾部)而被破坏,因此需要严格处理 我目前只向调用者回复继续或停止信号并处理 gen_server 中的所有内容,当我继续我的项目时,这也将成为一个问题。

以上是关于gen_server:使用新状态调用的主要内容,如果未能解决你的问题,请参考以下文章

Erlang:如何在主管中正确调度带有 start_child 的 gen_server 并调用 API

通过 Jinterface 调用 gen_server?

gen_server 中的错误也会终止调用进程?

从 Mod:handle_cast 调用 gen_server:cast

如何在 erlang gen_server 中有效地使用接收子句来解决超时错误?

Erlang 源码分析之 Gen_Server