您的位置:

javahdfs的简单介绍

本文目录一览:

使用Java API操作HDFS时,_方法用于获取文件列表?

当使用 Java API 操作 HDFS 时,可以使用 FileSystem.listFiles() 方法来获取文件列表。该方法接受一个 Path 对象,表示要列举文件的目录,并返回一个 RemoteIteratorLocatedFileStatus 对象,该对象可用于迭代目录中的文件。

例如,下面的代码演示了如何使用 listFiles() 方法来获取 HDFS 上的文件列表:

// 定义 HDFS 连接配置

Configuration conf = new Configuration();

// 获取 HDFS FileSystem 对象

FileSystem fs = FileSystem.get(conf);

// 定义要列举文件的目录

Path dirPath = new Path("/user/hadoop");

// 获取文件列表

RemoteIteratorLocatedFileStatus fileIter = fs.listFiles(dirPath, true);

// 遍历文件列表

while (fileIter.hasNext()) {

// 获取当前文件信息

LocatedFileStatus fileStatus = fileIter.next();

// 输出文件名称和大小

System.out.println(fileStatus.getPath().getName() + " : " + fileStatus.getLen());

}

java怎么连接hdfs文件系统,需要哪些包?

apache的Hadoop项目提供一类api可以通过java工程操作hdfs中的文件,包括:文件打开,读写,删除等、目录的创建,删除,读取目录中所有文件等。

1、到下载Hadoop,解压后把所有jar加入项目的lib里

2、程序处理步骤: 1)得到Configuration对象,2)得到FileSystem对象,3)进行文件操作,简单示例如下:

/**

*

*/

package org.jrs.wlh;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

/**

* @PutMeger.java

* java操作hdfs 往 hdfs中上传数据

* @version $Revision$/br

* update: $Date$

*/

public class PutMeger {

public static void main(String[] args) throws IOException {

String[] str = new String[]{"E:\\hadoop\\UploadFileClient.java","hdfs://master:9000/user/hadoop/inccnt.java"};

Configuration conf = new Configuration();

FileSystem fileS= FileSystem.get(conf);

FileSystem localFile = FileSystem.getLocal(conf); //得到一个本地的FileSystem对象

Path input = new Path(str[0]); //设定文件输入保存路径

Path out = new Path(str[1]); //文件到hdfs输出路径

try{

FileStatus[] inputFile = localFile.listStatus(input); //listStatus得到输入文件路径的文件列表

FSDataOutputStream outStream = fileS.create(out); //创建输出流

for (int i = 0; i inputFile.length; i++) {

System.out.println(inputFile[i].getPath().getName());

FSDataInputStream in = localFile.open(inputFile[i].getPath());

byte buffer[] = new byte[1024];

int bytesRead = 0;

while((bytesRead = in.read(buffer))0){ //按照字节读取数据

System.out.println(buffer);

outStream.write(buffer,0,bytesRead);

}

in.close();

}

}catch(Exception e){

e.printStackTrace();

}

}

}

java修改hdfs上文件权限问题

看来你是开启了HDFS的权限检查功能,这样你访问HDFS,NameNode都会检查访问用户的权限的。

你现在想要修改/process/startall.txt文件的权限,那process目录以及startall.txt的有效用户、有效组以及其权限是什么呢?

假设process目录与startall.txt原始的有效用户和有效组分别为root和supergroup,原始权限为750的话,你若在自己电脑运行上述程序,它会自动获取当前计算机的登录用户,假设为wyc,去访问HDFS,很显然,你的程序连process目录都进不去的。

此外,想要更改一个目录或文件的权限,当前用户则必须是有效用户或超级用户才可以。

想要解决的话,嘿嘿, 如果你设置的hadoop.security.authentication property,也就是认证方式为simple的话(默认就是simple),那还可以钻该认证方式的空子,运行程序是伪装成有效用户或者超级用户即可。

此外,有一行代码需要修改一下,我在实验后发现设置权限那一行有误,如下:

//hdfs.setPermission(dstPath, new FsPermission((short) 775));

hdfs.setPermission(dstPath, new FsPermission("755"));

用java向hdfs上传文件时,如何实现断点续传

@Component("javaLargeFileUploaderServlet")

@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })

