Anatomy of a File Read:
1. The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem.
2. DistributedFileSystem calls the namenode, using RPC, to determine the locations of the first few blocks in the file. For each block, the namenode returns the addresses of the datanodes that have a copy of that block, sorted by their proximity to the client according to the network topology. If the client is itself a datanode (in the case of a MapReduce task, for instance), it will read from the local datanode if that datanode hosts a copy of the block.
The DistributedFileSystem returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.
3. The client then calls read() on the stream. DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, connects to the first (closest) datanode for the first block in the file.
4. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream.
5. When the end of the block is reached, DFSInputStream closes the connection to the datanode and then finds the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream.
Blocks are read in order, with the DFSInputStream opening new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed.
6. When the client has finished reading, it calls close() on the FSDataInputStream, as in the sketch below.
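The whole sequence maps onto only a few lines of client code. Here is a minimal sketch using the public FileSystem API; the file path is a made-up example, and it assumes fs.defaultFS points at an HDFS cluster so that FileSystem.get() returns a DistributedFileSystem.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsFileRead {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();            // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);                 // DistributedFileSystem when fs.defaultFS is hdfs://
        Path file = new Path("/user/hadoop/input.txt");       // hypothetical example path

        // Steps 1-2: open() triggers the RPC to the namenode for block locations.
        try (FSDataInputStream in = fs.open(file)) {
            // Steps 3-5: read() streams data from the datanodes, block by block.
            IOUtils.copyBytes(in, System.out, 4096, false);
        }                                                     // Step 6: close() on the FSDataInputStream
    }
}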
During reading, if the DFSInputStream encounters an error while communicating with a datanode, then it will try the next closest one for that block. It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks. The DFSInputStream also verifies checksums for the data transferred to it from the datanode.
If a corrupted block is found, it is reported to the namenode before the DFSInputStream attempts to read a replica of the block from another datanode.
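To make the failover behaviour concrete, here is a simplified, self-contained sketch of the retry loop described above. It is not the real DFSInputStream code; the Replica interface and BlockReader class are hypothetical stand-ins used only for illustration.

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for "a copy of a block on a particular datanode".
interface Replica {
    String datanode();
    byte[] read() throws IOException;    // fails on connection error or checksum mismatch
}

class BlockReader {
    // Datanodes that failed earlier; they are skipped for later blocks.
    private final Set<String> deadNodes = new HashSet<>();

    byte[] readBlock(List<Replica> replicasByProximity) throws IOException {
        for (Replica replica : replicasByProximity) {         // closest replica first
            if (deadNodes.contains(replica.datanode())) {
                continue;                                      // don't needlessly retry a known-bad datanode
            }
            try {
                return replica.read();                         // in HDFS, checksums are verified here and a
                                                               // corrupt block is also reported to the namenode
            } catch (IOException e) {
                deadNodes.add(replica.datanode());             // remember the failure, try the next replica
            }
        }
        throw new IOException("No usable replica found for this block");
    }
}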
One important aspect of this design is that the client contacts datanodes directly to retrieve data and is guided by the namenode to the best datanode for each block. This design allows HDFS to scale to a large number of concurrent clients, since the data traffic is spread across all the datanodes in the cluster.
The namenode, meanwhile, merely has to service block location requests (the block locations are held in memory, making such requests very efficient) and does not, for example, serve data, which would quickly become a bottleneck as the number of clients grew.
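The block-location metadata that the namenode serves is visible through the public FileSystem API. A minimal sketch (the path is again a made-up example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/user/hadoop/input.txt");       // hypothetical example path
        FileStatus status = fs.getFileStatus(file);

        // One BlockLocation per block: its offset, length, and the datanodes holding replicas.
        for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.printf("offset=%d length=%d hosts=%s%n",
                    block.getOffset(), block.getLength(), String.join(",", block.getHosts()));
        }
    }
}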
What does it mean for two nodes in a local network to be “close” to each other?
In the context of high-volume data processing, the limiting factor is the rate at which we can transfer data between nodes—bandwidth is a scarce commodity. The idea is to use the bandwidth between two nodes as a measure of distance.
Rather than measuring bandwidth between nodes, which can be difficult to do in practice (it requires a quiet cluster, and the number of pairs of nodes in a cluster grows as the square of the number of nodes), Hadoop takes a simple approach in which the network is represented as a tree and the distance between two nodes is the sum of their distances to their closest common ancestor. Levels in the tree are not predefined, but it is common to have levels that correspond to the data center, the rack, and the node that a process is running on. The idea is that the bandwidth available for each of the following scenarios becomes progressively less:
• Processes on the same node
• Different nodes on the same rack
• Nodes on different racks in the same data center
• Nodes in different data centers
For example, imagine a node n1 on rack r1 in data center d1. This can be represented as /d1/r1/n1. Using this notation, here are the distances for the four scenarios:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
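The arithmetic behind these numbers is just a shared-prefix computation over the topology paths. The toy sketch below reproduces the four examples; it illustrates the idea only and is not Hadoop's NetworkTopology implementation.

public class TopologyDistance {
    // Paths follow the /datacenter/rack/node convention used above, e.g. "/d1/r1/n1".
    static int distance(String a, String b) {
        String[] pa = a.substring(1).split("/");
        String[] pb = b.substring(1).split("/");
        int common = 0;
        while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
            common++;                                    // length of the shared prefix (closest common ancestor)
        }
        // Sum of each node's hops up to the closest common ancestor.
        return (pa.length - common) + (pb.length - common);
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1"));  // 0: processes on the same node
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2"));  // 2: different nodes on the same rack
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3"));  // 4: different racks, same data center
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4"));  // 6: different data centers
    }
}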