Erlang:连接到服务器,并在同一个套接字上接收输入

Posted

技术标签:

【中文标题】Erlang:连接到服务器,并在同一个套接字上接收输入【英文标题】:Erlang: Connecting to a server, and receiving input back on the same socket 【发布时间】:2019-05-16 10:17:23 【问题描述】:

我正在与本地主机建立套接字连接。我想在此连接(或任何连接,但使用相同的连接似乎是最简单的解决方案)上从服务器接收输入。因为几个客户端将能够连接到我的服务器,所以我创建了一个登录函数,除其他外,它将产生一个线程来侦听从服务器发送的消息。

当我输入命令时

c(erlSoc).    
erlSoc:logon("Jacob").

我收到错误提示

[C] error error,einval

我知道这条线

E ->
                io:format("[C] error ~p~n", [E])

正在打印它,但我需要更改什么才能避免此错误发生并且我能够从服务器接收消息。


erlSoc.erl 的主要兴趣点

logon(Uname) ->
    ok, Sock = gen_tcp:connect("localhost", 5300, [binary, packet, 0]),
    spawn(erlSoc, client_receive, [Sock]),
    io:format("Create a Room: create~nList Rooms: list~nJoin Rooms: join~n Leave Rooms: leave~nSend a message: message ~n"),
    client(Sock, Uname).

client_receive(Sock) ->
    case gen_tcp:recv(Sock, 0) of
       ok, Data ->
            io:format("[C]  ~p~n", [Data]);
        error, closed ->
            io:format("[C] closed~n", []);
        E ->
            io:format("[C] error ~p~n", [E])
    end.

erlSoc.erl

-module(erlSoc).
-export([start_server/0, logon/1, remove/2, server/1, client_receive/1]).
-define(TCP_OPTIONS, [binary, packet, 0, active, false, reuseaddr, true]).

listen() ->
    ok, LSocket = gen_tcp:listen(5300, ?TCP_OPTIONS),
    io:format("Accepted the socket connection ~n"),
    accept(LSocket).

accept(LSocket) ->
    ok, CSocket = gen_tcp:accept(LSocket),
    Ref = make_ref(),
    To = spawn(fun() -> init(Ref, CSocket) end),
    gen_tcp:controlling_process(CSocket, To),
    To ! handoff, Ref, CSocket,
    io:format("Second Test ~n"),
    accept(LSocket).

init(Ref, Socket) ->
    receive
        handoff, Ref, Socket ->
            ok, Peername = inet:peername(Socket),
        io:format("[S] peername ~p~n", [Peername]),
            loop(Socket)
    end.

loop(Socket) ->
    case gen_tcp:recv(Socket, 0) of
        ok, Data ->
        List = binary_to_list(Data),
        List2 = string:tokens(List,","),
            io:format("[S] got ~p~n", [List2]),
            parse_data(Socket, List2),
            loop(Socket);
        error, closed ->
            io:format("[S] closed~n", []);
        E ->
            io:format("[S] error ~p~n", [E])
    end.

server_node() ->
    erlSoc@localhost.

parse_data(Soc, [Task, Rname | Message]) -> 
    if 
    Task =:= "message" ->
        erlSoc, self() ! message, Rname, Message;
    Task =:= "create" ->
        erlSoc, server_node() ! create, Rname, Soc;
    Task =:= "list" ->
        erlSoc, self() ! list, Soc;
    Task =:= "join" ->
        erlSoc, self() ! join, Rname, Soc;
    Task =:= "leave" ->
        erlSoc, self() ! leave, Rname, Soc
    end.

start_server() ->
    spawn(erlSoc, server, [[[]]]),
    listen().

remove(X, L) ->
    [Y || Y <- L, Y =/= X].

server(RoomList) ->
    receive
        message, Rname, Message ->
            [_|[Users|Messages]] = lists:keyfind(Rname, 1, RoomList),
            RoomUpdate = lists:keyreplace(Rname, 1, RoomList, [Rname|[Users|[Messages|Message]]]),
            send_message(Message, Rname, Users),
            server(RoomUpdate);
        create, Rname, Soc ->
            server([[Rname|[Soc|[]]]|RoomList]);
        list, Soc ->
            Rooms, _ = RoomList,
            gen_tcp:send(Soc,Rooms),
            server(RoomList);
        join, Rname, User ->
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomList),
            RoomUpdate = lists:keyreplace(Rname, 1, RoomList, [Rname|[[Users|User]|[Messages]]]),
            gen_tcp:send(User,[Rname,Messages]),
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomUpdate),
            io:format("Users in ~p : ~p~n", [Rname, Users]),
            server(RoomList);
        leave, Rname, User ->
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomList),
            NewU = remove(User, Users),
            RoomUpdate = lists:keyreplace(Rname, 1, RoomList, [Rname|[[NewU]|[Messages]]]),
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomUpdate),
            io:format("Users in ~p : ~p~n", [Rname, Users]),
            server(RoomUpdate)
    end.


