Apache Spark 2.0 Streaming Over a Cluster, and Learnings
The Apache Spark Streaming documentation has enough detail
about the basics, how to use it and other particulars.
Setting up Apache Spark in a Cluster
Slaves – slave1, slave2, slave3
In each of the nodes –
spark-env.sh –
export HADOOP_CONF_DIR=$HADOOP_CONF_DIR
export SPARK_MASTER_IP=master
export JAVA_HOME=/home/user/jdk1.8.0_91
export PYSPARK_PYTHON=PYTHON_PATH
spark-defaults.conf –
spark.master spark://master:7077
spark.eventLog.enabled true
spark.eventLog.dir file:///home/home/spark-events
spark.history.fs.logDirectory file:///home/home/spark-events
spark.shuffle.service.enabled true
slaves
slave1
slave2
slave3
Now run sbin/start-all.sh on the master.
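If the cluster came up cleanly, the master web UI (http://master:8080 by default in standalone mode) should list slave1, slave2 and slave3 as ALIVE workers.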
What is the way to start Spark Streaming in 2.0?
val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(windowtime.toInt))
Spark 2.0 brought a builder pattern for starting any Spark
component: build the SparkSession once, then reuse its underlying SparkContext for the StreamingContext.
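To put these pieces together end to end, here is a minimal sketch; the socket source on localhost:9999, the application name and the 10-second batch interval are illustrative assumptions only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingBootstrap {
  def main(args: Array[String]): Unit = {
    // Build the session first; the streaming context reuses its SparkContext
    val spark = SparkSession.builder
      .appName("streaming-bootstrap")
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // Hypothetical source: count words arriving on a socket, batch by batch
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}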
How to calculate the window length or the right batch interval?
For stable Spark Streaming, each batch of data must be
processed within the batch interval, or the queue of pending batches will grow and an excessive queue size will crash
the job. It happened to me several times :-)
There are several things to consider; a few of them –
• If possible, slow down the data rate. If the processing job itself cannot be changed, try a lower data rate.
• Increase the window size. If your application can accept a larger window, then intelligent programming and a larger window will make the streaming very stable. Intelligent programming means using the full power and tricks of Spark: cache as much as possible, avoid unbalanced joins, and re-partition or coalesce based on your needs.
Multiple write operations during streaming are sometimes mandatory but resource-inefficient, so consider partitioning, re-partitioning/coalescing, shuffle block size and other tricks to optimize them.
• Introduce a cool-off period. After every interval of X time, stop the data flow so the streaming job can clear its existing queue, and then bring the data rate back up again.
• Check the Spark configuration options and use them properly, e.g. the CMS garbage collector, spark.memory.fraction, spark.shuffle.file.buffer, spark.shuffle.spill.compress, spark.memory.offHeap.size and others, as sketched below.
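To make the configuration point concrete, here is a sketch of setting a few of those knobs on a SparkConf before the contexts are created; the values are placeholders to tune against your own workload, not recommendations:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("tuned-streaming-job")
  // Use the CMS garbage collector on the executors
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
  // Fraction of the heap shared by execution and storage
  .set("spark.memory.fraction", "0.6")
  // Bigger in-memory buffer for shuffle file writes means fewer disk writes
  .set("spark.shuffle.file.buffer", "64k")
  // Compress data spilled during shuffles
  .set("spark.shuffle.spill.compress", "true")
  // Off-heap memory; the size only applies when off-heap use is enabled
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")

This sparkConf is the one passed to SparkSession.builder.config(sparkConf) in the earlier snippet.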
A few things to consider
• When running a Spark Streaming job for a long time, you might get a "no space left on device" error, even when the disks are not actually full.
Most of the time it happens because spark.local.dir points at /tmp, which fills up, so change that setting in the Spark configuration to a directory with enough space.
• While using Kafka streaming, be careful about the number of threads used for receiving, because you must not ask for all the threads available on your CPU. And if a single Kafka stream is not performant enough, consider using multiple Kafka streams per topic and unioning them –
// Three parallel receivers on the same topic, merged into one DStream
val kafkaStreams = (1 to 3).map { i =>
  KafkaUtils.createStream(ssc, broker, "kfkrawdatagroup", Map(topicList(0) -> 1))
    .map(_._2.toString)
}
val unifiedStream = ssc.union(kafkaStreams).repartition(3)
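The idea behind this pattern: each createStream call creates its own receiver, and every receiver permanently occupies one core on an executor, so a few receivers in parallel raise ingestion throughput; ssc.union then merges them into a single DStream, and repartition(3) spreads the received blocks across the cluster before the heavy processing starts. Keep the receiver count well below the total number of cores, or nothing is left for the processing itself.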
Spark Streaming Logging?
Logging while running Spark Streaming can be important sometimes, but
one can imagine the amount of logs it produces. There is a small API which comes in handy –
// Valid log levels include:
// ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
spark.sparkContext.setLogLevel("DEBUG")
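Note that setLogLevel changes the level at runtime on the driver JVM only; the executors keep whatever Log4j configuration they were started with.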
Or consider using the Log4j RollingFileAppender –
log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.appender.rolling.maxFileSize=50MB
log4j.appender.rolling.maxBackupIndex=5
log4j.appender.rolling.file=/var/log/spark/${dm.logging.name}.log
log4j.appender.rolling.encoding=UTF-8
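One common way to make such a file take effect on the whole job, assuming it is present at the same path on every node, is to point both sides at it via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions set to -Dlog4j.configuration=file:/path/to/log4j.properties (the path here is a placeholder).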
Spark Dynamic Allocation of Resources
There may be times when your streaming job is slow, or the data
rate is not static all the time and cores sit idle during some hours
of the day; why not assign them to other jobs that need them?
The changes needed in spark-defaults.conf –
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 56
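A note on spark.shuffle.service.enabled: in standalone mode, setting it to true makes each worker start the external shuffle service, which dynamic allocation requires so that executors can be removed without losing the shuffle files they wrote.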
And while submitting the job, don't set --total-executor-cores
and --executor-memory.
*While working with Spark 2.0, I found that even when these flags
are set, dynamic allocation automatically manages the core allocation; I need to
check it further.