Thursday, July 7, 2016

Map Reduce

A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.
  • The Map task takes a set of data and converts it into another set of data, where individual elements are broken down into tuples (key-value pairs).
  • The Reduce task takes the output from the Map as its input and combines those data tuples (key-value pairs) into a smaller set of tuples. The reduce task is always performed after the map task.



Input Phase: Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. A RecordReader reads each split and turns it into [K,V] records. For plain text input (the default), K is the byte offset at which a line starts in the file and V is the content of that line.

Map Phase: Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split, so the number of mappers equals the number of input splits.
The map function turns each supplied [K,V] pair into a List[K,V] of zero or more output pairs. These are the intermediate key-value pairs that will be sent to the Reducer.

Before Reduce, Shuffle & Sort groups the intermediate data by key into K,List[V] pairs, sorted by key, and passes them to the Reducer.

The Reducer applies the remaining processing logic to each K,List[V] pair and produces the final output.
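
The phases above can be imitated in a few lines of plain Python. This is only a single-process sketch of a word count, not Hadoop code: the function names are made up for illustration, and it assumes line-oriented text input where the key is the byte offset of a line and the value is the line itself.

```python
from collections import defaultdict

def record_reader(text):
    """Yield (byte offset, line) pairs, the way a text input format would."""
    offset = 0
    for line in text.splitlines(keepends=True):
        yield offset, line.rstrip("\n")
        offset += len(line.encode("utf-8"))

def map_fn(offset, line):
    """Emit an intermediate (word, 1) pair for every word in the line."""
    for word in line.split():
        yield word.lower(), 1

def shuffle_and_sort(pairs):
    """Group intermediate pairs into (key, [values]) pairs, sorted by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

def reduce_fn(key, values):
    """Combine all values for one key into a single output pair."""
    return key, sum(values)

def run_job(text):
    """Run input, map, shuffle/sort and reduce one after another."""
    intermediate = [kv for off, line in record_reader(text) for kv in map_fn(off, line)]
    return [reduce_fn(key, values) for key, values in shuffle_and_sort(intermediate)]

print(run_job("the quick fox\nthe lazy dog\n"))
```

In a real cluster each split would go to its own map task and the grouped keys would be spread over several reducers; here everything runs in one loop just to show the shape of the data at each phase.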





Partitioner:
When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner—which buckets keys using a hash function—works very well.
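
A hash-based partitioner can be sketched as below. The function names are hypothetical, and zlib.crc32 is used only as a stand-in for the key's hash code so that the result is stable between runs; the point is that a given key always lands in the same partition.

```python
import zlib

def partition(key, num_reducers):
    """Pick a reducer index for a key; the same key always maps to the same index."""
    # crc32 is a stand-in for the key's hash code (an assumption of this sketch).
    return (zlib.crc32(key.encode("utf-8")) & 0x7FFFFFFF) % num_reducers

def partition_map_output(pairs, num_reducers):
    """Split one map task's output into one bucket per reduce task."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition(key, num_reducers)].append((key, value))
    return buckets

buckets = partition_map_output([("apple", 1), ("pear", 1), ("apple", 2)], 3)
print(buckets)
```

Masking with 0x7FFFFFFF keeps the hash non-negative before the modulo, which is the same idea Hadoop's default hash partitioner uses.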

Combiner:
Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function. Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.
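
To see why running the combiner zero, one, or many times must not change the result, here is a small sketch with made-up (year, temperature) pairs. It assumes the job computes a maximum per key, so the same function can serve as combiner and reducer.

```python
from collections import defaultdict

def group(pairs):
    """Group (key, value) pairs into key -> [values]."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def max_per_key(pairs):
    """Usable as both combiner and reducer: max is commutative and associative."""
    return sorted((key, max(values)) for key, values in group(pairs).items())

map_outputs = [
    [("1950", 0), ("1950", 20), ("1950", 10)],  # output of the first map task
    [("1950", 25), ("1950", 15)],               # output of the second map task
]

# Reducer sees every map output record.
without_combiner = max_per_key([kv for out in map_outputs for kv in out])
# Reducer sees only one pre-combined record per key from each map task.
with_combiner = max_per_key([kv for out in map_outputs for kv in max_per_key(out)])
print(without_combiner, with_combiner)
```

Both runs give the same answer, but the second one sends two records to the reducer instead of five. A function such as mean could not be reused as a combiner unchanged, because the mean of partial means is not the overall mean.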



Why is the optimal split size the same as the block size?
It is the largest size of input that can be guaranteed to be stored on a single node. If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.

Hadoop does its best to run the map task on a node where the input data resides in HDFS. This is called the data locality optimization since it doesn’t use valuable cluster bandwidth. Sometimes, however, all three nodes hosting the HDFS block replicas for a map task’s input split are running other map tasks so the job scheduler will look for a free map slot on a node in the same rack as one of the blocks. Very occasionally even this is not possible, so an off-rack node is used, which results in an inter-rack network transfer.
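
The order of preference described above (same node, then same rack, then off-rack) can be written down as a small ranking function. This is a sketch with hypothetical names, where every node is given as a (rack, node) pair; the real scheduler takes more into account.

```python
def locality(candidate, replicas):
    """Rank a candidate (rack, node) against the replica locations of a split."""
    if candidate in replicas:
        return 0  # data-local: the node itself holds a replica
    if candidate[0] in {rack for rack, _ in replicas}:
        return 1  # rack-local: same rack as one of the replicas
    return 2      # off-rack: needs an inter-rack transfer

def pick_node(free_nodes, replicas):
    """Choose the free node with the best locality, or None if no node is free."""
    return min(free_nodes, key=lambda node: locality(node, replicas), default=None)

replicas = [("r1", "n1"), ("r2", "n4"), ("r2", "n5")]
print(pick_node([("r3", "n7"), ("r1", "n2")], replicas))
```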






Map tasks write their output to the local disk, not to HDFS. Why is this? 
Map output is intermediate output: it’s processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. So storing it in HDFS, with replication, would be overkill. If the node running the map task fails before the map output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to re-create the map output.

Reduce tasks don’t have the advantage of data locality—the input to a single reduce task is normally the output from all mappers. The sorted map outputs have to be transferred across the network to the node where the reduce task is running, where they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability. For each HDFS block of the reduce output, the first replica is stored on the local node, with other replicas being stored on off-rack nodes. Thus, writing the reduce output does consume network bandwidth, but only as much as a normal HDFS write pipeline consumes.

The series of steps in a MapReduce program is as follows:

1. A MapReduce job usually splits the input data set into independent chunks, which are processed by the map tasks in a completely parallel manner.
2. The framework sorts the outputs of the maps, which are then input to the reduce tasks.
3. The Hadoop job client submits the job (jar/executable etc.) and configuration to the JobTracker, which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, and providing status and diagnostic information to the job client.

Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.
As seen in earlier posts, the MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them, and re-executing the failed tasks. The slaves execute the tasks as directed by the master.


Anatomy of File Read in Hadoop

Anatomy of File Read:

1. The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem.
2. DistributedFileSystem calls the namenode, using RPC, to determine the locations of the first few blocks in the file. For each block, the namenode returns the addresses of the datanodes that have a copy of that block. Furthermore, the datanodes are sorted according to their proximity to the client (according to the topology). If the client is itself a datanode (in the case of a MapReduce task, for instance), then it will read from the local datanode, if it hosts a copy of the block.

**The DistributedFileSystem returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.
3. The client then calls read() on the stream. DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, then connects to the first (closest) datanode for the first block in the file.
4. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream.
5. When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream.
**Blocks are read in order with the DFSInputStream opening new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed. 
6. When the client has finished reading, it calls close() on the FSDataInputStream.


During reading, if the DFSInputStream encounters an error while communicating with a datanode, then it will try the next closest one for that block. It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks. The DFSInputStream also verifies checksums for the data transferred to it from the datanode.
If a corrupted block is found, it is reported to the namenode before the DFSInputStream attempts to read a replica of the block from another datanode.
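
The retry behaviour during a read can be pictured with the sketch below. It is not the DFSInputStream code: read_file, fetch and DatanodeError are hypothetical names, and checksum verification is left out. It only shows blocks being read in order, the closest datanode being tried first, and failed datanodes being remembered for later blocks.

```python
class DatanodeError(Exception):
    """Raised by the hypothetical fetch function when a datanode cannot serve a block."""

def read_file(block_locations, fetch):
    """Read blocks in order, trying each block's datanodes from closest to farthest.

    block_locations: list of (block_id, [datanodes sorted by proximity]).
    fetch(datanode, block_id): returns bytes or raises DatanodeError.
    """
    dead = set()  # datanodes that already failed; not retried for later blocks
    data = b""
    for block_id, datanodes in block_locations:
        for node in datanodes:
            if node in dead:
                continue
            try:
                data += fetch(node, block_id)
                break
            except DatanodeError:
                dead.add(node)
        else:
            raise IOError(f"no live replica for block {block_id}")
    return data

calls = []

def fetch(node, block_id):
    """Fake datanode access for the demo: dn1 is down, every other node answers."""
    calls.append((node, block_id))
    if node == "dn1":
        raise DatanodeError(node)
    return block_id.encode("utf-8")

result = read_file([("b1", ["dn1", "dn2"]), ("b2", ["dn1", "dn3"])], fetch)
print(result, calls)
```

In the demo dn1 is contacted only once: after it fails for the first block, the second block goes straight to dn3.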

One important aspect of this design is that the client contacts datanodes directly to retrieve data and is guided by the namenode to the best datanode for each block. This design allows HDFS to scale to a large number of concurrent clients, since the data traffic is spread across all the datanodes in the cluster. 
The namenode meanwhile merely has to service block location requests (which it stores in memory, making them very efficient) and does not, for example, serve data, which would quickly become a bottleneck as the number of clients grew.


What does it mean for two nodes in a local network to be “close” to each other? 
In the context of high-volume data processing, the limiting factor is the rate at which we can transfer data between nodes—bandwidth is a scarce commodity. The idea is to use the bandwidth between two nodes as a measure of distance.
Rather than measuring bandwidth between nodes, which can be difficult to do in practice (it requires a quiet cluster, and the number of pairs of nodes in a cluster grows as the square of the number of nodes), Hadoop takes a simple approach in which the network is represented as a tree and the distance between two nodes is the sum of their distances to their closest common ancestor. Levels in the tree are not predefined, but it is common to have levels that correspond to the data center, the rack, and the node that a process is running on. The idea is that the bandwidth available for each of the following scenarios becomes progressively less:
• Processes on the same node
• Different nodes on the same rack
• Nodes on different racks in the same data center
• Nodes in different data centers
For example, imagine a node n1 on rack r1 in data center d1. This can be represented as /d1/r1/n1. Using this notation, here are the distances for the four scenarios:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
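
The four distances above follow directly from the tree rule (sum of the distances to the closest common ancestor). A minimal sketch, assuming locations are always written in the /datacenter/rack/node form used here:

```python
def distance(a, b):
    """Tree distance between two locations written as /datacenter/rack/node."""
    path_a = a.strip("/").split("/")
    path_b = b.strip("/").split("/")
    common = 0  # number of leading path components the two locations share
    for x, y in zip(path_a, path_b):
        if x != y:
            break
        common += 1
    # Hops from each location up to the closest common ancestor.
    return (len(path_a) - common) + (len(path_b) - common)

for other in ["/d1/r1/n1", "/d1/r1/n2", "/d1/r2/n3", "/d2/r3/n4"]:
    print(other, distance("/d1/r1/n1", other))
```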






Wednesday, July 6, 2016

Anatomy of File Write in Hadoop

Anatomy of File Write:

1. The client creates the file by calling create() on DistributedFileSystem. 
2. DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it.
**The namenode performs various checks to make sure the file doesn’t already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.
**The DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to.
3. As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. 
**The data queue is consumed by the Data Streamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. 
4. The list of datanodes forms a pipeline; we'll assume the replication level is three, so there are three nodes in the pipeline. The Data Streamer streams the packets to the first datanode in the pipeline, which stores each packet and forwards it to the second datanode, which in turn stores it and forwards it to the third. This is called a pipelined write.
5. DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline (acks from the 2nd and 3rd datanodes are passed back to Datanode1, and Datanode1 submits the ack to DFSOutputStream).
6. When the client has finished writing data, it calls close() on the stream. This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete.
7. The namenode already knows which blocks the file is made up of (via Data Streamer asking for block allocations), so it only has to wait for blocks to be minimally replicated before returning successfully.
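
The interplay of the data queue and the ack queue can be imitated with two deques. This is a deliberately simplified, synchronous sketch with hypothetical names: the real client sends packets asynchronously and does not wait for one packet to be acknowledged before sending the next.

```python
from collections import deque

def write_packets(packets, pipeline):
    """Send packets through a datanode pipeline, tracking the data and ack queues."""
    data_queue = deque(packets)  # packets waiting to be sent
    ack_queue = deque()          # packets sent but not yet acknowledged by all nodes
    stored = {node: [] for node in pipeline}
    while data_queue:
        packet = data_queue.popleft()
        ack_queue.append(packet)
        acks = set()
        for node in pipeline:    # each datanode stores the packet, then forwards it
            stored[node].append(packet)
            acks.add(node)
        if acks == set(pipeline):  # acknowledged by every datanode in the pipeline
            ack_queue.remove(packet)
    return stored, list(ack_queue)

stored, unacknowledged = write_packets(["p1", "p2", "p3"], ["dn1", "dn2", "dn3"])
print(stored, unacknowledged)
```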


If a datanode fails while data is being written to it, then the following actions are taken, which are transparent to the client writing the data.

1) First the pipeline is closed, and any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets. 
2) The current block on the good datanodes is given a new identity, which is communicated to the namenode, so that the partial block on the failed datanode will be deleted if the failed datanode recovers later on.
3) The failed datanode is removed from the pipeline and the remainder of the block’s data is written to the two good datanodes in the pipeline. The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node. Subsequent blocks are then treated as normal.
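
The queue handling in the failure steps can be sketched as follows. The function name is hypothetical, and giving the block a new identity and re-replicating it are not modelled; the sketch only shows unacknowledged packets going back to the front of the data queue and the failed node leaving the pipeline.

```python
from collections import deque

def recover_pipeline(data_queue, ack_queue, pipeline, failed_node):
    """Return the data queue, ack queue and pipeline to use after a datanode fails."""
    # Unacknowledged packets go back to the front of the data queue, in order,
    # so that the remaining datanodes do not miss any of them.
    new_data_queue = deque(list(ack_queue) + list(data_queue))
    new_pipeline = [node for node in pipeline if node != failed_node]
    return new_data_queue, deque(), new_pipeline

queue, acks, pipeline = recover_pipeline(["p3", "p4"], ["p1", "p2"], ["dn1", "dn2", "dn3"], "dn2")
print(list(queue), list(acks), pipeline)
```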

HDFS Architecture



HDFS has a master/slave architecture. HDFS is built using the Java language.



Five daemons exist in the Hadoop master-slave architecture.
On Master Node: NameNode, Secondary NameNode and JobTracker
On Slave Node: DataNode and TaskTracker

An HDFS cluster consists of:
i) Single NameNode
ii) Number of DataNodes


HDFS LAYER: For Storage
NameNode: A master server that manages the file system namespace and regulates access to files by clients. It maintains the filesystem tree and the metadata for all the files and directories in the tree. This information is stored persistently on the local disk in the form of two files: 
- the namespace image and 
- the edit log.
The NameNode is a Single Point of Failure for the Hadoop Cluster.
The namenode also knows the datanodes on which all the blocks for a given file are located; however, it does not store block locations persistently, since this information is reconstructed from datanodes when the system starts.
Example:
Suppose File1 has been stored in HDFS and has been split into Block1 + Block2.
NameNode: knows that File1 -> Block1 (stored in DN2, DN3, DN4) + Block2 (stored in DN2, DN3, DN5).

DataNodes:  Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by clients or the namenode), and they report back to the namenode periodically with lists of blocks that they are storing.
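
The File1 example can be turned into a small sketch of how block locations are rebuilt from datanode block reports. The data structures and function names are made up for illustration; only the file-to-blocks mapping is treated as persistent, while the block-to-datanodes map is derived from the reports.

```python
from collections import defaultdict

# Persistent namespace: which blocks make up each file.
namespace = {"File1": ["Block1", "Block2"]}

def build_block_map(block_reports):
    """Rebuild the in-memory block -> datanodes map from datanode block reports."""
    block_map = defaultdict(set)
    for datanode, blocks in block_reports.items():
        for block in blocks:
            block_map[block].add(datanode)
    return block_map

def locate(filename, block_map):
    """Return [(block, sorted datanodes)] for a file, as a client lookup would."""
    return [(block, sorted(block_map[block])) for block in namespace[filename]]

reports = {
    "DN2": ["Block1", "Block2"],
    "DN3": ["Block1", "Block2"],
    "DN4": ["Block1"],
    "DN5": ["Block2"],
}
print(locate("File1", build_block_map(reports)))
```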


Without the namenode, the filesystem cannot be used. If the machine running the namenode failed, all the files on the filesystem would be lost since there would be no way of knowing how to reconstruct the files from the blocks on the datanodes. For this reason, it is important to make the namenode resilient to failure, and Hadoop provides two mechanisms for this.
1=> The first way is to back up the files that make up the persistent state of the filesystem metadata. Hadoop can be configured so that the namenode writes its persistent state to multiple filesystems. These writes are synchronous and atomic. The usual configuration choice is to write to local disk as well as a remote NFS mount.
2=> It is also possible to run a secondary namenode, which despite its name does not act as a namenode. Its main role is to periodically merge the namespace image with the edit log to prevent the edit log from becoming too large. The secondary namenode usually runs on a separate physical machine, since it requires plenty of CPU and as much memory as the namenode to perform the merge. It keeps a copy of the merged namespace image, which can be used in the event of the namenode failing. However, the state of the secondary namenode lags that of the primary, so in the event of total failure of the primary, data loss is almost certain.
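
The merge performed by the secondary namenode can be pictured as replaying the edit log on top of the namespace image. The sketch below uses an invented in-memory format (a dict for the image, tuples for log entries) and invented operation names; the real on-disk formats are different.

```python
def apply_edit(image, edit):
    """Apply one logged operation to a namespace image (a dict of path -> metadata)."""
    op, path, *args = edit
    if op == "create":
        image[path] = {"blocks": []}
    elif op == "add_block":
        image[path]["blocks"].append(args[0])
    elif op == "delete":
        image.pop(path, None)
    else:
        raise ValueError(f"unknown operation: {op}")

def checkpoint(image, edit_log):
    """Merge the edit log into a copy of the image; return (new image, empty log)."""
    merged = {path: {"blocks": list(meta["blocks"])} for path, meta in image.items()}
    for edit in edit_log:
        apply_edit(merged, edit)
    return merged, []

image = {"/a": {"blocks": ["b1"]}}
edit_log = [("create", "/b"), ("add_block", "/b", "b2"), ("delete", "/a")]
new_image, new_log = checkpoint(image, edit_log)
print(new_image, new_log)
```

Any edits logged after the last checkpoint exist only on the primary, which is why the secondary's copy lags behind.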

MAPREDUCE LAYER: For Processing
A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.
There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers. 


JobTracker: 
It handles client requests and assigns tasks to TaskTrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers, taking data locality into account. If a node running a TaskTracker fails, the JobTracker reassigns its tasks to other nodes, preferably ones where replicas of the input data are available. This ensures that a job does not fail just because a node in the cluster fails.

TaskTracker:
Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker. Each TaskTracker periodically sends a heartbeat to the JobTracker, which carries the number of free slots on that node and the progress of the tasks it is running.
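
The heartbeat exchange can be modelled with a toy class. Everything here is an assumption made for illustration: the class and method names, the timeout in arbitrary time units, and the first-come assignment that ignores data locality.

```python
class JobTracker:
    """Toy jobtracker: assigns tasks on heartbeat and requeues tasks of lost trackers."""

    def __init__(self, tasks, timeout=3):
        self.pending = list(tasks)  # tasks not yet assigned
        self.running = {}           # task -> tasktracker name
        self.last_seen = {}         # tasktracker name -> time of last heartbeat
        self.timeout = timeout

    def heartbeat(self, tracker, free_slots, now):
        """Record a heartbeat and hand back as many tasks as there are free slots."""
        self.last_seen[tracker] = now
        assigned = self.pending[:free_slots]
        self.pending = self.pending[free_slots:]
        for task in assigned:
            self.running[task] = tracker
        return assigned

    def expire(self, now):
        """Requeue tasks from tasktrackers whose heartbeat is overdue."""
        lost = {t for t, seen in self.last_seen.items() if now - seen > self.timeout}
        for task, tracker in list(self.running.items()):
            if tracker in lost:
                del self.running[task]
                self.pending.append(task)
        for tracker in lost:
            del self.last_seen[tracker]

jt = JobTracker(["m1", "m2", "m3"])
print(jt.heartbeat("tt1", free_slots=2, now=0))
print(jt.heartbeat("tt2", free_slots=2, now=0))
jt.heartbeat("tt2", free_slots=0, now=4)
jt.expire(now=5)  # tt1 has been silent for too long, so its tasks are requeued
print(jt.pending)
```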





When a client wants to write a file to HDFS, it contacts the NameNode to ask where the data can be stored, and the NameNode returns a list of suitable DataNodes.
The client then interacts with the DataNodes directly to copy the data, and the first datanode takes care of replication using a pipelined write.

To understand reads and writes in HDFS in detail, refer to the next posts.


Tuesday, July 5, 2016

History of Hadoop


Hadoop was derived from Google’s MapReduce and Google File System (GFS) papers. Hadoop was created by Doug Cutting and Michael J. Cafarella (Cutting joined Yahoo! in 2006) and was named after a toy elephant belonging to Doug’s son.

Dec 2004 – Google MapReduce paper published
July 2005 – Nutch uses MapReduce
Feb 2006 – Starts as a Lucene subproject
Apr 2007 – Yahoo! on 1000-node cluster
Jan 2008 – An Apache Top Level Project
Jul 2008 – A 4000 node test cluster
May 2009 – Hadoop sorts Petabyte in 17 hours



Design of HDFS:
HDFS is a filesystem designed for storing very large files with streaming data access patterns, running on clusters of commodity hardware.
1. Very large files:- “Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.
2. Streaming data access:- HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. A dataset is typically generated or copied from source, then various analyses are performed on that dataset over time. Each analysis will involve a large proportion, if not all, of the dataset, so the time to read the whole dataset is more important than the latency in reading the first record.
3. Commodity hardware:- Hadoop doesn’t require expensive, highly reliable hardware to run on. It’s designed to run on clusters of commodity hardware (commonly available hardware that can be obtained from multiple vendors) for which the chance of node failure across the cluster is high, at least for large clusters. HDFS is designed to carry on working without a noticeable interruption to the user in the face of such failure.

HDFS is not a good fit for the cases below:
1. Low-latency data access
Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember, HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.
2. Lots of small files
Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode. As a rule of thumb, each file, directory, and block takes about 150 bytes. So, for example, if you had one million files, each taking one block, you would need at least 300 MB of memory. While storing millions of files is feasible, billions is beyond the capability of current hardware.
3. Multiple writers, arbitrary file modifications
Files in HDFS may be written to by a single writer. Writes are always made at the end of the file. There is no support for multiple writers, or for modifications at arbitrary offsets in the file.
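
The 300 MB figure in the small-files case is simple arithmetic: one million files plus one million blocks gives two million objects at about 150 bytes each. A quick sketch of the calculation (the function name is made up, and directories are ignored):

```python
BYTES_PER_OBJECT = 150  # rule-of-thumb figure from the text

def namenode_memory_bytes(num_files, blocks_per_file=1):
    """Estimate namenode memory: one object per file plus one per block."""
    objects = num_files * (1 + blocks_per_file)
    return objects * BYTES_PER_OBJECT

print(namenode_memory_bytes(1_000_000) / 1e6, "MB")      # one million files
print(namenode_memory_bytes(1_000_000_000) / 1e9, "GB")  # one billion files
```

With the same rule of thumb, a billion single-block files would need about 300 GB of namenode memory, which is why billions of small files are out of reach.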

Introduction to Hadoop

We live in a Data World!
Data floods in from sources of every kind. Consider your Facebook photo uploads: in earlier days images were of lower quality, but higher resolution and better pixel quality now make each image larger. So the amount of data grows day by day. This is what we call Big Data: an amount of data that is beyond the storage and processing capabilities of a single machine.
-> Volume: The data is massive in volume.
-> Variety: The data comes in multiple varieties, semi-structured or unstructured, and may originate from a variety of sources.
-> Velocity: The rate at which data comes into the system is very fast.
Example: Facebook uploads/posts across the world

Data Storage and Analysis: Now, you need to store this data. Big Data can't be stored on a single disk, so a Distributed File System is used, in which the data is stored across multiple systems. Also, copying even 3 GB of data to a single disk can be slow.
Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers with commodity hardware using simple programming models.




1st problem-> If you store the data across a distributed network of systems, there is a chance of data loss when any one of them fails. To keep the data safe, we keep 2 copies of the data on 2 different disks, which is called replication of data.
Then how about processing the data?
Here comes the logic of analysis.
Suppose you give a piece of code to 1 person to develop; he may take 6 months.
If you distribute the same work among 3 people, we can expect it to be done in less time, i.e., in about 2 months.
In the same way, we can read the data from different disks in parallel, process each part, and then integrate the results to provide the final output.
2nd problem-> When you analyze large datasets, the partial results need to be integrated, which again consumes bandwidth.
Here comes HADOOP, which provides a reliable shared storage and analysis system.
The storage is provided by HDFS and analysis/processing is done by MapReduce.

HADOOP -> HDFS (Storage) + MapReduce (Processing Data)


Why can’t we use databases with lots of disks to do large-scale batch analysis? Why is MapReduce needed?
1=> An RDBMS is good for point queries or updates, where the dataset has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data.
2=> MapReduce suits applications where the data is written once, and read many times, whereas a relational database is good for datasets that are continually updated.
3=> Structured data means data that is organized into entities that have a defined format, such as XML documents or database tables that conform to a particular predefined schema. This can be stored using an RDBMS.
4=> What if the data is unstructured, for example sometimes images and sometimes text, maps, etc.? How do we store such semi-structured or unstructured data?
MapReduce works well on unstructured or semistructured data, since it is designed to interpret the data at processing time.
5=> Relational data is often normalized to retain its integrity and remove redundancy.
Normalization poses problems for MapReduce, since it makes reading a record a non-local operation, and one of the central assumptions that MapReduce makes is that it is possible to perform (high-speed) streaming reads and writes.
A web server log is a good example of a set of records that is not normalized (for example, the client hostnames are specified in full each time, even though the same client may appear many times), and this is one reason that logfiles of all kinds are particularly well-suited to analysis with MapReduce.




Features of HDFS:

  • Highly fault-tolerant and is designed to be deployed on low-cost hardware.
  • High throughput access to application data 
  • Suitable for applications with large datasets
  • Streaming access to file system data
  • Low cost.
What is MapReduce?
MapReduce is a linearly scalable programming model. The programmer writes two functions—a map function and a reduce function—each of which defines a mapping from one set of key-value pairs to another.