send_message(Message, ChatRoom, []) ->
    void;
send_message(Message, Chatroom, [To|Users]) ->
    gen_tcp:send(To,"Message From Chatroom "++Chatroom++": "++Message),
    send_message(Message, Chatroom, Users).


logon(Uname) ->
    ok, Sock = gen_tcp:connect("localhost", 5300, [binary, packet, 0]),
    spawn(erlSoc, client_receive, [Sock]),
    io:format("Create a Room: create~nList Rooms: list~nJoin Rooms: join~n Leave Rooms: leave~nSend a message: message ~n"),
    client(Sock, Uname).

client_receive(Sock) ->
    case gen_tcp:recv(Sock, 0) of
       ok, Data ->
            io:format("[C]  ~p~n", [Data]);
        error, closed ->
            io:format("[C] closed~n", []);
        E ->
            io:format("[C] error ~p~n", [E])
    end.

client(Sock, Uname) ->
    ok,[Task] = io:fread("Task? : ", "~s"),
    if
    Task =:= "message" ->
        ok, [Rname] = io:fread("Send the message to which room? : ", "~s"),
        Message = io:get_line("Type your message: "),
        ok = gen_tcp:send(Sock,"message,"++Rname++","++Uname++": "++Message);
    Task =:= "create" ->
        ok, [Rname] = io:fread("Enter a room name : ", "~s"),
        ok = gen_tcp:send(Sock, "create,"++Rname);
    Task =:= "list" ->
        ok = gen_tcp:send(Sock, "list");
    Task =:= "join" ->
        ok, [Rname] = io:fread("Leave Which Room? : ", "~s"),
        ok = gen_tcp:send(Sock, "leave,"++Rname);
    Task =:= "leave" ->
        ok, [Rname] = io:fread("Join Which Room? : ", "~s"),
        ok = gen_tcp:send(Sock, "join,"++Rname);
    Task =:= "help" ->
        io:format("Create a Room: create~nList Rooms: list~nJoin Rooms: join~n Leave Rooms: leave~nSend a message: message ~n");
    Task =:= "exit" ->
        gen_tcp:close(Sock)
    end.

更新

-module(erlSoc).
-export([start_server/0, logon/1, remove/2, server/1, client_receive/1, client/2]).
-define(TCP_OPTIONS, [binary, packet, 0, active, false, reuseaddr, true]).

listen() ->
    ok, LSocket = gen_tcp:listen(5300, ?TCP_OPTIONS),
    io:format("Accepted the socket connection ~n"),
    accept(LSocket).

accept(LSocket) ->
    ok, CSocket = gen_tcp:accept(LSocket),
    Ref = make_ref(),
    To = spawn(fun() -> init(Ref, CSocket) end),
    gen_tcp:controlling_process(CSocket, To),
    To ! handoff, Ref, CSocket,
    io:format("Second Test ~n"),
    accept(LSocket).

init(Ref, Socket) ->
    receive
        handoff, Ref, Socket ->
            ok, Peername = inet:peername(Socket),
        io:format("[S] peername ~p~n", [Peername]),
            loop(Socket)
    end.

loop(Socket) ->
    case gen_tcp:recv(Socket, 0) of
        ok, Data ->
        List = binary_to_list(Data),
        List2 = string:tokens(List,","),
            io:format("[S] got ~p~n", [List2]),
            parse_data(Socket, List2),
            loop(Socket);
        error, closed ->
            io:format("[S] closed~n", []);
        E ->
            io:format("[S] error ~p~n", [E])
    end.

server_node() ->
    erlSoc@localhost.

parse_data(Soc, [Task, Rname | Message]) -> 
    case Task of 
    "message" ->
        erlSoc, server_node() ! message, Rname, Message;
    "create" ->
        io:format("[C]  ~p~n", [Rname]),
        erlSoc, server_node() ! create, Rname, Soc;
    "list" ->
        erlSoc, server_node() ! list, Soc;
    "join" ->
        erlSoc, server_node() ! join, Rname, Soc;
    "leave" ->
        erlSoc, server_node() ! leave, Rname, Soc
    end.

