Tuesday, August 23, 2016

Few points On Apache Spark 2.0 Streaming Over cluster

Apache Spark 2.0 Streaming Over cluster and Learning

Apache Spark streaming documentation has enough details about its basic, how to use and other details.

Setup Apache Spark in Cluster

 Very basic
 MasterNode – master
Slaves – salve1,slave2,slave3

In each of the node-

Spark-env.sh-

export HADOOP_CONF_DIR=$HADOOP_CONF_DIR
export SPARK_MASTER_IP=master
export JAVA_HOME=/home/user/jdk1.8.0_91
export PYSPARK_PYTHON=PYTHON_PATH

Spark-sefaults.conf

spark.master                     spark://master:7077
 spark.eventLog.enabled           true
spark.eventLog.dir              file:///home/home/spark-events
spark.history.fs.logDirectory   file:///home/home/spark-events
spark.shuffle.service.enabled     true

slaves

slave1
slave2
slave3

Now sbin/start-all.sh


What is the way to start Spark Streaming in 2.0 ?


SparkSession
.builder
.config(sparkConf)
.getOrCreate()


val ssc = new StreamingContext(conf, Seconds(windowtime.toInt))


Spark 2.0 has brought a Builder pattern to start any Spark component.

How to calculate Window Length or Right batch Interval?


For stable Spark Streaming, the processing of data must be done within the window or queue will grow and excessive queue size will crash the job. It happened to me several times :-)

There are several things to be considered and few of them –

If possible, data rate can be slow down. If the processing job cannot be negotiated, try with lower data rate.

Increase the Window size. If your application can accept a larger window, then intelligent programming and larger window will make the streaming very stable.  Intelligent programming means using the entire power/tricks of Spark, like caching as much as possible, avoid unbalanced join, re-partition and coalesce based in your need.
Multiple write operation during streaming is sometimes mandatory but resource inefficient, so consider partition/re-partitioning/coalesce shuffle block size and other tricks to optimize that.

Introduce a cool-off period. After every interval of X time, stop the data flow to clear the existing queue of streaming job and then reset the data rate again.

Check Spark Configuration and use them properly. Like using CMS Garbage collector, spark memory fraction, spark.shuffle.file.buffer, spark.shuffle.spill.compress, spark.memory.offHeap.size and others.



Few Things to consider

When running spark streaming job for long time, you might get an error of No Space found, but that may be not the real case.
Most of the time, it happens because of spark.local.dir setting as /tmp, so change the same in spark configuration.


While Using kafkaStreaming, make sure about the number of Threads while receiving because you must not ask for all the threads available in your cpu and if Kafka Streaming is non performent consider using multiple kafka streaming for each topic approach -

val kafkaStreams = (1 to 3).map { i => KafkaUtils.createStream(ssc,  broker, "kfkrawdatagroup",Map(topicList(0) -> 1)).map(_._2.toString)}

val unifiedStream = ssc.union(kafkaStreams)           unifiedStream.repartition(3)


  Keep checking each of your executor. Sometime some of your executors might be very busy but not all of them, that will lead to unnecessary time waste during any shuffling operation and that lead to spark streaming queue. Monitor all the jobs properly and keep cleaning your garbage.

Spark Streaming Logging?


Logging while Spark Streaming can be important sometime but one can imagine the amount of logs. So there is small api which is handy –

//Valid log levels include:
// ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

 spark.sparkContext.setLogLevel(“DEBUG”)


Or consider using Log4J RollingFileAppender-

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.appender.rolling.maxFileSize=50MB
log4j.appender.rolling.maxBackupIndex=5
log4j.appender.rolling.file=/var/log/spark/${dm.logging.name}.log
log4j.appender.rolling.encoding=UTF-8

Spark Dynamic Allocation of Resources


There may be time when your streaming is slow or If data rate is not static all the time and there is a possibility of cores availability during sometime of hours, why not assign them to other required job.
Changes needed to be make in spark-defaults.conf-

 spark.dynamicAllocation.enabled    true
 spark.shuffle.service.enabled     true
 spark.dynamicAllocation.minExecutors   2
 spark.dynamicAllocation.maxExecutors   56

And while submit the job, don’t assign –total-executor-cores and executor-memory

*While working in Spark 2.0, I found even while assigning them dynamic allocation automatically manages the core allocation , need to check it further.


No comments: