NVIDIA NCCL Source Code Study: Establishing the Data Communication Links (Transport)
Posted by KIDGINBROOK
The previous post used ringGraph as an example to walk through how channels are connected across machines; at this point every rank in the ring knows which rank it receives data from and which rank it sends data to. This post looks at how the actual data communication links are established, in both the P2P and the RDMA NET cases.
As mentioned last time, NCCL establishes the data links in ncclTransportP2pSetup. We keep using the two-machine, sixteen-GPU ring from the previous post:
Ring on the first machine:
graph->intra: GPU/0 GPU/7 GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1
graph->inter: NET/0 NET/0
Ring on the second machine:
graph->intra: GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11
graph->inter: NET/0 NET/0
Let's start with ncclPeer. An ncclPeer holds two connectors; for rank 10, send is responsible for communicating with rank 9 and recv for communicating with rank 1. For ease of exposition, in what follows rank 10 is called the receiver and rank 1 the sender.
struct ncclPeer {
  struct ncclConnector send;
  struct ncclConnector recv;
};
In ncclConnector, connected indicates whether connection establishment has completed, transportResources holds the buffers used during communication, and proxyAppend will be covered later when we look at the actual data transfer.
struct ncclConnector {
  int connected;
  struct ncclProxyArgs *proxyAppend;
  struct ncclTransportComm* transportComm;
  void* transportResources; // Host-side resources
  struct ncclConnInfo conn;
  struct ncclComm *comm;
};
ncclConnInfo records the context of the communication. For this post we only need to care about buffs, the buffers used during communication; they actually live in transportResources, and these are just pointers into it.
struct ncclConnInfo {
  // Regular comm mechanism
  char *buffs[NCCL_NUM_PROTOCOLS]; // Local for recv, remote for send
  uint64_t *tail;     // Local for recv, remote for send
  uint64_t *head;     // Local for send, remote for recv

  int direct;         // Direct communication
  void **ptrExchange; // Pointer exchange for direct communication

  int *fifo;          // Size fifo for proxy

  uint64_t step;      // Keep where we are
  uint64_t llLastCleaning;
};
The connector's ncclTransportComm defines a set of communication-related function pointers that a transport implements; ncclTransport bundles a send and a recv ncclTransportComm. This post covers two transports, P2P and NET.
struct ncclTransportComm {
  ncclResult_t (*setup)(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*, struct ncclConnector*, int channelId);
  ncclResult_t (*connect)(struct ncclConnect*, int nranks, int rank, struct ncclConnector*);
  ncclResult_t (*free)(void*);
  ncclResult_t (*proxy)(struct ncclProxyArgs*);
};

struct ncclTransport {
  const char name[4];
  ncclResult_t (*canConnect)(int*, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo*, struct ncclPeerInfo*);
  struct ncclTransportComm send;
  struct ncclTransportComm recv;
};
struct ncclTransport netTransport = {
  "NET",
  netCanConnect,
  { netSendSetup, netSendConnect, netSendFree, netSendProxy },
  { netRecvSetup, netRecvConnect, netRecvFree, netRecvProxy }
};
Continuing from the previous post, let's look at ncclTransportP2pSetup. Since the current rank is 10, nrecv is 1 with peerRecv being rank 1, and nsend is 1 with peerSend being rank 9. The function then starts setting up the communication with rank 1, i.e. initializing the ncclConnector peers[1].recv via selectTransport.
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
  uint32_t nSkippedSend = 0, nSkippedRecv = 0; /* for tracing */
  struct ncclConnect connect;
  struct ncclConnector* conn;
  for (int i=0; i<nrecv; i++) {
    int peer = peerRecv[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].recv;
    if (conn->connected) { ++nSkippedRecv; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(selectTransport<0>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
    NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
  }
  ...
NCCL currently has three transports: P2P communicates through GPU-to-GPU peer access, SHM through shared host memory within a machine, and NET over the network. NCCL queries each transport's canConnect in order and picks the first one that is usable. Since rank 1 is not on the current machine, only NET is usable for this recv, so the connector's transportComm is set to netTransport's recv.
template <int type>
static ncclResult_t selectTransport(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) {
  for (int t=0; t<NTRANSPORTS; t++) {
    struct ncclTransport *transport = ncclTransports+t;
    struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
    int ret = 0;
    NCCLCHECK(transport->canConnect(&ret, topo, graph, myInfo, peerInfo));
    if (ret) {
      connector->transportComm = transportComm;
      NCCLCHECK(transportComm->setup(topo, graph, myInfo, peerInfo, connect, connector, channelId));
      return ncclSuccess;
    }
  }
  WARN("No transport found !");
  return ncclInternalError;
}
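For reference, the three transports this loop walks over are gathered in a fixed-order array in the NCCL source; a sketch of its layout (the p2pTransport and shmTransport symbols are taken on trust from the transport headers, order P2P, then SHM, then NET):

// Transport table iterated by selectTransport; earlier entries are preferred,
// so P2P wins over SHM, which wins over NET.
struct ncclTransport ncclTransports[NTRANSPORTS] = {
  p2pTransport,
  shmTransport,
  netTransport,
};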
Next, let's look at netTransport's setup function.
ncclResult_t netRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
  struct netRecvResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  recv->transportResources = resources;

  NCCLCHECK(ncclTopoGetNetDev(topo, myInfo->rank, graph, channelId, &resources->netDev));
  NCCLCHECK(ncclTopoCheckGdr(topo, myInfo->busId, resources->netDev, 0, &resources->useGdr));
  ...
It first allocates a netRecvResources and assigns it to the ncclConnector; the main fields are explained in the comments below. LOC_COUNT is 2, meaning there are two buffer slots: if GDR is supported, slot LOC_DEVMEM (1), i.e. device memory, is used; otherwise slot LOC_HOSTMEM (0), i.e. pinned host memory, is used. sendMem and recvMem record the head and tail of the FIFO used to coordinate producer and consumer; the next post covers that in detail, so it can be ignored here. When the user issues a communication op such as ncclSend on a block of data, NCCL splits the block into multiple small chunks and pipelines them; step indicates which chunk we are at, which is also covered in the next post.
struct netRecvResources {
  void* netListenComm;   // listening comm used for connection setup; for IB this is an ncclIbListenComm, holding the listening fd and which NIC is used
  void* netRecvComm;     // connection context; for IB this is an ncclIbRecvComm, holding the pd, cq and other RDMA connection state
  struct ncclSendMem* sendMem;
  struct ncclRecvMem* recvMem;
  int netDev;            // which NIC is used
  int useGdr;            // whether GDR is supported
  char* buffers[LOC_COUNT];   // buffer base addresses; the three protocols are stored contiguously
  int buffSizes[LOC_COUNT];   // buffer lengths; the sum of the three protocols' sizes
  void* mhandles[LOC_COUNT];  // MRs registered with RDMA
  void** mhandlesProto[NCCL_NUM_PROTOCOLS]; // points into mhandles
  uint64_t step;
  uint64_t llLastCleaning;
};
ncclTopoGetNetDev picks a NIC for the current rank's GPU. When searching for channels we recorded the NIC used by each ring in graph->inter, so the NIC can be looked up directly there. For example, for rank 10 on channel 0 of the second machine's ring above, graph->intra[0] is GPU 10, so index is 0 and *dev becomes graph->inter[0], i.e. NET/0.
ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int* dev) {
  if (graph) {
    // Honor the net device in the graph
    int channel = channelId%graph->nChannels;
    int ngpus = system->nodes[GPU].count;
    int index = graph->intra[channel*ngpus] == rank ? 0 : 1;
    *dev = graph->inter[channel*2+index];
  } else {
    int64_t id;
    NCCLCHECK(ncclTopoGetLocalNet(system, rank, &id, channelId));
    *dev = id;
  }
  return ncclSuccess;
}
ncclTopoCheckGdr checks whether the selected NIC and the current rank's GPU support GDR; the details were covered in part five and are not repeated here. Pinned host memory is then allocated for sendMem and recvMem, and head and tail are set up. The test machine supports GDR, so every protoLoc is LOC_DEVMEM, i.e. device memory. The buffers for the three protocols are then allocated; they are stored contiguously, with each protocol's starting offset recorded and saved into conn. mhandles are the MRs used by RDMA, and mhandlesProto points into mhandles.
ncclResult_t netRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
  ...
  NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1));
  NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1));

  recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;
  recv->conn.tail = &resources->recvMem->tail;
  recv->conn.head = &resources->sendMem->head;

  int protoLoc[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    protoLoc[p] = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
  }

  int buffSizes[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    // Only allocate buffers for simple for p2p connections
    buffSizes[p] = graph == NULL && p != NCCL_PROTO_SIMPLE ? 0 : recv->comm->buffSizes[p];
    resources->buffSizes[protoLoc[p]] += buffSizes[p];
  }

  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM]));
  }

  int offsets[LOC_COUNT];
  offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    resources->mhandlesProto[p] = resources->mhandles+protoLoc[p];
    recv->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]];
    offsets[protoLoc[p]] += buffSizes[p];
  }

  INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev,
      resources->useGdr ? "/GDRDMA" : "");

  struct netConnectInfo* info = (struct netConnectInfo*) connectInfo;
  NCCLCHECK(ncclNetListen(resources->netDev, &info->netHandle, &resources->netListenComm));

  return ncclSuccess;
}
Since the socket-based connection setup needs a socket to exchange the sender's and receiver's information (qp number, port, mtu, gid or lid, etc.), ncclIbListen creates a listening socket here, much like the bootstrap does: the fd is stored in listenComm and the ip/port is stored in the handle, i.e. connectInfo.
ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {
  struct ncclIbListenComm* comm;
  NCCLCHECK(ncclCalloc(&comm, 1));
  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  static_assert(sizeof(struct ncclIbHandle) < NCCL_NET_HANDLE_MAXSIZE, "ncclIbHandle size too large");
  comm->dev = dev;
  NCCLCHECK(GetSocketAddr(&(handle->connectAddr)));
  NCCLCHECK(createListenSocket(&comm->fd, &handle->connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}

struct ncclIbListenComm {
  int dev;
  int fd;
};
At this point the recv side is initialized. Back in ncclTransportP2pSetup, connectInfo, i.e. the ip/port described above, is sent to the peer (rank 1) via bootstrapSend. When rank 1 executes this function it iterates over nsend, where its peer is rank 10, and selectTransport ends up running netTransport's send setup, netSendSetup. That logic is essentially the same as netRecvSetup, mostly allocating the various buffers, so it is not repeated here. Continue with the code below.
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  ...
  for (int i=0; i<nrecv; i++) {
    ...
    NCCLCHECK(selectTransport<0>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
    NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
  }
  for (int i=0; i<nsend; i++) {
    int peer = peerSend[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].send;
    if (conn->connected) { ++nSkippedSend; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(selectTransport<1>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
    NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
  }
  for (int i=0; i<nsend; i++) {
    int peer = peerSend[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].send;
    if (conn->connected) { ++nSkippedSend; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
    NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
    conn->connected = 1;
    CUDACHECK(cudaMemcpy(&channel->devPeers[peer].send, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice));
  }
  ...
Rank 1 then receives the ip and port sent by rank 10 via bootstrapRecv and executes connect, i.e. netSendConnect.
ncclResult_t netSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  // Setup device pointers
  struct netSendResources* resources = (struct netSendResources*)send->transportResources;
  struct netConnectInfo* info = (struct netConnectInfo*)connectInfo;

  // Connect to remote peer
  NCCLCHECK(ncclNetConnect(resources->netDev, info->netHandle, &resources->netSendComm));
  ...
Here info is rank 10's ip and port. ncclNetConnect, i.e. ncclIbConnect, is then executed; its main job is to create the QP and send the related information to the receiver over the socket.
Before looking at how ncclIbConnect creates the QP, here are two APIs it relies on.
ncclResult_t ncclIbInitVerbs(ibv_context* ctx, struct ncclIbVerbs* verbs) {
  NCCLCHECK(wrap_ibv_alloc_pd(&verbs->pd, ctx));
  NCCLCHECK(wrap_ibv_create_cq(&verbs->cq, ctx, MAX_REQUESTS, NULL, NULL, 0));
  return ncclSuccess;
}
ncclIbInitVerbs creates the PD and CQ; ncclIbVerbs stores them.
ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbVerbs* verbs, int access_flags, struct ibv_qp** qp) {
  struct ibv_qp_init_attr qpInitAttr;
  memset(&qpInitAttr, 0, sizeof(struct ibv_qp_init_attr));
  qpInitAttr.send_cq = verbs->cq;
  qpInitAttr.recv_cq = verbs->cq;
  qpInitAttr.qp_type = IBV_QPT_RC;
  // We might send 2 requests per send (RDMA_WRITE+RDMA_WRITE_WITH_IMM)
  qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS;
  qpInitAttr.cap.max_recv_wr = MAX_REQUESTS;
  qpInitAttr.cap.max_send_sge = 1;
  qpInitAttr.cap.max_recv_sge = 1;
  qpInitAttr.cap.max_inline_data = 0;
  NCCLCHECK(wrap_ibv_create_qp(qp, verbs->pd, &qpInitAttr));
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_INIT;
  qpAttr.pkey_index = 0;
  qpAttr.port_num = ib_port;
  qpAttr.qp_access_flags = access_flags;
  NCCLCHECK(wrap_ibv_modify_qp(*qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));
  return ncclSuccess;
}
ncclIbCreateQp creates and initializes a QP: it sets the completion queues used for send and recv, sets qp_type to RC, sets the maximum number of send and recv WRs and how many SGEs each WR can carry, and then creates the QP. At this point the QP is in the RST state and cannot do anything yet. It then sets qp_state to INIT, sets the port, and sets the access flags to IBV_ACCESS_REMOTE_WRITE, meaning the QP accepts remote writes, and modifies the QP accordingly. Now the QP is in the INIT state: recv WRs can be posted, but incoming messages are not yet processed.
Now on to ncclIbConnect. ncclIbMalloc allocates page-aligned memory, and as we will see below NCCL also page-aligns the regions it registers, even though ibv_reg_mr does not require page alignment; as the man page puts it:
The registered memory buffer doesn't have to be page-aligned.
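As a side note, here is a minimal sketch of a page-aligned, zero-initialized allocator in the spirit of ncclIbMalloc (the helper name and error convention are illustrative, not the actual NCCL code):

#include <stdlib.h>
#include <string.h>
#include <unistd.h>

// Allocate `size` bytes, zeroed and aligned to the system page size.
static int pageAlignedCalloc(void** ptr, size_t size) {
  long page = sysconf(_SC_PAGESIZE);
  void* p = NULL;
  if (posix_memalign(&p, (size_t)page, size) != 0) return -1; // allocation failed
  memset(p, 0, size);
  *ptr = p;
  return 0;
}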
ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm) {
  struct ncclIbSendComm* comm;
  NCCLCHECK(ncclIbMalloc((void**)&comm, sizeof(struct ncclIbSendComm)));

  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  NCCLCHECK(connectAddress(&comm->fd, &handle->connectAddr));
  *sendComm = comm;

  // IB Setup
  ibv_context* ctx = ncclIbDevs[dev].context;
  NCCLCHECK(ncclIbInitVerbs(ctx, &comm->verbs));
  uint8_t ib_port = ncclIbDevs[dev].port;
  NCCLCHECK(ncclIbCreateQp(ib_port, &comm->verbs, IBV_ACCESS_REMOTE_WRITE, &comm->qp));

  // Send my QP Info to receiver through the socket. Hope this won't block.
  struct ibv_port_attr portAttr;
  NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
  struct ncclIbQpInfo qpInfo;
  qpInfo.ib_port = ib_port;
  qpInfo.qpn = comm->qp->qp_num;
  qpInfo.mtu = portAttr.active_mtu;

  // Prepare my fifo
  NCCLCHECK(wrap_ibv_reg_mr(&comm->fifoMr, comm->verbs.pd, comm->fifo, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ));
  qpInfo.fifoRkey = comm->fifoMr->rkey;
  qpInfo.fifoAddr = (uint64_t)comm->fifo;

  // RoCE support
  qpInfo.lid = portAttr.lid;
  if (qpInfo.lid) { // IB
    INFO(NCCL_NET,"NET/IB: Dev %d Port %d qpn %d mtu %d LID %d", dev, ib_port, qpInfo.qpn, qpInfo.mtu, qpInfo.lid);
  } else { // RoCE
    union ibv_gid gid;
    NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));
    qpInfo.spn = gid.global.subnet_prefix;
    qpInfo.iid = gid.global.interface_id;
    INFO(NCCL_NET,"NET/IB: Dev %d Port %d qpn %d mtu %d GID %ld (%lX/%lX)", dev, ib_port, qpInfo.qpn, qpInfo.mtu, ncclParamIbGidIndex(), qpInfo.spn, qpInfo.iid);
  }

  NCCLCHECK(socketSend(comm->fd, &qpInfo, sizeof(qpInfo)));
  return ncclSuccess;
}
Once the QP is initialized, it prepares to exchange sender and receiver information over the socket: it queries the port, fills qpInfo with the port, mtu and qpn, and then determines whether the link is IB or RoCE. With RoCE the lid is 0 and only the gid can be used for addressing, whereas IB can address peers by lid. Finally qpInfo is sent over the socket to the receiver, i.e. rank 10.
Back in netSendConnect, the data buffers allocated during setup still need to be registered, via ncclIbRegMr; the registration is page-aligned and the resulting MR is stored in the resources' mhandle.
ncclResult_t netSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  ...
  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
  }
  return ncclSuccess;
}

ncclResult_t ncclIbRegMr(void* comm, void* data, int size, int type, void** mhandle) {
  struct ncclIbVerbs* verbs = (struct ncclIbVerbs*)comm;
  uint64_t addr = (uint64_t)data;
  assert(size > 0);

  // Deregister / register
  uint64_t regAddr = addr & (~(REG_ALIGN-1));
  uint64_t regSize = addr+size - regAddr;
  regSize = ((regSize + REG_ALIGN-1) / REG_ALIGN ) * REG_ALIGN;
  struct ibv_mr* mr;
  NCCLCHECK(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)regAddr, regSize, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ));
  *mhandle = (void*)mr;
  TRACE(NCCL_INIT,"regAddr %lx size %ld rkey %x", regAddr, regSize, mr->rkey);
  return ncclSuccess;
}
Back in ncclTransportP2pSetup again: rank 1 has executed connect and sent its QP information to rank 10 over the socket, and rank 10 now goes on to execute the connect below, i.e. netRecvConnect. Note that in the RDMA case the ncclConnect received here via bootstrap is not actually used.
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  ...
  for (int i=0; i<nrecv; i++) {
    int peer = peerRecv[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].recv;
    if (conn->connected) { ++nSkippedRecv; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
    NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
    conn->connected = 1;
    CUDACHECK(cudaMemcpy(&channel->devPeers[peer].recv, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice));
  }
  TRACE(NCCL_INIT, "nsend %d nrecv %d nSkippedSend %u nSkippedRecv %u - DONE", nsend, nrecv, nSkippedSend, nSkippedRecv);
  return ncclSuccess;
}

ncclResult_t netRecvConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) {
  // Setup device pointers
  struct netRecvResources* resources = (struct netRecvResources*)recv->transportResources;

  // Finish connection establishment from remote peer
  NCCLCHECK(ncclNetAccept(resources->netListenComm, &resources->netRecvComm));
  NCCLCHECK(ncclNetCloseListen(resources->netListenComm));

  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
  }
  return ncclSuccess;
}
Rank 10 executes ncclIbAccept: it receives rank 1's QP information over the socket, looks up the corresponding NIC's context and port via the net dev, creates the PD and CQ with ncclIbInitVerbs and the QP with ncclIbCreateQp just as before, and then adjusts the MTU based on rank 1's value.
ncclResult_t ncclIbAccept(void* listenComm, void** recvComm) {
  struct ncclIbListenComm* lComm = (struct ncclIbListenComm*)listenComm;
  struct ncclIbRecvComm* rComm;
  NCCLCHECK(ncclIbMalloc((void**)&rComm, sizeof(struct ncclIbRecvComm)));

  struct sockaddr_in sockaddr;
  socklen_t socklen = sizeof(struct sockaddr_in);
  SYSCHECKVAL(accept(lComm->fd, (struct sockaddr*)&sockaddr, &socklen), "accept", rComm->fd);
  struct ncclIbQpInfo remQpInfo;
  NCCLCHECK(socketReceive(rComm->fd, &remQpInfo, sizeof(remQpInfo)));

  // IB setup
  ibv_context* ctx = ncclIbDevs[lComm->dev].context;
  uint8_t ib_port = ncclIbDevs[lComm->dev].port;
  struct ibv_port_attr portAttr;
  NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
  union ibv_gid gid;
  NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));

  // QP Creation
  NCCLCHECK(ncclIbInitVerbs(ctx, &rComm->verbs));
  NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_REMOTE_WRITE, &rComm->qp));

  // Adjust the MTU
  remQpInfo.mtu = (enum ibv_mtu)std::min(remQpInfo.mtu, portAttr.active_mtu);

  // Setup QP
  struct ibv_qp* qp = rComm->qp;
  NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
  NCCLCHECK(ncclIbRtsQp(qp));
  ...
ncclIbRtrQp is then executed to move the QP from INIT to RTR: it sets the mtu and the remote side's qpn, gid, port, etc. At this point the QP can post recv WRs and receive normally.
ncclResult_t ncclIbRtrQp(ibv_qp* qp, struct ncclIbQpInfo* info) {
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_RTR;
  qpAttr.path_mtu = info->mtu;
  qpAttr.dest_qp_num = info->qpn;
  qpAttr.rq_psn = 0;
  qpAttr.max_dest_rd_atomic = 1;
  qpAttr.min_rnr_timer = 12;
  if (info->lid == 0) {
    qpAttr.ah_attr.is_global = 1;
    qpAttr.ah_attr.grh.dgid.global.subnet_prefix = info->spn;
    qpAttr.ah_attr.grh.dgid.global.interface_id = info->iid;
    qpAttr.ah_attr.grh.flow_label = 0;
    qpAttr.ah_attr.grh.sgid_index = ncclParamIbGidIndex();
    qpAttr.ah_attr.grh.hop_limit = 255;
    qpAttr.ah_attr.grh.traffic_class = ncclParamIbTc();
  } else {
    qpAttr.ah_attr.is_global = 0;
    qpAttr.ah_attr.dlid = info->lid;
  }
  qpAttr.ah_attr.sl = ncclParamIbSl();
  qpAttr.ah_attr.src_path_bits = 0;
  qpAttr.ah_attr.port_num = info->ib_port;
  NCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));
  return ncclSuccess;
}
Then ncclIbRtsQp is executed, moving the QP from RTR to RTS; now the QP can post send WRs and transmit normally.
ncclResult_t ncclIbRtsQp(ibv_qp* qp) {
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_RTS;
  qpAttr.timeout = ncclParamIbTimeout();
  qpAttr.retry_cnt = ncclParamIbRetryCnt();
  qpAttr.rnr_retry = 7;
  qpAttr.sq_psn = 0;
  qpAttr.max_rd_atomic = 1;
  NCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
  return ncclSuccess;
}
Continuing with ncclIbAccept: the fifo here is also used to control the sending process, which will be covered later together with data transfer.
ncclResult_t ncclIbAccept(void* listenComm, void** recvComm) {
  ...
  // Retain remote fifo info and prepare my RDMA ops
  rComm->remFifo.rkey = remQpInfo.fifoRkey;
  rComm->remFifo.addr = remQpInfo.fifoAddr;
  NCCLCHECK(wrap_ibv_reg_mr(&rComm->remFifo.mr, rComm->verbs.pd, &rComm->remFifo.elems, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ));
  rComm->remFifo.sge.length = sizeof(struct ncclIbSendFifo);
  rComm->remFifo.sge.lkey = rComm->remFifo.mr->lkey;

#if USE_RDMA_SEND_INLINE
  // Determine whether the remFifo element data can be sent INLINE
  struct ibv_qp_attr attr;
  struct ibv_qp_init_attr init_attr;
  NCCLCHECK(wrap_ibv_query_qp(qp, &attr, IBV_QP_CAP, &init_attr));
  if (init_attr.cap.max_inline_data >= rComm->remFifo.sge.length) rComm->remFifo.flags = IBV_SEND_INLINE;
#endif

  // Allocate Flush dummy buffer for GPU Direct RDMA
  rComm->gpuFlush.enabled = (ncclIbGdrSupport(lComm->dev) == 0) && (ncclParamIbGdrFlushDisable() == 0) ? 1 : 0;
  if (rComm->gpuFlush.enabled) {
    NCCLCHECK(wrap_ibv_reg_mr(&rComm->gpuFlush.hostMr, rComm->verbs.pd, &rComm->gpuFlush.hostMem, sizeof(int), IBV_ACCESS_LOCAL_WRITE));
    rComm->gpuFlush.sge.addr = (uint64_t)&rComm->gpuFlush.hostMem;
    rComm->gpuFlush.sge.length = 1;
    rComm->gpuFlush.sge.lkey = rComm->gpuFlush.hostMr->lkey;
    NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ, &rComm->gpuFlush.qp));
    struct ncclIbQpInfo localQpInfo = {
      .lid=portAttr.lid,
      .ib_port=ib_port,
      .qpn=rComm->gpuFlush.qp->qp_num,
      .spn=gid.global.subnet_prefix,
      .iid=gid.global.interface_id,
      .mtu=portAttr.active_mtu
    };
    NCCLCHECK(ncclIbRtrQp(rComm->gpuFlush.qp, &localQpInfo));
    NCCLCHECK(ncclIbRtsQp(rComm->gpuFlush.qp));
  }

  // Fill Handle
  struct ncclIbQpInfo qpInfo = {
    .lid=portAttr.lid,
    .ib_port=ib_port,
    .qpn=qp->qp_num,
    .spn=gid.global.subnet_prefix,
    .iid=gid.global.interface_id,
    .mtu=remQpInfo.mtu
  };
  NCCLCHECK(socketSend(rComm->fd, &qpInfo, sizeof(qpInfo)));

  *recvComm = rComm;
  return ncclSuccess;
}
gpuFlush also has its own QP, but that QP is local, i.e. its peer QP is itself. When GDR is enabled, a flush has to be executed after each data reception: it is really an RDMA read that uses the NIC to read the first int of the received data into hostMem. An official issue explains that when a GDR receive completes and the work completion reaches the CPU, the received data is not necessarily visible on the GPU yet, so a read has to be issued from the CPU side to flush it.
struct ncclIbGpuFlush {
  int enabled;
  int hostMem;
  struct ibv_mr* hostMr;
  struct ibv_sge sge;
  struct ibv_qp* qp;
};
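To make the flush concrete, here is a hedged sketch of what posting it amounts to: a single RDMA_READ on the loopback gpuFlush QP that pulls the start of the just-received GPU buffer into gpuFlush.hostMem. The helper name and call site are assumptions for illustration; in NCCL the actual post happens in the receive/flush path of the net proxy.

// Sketch only: the NIC reads back the beginning of the GPU buffer, which forces
// previously written GDR data to become visible before the CPU tells the GPU
// kernel it may consume it.
static ncclResult_t ibGpuFlushSketch(struct ncclIbRecvComm* rComm, void* gpuData, uint32_t rkey) {
  struct ibv_send_wr wr;
  memset(&wr, 0, sizeof(wr));
  wr.opcode = IBV_WR_RDMA_READ;
  wr.send_flags = IBV_SEND_SIGNALED;
  wr.wr.rdma.remote_addr = (uint64_t)gpuData; // start of the received GPU buffer
  wr.wr.rdma.rkey = rkey;                     // rkey of the MR covering that buffer
  wr.sg_list = &rComm->gpuFlush.sge;          // lands in gpuFlush.hostMem on the host
  wr.num_sge = 1;
  struct ibv_send_wr* bad_wr;
  NCCLCHECK(wrap_ibv_post_send(rComm->gpuFlush.qp, &wr, &bad_wr));
  // The caller would then poll the CQ for this work request before proceeding.
  return ncclSuccess;
}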
Finally, rank 10's port, qpn, gid, etc. are sent back to rank 1 over the socket. At this point ncclTransportP2pSetup has finished, but the RDMA connection is not fully established yet: rank 1 has not received rank 10's information, so its QP is still in the INIT state. Rank 1 only checks whether this last step has completed when it starts sending data; if the link is not yet up, it executes ncclSendCheck, whose steps mirror what we saw above and are not repeated.
ncclResult_t ncclSendCheck(struct ncclIbSendComm* comm) {
  struct ncclIbQpInfo remQpInfo;
  struct ibv_qp* qp = comm->qp;

  // Do not block on this receive, return if not ready.
  int bytes = 0;
  NCCLCHECK(socketProgress(NCCL_SOCKET_RECV, comm->fd, &remQpInfo, sizeof(remQpInfo), &bytes));
  if (bytes == 0) return ncclSuccess; // Try again later
  NCCLCHECK(socketWait(NCCL_SOCKET_RECV, comm->fd, &remQpInfo, sizeof(remQpInfo), &bytes));

  NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
  NCCLCHECK(ncclIbRtsQp(qp));
  comm->ready = 1;

  // Block until this is done. It *should* not block indefinitely.
  NCCLCHECK(socketSend(comm->fd, &comm->ready, sizeof(int)));

  return ncclSuccess;
}
At this point the RDMA link between rank 1 and rank 10 is fully established. Next, let's look at the P2P link between rank 10 and rank 9.
In the P2P case, the connectInfo exchanged between ranks looks like this:
struct p2pConnectInfo {
  int direct;   // whether the two ranks are in the same process
  int read;     // whether p2p read is supported
  union {
    void* directPtr;           // same process: records the current rank's data buffer directly
    cudaIpcMemHandle_t devIpc; // different processes: communicate via shared device memory; devIpc records the current rank's IPC handle
  };
};
Following the same order as before, rank 9 first runs the recv setup: it allocates the resources, and the data communication buffer is kept in the buff field of ncclRecvMem.
struct p2pRecvResources {
  struct ncclRecvMem* devMem;
  void* ipcPtr;
};

struct ncclRecvMem {
  union {
    struct {
      uint64_t tail;
      char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
      char pad2[CACHE_LINE_SIZE-sizeof(uint64_t)];
      int sizesFifo[NCCL_STEPS];
    };
    char pad4[MEM_ALIGN];
  };
  char buff[1]; // Actually larger than that
};
It then determines useRead: if the path type between the two ranks is within p2pLevel (PATH_SYS by default) then useP2P is 1, and if the path type is PATH_NVL and the GPU is Ampere then useRead is 1. ncclRecvMem stores the buffer in a flexible array member; focusing again only on NCCL_PROTO_SIMPLE, when read is 1 no buffer needs to be allocated here. Since the current scenario is a single process, direct is recorded as 1 and devMem is recorded into directPtr, and then cudaDeviceEnablePeerAccess is used to enable peer-to-peer access between the GPUs.
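Before walking through the setup code below, here is a rough sketch of that decision (not the literal p2pUseRead implementation; the PATH_* ordering and the Ampere check are restated from the description above):

// Sketch: p2p is allowed when the topology path type is at or below p2pLevel
// (PATH_SYS by default, assuming the PATH_* enum is ordered by distance), and
// "read" mode is used for NVLink-connected peers on Ampere-class GPUs.
static int p2pUseReadSketch(int pathType, int p2pLevel, int isAmpere) {
  int useP2p  = (pathType <= p2pLevel);
  int useRead = useP2p && (pathType == PATH_NVL) && isAmpere;
  return useRead;
}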
ncclResult_t p2pRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo,
    struct ncclConnect* connectInfo, struct ncclConnector * recv, int channelId) {
  struct p2pRecvResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  recv->transportResources = resources;
  int useRead = p2pUseRead(topo, myInfo, peerInfo);
  int recvSize = offsetof(struct ncclRecvMem, buff);
  // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) if (!(useRead && p == NCCL_PROTO_SIMPLE)) recvSize += recv->comm->buffSizes[p];
  ALIGN_SIZE(recvSize, CUDA_IPC_MIN);
  NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, recvSize));

  struct p2pConnectInfo info;
  info.read = useRead;
  if (myInfo->pidHash == peerInfo->pidHash) {
    info.direct = 1;
    info.directPtr = resources->devMem;
    if (myInfo->cudaDev == peerInfo->cudaDev) {
      TRACE(NCCL_INIT|NCCL_P2P,"%d <- %d via P2P/common device", myInfo->rank, peerInfo->rank);
    } else {
      // Enable P2P access
      cudaError_t err = cudaDeviceEnablePeerAccess(peerInfo->cudaDev, 0);
      if (err == cudaErrorPeerAccessAlreadyEnabled) {
        cudaGetLastError();
      } else if (err != cudaSuccess) {
        WARN("failed to peer with device %d(=%lx): %d %s",
            peerInfo->cudaDev, peerInfo->busId, err, cudaGetErrorString(err));
        return ncclInternalError;
      }
      TRACE(NCCL_INIT|NCCL_P2P,"Channel %02d : %d[%lx] <- %d[%lx] via P2P/direct pointer", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId);
    }
  } else {
    ...
  }
  static_assert(sizeof(struct p2pConnectInfo) <= sizeof(struct ncclConnect), "p2p Connect Info is too big");
  memcpy(connectInfo, &info, sizeof(struct p2pConnectInfo));
  return ncclSuccess;
}
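The elided else branch handles peers that live in different processes on the same machine. Per the p2pConnectInfo comments above and the cudaIpcOpenMemHandle call we will meet in p2pRecvConnect, it would roughly look as follows (a hedged sketch, not the literal NCCL code):

// Sketch of the cross-process branch: export the local buffer as a CUDA IPC
// handle so the peer process can later map it with cudaIpcOpenMemHandle.
info.direct = 0;
cudaError_t err = cudaIpcGetMemHandle(&info.devIpc, (void*)resources->devMem);
if (err != cudaSuccess) {
  WARN("failed to get CUDA IPC handle : %d %s", err, cudaGetErrorString(err));
  return ncclInternalError;
}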
Next, rank 10 runs the send setup, whose logic is largely the same. From here we can see what useRead does: if useRead is 1 the buffer lives on the send rank, and if it is 0 it lives on the recv rank.
ncclResult_t p2pSendSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo,
    struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) {
  struct p2pSendResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  send->transportResources = resources;
  int useRead = p2pUseRead(topo, myInfo, peerInfo);
  int sendSize = sizeof(struct ncclSendMem);
  // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure
  if (useRead) sendSize += send->comm->buffSizes[NCCL_PROTO_SIMPLE];
  ALIGN_SIZE(sendSize, CUDA_IPC_MIN);
  NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, sendSize));

  struct p2pConnectInfo info;
  info.read = useRead;
  const char* useReadStr = info.read ? "/read" : "";
  if (myInfo->pidHash == peerInfo->pidHash) {
    info.direct = 1;
    info.directPtr = resources->devMem;
    if (myInfo->cudaDev == peerInfo->cudaDev) {
      INFO(NCCL_INIT|NCCL_P2P,"Channel %02d : %d[%d] -> %d[%d] via P2P/common device%s",
          channelId, myInfo->rank, myInfo->cudaDev, peerInfo->rank, peerInfo->cudaDev, useReadStr);
      return ncclInternalError;
    } else {
      // Enable P2P access
      cudaError_t err = cudaDeviceEnablePeerAccess(peerInfo->cudaDev, 0);
      if (err == cudaErrorPeerAccessAlreadyEnabled) {
        cudaGetLastError();
      } else if (err != cudaSuccess) {
        WARN("failed to peer with device %d(=%lx): %d %s",
            peerInfo->cudaDev, peerInfo->busId, err, cudaGetErrorString(err));
        return ncclInternalError;
      }
      INFO(NCCL_INIT|NCCL_P2P,"Channel %02d : %d[%lx] -> %d[%lx] via P2P/direct pointer%s",
          channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, useReadStr);
    }
  } else {
    ...
  }
  static_assert(sizeof(struct p2pConnectInfo) <= sizeof(struct ncclConnect), "p2p Connect Info is too big");
  memcpy(connectInfo, &info, sizeof(struct p2pConnectInfo));
  return ncclSuccess;
}
Then rank 10 runs the send connect. Here info is rank 9's information, and remDevMem is the device memory rank 9 just allocated. If read is 0, conn's direct flag needs to be set. Next conn's buffs are set: if read is 1 the SIMPLE buffer is on the local GPU, otherwise it points to rank 9's device memory. The head and tail set up afterwards are used to coordinate sender and receiver, which the next post covers in detail.
static ncclResult_t p2pSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  struct p2pSendResources* resources = (struct p2pSendResources*)send->transportResources;
  struct ncclRecvMem* remDevMem;
  struct p2pConnectInfo* info = (struct p2pConnectInfo*)connectInfo;
  if (info->direct) {
    remDevMem = (struct ncclRecvMem*)(info->directPtr);
    if (info->read == 0) send->conn.direct |= NCCL_DIRECT_GPU;
  } else {
    ...
  }

  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    if (info->read && p == NCCL_PROTO_SIMPLE) {
      /* For P2P Read the SIMPLE buffer is local (ncclSendMem) */
      send->conn.buffs[p] = resources->devMem->buff;
    } else {
      send->conn.buffs[p] = remDevMem->buff + offset;
      offset += send->comm->buffSizes[p];
    }
  }

  send->conn.tail = &remDevMem->tail;
  send->conn.head = &resources->devMem->head;
  send->conn.ptrExchange = &resources->devMem->ptrExchange;
  return ncclSuccess;
}
The recv connect logic is essentially the same.
ncclResult_t p2pRecvConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) {
  struct p2pRecvResources* resources = (struct p2pRecvResources*)recv->transportResources;
  struct ncclSendMem* remDevMem;
  struct p2pConnectInfo* info = (struct p2pConnectInfo*)connectInfo;
  if (info->direct) {
    remDevMem = (struct ncclSendMem*)(info->directPtr);
    if (info->read == 0) {
      recv->conn.direct |= NCCL_DIRECT_GPU;
      recv->conn.ptrExchange = &remDevMem->ptrExchange;
    }
  } else {
    //TRACE_DUMP_IPC(&info->devIpc);
    cudaError_t err = cudaIpcOpenMemHandle(&resources->ipcPtr, info->devIpc, cudaIpcMemLazyEnablePeerAccess);
    remDevMem = (struct ncclSendMem*)resources->ipcPtr;
    if (err != cudaSuccess) {
      WARN("failed to open CUDA IPC handle : %d %s",
          err, cudaGetErrorString(err));
      return ncclUnhandledCudaError;
    }
  }

  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    if (info->read && p == NCCL_PROTO_SIMPLE) {
      /* For P2P Read the SIMPLE buffer is remote (ncclSendMem) */
      recv->conn.buffs[p] = remDevMem->buff;
    } else {
      recv->conn.buffs[p] = resources->devMem->buff + offset;
      offset += recv->comm->buffSizes[p];
    }
  }

  recv->conn.tail = &resources->devMem->tail;
  recv->conn.head = &remDevMem->head;
  return ncclSuccess;
}
Finally, a quick summary. Link establishment always follows these steps:
- The receiver runs the recv setup: it creates its buffers and so on, records the relevant information in connectInfo, starts a listening socket whose ip/port also goes into connectInfo, and sends connectInfo to the sender via bootstrap.
- The sender runs the send setup: it creates its buffers and so on, records the relevant information in connectInfo, and sends it to the receiver. In the RDMA case this connectInfo is not actually used.
- The sender receives the receiver's information from step 1 and establishes the sender-to-receiver link. In the P2P case this simply records the peer's buffer; in the RDMA case it initializes the local QP to the INIT state.
- The receiver receives the information the sender sent in step 2 and establishes the receiver-to-sender link. In the P2P case this again records the peer's buffer; in the RDMA case it brings the local QP up to the RTS state and sends its own QP information back to the peer.
- In the RDMA case, the sender additionally has to receive the peer's QP information and bring its own QP up to the RTS state.