Around the Storage Block

Data Locality in Hadoop: Taking a Deep Dive

StorageExpert

 

Delve into the data locality aspect of Hadoop as it applies to two architectures related to HPE’s Elastic platform for Big Data Analytics—conventional and hyperscale deployment architectures.

One of Hadoop’s initial fundamental concepts was to improve performance by moving computation to the data. In other words, ship the code across the network and process the data locally, since the code is usually much smaller than the data it processes. This was especially true with lower bandwidth networks (<= 1Gbps). With the advent of webscale workloads, hyperscale architectures that separate compute and storage to drive agility have become the norm for public cloud environments.

The evolution of modern big data processing frameworks has seen increased adoption of such architectures in the big data space. HPE has published reference architectures to independently scale compute and storage since 2014. HPE’s Elastic platform for Big Data Analytics offers both conventional and hyperscale deployment architectures, referred to as Balanced and Density Optimized (BDO) and Workload and Density Optimized (WDO) systems, respectively. In this blog, I’m delving into the data locality aspect of Hadoop as it applies to these two architectures. You can find the complete list of Hadoop reference architectures here: www.hpe.com/info/bigdata-ra

Most customers express concern about the ability of hyperscale architectures to handle the volume of data now being accessed over the network. In our design, we use high-speed networks that match the storage IO capabilities of our storage nodes, so there are no network bottlenecks. Here, I’m focusing on the data locality aspect. Our findings, based on tests run in our labs and on customer testing, show that data locality plays a smaller role in overall application performance than commonly assumed, and that judging data locality by using Hadoop job reports can be very misleading.

Let’s look first at how data locality is used in a traditional Hadoop design where compute and storage resources are collocated on the same physical server.

It’s important to note that data locality is a best-effort attempt, not a guarantee. When you run a MapReduce task, for example, Hadoop will try to allocate that task on a node that has the data locally, but only if that node has enough compute resources available at that time. If all nodes with local data are busy, Hadoop will schedule the task on a node that doesn’t have local data instead. Maximizing data locality requires careful settings and data balancing across the cluster; otherwise some nodes receive more data to process, slowing down the whole cluster. In addition, the Hadoop balancer process attempts to balance the data across all nodes/disks, but without knowledge of individual folders/files. For example, in a 4-datanode cluster with 100GB of data loaded on each datanode, you could still see severe locality imbalance when running a job, because the job operates at the folder/file level, not at the datanode level.
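If you want to see how the blocks of a particular file are laid out across datanodes, the public FileSystem API exposes that directly. Below is a minimal sketch, assuming the HDFS client configuration is on the classpath; the class name and the example path are just placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileBlockLayout {
    public static void main(String[] args) throws Exception {
        // File to inspect (hypothetical example path)
        Path file = new Path(args.length > 0 ? args[0] : "/benchmarks/input/part-m-00000");

        Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);

        FileStatus status = fs.getFileStatus(file);
        // One BlockLocation per HDFS block, listing the datanodes that hold its replicas
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            System.out.printf("offset=%d length=%d hosts=%s%n",
                    block.getOffset(), block.getLength(), String.join(",", block.getHosts()));
        }
        fs.close();
    }
}

Comparing the host lists across the files a job actually reads gives a much better picture of the per-file imbalance described above than the cluster-wide usage numbers the balancer works from.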

In comparison, in a design like the EPA WDO system with separate compute and storage servers, all HDFS IO is 100% remote, so there’s nothing to manage in terms of balancing. All compute nodes are always balanced because no HDFS data is local; only temporary/shuffle/cache data is local to the node.

So far the concepts are relatively easy to grasp, and Hadoop offers counters to measure the data locality of a job. At the end of a MapReduce job, for example, you’ll notice counters like this:

  • Other local map tasks=48
  • Data-local map tasks=6
  • Rack-local map tasks=124

A data-local map task has its data on the same node, a rack-local map task has its data on another node in the same rack, and an “other local” map task has its data in a remote rack.
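If you prefer to read these counters from the job driver rather than from the console output, the standard JobCounter enum exposes them. Here’s a minimal sketch, assuming it is called right after job.waitForCompletion(true); the class and method names are only illustrative.

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

public class LocalityReport {
    // Print the locality counters of a finished MapReduce job
    public static void printLocality(Job job) throws Exception {
        Counters counters = job.getCounters();
        long dataLocal  = counters.findCounter(JobCounter.DATA_LOCAL_MAPS).getValue();
        long rackLocal  = counters.findCounter(JobCounter.RACK_LOCAL_MAPS).getValue();
        long otherLocal = counters.findCounter(JobCounter.OTHER_LOCAL_MAPS).getValue();
        long totalMaps  = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
        System.out.printf("data-local=%d rack-local=%d other-local=%d launched maps=%d%n",
                dataLocal, rackLocal, otherLocal, totalMaps);
        // Note: as the cases below show, a high data-local count still says
        // nothing about how many bytes were actually read locally.
    }
}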

Let’s look a little deeper into how locality is calculated for a map task. When reading data from HDFS, you actually read records, with multiple records being stored in an HDFS block and multiple HDFS blocks making up an HDFS file. We noticed a few distinct cases that have an impact on how much data is read locally when a map task is reported as “data-local”.

Case 1: If each HDFS file contains a single HDFS block, every map task will read a single HDFS block, and every map task reported as “data-local” reads all of its data locally. This is the only case where you can really experience 100% local reads, provided cluster resources are properly configured to allow that. This is the case described in most of the Hadoop documentation related to data locality.

Case 2: If each map task reads a single HDFS block, but HDFS files contain multiple blocks, you can end up with a very small number of remote reads for tasks reported as “data-local”. This small read from the next HDFS block (64KB by default) is needed to make sure you’ve reached the end of the last record for the map task. A record can start in one HDFS block and end in a different HDFS block, and since the next HDFS block is not guaranteed to be local, you can experience a remote read for that next block. In practice we observed this to be a very small percentage of total bytes read, so in general 100% data-local map tasks translate to roughly 99.99% local reads. Here’s a link to the source code for that extra line read:

https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L187

Case 3: If each map task reads multiple HDFS blocks and HDFS files contain multiple blocks, you can end up with a significant number of remote reads even when map tasks are reported as data-local. This happens because data locality is based only on the first HDFS block in the map task’s list of HDFS blocks to process, and the number of remote HDFS block reads grows with the number of HDFS blocks processed per map task and/or the number of datanodes. In our lab measurements we observed remote reads accounting for 20% to 65% of total reads when using 4 or more datanodes, with the percentage increasing as datanodes are added to the cluster. These were measured on clusters intentionally designed to have resources available to process data locally (at most 50% of the cluster compute resources were used in all tests).
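You can see case 3 for yourself by computing input splits the same way a MapReduce job would and comparing each split’s reported hosts with the actual block layout (for example, from the earlier FileBlockLayout sketch). The following is a minimal sketch under those assumptions; the class name, input path, and split size are placeholders.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitLocations {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Force ~1GB splits so each split spans several 128MB HDFS blocks (case 3)
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1024L * 1024 * 1024);

        Job job = Job.getInstance(conf);
        FileInputFormat.addInputPath(job, new Path(args.length > 0 ? args[0] : "/benchmarks/input"));

        // Same split computation the job itself would run
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        for (InputSplit split : splits) {
            FileSplit fileSplit = (FileSplit) split;
            // getLocations() is what locality scheduling is based on; it reflects
            // only part of the split even when the split covers many blocks
            System.out.printf("%s:%d+%d hosts=%s%n",
                    fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength(),
                    String.join(",", split.getLocations()));
        }
    }
}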

Case 3 is usually encountered when trying to improve application performance by reducing the impact of task start/stop time on the overall task time using the mapreduce.input.fileinputformat.split.minsize setting. For example, when reading from a disk at 100MB/sec, it takes about 1.3 seconds to read a 128MB HDFS block. In our lab measurements, Java start/stop time accounts for ~3 seconds, so with the default values you end up spending ~70% of the task time just on start/stop overhead. You can increase the HDFS block size to counter that effect, and it’s our recommendation to do so; however, increasing it over 2GB could expose issues with applications that inadvertently use an integer data type to store the HDFS block size.

Even at a 2GB HDFS block size, it takes 20 seconds to read one block, and the general recommendation is to strive for tasks that run for a minute or longer. To increase the amount of data processed per task, you can use the mapreduce.input.fileinputformat.split.minsize setting, which specifies the lower limit on the amount of data per split. The same setting applies to Spark jobs as well.
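As an illustration, here is a minimal driver fragment showing how that lower bound can be raised programmatically; the class name and the 1GB value are placeholders, and the same property can just as well be passed on the command line with -D mapreduce.input.fileinputformat.split.minsize=<bytes>.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class BiggerSplits {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "bigger-splits");
        // Ask for at least ~1GB of input per map task so each task runs well
        // past the ~3 second start/stop overhead measured above
        FileInputFormat.setMinInputSplitSize(job, 1024L * 1024 * 1024);
        return job;
    }
}

For Spark, the same Hadoop property can be set on the Hadoop configuration the job uses (for example through spark.hadoop.mapreduce.input.fileinputformat.split.minsize), keeping in mind the trade-off against locality described in case 3.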

If you’re interested in seeing these details in action yourself, you can run a MapReduce job with TRACE-level logging by setting mapreduce.map.log.level=TRACE and then collecting the logs with the 'yarn logs -applicationId' command. Be prepared for big log files, so I recommend running small jobs for this. Newer Hadoop versions have an '-out' option for this command, which is very useful since it creates a separate file for each container; otherwise you have to use a little scripting to do that yourself. This command does the job: csplit -n 4 <log filename> '/=================/-1' '{*}'

A few notes on what to look for in these logs:

Example local read

TRACE [main] org.apache.hadoop.hdfs.BlockReaderLocal: read(arr.length=65536, off=0, len=65536, filename=/benchmarks/input/part-m-00000, block= BP-108411597-172.24.1.17-1494238182461:blk_1073741858_1034, canSkipChecksum=false): starting

TRACE [main] org.apache.hadoop.hdfs.BlockReaderLocal: read(arr.length=65536, off=0, len=65536, filename=/benchmarks/input/part-m-00000, block= BP-108411597-172.24.1.17-1494238182461:blk_1073741858_1034, canSkipChecksum=false): returning 65536

Example remote read

This was for the extra line read from the next HDFS block. Notice the size of the read.

TRACE [main] org.apache.hadoop.hdfs.RemoteBlockReader2: Starting read #d104792b-6701-4046-913a-1723d80dd736 file /benchmarks/input/part-m-00002 from datanode node2

TRACE [main] org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver: readNextPacket: dataPlusChecksumLen = 66048 headerLen = 25

TRACE [main] org.apache.hadoop.hdfs.RemoteBlockReader2: DFSClient readNextPacket got header PacketHeader with packetLen=66048 header data: offsetInBlock: 0

seqno: 0

lastPacketInBlock: false

dataLen: 65536

TRACE [main] org.apache.hadoop.hdfs.RemoteBlockReader2: Finishing read #d104792b-6701-4046-913a-1723d80dd73

Split info

Lines like this show the detailed HDFS block information for the file split you’re processing: org.apache.hadoop.hdfs.DFSClient: newInfo = LocatedBlocks

Unusable short circuit

You might see  messages similar to this:

PerformanceAdvisory: BlockReaderFactory(fileName=/benchmarks/input/part-m-00000, block=BP-108411597-172.24.1.17-1494238182461:blk_1073742761_1937): PathInfo{path=, state=UNUSABLE} is not usable for short circuit; giving up on BlockReaderLocal

These are expected, as a short-circuit read is attempted on all block reads, including blocks that are remote.

getBlockLocations

For some splits, it will look like getBlockLocations is called with the wrong parameters, as below. When processing the locality for a split, the first call to getBlockLocations always asks for the first 10 HDFS blocks in the file; if the split offset+length doesn’t fall within those 10 HDFS blocks, a second call to getBlockLocations is made with the exact offset and length needed.

INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: hdfs://headnode:8020/benchmarks/input/part-m-00002:28319940608+134217728

TRACE [main] org.apache.hadoop.ipc.ProtobufRpcEngine: 1: Call -> headnode/172.24.1.17:8020: getBlockLocations {src: "/benchmarks/input/part-m-00002" offset: 0 length: 1342177280}

To see exactly how much data is read locally versus remotely, we recommend setting up monitoring for reads. Here’s an example of a monitoring chart query for Cloudera Manager:

select reads_from_local_client_rate, reads_from_remote_client_rate, blocks_get_local_path_info_rate, blocks_read_rate

  • reads_from_local_client_rate - all local reads without short-circuit
  • reads_from_remote_client_rate - all remote reads, with no separation between same-rack and different-rack remote reads
  • blocks_get_local_path_info_rate - all local short-circuit reads
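These correspond to the datanode’s DataNodeActivity counters, so they can also be pulled straight from the datanode JMX servlet if you don’t run Cloudera Manager. Below is a minimal sketch, assuming the datanode web UI is reachable (port 50075 on Hadoop 2, 9864 on Hadoop 3) and that your distribution exposes the same counter names; the host name is a placeholder.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class DataNodeReadStats {
    public static void main(String[] args) throws Exception {
        // Datanode JMX endpoint (placeholder host; adjust the port to your Hadoop version)
        String jmxUrl = args.length > 0 ? args[0] : "http://node1:50075/jmx";
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new URL(jmxUrl).openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Print only the read counters behind the chart above
                if (line.contains("ReadsFromLocalClient")
                        || line.contains("ReadsFromRemoteClient")
                        || line.contains("BlocksGetLocalPathInfo")
                        || line.contains("BlocksRead")) {
                    System.out.println(line.trim());
                }
            }
        }
    }
}

These are cumulative counters, so sampling them periodically and taking deltas gives the same rates the chart plots.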

In our findings, data locality plays a small role in overall application performance, and judging data locality by using Hadoop job reports can be very misleading. With a properly designed network, you can read remote data just as fast as from local disks. There is a latency increase, of course, but since rotational disk latency is an order of magnitude larger than high-speed network latency, its impact on overall performance is negligible. Reading from local disks does have the advantage of reading from the filesystem cache at memory speed; however, to achieve that speed the data has to already be in the filesystem cache and the dataset has to be small enough to fit in it. In our lab observations, looking at a multitude of workloads, we didn’t notice remote reads having a noticeable negative impact on overall application performance on a properly configured system.

While all the examples described here used MapReduce, we expect similar behavior from all Hadoop services that rely on HDFS locality and use the same split location logic.

Reading library

Here is a list of reference articles that also share viewpoints on the relevance of data locality in Hadoop:

https://robinsystems.com/blog/data-locality-overrated/

https://www.bluedata.com/blog/2015/05/data-locality-is-irrelevant-for-hadoop/

https://hvivani.com.ar/2014/11/23/mapreduce-compression-and-input-splits/

https://www.quora.com/What-does-the-term-data-locality-mean-in-Hadoop

https://www.cs.cornell.edu/projects/ladis2009/papers/porter-ladis2009.pdf

https://people.eecs.berkeley.edu/~alig/papers/disk-locality-irrelevant.pdf

Around the Storage Block blog by Daniel Pol, Data and Analytics Architect, HPE.

 

 


