Showing posts from August, 2018

Filtering out Nulls and Headers in Scala/Spark

Consider a contact_file in the HDFS location /User/VJ/testfile that has a null record in a non-null field. Here, the last line has no value in the 'age' field, and the requirement is to filter out the header along with all such lines.

id,fname,lname,age,designation
1, amarnath, jaiswal, 61, Businessman
2, prakash, yadav, 30, Developer
3, vishal, jaiswal, 32, Engineer
4, ravi, jaiswal,, Builder

Solution:

Using mapPartitionsWithIndex to drop the first element of the iterator for partition index 0 filters the header from your input file, while checking != "" on the 4th field (index 3) filters out the record with the missing age (id 4).

scala> sc.textFile("/User/VJ/testfile").
         mapPartitionsWithIndex((x, y) => if (x == 0) y.drop(1) else y).
         filter(x => x.split(",")(3) != "").
         take(5).
         foreach(println)

Output:

1, amarnath, jaiswal, 61, Businessman
2, prakash, yadav, 30, Developer
3, vishal, jaiswal, 32, Engineer

Thanks,
Vishal.
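For comparison, here is a minimal sketch of the same task with the DataFrame API (not part of the original post, and it assumes the same file layout as above): the header option drops the header row for us, and Spark's CSV reader normally loads the empty age field as null, so an isNotNull filter is enough.

scala> import org.apache.spark.sql.functions.col
scala> val contacts = spark.read.option("header", "true").csv("/User/VJ/testfile")
scala> contacts.filter(col("age").isNotNull).show()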

Apache Spark :: aggregateByKey explained :: CCA175 exam topic

Sample question for the Spark Developer's exam (Cloudera/Databricks).

Scenario:

The sample input tuple 'ordersVJ' is in the form of (ItemId, RevenuePerItemId), as follows:

...
(10,299.98)
(20,199.99)
(20,250.0)
(20,129.99)
(40,49.98)   -- Key = 40, Value = 49.98 (the input value type is Float)
(40,299.95)
(40,150.0)
(40,199.92)
(50,299.98)
(50,299.95)
...

Using the aggregateByKey Spark RDD API, find the total revenue and the maximum revenue per ItemId. The desired output for ItemId 40 will be (40,(699.85,299.95)).

Solution:

The RDD way of achieving the result with aggregateByKey is a bit trickier than the Spark DataFrame or Spark SQL way, and you need a very good understanding of what is being passed to the function. aggregateByKey takes an initial 'zero' value plus two functions: a sequence function that merges a value into the accumulator within a partition, and a combine function that merges accumulators across partitions. The desired output for ItemId 40 is a (Key, Value) pair whose value is itself a tuple of (total revenue, maximum revenue).
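Since the excerpt is cut off here, the following is a minimal sketch of the aggregateByKey call the post is building towards; 'ordersVJ' is hard-coded from the sample values above, and the exact code in the full post may differ.

// 'ordersVJ' hard-coded from the sample data above (an assumption for this sketch)
val ordersVJ = sc.parallelize(Seq(
  (10, 299.98f), (20, 199.99f), (20, 250.0f), (20, 129.99f),
  (40, 49.98f),  (40, 299.95f), (40, 150.0f), (40, 199.92f),
  (50, 299.98f), (50, 299.95f)))

// zero value (0.0f, 0.0f) holds the running (total, max) per key
val revenuePerItem = ordersVJ.aggregateByKey((0.0f, 0.0f))(
  (acc, rev) => (acc._1 + rev, math.max(acc._2, rev)),   // merge a value within a partition
  (a, b)     => (a._1 + b._1, math.max(a._2, b._2)))     // merge accumulators across partitions

revenuePerItem.filter(_._1 == 40).collect.foreach(println)
// prints (40,(699.85,299.95)), modulo Float rounding

The same totals could be computed with reduceByKey, but aggregateByKey lets the result value type ((Float, Float)) differ from the input value type (Float), which is exactly what this question needs.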

Apache Spark :: Error Resolution :: 'value join is not a member of org.apache.spark.rdd.RDD'

ERROR DESCRIPTION

Consider 2 Spark RDDs to be joined together. Say rdd1.first is in the form of (Int, Int, Float) = (1,957,299.98), while rdd2.first is something like (Int, Int) = (25876,1), and the join is supposed to take place on the 1st field of both RDDs.

scala> rdd1.join(rdd2)   --- results in an error
<console>:**: error: value join is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)]

REASON

Both RDDs should be in the form of a Key-Value pair. Here rdd1 -- being in the form of (1,957,299.98) -- does not obey this rule, while rdd2 -- which is in the form of (25876,1) -- does.

RESOLUTION

Convert rdd1 from (1,957,299.98) to a Key-Value pair of the form (1,(957,299.98)) before joining it with rdd2, as shown below:

scala> val rdd1KV = rdd1.map(x => (x._1, (x._2, x._3)))
scala> rdd1KV.join(rdd2)
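Below is a self-contained sketch that reproduces the error and the fix with toy data in the spark-shell; the sample tuples are assumptions based on the values quoted above, not data from the original post.

val rdd1 = sc.parallelize(Seq((1, 957, 299.98f), (2, 1073, 199.99f)))   // RDD[(Int, Int, Float)]
val rdd2 = sc.parallelize(Seq((25876, 1), (1, 11599)))                  // RDD[(Int, Int)]

// rdd1.join(rdd2)        // does not compile: rdd1 is not a pair (Key-Value) RDD

val rdd1KV = rdd1.map { case (id, prod, rev) => (id, (prod, rev)) }     // key on the 1st field
rdd1KV.join(rdd2).collect.foreach(println)
// e.g. (1,((957,299.98),11599))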