当前位置: 代码迷 >> 综合 >> hadoop hdfs 断点续传--下载
  详细解决方案

hadoop hdfs 断点续传--下载

热度:88   发布时间:2023-12-09 19:03:30.0

我们做了一个类似webhdfs的服务,通过rest api存储HDSF上的文件,这两天实现了对hdfs的断点续传的下载,并经应用返回给客户端:

hdfs-->java代理服务-->php应用-->浏览器等客户端


要实现断点续传,读取文件时应该支持offset和length,支持seek方法,而实际上HDFS本身就支持指定偏移量读取文件:

long offset = 1024;
FSDataInputStream in = fs.open(new Path(path));
in.seek(offset);

然后根据length读取相应的长度即可,仅供参考:

long readLength = 0; // 记录已读字节数
int buf_size = 4096;
long length = 0; 
int n = 0;out = new BufferedOutputStream(response.getOutputStream());//HttpServletResponse
while (readLength <= length - buf_size) {// 大部分字节在这里读取n = in.read(buf, 0, buf_size);readLength += buf_size;out.write(buf, 0, n);
}
if (readLength <= length) { // 余下的不足n = in.read(buf, 0, (int) (length - readLength));out.write(buf, 0, n);
}




下面是附件上JAVA写的类似webhdfs的服务片断:

