Dcm4che CFind SCU源代码分析

Posted 恒哥的爸爸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dcm4che CFind SCU源代码分析相关的知识,希望对你有一定的参考价值。

1 介绍几个测试程序

DVTk · Downloads

http://www.dcm4che.org/maven2/ 

https://github.com/dvtk-org/DVTk

2 Dcm4che基本类概念

   CEcho  CFind  CMove的客户角色代码,可以从资源内进行下载。

Device 、Application Entity、Connect类静态关系 

2.1 Device设备,Device The description of the device,自己理解即为一个当前虚拟设备实体的描述,内部可以包含多个应用实体概念和多个连接信息。而这些实体和连接之间可以随意组合。

2.2ApplicationEntity:  Network AE  The description of the network application entity   

2.3 Connnect: Network Connection The description of the network interface,描述网络连接类,有绑定ip,端口port,超时时间等

2.4 PresentationContext  (Transfer Capability)  The description of the SOP classes and syntaxes supported by a Network AE.  内部还有多个表示上下文,每个表示上下文是由一个抽象语法(本地通讯的目的,例如Patient Root Find,Modality Worklist)和与这个抽象语法相对应的多个传输语法组成。

public class PresentationContext {
    private final int pcid;       //表示当前的抽象语法
    private final String[] tss;   // 表示支持的传输语法
}

一般在创建客户端请求连接时,ApplicationEntity和PresentationContext是一一对应的。如下代码,是一个CFind SCU的设置表示上下文的代码;

    public final void setInformationModel(InformationModel model, String[] tss) {
       this.model = model;
       rq.addPresentationContext(new PresentationContext(1, model.cuid, tss));
       if (model.level != null)
           addLevel(model.level);

    }

在创建服务端ApplicationEntity的时候,可以不限制上下文

ae.addTransferCapability(new TransferCapability(null, "*", TransferCapability.Role.SCP, "*"));

3  Association动态交互过程

       C-Find SCU的代码,可以直接在CSDN的资源内进行下载。

        根据我方SCU角色的ApplicationEntity和对方SCP角色的ApplicationEntity的信息,创建生成Association,代表当前此次的动态连接(内部包含一个socket),此类对象保持当前通讯的整个生命周期。在此生命周期内,整个通讯的时序,是通过当前Association对象中的state来标识时序的状态,这些状态决定下一步将代码如何运行。现在本地AEntity、本地Connect、远程AEntity、远程Connect、以及本地表示上下文,都已初始化完毕,接下来,将进行连接,创建连接对象Association。以下代码块1,描述的是连接过程。

public Association ApplicationEntity.connect(Connection local, Connection remote, AAssociateRQ rq)
            throws IOException, InterruptedException, IncompatibleConnectionException, GeneralSecurityException {
        checkDevice();
        checkInstalled();
        if (rq.getCallingAET() == null)
            rq.setCallingAET(getCallingAETitle(rq.getCalledAET()));
        rq.setMaxOpsInvoked(local.getMaxOpsInvoked());
        rq.setMaxOpsPerformed(local.getMaxOpsPerformed());
        rq.setMaxPDULength(local.getReceivePDULength());
        Socket sock = local.connect(remote);   //本地的Connect去连接远程Connect,如果成功,将返回Socket
        AssociationMonitor monitor = device.getAssociationMonitor();
        Association as = null;
        try {
           // 利用socket生成Association,并且启动线程,在线程内处理,对服务端返回的信息,
            as = new Association(this, local, sock); 
           // 将AAssociateRQ的信息发送到对方SCP角色,进行协议协商,并且等待状态发生变化
            as.write(rq);             
           // 当前的state状态是State5:Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU,在此状态下,Association启动的线程,对SCP返回的信息进行解析,并且对状态进行改变,这里利用的是synchronized wait的方法,实现两个线程的同步。 当状态从State5离开的时候,当前线程继续向下执行。          
            as.waitForLeaving(State.Sta5);        
            if (monitor != null)
                monitor.onAssociationEstablished(as);
            return as;
        } catch (InterruptedException | IOException e) {
            SafeClose.close(sock);
            if (as != null && monitor != null)
                monitor.onAssociationFailed(as, e);
            throw e;
        }
    }

       在Association类的构造函数中,最后调用activate函数,启动线程,此线程是从线程池中获取的,这个线程很重要,起监听从SCP发送来的信息结构,并且按照信息类型,分发到不同的处理函数中去处理。下边代码块2,可重点注意线程启动的过程;

