Hadoop Framework for processing Large Data Sets in a Distributed Computing Environment
Hadoop is a free, Java-based programming framework that supports the processing of large data sets in a distributed computing environment.
It is part of the Apache project sponsored by the Apache Software Foundation.
Hadoop makes it possible to run applications on systems with thousands of nodes involving thousands of terabytes.
Its distributed file system facilitates rapid data transfer rates among nodes and allows the system to continue operating uninterrupted in case of a node failure.
This approach lowers the risk of catastrophic system failure, even if a significant number of nodes become inoperative.
* Admin Package included: OpenVPN, SSH, SFTP, OS root access
The Apache™ Hadoop® project develops open-source software for reliable, scalable, distributed computing.
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.
It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.
Rather than rely on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer, thereby delivering a highly available service on top of a cluster of computers, each of which may be prone to failures.
The project includes these modules:
- Hadoop Common: The common utilities that support the other Hadoop modules.
- Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
- Hadoop YARN: A framework for job scheduling and cluster resource management.
- Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.
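The MapReduce programming model named in the list above can be illustrated with a minimal, in-process sketch. This is not Hadoop code and uses no Hadoop API: the function names (`map_phase`, `shuffle`, `reduce_phase`, `word_count`) are hypothetical, and everything runs in one Python process, whereas a real job distributes the map and reduce tasks across cluster nodes.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def map_phase(line: str) -> Iterator[Tuple[str, int]]:
    """Map: emit a (word, 1) pair for every word in one input line."""
    for word in line.lower().split():
        yield word, 1


def shuffle(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Shuffle: group all emitted values by key, as a framework would do."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key: str, values: List[int]) -> Tuple[str, int]:
    """Reduce: combine all values seen for one key into a single count."""
    return key, sum(values)


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Run the three phases in-process (illustrative only)."""
    pairs = (pair for line in lines for pair in map_phase(line))
    return dict(reduce_phase(k, v) for k, v in shuffle(pairs).items())


if __name__ == "__main__":
    print(word_count(["big data big cluster", "data node"]))
```

Because each map call sees only one line and each reduce call sees only one key, both phases can in principle run in parallel on different machines, which is the property the framework exploits.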
Other Hadoop-related projects at Apache
- Ambari™: A web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters, which includes support for Hadoop HDFS, Hadoop MapReduce, Hive, HCatalog, HBase, ZooKeeper, Oozie, Pig and Sqoop. Ambari also provides a dashboard for viewing cluster health (for example, heatmaps) and the ability to view MapReduce, Pig and Hive applications visually, along with features to diagnose their performance characteristics in a user-friendly manner.
- Avro™: A data serialization system.
- Cassandra™: A scalable multi-master database with no single points of failure.
- Chukwa™: A data collection system for managing large distributed systems.
- HBase™: A scalable, distributed database that supports structured data storage for large tables.
- Hive™: A data warehouse infrastructure that provides data summarization and ad hoc querying.
- Mahout™: A scalable machine learning and data mining library.
- Pig™: A high-level data-flow language and execution framework for parallel computation.
- Spark™: A fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.
- Tez™: A generalized data-flow programming framework, built on Hadoop YARN, which provides a powerful and flexible engine to execute an arbitrary DAG of tasks to process data for both batch and interactive use-cases. Tez is being adopted by Hive™, Pig™ and other frameworks in the Hadoop ecosystem, and also by other commercial software (e.g. ETL tools), to replace Hadoop™ MapReduce as the underlying execution engine.
- ZooKeeper™: A high-performance coordination service for distributed applications.
Aliyun OSS Support
- Read and write data stored in Aliyun OSS
- Present a hierarchical file system view by implementing the standard Hadoop FileSystem interface
- Can act as a source of data in a MapReduce job, or a sink
Hadoop Resource Estimator
Estimating job resource requirements remains an important and challenging problem for enterprise clusters.
The problem is amplified by the ever-increasing complexity of workloads, which now range from traditional batch jobs to interactive queries, streaming and, more recently, machine learning jobs.
As a result, jobs rely on multiple computation frameworks such as Tez, MapReduce and Spark, and the problem is further compounded by the shared nature of the clusters.
The current state-of-the-art solution relies on user expertise to estimate the resource requirements of jobs (e.g., the number of reducers or the container memory size), which is both tedious and inefficient.
Based on the analysis of our cluster workloads, we observe that a large portion of jobs (more than 60%) are recurring, which gives us the opportunity to estimate job resource requirements automatically from each job's historical runs.
It is worth noting that jobs usually come from different computation frameworks, and the version may change across runs as well.
Therefore, we want a framework-agnostic, black-box solution that automatically estimates resource requirements for recurring jobs.
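The idea of estimating from history can be sketched in a few lines. The sketch below is not the algorithm used by the Hadoop Resource Estimator. It is a deliberately simple stand-in that treats the job as a black box, takes a nearest-rank quantile of the peak memory seen in earlier runs, and adds headroom. The function name, the quantile and the headroom defaults are all assumptions made for illustration.

```python
from typing import Sequence


def estimate_memory_mb(
    peak_mb_per_run: Sequence[int],
    quantile_pct: int = 90,
    headroom_pct: int = 10,
) -> int:
    """Estimate container memory (MB) for a recurring job from past runs.

    Black-box approach: only the observed peak of each earlier run is used,
    so the estimate does not depend on which framework produced the job.
    """
    if not peak_mb_per_run:
        raise ValueError("at least one historical run is required")
    if not 1 <= quantile_pct <= 100:
        raise ValueError("quantile_pct must be between 1 and 100")

    ordered = sorted(peak_mb_per_run)
    # Nearest-rank quantile, computed with integer arithmetic (ceiling).
    rank = (quantile_pct * len(ordered) + 99) // 100
    base = ordered[rank - 1]
    # Add headroom and round up to a whole megabyte.
    return (base * (100 + headroom_pct) + 99) // 100


if __name__ == "__main__":
    history = [1000, 1200, 1100, 1500, 1300]  # hypothetical peak usage in MB
    print(estimate_memory_mb(history))
```

A higher quantile trades cluster utilization for a lower risk of under-provisioning. A production estimator would also need to account for how usage varies over the lifetime of a run.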
Hadoop Distributed File System (HDFS™)
HDFS is a distributed, scalable, and portable file system written in Java for the Hadoop framework.
Some consider it to instead be a data store due to its lack of POSIX compliance, but it does provide shell commands and Java application programming interface (API) methods that are similar to other file systems.
A Hadoop cluster has nominally a single namenode plus a cluster of datanodes, although redundancy options are available for the namenode due to its criticality.
Each datanode serves up blocks of data over the network using a block protocol specific to HDFS.
The file system uses TCP/IP sockets for communication.
Clients use remote procedure calls (RPC) to communicate with the namenode.
HDFS stores large files (typically in the range of gigabytes to terabytes) across multiple machines.
It achieves reliability by replicating the data across multiple hosts, and hence theoretically does not require redundant array of independent disks (RAID) storage on hosts, although some RAID configurations are still useful for increasing input/output (I/O) performance.
With the default replication value, 3, data is stored on three nodes: two on the same rack, and one on a different rack.
Data nodes can talk to each other to rebalance data, to move copies around, and to keep the replication of data high.
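The placement rule described above (three replicas, two sharing a rack and one on a different rack) can be sketched as follows. This is an illustration, not the HDFS block placement code. The choice to put the first replica on the writer's rack and the other two together on one remote rack is an assumption modeled on the commonly documented default policy, and the names `place_replicas`, `racks` and `writer_rack` are hypothetical.

```python
import random
from typing import Dict, List, Optional


def place_replicas(
    racks: Dict[str, List[str]],
    writer_rack: str,
    rng: Optional[random.Random] = None,
) -> List[str]:
    """Choose three datanodes: one on the writer's rack, two on one other rack."""
    rng = rng or random.Random()
    if not racks.get(writer_rack):
        raise ValueError("writer rack has no datanodes")

    # A remote rack needs at least two nodes to hold the second and third replica.
    remote_candidates = [
        name
        for name, nodes in racks.items()
        if name != writer_rack and len(nodes) >= 2
    ]
    if not remote_candidates:
        raise ValueError("need a second rack with at least two datanodes")

    first = rng.choice(racks[writer_rack])
    remote_rack = rng.choice(remote_candidates)
    second, third = rng.sample(racks[remote_rack], 2)
    return [first, second, third]


if __name__ == "__main__":
    topology = {"rack1": ["dn1", "dn2"], "rack2": ["dn3", "dn4", "dn5"]}
    print(place_replicas(topology, writer_rack="rack1"))
```

Spreading replicas over two racks means that losing a whole rack still leaves at least one copy, while keeping two replicas on the same rack limits cross-rack write traffic.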
HDFS is not fully POSIX-compliant, because the requirements for a POSIX file-system differ from the target goals of a Hadoop application.
The trade-off of not having a fully POSIX-compliant file-system is increased performance for data throughput and support for non-POSIX operations such as Append.
HDFS added high-availability capabilities in version 2.0, announced in May 2012, which let the main metadata server (the NameNode) fail over manually onto a backup.
The project has also started developing automatic fail-overs.
The HDFS file system includes a so-called secondary namenode, a misleading term that some might incorrectly interpret as a backup namenode that takes over when the primary namenode goes offline.
In fact, the secondary namenode regularly connects with the primary namenode and builds snapshots of the primary namenode's directory information, which the system then saves to local or remote directories.
These checkpointed images can be used to restart a failed primary namenode without having to replay the entire journal of file-system actions; only the edits logged after the checkpoint need to be applied to create an up-to-date directory structure.
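The benefit of checkpointing can be shown with a toy model. The sketch below is not the namenode's on-disk format or recovery code. It represents the namespace as a set of paths and the journal as a list of `(transaction id, operation, path)` tuples, all of which are assumptions made for illustration.

```python
from typing import List, Set, Tuple

# (transaction id, operation, path); the operation is "create" or "delete".
Edit = Tuple[int, str, str]


def apply_edit(namespace: Set[str], op: str, path: str) -> None:
    """Apply one journal entry to the in-memory namespace."""
    if op == "create":
        namespace.add(path)
    elif op == "delete":
        namespace.discard(path)
    else:
        raise ValueError(f"unknown operation: {op}")


def restore(checkpoint: Set[str], checkpoint_txid: int, edit_log: List[Edit]) -> Set[str]:
    """Rebuild the namespace from a checkpoint plus the edits made after it."""
    namespace = set(checkpoint)
    for txid, op, path in sorted(edit_log):
        if txid > checkpoint_txid:  # older edits are already in the checkpoint
            apply_edit(namespace, op, path)
    return namespace


if __name__ == "__main__":
    journal = [(1, "create", "/a"), (2, "create", "/b"),
               (3, "delete", "/a"), (4, "create", "/c")]
    # Checkpoint taken after transaction 2: only edits 3 and 4 are replayed.
    print(sorted(restore({"/a", "/b"}, 2, journal)))
```

Restoring from the checkpoint yields the same namespace as replaying the whole journal from an empty state, but touches only the edits recorded after the checkpoint.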
Because the namenode is the single point for storage and management of metadata, it can become a bottleneck for supporting a huge number of files, especially a large number of small files.
HDFS Federation, a new addition, aims to tackle this problem to a certain extent by allowing multiple namespaces served by separate namenodes.
HDFS also has known issues with small files, scalability, the namenode as a single point of failure (SPoF), and bottlenecks under heavy metadata request loads.
One advantage of using HDFS is data awareness between the job tracker and task tracker.
The job tracker schedules map or reduce jobs to task trackers with an awareness of the data location.
For example, if node A contains data (a, b, c) and node X contains data (x, y, z), the job tracker schedules node A to perform map or reduce tasks on (a, b, c) and node X to perform map or reduce tasks on (x, y, z).
This reduces the amount of traffic that goes over the network and prevents unnecessary data transfer.
When Hadoop is used with other file systems, this advantage is not always available.
This can have a significant impact on job-completion times as demonstrated with data-intensive jobs.
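The locality-aware scheduling described above can be sketched as a simple assignment function. This is not the job tracker's scheduler. It is a minimal illustration in which each task goes to the least loaded node that already holds its data, and falls back to any node only when no holder is available. The names `schedule_tasks`, `block_locations` and `nodes` are hypothetical.

```python
from typing import Dict, List


def schedule_tasks(block_locations: Dict[str, List[str]], nodes: List[str]) -> Dict[str, str]:
    """Map each data block to the node that should process it.

    Nodes that store the block are preferred, so the task reads local data
    instead of pulling it over the network.
    """
    if not nodes:
        raise ValueError("no nodes available")

    load = {node: 0 for node in nodes}
    assignment: Dict[str, str] = {}
    for block, holders in block_locations.items():
        local = [node for node in holders if node in load]
        candidates = local or nodes  # fall back to a remote node if needed
        chosen = min(candidates, key=lambda node: load[node])
        load[chosen] += 1
        assignment[block] = chosen
    return assignment


if __name__ == "__main__":
    locations = {"a": ["A"], "b": ["A"], "c": ["A"],
                 "x": ["X"], "y": ["X"], "z": ["X"]}
    print(schedule_tasks(locations, ["A", "X"]))
```

With the example from the text, blocks a, b and c are assigned to node A and blocks x, y and z to node X, so no block has to cross the network.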
HDFS was designed for mostly immutable files and may not be suitable for systems requiring concurrent write-operations.
HDFS can be mounted directly with a Filesystem in Userspace (FUSE) virtual file system on Linux and some other Unix systems.
File access can be achieved through the native Java API, the Thrift API (which generates a client in a number of languages, e.g. C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk, and OCaml), the command-line interface, the HDFS-UI web application over HTTP, or via third-party network client libraries.
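As one illustration of HTTP-based access, the sketch below builds request URLs for WebHDFS, the REST interface that Hadoop exposes under the `/webhdfs/v1` path prefix. The text above does not name WebHDFS, so treat this as one possible access route rather than the one this service uses. The host name is a placeholder, the helper `webhdfs_url` is hypothetical, and port 9870 is assumed as the NameNode HTTP port (the Hadoop 3.x default; your cluster may differ). The function only builds the URL and sends no request.

```python
from typing import Mapping, Optional
from urllib.parse import quote, urlencode


def webhdfs_url(
    host: str,
    path: str,
    op: str,
    port: int = 9870,
    params: Optional[Mapping[str, str]] = None,
) -> str:
    """Build a WebHDFS URL for an absolute HDFS path and an operation name."""
    if not path.startswith("/"):
        raise ValueError("HDFS path must be absolute")
    query = {"op": op}
    query.update(params or {})
    # quote() leaves "/" intact and escapes characters such as spaces.
    return f"http://{host}:{port}/webhdfs/v1{quote(path)}?{urlencode(query)}"


if __name__ == "__main__":
    # Placeholder host; LISTSTATUS lists a directory, OPEN reads a file.
    print(webhdfs_url("namenode.example.com", "/user/alice", "LISTSTATUS"))
    print(webhdfs_url("namenode.example.com", "/user/alice/data.csv", "OPEN"))
```

The resulting URL can be fetched with any HTTP client, subject to the authentication configured on the cluster.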
HDFS is designed for portability across various hardware platforms and for compatibility with a variety of underlying operating systems.
The HDFS design introduces portability limitations that result in some performance bottlenecks, since the Java implementation cannot use features that are exclusive to the platform on which HDFS is running.
Due to its widespread integration into enterprise-level infrastructure, monitoring HDFS performance at scale has become an increasingly important issue.
Monitoring end-to-end performance requires tracking metrics from datanodes, namenodes, and the underlying operating system.
Several monitoring platforms currently track HDFS performance, including Hortonworks, Cloudera, and Datadog.
YARN Timeline Service v.2
YARN Timeline Service v.2 uses a set of collectors (writers) to write data to the backend storage.
The collectors are distributed and co-located with the application masters to which they are dedicated.
All data that belong to that application are sent to the application-level timeline collectors, with the exception of the resource manager timeline collector.
For a given application, the application master can write data for the application to the co-located timeline collector (which is an NM auxiliary service in this release).
In addition, node managers of other nodes that are running the containers for the application also write data to the timeline collector on the node that is running the application master.
The resource manager also maintains its own timeline collector.
It emits only YARN-generic lifecycle events to keep its volume of writes reasonable.
The timeline readers are daemons separate from the timeline collectors, and they are dedicated to serving queries via a REST API.
YARN Federation
YARN is known to scale to thousands of nodes.
The scalability of YARN is determined by the Resource Manager and is proportional to the number of nodes, active applications, and active containers, and to the frequency of heartbeats (from both nodes and applications).
Lowering the heartbeat frequency can increase scalability, but is detrimental to utilization (see the old Hadoop 1.x experience).
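The relationship between cluster size, heartbeat frequency and Resource Manager load can be made concrete with some back-of-the-envelope arithmetic. The model below is a simplification made for illustration: it counts only heartbeat messages, ignores per-container work, and all the numbers in the example are hypothetical.

```python
def heartbeats_per_second(
    nodes: int,
    node_interval_s: float,
    apps: int,
    app_interval_s: float,
) -> float:
    """Approximate heartbeat messages per second arriving at one Resource Manager."""
    if node_interval_s <= 0 or app_interval_s <= 0:
        raise ValueError("heartbeat intervals must be positive")
    return nodes / node_interval_s + apps / app_interval_s


if __name__ == "__main__":
    # Hypothetical cluster: 4000 nodes and 1000 applications.
    print(heartbeats_per_second(4000, 1.0, 1000, 1.0))  # 1 s intervals
    print(heartbeats_per_second(4000, 2.0, 1000, 2.0))  # 2 s intervals
```

Doubling both intervals halves the message rate, but each node then waits up to twice as long to report freed resources and receive new work, which is the utilization cost mentioned above.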
YARN Federation scales a single YARN cluster to tens of thousands of nodes by federating multiple YARN sub-clusters.
The proposed approach is to divide a large (10-100k nodes) cluster into smaller units called sub-clusters, each with its own YARN RM and compute nodes.
The federation system will stitch these sub-clusters together and make them appear as one large YARN cluster to the applications.
The applications running in this federated environment will see a single massive YARN cluster and will be able to schedule tasks on any node of the federated cluster.
Under the hood, the federation system will negotiate with sub-clusters resource managers and provide resources to the application.
The goal is to allow an individual job to “span” sub-clusters seamlessly.
This design is structurally scalable: we bound the number of nodes each RM is responsible for, and appropriate policies try to ensure that the majority of applications reside within a single sub-cluster, so the number of applications each RM sees is also bounded.
This means we could scale almost linearly by simply adding sub-clusters (as very little coordination is needed across them).
This architecture can provide very tight enforcement of scheduling invariants within each sub-cluster (simply inherited from YARN), while continuous rebalancing across sub-clusters enforces (less strictly) that these properties are also respected at a global level. For example, if a sub-cluster loses a large number of nodes, queues could be re-mapped to other sub-clusters so that users running on the impaired sub-cluster are not unfairly affected.
Federation is designed as a “layer” atop the existing YARN codebase, with limited changes to the core YARN mechanisms.
Assumptions:
- We assume reasonably good connectivity across sub-clusters (e.g., we are not looking to federate across data centers yet, though future investigations of this are not excluded).
- We rely on HDFS federation (or equivalently scalable DFS solutions) to take care of scalability of the store side.
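The routing idea behind federation can be sketched with a small policy function. This is not the YARN Federation implementation or its policy API. It is a toy model in which each queue has a preferred ("home") sub-cluster and an application spills over to the least utilized sub-cluster when its home is full. The class `SubCluster`, the function `route_application` and the capacity model are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SubCluster:
    """A sub-cluster with its own RM, modeled only by an application limit."""
    name: str
    max_apps: int
    running_apps: int = 0

    def has_room(self) -> bool:
        return self.running_apps < self.max_apps


def route_application(queue: str, sub_clusters: List[SubCluster], home: Dict[str, str]) -> str:
    """Pick a sub-cluster for an application submitted to the given queue."""
    by_name = {sc.name: sc for sc in sub_clusters}
    preferred = by_name.get(home.get(queue, ""))
    if preferred is not None and preferred.has_room():
        target = preferred  # keep the application inside its home sub-cluster
    else:
        open_clusters = [sc for sc in sub_clusters if sc.has_room()]
        if not open_clusters:
            raise RuntimeError("no sub-cluster has capacity")
        # Spill over to the sub-cluster with the lowest relative load.
        target = min(open_clusters, key=lambda sc: sc.running_apps / sc.max_apps)
    target.running_apps += 1
    return target.name


if __name__ == "__main__":
    clusters = [SubCluster("sc1", max_apps=2), SubCluster("sc2", max_apps=2)]
    homes = {"etl": "sc1"}
    print([route_application("etl", clusters, homes) for _ in range(3)])
```

Keeping most applications in their home sub-cluster is what bounds the number of applications each RM sees, while the fallback path makes the sub-clusters appear as one pool of capacity.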
YARN Opportunistic Containers
Unlike existing YARN containers, which are scheduled on a node only if there are unallocated resources, opportunistic containers can be dispatched to an NM even if their execution at that node cannot start immediately.
In such a case, opportunistic containers will be queued at that NM until resources become available.
The main goal of opportunistic container execution is to improve cluster resource utilization, and therefore increase task throughput.
Resource utilization and task throughput improvements are more pronounced for workloads that include relatively short tasks (on the order of seconds).
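The queueing behavior described above can be modeled in a few lines. The sketch below is not NodeManager code. It models resources as a fixed number of identical slots and ignores guaranteed containers, preemption and queue limits, all of which are simplifying assumptions. The class and method names are hypothetical.

```python
from collections import deque
from typing import Deque, List, Set


class NodeManagerSketch:
    """Toy NM that queues opportunistic containers until a slot is free."""

    def __init__(self, slots: int) -> None:
        self.slots = slots
        self.running: Set[str] = set()
        self.queued: Deque[str] = deque()

    def submit_opportunistic(self, container_id: str) -> str:
        """Accept the container even when no slot is free; queue it if needed."""
        if len(self.running) < self.slots:
            self.running.add(container_id)
            return "running"
        self.queued.append(container_id)
        return "queued"

    def finish(self, container_id: str) -> List[str]:
        """Release a slot and start queued containers in arrival order."""
        self.running.remove(container_id)
        started: List[str] = []
        while self.queued and len(self.running) < self.slots:
            next_id = self.queued.popleft()
            self.running.add(next_id)
            started.append(next_id)
        return started


if __name__ == "__main__":
    nm = NodeManagerSketch(slots=1)
    print(nm.submit_opportunistic("c1"))
    print(nm.submit_opportunistic("c2"))
    print(nm.finish("c1"))
```

Because the next container is already waiting at the node, it can start as soon as a slot frees up, without a round trip to the scheduler. This is why short tasks benefit most.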
Your Virtual Machine Specs
Your Hadoop Server will be running on an isolated and secure Virtual Machine with the following configuration (see Note 1):
- CPU: 1 vCPU on 7th Generation Intel® Core™ i5-7260U Physical Processor(s)
  - Base Frequency: 2.20 GHz
  - Max Turbo Frequency: 3.40 GHz
- Memory: 7168 MB on 32 GB DDR4-2133 Physical Memory Chip(s)
  - DDR4-2133 1.2V SO-DIMM
  - Max Memory Bandwidth: 34.1 GB/s
- Disk Size: 22.06 GB on 1 TB M.2 SSD Physical Storage Chip(s)
  - M.2 Solid-State Drive (SSD)
  - Sequential Read: 530 MB/s
  - Sequential Write: 510 MB/s
  - Random Read IOPS: 92 K
  - Random Write IOPS: 83 K
Note 1: Virtual Machine resources are already optimized for performance. Under extreme usage or circumstances, more resources can be easily acquired via our Add-ons section.