当前位置: 代码迷 >> 综合 >> 调用JAVA API 对 HDFS 进行文件的读取、写入、上传、下载、删除等操作
  详细解决方案

调用JAVA API 对 HDFS 进行文件的读取、写入、上传、下载、删除等操作

热度:99   发布时间:2023-11-03 04:55:08.0
Hadoop文件系统 
基本的文件系统命令操作, 通过hadoop fs -help可以获取所有的命令的详细帮助文件。 

Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop的一个文件系统接口。该类是一个抽象类,通过以下两种静态工厂方法可以过去FileSystem实例: 
public static FileSystem.get(Configuration conf) throws IOException 
public static FileSystem.get(URI uri, Configuration conf) throws IOException
 

具体方法实现: 
1、public boolean mkdirs(Path f) throws IOException  
一次性新建所有目录(包括父目录), f是完整的目录路径。 

2、public FSOutputStream create(Path f) throws IOException  
创建指定path对象的一个文件,返回一个用于写入数据的输出流 
create()有多个重载版本,允许我们指定是否强制覆盖已有的文件、文件备份数量、写入文件缓冲区大小、文件块大小以及文件权限。 

3、public boolean copyFromLocal(Path src, Path dst) throws IOException  
将本地文件拷贝到文件系统 

4、public boolean exists(Path f) throws IOException  
检查文件或目录是否存在 

5、public boolean delete(Path f, Boolean recursive)  
永久性删除指定的文件或目录,如果f是一个空目录或者文件,那么recursive的值就会被忽略。只有recursive=true时,一个非空目录及其内容才会被删除。 

6、FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。