public class UploadServlet extends HttpRequestHandlerServlet

implements HttpRequestHandler {

private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);

@Autowired

UploadProcessor uploadProcessor;

@Autowired

FileUploaderHelper fileUploaderHelper;

@Autowired

ExceptionCodeMappingHelper exceptionCodeMappingHelper;

@Autowired

Authorizer authorizer;

@Autowired

StaticStateIdentifierManager staticStateIdentifierManager;

@Override

public void handleRequest(HttpServletRequest request, HttpServletResponse response)

throws IOException {

log.trace("Handling request");

Serializable jsonObject = null;

try {

// extract the action from the request

UploadServletAction actionByParameterName =

UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));

// check authorization

checkAuthorization(request, actionByParameterName);

// then process the asked action

jsonObject = processAction(actionByParameterName, request);

// if something has to be written to the response

if (jsonObject != null) {

fileUploaderHelper.writeToResponse(jsonObject, response);

}

}

// If exception, write it

catch (Exception e) {

exceptionCodeMappingHelper.processException(e, response);

}

}

private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)

throws MissingParameterException, AuthorizationException {

// check authorization

// if its not get progress (because we do not really care about authorization for get

// progress and it uses an array of file ids)

if (!actionByParameterName.equals(UploadServletAction.getProgress)) {

// extract uuid

final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);

// if this is init, the identifier is the one in parameter

UUID clientOrJobId;

String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);

if (actionByParameterName.equals(UploadServletAction.getConfig) parameter != null) {

clientOrJobId = UUID.fromString(parameter);

}

// if not, get it from manager

else {

clientOrJobId = staticStateIdentifierManager.getIdentifier();

}

// call authorizer

authorizer.getAuthorization(

request,

actionByParameterName,

clientOrJobId,

fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);

}

}

private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)

throws Exception {

log.debug("Processing action " + actionByParameterName.name());

Serializable returnObject = null;

switch (actionByParameterName) {

case getConfig:

String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);

returnObject =

uploadProcessor.getConfig(

parameterValue != null ? UUID.fromString(parameterValue) : null);

break;

case verifyCrcOfUncheckedPart:

returnObject = verifyCrcOfUncheckedPart(request);

break;

case prepareUpload:

returnObject = prepareUpload(request);

break;

case clearFile:

uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));

break;

case clearAll:

uploadProcessor.clearAll();

break;

case pauseFile:

ListUUID uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));

uploadProcessor.pauseFile(uuids);

break;

case resumeFile:

returnObject =

uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));

break;

case setRate:

uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),

Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));

break;

case getProgress:

returnObject = getProgress(request);

break;

}

return returnObject;

}

ListUUID getFileIdsFromString(String fileIds) {

String[] splittedFileIds = fileIds.split(",");

ListUUID uuids = Lists.newArrayList();

for (int i = 0; i splittedFileIds.length; i++) {

uuids.add(UUID.fromString(splittedFileIds[i]));

}

return uuids;

}

private Serializable getProgress(HttpServletRequest request)

throws MissingParameterException {

Serializable returnObject;

String[] ids =

new Gson()

.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);

CollectionUUID uuids = Collections2.transform(Arrays.asList(ids), new FunctionString, UUID() {

@Override

public UUID apply(String input) {

return UUID.fromString(input);

}

});

returnObject = Maps.newHashMap();

for (UUID fileId : uuids) {

try {

ProgressJson progress = uploadProcessor.getProgress(fileId);

((HashMapString, ProgressJson) returnObject).put(fileId.toString(), progress);

}

catch (FileNotFoundException e) {

log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());

}

}

return returnObject;

}

private Serializable prepareUpload(HttpServletRequest request)

throws MissingParameterException, IOException {

// extract file information

PrepareUploadJson[] fromJson =

new Gson()

.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);

// prepare them

final HashMapString, UUID prepareUpload = uploadProcessor.prepareUpload(fromJson);

// return them

return Maps.newHashMap(Maps.transformValues(prepareUpload, new FunctionUUID, String() {

public String apply(UUID input) {

return input.toString();

};

}));

}

private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)

throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {

UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));

try {

uploadProcessor.verifyCrcOfUncheckedPart(fileId,

fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));

}

