Sunday, March 20, 2016

Interesting Q&A while working with Apache Spark & Big Data



Here I am trying to collect interesting concepts and problems encountered while working with Apache Spark on massive datasets. The intention is to have a single point of reference whenever a specific, interesting problem comes up; not all of the solutions are mine.
I will try to keep the Q&A updated ...


Why is reduceByKey better than groupByKey?


In reduceByKey, data is combined within each partition first, so each partition outputs at most one value per key to send over the network.





In groupByKey, all of the data is sent over the network and collected on the reduce workers, which makes it slow.





So reduceByKey, aggregateByKey, foldByKey and combineByKey are preferred over groupByKey.
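For example, in a plain word count both calls give the same result, but reduceByKey pre-aggregates on each partition before the shuffle (a minimal sketch):

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))

// Preferred: each partition combines its values locally, so at most one record per key is shuffled.
val counts = words.reduceByKey(_ + _)

// Works, but ships every (word, 1) pair across the network before summing.
val countsViaGroup = words.groupByKey().mapValues(_.sum)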









Problem joining a large RDD to a small RDD?



- Joining directly leads to limited parallelism because, depending on the query, the large RDD will be shuffled down to a limited set of keys.


Broadcast the small RDD to all worker nodes. The parallelism of the large RDD is then maintained and no shuffle is required.



Example:


Consider 2 RDDs:

val rdd1: RDD[(T, U)]
val rdd2: RDD[((T, W), V)]

Output: rdd_joined: RDD[((T, W), (U, V))]

Solution:



val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))


val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())  // collect the small RDD as a Map and broadcast it

val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)              // keep only keys that exist in the small RDD (inner-join semantics)
  } yield ((t, w), (u, m(t)))
}, preservesPartitioning = true)

preservesPartitioning tells Spark that this map function does not modify the keys of rdd2, so Spark avoids any re-partitioning of rdd2.








Driver vs Workers?


The main program is executed on the Spark Driver.

Transformations are executed on the Spark Workers.

Actions may transfer data from the Workers to the Driver.
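A minimal illustration of where each piece runs (the input path is just a placeholder):

val lines = sc.textFile("hdfs:///data/input.txt")   // driver only builds the lineage; nothing runs yet
val lengths = lines.map(_.length)                   // transformation: executed later on the workers

val total = lengths.reduce(_ + _)                   // action: workers compute partial sums,
println(total)                                      // and the final value comes back to the driver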








What does Spark collect() do?


collect() sends all the partitions to the single driver, so collect() on a large RDD can trigger an OutOfMemory error.







Don't call collect() on a large RDD.

The easiest option is to choose actions that return a bounded output per partition, such as count() or take(n).


Similarly, if you want to store Spark output as a single file, you can do that by saving the collect() output to a file, or by using coalesce(1, true).saveAsTextFile.
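For instance (the output path is just a placeholder):

// Collapse to one partition so that a single part file is written.
// shuffle = true keeps the upstream stages running at their original parallelism.
rdd.coalesce(1, shuffle = true).saveAsTextFile("hdfs:///output/single-file")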





How to write a Large RDD to a Database ?


Initialize the database connection on the Worker rather than on the Driver.

Use foreachPartition to reuse the connection between elements, as in the sketch below.
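A hedged sketch of the pattern with plain JDBC; the connection URL, credentials, table and the (Int, String) record shape are all made-up placeholders:

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

def saveToDb(records: RDD[(Int, String)]): Unit =
  records.foreachPartition { partition =>
    // One connection per partition, created on the worker, never on the driver.
    val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb", "user", "pass")
    val stmt = conn.prepareStatement("INSERT INTO events (id, value) VALUES (?, ?)")
    try {
      partition.foreach { case (id, value) =>
        stmt.setInt(1, id)
        stmt.setString(2, value)
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }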









What is the way to iterate over a large RDD when memory is limited?


Consider a scenario where you need to iterate over 20 GB of data on a single-node cluster or on your laptop: your memory may be around 8 GB, but you still want to read all the data.



// rdd stands for any large RDD; the path here is just a placeholder
val rdd = sc.textFile("hdfs:///path/to/20gb-dataset")
rdd.toLocalIterator.foreach(println)   // pulls one partition at a time to the driver





Apache Spark Serialization problem?



https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
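The link above covers this in depth. As a minimal illustration (my own sketch, not taken from the article), the classic failure is a closure that drags a non-serializable enclosing object along:

import org.apache.spark.rdd.RDD

class Searcher(val query: String) {            // not Serializable
  def find(rdd: RDD[String]) =
    rdd.filter(_.contains(query))              // captures `this` -> Task not serializable

  def findSafely(rdd: RDD[String]) = {
    val q = query                              // copy the field into a local val
    rdd.filter(_.contains(q))                  // the closure now only captures a String
  }
}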






Performance implications of PySpark vs Scala in Apache Spark.



  • Every piece of data that goes to or comes from Python has to be passed through a socket and the JVM executor, so there is an overhead and a cost attached.
  • Python workers are process based, while Scala code runs inside the executor JVM. Each Python worker runs in its own process, which gives strong isolation for Python code but a higher memory footprint.
  • The PySpark configuration provides the spark.python.worker.reuse option, which chooses between forking a Python process for each task and reusing an existing process. The latter seems useful for avoiding expensive garbage collection (more an impression than the result of systematic tests), while the former is optimal in the case of expensive broadcasts and imports.


For MLlib, the Python code just converts everything to Scala and the further processing happens in Scala, so with MLlib the only extra cost is the Python-to-Scala conversion.






How to define partitioning for a Spark DataFrame?



In Spark 1.6 it is very easy and straightforward:

import sqlContext.implicits._   // provides toDF and $ (already imported in spark-shell)

val df = sc.parallelize(Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
)).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain






What is good about Broadcast in Spark?



Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.


Ideally, if you have a huge array to be used by all the tasks of a job, then this array will be shipped to each Spark node with the closure. So if you have 10 nodes with 10 partitions each, in total your array will be shipped at least 100 times (10 * 10).


Broadcasting the variable instead distributes the same array once per node, using a peer-to-peer protocol, which gives a huge performance benefit.



val broadcasted = sc.broadcast(array)          // ship the array once per node
rdd.map(i => broadcasted.value.contains(i))    // each task reads the local broadcast copy

Broadcast data is in serialized form, so you avoid deserialization cost as well. Broadcast communication happens via the Driver.


When will Broadcast be slower than Shuffle?




It is pure math and logic.

Consider you have 10 Executors and 2 RDDs, where
RDD_1 = 2000 MB
RDD_2 = 400 MB

Normal Join with Shuffle

Consider the scenario where there is no partitioning over the data, so normally about 1/10th of the data will already be on the correct executor.


Executor_1 already holds 200 MB (RDD_1) + 40 MB (RDD_2)
Data to shuffle = (2000 - 200) + (400 - 40) = 2160 MB

So you need to shuffle about 2160 MB of data.


Broadcast Join

Usually we would broadcast RDD_2, so RDD_2 is copied to each of the 10 Executors; in total 400 * 10 = 4000 MB (about 4 GB) of data needs to be transferred. All of that communication happens via the Driver, so a fair amount of data has to move through the driver and there is a cost attached. In this example the broadcast moves more data than the shuffle would.
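The same back-of-the-envelope numbers as a few lines of Scala:

val executors = 10
val rdd1Mb = 2000
val rdd2Mb = 400

// Shuffle join: roughly 9/10 of each RDD has to move between executors.
val shuffleMb = (rdd1Mb - rdd1Mb / executors) + (rdd2Mb - rdd2Mb / executors)   // 1800 + 360 = 2160

// Broadcast join: the small RDD is copied to every executor, via the driver.
val broadcastMb = rdd2Mb * executors                                            // 4000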






Why and how to use HashPartitioner?




import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val rdd = sc.parallelize(for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None), 8)

rdd.collect
// res0: Array[(Int, None.type)] = Array((1,None), (1,None), (2,None), (2,None), (3,None), (3,None))

rdd.partitioner
// res3: Option[org.apache.spark.Partitioner] = None

val rdd1 = rdd.partitionBy(new HashPartitioner(4))

def countPartitions(rdd: RDD[(Int, None.type)]) =
  rdd.mapPartitions(iter => Iterator(iter.length))

countPartitions(rdd1).collect
// res11: Array[Int] = Array(0, 2, 2, 2)

Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.
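A quick way to see this in plain Scala (no Spark needed):

val a = Array(1, 2, 3)
val b = Array(1, 2, 3)
a.sameElements(b)             // true: same contents
a.hashCode == b.hashCode      // almost certainly false: array hashCode is identity based

// If you need array-like keys, convert them first, e.g. key.toSeq, whose hashCode is content based.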




  • HashPartitioner is based on the hashCode of the keys.
  • If the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle.






S3 vs HDFS vs Cassandra

S3 – Non-urgent batch jobs.
Cassandra – Perfect for streaming data analysis; overkill for batch jobs.
HDFS – Great fit for batch jobs without compromising on data locality.







Why use Apache Kafka with Spark Streaming?

Generally, Apache Kafka is used with Spark Streaming to reduce the chance of data loss. There is always a probability that the receiver goes down, but Kafka ensures no data is lost in that case.

If the setup is on a Hadoop cluster and the processing happens directly over the stream, then Apache Kafka can be avoided.
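A minimal sketch of the usual direct-stream setup, assuming the spark-streaming-kafka (Kafka 0.8) artifact is on the classpath; the broker address and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaStreamSketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")   // placeholder broker
val topics = Set("events")                                          // placeholder topic

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).count().print()   // just count the messages in each batch

ssc.start()
ssc.awaitTermination()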





Why are completed jobs not releasing memory in Spark Standalone mode?



Because Spark won't automatically clean up the jars and application files transferred to the worker nodes.
In Spark Standalone mode you need to set these options:
spark.worker.cleanup.enabled : true
spark.worker.cleanup.interval : how often, in seconds, to clean up
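In standalone mode these are worker-side properties, typically set through SPARK_WORKER_OPTS in conf/spark-env.sh on each worker (the 1800-second interval below is only an example value):

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=1800"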