Association(ApplicationEntity ae, Connection local, Socket sock)
            throws IOException {
        this.connectTime = System.currentTimeMillis();
        this.serialNo = prevSerialNo.incrementAndGet();
        this.ae = ae;    //如果是客户端,ae是有值的,如果是SCP,只有被动连接后,才会动态的生成ae(ApplicationEntity)
        this.requestor = ae != null;
        this.name = "" + sock.getLocalSocketAddress()
             + delim() + sock.getRemoteSocketAddress()
             + '(' + serialNo + ')';
        this.conn = local;
        this.device = local.getDevice();
        this.monitor = device.getAssociationMonitor();
        this.sock = sock;
        this.in = sock.getInputStream();
        this.out = sock.getOutputStream();
        this.encoder = new PDUEncoder(this, out);
        if (requestor) {
            enterState(State.Sta4);
        } else {
            enterState(State.Sta2);
            startRequestTimeout();
        }
        activate(); //启动线程,进行发送ASSOCIATE_RQ信息
    }

    private void activate() {
        device.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    decoder = new PDUDecoder(Association.this, in);
                    device.addAssociation(Association.this);
                    while (!(state == State.Sta1 || state == State.Sta13))
                        decoder.nextPDU();
                } catch (AAbort aa) {
                    abort(aa);
                } catch (IOException e) {
                    onIOException(e);
                } catch (Exception e) {
                    onIOException(new IOException("Unexpected Error", e));
                } finally {
                    device.removeAssociation(Association.this);
                    onClose();
                }
            }
        });
    }

        继续查看代码,以下代码块3,是根据ACSE的不同的类型,将接受的信息体,分发到不同的函数中去执行的分发逻辑。

public void PDUDecoder.nextPDU() throws IOException {
        checkThread();
        Association.LOG.trace("{}: waiting for PDU", as);
        readFully(0, 10);
        pos = 0;
        pdutype = get();
        get();
        pdulen = getInt();
        Association.LOG.trace("{} >> PDU[type={}, len={}]",
                new Object[] { as, pdutype, pdulen & 0xFFFFFFFFL });
        switch (pdutype) {
        case PDUType.A_ASSOCIATE_RQ:
            readPDU();
            as.onAAssociateRQ((AAssociateRQ) decode(new AAssociateRQ()));
            return;
        case PDUType.A_ASSOCIATE_AC:
            readPDU();
            as.onAAssociateAC((AAssociateAC) decode(new AAssociateAC()));
            return;
        case PDUType.P_DATA_TF:
            readPDU();
            as.onPDataTF();
            return;
        case PDUType.A_ASSOCIATE_RJ:
            checkPDULength(4);
            get();
            as.onAAssociateRJ(new AAssociateRJ(get(), get(), get()));
            break;
        case PDUType.A_RELEASE_RQ:
            checkPDULength(4);
            as.onAReleaseRQ();
            break;
        case PDUType.A_RELEASE_RP:
            checkPDULength(4);
            as.onAReleaseRP();
            break;
        case PDUType.A_ABORT:
            checkPDULength(4);
            get();
            get();
            as.onAAbort(new AAbort(get(), get()));
            break;
        default:
            abort(AAbort.UNRECOGNIZED_PDU, UNRECOGNIZED_PDU);
        }
    }

3.1 A-ASSOCIATE过程         

        调用as.write(rq); 发送连接请求,等待SCP的返回协商后结果数据。在当前示例中,整个通讯都是成功的过程,所以,首先返回的通讯类型是A_ASSOCIATE_AC的PDU类型,在处理中,如下代码所示。

void onAAssociateAC(AAssociateAC ac) throws IOException {
        LOG.info("{} >> A-ASSOCIATE-AC", name);
        LOG.debug("{}", ac);
        stopTimeout();
        state.onAAssociateAC(this, ac); //此时的State是状态5状态:Sta5 - Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU
    }
//下段代码段是截取State枚举类型中的代码
    Sta3("Sta3 - Awaiting local A-ASSOCIATE response primitive"),
    Sta4("Sta4 - Awaiting transport connection opening to complete"),
    Sta5("Sta5 - Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU") {
        @Override
        void onAAssociateAC(Association as, AAssociateAC ac)
                throws IOException {
            as.handle(ac);
        }
        @Override
        void onAAssociateRJ(Association as, AAssociateRJ rj)
                throws IOException {
            as.handle(rj);
        }
    },

          首先,将服务端可以提供的表示上下文保存到Association.pcMap中, 注意这里的pcMap是一个HashMap<String,HashMap<String,PresentationContext>>结构,第一个Map中的Key是抽象语法对应的UID,第二个HashMap的Key是当前抽象语法对应的传输语法的UID;其次,Association将离开状态State5,进入状态State6,state6表示Association established and ready for data transfer;dicom通讯中,很重要的概念。这里枚举出来的是其中一部分状态,这里用了一个很好的语法,通过这些枚举值来代表各种不同的状态机状态,并且实现了针对每种不同的状态的多态方式实现。

   void handle(AAssociateAC ac) throws IOException {
        this.ac = ac;
        initPCMap();
        maxOpsInvoked = ac.getMaxOpsInvoked();
        maxPDULength = Association.minZeroAsMax(
                ac.getMaxPDULength(), conn.getSendPDULength());
        enterState(State.Sta6);状态5状态:Sta5 - Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU; 进入状态6 Association established and ready for data transfer
        startIdleTimeout();
    }

    private void initPCMap() {
        for (PresentationContext pc : ac.getPresentationContexts())
            if (pc.isAccepted()) {
                PresentationContext rqpc = rq.getPresentationContext(pc.getPCID());
                if (rqpc != null)
                    initTSMap(rqpc.getAbstractSyntax()).put(pc.getTransferSyntax(), pc);
                else
                    LOG.info("{}: Ignore unexpected {} in A-ASSOCIATE-AC", name, pc);
            }
    }
    private HashMap<String, PresentationContext> initTSMap(String as) {
        HashMap<String, PresentationContext> tsMap = pcMap.get(as);
        if (tsMap == null)
            pcMap.put(as, tsMap = new HashMap<String, PresentationContext>());
        return tsMap;
    }