catch (InvalidCrcException e) {

// no need to log this exception, a fallback behaviour is defined in the

// throwing method.

// but we need to return something!

return Boolean.FALSE;

}

return Boolean.TRUE;

}

}

HDFS 系统架构

HDFS Architecture

Hadoop Distributed File System (HDFS) 是设计可以运行于普通商业硬件上的分布式文件系统。它跟现有的分布式文件系统有很多相通的地方,但是区别也是显著的。HDFS具有高度容错性能,被设计运行于低成本硬件上。HDFS可以向应用提供高吞吐带宽,适合于大数据应用。HDFS 放宽了一些 POSIX 的要求,以开启对文件系统数据的流式访问。HDFS 最初是作为Apache Nutch web 搜索引擎项目的基础设施开发的。HDFS 现在是 Apache Hadoop 核心项目的一部分。

HDFS是主从架构。一个HDFS集群包含一个NameNode,一个管理文件系统命名空间和控制客户端访问文件的master server。以及,若干的 DataNodes,通常集群的每个node一个,管理运行DataNode的节点上的存储。HDFS 发布一个文件系统命名空间,并允许用户数据已文件的形式存储在上面。内部,一个文件被分成一个或多个块,存储在一组DataNodes上。NameNode 执行文件系统命名空间操作,比如:打开、关闭、重命名文件或目录。它还确定块到DataNodes的映射。DataNodes 负责向文件系统客户端提供读写服务。DataNodes 根据 NameNode 的指令执行块的创建、删除以及复制。

NameNode 和 DataNode 是设计运行于普通商业机器的软件。这些机器通常运行 GNU/Linux 操作系统。HDFS 是Java 语言编写的;任何支持Java的机器都可以运行NameNode or DataNode 软件。使用高移植性Java语言,意味着HDFS可以部署在很大范围的机器上。一个典型的部署就是一台特定的机器只运行NameNode 软件,而集群内的其他机器运行DataNode 软件的一个实例。这种架构不排除一台机器上运行多个DataNodes ,但是在实际部署中很少见。

单 NameNode 节点的存在大大简化了架构。NameNode 是所有HDFS 元数据的仲裁和仓库。系统设计上,用户数据永远不经过NameNode。

HDFS 支持传统的文件分级组织。用户或应用可以创建目录,并在目录内存储文件。 文件系统命名空间的层次结构跟其他文件系统类似;可以创建、删除、移动、重命名文件。HDFS 支持 user quotas 和 access permissions 。 HDFS 不支持软、硬链接。但是,HDFS 架构不排除实现这些功能。

虽然HDFS遵守 文件系统命名约定 ,一些路径和名称 (比如/.reserved 和.snapshot ) 保留了。比如功能 transparent encryption 和 snapshot 就使用的保留路径。

NameNode 维护文件系统命名空间。任何文件系统命名空间或属性的变化,都会被NameNode记录。 应用可以指定HDFS应维护的文件副本数量。文件副本的数量被称为该文件的复制因子 replication factor 。该信息存储于NameNode。

HDFS 被设计用于在一个大规模集群上跨机器可靠地存储巨大的文件。它以一序列的块的方式存储文件。每个文件都可以配置块尺寸和复制因子。

一个文件除了最后一个块外,其他的块一样大。在 append 和 hsync 添加了可变长度块的支持后,用户可以启动一个新的块,而不用填充最后一个块到配置的块大小。

应用可以指定一个文件的副本数量。复制因子可以在创建的时候指定,也可以以后更改。HDFS的文件只写一次(除了 appends 和 truncates) ,并在任何时候只允许一个 writer 。

NameNode 指定块复制的所有决策。它周期性的从集群的每个DataNodes 接受 Heartbeat 和 Blockreport。Heartbeat 的接受代表 DataNode 工作正常。Blockreport 包含了DataNode上所有块的清单。

副本的位置对HDFS的可靠性和性能至关重要。副本位置的优化是HDFS和其他大多数分布式文件系统的区别。这是一个需要大量调优和经验的特性。Rack-aware 复制策略的目的就是提高数据可靠性,可用性和网络带宽利用率。当前副本位置策略的实现是这个方向的第一步。实施该策略的短期目标是在生产环境验证它,了解其更多的行为,为测试和研究更复杂的策略打下基础。