(参考网上的)

  /*** * @param appId* @param path* @param range* @param request* @param response*/@GET@Path("/download")@Produces(MediaType.APPLICATION_OCTET_STREAM)public void download(@QueryParam("app_id") String appId,@QueryParam("path") String path, @QueryParam("range") String range,@Context HttpServletRequest request,@Context HttpServletResponse response) {Map<String, Object> map = null;if (logger.isDebugEnabled()) {logger.debug("path: {}", path);}if (!checkAppId(appId)) {return;}if (null == path || "".equals(path)) {return;}// 查看文件信息map = kdisk.info(appId, path);if ((Integer) map.get("code") != 0) {return;}// 获取下载的范围if (null == range || "".equals(range)) {range = request.getHeader("Range");}OutputStream out = null;FSDataInputStream in = null;try {Object objMap = map.get("data");FileDesc file = (FileDesc) objMap;long fileLength = file.getSize(); // 记录文件大小long pastLength = 0; // 记录已下载文件大小int rangeSwitch = 0;long toLength = fileLength - 1;long contentLength = 0; // 客户端请求的字节总量String rangeBytes = ""; // 记录客户端传来的形如“27000-”或者“27000-39000”的内容int buf_size = 4096;byte buf[] = new byte[buf_size]; // 暂存容器if (range != null) { // 客户端请求的下载的文件块的开始字节rangeBytes = range.replaceAll("bytes=", "");if (rangeBytes.indexOf('-') == rangeBytes.length() - 1) {// bytes=969998336-rangeSwitch = 1;rangeBytes = rangeBytes.substring(0,rangeBytes.indexOf('-'));pastLength = Long.parseLong(rangeBytes.trim());contentLength = fileLength - pastLength; // 客户端请求的是969998336之后的字节} else { // bytes=1275856879-1275877358rangeSwitch = 2;int pos = rangeBytes.indexOf('-');String temp0 = rangeBytes.substring(0, pos);String temp2 = rangeBytes.substring(pos + 1,rangeBytes.length());pastLength = Long.parseLong(temp0.trim()); toLength = Long.parseLong(temp2.trim()); if (toLength  > fileLength - 1) {toLength = fileLength - 1;}contentLength = toLength + 1 - pastLength ; }if (pastLength > fileLength) {return;}		String contentRange = new StringBuffer("bytes ").append(pastLength).append("-").append(toLength).append("/").append(fileLength).toString();response.setHeader("Content-Range", contentRange);response.setStatus(javax.servlet.http.HttpServletResponse.SC_PARTIAL_CONTENT);} else { contentLength = fileLength; }response.reset(); // 重置response.setContentType("application/octet-stream");response.addHeader("Content-Disposition", "attachment;filename=\""+ URLEncoder.encode(file.getName(), "utf-8") + "\"");response.addHeader("Content-Length", String.valueOf(contentLength));// 从HDFS上获得文件流,直接写到输出文件流给用户下载。out = new BufferedOutputStream(response.getOutputStream());in = (FSDataInputStream) kdisk.getFileInputStream(appId, path,pastLength);if (in == null) {if (logger.isDebugEnabled()) {logger.debug("get donwload file fail.");}return;}int n = 0;switch (rangeSwitch) {case 0: case 1:IOUtils.copyBytes(in, out, buf_size);break;case 2: long readLength = 0;while (readLength <= contentLength - buf_size) {n = in.read(buf, 0, buf_size);readLength += buf_size;out.write(buf, 0, n);}if (readLength <= contentLength) { n = in.read(buf, 0, (int) (contentLength - readLength));out.write(buf, 0, n);}break;default:break;}out.flush();} catch (Exception e) {if (logger.isDebugEnabled()) {logger.debug(e.getMessage());}return;} finally {if (out != null) {try {out.close();} catch (IOException e) {if (logger.isDebugEnabled()) {logger.debug(e.getMessage());}}}if (in != null) {try {in.close();} catch (IOException e) {if (logger.isDebugEnabled()) {logger.debug(e.getMessage());}}}}}





下面附上用PHP调用我们自己开发的类似webhdfs的服务片断,并返回给客户端如浏览器:

        /*** 下载文件。* 可以通过浏览器下载。* 支持断点续传下载。通过设置头信息:$_SERVER ['HTTP_RANGE']* * @param string $remote_path* @param string $file_name*/static function download_client($remote_path, $file_name) {$file_info = self::info ( $remote_path );if (! $file_info) {return false;}// 输入文件标签$speed = 20000;$chunk = 16384;$sleep = $speed ? floor ( ($chunk / ($speed * 1024)) * 500000 ) : 0;$sent = 0;ob_end_clean ();$file_mime = 'application/octet-stream';header ( 'Cache-Control: max-age=86400' );header ( 'Expires: ' . gmdate ( 'D, d M Y H:i:s \G\M\T', time () + 86400 ) );header ( 'Content-Type: ' . $file_mime );$size = $file_info ['size'];$offset = 0;$length = $size;$end_pos = $size - 1;$range = '';$req = array ();if (isset ( $_SERVER ['HTTP_RANGE'] )) {$range = $_SERVER ['HTTP_RANGE'];$req ['range'] = $range;list ( $a, $range ) = explode ( "=", $_SERVER ['HTTP_RANGE'] );//0-1023或者1024-,以0为开始的下标,0-1023表示要下载前面1024个字节。$range = explode ( '-', $range );if (isset ( $range [0] ) && $range [0]) {$offset = $range [0];}if (isset ( $range [1] ) && $range [1]) {$end_pos = $range [1];}//str_replace ( $range, "-", $range );//$size2 = $size - 1;$length = ($end_pos + 1) - $offset;header ( "HTTP/1.1 206 Partial Content" );header ( "Content-Length: $length" );header ( "Content-Range: bytes $offset-$end_pos/$size" );} else { //第一次连接 header ( "Content-Length: " . $size ); //输出总长 }//header ( "Content-Length: " . $file_info ['size'] );header ( 'Content-Transfer-Encoding: binary' );header ( 'Content-Encoding: none' );$ua = isset ( $_SERVER ["HTTP_USER_AGENT"] ) ? $_SERVER ["HTTP_USER_AGENT"] : '';$encoded_filename = urlencode ( $file_name );$encoded_filename = str_replace ( "+", "%20", $encoded_filename );if (preg_match ( "/MSIE/", $ua )) {header ( 'Content-Disposition: attachment; filename="' . $encoded_filename . '"' );} else if (preg_match ( "/Firefox/", $ua )) {header ( 'Content-Disposition: attachment; filename*="utf8\'\'' . $file_name . '"' );} else {header ( 'Content-Disposition: attachment; filename="' . $file_name . '"' );}ob_start ();$remote_file = KDiskServer::open ( $remote_path, $req );if (! $remote_file) {return false;}//输出文件内容try {set_time_limit ( 0 );while ( ! feof ( $remote_file ) && connection_status () == 0 ) {//reset time limit for big filesecho (fread ( $remote_file, $chunk ));ob_flush ();flush ();}$info = stream_get_meta_data ( $remote_file );fclose ( $remote_file );if ($info ['timed_out']) {return false;} else {return true;}} catch ( Exception $e ) {fclose ( $remote_file );return false;}}



  相关解决方案