start_server() ->
    spawn(erlSoc, server, [[[]]]),
    listen().

remove(X, L) ->
    [Y || Y <- L, Y =/= X].

server(RoomList) ->
    receive
        message, Rname, Message ->
            [_|[Users|Messages]] = lists:keyfind(Rname, 1, RoomList),
            RoomUpdate = lists:keyreplace(Rname, 1, RoomList, [Rname|[Users|[Messages|Message]]]),
            send_message(Message, Rname, Users),
            server(RoomUpdate);
        create, Rname, Soc ->
            server([[Rname|[Soc|[]]]|RoomList]),
            io:format("[C]  ~p~n", [Rname]);
        list, Soc ->
            Rooms, _ = RoomList,
            gen_tcp:send(Soc,Rooms),
            server(RoomList);
        join, Rname, User ->
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomList),
            RoomUpdate = lists:keyreplace(Rname, 1, RoomList, [Rname|[[Users|User]|[Messages]]]),
            gen_tcp:send(User,[Rname,Messages]),
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomUpdate),
            io:format("Users in ~p : ~p~n", [Rname, Users]),
            server(RoomList);
        leave, Rname, User ->
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomList),
            NewU = remove(User, Users),
            RoomUpdate = lists:keyreplace(Rname, 1, RoomList, [Rname|[[NewU]|[Messages]]]),
            [Room|[Users|Messages]] = lists:keyfind(Rname, 1, RoomUpdate),
            io:format("Users in ~p : ~p~n", [Rname, Users]),
            server(RoomUpdate)
    end.


send_message(Message, ChatRoom, []) ->
    void;
send_message(Message, Chatroom, [To|Users]) ->
    gen_tcp:send(To,"Message From Chatroom "++Chatroom++": "++Message),
    send_message(Message, Chatroom, Users).


logon(Uname) ->
    ok, Sock = gen_tcp:connect("localhost", 5300, [binary, packet, 0, active, false]),
    Merp = spawn(erlSoc, client_receive, [Sock]),
    gen_tcp:controlling_process(Sock, Merp),
    io:format("Create a Room: create~nList Rooms: list~nJoin Rooms: join~n Leave Rooms: leave~nSend a message: message ~n"),
    Raph = spawn(erlSoc, client, [Sock, Uname]),
    gen_tcp:controlling_process(Sock, Raph).

client_receive(Sock) ->
    case gen_tcp:recv(Sock, 0) of
       ok, Data ->
            io:format("[C]  ~p~n", [Data]),
            client_receive(Sock);
        error, closed ->
            io:format("[C] closed~n", []);
        E ->
            io:format("[C] error ~p~n", [E])
    end.

client(Sock, Uname) ->
    ok,[Task] = io:fread("Task? : ", "~s"),
    if
    Task =:= "message" ->
        ok, [Rname] = io:fread("Send the message to which room? : ", "~s"),
        Message = io:get_line("Type your message: "),
        ok = gen_tcp:send(Sock,"message,"++Rname++","++Uname++": "++Message);
    Task =:= "create" ->
        ok, [Rname] = io:fread("Enter a room name : ", "~s"),
        ok = gen_tcp:send(Sock, "create,"++Rname);
    Task =:= "list" ->
        ok = gen_tcp:send(Sock, "list");
    Task =:= "join" ->
        ok, [Rname] = io:fread("Leave Which Room? : ", "~s"),
        ok = gen_tcp:send(Sock, "leave,"++Rname);
    Task =:= "leave" ->
        ok, [Rname] = io:fread("Join Which Room? : ", "~s"),
        ok = gen_tcp:send(Sock, "join,"++Rname);
    Task =:= "help" ->
        io:format("Create a Room: create~nList Rooms: list~nJoin Rooms: join~n Leave Rooms: leave~nSend a message: message ~n");
    Task =:= "exit" ->
        gen_tcp:close(Sock)
    end,
    client(Sock,Uname).

【问题讨论】:

【参考方案1】:

既然你标记了functional-programming,我会告诉你的不仅仅是我在代码中发现的错误。

我在您的代码中放置了一些 cmets:


erlSoc.erl

-module(erlSoc).
-export([start_server/0, logon/1, remove/2, server/1, client_receive/1]).
-define(TCP_OPTIONS, [binary, packet, 0, active, false, reuseaddr, true]).

% ...

loop(Socket) ->
    % Remember that in server, after accepting a connection, 
    % you are waiting for packet
    case gen_tcp:recv(Socket, 0) of
        ok, Data ->

% ...