大型HDFS实例运行在跨多个Rack的集群服务器上。不同rack的两个node通信需要通过交换机。大多数情况下,同一rack内的带宽大于rack之间的带宽。

NameNode 通过在 Hadoop Rack Awareness 内的进程描述 判断DataNode 属于哪个rack id。一个简单但是并非最佳的策略是将副本分布于不同的racks。这可以防止整个机架发生故障时丢失数据,并允许在读取数据时使用多个机架的带宽。该策略在群集中均匀地分布副本,使得组件故障时很容易平衡负载。 但是,该策略会增加写入成本,因为写入操作需要将块传输到多个机架。

一般,复制因子设置为3, HDFS 的分布策略是:如果writer在datanode上则将一个副本放到本地机器, 如果writer不在datanode上则将一个副本放到writer所在机柜的随机datanode 上;另一个副本位于不同机架的node上;最后一个副本位于同一远程机架的不同node上。 该策略减少了机架间的写流量,提升了写性能。机架故障的概率远小于节点故障的概率;此策略不会影响数据可靠性和可用性承诺。但是,在读取数据时,它确实减少了聚合带宽,因为块存储于两个机柜而不是三个机柜内。使用此策略,副本不会均匀的分布于机架上。1/3 副本 位于同一节点, 2/3 副本位于同一机架, 另1/3副本位于其他机架。该策略提升了写性能而不影响数据可靠性和读性能。

如果复制因子大于3,那么第4个及以后的副本则随机放置,只要满足每个机架的副本在(replicas - 1) / racks + 2)之下。

因为 NameNode 不允许 DataNodes 拥有同一个块的多个副本,所以副本的最大数就是DataNodes的数量。

在把对 存储类型和存储策略 的支持添加到 HDFS 后,除了上面介绍的rack awareness外, NameNode 会考虑其他副本排布的策略。NameNode 先基于rack awareness 选择节点,然后检查候选节点有文件关联的策略需要的存储空间。 如果候选节点没有该存储类型, NameNode 会查找其他节点。如果在第一条路径中找不到足够的节点来放置副本,NameNode会在第二条路径中查找具有回滚存储类型的节点。 、

当前,这里描述的默认副本排布策略正在使用中。

为了最小化全局带宽消耗和读取延迟, HDFS 会尝试从最靠近reader的副本响应读取请求。如果在reader节点的同一机架上上存在副本,则该副本有限响应读请求。如果HDFS集群跨多个数据中心,则本地数据中心优先。

启动时,NameNode 会进入一个称为 Safemode 的特殊状态。当NameNode处于Safemode状态时,不会复制数据块。NameNode从DataNodes接收Heartbeat和Blockreport消息。Blockreport包含DataNode托管的数据块列表。每个块都指定了最小副本数。当数据块的最小副本数已与NameNode签入时,该块被认为是安全复制的。在NameNode签入安全复制数据块的已配置百分比(加上额外的30秒)后,NameNode退出Safemode状态。然后,它判断列表内的数据块清单是否少于副本指定的数量。NameNode 然后复制这些块给其他 DataNodes。

HDFS 命名空间由 NameNode 存储。NameNode 使用事务日志 EditLog 来持久化的保存系统元数据的每次变更。比如,在HDFS创建一个新文件,NameNode会在 EditLog 插入一条记录来指示该变更。类似的,变更文件的复制因子也会在 EditLog 插入一条新记录。NameNode 以文件的形式,将 EditLog 保存在本地OS文件系统上。整个文件系统命名空间,包括块到文件的映射、文件系统属性,都存储于名字为 FsImage 的文件内。 FsImage 也以文件的形式,存储在NameNode的本地文件系统上。

NameNode 将包含整个文件系统和块映射的image保存在内存中。当NameNode启动时,或检查点被预先定义的阈值触发时,它会从磁盘读取 FsImage 和 EditLog ,把 EditLog 内的事物应用到内存中的FsImage,再将新版本刷新回磁盘的新 FsImage 。然后会截断旧的 EditLog ,因为它的事物已经应用到了持久化的 FsImage 上。 这个过程称为检查点 checkpoint 。检查点的目的是通过对文件系统元数据进行快照并保存到FsImage,来确保HDFS拥有文件系统元数据的一致性视图。尽管读取 FsImage 是高效的,但是对 FsImage 直接增量修改是不高效的。不是对每次编辑修改 FsImage ,而是将每次编辑保存到 Editlog 。在检查点期间,将 Editlog 的变更应用到 FsImage 。一个检查点可以在固定周期(dfs.namenode.checkpoint.period)(以秒为单位)触发,也可以文件系统事物数量达到某个值(dfs.namenode.checkpoint.txns)的时候触发。

