How to Optimize Nested Queries using Apache Spark

Spark has a strong query optimizer that can significantly reduce query execution time and cost. Nested queries, however, need optimization beyond what Spark applies on its own. Such queries typically combine multiple dimensions and metrics with “OR” and “AND” operations, and filter on millions of values.

For instance, when Spark runs a query with, say, 100 filters and nested operations between them, execution slows down. Because multiple dimensions are involved, the optimization process itself takes a long time. Depending on the cluster size, it may take hours to process a DataFrame with those filters.

In this blog, we walk you through the common challenges of working with nested filter queries and how we solved them using logical tree optimization. By tweaking some of the code and applying a common algorithm, we were able to reduce the total cost and execution time of the query.

We could further optimize the logical plan and pass already-processed data to the next filters, thus reducing the total processing load. The logical query is arranged so that there is always a predicate pushdown ahead of the next part of the query. We used Apache Spark with the Scala API for this use case.

Problem Statement

The user needed to filter records against millions of values, supplied as a single-column file of records. For this filter file, the supported filters are “include” and “exclude”. The user can select multiple dimensions for applying filters and combine them with a filter operator, which is “AND”, “OR”, or both. The user can therefore create a nested filter expression containing multiple dimensions with both “OR” and “AND” operations between them.

This requirement was to be implemented in our product SigView. For this use case, we developed a feature called Bulk Filter (as it includes an “OR” operation between filter dimensions). All the filters inside the Bulk Filter are rearranged on the basis of cost, forming an optimized group of filters for processing.

Optimization Plan Description

    1. Filter Expression
      1. Each filter expression is a tree composed of nodes
      2. Each node is immutable and is either:
        1. A filter node: a filter expression with dimensions, values, and a filter operation
        2. A nested node: a combination of filter nodes with an operator applied between them (either “OR” or “AND”)
        3. Filter nodes are represented by the ProjectionBulkFilter class
        4. Nested nodes are represented by the BulkFilter class
    2. Optimization Rules
      1. Every node needs to be executed
      2. Hence, the fewer nodes we have, the faster the execution
      3. Each node has a df (DataFrame) passed to it for processing
      4. The larger the df, the slower the execution
      5. If a filter is applied to a df and some predicate pushdown happens while preprocessing it, the df passed to the next node is smaller, which decreases the total execution time
      6. In a nested query, where nodes are combined with an operator, the nodes can be ordered on the basis of predicate pushdown: calculate the cost of executing each node, rearrange the nodes in increasing order of cost, and execute them in that order
      7. This reduces the load on each node, since a filtered/processed df is passed on after each execution. The predicate pushdown is thus maintained while the logical plan is optimized for faster execution
    3. Defining operator coefficients
      1. Lemma
        1. Let us assume that each join with a filter results in 10% of the base data size (90% of the data is filtered out). A join is used here because millions of values are being filtered
        2. A union is “UNION ALL” in Spark, which combines the rows of two source data frames
        3. Based on the above assumptions, we define the following three coefficients
          1. Cost coefficient of each filter join operation = 0.1
          2. AND: multiply(children), i.e. the product of the costs of the child BulkFilter instances
          3. OR: sum(children) * 10, i.e. the sum of the costs of the child BulkFilter instances, multiplied by 10
        4. Illustration: for the following Bulk Filter expression, the cost calculation is shown below:
            a and (b or c) and (d and e)

            1. AND = 0.002 (0.10 * 2 * 0.01)
              1. a = 0.10
              2. OR = 2 (0.20 * 10)
                1. b = 0.1
                2. c = 0.1
              3. AND = 0.01 (0.1 * 0.1)
                1. d = 0.1
                2. e = 0.1
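        5. a and (b or c) and (d and e) and (x and (w or y))
          1. a = 0.10
          2. OR = 2 (0.20 * 10)
            1. b = 0.1
            2. c = 0.1
          3. AND = 0.01 (0.1 * 0.1)
            1. d = 0.1
            2. e = 0.1
          4. AND = 0.2 (0.1 * 2)
            1. x = 0.1
            2. OR = 2 (0.20 * 10)
              1. w = 0.1
              2. y = 0.1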
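
The coefficient rules above can be sketched in a few lines of plain Scala. This is a minimal illustration, not the SigView implementation: the `Expr`, `Leaf`, `And` and `Or` types and the constant names are hypothetical stand-ins for the filter and nested nodes, and only the three coefficients come from the list above.

```scala
// Hypothetical, simplified model of a filter expression tree
// (stand-ins for the filter nodes and nested nodes described above).
sealed trait Expr
case class Leaf(name: String) extends Expr
case class And(children: List[Expr]) extends Expr
case class Or(children: List[Expr]) extends Expr

val JoinCost = 0.1    // assumed cost of a single filter join
val OrFactor = 10.0   // multiplier applied to the summed cost of OR children

def cost(e: Expr): Double = e match {
  case Leaf(_)       => JoinCost
  case And(children) => children.map(cost).product        // AND: multiply(children)
  case Or(children)  => children.map(cost).sum * OrFactor  // OR: sum(children) * 10
}

// a and (b or c) and (d and e)
val expr = And(List(
  Leaf("a"),
  Or(List(Leaf("b"), Leaf("c"))),
  And(List(Leaf("d"), Leaf("e")))
))

// 0.1 * 2 * 0.01, i.e. roughly 0.002 up to floating-point rounding
println(cost(expr))
```

Because the costs are doubles, comparisons against expected values should use a small tolerance rather than exact equality.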

Implementation Approach

  1. Rearrange the BulkFilter (BF) object’s instances on the basis of cost
    1. Low-cost instances go on the left side
    2. High-cost instances go on the right side
    3. This is a type of predicate pushdown: smaller instances are pushed ahead of larger ones, so that the larger instances have less data to read
    4. This is done for all nested instances
    5. For example, take the BF
      1. (a or b) and c and (d or f)
      2. It contains two instances of BF and one ProjectionBulkFilter (PBF)
      3. As the operator between them is “AND”, we need to rearrange the instances on the basis of cost
      4. The arranged form is:
        1. c and (a or b) and (d or f)
    6. We arrange them so that the less costly instances are processed first and their output becomes the input for the next instance
    7. In the above case, after arranging, c is executed first, then its output is sent to the next instance (a or b) as the input df, and so on
    8. Assign the operator coefficients for each operator; these are used to calculate the cost of each BulkFilter node
    9. The cost is based on the number of nodes and the operator coefficients
  2. Decide the input for the next instance on the basis of the operator
    1. The operator determines the input for the next instance
    2. In the case of “OR”, we have to send the same input df (the baseDF at the top level) to all instances
    3. In the case of “AND”, we need to send the processed df to the next instance
    4. For example, consider the cases below
      1. c and (a or b) and (d or f)
        1. As the operator between them is “AND”, c is processed first
        2. Then its output is sent to (a or b), where a and b are each applied to it and combined with OR
        3. After (a or b) is processed, its output is sent to (d or f)
        4. The output of (d or f) is the output of the whole instance
      2. c or (a or b) or (d or f)
        1. The baseDF is passed to each instance as input, and the results are combined with OR
Class Creation

  1. Data Type
    1. Filter Expression
      1. Filter nodes are represented by the ProjectionBulkFilter class.
      2. Nested nodes are represented by the BulkFilter class.
    2. Classes and Objects
        1. Scala classes for filter expressions
          1. case class ProjectionBulkFilter(key: Column, fileLocation: String, operation: String) – PBF
            1. It is the lowest level of a filter expression. The supported operations are:
              1. Include – inner join
              2. Exclude – left anti join
          2. case class BulkFilter(operator: String, instance: List[Either[ProjectionBulkFilter, BulkFilter]]) – BF
            1. It contains an operator and a list whose elements are either a ProjectionBulkFilter or another BulkFilter
            2. The supported operators are AND and OR
        2. Helper classes for calculating the cost and sorting on the basis of it
        3. case class ProjectionBulkFilterWithPoints(key: Column, fileLocation: String, operation: String, points: Double) – PBFWP
          1. It is similar to ProjectionBulkFilter, except that it has an extra field, points, which holds its cost
        4. case class BulkFilterWithPoints(operator: String, instance: List[Any], points: Double) – BFWP
          1. It is similar to BulkFilter, except that it has an extra field, points, which holds its cost
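
To show how the `Either`-based nesting reads in practice, here is a small sketch that builds the expression c and (a or b) with the two filter classes. To keep it runnable without Spark, `key` is declared as a plain `String` instead of a Spark `Column`; the file locations, dimension names and the `leafCount` helper are made up for illustration.

```scala
// Assumption: key is a String here so the sketch runs without Spark;
// the classes described above take a Spark Column.
case class ProjectionBulkFilter(key: String, fileLocation: String, operation: String)
case class BulkFilter(operator: String, instance: List[Either[ProjectionBulkFilter, BulkFilter]])

// Hypothetical dimensions and file locations, for illustration only.
val c = ProjectionBulkFilter("brand", "/tmp/filters/brand.csv", "include")
val a = ProjectionBulkFilter("color", "/tmp/filters/color_a.csv", "include")
val b = ProjectionBulkFilter("color", "/tmp/filters/color_b.csv", "exclude")

// c and (a or b): leaf filters are wrapped in Left, nested filters in Right.
val nested = BulkFilter("AND", List(
  Left(c),
  Right(BulkFilter("OR", List(Left(a), Left(b))))
))

// Walk the tree by pattern matching on Left/Right and count the leaf filters.
def leafCount(bf: BulkFilter): Int = bf.instance.map {
  case Left(_)      => 1
  case Right(child) => leafCount(child)
}.sum

println(leafCount(nested))  // 3
```

The same Left/Right pattern match is the natural shape for any recursive traversal of a BulkFilter, including the cost, sort and apply steps.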

Implementation Strategies

  1. Getting cost for BF
    1. Strategies: bottom-up and level-order
      1. Traverse through the instances of the BF
        1. If it’s a PBF, return a PBFWP with 0.1 points
        2. If it’s a BF
          1. Recursively run this method on the BF
          2. Return a BFWP with points, which is the output of the “reduce” method below
  2. Reduce method to get points of each BF
    1. Traverse through the instances of the BF
    2. If it’s a PBF, return 0.1.
    3. If it’s a BF, return reduce(BF)
    4. After assigning the points, reduce by operator coefficients:
      1. If “OR”: (sum of the costs of all nodes on the same level) * 10
      2. If “AND”: (product of the costs of all nodes on the same level)
  3. Sort on the basis of cost
    1. Strategies: bottom-up and level-order
      1. After assigning a cost to each node, traverse the nodes and apply sortBy to the instances of each node (its level)
      2. sortBy uses points as the sort key
      3. On each level, the node with the lowest points ends up on the left and the node with the highest points on the right (ascending order)
      4. The sort is applied to each nested instance of BF
      5. Once all lower levels are sorted bottom-up, the top level is sorted in the same way
  4. Applying and reducing BF
    1. Strategies: bottom-up and level-order
      1. Traverse the tree and, for each node, do the following
      2. If the node is a PBFWP, reduce it by applying the applyBulkFilterForEachDimension method
        1. If the operator for this level is AND, pass the output of this node to the next node as its input data frame
      3. If the node is a BFWP, reduce recursively over its child nodes.
        1. If the operator for this level is AND, pass the output of this node to the next node as its input data frame
      4. Finally, reduce on the basis of the operator:
        1. If the operator is OR: apply union and dropDuplicates to all the data frames produced by its child nodes
        2. If the operator is AND: return the last df in the reduced list of dfs
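
The apply-and-reduce step can be sketched without a cluster by using an in-memory list of rows as a stand-in for a DataFrame. Everything below is an assumption made for illustration: the `Row` and `DF` aliases, the `Leaf` and `Nested` types, and `applyLeaf`, which only mimics what an include (inner join) or exclude (left anti join) against a filter file would keep. In Spark, the OR branch would correspond to a union followed by dropDuplicates.

```scala
type Row = Map[String, String]
type DF  = List[Row]   // in-memory stand-in for a Spark DataFrame

sealed trait Node
case class Leaf(key: String, values: Set[String], operation: String) extends Node
case class Nested(operator: String, children: List[Node]) extends Node

// "include" keeps matching rows (like an inner join);
// anything else is treated as "exclude" (like a left anti join).
def applyLeaf(df: DF, f: Leaf): DF = f.operation match {
  case "include" => df.filter(r => r.get(f.key).exists(f.values.contains))
  case _         => df.filterNot(r => r.get(f.key).exists(f.values.contains))
}

def reduce(input: DF, node: Node): DF = node match {
  case leaf: Leaf => applyLeaf(input, leaf)
  case Nested("AND", children) =>
    // AND: each child reads the previous child's output; the last output is the result.
    children.foldLeft(input)((df, child) => reduce(df, child))
  case Nested(_, children) =>
    // OR: every child reads the same input; outputs are unioned and de-duplicated.
    children.flatMap(child => reduce(input, child)).distinct
}

val base: DF = List(
  Map("id" -> "1", "color" -> "blue",  "brand" -> "BMW"),
  Map("id" -> "2", "color" -> "green", "brand" -> "Tata"),
  Map("id" -> "3", "color" -> "red",   "brand" -> "BMW"),
  Map("id" -> "4", "color" -> "blue",  "brand" -> "Audi")
)

// brand in (BMW) and (color in (blue) or color in (green))
val expr = Nested("AND", List(
  Leaf("brand", Set("BMW"), "include"),
  Nested("OR", List(
    Leaf("color", Set("blue"), "include"),
    Leaf("color", Set("green"), "include")
  ))
))

println(reduce(base, expr).map(_("id")))  // List(1)
```

In this example the OR subtree only ever sees the two BMW rows, which is the load reduction the ordering is meant to achieve.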

Illustrations

  1. Getting cost for BF
    1. Suppose we have a DataFrame of cars called carDF
    2. And we have four filters A, B, C and D:
      1. A -> cars of color (green, blue, white, grey, silver, gold, yellow)
      2. B -> cars of color (blue, green, silver)
      3. C -> cars of brand (BMW, Hyundai, Tata)
      4. D -> cars of region (ASIA, NA, EUROPE)
    3. And we have the nested query:
      1. (D) or ((C) and (A and B))
      2. The parse tree has OR at the root, with D as one child and the AND of C and (A and B) as the other
      3. If we apply the filters on the baseDF “carDF” without predicate pushdown, carDF is passed as the input df to every leaf node/filter, as shown below:
      4. (carDF ^ D) OR ((carDF ^ C) AND ((carDF ^ A) AND (carDF ^ B)))
      5. Here, ^ denotes a filter
    4. As we can see, there is room for optimization at each leaf: if the operator is “AND”, we can pass the processed df as the input to the next leaf; if the operator is “OR”, we pass the baseDF
    5. So, if we apply the given algorithm and cost optimization to the above case, the nested query changes to:
      1. (((A and B) and C) or D)
      2. Now, A and B can be processed and their output passed as input to C
      3. If we visualize the tree, the root has two child nodes, to each of which the baseDF is passed as input
      4. So, for D, the filter operation is -> carDF ^ D
      5. For ((A and B) and C), as the operator is “AND”, the baseDF is passed down to its first child as input until we reach a leaf node
      6. Now, A is processed -> carDF ^ A = carDFfa (filtered with A)
      7. carDFfa is passed as input to B -> carDFfa ^ B = carDFfab
      8. carDFfab is passed as input to C -> carDFfab ^ C = carDFfabc
      9. So, we get predicate pushdown: only a portion of the baseDF is passed to the later filters, which reduces the total load on execution
    6. So, the output for the whole tree is:
      1. carDFfabc OR D -> carDFfabc union (carDF ^ D) = carDFfabcd (union, as the operator is OR)
    7. Suppose we have the nested query:
      1. ((B OR C) AND A)
      2. After optimization, the query changes to:
        1. ((A) and (B or C))
        2. The tree has AND at the root, with A as its first child and (B or C) as its second
        3. The baseDF is passed to A, so the output is: carDF ^ A -> carDFfa
        4. As the operator between (A) and (B OR C) is “AND”, the output of A is passed as input to the subtree (B OR C), reducing the size of the data to be processed by the next node
        5. So, carDFfa is passed to both leaf nodes, B and C, as the input df
Conclusion

By implementing the above strategies, we could optimize the logical plan of the nested query. This technique rearranges the filter expressions into an optimized logical query that applies predicate pushdown wherever possible, thus decreasing the total execution time and cost. Using this approach, nested queries are processed faster and use less computation time and fewer resources.

About the Author

Pravin Mehta is a Data Engineer at Sigmoid. He is passionate about solving problems using big data technologies, open source and cloud services, and he has a keen interest in Apache Spark and its optimization.