Dcm4che CFind SCU源代码分析
Posted 恒哥的爸爸
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dcm4che CFind SCU源代码分析相关的知识,希望对你有一定的参考价值。
1 介绍几个测试程序
http://www.dcm4che.org/maven2/
https://github.com/dvtk-org/DVTk
2 Dcm4che基本类概念
CEcho CFind CMove的客户角色代码,可以从资源内进行下载。
2.1 Device设备,Device The description of the device,自己理解即为一个当前虚拟设备实体的描述,内部可以包含多个应用实体概念和多个连接信息。而这些实体和连接之间可以随意组合。
2.2ApplicationEntity: Network AE The description of the network application entity
2.3 Connnect: Network Connection The description of the network interface,描述网络连接类,有绑定ip,端口port,超时时间等
2.4 PresentationContext (Transfer Capability) The description of the SOP classes and syntaxes supported by a Network AE. 内部还有多个表示上下文,每个表示上下文是由一个抽象语法(本地通讯的目的,例如Patient Root Find,Modality Worklist)和与这个抽象语法相对应的多个传输语法组成。
public class PresentationContext {
private final int pcid; //表示当前的抽象语法
private final String[] tss; // 表示支持的传输语法
}
一般在创建客户端请求连接时,ApplicationEntity和PresentationContext是一一对应的。如下代码,是一个CFind SCU的设置表示上下文的代码;
public final void setInformationModel(InformationModel model, String[] tss) {
this.model = model;
rq.addPresentationContext(new PresentationContext(1, model.cuid, tss));
if (model.level != null)
addLevel(model.level);
}
在创建服务端ApplicationEntity的时候,可以不限制上下文
ae.addTransferCapability(new TransferCapability(null, "*", TransferCapability.Role.SCP, "*"));
3 Association动态交互过程
C-Find SCU的代码,可以直接在CSDN的资源内进行下载。
根据我方SCU角色的ApplicationEntity和对方SCP角色的ApplicationEntity的信息,创建生成Association,代表当前此次的动态连接(内部包含一个socket),此类对象保持当前通讯的整个生命周期。在此生命周期内,整个通讯的时序,是通过当前Association对象中的state来标识时序的状态,这些状态决定下一步将代码如何运行。现在本地AEntity、本地Connect、远程AEntity、远程Connect、以及本地表示上下文,都已初始化完毕,接下来,将进行连接,创建连接对象Association。以下代码块1,描述的是连接过程。
public Association ApplicationEntity.connect(Connection local, Connection remote, AAssociateRQ rq)
throws IOException, InterruptedException, IncompatibleConnectionException, GeneralSecurityException {
checkDevice();
checkInstalled();
if (rq.getCallingAET() == null)
rq.setCallingAET(getCallingAETitle(rq.getCalledAET()));
rq.setMaxOpsInvoked(local.getMaxOpsInvoked());
rq.setMaxOpsPerformed(local.getMaxOpsPerformed());
rq.setMaxPDULength(local.getReceivePDULength());
Socket sock = local.connect(remote); //本地的Connect去连接远程Connect,如果成功,将返回Socket
AssociationMonitor monitor = device.getAssociationMonitor();
Association as = null;
try {
// 利用socket生成Association,并且启动线程,在线程内处理,对服务端返回的信息,
as = new Association(this, local, sock);
// 将AAssociateRQ的信息发送到对方SCP角色,进行协议协商,并且等待状态发生变化
as.write(rq);
// 当前的state状态是State5:Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU,在此状态下,Association启动的线程,对SCP返回的信息进行解析,并且对状态进行改变,这里利用的是synchronized wait的方法,实现两个线程的同步。 当状态从State5离开的时候,当前线程继续向下执行。
as.waitForLeaving(State.Sta5);
if (monitor != null)
monitor.onAssociationEstablished(as);
return as;
} catch (InterruptedException | IOException e) {
SafeClose.close(sock);
if (as != null && monitor != null)
monitor.onAssociationFailed(as, e);
throw e;
}
}
在Association类的构造函数中,最后调用activate函数,启动线程,此线程是从线程池中获取的,这个线程很重要,起监听从SCP发送来的信息结构,并且按照信息类型,分发到不同的处理函数中去处理。下边代码块2,可重点注意线程启动的过程;
Association(ApplicationEntity ae, Connection local, Socket sock)
throws IOException {
this.connectTime = System.currentTimeMillis();
this.serialNo = prevSerialNo.incrementAndGet();
this.ae = ae; //如果是客户端,ae是有值的,如果是SCP,只有被动连接后,才会动态的生成ae(ApplicationEntity)
this.requestor = ae != null;
this.name = "" + sock.getLocalSocketAddress()
+ delim() + sock.getRemoteSocketAddress()
+ '(' + serialNo + ')';
this.conn = local;
this.device = local.getDevice();
this.monitor = device.getAssociationMonitor();
this.sock = sock;
this.in = sock.getInputStream();
this.out = sock.getOutputStream();
this.encoder = new PDUEncoder(this, out);
if (requestor) {
enterState(State.Sta4);
} else {
enterState(State.Sta2);
startRequestTimeout();
}
activate(); //启动线程,进行发送ASSOCIATE_RQ信息
}
private void activate() {
device.execute(new Runnable() {
@Override
public void run() {
try {
decoder = new PDUDecoder(Association.this, in);
device.addAssociation(Association.this);
while (!(state == State.Sta1 || state == State.Sta13))
decoder.nextPDU();
} catch (AAbort aa) {
abort(aa);
} catch (IOException e) {
onIOException(e);
} catch (Exception e) {
onIOException(new IOException("Unexpected Error", e));
} finally {
device.removeAssociation(Association.this);
onClose();
}
}
});
}
继续查看代码,以下代码块3,是根据ACSE的不同的类型,将接受的信息体,分发到不同的函数中去执行的分发逻辑。
public void PDUDecoder.nextPDU() throws IOException {
checkThread();
Association.LOG.trace("{}: waiting for PDU", as);
readFully(0, 10);
pos = 0;
pdutype = get();
get();
pdulen = getInt();
Association.LOG.trace("{} >> PDU[type={}, len={}]",
new Object[] { as, pdutype, pdulen & 0xFFFFFFFFL });
switch (pdutype) {
case PDUType.A_ASSOCIATE_RQ:
readPDU();
as.onAAssociateRQ((AAssociateRQ) decode(new AAssociateRQ()));
return;
case PDUType.A_ASSOCIATE_AC:
readPDU();
as.onAAssociateAC((AAssociateAC) decode(new AAssociateAC()));
return;
case PDUType.P_DATA_TF:
readPDU();
as.onPDataTF();
return;
case PDUType.A_ASSOCIATE_RJ:
checkPDULength(4);
get();
as.onAAssociateRJ(new AAssociateRJ(get(), get(), get()));
break;
case PDUType.A_RELEASE_RQ:
checkPDULength(4);
as.onAReleaseRQ();
break;
case PDUType.A_RELEASE_RP:
checkPDULength(4);
as.onAReleaseRP();
break;
case PDUType.A_ABORT:
checkPDULength(4);
get();
get();
as.onAAbort(new AAbort(get(), get()));
break;
default:
abort(AAbort.UNRECOGNIZED_PDU, UNRECOGNIZED_PDU);
}
}
3.1 A-ASSOCIATE过程
调用as.write(rq); 发送连接请求,等待SCP的返回协商后结果数据。在当前示例中,整个通讯都是成功的过程,所以,首先返回的通讯类型是A_ASSOCIATE_AC的PDU类型,在处理中,如下代码所示。
void onAAssociateAC(AAssociateAC ac) throws IOException {
LOG.info("{} >> A-ASSOCIATE-AC", name);
LOG.debug("{}", ac);
stopTimeout();
state.onAAssociateAC(this, ac); //此时的State是状态5状态:Sta5 - Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU
}
//下段代码段是截取State枚举类型中的代码
Sta3("Sta3 - Awaiting local A-ASSOCIATE response primitive"),
Sta4("Sta4 - Awaiting transport connection opening to complete"),
Sta5("Sta5 - Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU") {
@Override
void onAAssociateAC(Association as, AAssociateAC ac)
throws IOException {
as.handle(ac);
}
@Override
void onAAssociateRJ(Association as, AAssociateRJ rj)
throws IOException {
as.handle(rj);
}
},
首先,将服务端可以提供的表示上下文保存到Association.pcMap中, 注意这里的pcMap是一个HashMap<String,HashMap<String,PresentationContext>>结构,第一个Map中的Key是抽象语法对应的UID,第二个HashMap的Key是当前抽象语法对应的传输语法的UID;其次,Association将离开状态State5,进入状态State6,state6表示Association established and ready for data transfer;dicom通讯中,很重要的概念。这里枚举出来的是其中一部分状态,这里用了一个很好的语法,通过这些枚举值来代表各种不同的状态机状态,并且实现了针对每种不同的状态的多态方式实现。
void handle(AAssociateAC ac) throws IOException {
this.ac = ac;
initPCMap();
maxOpsInvoked = ac.getMaxOpsInvoked();
maxPDULength = Association.minZeroAsMax(
ac.getMaxPDULength(), conn.getSendPDULength());
enterState(State.Sta6);状态5状态:Sta5 - Awaiting A-ASSOCIATE-AC or A-ASSOCIATE-RJ PDU; 进入状态6 Association established and ready for data transfer
startIdleTimeout();
}
private void initPCMap() {
for (PresentationContext pc : ac.getPresentationContexts())
if (pc.isAccepted()) {
PresentationContext rqpc = rq.getPresentationContext(pc.getPCID());
if (rqpc != null)
initTSMap(rqpc.getAbstractSyntax()).put(pc.getTransferSyntax(), pc);
else
LOG.info("{}: Ignore unexpected {} in A-ASSOCIATE-AC", name, pc);
}
}
private HashMap<String, PresentationContext> initTSMap(String as) {
HashMap<String, PresentationContext> tsMap = pcMap.get(as);
if (tsMap == null)
pcMap.put(as, tsMap = new HashMap<String, PresentationContext>());
return tsMap;
}
3.2 CFind查询交互阶段(DIMSE)
接下来,通讯将进入下一个阶段,将进入表示层DIMSE的交互过程;接下来客户端代码,将直接调用query查询代码。具体的详细过程,如下代码块4 所示。
private void query(Attributes keys) throws IOException, InterruptedException {
DimseRSPHandler rspHandler = new DimseRSPHandler(as.nextMessageID()) {
int cancelAfter = WlyFindSCU.this.cancelAfter;
int numMatches;
@Override
public void onDimseRSP(Association as, Attributes cmd,Attributes data) {
super.onDimseRSP(as, cmd, data);
int status = cmd.getInt(Tag.Status, -1);
if (Status.isPending(status)) {
WlyFindSCU.this.onResult(data);
++numMatches;
if (cancelAfter != 0 && numMatches >= cancelAfter)
try {
cancel(as);
cancelAfter = 0;
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
this.association.cfind(model.cuid, priority, keys, null, rspHandler);
}
首先,创建一个回调类,在回调类中,要注意,as.nextMessageID()这句代码,这里是定义当前回调类将要响应的Message ID;也就是说,从服务端返回的命令类型PDV内,Message ID(0X0000,0120)的信息,标识着是针对哪个请求Message ID而做出的响应。返回信息处理线程,还是Association中启动的线程,在代码块2中所示的线程来处理。此时,如果服务端有返回信息,那么,数据还是由PDUDecoder.nextPDU(),来进行解析。解析完毕后,由于当前的状态为State6状态,根据代码块3中所示,当前PDU类型是P_DATA_TF类型,那么,接下来会调用Association.handlePDataTF,一直调试代码,可以看到,这里的处理方式要比处理A-ASSOCIATE-AQ要复杂些,这是因为,PDV又能分为数据类型和命令类型,并且,会有当前信息体的状态Status,来标识当前回复是否结束,还是Pending状态。
这里,我们只看回复的信息的Status状态是完成状态,程序的处理方式,详细可以查看下边代码块5,这里,对于最后的完成状态最重要的处理就是,移除Message ID对应的RspHandler处理函数,并且,通知其他rspHandlerForMsgId 同步区代码,进入就绪状态。
void handlePDataTF() throws IOException {
decoder.decodeDIMSE();
}
public void PDUDecoder.decodeDIMSE() throws IOException {
checkThread();
if (pcid != - 1)
return; // already inside decodeDIMSE
nextPDV(PDVType.COMMAND, -1);
PresentationContext pc = as.getPresentationContext(pcid);//获取传输语法进行解码
if (pc == null) {
throw new AAbort();
}
if (!pc.isAccepted()) {
throw new AAbort();
}
Attributes cmd = readCommand();
Dimse dimse = dimseOf(cmd);
String tsuid = pc.getTransferSyntax();
if (dimse == Dimse.C_CANCEL_RQ) {
as.onCancelRQ(cmd);
} else if (Commands.hasDataset(cmd)) { //是否有数据类型
nextPDV(PDVType.DATA, pcid);
if (dimse.isRSP()) {
Attributes data = readDataset(tsuid);
if (Dimse.LOG.isDebugEnabled()) {
Dimse.LOG.debug("{} >> {} Dataset:\\n{}", as, dimse.toString(cmd), data);
}
as.onDimseRSP(dimse, cmd, data);
} else {
if (Dimse.LOG.isDebugEnabled()) {
Dimse.LOG.debug("{} >> {} Dataset receiving...", as, dimse.toString(cmd));
}
as.onDimseRQ(pc, dimse, cmd, this);
long skipped = skipAll();
if (skipped > 0)
Association.LOG.debug(
"{}: Service User did not consume {} bytes of DIMSE data.",
as, skipped);
}
} else {
if (dimse.isRSP()) {
as.onDimseRSP(dimse, cmd, null); //如果是命令类型
} else {
as.onDimseRQ(pc, dimse, cmd, null);
}
}
pcid = -1;
}
void Association.onDimseRSP(Dimse dimse, Attributes cmd, Attributes data) throws AAbort {
int msgId = cmd.getInt(Tag.MessageIDBeingRespondedTo, -1);
int status = cmd.getInt(Tag.Status, 0);
boolean pending = Status.isPending(status);
DimseRSPHandler rspHandler = getDimseRSPHandler(msgId);
if (rspHandler == null) {
Dimse.LOG.info("{}: unexpected message ID in DIMSE RSP:", name);
Dimse.LOG.info("\\n{}", cmd);
throw new AAbort();
}
rspHandler.onDimseRSP(this, cmd, data);
if (pending) {
if (rspHandler.isStopOnPending())
startTimeout(msgId, conn.getRetrieveTimeout(),true);
} else {
incReceivedCount(dimse);
removeDimseRSPHandler(msgId);
if (rspHandlerForMsgId.isEmpty() && performing == 0)
startIdleOrReleaseTimeout();
}
}
private DimseRSPHandler Association.removeDimseRSPHandler(int msgId) {
synchronized (rspHandlerForMsgId ) {
DimseRSPHandler tmp = rspHandlerForMsgId.remove(msgId); //将message id对应的RSPHandler进行移除
tmp.stopTimeout(this);
rspHandlerForMsgId.notifyAll(); //通知其他线程进入就绪状态;
return tmp;
}
}
这里,需要重新回到主线程中,因为在调用query后,也就是代码块4,在finally块中,调用了close函数;在close函数中,继续看waitForOutstandingRSP内部代码。代码如下所示,具体的线程同步解释,查看代码上的注释
public void close() throws IOException, InterruptedException {
if (as != null && as.isReadyForDataTransfer()) {
as.waitForOutstandingRSP();
as.release();
}
}
public void Association.waitForOutstandingRSP() throws InterruptedException { //outstanding 表示还未完成
synchronized (rspHandlerForMsgId) {
while (!rspHandlerForMsgId.isEmpty())
rspHandlerForMsgId.wait(); // 调用此函数后,主线程将进入等待唤醒状态。
// 一旦其他线程发出notify或者notifyAll的信号,
// 主线程将再次进入就绪状态,
// 并且判断rspHandlerForMsgId中,是否有继续处理的任务,直接退出循环;
}
}
3.3 A-RELEASE阶段
一旦主线程再次进入就绪状态后,真个通讯流程就进入了第三个A-RELEASE过程。主线程就直接调用release函数
public void release() throws IOException {
state.writeAReleaseRQ(this); //当前状态是State6
}
Sta6("Sta6 - Association established and ready for data transfer") {
@Override
void writeAReleaseRQ(Association as) throws IOException {
as.writeAReleaseRQ();
}
void Association.writeAReleaseRQ() throws IOException {
enterState(State.Sta7); Sta7 - Awaiting A-RELEASE-RP PDU
stopTimeout();
encoder.writeAReleaseRQ();
startReleaseTimeout();
}
当服务角色SCP使用A-Release-rq的信息体,回复了客户角色A-Release-rq请求后,可以参考代码块3,客户端关闭socket,并且state进入1状态(Idle)
void onAReleaseRP() throws IOException {
LOG.info("{} >> A-RELEASE-RP", name);
stopTimeout();
state.onAReleaseRP(this); //当前state 为7 ,Awaiting A-RELEASE-RP PDU
}
void doCloseSocket() {
LOG.info("{}: close {}", name, sock);
SafeClose.close(sock);
enterState(State.Sta1); //state 变为1 Idle
}
以上是关于Dcm4che CFind SCU源代码分析的主要内容,如果未能解决你的问题,请参考以下文章
Linux内核分析:完成一个简单的时间片轮转多道程序内核代码