用java向hdfs上传文件时,如何实现断点续传

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用java向hdfs上传文件时,如何实现断点续传相关的知识,希望对你有一定的参考价值。

如题,在向hdfs上传文件时,如果hdfs上有这个文件,判断该文件的大小和字节数,然后只上传该文件没有的部分。请问如何实现,最好附上完整代码,分不多,谢谢。

@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = "/javaLargeFileUploaderServlet" )
public class UploadServlet extends HttpRequestHandlerServlet
implements HttpRequestHandler

private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);

@Autowired
UploadProcessor uploadProcessor;

@Autowired
FileUploaderHelper fileUploaderHelper;

@Autowired
ExceptionCodeMappingHelper exceptionCodeMappingHelper;

@Autowired
Authorizer authorizer;

@Autowired
StaticStateIdentifierManager staticStateIdentifierManager;

@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws IOException
log.trace("Handling request");

Serializable jsonObject = null;
try
// extract the action from the request
UploadServletAction actionByParameterName =
UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));

// check authorization
checkAuthorization(request, actionByParameterName);

// then process the asked action
jsonObject = processAction(actionByParameterName, request);

// if something has to be written to the response
if (jsonObject != null)
fileUploaderHelper.writeToResponse(jsonObject, response);



// If exception, write it
catch (Exception e)
exceptionCodeMappingHelper.processException(e, response);




private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
throws MissingParameterException, AuthorizationException

// check authorization
// if its not get progress (because we do not really care about authorization for get
// progress and it uses an array of file ids)
if (!actionByParameterName.equals(UploadServletAction.getProgress))

// extract uuid
final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);

// if this is init, the identifier is the one in parameter
UUID clientOrJobId;
String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null)
clientOrJobId = UUID.fromString(parameter);

// if not, get it from manager
else
clientOrJobId = staticStateIdentifierManager.getIdentifier();


// call authorizer
authorizer.getAuthorization(
request,
actionByParameterName,
clientOrJobId,
fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] ) : null);




private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
throws Exception
log.debug("Processing action " + actionByParameterName.name());

Serializable returnObject = null;
switch (actionByParameterName)
case getConfig:
String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
returnObject =
uploadProcessor.getConfig(
parameterValue != null ? UUID.fromString(parameterValue) : null);
break;
case verifyCrcOfUncheckedPart:
returnObject = verifyCrcOfUncheckedPart(request);
break;
case prepareUpload:
returnObject = prepareUpload(request);
break;
case clearFile:
uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case clearAll:
uploadProcessor.clearAll();
break;
case pauseFile:
List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
uploadProcessor.pauseFile(uuids);
break;
case resumeFile:
returnObject =
uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case setRate:
uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
break;
case getProgress:
returnObject = getProgress(request);
break;

return returnObject;


List<UUID> getFileIdsFromString(String fileIds)
String[] splittedFileIds = fileIds.split(",");
List<UUID> uuids = Lists.newArrayList();
for (int i = 0; i < splittedFileIds.length; i++)
uuids.add(UUID.fromString(splittedFileIds[i]));

return uuids;


private Serializable getProgress(HttpServletRequest request)
throws MissingParameterException
Serializable returnObject;
String[] ids =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>()

@Override
public UUID apply(String input)
return UUID.fromString(input);


);
returnObject = Maps.newHashMap();
for (UUID fileId : uuids)
try
ProgressJson progress = uploadProcessor.getProgress(fileId);
((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress);

catch (FileNotFoundException e)
log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());


return returnObject;


private Serializable prepareUpload(HttpServletRequest request)
throws MissingParameterException, IOException

// extract file information
PrepareUploadJson[] fromJson =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);

// prepare them
final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson);

// return them
return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>()

public String apply(UUID input)
return input.toString();
;
));


private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException
UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
try
uploadProcessor.verifyCrcOfUncheckedPart(fileId,
fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));

