ryu实例---基于跳数的最短路径转发

Posted 楊木木8023

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ryu实例---基于跳数的最短路径转发相关的知识,希望对你有一定的参考价值。

参考网上的一些知识,本篇内容主要介绍如何利用networkx实现最短路径转发,同时介绍ryu如何获取链路拓扑。

一、获取拓扑

对于ryu控制器而言,获取链路拓扑的主要模块在ryu/topology目录下面,下面主要介绍接下来用到的api.py和switches.py。

(1) api:

api是ryu对开发者提供的获取链路信息的接口,api提供了get_switch(), get_link()方法,通过这两个接口,可以获取链路的交换机的信息和各个节点的链路信息,具体的代码如下。

def get_switch(app, dpid=None):
    rep = app.send_request(event.EventSwitchRequest(dpid))
    return rep.switches


def get_link(app, dpid=None):
    rep = app.send_request(event.EventLinkRequest(dpid))
    return rep.links

通过api获取到switch和link的信息之后,就需要对这些信息进行解析,可以进入switches中查看关于switch和link的解析信息。

(2) switches

在这里主要用到了switch类和link类:

switch类中存储的交换机的相关信息,初始化的数据成员如下:

    def __init__(self, dp):
        super(Switch, self).__init__()

        self.dp = dp
        self.ports = []

其中dp是Datapath类的实例,该类定义在在ryu/controller/controller.py,主要属性有:

    def __init__(self, socket, address):
        super(Datapath, self).__init__()

        self.socket = socket
        self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
        self.socket.settimeout(CONF.socket_timeout)
        self.address = address
        self.is_active = True

        # The limit is arbitrary. We need to limit queue size to
        # prevent it from eating memory up.
        self.send_q = hub.Queue(16)
        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)

        self.echo_request_interval = CONF.echo_request_interval
        self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
        self.unreplied_echo_requests = []

        self.xid = random.randint(0, self.ofproto.MAX_XID)
        self.id = None  # datapath_id is unknown yet
        self._ports = None
        self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
        self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
        self.state = None  # for pylint
        self.set_state(HANDSHAKE_DISPATCHER)

所以当获取了switch的信息之后,就可以解析出交换机的dpid作为拓扑的节点。

在switches的link类中,保存的是源和目的端口,初始化的数据成员如下:

    def __init__(self, src, dst):
        super(Link, self).__init__()
        self.src = src
        self.dst = dst

具体的拓扑发现原理可以参考:https://www.sdnlab.com/11576.html

接下来的拓扑获取主要用api的get_switch()和get_link()方法进行获取。

二、networkx简单介绍

借用networkx官方文档的一段介绍:

networkx是一个python包,用于创建、操作和研究复杂网络的结构、动态和功能。

NetworkX提供:

  • 研究社会、生物和基础设施网络结构和动态的工具;

  • 一种适用于多种应用的标准编程接口和图形实现;

  • 为协作性、多学科项目提供快速发展环境;

  • 与现有的数值算法和C、C++和FORTRAN代码的接口;

  • 能够轻松处理大型非标准数据集。

使用NetworkX,您可以以标准和非标准数据格式加载和存储网络,生成多种类型的随机和经典网络,分析网络结构,构建网络模型,设计新的网络算法,绘制网络,等等。

关于networkx的详细使用教程:https://www.osgeo.cn/networkx/

这里我主要根据本文用到的networkx进行介绍,给出一个例子。

import networkx as nx
import matplotlib.pyplot as plt


class NetTopology:
    def __init__(self, nodes, links):
        # 创建一个空图
        self.G = nx.Graph()
        # 图中包含节点和边
        self.nodes = nodes
        self.links = links

    # 将节点列表和边列表添加到图中
    def create_topo(self):
        self.G.add_nodes_from(self.nodes)
        self.G.add_edges_from(self.links)

    # 绘图方法,可以将构建的图打印出来
    def polt_topo(self):
        nx.draw(self.G, with_labels=True, font_weight='bold')
        plt.show()

    # 按照最短路径找出源到目的的路径,并给出下一条的输出端口
    def shortest(self, datapath, src, tar):
        # path是源到目的的路径,比如1-5,会输出[1, 4, 5]
        path = nx.shortest_path(self.G, src, tar)
        # 下一跳,也就是1-5,从1开始的下一跳是4,datapath代表当前的节点的id,比如1代码1节点的id,1节点的下一跳就是4
        next_hop = path[path.index(datapath) + 1]
        # out_port是指到下一跳需要经过哪个端口转发出去,比如下一条是4,经过5端口转发出去
        out_port = self.G[datapath][next_hop]['att_dict']['port']

        print(path)
        print(out_port)


