本文目录一览:
- 1、使用Java API操作HDFS时,_方法用于获取文件列表?
- 2、java怎么连接hdfs文件系统,需要哪些包?
- 3、java修改hdfs上文件权限问题
- 4、用java向hdfs上传文件时,如何实现断点续传
- 5、HDFS 系统架构
- 6、如何使用Java API读写HDFS
使用Java API操作HDFS时,_方法用于获取文件列表?
当使用 Java API 操作 HDFS 时,可以使用 FileSystem.listFiles() 方法来获取文件列表。该方法接受一个 Path 对象,表示要列举文件的目录,并返回一个 RemoteIteratorLocatedFileStatus 对象,该对象可用于迭代目录中的文件。
例如,下面的代码演示了如何使用 listFiles() 方法来获取 HDFS 上的文件列表:
// 定义 HDFS 连接配置
Configuration conf = new Configuration();
// 获取 HDFS FileSystem 对象
FileSystem fs = FileSystem.get(conf);
// 定义要列举文件的目录
Path dirPath = new Path("/user/hadoop");
// 获取文件列表
RemoteIteratorLocatedFileStatus fileIter = fs.listFiles(dirPath, true);
// 遍历文件列表
while (fileIter.hasNext()) {
// 获取当前文件信息
LocatedFileStatus fileStatus = fileIter.next();
// 输出文件名称和大小
System.out.println(fileStatus.getPath().getName() + " : " + fileStatus.getLen());
}
java怎么连接hdfs文件系统,需要哪些包?
apache的Hadoop项目提供一类api可以通过java工程操作hdfs中的文件,包括:文件打开,读写,删除等、目录的创建,删除,读取目录中所有文件等。
1、到下载Hadoop,解压后把所有jar加入项目的lib里
2、程序处理步骤: 1)得到Configuration对象,2)得到FileSystem对象,3)进行文件操作,简单示例如下:
/**
*
*/
package org.jrs.wlh;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* @PutMeger.java
* java操作hdfs 往 hdfs中上传数据
* @version $Revision$/br
* update: $Date$
*/
public class PutMeger {
public static void main(String[] args) throws IOException {
String[] str = new String[]{"E:\\hadoop\\UploadFileClient.java","hdfs://master:9000/user/hadoop/inccnt.java"};
Configuration conf = new Configuration();
FileSystem fileS= FileSystem.get(conf);
FileSystem localFile = FileSystem.getLocal(conf); //得到一个本地的FileSystem对象
Path input = new Path(str[0]); //设定文件输入保存路径
Path out = new Path(str[1]); //文件到hdfs输出路径
try{
FileStatus[] inputFile = localFile.listStatus(input); //listStatus得到输入文件路径的文件列表
FSDataOutputStream outStream = fileS.create(out); //创建输出流
for (int i = 0; i inputFile.length; i++) {
System.out.println(inputFile[i].getPath().getName());
FSDataInputStream in = localFile.open(inputFile[i].getPath());
byte buffer[] = new byte[1024];
int bytesRead = 0;
while((bytesRead = in.read(buffer))0){ //按照字节读取数据
System.out.println(buffer);
outStream.write(buffer,0,bytesRead);
}
in.close();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
java修改hdfs上文件权限问题
看来你是开启了HDFS的权限检查功能,这样你访问HDFS,NameNode都会检查访问用户的权限的。
你现在想要修改/process/startall.txt文件的权限,那process目录以及startall.txt的有效用户、有效组以及其权限是什么呢?
假设process目录与startall.txt原始的有效用户和有效组分别为root和supergroup,原始权限为750的话,你若在自己电脑运行上述程序,它会自动获取当前计算机的登录用户,假设为wyc,去访问HDFS,很显然,你的程序连process目录都进不去的。
此外,想要更改一个目录或文件的权限,当前用户则必须是有效用户或超级用户才可以。
想要解决的话,嘿嘿, 如果你设置的hadoop.security.authentication property,也就是认证方式为simple的话(默认就是simple),那还可以钻该认证方式的空子,运行程序是伪装成有效用户或者超级用户即可。
此外,有一行代码需要修改一下,我在实验后发现设置权限那一行有误,如下:
//hdfs.setPermission(dstPath, new FsPermission((short) 775));
hdfs.setPermission(dstPath, new FsPermission("755"));
用java向hdfs上传文件时,如何实现断点续传
@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })
public class UploadServlet extends HttpRequestHandlerServlet
implements HttpRequestHandler {
private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);
@Autowired
UploadProcessor uploadProcessor;
@Autowired
FileUploaderHelper fileUploaderHelper;
@Autowired
ExceptionCodeMappingHelper exceptionCodeMappingHelper;
@Autowired
Authorizer authorizer;
@Autowired
StaticStateIdentifierManager staticStateIdentifierManager;
@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws IOException {
log.trace("Handling request");
Serializable jsonObject = null;
try {
// extract the action from the request
UploadServletAction actionByParameterName =
UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));
// check authorization
checkAuthorization(request, actionByParameterName);
// then process the asked action
jsonObject = processAction(actionByParameterName, request);
// if something has to be written to the response
if (jsonObject != null) {
fileUploaderHelper.writeToResponse(jsonObject, response);
}
}
// If exception, write it
catch (Exception e) {
exceptionCodeMappingHelper.processException(e, response);
}
}
private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
throws MissingParameterException, AuthorizationException {
// check authorization
// if its not get progress (because we do not really care about authorization for get
// progress and it uses an array of file ids)
if (!actionByParameterName.equals(UploadServletAction.getProgress)) {
// extract uuid
final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);
// if this is init, the identifier is the one in parameter
UUID clientOrJobId;
String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
if (actionByParameterName.equals(UploadServletAction.getConfig) parameter != null) {
clientOrJobId = UUID.fromString(parameter);
}
// if not, get it from manager
else {
clientOrJobId = staticStateIdentifierManager.getIdentifier();
}
// call authorizer
authorizer.getAuthorization(
request,
actionByParameterName,
clientOrJobId,
fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);
}
}
private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
throws Exception {
log.debug("Processing action " + actionByParameterName.name());
Serializable returnObject = null;
switch (actionByParameterName) {
case getConfig:
String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
returnObject =
uploadProcessor.getConfig(
parameterValue != null ? UUID.fromString(parameterValue) : null);
break;
case verifyCrcOfUncheckedPart:
returnObject = verifyCrcOfUncheckedPart(request);
break;
case prepareUpload:
returnObject = prepareUpload(request);
break;
case clearFile:
uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case clearAll:
uploadProcessor.clearAll();
break;
case pauseFile:
ListUUID uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
uploadProcessor.pauseFile(uuids);
break;
case resumeFile:
returnObject =
uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case setRate:
uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
break;
case getProgress:
returnObject = getProgress(request);
break;
}
return returnObject;
}
ListUUID getFileIdsFromString(String fileIds) {
String[] splittedFileIds = fileIds.split(",");
ListUUID uuids = Lists.newArrayList();
for (int i = 0; i splittedFileIds.length; i++) {
uuids.add(UUID.fromString(splittedFileIds[i]));
}
return uuids;
}
private Serializable getProgress(HttpServletRequest request)
throws MissingParameterException {
Serializable returnObject;
String[] ids =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
CollectionUUID uuids = Collections2.transform(Arrays.asList(ids), new FunctionString, UUID() {
@Override
public UUID apply(String input) {
return UUID.fromString(input);
}
});
returnObject = Maps.newHashMap();
for (UUID fileId : uuids) {
try {
ProgressJson progress = uploadProcessor.getProgress(fileId);
((HashMapString, ProgressJson) returnObject).put(fileId.toString(), progress);
}
catch (FileNotFoundException e) {
log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());
}
}
return returnObject;
}
private Serializable prepareUpload(HttpServletRequest request)
throws MissingParameterException, IOException {
// extract file information
PrepareUploadJson[] fromJson =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);
// prepare them
final HashMapString, UUID prepareUpload = uploadProcessor.prepareUpload(fromJson);
// return them
return Maps.newHashMap(Maps.transformValues(prepareUpload, new FunctionUUID, String() {
public String apply(UUID input) {
return input.toString();
};
}));
}
private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
try {
uploadProcessor.verifyCrcOfUncheckedPart(fileId,
fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
}
catch (InvalidCrcException e) {
// no need to log this exception, a fallback behaviour is defined in the
// throwing method.
// but we need to return something!
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}
HDFS 系统架构
HDFS Architecture
Hadoop Distributed File System (HDFS) 是设计可以运行于普通商业硬件上的分布式文件系统。它跟现有的分布式文件系统有很多相通的地方,但是区别也是显著的。HDFS具有高度容错性能,被设计运行于低成本硬件上。HDFS可以向应用提供高吞吐带宽,适合于大数据应用。HDFS 放宽了一些 POSIX 的要求,以开启对文件系统数据的流式访问。HDFS 最初是作为Apache Nutch web 搜索引擎项目的基础设施开发的。HDFS 现在是 Apache Hadoop 核心项目的一部分。
HDFS是主从架构。一个HDFS集群包含一个NameNode,一个管理文件系统命名空间和控制客户端访问文件的master server。以及,若干的 DataNodes,通常集群的每个node一个,管理运行DataNode的节点上的存储。HDFS 发布一个文件系统命名空间,并允许用户数据已文件的形式存储在上面。内部,一个文件被分成一个或多个块,存储在一组DataNodes上。NameNode 执行文件系统命名空间操作,比如:打开、关闭、重命名文件或目录。它还确定块到DataNodes的映射。DataNodes 负责向文件系统客户端提供读写服务。DataNodes 根据 NameNode 的指令执行块的创建、删除以及复制。
NameNode 和 DataNode 是设计运行于普通商业机器的软件。这些机器通常运行 GNU/Linux 操作系统。HDFS 是Java 语言编写的;任何支持Java的机器都可以运行NameNode or DataNode 软件。使用高移植性Java语言,意味着HDFS可以部署在很大范围的机器上。一个典型的部署就是一台特定的机器只运行NameNode 软件,而集群内的其他机器运行DataNode 软件的一个实例。这种架构不排除一台机器上运行多个DataNodes ,但是在实际部署中很少见。
单 NameNode 节点的存在大大简化了架构。NameNode 是所有HDFS 元数据的仲裁和仓库。系统设计上,用户数据永远不经过NameNode。
HDFS 支持传统的文件分级组织。用户或应用可以创建目录,并在目录内存储文件。 文件系统命名空间的层次结构跟其他文件系统类似;可以创建、删除、移动、重命名文件。HDFS 支持 user quotas 和 access permissions 。 HDFS 不支持软、硬链接。但是,HDFS 架构不排除实现这些功能。
虽然HDFS遵守 文件系统命名约定 ,一些路径和名称 (比如/.reserved 和.snapshot ) 保留了。比如功能 transparent encryption 和 snapshot 就使用的保留路径。
NameNode 维护文件系统命名空间。任何文件系统命名空间或属性的变化,都会被NameNode记录。 应用可以指定HDFS应维护的文件副本数量。文件副本的数量被称为该文件的复制因子 replication factor 。该信息存储于NameNode。
HDFS 被设计用于在一个大规模集群上跨机器可靠地存储巨大的文件。它以一序列的块的方式存储文件。每个文件都可以配置块尺寸和复制因子。
一个文件除了最后一个块外,其他的块一样大。在 append 和 hsync 添加了可变长度块的支持后,用户可以启动一个新的块,而不用填充最后一个块到配置的块大小。
应用可以指定一个文件的副本数量。复制因子可以在创建的时候指定,也可以以后更改。HDFS的文件只写一次(除了 appends 和 truncates) ,并在任何时候只允许一个 writer 。
NameNode 指定块复制的所有决策。它周期性的从集群的每个DataNodes 接受 Heartbeat 和 Blockreport。Heartbeat 的接受代表 DataNode 工作正常。Blockreport 包含了DataNode上所有块的清单。
副本的位置对HDFS的可靠性和性能至关重要。副本位置的优化是HDFS和其他大多数分布式文件系统的区别。这是一个需要大量调优和经验的特性。Rack-aware 复制策略的目的就是提高数据可靠性,可用性和网络带宽利用率。当前副本位置策略的实现是这个方向的第一步。实施该策略的短期目标是在生产环境验证它,了解其更多的行为,为测试和研究更复杂的策略打下基础。
大型HDFS实例运行在跨多个Rack的集群服务器上。不同rack的两个node通信需要通过交换机。大多数情况下,同一rack内的带宽大于rack之间的带宽。
NameNode 通过在 Hadoop Rack Awareness 内的进程描述 判断DataNode 属于哪个rack id。一个简单但是并非最佳的策略是将副本分布于不同的racks。这可以防止整个机架发生故障时丢失数据,并允许在读取数据时使用多个机架的带宽。该策略在群集中均匀地分布副本,使得组件故障时很容易平衡负载。 但是,该策略会增加写入成本,因为写入操作需要将块传输到多个机架。
一般,复制因子设置为3, HDFS 的分布策略是:如果writer在datanode上则将一个副本放到本地机器, 如果writer不在datanode上则将一个副本放到writer所在机柜的随机datanode 上;另一个副本位于不同机架的node上;最后一个副本位于同一远程机架的不同node上。 该策略减少了机架间的写流量,提升了写性能。机架故障的概率远小于节点故障的概率;此策略不会影响数据可靠性和可用性承诺。但是,在读取数据时,它确实减少了聚合带宽,因为块存储于两个机柜而不是三个机柜内。使用此策略,副本不会均匀的分布于机架上。1/3 副本 位于同一节点, 2/3 副本位于同一机架, 另1/3副本位于其他机架。该策略提升了写性能而不影响数据可靠性和读性能。
如果复制因子大于3,那么第4个及以后的副本则随机放置,只要满足每个机架的副本在(replicas - 1) / racks + 2)之下。
因为 NameNode 不允许 DataNodes 拥有同一个块的多个副本,所以副本的最大数就是DataNodes的数量。
在把对 存储类型和存储策略 的支持添加到 HDFS 后,除了上面介绍的rack awareness外, NameNode 会考虑其他副本排布的策略。NameNode 先基于rack awareness 选择节点,然后检查候选节点有文件关联的策略需要的存储空间。 如果候选节点没有该存储类型, NameNode 会查找其他节点。如果在第一条路径中找不到足够的节点来放置副本,NameNode会在第二条路径中查找具有回滚存储类型的节点。 、
当前,这里描述的默认副本排布策略正在使用中。
为了最小化全局带宽消耗和读取延迟, HDFS 会尝试从最靠近reader的副本响应读取请求。如果在reader节点的同一机架上上存在副本,则该副本有限响应读请求。如果HDFS集群跨多个数据中心,则本地数据中心优先。
启动时,NameNode 会进入一个称为 Safemode 的特殊状态。当NameNode处于Safemode状态时,不会复制数据块。NameNode从DataNodes接收Heartbeat和Blockreport消息。Blockreport包含DataNode托管的数据块列表。每个块都指定了最小副本数。当数据块的最小副本数已与NameNode签入时,该块被认为是安全复制的。在NameNode签入安全复制数据块的已配置百分比(加上额外的30秒)后,NameNode退出Safemode状态。然后,它判断列表内的数据块清单是否少于副本指定的数量。NameNode 然后复制这些块给其他 DataNodes。
HDFS 命名空间由 NameNode 存储。NameNode 使用事务日志 EditLog 来持久化的保存系统元数据的每次变更。比如,在HDFS创建一个新文件,NameNode会在 EditLog 插入一条记录来指示该变更。类似的,变更文件的复制因子也会在 EditLog 插入一条新记录。NameNode 以文件的形式,将 EditLog 保存在本地OS文件系统上。整个文件系统命名空间,包括块到文件的映射、文件系统属性,都存储于名字为 FsImage 的文件内。 FsImage 也以文件的形式,存储在NameNode的本地文件系统上。
NameNode 将包含整个文件系统和块映射的image保存在内存中。当NameNode启动时,或检查点被预先定义的阈值触发时,它会从磁盘读取 FsImage 和 EditLog ,把 EditLog 内的事物应用到内存中的FsImage,再将新版本刷新回磁盘的新 FsImage 。然后会截断旧的 EditLog ,因为它的事物已经应用到了持久化的 FsImage 上。 这个过程称为检查点 checkpoint 。检查点的目的是通过对文件系统元数据进行快照并保存到FsImage,来确保HDFS拥有文件系统元数据的一致性视图。尽管读取 FsImage 是高效的,但是对 FsImage 直接增量修改是不高效的。不是对每次编辑修改 FsImage ,而是将每次编辑保存到 Editlog 。在检查点期间,将 Editlog 的变更应用到 FsImage 。一个检查点可以在固定周期(dfs.namenode.checkpoint.period)(以秒为单位)触发,也可以文件系统事物数量达到某个值(dfs.namenode.checkpoint.txns)的时候触发。
DataNode 在本地文件系统上以文件的形式存储 HDFS data 。DataNode 不知道 HDFS 文件。它将HDFS data 的每个块以独立的文件存储于本地文件系统上。DataNode 不在同一目录创建所有的文件。而是,使用heuristic来确定每个目录的最佳文件数量,并适当的创建子目录。在一个目录创建所有的本地文件是不好的,因为本地文件系统可能不支持单目录的海量文件数量。当DataNode启动的时候,它扫描本地文件系统,生成与本地文件系统一一对应的HDFS数据块列表,然后报告给NameNode。这个报告称为 Blockreport。
所有的HDFS通信协议都在TCP/IP协议栈上。客户端与NameNode指定的端口建立连接。与NameNode以ClientProtocol 通信。DataNodes与NameNode以DataNode Protocol进行通信。远程过程调用(RPC)封装了Client Protocol 和 DataNode Protocol。设计上,NameNode从不启动任何RPCs。相反,它只应答DataNodes or clients发出的RPC请求。
HDFS的主要目标是可靠的存储数据,即使是在故障的情况下。常见故障类型有三种: NameNode failures , DataNode failures 和 network partitions 。
每个DataNode都周期性的向NameNode发送心跳信息。 一个 network partition 可能导致DataNodes子集丢失与NameNode的连接。NameNode会基于心跳信息的缺失来侦测这种情况。NameNode将没有心跳信息的DataNodes标记为 dead ,并不再转发任何IO请求给它们。任何注册到dead DataNode的数据对HDFS将不再可用。DataNode death会导致某些块的复制因子低于它们指定的值。NameNode不断跟踪需要复制的块,并在必要时启动复制。很多因素会导致重新复制:DataNode不可用,副本损坏,DataNode上硬盘故障,复制因子增加。
标记 DataNodes dead 的超时时间保守地设置了较长时间 (默认超过10分钟) 以避免DataNodes状态抖动引起的复制风暴。对于性能敏感的应用,用户可以设置较短的周期来标记DataNodes为过期,读写时避免过期节点。
HDFS 架构支持数据再平衡schemes。如果一个DataNode的空余磁盘空间低于阈值,sheme就会将数据从一个DataNode 移动到另外一个。在某些文件需求突然增长的情况下,sheme可能会在集群内动态的创建额外的副本,并再平衡其他数据。这些类型的数据再平衡schemes还没有实现。
有可能从DataNode获取的数据块,到达的时候损坏了。这种损坏可能是由于存储设备故障、网络故障、软件bug。HDFS客户端软件会HDFS的内容进行校验。当客户端创建HDFS文件的时候,它计算文件每个块的校验值,并以独立的隐藏文件存储在同一HDFS命名空间内。当客户端检索文件时候,它会校验从每个DataNode获取的数据,是否与关联校验文件内的校验值匹配。 如果不匹配,客户端可以从另外拥有副本块的DataNode检索。
FsImage 和 EditLog 是HDFS的核心数据结构。这些文件的损坏将导致HDFS实例异常。 因此,NameNode可以配置为支持多 FsImage 和 EditLog 副本模式。任何对 FsImage or EditLog 的更新都会导致每个 FsImages 和 EditLogs 的同步更新。 FsImage 和 EditLog 的同步更新会导致降低命名空间每秒的事物效率。但是,这种降级是可以接受的,因为HDFS应用是数据密集型,而不是元数据密集型。当NameNode重启的时候,它会选择最新的一致的 FsImage 和 EditLog 。
另外一种提供故障恢复能力的办法是多NameNodes 开启HA,以 shared storage on NFS or distributed edit log (called Journal)的方式。推荐后者。
Snapshots - 快照,支持在特定时刻存储数据的副本。快照功能的一个用法,可以回滚一个故障的HDFS实例到已知工作良好的时候。
HDFS被设计与支持超大的文件。与HDFS适配的软件都是处理大数据的。这些应用都只写一次,但是它们会读取一或多次,并且需要满足流式读速度。HDFS支持文件的 一次写入-多次读取 语义。 HDFS典型的块大小是128 MB.。因此,HDFS文件被分割为128 MB的块,可能的话每个块都位于不同的DataNode上。
当客户端以复制因子3写入HDFS文件时,NameNode以 复制目标选择算法 replication target choosing algorithm 检索DataNodes 列表。该列表包含了承载该数据块副本的DataNodes清单。然后客户端写入到第一个DataNode。第一DataNode逐步接受数据的一部分,将每一部分内容写入到本地仓库,并将该部分数据传输给清单上的第二DataNode。第二DataNode,按顺序接受数据块的每个部分,写入到仓库,然后将该部分数据刷新到第三DataNode。最终,第三DataNode将数据写入到其本地仓库。
因此,DataNode从管道的前一个DataNode获取数据,同时转发到管道的后一个DataNode。因此,数据是以管道的方式从一个DataNode传输到下一个的。
应用访问HDFS有很多方式。原生的,HDFS 提供了 FileSystem Java API 来给应用调用。还提供了 C language wrapper for this Java API 和 REST API 。另外,还支持HTTP浏览器查看HDFS实例的文件。 通过使用 NFS gateway ,HDFS还可以挂载到客户端作为本地文件系统的一部分。
HDFS的用户数据是以文件和目录的形式组织的。它提供了一个命令行接口 FS shell 来提供用户交互。命令的语法类似于其他shell (比如:bash, csh)。如下是一些范例:
FS shell 的目标是向依赖于脚本语言的应用提供与存储数据的交互。
DFSAdmin 命令用于管理HDFS集群。这些命令仅给HDFS管理员使用。如下范例:
如果启用了回收站配置,那么文件被 FS Shell 移除时并不会立即从HDFS删除。HDFS会将其移动到回收站目录(每个用户都有回收站,位于 /user/username/.Trash )。只要文件还在回收站内,就可以快速恢复。
最近删除的文件大多数被移动到 current 回收站目录 ( /user/username/.Trash/Current ),在配置周期内,HDFS给 current目录内的文件创建检查点 checkpoints (位于 /user/username/.Trash/date ) ,并删除旧的检查点。参考 expunge command of FS shell 获取更多关于回收站检查点的信息。
在回收站过期后,NameNode从HDFS命名空间删除文件。删除文件会将文件关联的块释放。注意,在用户删除文件和HDFS增加free空间之间,会有一个明显的延迟。
如下范例展示了FS Shell如何删除文件。我们在delete目录下创建两个文件(test1 test2)
我们删除文件 test1。如下命令显示文件被移动到回收站。
现在我们尝试以skipTrash参数删除文件,该参数将不将文件发送到回收站。文件将会从HDFS完全删除。
我们检查回收站,只有文件test1。
如上,文件test1进了回收站,文件test2被永久删除了。
当缩减文件的复制因子时,NameNode选择可以被删除的多余副本。下一个Heartbeat会通报此信息给DataNode。DataNode然后会删除响应的块,相应的剩余空间会显示在集群内。同样,在setReplication API调用完成和剩余空间在集群显示之间会有一个时间延迟。
Hadoop JavaDoc API .
HDFS source code:
如何使用Java API读写HDFS
Java API读写HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 创建文件夹
System.out.println("make dir :" + result);
// 创建文件,并写入内容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
* 本地的文件地址,即文件的路径
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上传,将本地文件copy到hdfs系统中
*
* @param localSrc
* 本地的文件地址,即文件的路径
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// System.out.println("上传完一个设定缓存区大小容量的文件!");
}
});
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}