处理主题路由的 routing_key

Posted

技术标签:

【中文标题】处理主题路由的 routing_key【英文标题】:Handling routing_key for topic routing 【发布时间】:2019-12-22 19:58:43 【问题描述】:

我对 Erlang 环境有点陌生

我正在编写一个电子邮件测试应用程序,它使用主题交换中随机生成的 routing_keys 过滤传入的电子邮件,以使电子邮件进入我的系统

一旦它们在队列中被交付(和处理),我想用之前随机的 routing_key 再次标记它们,以将它们路由到另一个交换器,以使它们为最终消费做好准备。

第二个生产步骤给我带来了真正的麻烦

我正在从 tcp 套接字(由第三层程序:spamassassin 处理)通过 handle_info 模式匹配获取数据

我首先依靠 gen_server 通过常规 amqp_client/include/amqp_client.hrl 库来使用消息

我在我的 gen_server 行为中使用 handle_info,然后对参数进行模式匹配。

检测传递的AMQP消息是通过handle_info回调中的函数头(记录)完成的

TCP 套接字很适合与 spamassassin 交谈,它返回一个包含二进制字符串数据的 3 元组:

tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam: True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug 2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level: *******\nX-Spam-Status: Yes, score=7.9 required=5.0 tests=EMPTY_MESSAGE,MISSING_DATE,\n\tMISSING_FROM,MISSING_HEADERS,MISSING_MID,MISSING_SUBJECT,\n\tNO_HEADERS_MESSAGE,NO_RECEIVED,NO_RELAYS autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version: 1.0\nContent-Type: multipart/mixed; boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>

第二个handle_info中的循环匹配来自监听gen_tcp服务器的答案,但是我必须做打包发送到一个主题交换(topic_scored_email交换)

***My gen_server****
handle_info(#'basic.deliver'routing_key=Key, consumer_tag=Tag, Content, State) ->
    #amqp_msgprops = Properties, payload = Payload = Content,
    #'P_basic'message_id = MessageId, headers = Headers = Properties,
    send_to_spamassassin:calcule_score(Payload),
    noreply, State;
handle_info(Msg, State) ->
    case Msg of
        _,_,Data ->
           scored_email:main(Data);
        _,_ ->
    end,
    noreply, State.

***send_to_spamassassin function ***
    calcule_score(Message) ->
    case gen_tcp:connect("localhost", 783, [mode, binary]) of
        ok, Sock ->
            …
            gen_tcp:send(Sock, Message2);
        error,_ ->
            io:fwrite("Connection error! Quitting...~n")
    end.

***scored_email***
main(Argv) ->
    ok, Connection = amqp_connection:start(#amqp_params_networkvirtual_host = <<"/">>),
    ok, Channel = amqp_connection:open_channel(Connection),
    amqp_channel:call(Channel, #'exchange.declare'exchange = <<"topic_scored_email">>,type = <<"topic">>),
    RoutingKey, Message = case Argv of
                                …
%DOING PATTERN MATCHING THAT WORKS HERE
                                …
                            end,
    amqp_channel:cast(Channel,#'basic.publish'exchange = <<"topic_scored_email">>,routing_key = RoutingKey,#amqp_msgpayload = Message)

第一个问题是数据的类型(二进制字符串),但我想这可能是使用 BIF binary_to_tuple 或类似的东西的解决方法。

我很难理解的是如何传递正确的 RoutingKey,因为 Erlang 是功能性的,没有副作用或分配。

格式数据的变化(AMQP --> 原始 TCP --> 然后是 AMQP)似乎不可能(对我来说)通过 OTP 抽象来实现

但是,我想用正确的路由键重新组装每条已处理的消息,匹配上面的 5 行。

我怎样才能修改我的代码呢?我来自命令式语言,在这里达到了我的极限……

你的

【问题讨论】:

【参考方案1】:

第一个问题是数据类型(二进制字符串),但我想它可以 是使用 BIF binary_to_tuple 或类似的东西的解决方法。

在所有语言中,您都必须弄清楚如何解析从套接字读取的数据。

我很难理解的是我如何通过正确的 RoutingKey,由于 Erlang 是函数式的,所以没有副作用或 分配。

那是党的路线,但实际上递归函数的参数变量可以用来存储值。在您的情况下,您可以将路由密钥存储在 State 变量中,然后在所有 gen_server 回调函数中都可以使用该变量。 State 可以是 30 个元素的元组,如果你愿意,你可以在 State 变量中存储多少信息是没有限制的。

另一种选择是使用 ets/dets 表(即 erlang 数据库)来存储带有路由键的消息,直到您准备好发送所有内容?到其他进程。

RoutingKey, Message = ...

但是,我想用 右路由键匹配上面 5 行。

如果您在同一个函数中,是什么阻止您使用变量RoutingKeyMessage 中的路由键和消息?如果所有代码都在一个函数中,我不清楚如何存在问题。我认为你可以这样做:

RoutingKey, Message = ...
ProcessedMsg = process_this(Message)
RoutingKey, ProcessedMsg

我建议您发布一个简单的问题示例 - 没有所有复杂的匹配和 amqp_channel 的东西来提炼问题的核心,例如

handle_info(Msg, State) -> 
    RoutingKey = 3,
    ProcessedMsg = "hello",

    %% Here, I want to write: ....

【讨论】:

以上是关于处理主题路由的 routing_key的主要内容,如果未能解决你的问题,请参考以下文章

WPF--路由事件

Kafka事件路由器:过滤和基于内容的路由,如何?

Springboot整合RabbitMQ(三)——Topic主题交换机

SpringCloud-Zuul(二):自定义Filter及Zuul内部路由源码解析

RabbitMQ入门:主题路由器(Topic Exchange)

React-Router 和 Material-UI:根据路由应用自定义主题