Tuesday, August 23, 2016

A Few Points on Apache Spark 2.0 Streaming over a Cluster

Apache Spark 2.0 Streaming over a Cluster: Lessons Learned

The Apache Spark Streaming documentation already covers the basics and how to use it in enough detail.

Setting up Apache Spark on a cluster

Very basic:
Master node – master
Slaves – slave1, slave2, slave3

On each node:

spark-env.sh:

export HADOOP_CONF_DIR=$HADOOP_CONF_DIR
export SPARK_MASTER_IP=master
export JAVA_HOME=/home/user/jdk1.8.0_91
export PYSPARK_PYTHON=PYTHON_PATH

spark-defaults.conf:

spark.master                     spark://master:7077
spark.eventLog.enabled           true
spark.eventLog.dir               file:///home/home/spark-events
spark.history.fs.logDirectory    file:///home/home/spark-events
spark.shuffle.service.enabled    true

slaves

slave1
slave2
slave3

Now run sbin/start-all.sh on the master node.


How do you start Spark Streaming in 2.0?


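import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build (or reuse) the session from an existing SparkConf
val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

// Reuse the session's SparkContext; windowtime is the batch interval in seconds
val ssc = new StreamingContext(spark.sparkContext, Seconds(windowtime.toInt))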


Spark 2.0 introduced a builder pattern (SparkSession.builder) as the entry point for starting Spark components.

How to choose the window length or the right batch interval?


For Spark Streaming to be stable, each batch must be processed within the window (the batch interval); otherwise the queue of pending batches grows, and an excessive queue size will eventually crash the job. It happened to me several times :-)

There are several things to consider; a few of them:

Slow down the data rate if possible. If the processing job itself cannot be changed, try a lower data rate.

Increase the window size. If your application can accept a larger window, a larger window combined with intelligent programming makes the stream very stable. Intelligent programming means using the full power of Spark: cache as much as possible, avoid unbalanced joins, and repartition or coalesce based on your needs.
Multiple write operations during streaming are sometimes mandatory but resource-inefficient, so consider partitioning, repartitioning or coalescing, the shuffle block size and other tricks to optimize them.

Introduce a cool-off period. After every interval of X time, stop the data flow to let the streaming job clear its existing queue, then restore the data rate.

Check the Spark configuration options and use them properly: for example the CMS garbage collector, spark.memory.fraction, spark.shuffle.file.buffer, spark.shuffle.spill.compress, spark.memory.offHeap.size and others.
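
To see why processing has to finish within the batch interval, here is a rough plain-Scala model (not a Spark API; the object name, function name and numbers are made up for illustration). It assumes a new batch arrives every batch interval and that batches are processed one at a time with a fixed processing time.

```scala
object BacklogModel {
  /** Rough count of batches still waiting after `batches` intervals.
    * Assumption: one batch arrives per interval and every batch takes
    * `processingMs` to process, strictly one after another.
    */
  def queuedBatches(batches: Int, batchIntervalMs: Long, processingMs: Long): Long = {
    require(batchIntervalMs > 0 && processingMs > 0, "durations must be positive")
    val elapsedMs = batches.toLong * batchIntervalMs
    // The job cannot complete more batches than have arrived
    val completed = math.min(batches.toLong, elapsedMs / processingMs)
    batches - completed
  }
}
```

With a 10 s interval and 12 s of processing per batch, BacklogModel.queuedBatches(60, 10000L, 12000L) reports 10 batches waiting after ten minutes, and the number keeps growing; with 8 s of processing the queue stays empty.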



A few things to consider

When running a Spark Streaming job for a long time, you might get a "no space left" error, but that may not be the real case.
Most of the time it happens because spark.local.dir points to /tmp (the default), which fills up, so change it in the Spark configuration.


When using Kafka streaming, be careful with the number of threads used for receiving: you must not claim all the threads available on your CPU. If Kafka streaming is not performant, consider creating multiple Kafka streams per topic and unioning them:

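// Three receiver streams for the same topic, one consumer thread each
val kafkaStreams = (1 to 3).map { i => KafkaUtils.createStream(ssc, broker, "kfkrawdatagroup", Map(topicList(0) -> 1)).map(_._2.toString) }

// repartition returns a new DStream, so keep the result
val unifiedStream = ssc.union(kafkaStreams).repartition(3)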


Keep checking each of your executors. Sometimes a few executors are very busy while the rest are not; that wastes time during every shuffle operation and makes the Spark Streaming queue grow. Monitor all the jobs properly and keep cleaning up your garbage.

Spark Streaming Logging?


Logging in Spark Streaming can be important at times, but one can imagine the amount of logs it produces. There is a small API that comes in handy:

// Valid log levels include:
// ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

spark.sparkContext.setLogLevel("DEBUG")


Or consider using a Log4j RollingFileAppender:

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.appender.rolling.maxFileSize=50MB
log4j.appender.rolling.maxBackupIndex=5
log4j.appender.rolling.file=/var/log/spark/${dm.logging.name}.log
log4j.appender.rolling.encoding=UTF-8

Spark Dynamic Allocation of Resources


At times your stream is slow, or the data rate is not constant, so cores sit idle during some hours of the day; why not assign them to other jobs that need them?
The changes needed in spark-defaults.conf:

spark.dynamicAllocation.enabled         true
spark.shuffle.service.enabled           true
spark.dynamicAllocation.minExecutors    2
spark.dynamicAllocation.maxExecutors    56

And when submitting the job, don't set --total-executor-cores and --executor-memory.

*While working with Spark 2.0, I found that dynamic allocation manages the core allocation automatically even when these are set; this needs further checking.



Thursday, May 5, 2016

Apache Spark 2.0 Features



New APIs


  • Apache Spark 2.0 can run all 99 TPC-DS queries

  • Apache Spark 2.0 will merge DataFrame into Dataset[Row]

- DataFrames are collections of rows with a schema
- Datasets add static types, e.g. Dataset[Person], which brings type safety over DataFrame
- Both run on Tungsten

In 2.0, DataFrame and Dataset will be unified:
import spark.implicits._  // encoders needed by .as[Person]
case class Person(email: String, id: Long, name: String)
val ds = spark.read.json("/path/abc/file.json").as[Person]

  • Introducing SparkSession
SparkSession is the "SparkContext" for Dataset/DataFrame.
So anyone using only DataFrame/Dataset can simply use SparkSession, while SparkContext and all the older APIs remain available.


Code demo2 Spark Session demo

Long term Prospects

  • RDD will remain the low-level API in Spark
  • Datasets & DataFrames give richer semantics and optimizations
  • DataFrame-based ML pipeline API will become the main MLlib API
  • ML model & pipeline persistence with almost complete coverage
  • Improved R Support


Spark Structured Streaming

Spark 2.0 introduces Structured Streaming ("infinite DataFrames") using the same single API:
logs = ctx.read.format("json").stream("source")
(This is the preview syntax; in the released 2.0 API the equivalent is spark.readStream.format("json").load("source").)

  • High-level streaming API built on the Spark SQL engine, which extends DataFrames/Datasets and includes all the functions available in the past
  • Supports interactive and batch queries, such as aggregating data in a stream or changing queries at runtime

Tungsten phase 2.0

Phase 1 was basically about robustness and memory management for Tungsten

Tungsten phase 2 speeds up Spark by up to 10x by using Spark as a compiler (whole-stage code generation)


Code demo3 Tungsten Phase 2 demo




A preview version of Spark 2.0 is expected in the second week of May, and the official version of Spark 2.0 at the end of May or in June.











Sunday, March 20, 2016

Interesting Q&A while working with Apache Spark & Big Data



Here I am trying to collect interesting concepts and problems encountered while working with Apache Spark on massive datasets. The intention is to have a single reference point for any specific interesting problem; not all of the solutions are my own.
I will try to keep the Q&A updated ...


Why is reduceByKey better than groupByKey?


In reduceByKey, data is combined within each partition first, so each partition outputs at most one value per key to send over the network.





In groupByKey, all the data is sent over the network and collected on the reduce workers, which makes it slow.





So reduceByKey, aggregateByKey, foldByKey and combineByKey are preferred over groupByKey.
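
The difference can be illustrated with a small plain-Scala model (not Spark code; the object, function names and sample data are made up for illustration). Partitions are modelled as in-memory sequences, and the functions count how many records would have to cross the network under each strategy.

```scala
object ShuffleModel {
  type Partition = Seq[(String, Int)]

  // groupByKey style: every record leaves its partition as it is
  def recordsShuffledByGroup(partitions: Seq[Partition]): Int =
    partitions.map(_.size).sum

  // reduceByKey style: combine per key inside each partition first,
  // so at most one record per key leaves each partition
  def recordsShuffledByReduce(partitions: Seq[Partition]): Int =
    partitions.map(p => p.groupBy(_._1).size).sum

  // Same final result as a reduce over all values of each key
  def reduceByKey(partitions: Seq[Partition])(f: (Int, Int) => Int): Map[String, Int] = {
    // Map-side combine inside every partition
    val combined: Seq[(String, Int)] = partitions.flatMap { p =>
      p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }.toSeq
    }
    // Reduce-side merge of the partial results
    combined.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
  }
}
```

For two partitions Seq("a" -> 1, "a" -> 2, "b" -> 3) and Seq("a" -> 4, "b" -> 5, "b" -> 6), the groupByKey-style count is 6 records while the reduceByKey-style count is 4, and the gap widens as the number of values per key grows.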









Problem joining a large RDD to a small RDD?



- A regular join leads to limited parallelism because, depending on the query, the large RDD is shuffled onto a limited number of keys.


Broadcast the small RDD to all worker nodes. The parallelism of the large RDD is then maintained and no shuffle is required.



Example:


Consider 2 RDDs:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Output: rdd_joined:RDD[((T,W) , (U,V))]

Solution:



val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))


val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
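val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value  // the small RDD as a local map T -> U
  for {
    ((t, w), v) <- iter
    if m.contains(t)           // keep only keys that exist in rdd1
  } yield ((t, w), (m(t), v))  // (U, V), matching the expected output type
}, preservesPartitioning = true)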

preservesPartitioning = true tells Spark that this map function doesn't modify the keys of rdd2, so Spark can avoid re-partitioning rdd2.








Driver vs Workers?


The main program is executed on the Spark Driver

Transformations are executed on the Spark Workers

Actions may transfer data from the Workers to the Driver.








What does Spark's collect() do?


collect() sends all the partitions to the single driver, so calling collect() on a large RDD can trigger an out-of-memory error.







Don't call collect() on a large RDD.

The easiest option is to choose actions that return a bounded output per partition, such as count() or take(n).


Similarly, if you want to store Spark output as a single file, you can either save the collect() output as a file or use coalesce(1, true).saveAsTextFile.





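How to write a large RDD to a database?


Initialize the database connection on the worker rather than on the driver (connection objects are generally not serializable, so they cannot be shipped from the driver).

Use foreachPartition to reuse the connection across the elements of a partition.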
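
A minimal sketch of that pattern in plain Scala, under stated assumptions: the Connection trait and the open function are hypothetical stand-ins for a real database client, and writePartition is a helper name chosen for illustration. The point is that one connection is opened per partition, reused for every record, and closed at the end.

```scala
object PartitionWriter {
  // Hypothetical minimal interface standing in for a real database client
  trait Connection {
    def write(record: String): Unit
    def close(): Unit
  }

  /** Writes all records of one partition through a single connection
    * and returns how many records were written.
    */
  def writePartition(records: Iterator[String], open: () => Connection): Int = {
    val conn = open() // created on the worker, once per partition
    try {
      var written = 0
      records.foreach { r =>
        conn.write(r)
        written += 1
      }
      written
    } finally {
      conn.close() // always release the connection
    }
  }
}

// Intended use with Spark (openDbConnection is a hypothetical factory):
// rdd.foreachPartition { iter => PartitionWriter.writePartition(iter, openDbConnection); () }
```

Opening the connection inside the function that runs on the partition avoids both serializing the connection and opening one connection per record.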









How to iterate over a large RDD when memory is limited?


Consider a scenario where you need to iterate over 20 GB of data on a single-node cluster or on your laptop, so your memory is around 8 GB, but you still want to read all the data. toLocalIterator fetches one partition at a time to the driver, so only the largest partition has to fit in memory.



// rdd is any existing RDD
rdd.toLocalIterator.foreach(println)





Apache Spark Serialization problem?



https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md






Performance implications of PySpark vs Scala in Apache Spark



  • All data that comes into or goes out of Python has to pass through a socket and the JVM executor, which adds overhead and cost
  • Python executors are process based; Scala executors are thread based. Each Python worker runs in its own process, which gives strong isolation for Python code but uses more memory.
  • PySpark provides the spark.python.worker.reuse option, which chooses between forking a Python process for each task and reusing existing processes. Reusing workers (the default) is best when broadcasts and imports are expensive; forking per task seems to help avoid expensive garbage collection (this is more an impression than a result of systematic tests)


For MLlib, the Python code mostly delegates to the Scala implementation, so the only extra cost is converting data between Python and Scala.






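How to define partitioning for a Spark DataFrame?



In Spark 1.6 it is very easy and straightforward:

// In an application (outside spark-shell): import sqlContext.implicits._ for toDF and $"..."
val df = sc.parallelize(Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
)).toDF("k", "v")

// Repartition by the values of column k and inspect the plan
val partitioned = df.repartition($"k")
partitioned.explain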






What is good about Broadcast in Spark?



Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.


Normally, if you have a huge array that is used by all the tasks of a job, the array is shipped to the Spark nodes inside each task's closure. So with 10 nodes and 10 partitions each, the array is distributed at least 100 times (10 * 10).


Broadcasting the variable instead distributes the same array once per node using a p2p protocol, which gives a huge performance benefit.



val broadcasted = sc.broadcast(array)
rdd.map(i => broadcasted.value.contains(i))

A broadcast value is cached on each executor and deserialized once there rather than once per task, so you save deserialization cost as well. The broadcast data originates from the driver.


When will broadcast be slower than shuffle?


It is pure math and logic.

Consider 10 executors and 2 RDDs where
RDD_1 = 2000 MB
RDD_2 = 400 MB

Normal join with shuffle

Consider the scenario where the data is not partitioned by the join key. Normally, 1/10th of the data will already be on the correct executor.


 Executor_1 - 200 MB (RDD_1) + 40 MB (RDD_2)
 Data to shuffle = (2000 - 200) + (400 - 40) = 2160 MB

So you need to shuffle 2160 MB of data.


If we broadcast RDD_2 instead, as we usually would, RDD_2 is copied to each of the 10 executors, so in total 400 * 10 = 4000 MB (about 4 GB) of data has to be transferred, compared with 2160 MB for the shuffle. On top of that, the broadcast starts from the driver, so a considerable amount of data has to pass through the driver, which adds to the cost.
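
The same arithmetic as a small plain-Scala sketch (the object and function names are made up for illustration; it assumes the data is spread evenly, so 1/N of it is already on the right executor):

```scala
object JoinTransferCost {
  // Shuffle join: everything except the 1/N share that is already in place moves
  def shuffleMb(largeMb: Double, smallMb: Double, executors: Int): Double =
    (largeMb + smallMb) * (executors - 1) / executors

  // Broadcast join: the whole small RDD is copied to every executor
  def broadcastMb(smallMb: Double, executors: Int): Double =
    smallMb * executors

  def broadcastIsCheaper(largeMb: Double, smallMb: Double, executors: Int): Boolean =
    broadcastMb(smallMb, executors) < shuffleMb(largeMb, smallMb, executors)
}
```

For the numbers above, shuffleMb(2000, 400, 10) gives 2160 and broadcastMb(400, 10) gives 4000, so the broadcast moves more data. If RDD_2 were only 40 MB, the broadcast would move 400 MB against 1836 MB for the shuffle and would win.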






Why and how to use HashPartitioner?


 import org.apache.spark.HashPartitioner
 import org.apache.spark.rdd.RDD

 // Six (key, None) pairs spread over 8 partitions, with no partitioner yet
 val rdd = sc.parallelize(for {
   x <- 1 to 3
   y <- 1 to 2
 } yield (x, None), 8)
 rdd.collect
 // res0: Array[(Int, None.type)] = Array((1,None), (1,None), (2,None), (2,None), (3,None), (3,None))
 rdd.partitioner
 // res3: Option[org.apache.spark.Partitioner] = None

 // Repartition by key hash into 4 partitions and count the records in each
 val rdd1 = rdd.partitionBy(new HashPartitioner(4))
 def countPartitions(rdd: RDD[(Int, None.type)]) =
   rdd.mapPartitions(iter => Iterator(iter.length))
 countPartitions(rdd1).collect
 // res11: Array[Int] = Array(0, 2, 2, 2)

Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.




  • HashPartitioner is based on the hashCode of the keys
  • If the distribution of keys is not uniform, you can end up with part of your cluster sitting idle
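
To make both points concrete, here is a plain-Scala model of hash partitioning (an illustration of the idea, not Spark's own class; the object and function names are made up). A key goes to the partition given by its hashCode modulo the number of partitions, kept non-negative.

```scala
object HashPartitionModel {
  // Partition index for a key: hashCode modulo numPartitions, never negative
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of keys that land in each partition
  def partitionSizes(keys: Seq[Any], numPartitions: Int): Vector[Int] = {
    val counts = keys.groupBy(k => partitionFor(k, numPartitions))
      .map { case (p, ks) => p -> ks.size }
    Vector.tabulate(numPartitions)(p => counts.getOrElse(p, 0))
  }
}
```

For the keys 1, 1, 2, 2, 3, 3 and 4 partitions this gives Vector(0, 2, 2, 2), the same distribution as the HashPartitioner session above, with partition 0 left empty. When all the keys are identical, every record lands in a single partition and the others stay idle.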






S3 vs HDFS vs Cassandra

S3 – Non urgent batch jobs.
Cassandra – Perfect for streaming data analysis and an overkill for batch jobs.
HDFS – Great fit for batch jobs without compromising on data locality.







Why use Apache Kafka with Spark Streaming?

Apache Kafka is generally used with streaming to reduce the chance of data loss. There is always a chance that a receiver goes down, but Kafka ensures that no data is lost.

If the setup is on a Hadoop cluster and the processing already happens over a stream there, Apache Kafka can be avoided.





Why are completed jobs not releasing memory in Spark Standalone mode?



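Because Spark does not automatically clean up the jars transferred to the worker nodes.
In Spark Standalone mode, you need to set these worker options (for example through SPARK_WORKER_OPTS in spark-env.sh):
spark.worker.cleanup.enabled : true
spark.worker.cleanup.interval : cleanup interval in seconds (default 1800)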