处理主题路由的 routing_key
Posted
技术标签:
【中文标题】处理主题路由的 routing_key【英文标题】:Handling routing_key for topic routing 【发布时间】:2019-12-22 19:58:43 【问题描述】:我对 Erlang 环境有点陌生
我正在编写一个电子邮件测试应用程序,它使用主题交换中随机生成的 routing_keys 过滤传入的电子邮件,以使电子邮件进入我的系统
一旦它们在队列中被交付(和处理),我想用之前随机的 routing_key 再次标记它们,以将它们路由到另一个交换器,以使它们为最终消费做好准备。
第二个生产步骤给我带来了真正的麻烦
我正在从 tcp 套接字(由第三层程序:spamassassin 处理)通过 handle_info 模式匹配获取数据
我首先依靠 gen_server 通过常规 amqp_client/include/amqp_client.hrl 库来使用消息
我在我的 gen_server 行为中使用 handle_info,然后对参数进行模式匹配。
检测传递的AMQP消息是通过handle_info回调中的函数头(记录)完成的
TCP 套接字很适合与 spamassassin 交谈,它返回一个包含二进制字符串数据的 3 元组:
tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam: True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug 2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level: *******\nX-Spam-Status: Yes, score=7.9 required=5.0 tests=EMPTY_MESSAGE,MISSING_DATE,\n\tMISSING_FROM,MISSING_HEADERS,MISSING_MID,MISSING_SUBJECT,\n\tNO_HEADERS_MESSAGE,NO_RECEIVED,NO_RELAYS autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version: 1.0\nContent-Type: multipart/mixed; boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>
第二个handle_info中的循环匹配来自监听gen_tcp服务器的答案,但是我必须做打包发送到一个主题交换(topic_scored_email交换)
***My gen_server****
handle_info(#'basic.deliver'routing_key=Key, consumer_tag=Tag, Content, State) ->
#amqp_msgprops = Properties, payload = Payload = Content,
#'P_basic'message_id = MessageId, headers = Headers = Properties,
send_to_spamassassin:calcule_score(Payload),
noreply, State;
handle_info(Msg, State) ->
case Msg of
_,_,Data ->
scored_email:main(Data);
_,_ ->
end,
noreply, State.
***send_to_spamassassin function ***
calcule_score(Message) ->
case gen_tcp:connect("localhost", 783, [mode, binary]) of
ok, Sock ->
…
gen_tcp:send(Sock, Message2);
error,_ ->
io:fwrite("Connection error! Quitting...~n")
end.
***scored_email***
main(Argv) ->
ok, Connection = amqp_connection:start(#amqp_params_networkvirtual_host = <<"/">>),
ok, Channel = amqp_connection:open_channel(Connection),
amqp_channel:call(Channel, #'exchange.declare'exchange = <<"topic_scored_email">>,type = <<"topic">>),
RoutingKey, Message = case Argv of
…
%DOING PATTERN MATCHING THAT WORKS HERE
…
end,
amqp_channel:cast(Channel,#'basic.publish'exchange = <<"topic_scored_email">>,routing_key = RoutingKey,#amqp_msgpayload = Message)
第一个问题是数据的类型(二进制字符串),但我想这可能是使用 BIF binary_to_tuple 或类似的东西的解决方法。
我很难理解的是如何传递正确的 RoutingKey,因为 Erlang 是功能性的,没有副作用或分配。
格式数据的变化(AMQP --> 原始 TCP --> 然后是 AMQP)似乎不可能(对我来说)通过 OTP 抽象来实现
但是,我想用正确的路由键重新组装每条已处理的消息,匹配上面的 5 行。
我怎样才能修改我的代码呢?我来自命令式语言,在这里达到了我的极限……
你的
【问题讨论】:
【参考方案1】:第一个问题是数据类型(二进制字符串),但我想它可以 是使用 BIF binary_to_tuple 或类似的东西的解决方法。
在所有语言中,您都必须弄清楚如何解析从套接字读取的数据。
我很难理解的是我如何通过正确的 RoutingKey,由于 Erlang 是函数式的,所以没有副作用或 分配。
那是党的路线,但实际上递归函数的参数变量可以用来存储值。在您的情况下,您可以将路由密钥存储在 State
变量中,然后在所有 gen_server 回调函数中都可以使用该变量。 State
可以是 30 个元素的元组,如果你愿意,你可以在 State
变量中存储多少信息是没有限制的。
另一种选择是使用 ets/dets 表(即 erlang 数据库)来存储带有路由键的消息,直到您准备好发送所有内容?到其他进程。
RoutingKey, Message = ...
但是,我想用 右路由键匹配上面 5 行。
如果您在同一个函数中,是什么阻止您使用变量RoutingKey
和Message
中的路由键和消息?如果所有代码都在一个函数中,我不清楚如何存在问题。我认为你可以这样做:
RoutingKey, Message = ...
ProcessedMsg = process_this(Message)
RoutingKey, ProcessedMsg
我建议您发布一个简单的问题示例 - 没有所有复杂的匹配和 amqp_channel 的东西来提炼问题的核心,例如
handle_info(Msg, State) ->
RoutingKey = 3,
ProcessedMsg = "hello",
%% Here, I want to write: ....
【讨论】:
以上是关于处理主题路由的 routing_key的主要内容,如果未能解决你的问题,请参考以下文章
Springboot整合RabbitMQ(三)——Topic主题交换机
SpringCloud-Zuul(二):自定义Filter及Zuul内部路由源码解析