DataNode 在本地文件系统上以文件的形式存储 HDFS data 。DataNode 不知道 HDFS 文件。它将HDFS data 的每个块以独立的文件存储于本地文件系统上。DataNode 不在同一目录创建所有的文件。而是,使用heuristic来确定每个目录的最佳文件数量,并适当的创建子目录。在一个目录创建所有的本地文件是不好的,因为本地文件系统可能不支持单目录的海量文件数量。当DataNode启动的时候,它扫描本地文件系统,生成与本地文件系统一一对应的HDFS数据块列表,然后报告给NameNode。这个报告称为 Blockreport。

所有的HDFS通信协议都在TCP/IP协议栈上。客户端与NameNode指定的端口建立连接。与NameNode以ClientProtocol 通信。DataNodes与NameNode以DataNode Protocol进行通信。远程过程调用(RPC)封装了Client Protocol 和 DataNode Protocol。设计上,NameNode从不启动任何RPCs。相反,它只应答DataNodes or clients发出的RPC请求。

HDFS的主要目标是可靠的存储数据,即使是在故障的情况下。常见故障类型有三种: NameNode failures , DataNode failures 和 network partitions 。

每个DataNode都周期性的向NameNode发送心跳信息。 一个 network partition 可能导致DataNodes子集丢失与NameNode的连接。NameNode会基于心跳信息的缺失来侦测这种情况。NameNode将没有心跳信息的DataNodes标记为 dead ,并不再转发任何IO请求给它们。任何注册到dead DataNode的数据对HDFS将不再可用。DataNode death会导致某些块的复制因子低于它们指定的值。NameNode不断跟踪需要复制的块,并在必要时启动复制。很多因素会导致重新复制:DataNode不可用,副本损坏,DataNode上硬盘故障,复制因子增加。

标记 DataNodes dead 的超时时间保守地设置了较长时间 (默认超过10分钟) 以避免DataNodes状态抖动引起的复制风暴。对于性能敏感的应用,用户可以设置较短的周期来标记DataNodes为过期,读写时避免过期节点。

HDFS 架构支持数据再平衡schemes。如果一个DataNode的空余磁盘空间低于阈值,sheme就会将数据从一个DataNode 移动到另外一个。在某些文件需求突然增长的情况下,sheme可能会在集群内动态的创建额外的副本,并再平衡其他数据。这些类型的数据再平衡schemes还没有实现。

有可能从DataNode获取的数据块,到达的时候损坏了。这种损坏可能是由于存储设备故障、网络故障、软件bug。HDFS客户端软件会HDFS的内容进行校验。当客户端创建HDFS文件的时候,它计算文件每个块的校验值,并以独立的隐藏文件存储在同一HDFS命名空间内。当客户端检索文件时候,它会校验从每个DataNode获取的数据,是否与关联校验文件内的校验值匹配。 如果不匹配,客户端可以从另外拥有副本块的DataNode检索。

FsImage 和 EditLog 是HDFS的核心数据结构。这些文件的损坏将导致HDFS实例异常。 因此,NameNode可以配置为支持多 FsImage 和 EditLog 副本模式。任何对 FsImage or EditLog 的更新都会导致每个 FsImages 和 EditLogs 的同步更新。 FsImage 和 EditLog 的同步更新会导致降低命名空间每秒的事物效率。但是,这种降级是可以接受的,因为HDFS应用是数据密集型,而不是元数据密集型。当NameNode重启的时候,它会选择最新的一致的 FsImage 和 EditLog 。

另外一种提供故障恢复能力的办法是多NameNodes 开启HA,以 shared storage on NFS or distributed edit log (called Journal)的方式。推荐后者。

Snapshots - 快照,支持在特定时刻存储数据的副本。快照功能的一个用法,可以回滚一个故障的HDFS实例到已知工作良好的时候。

