Apache Spark :: aggregateByKey explained :: CCA175 exam topic




Apache Spark :: aggregateByKey explained :: Sample question for Spark Developer's exam (Cloudera/Databricks)


Scenario:  

The sample input RDD 'ordersVJ' holds tuples of the form (ItemId, RevenuePerItemId), as follows:
...
..
(10,299.98)
(20,199.99)
(20,250.0)
(20,129.99)
(40,49.98)   <-- Key = 40, Value = 49.98 (input value type is Float)
(40,299.95)
(40,150.0)
(40,199.92)
(50,299.98)
(50,299.95)
...
..

Using the aggregateByKey Spark RDD API, find the total revenue and maximum revenue per ItemId.

Desired output for ItemId 40 will be (40,(699.85,299.95))


Solution:


Getting this result with the RDD API and aggregateByKey is trickier than doing the same thing with Spark DataFrames or Spark SQL.

You need a very good understanding of what is being passed to the function. The aggregateByKey call takes 2 groups of arguments: an initializer first, and then a pair of functions.

The desired output for ItemId 40 is in a (Key, Value) pair with value itself being a tuple as shown:
Key = 40
Value = (total_revenue, maximum_revenue)

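The 1st argument for aggregateByKey is the Initializer, which has the same shape as the desired output value: a (Double, Double) tuple. Thus, the 1st parameter is (0.0,0.0). The input values are Float, but the literal 0.0 is a Double, so the totals and maxima come out as Double.

The 2nd argument has to be split into 2 functions:

1. Combiner (Spark's API calls it seqOp) to process the input values. It folds each input value y into the running result x for that key, within a single partition.
2. Reducer (Spark's API calls it combOp) to process the values coming out of the above step. It merges the per-partition results for the same key.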


Step 1: Preparing the Combiner (x,y)

Consider x as the running result, a (Double, Double) tuple that starts out as the initializer (0.0, 0.0).
Consider y to be the incoming Value; the 1st one for ItemId 40 is 49.98.

In Scala, the 1st element of a tuple is accessed with _1 and the 2nd element with _2. You want each incoming Value added to the running total, so the 1st element of the result is x._1 + y. The running result is (49.98, 49.98) after the 1st iteration and (49.98 + 299.95, 299.95) after the 2nd iteration.

The 2nd element (supposed to contain the maximum of all the amounts) is accessed with x._2, which is 0.0 to begin with. Because you want the larger of the running maximum and the incoming Value, you need the if condition shown in the code below. Starting the maximum at 0.0 is safe here because revenue values are not negative.
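To make the iterations above concrete, here is a minimal sketch in plain Scala (no Spark required) that applies the Combiner to the four values of ItemId 40 and keeps every intermediate result. The names SeqOpTrace, seqOp, valuesFor40 and steps are made up for this illustration and are not part of the Spark API.

```scala
// Plain Scala sketch: trace the Combiner over the values of key 40.
object SeqOpTrace {
  // x is the running (total, max) tuple; y is one incoming Float value.
  val seqOp: ((Double, Double), Float) => (Double, Double) =
    (x, y) => (x._1 + y, if (x._2 > y) x._2 else y)

  // The four revenue values listed for ItemId 40 in the scenario.
  val valuesFor40: List[Float] = List(49.98f, 299.95f, 150.0f, 199.92f)

  // scanLeft starts from the initializer and records the result after each value.
  val steps: List[(Double, Double)] = valuesFor40.scanLeft((0.0, 0.0))(seqOp)

  def main(args: Array[String]): Unit = steps.foreach(println)
}
```

The first printed tuple is the initializer itself, and the last one holds the total and the maximum for ItemId 40, up to Float rounding.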


Step 2: Preparing the Reducer

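Reducer has to process the values coming in from the Combiner. Each of those values is already a (total, maximum) tuple for one partition, so here both x and y are tuples: add the two totals (x._1 + y._1) and keep the larger of the two maximums (x._2 versus y._2).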

scala> val totRevAndMaxRevPerItemId = ordersVJ.aggregateByKey((0.0,0.0))(
     | (x,y) => (x._1 + y, if (x._2 > y) x._2 else y),
     | (x,y) => (x._1 + y._1, if (x._2 > y._2) x._2 else y._2)
     | )
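The Reducer only does real work when the values of one key are spread over several partitions, which is easy to miss on a small test set. The following is a rough sketch in plain Scala that imitates this behaviour with ordinary collections; it is not how Spark is implemented. The object AggregateByKeySketch, the helper aggregate and the way the sample is split into "partitions" are all assumptions made for the illustration, and the sample contains only the rows visible in the scenario, not the elided ones.

```scala
// Plain Scala sketch of per-partition folding followed by a merge step.
object AggregateByKeySketch {
  type Acc = (Double, Double) // (total revenue, maximum revenue)

  val zero: Acc = (0.0, 0.0)
  // Combiner: fold one Float value into the running result.
  val seqOp: (Acc, Float) => Acc =
    (x, y) => (x._1 + y, if (x._2 > y) x._2 else y)
  // Reducer: merge two partial results, comparing maximum with maximum.
  val combOp: (Acc, Acc) => Acc =
    (x, y) => (x._1 + y._1, if (x._2 > y._2) x._2 else y._2)

  // Hypothetical helper: each inner Seq plays the role of one partition.
  def aggregate(partitions: Seq[Seq[(Int, Float)]]): Map[Int, Acc] = {
    // Step 1: inside every partition, fold the values of each key with seqOp.
    val partials: Seq[Map[Int, Acc]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zero)(seqOp)
      }
    }
    // Step 2: merge the partial results of each key with combOp.
    partials.flatten.groupBy(_._1).map { case (k, accs) =>
      k -> accs.map(_._2).reduce(combOp)
    }
  }

  // Only the rows shown in the scenario; the elided rows are unknown.
  val sample: Seq[(Int, Float)] = Seq(
    (10, 299.98f), (20, 199.99f), (20, 250.0f), (20, 129.99f),
    (40, 49.98f), (40, 299.95f), (40, 150.0f), (40, 199.92f),
    (50, 299.98f), (50, 299.95f))

  def main(args: Array[String]): Unit = {
    // Key 40 ends up in both halves when the sample is split after 6 rows.
    println(aggregate(Seq(sample))(40))
    println(aggregate(Seq(sample.take(6), sample.drop(6)))(40))
  }
}
```

Both calls should agree on the maximum for ItemId 40 no matter how the rows are split. If the Reducer compared a total against a maximum instead of the two maximums, the two-partition run would report a wrong maximum while the single-partition run would still look fine.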


Validation of results:

scala> ordersVJ.filter(x=>x._1 == 40).take(4).foreach(println)
(40,49.98)
(40,299.95)
(40,150.0)
(40,199.92)

scala> totRevAndMaxRevPerItemId.filter(x=>x._1 == 40).collect()
res11: Array[(Int, (Double, Double))] = Array((40,(699.8500099182129,299.95001220703125)))
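The long decimals in the result (299.95001220703125 instead of 299.95) come from the input values being Float while the result tuple holds Double values. A small plain Scala sketch of that effect follows; the object name FloatWidening and its fields are illustrative only.

```scala
// Plain Scala sketch: widening a Float to a Double exposes its exact binary value.
object FloatWidening {
  val asFloat: Float = 299.95f
  // Same stored value, now shown with Double precision.
  val widened: Double = asFloat.toDouble

  def main(args: Array[String]): Unit = {
    println(asFloat)           // the short Float representation
    println(widened)           // 299.95001220703125, as in the result above
    println(widened == 299.95) // false: the Double literal 299.95 is a different value
  }
}
```

If exact decimal amounts matter, one option is to carry the revenue as BigDecimal rather than Float; whether that is practical depends on how ordersVJ was built, which this post does not show.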



Thanks,
Vishal.