parse_data(Soc, [Task, Rname | Message]) -> 
if 
Task =:= "message" ->
    % below code does not work. If you want to send a message to a process,
    % you can use its process id (pid) or its registered name or name of the
    % node which that process belongs to with its pid
    % Based server/1 function, I think that you want to have a process named
    % erlSoc and send message to it. If yes, then you have to register that 
    % process for that name using erlang:register/2 and just use:
    % erlSoc ! YourMessage
    erlSoc, self() ! message, Rname, Message;
% ...
Task =:= "list" ->
    % It seems that you are sending a socket to your erlSoc process and you
    % did not make erlSoc controller of socket !
    erlSoc, self() ! list, Soc;

logon(Uname) ->
    % In below, Your socket will be in 'active' mode and in client_receive/2 function
    % you have called gen_tcp:recv/2 on socket which is invalid (einval)
    ok, Sock = gen_tcp:connect("localhost", 5300, [binary, packet, 0]),

    % In below, You are passing Sock to a new process which is not owner of socket
    % You have to call gen_tcp:controlling_process/2 here
    spawn(erlSoc, client_receive, [Sock]),
    io:format("Create a Room: create~nList Rooms: list~nJoin Rooms: join~n Leave Rooms: leave~nSend a message: message ~n"),
    % Also here you need to call gen_tcp:controlling_process/2 in that process to
    % become owner of socket in this process
    client(Sock, Uname).

client_receive(Sock) ->
    % As I mentioned, In server you were waiting for a packet, Also here in
    % client you are waiting for a packet ! So nothing will happen
    % If you want to use gen_tcp:recv, I recommend to use it with third 
    % argument which is timeout
    case gen_tcp:recv(Sock, 0) of
       ok, Data ->
            io:format("[C]  ~p~n", [Data]);
        error, closed ->
            io:format("[C] closed~n", []);
        E ->
            io:format("[C] error ~p~n", [E])
    end.

% ...

我建议重写部分代码。最好将客户端和服务器代码分开。另外我建议正确命名事物,在 Erlang 中,模块和函数名称都应该是小写的。推荐。可以不使用if,而使用case,例如:

if 
Task =:= "message" ->
    erlSoc, self() ! message, Rname, Message;
Task =:= "create" ->
    erlSoc, server_node() ! create, Rname, Soc;
Task =:= "list" ->
    erlSoc, self() ! list, Soc;
Task =:= "join" ->
    erlSoc, self() ! join, Rname, Soc;
Task =:= "leave" ->
    erlSoc, self() ! leave, Rname, Soc
end.

可以是:

case Task of 
    "message" ->
        erlSoc, self() ! message, Rname, Message;
    "create" ->
        erlSoc, server_node() ! create, Rname, Soc;
    "list" ->
        erlSoc, self() ! list, Soc;
    "join" ->
        erlSoc, self() ! join, Rname, Soc;
    "leave" ->
        erlSoc, self() ! leave, Rname, Soc
end.

【讨论】:

如何使 client_receive 中的套接字能够接受来自服务器的传入消息。您说过它处于活动模式,但我该如何让它处于活动模式? @Jacob 喜欢你的监听套接字!在第三行中,您使用 active, false 作为用于服务器端的套接字选项。在客户端打开套接字你也应该指定这个选项。 gen_tcp:connect("localhost", 5300, [active, false, binary, packet, 0]) 好的,我进行了更新,现在我得到**异常错误:函数 erlSoc:logon/1 中右侧值 error,econnrefused 不匹配(erlSoc.erl,行105)。我并没有真正理解你所说的关于客户等待包裹的说法。我到底要为那部分改变什么? 我在产生客户端和客户端接收的线程上使用了 gen_tcp:controlling 进程,但我仍然收到error,not_owner。是因为我在将第一个线程分配给第二个线程时从第一个线程中删除了所有权?我怎么能解决这个问题? 不建议多次将socket传递给不同的进程。当您这样做时,在其他进程中,您必须等待作为套接字所有者的进程,然后才能从套接字读取任何数据。

以上是关于Erlang:连接到服务器,并在同一个套接字上接收输入的主要内容,如果未能解决你的问题,请参考以下文章

Xamarin/WinForms 客户端在双线程架构中发送/接收时无法连接到套接字

MongoDB erlang 连接结束

通过 Adob​​e Air 客户端连接到 Openshift Python 套接字服务器

如何连接到已注册的节点(Erlang)并从 Ejabberd 使用它

使用 XMPP 关闭 GCM 连接服务器的套接字

使用 python 套接字发送/接收数据