
Increasing Hadoop Resiliency & Operational Efficiency with EMC Isilon

Failures are common in today’s data center environments and can significantly impact the performance of important jobs running on top of large-scale computing frameworks like Hadoop.

This discussion post highlights some failure scenarios that have both an operational and a performance impact on legacy DAS Hadoop deployments. Even though Hadoop was designed to withstand disruptions, failures can cause administrative headaches, degrade performance, and negatively affect data-driven operations. You may be surprised to learn how even a single failure can have a detrimental effect on Hadoop job running times.

For example, consider the log messages below:

WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DataNode is shutting down: DataNode failed volumes:/data2/dfs/current;
2016-04-22 13:01:00,112 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:svc-platfora (auth:SIMPLE) cause:java.io.IOException: Block blk_2910942244825575033_338680521 is not valid.
2016-04-22 13:01:00,112 INFO org.apache.hadoop.ipc.Server: IPC Server handler 50 on 50020, call org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol.getBlockLocalPathInfo from
error: java.io.IOException: Block blk_2910942244825575033_338680521 is not valid. java.io.IOException: Block blk_2910942244825575033_338680521 is not valid.

By default, Hadoop sets the hdfs-site.xml parameter "dfs.datanode.failed.volumes.tolerated" to 0, which forces the datanode daemon to shut down if it fails to access any one of its defined data volumes. The data volumes are defined by the parameter "dfs.datanode.data.dir", which in this example is set to use data volumes data1, data2, and data3.

  <name>dfs.datanode.failed.volumes.tolerated</name> <value>0</value>
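To make the interaction between these two parameters concrete, here is a minimal Python sketch. It is a simplified model of the behavior described above, not Hadoop's actual code: the XML fragment is a hypothetical hdfs-site.xml that mirrors the values in this post, and the helper names read_properties and datanode_stays_up are made up for illustration.

```python
import xml.etree.ElementTree as ET

# Hypothetical hdfs-site.xml fragment mirroring the values discussed in this post.
HDFS_SITE = """
<configuration>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/data1/dfs,/data2/dfs,/data3/dfs</value>
  </property>
  <property>
    <name>dfs.datanode.failed.volumes.tolerated</name>
    <value>0</value>
  </property>
</configuration>
"""


def read_properties(xml_text):
    """Return the name/value pairs of a Hadoop-style configuration as a dict."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}


def datanode_stays_up(props, failed_volumes):
    """Simplified model (an assumption): the datanode keeps running only while
    the number of failed volumes does not exceed the tolerated count."""
    tolerated = int(props.get("dfs.datanode.failed.volumes.tolerated", "0"))
    return len(failed_volumes) <= tolerated


props = read_properties(HDFS_SITE)
volumes = props["dfs.datanode.data.dir"].split(",")
print(volumes)
print(datanode_stays_up(props, ["/data2/dfs"]))
```

With the tolerated count at 0, losing /data2/dfs alone is enough for the model to report that the datanode goes down, which matches the log above.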


From the error we can see that the /data2 data volume became inaccessible and that the datanode will shut down as a result. Typically the data volume is associated with a single disk configured as RAID 0 (for I/O performance), so whatever data existed on that volume is lost. The default replication factor for HDFS is 3, defined by the parameter "dfs.replication", so chances are there are 2 safe copies elsewhere in the cluster that applications can read from. There are side effects, however...

Hadoop’s speculative execution (SE) algorithm can be negatively influenced by the presence of fast-advancing tasks. DataNode failures are one cause of such fast tasks. These fast tasks can indirectly affect the moment when other tasks are speculated. One fast task can severely delay or even preclude subsequent SEs. The reason lies in the statistical nature of Hadoop’s SE algorithm. The algorithm deems a task slow by comparing its individual progress metric against aggregate progress metrics of similar tasks. Fast-advancing tasks can skew these aggregate statistical measures. Also, Hadoop tasks do not share failure information. For scalability and simplicity, each compute task performs failure detection and recovery on its own. The side effect of this is that multiple tasks could be left wasting time discovering a datanode failure that has already been identified by another task.

With EMC Isilon, failure detection is localized and immediate within the OneFS file system. Offloading the HDFS layer and all namenode and datanode functionality to Isilon results in an immediate increase in resiliency to datanode failure scenarios within a Hadoop cluster.
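The skew caused by a single fast task can be illustrated with a small Python sketch. The rule used here (flag a task when its progress rate is more than one standard deviation below the mean of its peers) is an assumption chosen for illustration and is not claimed to be Hadoop's exact implementation. The function name and the progress rates are made up.

```python
from statistics import mean, pstdev


def is_speculation_candidate(task_rate, all_rates, threshold=1.0):
    """Simplified rule (an assumption, not Hadoop's exact code): a task is a
    candidate when its progress rate is more than `threshold` standard
    deviations below the mean rate of all similar tasks."""
    return (mean(all_rates) - task_rate) > threshold * pstdev(all_rates)


# Four healthy tasks and one straggler progressing at half speed.
normal = [1.0, 1.0, 1.0, 1.0, 0.5]
# The same tasks plus one fast-advancing task, e.g. one that skipped work
# because of a datanode failure.
with_fast = normal + [10.0]

print(is_speculation_candidate(0.5, normal))     # straggler stands out
print(is_speculation_candidate(0.5, with_fast))  # fast task hides the straggler
```

In the first case the straggler is flagged. In the second, the fast task inflates both the mean and the spread so much that the same straggler no longer qualifies, which is the "delay or even preclude" effect described above.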

Moreover, a speculated task may have to re-discover the same failure that hindered the progress of the original task in the first place. To ensure that a speculated task helps improve job running time, failure information needs to be effectively shared between tasks during the run time of a job. This is simply not the case with Hadoop. Isilon is much more effective at communicating failures through the shared InfiniBand network, and the integrated SmartConnect feature in Isilon OneFS automates failure-handling mechanisms internally without disrupting the external Hadoop workflows. By load balancing namenode and datanode connections internally via Isilon's scale-out architecture, OneFS SmartConnect removes the network congestion typically seen in DAS Hadoop deployments during datanode failures, where excessive end-host load can lead to TCP connection failures.

In Hadoop, the MapReduce model has a master node that receives user jobs, breaks them into a number of tasks, and delegates (maps) the tasks to other slave computing nodes. The master also assigns other nodes to merge the pieces of completed computation from the mappers back into integrated pieces (and eventually into the completed job). These slave nodes are called reducers. MapReduce is highly flexible in that it can work on large tasks that require large numbers of mappers and reducers just as easily as it can work on small tasks that require only very limited computing resources. MapReduce is also scalable in that it can process a small number of user jobs as well as a huge number of user jobs by dispatching the actual computation onto thousands, or even millions, of standard compute nodes. However, MapReduce is not fault-tolerant for jobs. Its fault tolerance stops at the task level: if a mapper or reducer fails, the master can still complete the job by re-delegating the task of the failed node to another node.
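Task-level fault tolerance can be pictured with a short Python sketch. This is only a toy model of a master re-delegating a failed task to another node, not MapReduce's scheduler. The names run_job, run_task, and flaky_run, along with the node and task labels, are hypothetical.

```python
def run_job(tasks, nodes, run_task, max_attempts=3):
    """Run every task, re-delegating a failed task to the next node.

    run_task(task, node) is a hypothetical callable that returns a result
    or raises RuntimeError when the node fails.
    """
    results = {}
    for i, task in enumerate(tasks):
        for attempt in range(max_attempts):
            node = nodes[(i + attempt) % len(nodes)]
            try:
                results[task] = run_task(task, node)
                break
            except RuntimeError:
                continue  # node failed; try the task on another node
        else:
            # No attempt succeeded, so the whole job fails.
            raise RuntimeError(f"task {task!r} failed after {max_attempts} attempts")
    return results


def flaky_run(task, node):
    """Toy task runner in which node2 is permanently down."""
    if node == "node2":
        raise RuntimeError("node2 is down")
    return f"{task} done on {node}"


results = run_job(["map-0", "map-1", "map-2"], ["node1", "node2", "node3"], flaky_run)
print(results)
```

Note that the retry state lives entirely in the master's loop. If the master itself disappears, nothing in this model can recover the job.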

From an operational perspective, dealing with datanode failures incurs administrative work. The fix for the above error log requires replacing any failed disks associated with the /data2 volume and recreating the data directory structure as defined by dfs.datanode.data.dir.

  1. replace failed hardware
  2. restore data volume using OS utilities to recreate the file system and mount.
  3. mkdir /data2/dfs
  4. chown hdfs:hadoop /data2/dfs
  5. service hadoop-hdfs-datanode start

Workaround 1: You can increase the dfs.datanode.failed.volumes.tolerated parameter to 1 and start the datanode service.   This will prevent the datanode from shutting down when a single data volume fails.

NOTE:  It is not recommended to increase this value if you have a datanode with 4 or fewer volumes or if the hardware is not being monitored for disk drive failures.  You may experience data loss if individual volume failures are spread across multiple datanodes and no alerts are in place to detect failed data volumes.

  1. Parameter change:
      <property>
        <name>dfs.datanode.failed.volumes.tolerated</name> <value>1</value>
      </property>
  2. service hadoop-hdfs-datanode start

Workaround 2:

  1. Remove the faulted data volume from the datanode configuration
    Change From:
      <name>dfs.datanode.data.dir</name> <value>/data1/dfs,/data2/dfs,/data3/dfs</value>

    Change To:
      <name>dfs.datanode.data.dir</name> <value>/data1/dfs,/data3/dfs</value>
  2. service hadoop-hdfs-datanode start
  3. Verify dfs is healthy with "sudo -u hdfs hdfs dfsadmin -report"
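If you script Workaround 2, the value change in step 1 can be computed rather than edited by hand. The following Python sketch shows only the string manipulation. The helper name drop_volume is hypothetical, and writing the result back to hdfs-site.xml is left out.

```python
def drop_volume(data_dir_value, faulted):
    """Return the comma-separated dfs.datanode.data.dir value without `faulted`."""
    kept = [d.strip() for d in data_dir_value.split(",") if d.strip() != faulted]
    if not kept:
        # A datanode with no data volumes cannot store blocks.
        raise ValueError("refusing to remove the last data volume")
    return ",".join(kept)


new_value = drop_volume("/data1/dfs,/data2/dfs,/data3/dfs", "/data2/dfs")
print(new_value)  # /data1/dfs,/data3/dfs
```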

With EMC Isilon, namenode and datanode functionality is completely centralized, and the scale-out architecture and built-in efficiency of OneFS greatly alleviate many of the namenode and datanode problems seen with DAS Hadoop deployments during failures.  The slides below highlight just some of the benefits EMC Isilon provides over DAS for some Hadoop failure scenarios.






As a side note, there are failure scenarios outside the HDFS layer within the Hadoop cluster that Isilon doesn't address as those processes would exist entirely external to Isilon.  For example, let's inspect the consequence of failures in the Hadoop MapReduce architecture.

As mentioned earlier, if a task fails at a slave node, MapReduce can take care of this failure by initiating another task, possibly at another node. However, if the master physical node crashes (less likely), or the JobTracker/ApplicationMaster fails (more likely), the status of a running job at the master node is gone, even if tasks are still running on slave nodes. As a result, the user loses track of the job. Through YARN, the ResourceManager starts another instance of the user job from scratch. If a user job requires extensive computation, e.g. a climate simulation, rerunning the job takes a significant amount of resources and time. The problem becomes even worse if the ResourceManager fails, because no more user requests can be accepted and the management of the cluster resources gets out of control.

So, contrary to what many people think, there are single points of failure in Hadoop; the Hadoop MapReduce engine is one example.

I hope this was a good read for you.


Boni Bruno, Principal Solutions Architect

EMC | Emerging Technologies Division