if __name__ == '__main__':
    nodes = [1, 2, 3, 4, 5]
    links = [(1, 2, {'att_dict': {'port': 1}}), (2, 3, {'att_dict': {'port': 3}}),
             (3, 5, {'att_dict': {'port': 4}}), (1, 4, {'att_dict': {'port': 5}}),
             (4, 5, {'att_dict': {'port': 6}})]
    net_topo = NetTopology(nodes, links)
    net_topo.create_topo()
    net_topo.polt_topo()
    net_topo.shortest(1, 1, 5)

三、最短路径转发代码

大致的思想如下:

  • 首先,需要在交换机和控制器的握手阶段下发默认流表项;
  • 然后,根据topology中api提供的get方法,构建节点、边的迪杰斯特拉图结构存储信息;
  • 接下来,在packet_in_handler中处理源到目的的转发指令,下发流表等操作;(通过对源到目的的最短路径计算出每一跳的转发端口,根据端口下发转发指令)

创建类PathForward,并进行初始化,代码如下。

class PathForward(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(PathForward, self).__init__(*args, **kwargs)
        self.G = nx.DiGraph()
        # 作为get_switch()和get_link()方法的参数传入
        self.topology_api_app = self

然后,就是处理握手阶段的代码和添加流表项的方法,代码如下,这里不再详述,可以参考以前的博文。

 # 添加流表项的方法
    def add_flow(self, datapath, priority, match, actions):
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        command = ofp.OFPFC_ADD
        inst = [ofp_parser.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions)]
        req = ofp_parser.OFPFlowMod(datapath=datapath, command=command,
                                    priority=priority, match=match, instructions=inst)
        datapath.send_msg(req)

    # 当控制器和交换机开始的握手动作完成后,进行table-miss(默认流表)的添加
    # 关于这一段代码的详细解析,参见:https://blog.csdn.net/weixin_40042248/article/details/115749340
    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        # add table-miss
        match = ofp_parser.OFPMatch()
        actions = [ofp_parser.OFPActionOutput(ofp.OFPP_CONTROLLER, ofp.OFPCML_NO_BUFFER)]
        self.add_flow(datapath=datapath, priority=0, match=match, actions=actions)

然后,关键部分是,获取链路拓扑并构建图,然后根据源和目的得到下一跳的输出端口。

首先,获取拓扑的代码如下,构建的是有向图,而且路径有来回之分,所以,需要添加两个方向的链路,代码如下:

@set_ev_cls(event.EventSwitchEnter)
    def get_topo(self, ev):
        switch_list = get_switch(self.topology_api_app)
        switches = []
        # 得到每个设备的id,并写入图中作为图的节点
        for switch in switch_list:
            switches.append(switch.dp.id)
        self.G.add_nodes_from(switches)
        
        link_list = get_link(self.topology_api_app)
        links = []
        # 将得到的链路的信息作为边写入图中
        for link in link_list:
            links.append((link.src.dpid, link.dst.dpid, {'attr_dict': {'port': link.src.port_no}}))
        self.G.add_edges_from(links)

        for link in link_list:
            links.append((link.dst.dpid, link.src.dpid, {'attr_dict': {'port': link.dst.port_no}}))
        self.G.add_edges_from(links)

图构建完成之后,就可以获取packet_in_handler()中需要的out_port,关于out_port的获取原理,可以参考第二节介绍的示例,代码如下。

    def get_out_port(self, datapath, src, dst, in_port):
        dpid = datapath.id

        # 开始时,各个主机可能在图中不存在,因为开始ryu只获取了交换机的dpid,并不知道各主机的信息,
        # 所以需要将主机存入图中
        if src not in self.G:
            self.G.add_node(src)
            self.G.add_edge(dpid, src, attr_dict={'port': in_port})
            self.G.add_edge(src, dpid)

        if dst in self.G:
            path = nx.shortest_path(self.G, src, dst)
            next_hop = path[path.index(dpid) + 1]
            out_port = self.G[dpid][next_hop]['attr_dict']['port']
            print(path)
        else:
            out_port = datapath.ofproto.OFPP_FLOOD
        return out_port

最后就是处理packet_in消息,这里的处理原理和自学习交换机的案例是一样的,无非是这里将out_port的获取从自学习交换机里面的转发表改为了从有向图中获取,代码如下。

@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def packet_in_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        dpid = datapath.id
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocols(ethernet.ethernet)[0]

        dst = eth.dst
        src = eth.src

        out_port = self.get_out_port(datapath, src, dst, in_port)
        actions = [ofp_parser.OFPActionOutput(out_port)]

        # 如果执行的动作不是flood,那么此时应该依据流表项进行转发操作,所以需要添加流表到交换机
        if out_port != ofp.OFPP_FLOOD:
            match = ofp_parser.OFPMatch(in_port=in_port, eth_dst=dst, eth_src=src)
            self.add_flow(datapath=datapath, priority=1, match=match, actions=actions)

        data = None
        if msg.buffer_id == ofp.OFP_NO_BUFFER:
            data = msg.data
        # 控制器指导执行的命令
        out = ofp_parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                      in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)

