Just Analytics Blog | Performance Management News, Views and Op-ed

HIVE-TEZ SQL Query Optimization Best Practices

Written by Pravat Kumar Sutar | Nov 12, 2016 2:51:44 AM

Introduction

While working on my current project for a large bank, on a data warehouse and processing engine built using Hive, we ran into a number of issues that we had not anticipated because of the large data volume. This document records some of our observations and the methods we followed to optimize and tune our batch processing.

First of all, the Tez View in Ambari (HDP) makes it easy to identify which HQL jobs consume the most resources and are therefore the best candidates for optimization. Once we identify those jobs, we can drill down to see exactly how each job is executed and which resources it uses at every step. The view also provides a graphical representation of the query execution plan.

 


This helps the user debug the query for correctness and tune its performance. Hive performance optimization is a large topic in its own right and is very specific to the queries you run. In fact, each query in a query file may need separate tuning to get the best results. This reference document focuses on optimizing the Hive SQL query itself.

Virtual Columns must be last within the inserted datasets

Partitions can significantly improve performance when a query filters on the partition columns, because the scan is pruned to the one or few partitions that match the filter criteria. Partition pruning occurs directly when a partition key is present in the WHERE clause. Pruning occurs indirectly when the partition key is discovered during query processing. For example, after joining with a dimension table, the partition key might come from the dimension table. Partition column values are not written into the table's data files because they are the same for the entire partition, so they are "virtual columns."

INSERT INTO TABLE sale PARTITION (xdate='2016-03-08', state='CA')

SELECT id, amount /* , other non-partition columns; xdate and state come from the PARTITION clause */

FROM staging_table

WHERE xdate='2016-03-08' AND state='CA'

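By default (hive.exec.dynamic.partition.mode=strict), at least one virtual column must be hard coded. To let Hive derive every partition value from the data, switch to nonstrict mode: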

SET hive.exec.dynamic.partition.mode=nonstrict;

SET hive.exec.dynamic.partition=true;

 

INSERT INTO TABLE sale PARTITION (xdate, state)

SELECT * FROM staging_table;

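When no partition value is hard coded, the data is loaded using dynamic partitions. This lets you load all partitions in one shot, but it is slower than loading into a static partition. You can also hard code some partition keys and leave the rest dynamic: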

INSERT INTO TABLE sale PARTITION (xdate='2016-03-08', state)

SELECT id, amount, /* other columns, */ state

FROM staging_table WHERE xdate='2016-03-08'

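The virtual columns that are used as dynamic partitioning keys must come last in the SELECT list, in the same order as in the PARTITION clause, and any hard-coded (static) partition key must precede the dynamic ones. You can use the SELECT statement to reorder the columns.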
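The positional rule above can be illustrated with a small Python sketch. It is only a model of the behaviour, not Hive code: the function dynamic_partition_insert and the sample staging rows are made up for illustration. It takes the last values of each selected row as the partition values, by position rather than by name, which is why the partition columns have to be last in the SELECT list.

```python
def dynamic_partition_insert(rows, partition_names):
    # Assumption: models Hive's positional rule. The last
    # len(partition_names) values of every row are treated as the
    # partition values; the remaining values are the stored data.
    n = len(partition_names)
    table = {}
    for row in rows:
        data, part_values = row[:-n], row[-n:]
        path = "/".join(f"{k}={v}" for k, v in zip(partition_names, part_values))
        table.setdefault(path, []).append(data)
    return table

# Made-up staging rows: (id, amount, xdate, state)
staging = [
    (1, 9.5, "2016-03-08", "CA"),
    (2, 3.0, "2016-03-08", "NY"),
    (3, 7.25, "2016-03-09", "CA"),
]

sale = dynamic_partition_insert(staging, ["xdate", "state"])
for path in sorted(sale):
    print(path, sale[path])
```

If the SELECT list put xdate and state anywhere but last, this model (like Hive) would silently use whatever values happen to be in the trailing positions as partition values.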

COUNT DISTINCT versus DISTINCT in a subquery

Consider the two cases below. Which one is faster and therefore recommended?

  1. SELECT COUNT(DISTINCT col_name) FROM table_name
  2. SELECT COUNT(*) FROM (SELECT DISTINCT col_name FROM table_name) t

The first query has a performance bottleneck: the mappers send every value to a single reducer, which counts them all.

 

In the second case, the mappers split the values across many reducers. Each reducer generates its own distinct list, and the final job adds up the sizes of those lists.

 

Hence single-reducer stages are almost never recommended!
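The difference between the two plans can be sketched in plain Python. This is a simplified model, not what Hive literally executes: the function names, the CRC32 hash used for partitioning, and the sample values are all assumptions made for illustration.

```python
import zlib

def count_distinct_single_reducer(values):
    # Case 1: every mapper sends every value to one reducer, which has to
    # hold and deduplicate the whole set by itself.
    seen = set()
    for v in values:
        seen.add(v)
    return len(seen)

def count_distinct_partitioned(values, num_reducers=4):
    # Case 2: values are hash-partitioned, so equal values always meet on
    # the same reducer and each reducer deduplicates only its own share.
    reducers = [set() for _ in range(num_reducers)]
    for v in values:
        idx = zlib.crc32(str(v).encode("utf-8")) % num_reducers
        reducers[idx].add(v)
    # Final step: add up the sizes of the per-reducer distinct lists.
    return sum(len(r) for r in reducers)

values = ["a", "b", "a", "c", "b", "d", "a"]
print(count_distinct_single_reducer(values), count_distinct_partitioned(values))
```

Both functions return the same count; the second one simply spreads the deduplication work, which is what the subquery form allows Hive to do.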

 

Using OLAP functionality (OVER and RANK) instead of Join wherever possible

Here is another example where Hive’s OLAP functionality can replace a JOIN and give better performance.

CREATE TABLE click_event (

time_stamps timestamp,

session_ID string,

url string,

source_IP string)

STORED AS ORC TBLPROPERTIES ("orc.compress" = "SNAPPY");

Here each record represents a click event, and we would like to find the latest URL for each session_ID. One might consider the following approach:

SELECT click_event.* FROM click_event inner join

(select session_ID, max(time_stamps) as max_ts from click_event group by session_ID) latest

ON click_event.session_ID = latest.session_ID

AND click_event.time_stamps = latest.max_ts;

In the above query, we build a sub-query to collect the timestamp of the latest event in each session, and then use an inner join to filter out the rest. While the query is a reasonable solution from a functional point of view, it turns out there’s a better way to write it:

SELECT * FROM

(SELECT *, RANK() OVER (PARTITION BY session_ID ORDER BY time_stamps DESC) AS rnk FROM click_event) ranked_clicks

WHERE ranked_clicks.rnk = 1;

 

Here we use Hive’s OLAP functionality (OVER and RANK) to achieve the same thing, but without a join. Removing an unnecessary join usually results in better performance. We can look carefully at every query and consider whether a rewrite can make it better and faster!
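As a quick sanity check that the two formulations return the same rows, here is a small Python sketch that runs both against an in-memory SQLite database. SQLite stands in for Hive only to compare results, not performance; it assumes a SQLite build with window-function support (3.25 or later), and the sample click rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE click_event "
    "(time_stamps TEXT, session_ID TEXT, url TEXT, source_IP TEXT)"
)
# Made-up sample clicks for two sessions.
conn.executemany(
    "INSERT INTO click_event VALUES (?, ?, ?, ?)",
    [
        ("2016-03-08 10:00:00", "s1", "/home", "10.0.0.1"),
        ("2016-03-08 10:05:00", "s1", "/cart", "10.0.0.1"),
        ("2016-03-08 09:30:00", "s2", "/home", "10.0.0.2"),
        ("2016-03-08 09:45:00", "s2", "/search", "10.0.0.2"),
        ("2016-03-08 09:50:00", "s2", "/checkout", "10.0.0.2"),
    ],
)

# Join against the per-session maximum timestamp.
join_sql = """
SELECT c.session_ID, c.url FROM click_event c
INNER JOIN (SELECT session_ID, MAX(time_stamps) AS max_ts
            FROM click_event GROUP BY session_ID) latest
  ON c.session_ID = latest.session_ID AND c.time_stamps = latest.max_ts
ORDER BY c.session_ID
"""

# Rank clicks inside each session and keep the newest one.
rank_sql = """
SELECT session_ID, url FROM
  (SELECT session_ID, url,
          RANK() OVER (PARTITION BY session_ID ORDER BY time_stamps DESC) AS rnk
   FROM click_event) ranked_clicks
WHERE rnk = 1
ORDER BY session_ID
"""

join_rows = conn.execute(join_sql).fetchall()
rank_rows = conn.execute(rank_sql).fetchall()
print(join_rows == rank_rows, rank_rows)
```

Note that RANK keeps ties, just as the join on the maximum timestamp does, so both versions return every click that shares the latest timestamp in a session.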

Using SORT BY and DISTRIBUTE BY instead of ORDER BY wherever possible

ORDER BY uses only a single reducer to process the data, which may take an unacceptably long time for larger data sets.

Hive provides an alternative, SORT BY, that orders the data only within each reducer, so each reducer’s output is sorted locally. Total ordering is traded for better performance.

In both cases, the syntax differs only by the use of the ORDER or SORT keyword. You can specify any columns you wish, and whether each column is sorted ascending using the ASC keyword (the default) or descending using the DESC keyword.

 

Here is an example using ORDER BY:

SELECT s.year_month_date, s.symbol, s.price_close

FROM stocks s

ORDER BY s.year_month_date ASC, s.symbol DESC;

 

Here is the same example using SORT BY instead:

SELECT s.year_month_date, s.symbol, s.price_close

FROM stocks s

SORT BY s.year_month_date ASC, s.symbol DESC;

 

The two queries look almost identical, but in the second case, if more than one reducer is invoked, the output will be sorted differently. While each reducer’s output files will be sorted, the data will probably overlap with the output of other reducers.
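The overlap can be reproduced with a few lines of Python. This is only a model: the function sort_by and the round-robin split are assumptions made for illustration, since Hive assigns rows to reducers differently, but any split that ignores the sort key shows the same effect.

```python
def sort_by(rows, num_reducers):
    # Assumption: rows are dealt to reducers round-robin. Hive's real
    # assignment differs, but it also ignores the sort key.
    parts = [rows[i::num_reducers] for i in range(num_reducers)]
    # Each reducer sorts only the rows it received.
    return [sorted(part) for part in parts]

dates = ["2016-03-04", "2016-03-01", "2016-03-02", "2016-03-03"]
reducer_outputs = sort_by(dates, num_reducers=2)

# Reading the reducer outputs one after another is not a global sort.
concatenated = [d for part in reducer_outputs for d in part]
print(reducer_outputs)
print(concatenated == sorted(dates))
```

Each reducer's list is sorted, yet the date ranges of the two lists overlap, so concatenating them does not give a totally ordered result.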

 

For this, Hive provides DISTRIBUTE BY, used together with SORT BY, which controls how map output is divided among reducers.

All data that flows through a MapReduce job is organized into key-value pairs, and Hive uses this feature internally when it converts your queries to MapReduce jobs. By default, MapReduce computes a hash on the keys output by mappers and tries to evenly distribute the key-value pairs among the available reducers using the hash values.

SELECT s.year_month_date, s.symbol, s.price_close

FROM stocks s

DISTRIBUTE BY s.symbol

SORT BY s.symbol ASC, s.year_month_date ASC;

DISTRIBUTE BY works similarly to GROUP BY in the sense that it controls how reducers receive rows for processing, while SORT BY controls the sorting of data inside each reducer.
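A minimal Python model of this combination is sketched below. The helper names, the CRC32 hash, and the stock rows (including the prices) are made up for illustration; Hive uses its own hash function and reducer count.

```python
import zlib
from collections import defaultdict

# Made-up rows: (year_month_date, symbol, price_close)
rows = [
    ("2016-03-02", "AAPL", 100.8),
    ("2016-03-01", "IBM", 134.4),
    ("2016-03-01", "AAPL", 100.5),
    ("2016-03-02", "GE", 30.1),
    ("2016-03-02", "IBM", 136.3),
    ("2016-03-01", "GE", 29.9),
]

def distribute_by(rows, key_index, num_reducers):
    # DISTRIBUTE BY: the hash of the chosen column picks the reducer, so
    # rows with the same key always land on the same reducer.
    reducers = defaultdict(list)
    for row in rows:
        idx = zlib.crc32(row[key_index].encode("utf-8")) % num_reducers
        reducers[idx].append(row)
    return reducers

def sort_by(reducers, key_func):
    # SORT BY: each reducer sorts only the rows it received.
    return {idx: sorted(part, key=key_func) for idx, part in reducers.items()}

# DISTRIBUTE BY symbol SORT BY symbol ASC, year_month_date ASC
parts = sort_by(
    distribute_by(rows, key_index=1, num_reducers=2),
    key_func=lambda r: (r[1], r[0]),
)
for idx in sorted(parts):
    print(idx, parts[idx])
```

Every symbol ends up on exactly one reducer, and within that reducer its rows appear in date order, which is what the DISTRIBUTE BY plus SORT BY query asks for.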

Use the HAVING clause only for filtering aggregated rows

The HAVING clause filters rows after they have been grouped and aggregated. Use it only for conditions on aggregate results; a condition on a plain column belongs in the WHERE clause, which discards rows before the aggregation runs. Consider the example below:

Use: 

SELECT cellName, count(cellName)

FROM internal_channel_switching

WHERE cellName != '1059'

AND cellName != '5730'

GROUP BY cellName;

Instead of:

SELECT cellName, count(cellName)

FROM internal_channel_switching

GROUP BY cellName

HAVING cellName != '1059' AND cellName != '5730';
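The two queries return the same groups, which can be checked with a short Python sketch against an in-memory SQLite database. SQLite is used here only to compare results, and the sample cellName values are made up; the sketch says nothing about how Hive's optimizer treats either form.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE internal_channel_switching (cellName TEXT)")
# Made-up sample rows.
conn.executemany(
    "INSERT INTO internal_channel_switching VALUES (?)",
    [("1059",), ("1059",), ("5730",), ("2001",), ("2001",), ("3100",)],
)

where_sql = """
SELECT cellName, COUNT(cellName) FROM internal_channel_switching
WHERE cellName != '1059' AND cellName != '5730'
GROUP BY cellName ORDER BY cellName
"""
having_sql = """
SELECT cellName, COUNT(cellName) FROM internal_channel_switching
GROUP BY cellName
HAVING cellName != '1059' AND cellName != '5730'
ORDER BY cellName
"""
where_rows = conn.execute(where_sql).fetchall()
having_rows = conn.execute(having_sql).fetchall()

# Rows the WHERE version discards before grouping. A plan that applies
# HAVING literally would still group and count them first.
discarded_early = conn.execute(
    "SELECT COUNT(*) FROM internal_channel_switching "
    "WHERE cellName = '1059' OR cellName = '5730'"
).fetchone()[0]
print(where_rows == having_rows, discarded_early)
```

Here half of the rows never need to reach the aggregation when the condition is placed in WHERE.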

Minimizing number of subquery blocks in the query 

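Sometimes your main query contains more than one subquery. Try to minimize the number of subquery blocks in your query. Note that the tuple comparison against a subquery used below is not accepted by every Hive version, so verify it on your release. Consider the example below. Use: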

SELECT name

FROM employee

WHERE (salary, age) = (SELECT MAX (salary), MAX (age)

FROM employee_details) AND emp_dept = 'Computer Science';

instead of

SELECT name

FROM employee

WHERE salary = (SELECT MAX(salary) FROM employee_details)

AND age = (SELECT MAX(age) FROM employee_details) AND emp_dept = 'Computer Science';

Use operator EXISTS, IN and table joins appropriately in your query

  • Usually IN has the slowest performance.
  • IN is efficient when most of the filter criteria are in the sub-query.
  • EXISTS is efficient when most of the filter criteria are in the main query.

Consider the example below.

Use:

Select * from product p

where EXISTS (select * from order_items o

where o.product_id = p.product_id)

Instead of:

Select * from product p

where product_id IN (select product_id from order_items)
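A small Python sketch with an in-memory SQLite database shows that the two forms select the same products. SQLite is only a stand-in for checking results, and the product and order_items rows, as well as their column lists, are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (product_id INTEGER, product_name TEXT)")
conn.execute("CREATE TABLE order_items (order_id INTEGER, product_id INTEGER)")
# Made-up rows: product 2 has never been ordered, product 1 twice.
conn.executemany(
    "INSERT INTO product VALUES (?, ?)",
    [(1, "pen"), (2, "ink"), (3, "paper")],
)
conn.executemany(
    "INSERT INTO order_items VALUES (?, ?)",
    [(100, 1), (101, 1), (102, 3)],
)

exists_sql = """
SELECT * FROM product p
WHERE EXISTS (SELECT * FROM order_items o WHERE o.product_id = p.product_id)
ORDER BY p.product_id
"""
in_sql = """
SELECT * FROM product p
WHERE product_id IN (SELECT product_id FROM order_items)
ORDER BY p.product_id
"""
exists_rows = conn.execute(exists_sql).fetchall()
in_rows = conn.execute(in_sql).fetchall()
print(exists_rows == in_rows, exists_rows)
```

Because the results match, the choice between the two is purely a performance decision, so it is worth comparing both plans on your own data.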

Giving importance to the conditions in WHERE clause 

Recommended

SELECT emp_id, first_name, salary FROM employee WHERE salary > 50000;

Not recommended

SELECT emp_id, first_name, salary FROM employee WHERE salary != 50000;

 

Recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE first_name LIKE 'Pravat%';

Not recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE SUBSTR(first_name,1,3) = 'Pra';

Recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE first_name LIKE NVL ( :name, '%');

Not Recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE first_name = NVL ( :name, first_name);

Recommended

SELECT product_id, product_name

FROM product

WHERE unit_price BETWEEN 10 AND 100;

Not Recommended

SELECT product_id, product_name

FROM product

WHERE unit_price >= 10

AND unit_price <= 100;

Recommended

SELECT emp_id, first_name, salary

FROM employee WHERE dept = 'ComputerScience'

AND location = 'Singapore';

Not Recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE dept || location= 'ComputerScienceSingapore';

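Keep the column on its own on one side of the comparison and move any arithmetic to the constant side, so that the constant is computed once instead of the expression being evaluated for every row.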

Recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE salary < 25000;

Not Recommended

SELECT emp_id, first_name, salary

FROM employee

WHERE salary + 10000 < 35000;

Recommended

SELECT emp_id, first_name, age

FROM employee_details

WHERE age > 35;

Not Recommended

SELECT emp_id, first_name, age

FROM employee_details

WHERE age != 35;

Better to limit the data flow down the queries in HQL scripts 

When an HQL script runs, its queries execute from top to bottom, and the volume of data that flows down to each level is the factor that decides performance. We need to make sure that data is filtered in the first few stages rather than carrying unwanted data to the bottom. This gives a significant performance gain, because the queries further down have much less data to crunch. To do this, we need to understand the requirement or the existing SQL script and design the Hive job with the data flow in mind.
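The effect of filtering early can be sketched with a toy Python pipeline. The stage functions, the row layout, and the row counts are all made up for illustration; the point is only how many rows each stage has to touch.

```python
def run_stages(rows, stages):
    """Run the stages in order and record how many rows enter each one."""
    rows_in = []
    for name, stage in stages:
        rows_in.append((name, len(rows)))
        rows = stage(rows)
    return rows, rows_in

def keep_ca(rows):
    # Filtering stage: keep one state only.
    return [r for r in rows if r["state"] == "CA"]

def enrich(rows):
    # Stand-in for an expensive downstream step (join, UDF, aggregation).
    return [dict(r, doubled=r["amount"] * 2) for r in rows]

# Made-up input: 1000 rows, one in ten belongs to CA.
rows = [{"state": "CA" if i % 10 == 0 else "NY", "amount": i} for i in range(1000)]

late_result, late_counts = run_stages(rows, [("enrich", enrich), ("filter", keep_ca)])
early_result, early_counts = run_stages(rows, [("filter", keep_ca), ("enrich", enrich)])

print(late_counts)
print(early_counts)
print(late_result == early_result)
```

Both orderings produce the same result, but with the filter first the expensive stage sees only a tenth of the rows.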

Conclusion

Improving the performance and computational cost of a query is important. In a MapReduce environment, we can also use metadata statistics to further optimize the query plan. Column-level and table-level statistics can be collected and used when analyzing the execution plan for further optimization.