3.2 CFind查询交互阶段(DIMSE) 

       接下来,通讯将进入下一个阶段,将进入表示层DIMSE的交互过程;接下来客户端代码,将直接调用query查询代码。具体的详细过程,如下代码块4 所示。

private void query(Attributes keys) throws IOException, InterruptedException {
         DimseRSPHandler rspHandler = new DimseRSPHandler(as.nextMessageID()) { 
            int cancelAfter = WlyFindSCU.this.cancelAfter;
            int numMatches;
            @Override
            public void onDimseRSP(Association as, Attributes cmd,Attributes data) {
                super.onDimseRSP(as, cmd, data);
                int status = cmd.getInt(Tag.Status, -1);
                if (Status.isPending(status)) {
                    WlyFindSCU.this.onResult(data);
                    ++numMatches;
                    if (cancelAfter != 0 && numMatches >= cancelAfter)
                        try {
                            cancel(as);
                            cancelAfter = 0;
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                }
            }
        };
        this.association.cfind(model.cuid, priority, keys, null, rspHandler);
    }

      首先,创建一个回调类,在回调类中,要注意,as.nextMessageID()这句代码,这里是定义当前回调类将要响应的Message ID;也就是说,从服务端返回的命令类型PDV内,Message ID(0X0000,0120)的信息,标识着是针对哪个请求Message ID而做出的响应。返回信息处理线程,还是Association中启动的线程,在代码块2中所示的线程来处理。此时,如果服务端有返回信息,那么,数据还是由PDUDecoder.nextPDU(),来进行解析。解析完毕后,由于当前的状态为State6状态,根据代码块3中所示,当前PDU类型是P_DATA_TF类型,那么,接下来会调用Association.handlePDataTF,一直调试代码,可以看到,这里的处理方式要比处理A-ASSOCIATE-AQ要复杂些,这是因为,PDV又能分为数据类型和命令类型,并且,会有当前信息体的状态Status,来标识当前回复是否结束,还是Pending状态。

       这里,我们只看回复的信息的Status状态是完成状态,程序的处理方式,详细可以查看下边代码块5,这里,对于最后的完成状态最重要的处理就是,移除Message ID对应的RspHandler处理函数,并且,通知其他rspHandlerForMsgId 同步区代码,进入就绪状态。

void handlePDataTF() throws IOException {
     decoder.decodeDIMSE();
}
public void PDUDecoder.decodeDIMSE() throws IOException {
        checkThread();
        if (pcid != - 1)
            return; // already inside decodeDIMSE
        nextPDV(PDVType.COMMAND, -1);
        PresentationContext pc = as.getPresentationContext(pcid);//获取传输语法进行解码
        if (pc == null) {
            throw new AAbort();
        }
        if (!pc.isAccepted()) {
            throw new AAbort();
        }
        Attributes cmd = readCommand();
        Dimse dimse = dimseOf(cmd);
        String tsuid = pc.getTransferSyntax();
        if (dimse == Dimse.C_CANCEL_RQ) {
            as.onCancelRQ(cmd);
        } else if (Commands.hasDataset(cmd)) {    //是否有数据类型
            nextPDV(PDVType.DATA, pcid);
            if (dimse.isRSP()) {
                Attributes data = readDataset(tsuid);
                if (Dimse.LOG.isDebugEnabled()) {
                    Dimse.LOG.debug("{} >> {} Dataset:\\n{}", as, dimse.toString(cmd), data);
                }
                as.onDimseRSP(dimse, cmd, data);
            } else {
                if (Dimse.LOG.isDebugEnabled()) {
                    Dimse.LOG.debug("{} >> {} Dataset receiving...", as, dimse.toString(cmd));
                }
                as.onDimseRQ(pc, dimse, cmd, this);
                long skipped = skipAll();
                if (skipped > 0)
                    Association.LOG.debug(
                        "{}: Service User did not consume {} bytes of DIMSE data.",
                        as, skipped);
            }
        } else {
            if (dimse.isRSP()) {
                as.onDimseRSP(dimse, cmd, null);   //如果是命令类型
            } else {
                as.onDimseRQ(pc, dimse, cmd, null);
            }
        }
        pcid = -1;
    }    

    void Association.onDimseRSP(Dimse dimse, Attributes cmd, Attributes data) throws AAbort {
        int msgId = cmd.getInt(Tag.MessageIDBeingRespondedTo, -1);
        int status = cmd.getInt(Tag.Status, 0);
        boolean pending = Status.isPending(status);
        DimseRSPHandler rspHandler = getDimseRSPHandler(msgId);
        if (rspHandler == null) {
            Dimse.LOG.info("{}: unexpected message ID in DIMSE RSP:", name);
            Dimse.LOG.info("\\n{}", cmd);
            throw new AAbort();
        }
        rspHandler.onDimseRSP(this, cmd, data);
        if (pending) {
            if (rspHandler.isStopOnPending())
                startTimeout(msgId, conn.getRetrieveTimeout(),true);
        } else {
            incReceivedCount(dimse);
            removeDimseRSPHandler(msgId);
            if (rspHandlerForMsgId.isEmpty() && performing == 0)
                startIdleOrReleaseTimeout();
        }
    }

    private DimseRSPHandler Association.removeDimseRSPHandler(int msgId) {
        synchronized (rspHandlerForMsgId ) {
            DimseRSPHandler tmp = rspHandlerForMsgId.remove(msgId); //将message id对应的RSPHandler进行移除
            tmp.stopTimeout(this);
            rspHandlerForMsgId.notifyAll();                        //通知其他线程进入就绪状态;
            return tmp;
        }
    }

这里,需要重新回到主线程中,因为在调用query后,也就是代码块4,在finally块中,调用了close函数;在close函数中,继续看waitForOutstandingRSP内部代码。代码如下所示,具体的线程同步解释,查看代码上的注释

    public void close() throws IOException, InterruptedException {
        if (as != null && as.isReadyForDataTransfer()) {
            as.waitForOutstandingRSP();
            as.release();
        }
    }
    public void Association.waitForOutstandingRSP() throws InterruptedException { //outstanding 表示还未完成
        synchronized (rspHandlerForMsgId) {
            while (!rspHandlerForMsgId.isEmpty())
                rspHandlerForMsgId.wait(); //  调用此函数后,主线程将进入等待唤醒状态。
                                           //  一旦其他线程发出notify或者notifyAll的信号,
                                           // 主线程将再次进入就绪状态,
                                           // 并且判断rspHandlerForMsgId中,是否有继续处理的任务,直接退出循环;
        }
    }

3.3 A-RELEASE阶段

      一旦主线程再次进入就绪状态后,真个通讯流程就进入了第三个A-RELEASE过程。主线程就直接调用release函数

    public void release() throws IOException {
        state.writeAReleaseRQ(this); //当前状态是State6
    } 

     
    Sta6("Sta6 - Association established and ready for data transfer") {
        @Override
        void writeAReleaseRQ(Association as) throws IOException {
            as.writeAReleaseRQ();
        }


    void Association.writeAReleaseRQ() throws IOException {
        enterState(State.Sta7); Sta7 - Awaiting A-RELEASE-RP PDU
        stopTimeout();
        encoder.writeAReleaseRQ();
        startReleaseTimeout();
    }

    当服务角色SCP使用A-Release-rq的信息体,回复了客户角色A-Release-rq请求后,可以参考代码块3,客户端关闭socket,并且state进入1状态(Idle)

   void onAReleaseRP() throws IOException {
        LOG.info("{} >> A-RELEASE-RP", name);
        stopTimeout();
        state.onAReleaseRP(this); //当前state 为7 ,Awaiting A-RELEASE-RP PDU
    }

    void doCloseSocket() {
        LOG.info("{}: close {}", name, sock);
        SafeClose.close(sock);
        enterState(State.Sta1); //state 变为1 Idle
    }

以上是关于Dcm4che CFind SCU源代码分析的主要内容,如果未能解决你的问题,请参考以下文章

Dcm4che CFind SCU源代码分析

一个简单的时间片轮转内核代码的分析(课程作业)

Linux内核分析:完成一个简单的时间片轮转多道程序内核代码

Linux内核分析—完成一个简单的时间片轮转多道程序内核代码

第二周:一个简单的时间片轮转多道程序内核代码及分析

dcm4che,WADO相关