calico源码分析-IPAM

Posted 苏大强的鱼塘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了calico源码分析-IPAM相关的知识,希望对你有一定的参考价值。

Big Picture

生产要把网络插件换成calico,对于IP的分配方法有点存疑,花了2天看下了calico-ipam的源码,学习下calico的IP分配逻辑,对比生产环境,看看有没有明显的坑。

calico比较突出的一点是可以指定单个pod的ip,也可以指定一个RS的网段,但实际用的比较少,首先不可能单个pod部署,一个rs指定一个ippool的意义并不是很大,所以主要看下自动分配的代码逻辑,代码其实不是很多,但几个概念挺绕的,根据目前的了解,简单画了一个流程图:


ipam的入口还是老三样,分IP 从cmdAdd开始看:

func cmdAdd(args *skel.CmdArgs) error {
conf := types.NetConf{}
if err := json.Unmarshal(args.StdinData, &conf); err != nil {
return fmt.Errorf("failed to load netconf: %v", err)
}

nodename := utils.DetermineNodename(conf)
utils.ConfigureLogging(conf)

calicoClient, err := utils.CreateClient(conf)
if err != nil {
return err
}

epIDs, err := utils.GetIdentifiers(args, nodename)
if err != nil {
return err
}

epIDs.WEPName, err = epIDs.CalculateWorkloadEndpointName(false)
if err != nil {
return fmt.Errorf("error constructing WorkloadEndpoint name: %s", err)
}

handleID := utils.GetHandleID(conf.Name, args.ContainerID, epIDs.WEPName)
  • 上来和其他IPAM 一样,解析conf文件,让conf配置变成一个对象

  • 基于配置获取当前node节点的名字(DetermineNodename()), 如果配置文件没有指定nodename,则会直接获取当前node的hostname,所以node的hostname 请不要修改,修改后可能会导致后面的一系列失败。

func GetHandleID(netName, containerID, workload string) string {
handleID := fmt.Sprintf("%s.%s", netName, containerID)
		// Default to assigning an IPv4 address
num4 := 1
if conf.IPAM.AssignIpv4 != nil && *conf.IPAM.AssignIpv4 == "false" {
num4 = 0
}

// Default to NOT assigning an IPv6 address
num6 := 0
if conf.IPAM.AssignIpv6 != nil && *conf.IPAM.AssignIpv6 == "true" {
num6 = 1
}

logger.Infof("Calico CNI IPAM request count IPv4=%d IPv6=%d", num4, num6)

v4pools, err := utils.ResolvePools(ctx, calicoClient, conf.IPAM.IPv4Pools, true)
if err != nil {
return err
}

v6pools, err := utils.ResolvePools(ctx, calicoClient, conf.IPAM.IPv6Pools, false)
if err != nil {
return err
}

如果rs的yaml里没有指定网段, 那么下面的返回其实就是个空数组

func ResolvePools(ctx context.Context, c client.Interface, pools []string, isv4 bool) ([]cnet.IPNet, error) {
// First, query all IP pools. We need these so we can resolve names to CIDRs.
pl, err := c.IPPools().List(ctx, options.ListOptions{})
if err != nil {
return nil, err
}

// Iterate through the provided pools. If it parses as a CIDR, just use that.
// If it does not parse as a CIDR, then attempt to lookup an IP pool with a matching name.
result := []cnet.IPNet{}
for _, p := range pools {
_, cidr, err := net.ParseCIDR(p)
if err != nil {
......
}
ip := cidr.IP
if isv4 && ip.To4() == nil {
return nil, fmt.Errorf("%q isn't a IPv4 address", ip)
}
if !isv4 && ip.To4() != nil {
return nil, fmt.Errorf("%q isn't a IPv6 address", ip)
}
result = append(result, cnet.IPNet{IPNet: *cidr})
}
return result, nil
}
		assignArgs := ipam.AutoAssignArgs{
Num4: num4, // 分配的IPV4数量
Num6: num6, // 分配的IPV6数量
HandleID: &handleID, // handleID
Hostname: nodename, // node名字
IPv4Pools: v4pools, // 初定选择的ipv4pool
IPv6Pools: v6pools, // 初定选择的ipv6pool
MaxBlocksPerHost: maxBlocks, // 每台机器最大的block数量,初始为0
Attrs: attrs, // 容器的基本信息,nscontainerID
}

开始正式执行IP分配:

func (c ipamClient) AutoAssign(ctx context.Context, args AutoAssignArgs) ([]net.IPNet, []net.IPNet, error) {
......
var v4list, v6list []net.IPNet

if args.Num4 != 0 {
// Assign IPv4 addresses.
log.Debugf("Assigning IPv4 addresses")
for _, pool := range args.IPv4Pools {
if pool.IP.To4() == nil {
return nil, nil, fmt.Errorf("provided IPv4 IPPools list contains one or more IPv6 IPPools")
}
}
v4list, err = c.autoAssign(ctx, args.Num4, args.HandleID, args.Attrs, args.IPv4Pools, 4, hostname, args.MaxBlocksPerHost, args.HostReservedAttrIPv4s)
if err != nil {
log.Errorf("Error assigning IPV4 addresses: %v", err)
return v4list, nil, err
}
}

......
return v4list, v6list, nil
}

实际分配IP的逻辑在这里autoAssign,所幸作者代码习惯好,注释很给力:

// First, get the existing host-affine blocks.
......
pools, affBlocks, err := c.prepareAffinityBlocksForHost(ctx, requestedPools, version, host, rsvdAttr)
if err != nil {
return nil, err
}

首先会进行block的亲和性检查, prepareAffinityBlocksForHost, 比较核心的代码如下:

func (c ipamClient) prepareAffinityBlocksForHost(
ctx context.Context,
requestedPools []net.IPNet,
version int,
host string,
rsvdAttr *HostReservedAttr)
([]v3.IPPool, []net.IPNet, error)
{
......
// 判断下掩码的长度
maxPrefixLen, err := getMaxPrefixLen(version, rsvdAttr)
if err != nil {
return nil, nil, err
}
......
// Determine the correct set of IP pools to use for this request.
pools, allPools, err := c.determinePools(ctx, requestedPools, version, *v3n, maxPrefixLen)
if err != nil {
return nil, nil, err
}
......
affBlocks, affBlocksToRelease, err := c.blockReaderWriter.getAffineBlocks(ctx, host, version, pools)

......
for _, block := range affBlocksToRelease {
// Determine the pool for each block.
pool, err := c.blockReaderWriter.getPoolForIP(net.IP{block.IP}, allPools)
......
// Determine if the pool selects the current node, refusing to release this particular block affinity if so.
blockSelectsNode, err := pool.SelectsNode(*v3n)
if err != nil {
logCtx.WithError(err).WithField("pool", pool).Error("Failed to determine if node matches pool, skipping")
continue
}
if blockSelectsNode {
logCtx.WithFields(log.Fields{"pool": pool, "block": block}).Debug("Block's pool still selects node, refusing to remove affinity")
continue
}

// Release the block affinity, requiring it to be empty.
for i := 0; i < datastoreRetries; i++ {
if err = c.blockReaderWriter.releaseBlockAffinity(ctx, host, block, true); err != nil {
......
}
}

return pools, affBlocks, nil
}

这里提到了block,block说白了就是将calico的ippool的网段进行了拆分,主要目的是为了减少路由条目,默认是/26位的,所以参数里的blocksize就是26, 但有了block之后,有个比较大的限制就是blockAffinity, 即多个block 默认会和一个node绑定(亲和性), 即这个node上的pod默认都是这几个block网段的,为啥要这样?试想一下,calico没有自动的路由汇总,所以只能通过/26的block网段减少路由条目,一旦一台node上跑了所有网段的服务器,会造成什么问题?最明显的就是主机上的明细路由会非常非常多,维护起来不方便。
再看下亲和性检查会执行大致以下几个步骤:

  • getMaxPrefixLen() 判断block是不是大于32(ipv4情况下),所以block分片的最小值,就是一条明细路由

  • c.determinePools() 决定了哪些pool 可以被分配,代码不贴了,大致逻辑是先拉取所有状态是enable的ippool, 然后遍历和上面为空的IPv4Pools()match,如果match 上了,返回matchpool,因为默认不指定网段,所以这块逻辑是不用的,然后会判断enable的ippool是不是在指定node上的(nodeselect),所以最终只会返回nodeSelect匹配,且状态都是enable的ippool

  • c.blockReaderWriter.getAffineBlocks(), 该方法遍历了blockAffinity对象,并开始进行以下判断:
    a. 如果c.determinePools() 获得的ippool数量为0,那么将所有遍历出来的blockAffinity对象放入blocksInPool内,等待后续分配
    b. 如果c.determinePools() 获得的ippool数量不为0,则遍历返回的上述方法返回的pool列表,如果有pool 包含了blockAffinity对象,则把该blockAffinity对象放入blocksInPool内,等待后续分配,如果不包含,则把该blockAffinity对象放入blocksNotInPool内,等待后续释放

  • 之后将blocksNotInPool内的blockAffinity对象进行释放,先进行遍历,先获取该对象内IP所在的网段(ippool),根据返回值进行判断: 
    a. 如果该则不释放blockAffinity没有匹配到IP池,则跳过。 
    b. 则如果该IP池选择了当前节点,则不释放blockAffinity。
    c. 都不满足则调用releaseBlockAffinity函数进行释放, 出现其他错误时,会进行100次以内的重试。

  • 最后返回所有可用的ippool,符合亲和性规则的blockAffinity对象数组

然后再回到autoAssign方法

func (c ipamClient) autoAssign(ctx context.Context, num int, handleID *string, attrs map[string]string, requestedPools []net.IPNet, version int, host string, maxNumBlocks int, rsvdAttr *HostReservedAttr) ([]net.IPNet, error) {

......

pools, affBlocks, err := c.prepareAffinityBlocksForHost(ctx, requestedPools, version, host, rsvdAttr)
if err != nil {
return nil, err
}

......

s := &blockAssignState{
client: c,
version: version,
host: host,
pools: pools,
remainingAffineBlocks: affBlocks,
hostReservedAttr: rsvdAttr,
allowNewClaim: true,
}

// Allocate the IPs.
for len(ips) < num {
var b *model.KVPair

rem := num - len(ips)
if maxNumBlocks > 0 && numBlocksOwned >= maxNumBlocks {
s.allowNewClaim = false
}

b, newlyClaimed, err := s.findOrClaimBlock(ctx, 1)
if err != nil {
if _, ok := err.(noFreeBlocksError); ok {
// Skip to check non-affine blocks
break
}
if errors.Is(err, ErrBlockLimit) {
log.Warnf("Unable to allocate a new IPAM block; host already has %v blocks but "+
"blocks per host limit is %v", numBlocksOwned, maxNumBlocks)
return ips, ErrBlockLimit
}
return nil, err
}

if newlyClaimed {
numBlocksOwned++
}

// We have got a block b.
for i := 0; i < datastoreRetries; i++ {
newIPs, err := c.assignFromExistingBlock(ctx, b, rem, handleID, attrs, host, config.StrictAffinity)
......
ips = append(ips, newIPs...)
rem = num - len(ips)
break
}
}

rem := num - len(ips)
if config.StrictAffinity != true && rem != 0 {
......
}
}
}

logCtx.Infof("Auto-assigned %d out of %d IPv%ds: %v", len(ips), num, version, ips)
return ips, nil
}

由于前面已经获取到了符合亲和性的blockAffinity对象,可以从中取出对应的block的CIDR, 然后调用assignFromExistingBlock, 从block中分配IP,如果IP用完了,则会检查配置,AutoAllocateBlocks是否开启,开启后则会新申请一个block绑定到该node上继续分IP,如果AutoAllocateBlocks关着,或者IP分完了,calico-ipam 会再次检查配置,如果StrictAffinity开启了,则他会从其他node的亲和性的block里分IP。

总结

calico分IP的流程基本都是围绕着block展开的,在规划的时候,根据上面的代码逻辑会需要注意以下几个问题:

  • handleId是存在etcd里,是pod 和 其对应IP的信息的主要Key,而handleID的组成是ns+container,所以namespaces一定要提前规划好,避免后续踩坑。

  • node的主机名配置不会去写,所以大多数都是获取node的当前主机名,所以尽量不要修改主机名字

  • 由于block的存在,需要规划好blockSize的大小,计算和pod数量的关系,避免浪费或者不够。

  • node下线后,请一定要把从集群中删除node,不然因为亲和性强匹配的规则,可能会导致下线node的IP永远无法再次被使用

  • 如果没有block了,且亲和性检查关着,会需要取其他node的block里借IP,借用IP的操作本质上就是每个block重新分IP的逻辑,block数量多的话也一定会带来性能损耗。


以上是关于calico源码分析-IPAM的主要内容,如果未能解决你的问题,请参考以下文章

CNI IPAM插件分析 --- 以hostlocal为示例

k8s cni bridge

Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段

Kubernetes ❀ Calico网络插件YAML源码(直接复制即可使用)

Kubernetes ❀ Calico网络插件YAML源码(直接复制即可使用)

Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段