HDFS被设计与支持超大的文件。与HDFS适配的软件都是处理大数据的。这些应用都只写一次,但是它们会读取一或多次,并且需要满足流式读速度。HDFS支持文件的 一次写入-多次读取 语义。 HDFS典型的块大小是128 MB.。因此,HDFS文件被分割为128 MB的块,可能的话每个块都位于不同的DataNode上。

当客户端以复制因子3写入HDFS文件时,NameNode以 复制目标选择算法 replication target choosing algorithm 检索DataNodes 列表。该列表包含了承载该数据块副本的DataNodes清单。然后客户端写入到第一个DataNode。第一DataNode逐步接受数据的一部分,将每一部分内容写入到本地仓库,并将该部分数据传输给清单上的第二DataNode。第二DataNode,按顺序接受数据块的每个部分,写入到仓库,然后将该部分数据刷新到第三DataNode。最终,第三DataNode将数据写入到其本地仓库。

因此,DataNode从管道的前一个DataNode获取数据,同时转发到管道的后一个DataNode。因此,数据是以管道的方式从一个DataNode传输到下一个的。

应用访问HDFS有很多方式。原生的,HDFS 提供了 FileSystem Java API 来给应用调用。还提供了 C language wrapper for this Java API 和 REST API 。另外,还支持HTTP浏览器查看HDFS实例的文件。 通过使用 NFS gateway ,HDFS还可以挂载到客户端作为本地文件系统的一部分。

HDFS的用户数据是以文件和目录的形式组织的。它提供了一个命令行接口 FS shell 来提供用户交互。命令的语法类似于其他shell (比如:bash, csh)。如下是一些范例:

FS shell 的目标是向依赖于脚本语言的应用提供与存储数据的交互。

DFSAdmin 命令用于管理HDFS集群。这些命令仅给HDFS管理员使用。如下范例:

如果启用了回收站配置,那么文件被 FS Shell 移除时并不会立即从HDFS删除。HDFS会将其移动到回收站目录(每个用户都有回收站,位于 /user/username/.Trash )。只要文件还在回收站内,就可以快速恢复。

最近删除的文件大多数被移动到 current 回收站目录 ( /user/username/.Trash/Current ),在配置周期内,HDFS给 current目录内的文件创建检查点 checkpoints (位于 /user/username/.Trash/date ) ,并删除旧的检查点。参考 expunge command of FS shell 获取更多关于回收站检查点的信息。

在回收站过期后,NameNode从HDFS命名空间删除文件。删除文件会将文件关联的块释放。注意,在用户删除文件和HDFS增加free空间之间,会有一个明显的延迟。

如下范例展示了FS Shell如何删除文件。我们在delete目录下创建两个文件(test1 test2)

我们删除文件 test1。如下命令显示文件被移动到回收站。

现在我们尝试以skipTrash参数删除文件,该参数将不将文件发送到回收站。文件将会从HDFS完全删除。

我们检查回收站,只有文件test1。

如上,文件test1进了回收站,文件test2被永久删除了。

当缩减文件的复制因子时,NameNode选择可以被删除的多余副本。下一个Heartbeat会通报此信息给DataNode。DataNode然后会删除响应的块,相应的剩余空间会显示在集群内。同样,在setReplication API调用完成和剩余空间在集群显示之间会有一个时间延迟。

Hadoop JavaDoc API .

HDFS source code:

如何使用Java API读写HDFS

Java API读写HDFS

public class FSOptr {

/**

* @param args

*/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();

makeDir(conf);

rename(conf);

delete(conf);

}

// 创建文件目录

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path dir = new Path("/user/hadoop/data/20140318");

boolean result = fs.mkdirs(dir);// 创建文件夹

System.out.println("make dir :" + result);

// 创建文件,并写入内容

Path dst = new Path("/user/hadoop/data/20140318/tmp");

byte[] buff = "hello,hadoop!".getBytes();

FSDataOutputStream outputStream = fs.create(dst);

outputStream.write(buff, 0, buff.length);

outputStream.close();

FileStatus files[] = fs.listStatus(dst);

for (FileStatus file : files) {

System.out.println(file.getPath());

}

fs.close();

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path oldName = new Path("/user/hadoop/data/20140318/1.txt");

