ryu实例---ECMP的rr(轮询)算法实现

Posted 楊木木8023

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ryu实例---ECMP的rr(轮询)算法实现相关的知识,希望对你有一定的参考价值。

最近,做到一个实验,需要每隔一段时间,就改变数据的传输路径,如下图所示,主机之间的数据传输需要经过s1和s2两个交换机,交换机之间的链路有两条,实验是在这两条链路之间每隔10秒切换一次,也就是相当于ECMP的轮询算法(当然,若想以流为单位切换链路,比如说:上一个流经链路1下一条流经链路2,这种方式的轮询利用集线器的程序改变一下就可以实现)。

  • 轮询,即各个流在多条路径之间轮询传输。

拓扑的端口对应关系:

接下来,重点说明程序如何写,不再赘述一些关于拓扑构建和轮询的一些概念。

一、程序

SDN架构下的网络数据传输均是通过流表进行的,也就是说,要想在两条链路之间进行切换,就需要对流表进行改变,这里的思想:

  1. 获取SDN交换机对象和SDN交换机的dpid,也就是s1和s2;
  2. 向s1,s2添加流表项,规则为走上面一条链路;
  3. 设定的时间间隔后,删除s1,s2的流表项,重新添加流表项,规则为走下面一条链路;
  4. 循环执行2,3过程。

首先,创建RRSwitchChange类,进行初始化,并初始化交换机字典,用来存储SDN交换机对象和id。

class RRSwitchChange(app_manager.RyuApp):

    def __init__(self, *args, **kwargs):
        super(RRSwitchChange, self).__init__(*args, **kwargs)
        # 用于存储交换机对象
        self.datapaths = {}

接下来,向类中添加增加和删除流表项的方法。

    # 删除流表项的方法,方法直接清除交换机的所有流表项
    def del_flow(self, datapath, match):
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        req = ofp_parser.OFPFlowMod(datapath=datapath,
                                    command=ofp.OFPFC_DELETE,
                                    out_port=ofp.OFPP_ANY,
                                    out_group=ofp.OFPG_ANY,
                                    match=match)
        datapath.send_msg(req)

    # 添加流表项的方法
    def add_flow(self, datapath, priority, match, actions):
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        command = ofp.OFPFC_ADD
        inst = [ofp_parser.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions)]
        req = ofp_parser.OFPFlowMod(datapath=datapath, command=command,
                                    priority=priority, match=match, instructions=inst)
        datapath.send_msg(req)

同时,由于删除和添加流表项的多项操作均相同,也就是如果单纯的调用添加和删除流表项的方法会显得程序冗余杂乱,所以,这里本实验添加了一个方法match_flow(),用于简化添加和删除流表项的重复调用的冗余操作。

    def match_flow(self, dpid, in_port, out_port, priority, add_del):
        ofp_parser = self.datapaths[dpid].ofproto_parser
        ofp = self.datapaths[dpid].ofproto
        actions = [ofp_parser.OFPActionOutput(out_port)]
        # add_del变量用来判断是该执行删除还是添加操作,如果是1则执行添加,如果为0执行删除
        if add_del == 1:
            match = ofp_parser.OFPMatch(in_port=in_port)
            self.add_flow(datapath=self.datapaths[dpid], priority=priority, match=match, actions=actions)
        if add_del == 0:
            match = ofp_parser.OFPMatch()
            self.del_flow(datapath=self.datapaths[dpid], match=match)

接下来,是整个程序的关键部分,即处理流表的删除与添加的函数。

    def rr_link_change(self):
        # flag用于标记每一段时间间隔的流表项下发规则
        # 例如flag=1表示走上面一条链路,flag=2表示走下面一条链路
        flag = 1
        
        # 一致循环的执行流表项的清除添加工作
        while True:
            print('datapath:', self.datapaths)
            try:
                if flag == 1:
                    print('flag:', flag)
                    flag = 2
                    # 添加流表项之前先删除交换机s1,s2中存在的流表项
                    self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0)
                    self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0)
                    
                    # 向交换机s1中添加流表项
                    self.match_flow(dpid=1, in_port=1, out_port=2, priority=1, add_del=1)
                    self.match_flow(dpid=1, in_port=2, out_port=1, priority=1, add_del=1)

                    # 向交换机s2中添加流表项
                    self.match_flow(dpid=2, in_port=1, out_port=2, priority=1, add_del=1)
                    self.match_flow(dpid=2, in_port=2, out_port=1, priority=1, add_del=1)
                elif flag == 2:
                    print('flag:', flag)
                    flag = 1
                    self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0)
                    self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0)

                    self.match_flow(dpid=1, in_port=1, out_port=3, priority=1, add_del=1)
                    self.match_flow(dpid=1, in_port=3, out_port=1, priority=1, add_del=1)

                    self.match_flow(dpid=2, in_port=1, out_port=3, priority=1, add_del=1)
                    self.match_flow(dpid=2, in_port=3, out_port=1, priority=1, add_del=1)

            except Exception as info:
                print('info:', info)

            hub.sleep(10) # 间隔10秒切换一次

