Significant Parameters in YARN¶
Qubole offers a Spark-on-YARN variant, so the YARN parameters apply to both Hadoop 2 and Spark.
The parameters that can be useful in Hadoop 2 (Hive) and Spark configuration are described in the following sub-topics.
All platforms:
- Configuring Job History Compression describes compressing job history to store it in a Cloud location.
- Configuring Job Runtime describes how to configure how long a YARN application can run.
- Enabling Container Packing in Hadoop 2 and Spark describes how to more effectively downscale in Hadoop 2 (Hive) and Spark clusters.
- Configuring Graceful Restart for Resource Manager shows how to configure graceful restart for the Resource Manager so as to avoid losing information from running jobs.
- Understanding YARN Virtual Cores describes the YARN virtual cores.
- Configuring Direct File Output Committer (AWS, Azure) describes configuring a DirectFileOutputCommitter (DFOC) for a MapReduce task in a Hadoop 2 cluster. For DFOC on a Spark cluster, see Improving the Speed of Data Writes Cloud Directories (AWS and Azure).
- Improving Performance of Data Writes shows how to improve the performance of data writes in Hadoop 2 and Spark.
See also:
- An Overview of Heterogeneous Nodes in Clusters explains how to configure heterogeneous nodes in Hadoop 2 (Hive) and Spark clusters.
- Locating Logs.
Currently AWS-only:
- Configuring Timeout to Schedule AMs on Spot Nodes (AWS) describes configuring a timeout for a Resource Manager (RM) to schedule Application Masters (AMs) on AWS Spot nodes.
- Mapping an AWS S3 Bucket to an AWS Region describes mapping AWS S3 buckets to AWS regions on the S3a filesystem.
- Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS) describes how to handle eventual consistency and increase the speed of data writes in FileOutput Committer jobs.
- Configuring Multipart File Output Committer describes the multipart File Output Committer configuration supported only for Hadoop jobs.
See also:
- Enabling SSE-KMS while using Hadoop DistCp for information on the parameters.
- Configuring EBS Upscaling in AWS Hadoop and Spark Clusters describes how you can use EBS upscaling and save cluster running costs in Hadoop 2 (Hive) and Spark clusters. node_configuration and ebs_upscaling_config provide API parameters related to dynamically increasing the EBS volume storage capacity in Hadoop 2 (Hive) and Spark clusters.
Note
See Composing a Hadoop Job for information on composing a Hadoop job.
Configuring Job History Compression¶
mapreduce.jobhistory.completed.codec specifies the codec to use to compress the job history files while storing them
in a Cloud location. The default value is com.hadoop.compression.lzo.LzopCodec.
Configuring Job Runtime¶
Use yarn.resourcemanager.app.timeout.minutes to configure how many minutes a YARN application can run.
This parameter can prevent a runaway application from keeping the cluster alive unnecessarily.
This is a cluster-level setting; set it in the Override Hadoop Configuration Variables field under the Advanced Configuration tab of the Clusters page in the QDS UI. See Advanced configuration: Modifying Hadoop Cluster Settings for more information.
The Resource Manager kills a YARN application if it runs longer than the configured timeout.
Setting this parameter to -1 means that the application never times out.
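The timeout rule described above can be sketched as a small check. This is only an illustrative model of the documented behavior, not Resource Manager code; the function name `exceeds_timeout` and the sample values are hypothetical.

```python
def exceeds_timeout(elapsed_minutes: float, timeout_minutes: int) -> bool:
    """Return True if an application would be killed under the documented rule.

    Models yarn.resourcemanager.app.timeout.minutes as described above:
    -1 means the application never times out; otherwise an application
    that runs longer than the configured number of minutes is killed.
    """
    if timeout_minutes == -1:
        return False
    return elapsed_minutes > timeout_minutes


# Hypothetical values for illustration only.
print(exceeds_timeout(elapsed_minutes=130, timeout_minutes=120))  # True
print(exceeds_timeout(elapsed_minutes=130, timeout_minutes=-1))   # False
```

With a hypothetical limit of two hours, the corresponding cluster override would be yarn.resourcemanager.app.timeout.minutes=120.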
Enabling Container Packing in Hadoop 2 and Spark¶
Qubole allows you to pack containers in Hadoop 2 (Hive) and Spark. You must enable this feature; it is disabled by default. When enabled, container packing causes the scheduler to pack containers on a subset of nodes instead of distributing them across all the nodes of the cluster. This increases the probability of some nodes remaining unused; these nodes become eligible for downscaling, reducing your cost.
How Container Packing Works¶
Packing works by separating nodes into three sets:
- Nodes with no containers (the Low set)
- Nodes with memory utilization greater than the threshold (the High set)
- All other nodes (the Medium set)
When container packing is enabled, YARN schedules each container request in this order: nodes in the Medium set first, nodes in the Low set next, and nodes in the High set last.
Configuring Container Packing¶
Configure container packing as a Hadoop cluster override in the Override Hadoop Configuration Variables field on the Edit Cluster page. See Managing Clusters for more information. The configuration options are:
- To enable container packing, set yarn.scheduler.fair.continuous-scheduling-packed=true.
- In clusters smaller than the configured minimum size, containers are distributed across all nodes. This minimum number of nodes is governed by the following parameter: yarn.scheduler.fair.continuous-scheduling-packed.min.nodes=<value>. Its default value is 5.
- A node’s memory-utilization threshold percentage, above which Qubole schedules containers on another node, is governed by the following parameter: yarn.scheduler.fair.continuous-scheduling-packed.high.memory.threshold=<value>. Its default value is 60. This parameter also denotes the threshold above which a node moves to the High set from the Medium set.
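The set classification and scheduling order described in this section can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not the scheduler's implementation: the `Node` type, the function names, and the sample cluster are hypothetical, and for clusters below the minimum size the sketch simply keeps the input order as a stand-in for normal distribution across all nodes.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    containers: int         # number of containers currently on the node
    memory_used_pct: float  # memory utilization, 0-100


def classify(node: Node, high_threshold_pct: float = 60.0) -> str:
    """Place a node in the Low, High, or Medium set as described above."""
    if node.containers == 0:
        return "low"
    if node.memory_used_pct > high_threshold_pct:
        return "high"
    return "medium"


def packing_order(nodes, high_threshold_pct=60.0, min_nodes=5):
    """Return nodes in the order a container request would consider them."""
    if len(nodes) < min_nodes:
        # Packing does not apply to clusters below the minimum size.
        return list(nodes)
    rank = {"medium": 0, "low": 1, "high": 2}
    # sorted() is stable, so nodes within one set keep their relative order.
    return sorted(nodes, key=lambda n: rank[classify(n, high_threshold_pct)])


# Hypothetical five-node cluster.
cluster = [
    Node("n1", containers=0, memory_used_pct=0),
    Node("n2", containers=6, memory_used_pct=85),
    Node("n3", containers=2, memory_used_pct=30),
    Node("n4", containers=0, memory_used_pct=0),
    Node("n5", containers=3, memory_used_pct=55),
]
print([n.name for n in packing_order(cluster)])  # ['n3', 'n5', 'n1', 'n4', 'n2']
```

In this example the empty nodes n1 and n4 are tried only after the partially used nodes, so they are more likely to stay idle and become eligible for downscaling.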
Configuring Graceful Restart for Resource Manager¶
The Resource Manager on Hadoop 2 (Hive) clusters can restart gracefully in case of errors, without losing information from running Hadoop 2 jobs. To enable this feature, set the following as a cluster Hadoop override:
yarn.resourcemanager.recovery.enabled=true
Understanding YARN Virtual Cores¶
As of Hadoop 2.4, YARN introduced the concept of vcores (virtual cores). A vcore is a share of host CPU that the YARN Node Manager allocates to available resources.
yarn.scheduler.maximum-allocation-vcores is the maximum allocation for each container request at the Resource Manager,
in terms of virtual CPU cores. Requests higher than this value do not take effect; they are capped to this value.
The default value for yarn.scheduler.maximum-allocation-vcores in Qubole is set to twice the number of CPUs. This
oversubscription assumes that CPUs are not always running a thread, and hence, assigning more cores enables maximum CPU
utilization.
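As a worked example of the default and the capping rule above, here is a minimal sketch. The function names and the 8-CPU node are hypothetical; only the "twice the number of CPUs" default and the capping behavior come from the text.

```python
def default_max_allocation_vcores(cpu_count: int) -> int:
    """Documented Qubole default: twice the number of CPUs."""
    return 2 * cpu_count


def effective_vcores(requested: int, max_allocation_vcores: int) -> int:
    """Cap a container's vcore request at the scheduler maximum."""
    return min(requested, max_allocation_vcores)


cpus = 8  # hypothetical node with 8 CPUs
limit = default_max_allocation_vcores(cpus)
print(limit)                        # 16
print(effective_vcores(20, limit))  # 16 (request capped)
print(effective_vcores(4, limit))   # 4  (request unchanged)
```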
Configuring Direct File Output Committer (AWS, Azure)¶
In general, the final output of a MapReduce job is written to a location in Cloud storage or HDFS, but is first written into a temporary location. The output data is moved from the temporary location to the final location in the task’s commit phase.
When DirectFileOutputCommitter (DFOC) is enabled, the output data is written directly to the final location. In this case, a commit phase is not required. DFOC is a Qubole-specific parameter that is also supported by other big-data vendors. Qubole supports DFOC on Amazon S3n and S3a, and Azure Blob and Data Lake storage.
Note
For DFOC on a Spark cluster, see Improving the Speed of Data Writes Cloud Directories (AWS and Azure).
The pros and cons of setting DFOC are:
Pros:
- Avoids AWS Eventual Consistency issues (Eventual Consistency no longer applies in the AWS us-east-1 region).
- Improves performance when data is written to a Cloud location. (DFOC does not have much impact on performance when data is written into an HDFS location, because in HDFS the movement of files from one directory to another directory is very fast.)
Cons:
- DFOC does not perform well in case of failure: in these cases, stale data may be left in the final location and workflows are generally designed to delete the final location. Hence Qubole does not enable DFOC by default. If DFOC is disabled, the abort phase of the task deletes the data in the temporary directory and a retry takes care of data deletion; no stale data is left in the final location.
Enabling DFOC¶
DFOC can be set in the MapReduce APIs mapred and mapreduce as follows:
DFOC in Mapred API:
mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter
DFOC in Mapreduce API:
mapreduce.use.directfileoutputcommitter=true
To set these parameters for a cluster, navigate to the Clusters section of the QDS UI, choose the cluster, and enter both strings in the Override Hadoop Configuration Variables field under the Advanced Configuration tab. You can also set them at the job level.
Improving Performance of Data Writes¶
To improve the speed of data writes, set the following Qubole options to true:
- mapreduce.use.parallelmergepaths for Hadoop 2 jobs
- spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs with Parquet data
Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS)¶
YARN does not honor DFOC when appending Parquet files and thus it is forced to use FileOutputCommitter. To handle
Eventual Consistency errors during FileOutputCommitter data writes to S3, and increase the speed of data writes,
Qubole provides the configurable option mapreduce.use.parallelmergepaths for Hadoop 2 jobs. Set the option
to true and FileOutputCommitter version to 1.
The equivalent parameter to set in Spark jobs with Parquet data is spark.hadoop.mapreduce.use.parallelmergepaths. For
more information, see Handling Eventual Consistency Failures in Spark FileOutputCommitter Jobs (AWS).
Configuring Multipart File Output Committer¶
QDS now supports Multipart File Output Committer (MFOC). For more information, refer to this link.
Currently in QDS, it is available only for Hadoop jobs when the s3a file system is enabled. It is an open-beta feature.
To enable this committer, set the following properties as Hadoop overrides on the cluster configuration UI page.
mapreduce.outputcommitter.factory.class=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
fs.s3a.committer.name=<directory | partitioned>
fs.s3a.committer.staging.conflict-mode=<fail | append | replace>
mapreduce.fileoutputcommitter.algorithm.version=1
In the above configuration, fs.s3a.committer.name accepts these two values:
- directory: Use it when the S3 location is a directory.
- partitioned: Use it when you have partitioned data on the S3 location.
Note
You must set fs.s3a.committer.staging.conflict-mode when the output directory or partition already exists
on the S3 location. The default value is fail. You can also choose to set it as append or replace
as required.
As it is a cluster-level property, a cluster restart is required for the updated configuration to be effective.
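To avoid typos in the override block, the four MFOC properties can be generated and validated before pasting them into the cluster configuration. The helper below is a hypothetical convenience sketch, not a QDS tool; it only assembles the property lines listed above and rejects values outside the documented choices.

```python
COMMITTER_NAMES = {"directory", "partitioned"}
CONFLICT_MODES = {"fail", "append", "replace"}


def mfoc_overrides(committer_name: str, conflict_mode: str = "fail") -> str:
    """Build the MFOC Hadoop override lines for the given choices."""
    if committer_name not in COMMITTER_NAMES:
        raise ValueError(f"unsupported committer name: {committer_name!r}")
    if conflict_mode not in CONFLICT_MODES:
        raise ValueError(f"unsupported conflict mode: {conflict_mode!r}")
    lines = [
        "mapreduce.outputcommitter.factory.class="
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
        f"fs.s3a.committer.name={committer_name}",
        f"fs.s3a.committer.staging.conflict-mode={conflict_mode}",
        "mapreduce.fileoutputcommitter.algorithm.version=1",
    ]
    return "\n".join(lines)


# Example: partitioned output where existing partitions are replaced.
print(mfoc_overrides("partitioned", "replace"))
```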
Note
When you are using MFOC, it is recommended to configure the S3 bucket-level policy to delete pending multipart uploads which are older than 7 days. For more information, see Policy for Aborting Incomplete Multipart Upload.
Here is an S3 bucket-level policy for deleting pending multipart uploads that are older than 7 days.
LifecycleConfiguration={
    'Rules': [
        {
            'ID': 'MultipartUpload',
            'Prefix': "",
            'Status': 'Enabled',
            'AbortIncompleteMultipartUpload': {
                'DaysAfterInitiation': 7
            }
        },
    ]
}
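One way to apply such a rule is through the boto3 S3 client method `put_bucket_lifecycle_configuration`. The sketch below assumes boto3 is installed and AWS credentials are configured; the function names and the bucket name are hypothetical. boto3 is imported inside the function that calls AWS so that the rule builder can be inspected without it.

```python
def multipart_cleanup_rules(days: int = 7) -> dict:
    """Build the lifecycle configuration shown above for a given age in days."""
    return {
        "Rules": [
            {
                "ID": "MultipartUpload",
                "Prefix": "",  # empty prefix: the rule covers the whole bucket
                "Status": "Enabled",
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": days},
            }
        ]
    }


def apply_multipart_cleanup(bucket: str, days: int = 7) -> None:
    """Apply the rule to a bucket. Requires boto3 and AWS credentials."""
    import boto3  # imported lazily; not needed to build the rules

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=multipart_cleanup_rules(days),
    )


print(multipart_cleanup_rules())
# To apply it (hypothetical bucket name):
# apply_multipart_cleanup("my-output-bucket")
```

Note that `put_bucket_lifecycle_configuration` replaces any lifecycle configuration already present on the bucket, so existing rules should be merged into the `Rules` list before calling it.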
Advantages of MFOC¶
These are the benefits of MFOC:
- There is an improvement of up to 13% in the overall job performance when writing to S3.
- The intermediate task outputs of a job are not visible on the S3 location until the job completes.
- It works well with Speculation. Unlike DirectFileOutputCommitter, MFOC does not cause job failures when used in conjunction with Speculation.
- It reduces eventual consistency issues because there are no renames in the job and task commit processes.
Configuring Timeout to Schedule AMs on Spot Nodes (AWS)¶
Qubole has introduced yarn.scheduler.qubole.am-on-stable.timeout.ms to set a timeout for scheduling Application Masters (AMs) on spot nodes. The timeout is set in milliseconds. By default, Qubole does not schedule AMs on AWS spot nodes. This is because such nodes can go away at any time and losing the AM of a YARN application can be disastrous. This default is specified by setting the above parameter to -1. The exceptions to this rule are as follows:
- If the timeout option is overridden, then Qubole waits for that much time from the time the AM was requested to try to schedule on an On-Demand node. After the specified timeout expires, Qubole schedules the AM even on a Spot node.
- If the cluster uses 100% Spot instances for autoscaling and the minimum cluster size is less than 5, Qubole never waits to schedule the AM. Instead, it immediately schedules it on a Spot node if there is available capacity. Again, this can be overridden to wait for some time using the above option.
When you use this parameter to set a timeout, RM tries to schedule AMs on stable nodes first. However, once the timeout is hit and the RM has been unable to schedule the AM, it considers spot nodes as well.
When you set the parameter’s value to 0, RM immediately considers all nodes when trying to schedule the AM.
This parameter can have the values as described in the following table.
| Parameter Values | Description |
|---|---|
| -1 | The default value. AMs are not scheduled on volatile spot nodes. |
| 0 | RM schedules AMs on volatile spot nodes whenever possible (after waiting for 0 milliseconds). |
| Any other value | RM waits for <value> milliseconds before scheduling an AM on a volatile spot node. |
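The three cases in the table can be summarized as a small decision function. This is an illustrative model only: the function name and arguments are hypothetical, and it does not cover the exception for clusters that use 100% Spot instances with a minimum size below 5.

```python
def am_may_use_spot(timeout_ms: int, waited_ms: int) -> bool:
    """Return True if the RM would consider spot nodes for an AM.

    timeout_ms models yarn.scheduler.qubole.am-on-stable.timeout.ms;
    waited_ms is the time elapsed since the AM was requested.
    """
    if timeout_ms == -1:
        # Default: never schedule AMs on spot nodes.
        return False
    # 0 means spot nodes are considered immediately; any other value
    # means they are considered once the wait reaches the timeout.
    return waited_ms >= timeout_ms


print(am_may_use_spot(timeout_ms=-1, waited_ms=600000))    # False
print(am_may_use_spot(timeout_ms=0, waited_ms=0))          # True
print(am_may_use_spot(timeout_ms=30000, waited_ms=10000))  # False
print(am_may_use_spot(timeout_ms=30000, waited_ms=30000))  # True
```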
Mapping an AWS S3 Bucket to an AWS Region¶
Qubole supports mapping an AWS S3 bucket to an AWS Region on Hadoop 2 (Hive) clusters using the S3a filesystem. This feature is useful when Hadoop has to work with AWS S3 buckets in different AWS regions. It can be set as a Hadoop override in the REST API call for a Hadoop 2 (Hive) cluster or in the cluster UI configuration. For information on adding a Hadoop override through the UI, see Advanced configuration: Modifying Hadoop Cluster Settings.
For information on adding a Hadoop override through a REST API call, see hadoop_settings.
The parameter used as a Hadoop override for configuring S3-bucket-to-AWS-region mapping is fs.s3.awsBucketToRegionMapping.
Here is an example of the S3-bucket-to-AWS-region mapping configuration.
fs.s3.awsBucketToRegionMapping = {"acm-354-test": "s3.ap-northeast-2.amazonaws.com", "acm-920": "s3.ap-south-1.amazonaws.com"}
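Because the value is a JSON object, it can be safer to generate the override line from a dictionary than to type it by hand. The sketch below uses Python's standard `json` module; the helper name `bucket_region_override` is hypothetical, and the bucket names and endpoints are the ones from the example above.

```python
import json


def bucket_region_override(mapping: dict) -> str:
    """Render the bucket-to-region mapping as a Hadoop override line."""
    return "fs.s3.awsBucketToRegionMapping = " + json.dumps(mapping)


mapping = {
    "acm-354-test": "s3.ap-northeast-2.amazonaws.com",
    "acm-920": "s3.ap-south-1.amazonaws.com",
}
print(bucket_region_override(mapping))
```

Serializing with `json.dumps` guarantees balanced braces and correctly quoted bucket names and endpoints.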