Path newName = new Path("/user/hadoop/data/20140318/2.txt");

fs.rename(oldName, newName);

FileStatus files[] = fs.listStatus(new Path(

"/user/hadoop/data/20140318"));

for (FileStatus file : files) {

System.out.println(file.getPath());

}

fs.close();

}

// 删除文件

@SuppressWarnings("deprecation")

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path path = new Path("/user/hadoop/data/20140318");

if (fs.isDirectory(path)) {

FileStatus files[] = fs.listStatus(path);

for (FileStatus file : files) {

fs.delete(file.getPath());

}

} else {

fs.delete(path);

}

// 或者

fs.delete(path, true);

fs.close();

}

/**

* 下载,将hdfs文件下载到本地磁盘

*

* @param localSrc1

* 本地的文件地址,即文件的路径

* @param hdfsSrc1

* 存放在hdfs的文件地址

*/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration();

FileSystem fs = null;

try {

fs = FileSystem.get(URI.create(hdfsSrc1), conf);

Path hdfs_path = new Path(hdfsSrc1);

Path local_path = new Path(localSrc1);

fs.copyToLocalFile(hdfs_path, local_path);

return true;

} catch (IOException e) {

e.printStackTrace();

}

return false;

}

/**

* 上传,将本地文件copy到hdfs系统中

*

* @param localSrc

* 本地的文件地址,即文件的路径

* @param hdfsSrc

* 存放在hdfs的文件地址

*/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in;

try {

in = new BufferedInputStream(new FileInputStream(localSrc));

Configuration conf = new Configuration();// 得到配置对象

FileSystem fs; // 文件系统

try {

fs = FileSystem.get(URI.create(hdfsSrc), conf);

// 输出流,创建一个输出流

OutputStream out = fs.create(new Path(hdfsSrc),

new Progressable() {

// 重写progress方法

public void progress() {

// System.out.println("上传完一个设定缓存区大小容量的文件!");

}

});

// 连接两个流,形成通道,使输入流向输出流传输数据,

IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流

return true;

} catch (IOException e) {

e.printStackTrace();

}

} catch (FileNotFoundException e) {

e.printStackTrace();

}

return false;

}

/**

* 移动

*

* @param old_st原来存放的路径

* @param new_st移动到的路径

*/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下载到服务器本地

boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");

Configuration conf = new Configuration();

FileSystem fs = null;

// 删除源文件

try {

fs = FileSystem.get(URI.create(old_st), conf);

Path hdfs_path = new Path(old_st);

fs.delete(hdfs_path);

} catch (IOException e) {

e.printStackTrace();

}

// 从服务器本地传到新路径

new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));

boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);

if (down_flag uplod_flag) {

return true;

}

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path src = new Path("/home/hadoop/word.txt");

Path dst = new Path("/user/hadoop/data/");

fs.copyFromLocalFile(src, dst);

fs.close();

}

// 获取给定目录下的所有子目录以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path path = new Path("/user/hadoop");

getFile(path, fs);

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fs.listStatus(path);

for (int i = 0; i fileStatus.length; i++) {

if (fileStatus[i].isDir()) {

Path p = new Path(fileStatus[i].getPath().toString());

getFile(p, fs);

} else {

System.out.println(fileStatus[i].getPath().toString());

}

}

}

//判断文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystem.get(conf);

return fileSystem.exists(new Path(path));

}

//获取hdfs集群所有主机结点数据

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

String[] names = new String[dataNodeStats.length];

System.out.println("list of all the nodes in HDFS cluster:"); //print info

for(int i=0; i dataNodeStats.length; i++){

names[i] = dataNodeStats[i].getHostName();

System.out.println(names[i]); //print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

Path f = new Path("/user/cluster/dfs.txt");

FileStatus filestatus = fs.getFileStatus(f);

BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());

int blkCount = blkLocations.length;

for(int i=0; i blkCount; i++){

String[] hosts = blkLocations[i].getHosts();

//Do sth with the block hosts

System.out.println(hosts);

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

Path f = new Path("/user/cluster/dfs.txt");

FileStatus filestatus = fs.getFileStatus(f);

long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch

Date d = new Date(modificationTime);

System.out.println(d);

}

}