Following up on my earlier post on configuration and optimization techniques for Hive on Tez, this document describes a few points on how to optimize Hive queries and tune Hive performance. If Hive is not tuned properly, even select queries on small tables can take a few minutes to return results. For this reason, Hive is mainly limited to OLAP workloads. Hive is not suitable when instant results are expected, but by following some of the best practices outlined below, we can improve query performance by at least 50%.
When Tez executes a query, it initially determines the number of reducers it needs and automatically adjusts as needed based on the number of bytes processed.
The number of reducers is controlled by the mapred.reduce.tasks parameter. By default, it is set to -1, which lets Tez determine the number of reducers automatically (recommended).
It is better to let Tez determine this and make the proper changes within its framework than to force a fixed value.
set mapred.reduce.tasks = 38; (Not Recommended)
set mapred.reduce.tasks = -1; (Recommended)
We can generate statistics using the "ANALYZE TABLE .. COMPUTE STATISTICS" statement and then observe how many mappers and reducers Tez creates. This helps to identify whether the mappers complete quickly or whether execution is stuck at the reducer end.
We can turn CBO and vectorization on and run the EXPLAIN plan to observe the amount of data funneled through the reducers.
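As a minimal sketch of this step, the settings can be switched on for the session and the plan inspected with EXPLAIN. The employee table (also used in an example later in this document) and the query itself are only illustrations.

```sql
-- Enable CBO and vectorization for the current session
set hive.cbo.enable=true;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;

-- Print the plan without running the query
-- (employee is an assumed example table)
EXPLAIN
SELECT dept, count(*) AS employees
FROM employee
GROUP BY dept;
```

In the output, the row counts and data sizes reported on the operators that feed the reducers give a rough idea of how much data reaches the reduce side.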
If we do not set mapred.reduce.tasks to -1 as stated above, we need to calculate the number of reducers to configure. The formula for the calculation is
Max(1, Min(hive.exec.reducers.max [1009], ReducerStage estimate / hive.exec.reducers.bytes.per.reducer)) x hive.tez.max.partition.factor [2]
With the value -1, Tez uses the above formula to determine the number of reducers before scheduling the Tez DAG.
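To make the formula concrete, here is a worked example with assumed values. The 10 GB reducer stage estimate is made up for illustration, and the three settings are set explicitly so that the arithmetic does not depend on the defaults of a particular Hive version.

```sql
-- Assumed values, for illustration only
set hive.exec.reducers.max=1009;
set hive.exec.reducers.bytes.per.reducer=268435456;  -- 256 MB
set hive.tez.max.partition.factor=2.0;

-- Suppose the reducer stage estimate is 10 GB = 10737418240 bytes:
--   10737418240 / 268435456 = 40
--   Min(1009, 40)           = 40
--   Max(1, 40)              = 40
--   40 x 2                  = 80 reducers
```

Under these assumptions the query would start with 80 reducers, which Tez can then adjust based on the number of bytes actually processed.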
This optimizes "select statement with where clause" on ORC tables.
set hive.optimize.index.filter=true;
This optimizes "select statement with limit clause" to run in under 1 second
set hive.fetch.task.conversion=more;
This optimizes "select count (1) from table;" to run in ~1 second
set hive.compute.query.using.stats=true;
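The three settings can be tried together in one session. This is only a sketch: tweets is the example table used later in this document, and the filter value is made up.

```sql
set hive.optimize.index.filter=true;
set hive.fetch.task.conversion=more;
set hive.compute.query.using.stats=true;

-- Filter on an ORC table (benefits from hive.optimize.index.filter)
SELECT sender, topic FROM tweets WHERE topic = 'hive';

-- Simple LIMIT query (benefits from hive.fetch.task.conversion)
SELECT * FROM tweets LIMIT 10;

-- Row count (can be answered from statistics)
SELECT count(1) FROM tweets;
```

The count query can only be answered from statistics if statistics have been computed for the table and are up to date.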
In HiveServer2, performance can be improved by turning off some isolation and sharing sessions between JDBC queries. The Tez setting for multi-tenancy is
tez.am.container.session.delay-allocation-millis=1000
This means that a container idling for more than 1 second will be killed. This is ideal for reuse within a query while still freeing up resources between queries. It has worked fairly well for multi-tenancy and keeps reuse working within long-running queries. For a single-query performance run, however, you can set this to 2 minutes so that most queries reuse containers from a previous query in the same session.
Vectorized query execution improves the performance of operations like scans, aggregations, filters and joins by performing them in batches of 1024 rows at once instead of a single row at a time.
Introduced in Hive 0.13, this feature significantly improves query execution time, and is easily enabled with two parameter settings:
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
Hive optimizes each query’s logical and physical execution plan before submitting for final execution. These optimizations are not based on the cost of the query.
A recent addition to Hive, Cost-based optimization, performs further optimizations based on query cost, resulting in potentially different decisions: how to order joins, which type of join to perform, degree of parallelism and others.
To use cost-based optimization (also known as CBO), we need to set the following parameters at the beginning of our query:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
Now we can prepare the data for CBO by running Hive’s “analyze” command to collect various statistics on the tables for which we want to use CBO.
analyze table table_name compute statistics;
Example:
analyze table tweets compute statistics for columns sender, topic;
With HIVE 0.14 (on HDP 2.2) the analyze command works much faster, and we don’t need to specify each column, so we can just issue:
analyze table tweets compute statistics for columns;
A query on this table should now get a different, faster execution plan, because Hive builds the plan from the cost calculation.
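One way to check that the statistics are in place and to see the resulting plan is sketched below. The users table and its name column are hypothetical and stand in for any table that tweets is joined with.

```sql
-- Show the column statistics collected for the sender column
DESCRIBE FORMATTED tweets sender;

-- Inspect the plan of a join query
-- (users and users.name are hypothetical, for illustration only)
EXPLAIN
SELECT t.topic, count(*) AS cnt
FROM tweets t
JOIN users u ON t.sender = u.name
GROUP BY t.topic;
```

Running the same EXPLAIN with hive.cbo.enable set to false and comparing the two plans shows whether CBO changed the join order or the join type.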
Queues are the primary method used to manage multiple workloads. Queues can provide workload isolation and can guarantee that capacity is available for different workloads. Queues can also support meeting Service Level Agreements (SLAs) for different workloads. Within each queue, you can allow one or more sessions to live simultaneously. Sessions cooperatively share the resources of the queue.
For example, if you have a queue that is assigned 10% of cluster resources, those cluster resources can be allocated anywhere in the cluster, depending on the query and data placement. Where resources are allocated might change as more queries run.
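A session can be directed to a particular queue before a query is run. In this sketch the queue name etl is hypothetical; it has to match a queue defined in the scheduler configuration of the cluster.

```sql
-- "etl" is a hypothetical queue name
set tez.queue.name=etl;

-- Queries in this session now run with the resources of that queue
SELECT count(1) FROM tweets;
```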
Using map joins is very efficient because one table is held in memory as a hash map on every node and the larger fact table is streamed. This minimizes data movement, resulting in very fast joins. However, there must be enough memory for the in-memory table, so we must allocate more memory to the Tez container with the following settings in hive-site.xml:
Set the Tez container size to a multiple of the YARN container size (4GB here; the value is given in MB):
SET hive.tez.container.size=4096;
Set how much of this memory can be used for tables stored as hash maps (one-third of the Tez container size is recommended, about 1370MB here; the value is given in bytes):
SET hive.auto.convert.join.noconditionaltask.size=1436549120;
Note: This size is specified in bytes in the hive-site.xml file, but is set in MB with Ambari.
If we find that we are not getting map joins, we need to check the size of the Tez containers in relation to the YARN containers. The size of a Tez container must be a multiple of the YARN container size. For example, if the YARN containers are set to 2GB, set the Tez container size to 4GB. Then run the EXPLAIN command with the query to view the execution plan and make sure we are getting map joins instead of shuffle joins. Keep in mind that if the Tez containers are too large, the space is wasted. Also, to limit the size of the largest container, do not configure more than one processor per Tez container. As an example, with 16 processors and 64GB of memory, configure one Tez container per processor and set the container size to 4GB and no larger.
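The check with EXPLAIN could look like the following sketch. The department table and its columns are hypothetical and represent a small dimension table; employee is the example table used later in this document.

```sql
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;

-- department (dept_id, dept_name) is a hypothetical small table
EXPLAIN
SELECT e.name, d.dept_name
FROM employee e
JOIN department d ON e.dept = d.dept_id;
```

If the join was converted, the plan should contain a Map Join Operator. If a regular join operator fed by a shuffle shows up instead, the small table probably did not fit within the configured size.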
For data-intensive workloads, I/O operations and network data transfer take considerable time to complete. By enabling compression in Hive, we can improve the performance of Hive queries as well as save storage space on the HDFS cluster. Compression can be enabled at various stages: on intermediate data, on final output, and at table creation time.
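A minimal sketch of the session settings for the first two stages is shown below. Snappy is only an assumed codec choice here; any codec installed on the cluster can be used instead.

```sql
-- Compress intermediate data passed between stages
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Compress the final output of the query
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
```

A fast codec such as Snappy trades some compression ratio for lower CPU cost, which usually suits intermediate data better than a heavier codec.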
Global sorting in Hive can be achieved with the ORDER BY clause, but this comes with a drawback. ORDER BY produces its result using a single reducer, which makes it very inefficient for large datasets. When a globally sorted result is not required, we can use the SORT BY clause instead. SORT BY produces a sorted file per reducer. If we need to control which reducer a particular row goes to, we can use the DISTRIBUTE BY clause.
For example:
SELECT id, name, salary, dept FROM employee
DISTRIBUTE BY dept
SORT BY id ASC, name DESC;
Here all rows with the same dept go to the same reducer, and each reducer sorts its rows by id (ascending) and name (descending).
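For comparison, the three sorting variants can be put side by side on the same employee table. This is an illustrative sketch, not a benchmark.

```sql
-- Global order: a single reducer sorts the whole result
SELECT id, name, salary, dept FROM employee
ORDER BY salary DESC;

-- Per-reducer order: each reducer writes its own sorted output
SELECT id, name, salary, dept FROM employee
SORT BY salary DESC;

-- CLUSTER BY is shorthand for DISTRIBUTE BY and SORT BY on the same column
SELECT id, name, salary, dept FROM employee
CLUSTER BY dept;
```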
By default, the LIMIT operator executes the entire query and then returns a limited number of results. This behavior is wasteful. It can be avoided by setting the properties below.
<property>
<name>hive.limit.optimize.enable</name>
<value>true</value>
<description>Whether to enable to optimization to trying a smaller subset of data for simple LIMIT first.</description>
</property>
<property>
<name>hive.limit.row.max.size</name>
<value>100000</value>
<description>When trying a smaller subset of data for simple LIMIT, how much size we need to guarantee each row to have at least.</description>
</property>
<property>
<name>hive.limit.optimize.limit.file</name>
<value>10</value>
<description>When trying a smaller subset of data for simple LIMIT, maximum number of files we can sample.</description>
</property>
<property>
<name>hive.limit.optimize.fetch.max</name>
<value>50000</value>
<description>Maximum number of rows allowed for a smaller subset of data for simple LIMIT, if it is a fetch query. Insert queries are not restricted by this limit.</description>
</property>
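The same properties can also be set per session instead of in hive-site.xml. A short sketch, using the employee table as an assumed example:

```sql
set hive.limit.optimize.enable=true;
set hive.limit.optimize.limit.file=10;
set hive.limit.optimize.fetch.max=50000;

-- A simple LIMIT query can now be served from a smaller subset of the data
SELECT * FROM employee LIMIT 10;
```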
Hive converts a query into one or more stages. A stage could be a map-reduce stage, a merge stage, a limit stage, and so on. By default, Hive executes these stages one at a time. A particular job may consist of some stages that are not dependent on each other and could be executed in parallel, which helps the overall job to complete more quickly. Parallel execution can be enabled using the properties below.
<property>
<name>hive.exec.parallel</name>
<value>true</value>
<description>Whether to execute jobs in parallel</description>
</property>
<property>
<name>hive.exec.parallel.thread.number</name>
<value>8</value>
<description>How many jobs at most can be executed in parallel</description>
</property>
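As an illustration, the two branches of the union below do not depend on each other, so their stages are candidates for parallel execution. The tables are the example tables used elsewhere in this document, and the query is only a sketch.

```sql
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;

-- The two counts are independent of each other
SELECT u.src, u.cnt
FROM (
  SELECT 'employee' AS src, count(*) AS cnt FROM employee
  UNION ALL
  SELECT 'tweets' AS src, count(*) AS cnt FROM tweets
) u;
```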
Using the Optimized Row Columnar (ORC) file format, we can improve the performance of Hive queries very effectively. The picture below depicts the power of the ORC file format over other formats.
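A minimal sketch of moving an existing table to ORC follows. The column types and the Snappy compression setting are assumptions made for illustration.

```sql
-- Create an ORC copy of the employee table (column types are assumed)
CREATE TABLE employee_orc (
  id INT,
  name STRING,
  salary DOUBLE,
  dept STRING
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Load it from the existing table
INSERT OVERWRITE TABLE employee_orc
SELECT id, name, salary, dept FROM employee;
```

Queries can then be pointed at employee_orc to compare run times against the original table.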
We can improve the performance of joins by enabling Auto convert Map join and enabling optimization of skew joins.
Auto Map Join:
Auto map join is very useful when joining a big table with a small table. If we enable this feature, the small table will be saved in the local cache on each node and then joined with the big table in the map phase itself. This feature provides two advantages
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
<description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask</name>
<value>true</value>
<description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size. If this parameter is on, and the sum of size for n-1 of the tables/partitions for a n-way join is smaller than the specified size, the join is directly converted to a mapjoin (there is no conditional task).</description>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>10000000</value>
<description>If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. However, if it is on, and the sum of size for n-1 of the tables/partitions for a n-way join is smaller than this size, the join is directly converted to a mapjoin(there is no conditional task). The default is 10MB</description>
</property>
<property>
<name>hive.auto.convert.join.use.nonstaged</name>
<value>false</value>
<description>For conditional joins, if input stream from a small alias can be directly applied to join operator without filtering or projection, the alias need not to be pre-staged in distributed cache via mapred local task. Currently, this is not working with vectorization or tez execution engine.</description>
</property>
Skew Join
We can enable optimization of skew join i.e. imbalanced joins by setting hive.optimize.skewjoin property to true.
<property>
<name>hive.optimize.skewjoin</name>
<value>true</value>
<description> The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce job, process those skewed keys. The same key need not be skewed for all the tables, and so, the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a map-join.</description>
</property>
<property>
<name>hive.skewjoin.key</name>
<value>100000</value>
<description>Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator, we think the key as a skew join key.</description>
</property>
<property>
<name>hive.skewjoin.mapjoin.map.tasks</name>
<value>10000</value>
<description>Determine the number of map task used in the follow up map join job for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.</description>
</property>
<property>
<name>hive.skewjoin.mapjoin.min.split</name>
<value>33554432</value>
<description>Determine the number of map task at most used in the follow up map join job for a skew join by specifying the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.</description>
</property>
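For a single session, the skew join settings can be applied as in the sketch below. The users table and its columns are hypothetical; the example assumes that a few senders account for most of the rows in tweets.

```sql
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;

-- users (name, country) is a hypothetical table
-- Senders with more than 100000 rows are handled as skewed keys
SELECT t.sender, t.topic, u.country
FROM tweets t
JOIN users u ON t.sender = u.name;
```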
The "fair scheduling" policy in YARN was introduced in HDP 2.3. Fair scheduling enables all sessions running within a queue to get equal resources. For example, if a query is already running in a queue and taking up all of the resources when a second session with a query is introduced, the sessions eventually end up with an equal share of resources per session. Initially there is a delay, but if ten queries run concurrently most of the time, the resources are divided equally among them.