最后,需要将方法 rr_link_change()进行调用,同时由于上述步骤用到了交换机的对象,所以需要获取交换机,将其写入字典中,这里使用握手时的数据获取交换机字典,获取完成后调用 rr_link_change()进行流表项的下发,为了持续的执行方法,而且不对交换机的获取产生影响,这里利用协程进行方法的执行,代码如下。

@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath

        if datapath.id not in self.datapaths:
            self.datapaths[datapath.id] = datapath

        hub.spawn(self.rr_link_change)

至此,程序编写完成,接下来,在Ubuntu终端命令行执行程序。

二、实验

实验拓扑如下:

在程序执行前,查看s1和s2的流表项,流表项为空。接下来,输入命令执行程序,如下图所示。

然后,查看s1和s2的流表项,可以间隔时间查一次,就可以发现流表项每隔10秒更改一次,如下所示。

此时,h1 ping h2,就可以正常的进行通信了。

3、代码附录

from ryu.controller import ofp_event
from ryu.lib import hub
from ryu.base import app_manager
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls


class RRSwitchChange(app_manager.RyuApp):

    def __init__(self, *args, **kwargs):
        super(RRSwitchChange, self).__init__(*args, **kwargs)
        self.datapaths = {}

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath

        if datapath.id not in self.datapaths:
            self.datapaths[datapath.id] = datapath

        hub.spawn(self.rr_link_change)

    def rr_link_change(self):
        flag = 1

        while True:
            print('datapath:', self.datapaths)
            try:
                if flag == 1:
                    print('flag:', flag)
                    flag = 2
                    self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0)
                    self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0)

                    self.match_flow(dpid=1, in_port=1, out_port=2, priority=1, add_del=1)
                    self.match_flow(dpid=1, in_port=2, out_port=1, priority=1, add_del=1)

                    self.match_flow(dpid=2, in_port=1, out_port=2, priority=1, add_del=1)
                    self.match_flow(dpid=2, in_port=2, out_port=1, priority=1, add_del=1)
                elif flag == 2:
                    print('flag:', flag)
                    flag = 1
                    self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0)
                    self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0)

                    self.match_flow(dpid=1, in_port=1, out_port=3, priority=1, add_del=1)
                    self.match_flow(dpid=1, in_port=3, out_port=1, priority=1, add_del=1)

                    self.match_flow(dpid=2, in_port=1, out_port=3, priority=1, add_del=1)
                    self.match_flow(dpid=2, in_port=3, out_port=1, priority=1, add_del=1)

            except Exception as info:
                print('info:', info)

            hub.sleep(10)

    def match_flow(self, dpid, in_port, out_port, priority, add_del):
        ofp_parser = self.datapaths[dpid].ofproto_parser
        ofp = self.datapaths[dpid].ofproto
        actions = [ofp_parser.OFPActionOutput(out_port)]
        if add_del == 1:
            match = ofp_parser.OFPMatch(in_port=in_port)
            self.add_flow(datapath=self.datapaths[dpid], priority=priority, match=match, actions=actions)
        if add_del == 0:
            match = ofp_parser.OFPMatch()
            self.del_flow(datapath=self.datapaths[dpid], match=match)

    def del_flow(self, datapath, match):
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        req = ofp_parser.OFPFlowMod(datapath=datapath,
                                    command=ofp.OFPFC_DELETE,
                                    out_port=ofp.OFPP_ANY,
                                    out_group=ofp.OFPG_ANY,
                                    match=match)
        datapath.send_msg(req)

    def add_flow(self, datapath, priority, match, actions):
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        command = ofp.OFPFC_ADD
        inst = [ofp_parser.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions)]
        req = ofp_parser.OFPFlowMod(datapath=datapath, command=command,
                                    priority=priority, match=match, instructions=inst)
        datapath.send_msg(req)

github地址:https://github.com/Yang-Jianlin/ryu/blob/master/ryu/app/rr_switch_link_yjl.py

如有问题,请各位留言或者私信指出,谢谢啦。

 

以上是关于ryu实例---ECMP的rr(轮询)算法实现的主要内容,如果未能解决你的问题,请参考以下文章

Round Robin 轮询调度算法

ryu实例---网络时延探测

ryu实例---网络时延探测

LVS 模式

ryu实例---基于跳数的最短路径转发

负载均衡调度算法