`
ssydxa219
  • 浏览: 609529 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

HadoopFSOperations

 
阅读更多

文件的压缩有两大好处:1、可以减少存储文件所需要的磁盘空间;2、可以加速数据在网络和磁盘上的传输。尤其是在处理大数据时,这两大好处是相当重要的

 

package com.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecTest {
    //压缩文件
    public static void compress(String codecClassName) throws Exception{
        Class<?> codecClass = Class.forName(codecClassName);
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
        //指定压缩文件路径
        FSDataOutputStream outputStream = fs.create(new Path("/user/hadoop/text.gz"));
        //指定要被压缩的文件路径
        FSDataInputStream in = fs.open(new Path("/user/hadoop/aa.txt"));
        //创建压缩输出流
        CompressionOutputStream out = codec.createOutputStream(outputStream); 
        IOUtils.copyBytes(in, out, conf);
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }
   
    //解压缩
    public static void uncompress(String fileName) throws Exception{
        Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
        FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/text.gz"));
         //把text文件里到数据解压,然后输出到控制台 
        InputStream in = codec.createInputStream(inputStream); 
        IOUtils.copyBytes(in, System.out, conf);
        IOUtils.closeStream(in);
    }
   
    //使用文件扩展名来推断二来的codec来对文件进行解压缩
    public static void uncompress1(String uri) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
       
        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if(codec == null){
            System.out.println("no codec found for " + uri);
            System.exit(1);
        }
        String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally{
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
        }
    }
   
    public static void main(String[] args) throws Exception {
        //compress("org.apache.hadoop.io.compress.GzipCodec");
        //uncompress("text");
        uncompress1("hdfs://master:9000/user/hadoop/text.gz");
    }

}

 

\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\

 在单个操作中处理一批文件,这是很常见的需求。比如说处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件。在一个表 达式中使用通配符在匹配多个文件时比较方便的,无需列举每个文件和目录来指定输入。hadoop为执行通配提供了两个FIleSystem方法:

1 public FileStatus[] globStatus(Path pathPattern) throw IOException
2 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException

  globStatus()方法返回与路径想匹配的所有文件的FileStatus对象数组,并按路径排序。hadoop所支持的通配符与Unix bash相同。

  第二个方法传了一个PathFilter对象作为参数,PathFilter可以进一步对匹配进行限制。PathFilter是一个接口,里面只有一个方法accept(Path path)。

下面看一个例子演示PathFilter的作用:

  RegexExcludePathFilter.java:该类实现了PathFilter接口,重写了accept方法

class RegexExcludePathFilter implements PathFilter{
    private final String regex;
    public RegexExcludePathFilter(String regex) {
        this.regex = regex;
    }
    @Override
    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
   
}

 

//通配符的使用
    public static void list() throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        //PathFilter是过滤布符合置顶表达式的路径,下列就是把以txt结尾的过滤掉
        FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
        //FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }

 

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

任何文件系统的一个重要特性都是提供其目录结构浏览和检索它所存文件和目录相关信息的功能。FileStatus对象封装了文件系统中文件和目录的元数据,包括文件的长度、块大小、备份数、修改时间、所有者以及权限等信息。

  FileStatus对象由FileSystem的getFileStatus()方法获得,调用该方法的时候要把文件的Path传进去。

  例子:打印输出某个文件的所有信息

package com.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class HdfsTest1 {
    //显示文件所有信息
    public static void fileInfo(String path) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path(path);
        //FileStatus对象封装了文件的和目录的额元数据,包括文件长度、块大小、权限等信息
        FileStatus fileStatus = fs.getFileStatus(p);
        System.out.println("文件路径:"+fileStatus.getPath());
        System.out.println("块的大小:"+fileStatus.getBlockSize());
        System.out.println("文件所有者:"+fileStatus.getOwner()+":"+fileStatus.getGroup());
        System.out.println("文件权限:"+fileStatus.getPermission());
        System.out.println("文件长度:"+fileStatus.getLen());
        System.out.println("备份数:"+fileStatus.getReplication());
        System.out.println("修改时间:"+fileStatus.getModificationTime());
    }
    public static void main(String[] args) throws IOException {
        fileInfo("/user/hadoop/aa.mp4");
    }

}

\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\

 

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。下面记录一下使用JAVA API对HDFS中的文件进行操作的过程。

  对分HDFS中的文件操作主要涉及一下几个类:

  Configuration类:该类的对象封转了客户端或者服务器的配置。

  FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。FileSystem fs = FileSystem.get(conf);通过FileSystem的静态方法get获得该对象。

  FSDataInputStream和FSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open方法和create方法获得。

  具体如何对文件操作清下下面例子:

package com.hdfs;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsTest {
   
    //创建新文件
    public static void createFile(String dst , byte[] contents) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dstPath = new Path(dst); //目标路径
        //打开一个输出流
        FSDataOutputStream outputStream = fs.create(dstPath);
        outputStream.write(contents);
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");
    }
   
    //上传本地文件
    public static void uploadFile(String src,String dst) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(src); //原路径
        Path dstPath = new Path(dst); //目标路径
        //调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false
        fs.copyFromLocalFile(false,srcPath, dstPath);
       
        //打印文件路径
        System.out.println("Upload to "+conf.get("fs.default.name"));
        System.out.println("------------list files------------"+"\n");
        FileStatus [] fileStatus = fs.listStatus(dstPath);
        for (FileStatus file : fileStatus)
        {
            System.out.println(file.getPath());
        }
        fs.close();
    }
   
    //文件重命名
    public static void rename(String oldName,String newName) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path oldPath = new Path(oldName);
        Path newPath = new Path(newName);
        boolean isok = fs.rename(oldPath, newPath);
        if(isok){
            System.out.println("rename ok!");
        }else{
            System.out.println("rename failure");
        }
        fs.close();
    }
    //删除文件
    public static void delete(String filePath) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(filePath);
        boolean isok = fs.deleteOnExit(path);
        if(isok){
            System.out.println("delete ok!");
        }else{
            System.out.println("delete failure");
        }
        fs.close();
    }
   
    //创建目录
    public static void mkdir(String path) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(path);
        boolean isok = fs.mkdirs(srcPath);
        if(isok){
            System.out.println("create dir ok!");
        }else{
            System.out.println("create dir failure");
        }
        fs.close();
    }
   
    //读取文件的内容
    public static void readFile(String filePath) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(filePath);
        InputStream in = null;
        try {
            in = fs.open(srcPath);
            IOUtils.copyBytes(in, System.out, 4096, false); //复制到标准输出流
        } finally {
            IOUtils.closeStream(in);
        }
    }
   
   
    public static void main(String[] args) throws IOException {
        //测试上传文件
        //uploadFile("D:\\c.txt", "/user/hadoop/test/");
        //测试创建文件
        /*byte[] contents =  "hello world 世界你好\n".getBytes();
        createFile("/user/hadoop/test1/d.txt",contents);*/
        //测试重命名
        //rename("/user/hadoop/test/d.txt", "/user/hadoop/test/dd.txt");
        //测试删除文件
        //delete("test/dd.txt"); //使用相对路径
        //delete("test1");    //删除目录
        //测试新建目录
        //mkdir("test1");
        //测试读取文件
        readFile("test1/d.txt");
    }

}

 

 

,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,////////////////////

 

 

 

 

 

 

 

 

 

 

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>maven</groupId>
  <artifactId>maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>maven</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
     <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-minicluster</artifactId>
          <version>2.5.1</version>
    </dependency>
    <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.5.1</version>
    </dependency>
    <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-assemblies</artifactId>
          <version>2.5.1</version>
    </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-maven-plugins</artifactId>
          <version>2.5.1</version>
    </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.5.1</version>
    </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>2.5.1</version>
    </dependency>
  </dependencies>
</project>

 

 

package maven.maven;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSClient.*;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class HadoopFSOperations {
   
    private static Configuration conf = new Configuration();
    private static final String HADOOP_URL="hdfs://192.168.190.129:9000";
   
    private static FileSystem fs;
   
    private static DistributedFileSystem hdfs;
   
    static {
        try {
            FileSystem.setDefaultUri(conf, HADOOP_URL);
            fs = FileSystem.get(conf);
            hdfs = (DistributedFileSystem)fs;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 列出所有DataNode的名字信息
     */
    public void listDataNodeInfo() {       
        try {
            DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
            String[] names = new String[dataNodeStats.length];
            System.out.println("List of all the datanode in the HDFS cluster:");
           
            for (int i=0;i<names.length;i++) {
                names[i] = dataNodeStats[i].getHostName();
                System.out.println(names[i]);
            }
            System.out.println(hdfs.getUri().toString());
         } catch (Exception e) {
             e.printStackTrace();
         }
    }
   
    /**
     * 查看文件是否存在
     */
    public void checkFileExist() {
        try {
            Path a= hdfs.getHomeDirectory();
            System.out.println("main path:"+a.toString());
           
            Path f = new Path("/user/xxx/input01/");
            boolean exist = fs.exists(f);
            System.out.println("Whether exist of this file:"+exist);
           
            //删除文件
//            if (exist) {
//                boolean isDeleted = hdfs.delete(f, false);
//                if(isDeleted) {
//                    System.out.println("Delete success");
//                }               
//            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    /**
     *创建文件到HDFS系统上
     */
    public void createFile() {
        try {
            Path f = new Path("/user/xxx/input02/file01");
            System.out.println("Create and Write :"+f.getName()+" to hdfs");
           
            FSDataOutputStream os = fs.create(f, true);
            Writer out = new OutputStreamWriter(os, "utf-8");//以UTF-8格式写入文件,不乱码
            out.write("你好 good job");
            out.close();
            os.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
   
    /**
     * 读取本地文件到HDFS系统<br>
     * 请保证文件格式一直是UTF-8,从本地->HDFS
     */
    public void copyFileToHDFS() {
        try {
            Path f = new Path("/user/xxx/input02/file01");
            File file = new File("E:\\hadoopTest\\temporary.txt");
           
            FileInputStream is = new FileInputStream(file);
            InputStreamReader isr = new InputStreamReader(is, "utf-8");
            BufferedReader br = new BufferedReader(isr);
           
            FSDataOutputStream os = fs.create(f, true);
            Writer out = new OutputStreamWriter(os, "utf-8");
           
            String str = "";
            while((str=br.readLine()) != null) {
                out.write(str+"\n");
            }
            br.close();
            isr.close();
            is.close();
            out.close();
            os.close();
            System.out.println("Write content of file "+file.getName()+" to hdfs file "+f.getName()+" success");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    /**
     * 取得文件块所在的位置..
     */
    public void getLocation() {
        try {
            Path f = new Path("/user/xxx/input02/file01");
            FileStatus fileStatus = fs.getFileStatus(f);
           
            BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
            for (BlockLocation currentLocation : blkLocations) {
                String[] hosts = currentLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
           
            //取得最后修改时间
            long modifyTime = fileStatus.getModificationTime();
            Date d = new Date(modifyTime);
            System.out.println(d);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    /**
     * 读取hdfs中的文件内容
     */
    public void readFileFromHdfs() {
        try {
            Path f = new Path("/user/xxx/input02/file01");
           
            FSDataInputStream dis = fs.open(f);
            InputStreamReader isr = new InputStreamReader(dis, "utf-8");
            BufferedReader br = new BufferedReader(isr);
            String str = "";
            while ((str = br.readLine()) !=null) {
                System.out.println(str);
            }
            br.close();
            isr.close();
            dis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    /**
     * list all file/directory
     * @param args
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws FileNotFoundException
     */
    public void listFileStatus(String path) throws FileNotFoundException, IllegalArgumentException, IOException {
        FileStatus fileStatus[]=fs.listStatus(new Path(path));
        int listlength=fileStatus.length;
        for (int i=0 ;i<listlength ;i++){
            if (fileStatus[i].isDirectory() == false) {
                System.out.println("filename:"
                        + fileStatus[i].getPath().getName() + "\tsize:"
                        + fileStatus[i].getLen());
            } else {
                String newpath = fileStatus[i].getPath().toString();
                listFileStatus(newpath);
            }
        }
    }
   
    public static void main(String[] args) {
        HadoopFSOperations a = new HadoopFSOperations();
        a.listDataNodeInfo();
//        a.checkFileExist();
//        a.createFile();
//        a.copyFileToHDFS();
//        a.getLocation();
//        a.readFileFromHdfs();
        try {
            a.listFileStatus(HADOOP_URL+"/user");
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

 

package test.hadoop.util;

import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import com.emar.adwiser.common.util.LogUtil;
import com.emar.adwiser.common.util.Logs;
/***
 * HDFS 工具类
 * @author zhaidw
 *
 */
public class HDFSUtil {
    private static Logs log = LogUtil.getLog(HDFSUtil.class);
    public synchronized static FileSystem getFileSystem(String ip, int port) {
        FileSystem fs = null;
        String url = "hdfs://" + ip + ":" + String.valueOf(port);
        Configuration config = new Configuration();
        config.set("fs.default.name", url);
        try {
            fs = FileSystem.get(config);
        } catch (Exception e) {
            log.error("getFileSystem failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
        return fs;
    }
    public synchronized static void listNode(FileSystem fs) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        try {
            DatanodeInfo[] infos = dfs.getDataNodeStats();
            for (DatanodeInfo node : infos) {
                System.out.println("HostName: " + node.getHostName() + "/n"
                        + node.getDatanodeReport());
                System.out.println("--------------------------------");
            }
        } catch (Exception e) {
            log.error("list node list failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    /**
     * 打印系统配置
     *
     * @param fs
     */
    public synchronized static void listConfig(FileSystem fs) {
        Iterator<Entry<String, String>> entrys = fs.getConf().iterator();
        while (entrys.hasNext()) {
            Entry<String, String> item = entrys.next();
            log.info(item.getKey() + ": " + item.getValue());
        }
    }
    /**
     * 创建目录和父目录
     *
     * @param fs
     * @param dirName
     */
    public synchronized static void mkdirs(FileSystem fs, String dirName) {
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        String dir = workDir + "/" + dirName;
        Path src = new Path(dir);
        // FsPermission p = FsPermission.getDefault();
        boolean succ;
        try {
            succ = fs.mkdirs(src);
            if (succ) {
                log.info("create directory " + dir + " successed. ");
            } else {
                log.info("create directory " + dir + " failed. ");
            }
        } catch (Exception e) {
            log.error("create directory " + dir + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    /**
     * 删除目录和子目录
     *
     * @param fs
     * @param dirName
     */
    public synchronized static void rmdirs(FileSystem fs, String dirName) {
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        String dir = workDir + "/" + dirName;
        Path src = new Path(dir);
        boolean succ;
        try {
            succ = fs.delete(src, true);
            if (succ) {
                log.info("remove directory " + dir + " successed. ");
            } else {
                log.info("remove directory " + dir + " failed. ");
            }
        } catch (Exception e) {
            log.error("remove directory " + dir + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    /**
     * 上传目录或文件
     *
     * @param fs
     * @param local
     * @param remote
     */
    public synchronized static void upload(FileSystem fs, String local,
            String remote) {
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        Path dst = new Path(workDir + "/" + remote);
        Path src = new Path(local);
        try {
            fs.copyFromLocalFile(false, true, src, dst);
            log.info("upload " + local + " to  " + remote + " successed. ");
        } catch (Exception e) {
            log.error("upload " + local + " to  " + remote + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    /**
     * 下载目录或文件
     *
     * @param fs
     * @param local
     * @param remote
     */
    public synchronized static void download(FileSystem fs, String local,
            String remote) {
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        Path dst = new Path(workDir + "/" + remote);
        Path src = new Path(local);
        try {
            fs.copyToLocalFile(false, dst, src);
            log.info("download from " + remote + " to  " + local
                    + " successed. ");
        } catch (Exception e) {
            log.error("download from " + remote + " to  " + local + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    /**
     * 字节数转换
     *
     * @param size
     * @return
     */
    public synchronized static String convertSize(long size) {
        String result = String.valueOf(size);
        if (size < 1024 * 1024) {
            result = String.valueOf(size / 1024) + " KB";
        } else if (size >= 1024 * 1024 && size < 1024 * 1024 * 1024) {
            result = String.valueOf(size / 1024 / 1024) + " MB";
        } else if (size >= 1024 * 1024 * 1024) {
            result = String.valueOf(size / 1024 / 1024 / 1024) + " GB";
        } else {
            result = result + " B";
        }
        return result;
    }
    /**
     * 遍历HDFS上的文件和目录
     *
     * @param fs
     * @param path
     */
    public synchronized static void listFile(FileSystem fs, String path) {
        Path workDir = fs.getWorkingDirectory();
        Path dst;
        if (null == path || "".equals(path)) {
            dst = new Path(workDir + "/" + path);
        } else {
            dst = new Path(path);
        }
        try {
            String relativePath = "";
            FileStatus[] fList = fs.listStatus(dst);
            for (FileStatus f : fList) {
                if (null != f) {
                    relativePath = new StringBuffer()
                            .append(f.getPath().getParent()).append("/")
                            .append(f.getPath().getName()).toString();
                    if (f.isDir()) {
                        listFile(fs, relativePath);
                    } else {
                        System.out.println(convertSize(f.getLen()) + "/t/t"
                                + relativePath);
                    }
                }
            }
        } catch (Exception e) {
            log.error("list files of " + path + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        } finally {
        }
    }
    public synchronized static void write(FileSystem fs, String path,
            String data) {
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        Path dst = new Path(workDir + "/" + path);
        try {
            FSDataOutputStream dos = fs.create(dst);
            dos.writeUTF(data);
            dos.close();
            log.info("write content to " + path + " successed. ");
        } catch (Exception e) {
            log.error("write content to " + path + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    public synchronized static void append(FileSystem fs, String path,
            String data) {
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        Path dst = new Path(workDir + "/" + path);
        try {
            FSDataOutputStream dos = fs.append(dst);
            dos.writeUTF(data);
            dos.close();
            log.info("append content to " + path + " successed. ");
        } catch (Exception e) {
            log.error("append content to " + path + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
    public synchronized static String read(FileSystem fs, String path) {
        String content = null;
        // Path home = fs.getHomeDirectory();
        Path workDir = fs.getWorkingDirectory();
        Path dst = new Path(workDir + "/" + path);
        try {
            // reading
            FSDataInputStream dis = fs.open(dst);
            content = dis.readUTF();
            dis.close();
            log.info("read content from " + path + " successed. ");
        } catch (Exception e) {
            log.error("read content from " + path + " failed :"
                    + ExceptionUtils.getFullStackTrace(e));
        }
        return content;
    }
}

 

 

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


public class HadoopFSOperations {
    public static void main(String[] args) throws Exception
    {
        // createNewHDFSFile("/tmp/create2.c", "hello");
       
        //System.out.println(readHDFSFile("/tmp/copy.c").toString());
        //mkdir("/tmp/testdir");
        //deleteDir("/tmp/testdir");
        listAll("/tmp/");
    }
   
    /*
     * upload the local file to the hds
     * notice that the path is full like /tmp/test.c
     */
    public static void uploadLocalFile2HDFS(String s, String d)
        throws IOException
    {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
       
        Path src = new Path(s);
        Path dst = new Path(d);
       
        hdfs.copyFromLocalFile(src, dst);
       
        hdfs.close();
    }
   
    /*
     * create a new file in the hdfs.
     * notice that the toCreateFilePath is the full path
     * and write the content to the hdfs file.
     */
    public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOException
    {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
       
        FSDataOutputStream os = hdfs.create(new Path(toCreateFilePath));

        os.write(content.getBytes("UTF-8"));
       
        os.close();
       
        hdfs.close();
    }
   
    /*
     * delete the hdfs file
     * notice that the dst is the full path name
     */
    public static boolean deleteHDFSFile(String dst) throws IOException
    {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
       
        Path path = new Path(dst);
        boolean isDeleted = hdfs.delete(path);
       
        hdfs.close();
       
        return isDeleted;
    }
   
   
    /*
     * read the hdfs file content
     * notice that the dst is the full path name
     */
    public static byte[] readHDFSFile(String dst) throws Exception
    {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
       
        // check if the file exists
        Path path = new Path(dst);
        if ( fs.exists(path) )
        {
            FSDataInputStream is = fs.open(path);
            // get the file info to create the buffer
            FileStatus stat = fs.getFileStatus(path);
           
            // create the buffer
            byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
            is.readFully(0, buffer);
           
            is.close();
            fs.close();
           
            return buffer;
        }
        else
        {
            throw new Exception("the file is not found .");
        }
    }
   
   
    /*
     * make a new dir in the hdfs
     *
     * the dir may like '/tmp/testdir'
     */
    public static void mkdir(String dir) throws IOException
    {
        Configuration conf =  new Configuration();
        FileSystem fs = FileSystem.get(conf);
       
        fs.mkdirs(new Path(dir));
       
        fs.close();
    }
   
    /*
     * delete a dir in the hdfs
     *
     * dir may like '/tmp/testdir'
     */
    public static void deleteDir(String dir) throws IOException
    {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
       
        fs.delete(new Path(dir));
       
        fs.close();
    }
   
    public static void listAll(String dir) throws IOException
    {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
       
        FileStatus[] stats = fs.listStatus(new Path(dir));
       
        for(int i = 0; i < stats.length; ++i)
        {
            if (stats[i].isFile())
            {
                // regular file
                System.out.println(stats[i].getPath().toString());
            }
            else if (stats[i].isDirectory())
            {
                // dir
                System.out.println(stats[i].getPath().toString());
            }
            else if(stats[i].isSymlink())
            {
                // is s symlink in linux
                System.out.println(stats[i].getPath().toString());
            }
                
        }
        fs.close();
    }
   
}

分享到:
评论

相关推荐

    infrared-remote-candroid studiodemo

    android studio下载

    【新质生产力】新质生产力赋能智能制造数字化解决方案.pptx

    【新质生产力】新质生产力赋能智能制造数字化解决方案.pptx

    基于matlab实现的用于应用布格重力异常数据反演地下异常密度体.rar

    基于matlab实现的用于应用布格重力异常数据反演地下异常密度体.rar

    node-v8.10.0-linux-x64.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    基于Yolov5目标检测和deepsort目标跟踪无人机跟踪.zip

    无人机最强算法源码,易于部署和学习交流使用

    数据库课程设计实战.zip

    数据库课程设计后端 使用Springboot + Mybatis + Redis + Maven 数据库课程设计实战.zip,使用到了所有的相关SQL 的操作,如增删改查等,让你可以在一个项目里面,锻炼到所有的数据库相关的知识。项目亲测可以运行,里面含有运行相关的文档,不会的可以丝我请求帮助。 数据库课程设计后端 使用Springboot + Mybatis + Redis + Maven 具体的表和相关的数据如下: 用户(电话号码,密码,身份证号,邮箱,真实姓名,用户类型,性别,地址) 乘客(用户电话号码,乘客身份证号,乘客真实姓名,乘客电话号码,乘客类型,地址) 列车信息(列车编号,车次,列车类型,列车车厢数,列车始发站,列车终点站,列车开车时间,列车到达时间,列车到达日期,列车运行时间,列车状态) 列车座位信息(列车编号,车厢号,座位类型,座位数) 列车经停信息(列车编号,车次,车站编号,车站名,到达时间,总运行时间,开车时间) 订单信息(订单编号,用户电话号码,乘客身份证号码,列车编号,出发站编号,到达站编号,车厢号,座位编号,订单创建时间,订单状态,开车时间)

    咨询的分析方法gl.ppt

    咨询的分析方法gl.ppt

    node-v10.14.0-linux-ppc64le.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    2019年电赛无人机题目(B题)OpenMV相关代码

    These're the OpenMV codes written by microPython in 2019 NUEDC. 2019年电赛无人机题目(B题)OpenMV相关代码(原创).zip

    无人机降落TRT版本.zip

    无人机最强算法源码,易于部署和学习交流使用

    熊出没.zip

    熊出没.zip

    基于SpringBoot和Vue的家教信息平台设计与实现.zip

    基于SpringBoot和Vue的家教信息平台设计与实现.zip 有完整的部署指导文档,源码也是完整的,可以直接运行,里面包含了所有的相关步骤。 本文旨在设计和实现一套基于Java技术的家教信息系统,采用Spring Boot框架构建后端服务,MySQL数据库存储数据,Vue.js作为前端框架实现用户界面。该系统旨在解决家教信息管理的问题,包括家教师资信息管理、用户信息管理以及家教入驻等功能。通过综合运用Java、Spring Boot、MySQL和Vue等技术,实现了系统的高效运行和良好的用户体验。系统提供了用户注册、登录、信息查看和编辑等功能,同时支持家教的发布和查看,用户信息的管理以及家教审核的后台管理。家长可以方便地寻找合适的家教老师,家教老师也能够更便捷地管理自己的信息和相关资料。通过本设计,展示了Java技术在现代化家教信息系统中的应用,为家教行业的信息化管理提供了一种有效的解决方案。该系统的设计与实现将为家长、家教老师和用户提供便利,促进家教行业的发展与进步。 关键词:SpringBoot; MySQL; 系统设计; 家教

    利用CNN进行无人售货机的商品识别.zip

    无人机最强算法源码,易于部署和学习交流使用

    node-v11.10.1-linux-armv6l.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    (R语言)-6-箱线图的绘制

    (R语言)-6-箱线图的绘制

    麦肯锡-xx联通固定市场举措gl.ppt

    麦肯锡-xx联通固定市场举措gl.ppt

    在PyCharm中配置Python环境步骤

    附件是在PyCharm中配置Python环境步骤,文件绿色安全,请大家放心下载,仅供交流学习使用,无任何商业目的!

    【北京工业大学】集成电路分析与设计实验报告

    本课程实验分为数字集成电路设计实验与全定制设计实验两部分。 实验1—4为基于Cadence的数字集成电路设计实验部分,主要内容为通过一个简单数字低通滤波器的设计、综合、仿真,让学生熟悉数字集成电路前段实际设计流程,以培养学生实际设计集成电路的能力。具体为:实验1Matlab实现数字低通滤波器算法设计。 实验2Linux环境下基本操作。 实验3RTLCompiler对数字低通滤波器电路的综合。 实验4NC对数字低通滤波器电路的仿真。 其中,实验1主要目的是为了展示算法分析的方法和重要性。使用Matlab实现数字滤波器的算法设计和HDL代码生成。由于Matlab工具可以在Windows环境下工作,而其他集成电路EDA工具均需要在linux下工作,故建议本实验在课堂演示和讲述,学生课下练习。实验2的主要目的是学习linux下的基本操作。包括目录管理、文件管理、文件编辑以及文件压缩等在使用集成电路EDA工具时所需要的操作。本实验是实验3和实验4的基础,建议在实验室完成。

    基于Transformer模型构建的聊天机器人python源码+运行说明.zip

    一、简介 基于Transformer模型构建的聊天机器人,可实现日常聊天。 二、系统说明 2.1 功能介绍 使用者输入文本后,系统可根据文本做出相应的回答。 2.2 数据介绍 * 百度中文问答 WebQA数据集 * 青云数据集 * 豆瓣数据集 * chatterbot数据集 由于数据集过大,因此不会上传,如有需要可以在issue中提出。 2.3. 模型介绍(v1.0版本) 基于Transformer模型,使用Python中的keras-transformer包。 训练的参数文件没有上传,如有需要可在issue中提出。 三、注意事项 * keras-transformer包需要自行安装:`pip install keras-transformer`。 * 如果需要实际运行,参数文件放在`ModelTrainedParameters`文件下;`ListData`文件下包含了已经处理好的字典等数据,不需要修改,直接运行Main.py即可。 * 如果需要自行训练,将数据集文件放在`DataSet`文件下。 * `HyperParameters.py`文件中包含了系统所需

Global site tag (gtag.js) - Google Analytics