catch (InvalidCrcException e)
// no need to log this exception, a fallback behaviour is defined in the
// throwing method.
// but we need to return something!
return Boolean.FALSE;

return Boolean.TRUE;

追问

我觉得你在逗我,你觉得呢

参考技术A 我是做任务的不用管我

java socket数据流断点续传

设备正在给我发送数据,突然网络断开,服务端检测超时时间,在超时时间内如果没有检测到数据过来,就给设备端发送已个心跳测试,捕捉到异常说明设备已经离线,然后我生成日志文件,记录设备的id和文件的上传路径,等下一次该设备向我发送请求的时候,我判断是上传请求还是续传请求,如果是续传请求,则根据设备id到日志文件里面去匹配,如果能匹配到信息,则获取信息中的上传路径,接着将设备发送过来的数据追加到该路径下面的文件中,如果续传成功,则删除日志文件中改设备的所有信息

现在续传可以实现,但是接收的数据不对,设备记录断线时已经发送的字节,上线后接着发送剩余字节

我没有什么办法可以检测设备断线前给我发送的数据已经全部保存下来了

或者提供更好的断点续传思路,先谢谢大家了

设备往你这边传输数据时,你得知他的设备ID、数据的MD5校验值、操作授权代码。
传输数据开始,设备向你这边传输数据,你这边将受到的数据保存到文件或者数据库。传输过程中,你可能要告诉设备已经保存的数据字节数。
假如网络连接异常,按照你说的思路,发送心跳包检测连接情况。你这边程序将本次操作的数据保存、并将设备ID、数据MD5校验值、操作授权码、已保存的字节数保存到日志。
下次客户端请求续传,你就校验它的设备ID、操作授权代码,然后再告诉它从哪里开始续传,跳过那些字节。
续传完成,通知客户端,续传成功。追问

现在的问题就是,传送到哪个字节设备自己做了记录,并不是我告诉他的,这样就不知道我这断线前保存的数据是不是和设备记录的数据一致

追答

如果设备那边出错,TCP发送数据默认都有缓冲区,交换机之类的网络设备都可能会缓冲设备发来的数据,如果设备自己做记录,有可能将发送失败的字节也累加了,这可能是造成数据不一致的原因。
我想还是服务端做个记录比较妥。客户端也可以和你做个比对。看看丢了多少数据。

追问

这样的话,我的思路是这样的,断网时我这边记录下已经上传的字节数,设备给我发送续传请求,我匹配到有记录,先返回一个带有已上传字节数的报文给设备,然后设备根据那个字节数,继续给我发送数据

我如何记录字节数,保存的数据可能有一二百兆,而且是没有经过协议解析的原始数据,也就是说都是乱码,用字节来计算会有影响么,应该怎么计算呢

追答

按你说应该是字符数据流,但是你如果是按字节来接收发送,两端都是按字节计算,肯定不会出问题。

注意字符的编码一致性。我也不清楚你是按字节来保存还是其他方式的。中间可能都是按字节来接收。

追问

方便加Q么1102488623

追答

已加你。

参考技术A 你的思路很完整,挑不出什么不好的地方,现在的问题就是你要确定你所记录的断线时的已经传了多少的信息是否和已经保存起来的信息一致,先找个简单的txt文件写几行字试试追问

设备端是别人写的,我这里也不好做记录,如果可以的话,+1102488623,我把代码给你看看,如果我这里没有问题的话,那就是设备端的问题了

参考技术B 思路是对的哈,你如果不确定是不是设备那边的问题,可以自己写一个简易的client来试一下

以上是关于用java向hdfs上传文件时,如何实现断点续传的主要内容,如果未能解决你的问题,请参考以下文章

java web断点续传,我用的是fileupload来做的上传。

Java如何实现大文件分片上传,断点续传和秒传

java socket数据流断点续传

如何实现HTML5文件断点续传

4GB以上,超大文件上传,HTTP断点续传,如何实现?

Java Socket如何实现文件的断点续传,有代码更好