四、实验

实验拓扑如下:

拓扑的各项设置如下所示

端口对应关系

拓扑设置完成之后,运行拓扑。

然后输入命令ryu-manager shortest_path_forward_yjl.py --observe-links,运行刚才完成的最短路径转发的程序,注意这里一定要有--observe-links,用于指明拓扑发现。

接下来,在mininet终端中,h1 ping h3,查看ryu的输出日志,如下。

根据打印的日志可以发现,转发数据是按照最短路径进行的。

五、完整代码

from ryu.base import app_manager
from ryu.ofproto import ofproto_v1_3
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.topology.api import get_switch, get_link
from ryu.topology import event
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet

import networkx as nx


class PathForward(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(PathForward, self).__init__(*args, **kwargs)
        self.G = nx.DiGraph()
        # 作为get_switch()和get_link()方法的参数传入
        self.topology_api_app = self

    # 添加流表项的方法
    def add_flow(self, datapath, priority, match, actions):
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        command = ofp.OFPFC_ADD
        inst = [ofp_parser.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions)]
        req = ofp_parser.OFPFlowMod(datapath=datapath, command=command,
                                    priority=priority, match=match, instructions=inst)
        datapath.send_msg(req)

    # 当控制器和交换机开始的握手动作完成后,进行table-miss(默认流表)的添加
    # 关于这一段代码的详细解析,参见:https://blog.csdn.net/weixin_40042248/article/details/115749340
    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        # add table-miss
        match = ofp_parser.OFPMatch()
        actions = [ofp_parser.OFPActionOutput(ofp.OFPP_CONTROLLER, ofp.OFPCML_NO_BUFFER)]
        self.add_flow(datapath=datapath, priority=0, match=match, actions=actions)

    @set_ev_cls(event.EventSwitchEnter)
    def get_topo(self, ev):
        switch_list = get_switch(self.topology_api_app)
        switches = []
        # 得到每个设备的id,并写入图中作为图的节点
        for switch in switch_list:
            switches.append(switch.dp.id)
        self.G.add_nodes_from(switches)

        link_list = get_link(self.topology_api_app)
        links = []
        # 将得到的链路的信息作为边写入图中
        for link in link_list:
            links.append((link.src.dpid, link.dst.dpid, {'attr_dict': {'port': link.src.port_no}}))
        self.G.add_edges_from(links)

        for link in link_list:
            links.append((link.dst.dpid, link.src.dpid, {'attr_dict': {'port': link.dst.port_no}}))
        self.G.add_edges_from(links)

    def get_out_port(self, datapath, src, dst, in_port):
        dpid = datapath.id

        # 开始时,各个主机可能在图中不存在,因为开始ryu只获取了交换机的dpid,并不知道各主机的信息,
        # 所以需要将主机存入图中
        if src not in self.G:
            self.G.add_node(src)
            self.G.add_edge(dpid, src, attr_dict={'port': in_port})
            self.G.add_edge(src, dpid)

        if dst in self.G:
            path = nx.shortest_path(self.G, src, dst)
            next_hop = path[path.index(dpid) + 1]
            out_port = self.G[dpid][next_hop]['attr_dict']['port']
            print(path)
        else:
            out_port = datapath.ofproto.OFPP_FLOOD
        return out_port

    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def packet_in_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        dpid = datapath.id
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocols(ethernet.ethernet)[0]

        dst = eth.dst
        src = eth.src

        out_port = self.get_out_port(datapath, src, dst, in_port)
        actions = [ofp_parser.OFPActionOutput(out_port)]

        # 如果执行的动作不是flood,那么此时应该依据流表项进行转发操作,所以需要添加流表到交换机
        if out_port != ofp.OFPP_FLOOD:
            match = ofp_parser.OFPMatch(in_port=in_port, eth_dst=dst, eth_src=src)
            self.add_flow(datapath=datapath, priority=1, match=match, actions=actions)

        data = None
        if msg.buffer_id == ofp.OFP_NO_BUFFER:
            data = msg.data
        # 控制器指导执行的命令
        out = ofp_parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                      in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)

github地址:https://github.com/Yang-Jianlin/ryu/blob/master/ryu/app/shortest_path_forward.py

以上是关于ryu实例---基于跳数的最短路径转发的主要内容,如果未能解决你的问题,请参考以下文章

ryu实例---网络时延探测

ryu实例---基于链路质量(时延)的最短路径转发

ryu实例---基于链路质量(时延)的最短路径转发

基于网络流量的SDN最短路径转发应用

邀请讨论“一种跳数约束的最短路径问题求解与分析”

SQL - postgres - 图中的最短路径 - 递归