[java] view plain  copy
  1. import org.apache.hadoop.conf.Configuration;  
  2. import org.apache.hadoop.fs.*;  
  3. import org.apache.hadoop.fs.FileSystem;  
  4. import org.apache.hadoop.io.IOUtils;  
  5.   
  6. import java.io.*;  
  7. import java.net.URI;  
  8. import java.net.URISyntaxException;  
  9. import java.util.ArrayList;  
  10. import java.util.List;  
  11.   
  12. public class HdfsOperate {  
  13.     ArrayList<HdfsFile> hdfsfiles;  
  14.   
  15.     public HdfsOperate() {  
  16.         this.hdfsfiles = new ArrayList<HdfsFile>();  
  17.     }  
  18.   
  19.     /** 
  20.      * 获取hdfs路径下的文件列表 
  21.      * 
  22.      * @param srcpath 
  23.      * @return 
  24.      */  
  25.     public String[] getFileList(String srcpath) {  
  26.         try {  
  27.             Configuration conf = new Configuration();  
  28.             Path path = new Path(srcpath);  
  29.             FileSystem fs = path.getFileSystem(conf);  
  30.             List<String> files = new ArrayList<String>();  
  31.             if (fs.exists(path) && fs.isDirectory(path)) {  
  32.                 for (FileStatus status : fs.listStatus(path)) {  
  33.                     files.add(status.getPath().toString());  
  34.                 }  
  35.             }  
  36.             //fs.close();  
  37.             return files.toArray(new String[]{});  
  38.         } catch (IOException e) {  
  39.         } catch (Exception e) {  
  40.         }  
  41.         return null;  
  42.     }  
  43.   
  44.     /** 
  45.      * 给定文件名和文件内容,创建hdfs文件 
  46.      * 
  47.      * @param dst 
  48.      * @param contents 
  49.      * @throws IOException 
  50.      */  
  51.     public void createFile(String dst, byte[] contents)  
  52.             throws IOException {  
  53.         Configuration conf = new Configuration();  
  54.         Path dstPath = new Path(dst);  
  55.         FileSystem fs = dstPath.getFileSystem(conf);  
  56.   
  57.         FSDataOutputStream outputStream = fs.create(dstPath);  
  58.         outputStream.write(contents);  
  59.         outputStream.close();  
  60.         System.out.println("create file " + dst + " success!");  
  61.         //fs.close();  
  62.     }  
  63.   
  64.     /** 
  65.      * 删除hdfs文件 
  66.      * 
  67.      * @param filePath 
  68.      * @throws IOException 
  69.      */  
  70.     public void delete(String filePath) throws IOException {  
  71.         Configuration conf = new Configuration();  
  72.         Path path = new Path(filePath);  
  73.         FileSystem fs = path.getFileSystem(conf);  
  74.   
  75.         boolean isok = fs.deleteOnExit(path);  
  76.         if (isok) {  
  77.             System.out.println("delete file " + filePath + " success!");  
  78.         } else {  
  79.             System.out.println("delete file " + filePath + " failure");  
  80.         }  
  81.         //fs.close();  
  82.     }  
  83.   
  84.     /** 
  85.      * 创建hdfs目录 
  86.      * 
  87.      * @param path 
  88.      * @throws IOException 
  89.      */  
  90.     public void mkdir(String path) throws IOException {  
  91.         Configuration conf = new Configuration();  
  92.         Path srcPath = new Path(path);  
  93.         FileSystem fs = srcPath.getFileSystem(conf);  
  94.   
  95.         boolean isok = fs.mkdirs(srcPath);  
  96.         if (isok) {  
  97.             System.out.println("create dir ok!");  
  98.         } else {  
  99.             System.out.println("create dir failure");  
  100.         }  
  101.         //fs.close();  
  102.     }  
  103.   
  104.     /** 
  105.      * 读取hdfs文件内容,并在控制台打印出来 
  106.      * 
  107.      * @param filePath 
  108.      * @throws IOException 
  109.      */  
  110.     public void readFile(String filePath) throws IOException {  
  111.         Configuration conf = new Configuration();  
  112.         Path srcPath = new Path(filePath);  
  113.         FileSystem fs = null;  
  114.         URI uri;  
  115.         try {  
  116.             uri = new URI(filePath);  
  117.             fs = FileSystem.get(uri, conf);  
  118.         } catch (URISyntaxException e) {  
  119.             e.printStackTrace();  
  120.         }  
  121.         InputStream in = null;  
  122.         try {  
  123.             in = fs.open(srcPath);  
  124.             IOUtils.copyBytes(in, System.out, 4096false);  
  125.         } finally {  
  126.             IOUtils.closeStream(in);  
  127.         }  
  128.     }  
  129.   
  130.     /** 
  131.      * 下载hdfs文件到本地目录 
  132.      * 
  133.      * @param dstPath 
  134.      * @param srcPath 
  135.      * @throws Exception 
  136.      */  
  137.     public void downloadFile(String dstPath, String srcPath) throws Exception {  
  138.         Path path = new Path(srcPath);  
  139.         Configuration conf = new Configuration();  
  140.         FileSystem hdfs = path.getFileSystem(conf);  
  141.   
  142.         File rootfile = new File(dstPath);  
  143.         if (!rootfile.exists()) {  
  144.             rootfile.mkdirs();  
  145.         }  
  146.   
  147.         if (hdfs.isFile(path)) {  
  148.             //只下载非txt文件  
  149.             String fileName = path.getName();  
  150.             if (!fileName.toLowerCase().endsWith("txt")) {  
  151.                 FSDataInputStream in = null;  
  152.                 FileOutputStream out = null;  
  153.                 try {  
  154.                     in = hdfs.open(path);  
  155.                     File srcfile = new File(rootfile, path.getName());  
  156.                     if (!srcfile.exists()) srcfile.createNewFile();  
  157.                     out = new FileOutputStream(srcfile);  
  158.                     IOUtils.copyBytes(in, out, 4096false);  
  159.                 } finally {  
  160.                     IOUtils.closeStream(in);  
  161.                     IOUtils.closeStream(out);  
  162.                 }  
  163.                 //下载完后,在hdfs上将原文件删除  
  164.                 this.delete(path.toString());  
  165.             }  
  166.         } else if (hdfs.isDirectory(path)) {  
  167.             File dstDir = new File(dstPath);  
  168.             if (!dstDir.exists()) {  
  169.                 dstDir.mkdirs();  
  170.             }  
  171.             //在本地目录上加一层子目录  
  172.             String filePath = path.toString();//目录  
  173.             String subPath[] = filePath.split("/");  
  174.             String newdstPath = dstPath + subPath[subPath.length - 1] + "/";  
  175.             System.out.println("newdstPath=======" + newdstPath);  
  176.             if (hdfs.exists(path) && hdfs.isDirectory(path)) {  
  177.                 FileStatus[] srcFileStatus = hdfs.listStatus(path);  
  178.                 if (srcFileStatus != null) {  
  179.                     for (FileStatus status : hdfs.listStatus(path)) {  
  180.                         //下载子目录下文件  
  181.                         downloadFile(newdstPath, status.getPath().toString());  
  182.                     }  
  183.                 }  
  184.             }  
  185.         }  
  186.     }  
  187.   
  188.     /** 
  189.      * 下载hdfs文件内容,保存到内存对象中 
  190.      * 
  191.      * @param srcPath 
  192.      * @throws Exception 
  193.      */  
  194.     public void downloadFileByte(String srcPath) throws Exception {  
  195.         Path path = new Path(srcPath);  
  196.         FileSystem hdfs = null;  
  197.         Configuration conf = new Configuration();  
  198.         hdfs = FileSystem.get(URI.create(srcPath), conf);  
  199.         if (hdfs.exists(path)) {  
  200.             if (hdfs.isFile(path)) {  
  201.                 //如果是文件  
  202.                 FSDataInputStream in = null;  
  203.                 FileOutputStream out = null;  
  204.                 try {  
  205.                     in = hdfs.open(new Path(srcPath));  
  206.                     byte[] t = new byte[in.available()];  
  207.                     in.read(t);  
  208.                     hdfsfiles.add(new HdfsFile(path.getName(), srcPath, t));  
  209.                 } finally {  
  210.                     IOUtils.closeStream(in);  
  211.                     IOUtils.closeStream(out);  
  212.                 }  
  213.             } else {  
  214.                 //如果是目录  
  215.                 FileStatus[] srcFileStatus = hdfs.listStatus(new Path(srcPath));  
  216.                 for (int i = 0; i < srcFileStatus.length; i++) {  
  217.                     String srcFile = srcFileStatus[i].getPath().toString();  
  218.                     downloadFileByte(srcFile);  
  219.                 }  
  220.             }  
  221.         }  
  222.     }  
  223.   
  224.     public ArrayList<HdfsFile> getHdfsfiles() {  
  225.         return hdfsfiles;  
  226.     }  
  227.   
  228.     /** 
  229.      * 将本地目录或文件上传的hdfs 
  230.      * 
  231.      * @param localSrc 
  232.      * @param dst 
  233.      * @throws Exception 
  234.      */  
  235.     public void uploadFile(String localSrc, String dst) throws Exception {  
  236.   
  237.         Configuration conf = new Configuration();  
  238.         File srcFile = new File(localSrc);  
  239.         if (srcFile.isDirectory()) {  
  240.             copyDirectory(localSrc, dst, conf);  
  241.         } else {  
  242.             copyFile(localSrc, dst, conf);  
  243.         }  
  244.     }  
  245.   
  246.     /** 
  247.      * 拷贝本地文件hdfs目录下 
  248.      * 
  249.      * @param localSrc 
  250.      * @param dst 
  251.      * @param conf 
  252.      * @return 
  253.      * @throws Exception 
  254.      */  
  255.     private boolean copyFile(String localSrc, String dst, Configuration conf) throws Exception {  
  256.         File file = new File(localSrc);  
  257.         dst = dst + file.getName();  
  258.         Path path = new Path(dst);  
  259.         FileSystem fs = path.getFileSystem(conf);//FileSystem.get(conf);  
  260.         fs.exists(path);  
  261.         InputStream in = new BufferedInputStream(new FileInputStream(file));  
  262.         OutputStream out = fs.create(new Path(dst));  
  263.         IOUtils.copyBytes(in, out, 4096true);  
  264.         in.close();  
  265.         return true;  
  266.     }  
  267.   
  268.     /** 
  269.      * 拷贝本地目录到hdfs 
  270.      * @param src 
  271.      * @param dst 
  272.      * @param conf 
  273.      * @return 
  274.      * @throws Exception 
  275.      */  
  276.     private boolean copyDirectory(String src, String dst, Configuration conf) throws Exception {  
  277.         Path path = new Path(dst);  
  278.         FileSystem fs = path.getFileSystem(conf);  
  279.         if (!fs.exists(path)) {  
  280.             fs.mkdirs(path);  
  281.         }  
  282.         File file = new File(src);  
  283.   
  284.         File[] files = file.listFiles();  
  285.         for (int i = 0; i < files.length; i++) {  
  286.             File f = files[i];  
  287.             if (f.isDirectory()) {  
  288.                 String fname = f.getName();  
  289.                 if (dst.endsWith("/")) {  
  290.                     copyDirectory(f.getPath(), dst + fname + "/", conf);  
  291.                 } else {  
  292.                     copyDirectory(f.getPath(), dst + "/" + fname + "/", conf);  
  293.                 }  
  294.             } else {  
  295.                 copyFile(f.getPath(), dst, conf);  
  296.             }  
  297.         }  
  298.         return true;  
  299.     }  
  300. }  
  相关解决方案