Showing posts with label Hadoop. Show all posts
Showing posts with label Hadoop. Show all posts

Thursday, May 5, 2016

Apache Spark 2.0 Features


Apache Spark 2.0 Features

New APIs


  • Apache Spark 2.0 can run all 99 TPC-DS queries

  • Apache Spark 2.0 will merge DataFrame to DataSet[Row]

- DataFrames are collections of rows with a schema
- Datasets add static types, eg. DataSet[Person], actually brings type safety over DataFrame
- Both run on Tungsten

in 2.0 DataFrame and DataSets will unify
case class Person(email: String, id : Long, name: String)
val ds = spark.read.json("/path/abc/file.json").as[Person]

  • Introducing SparkSession
SparkSession is the "SparkContext" for Dataset/DataFrame.
So if anyone using only DataFrame/DataSet, can simply use SparkSession but SparkContext and all older APi  will remain


Code demo2 Spark Session demo

Long term Prospects

  • RDD will remain the low-level API in Spark
  • Datasets & DataFrames give richer semantics and optimizations
  • DataFrame based ML pipeline API will become the main MLLib API
  • ML model & pipeline persistance with almost complete coverage
  • Improved R Support


Spark Structured Streaming

Spark 2.0 is introducing Spark Infinite DataFrames using the same single API
logs = ctx.read.format("json").stream("source")

  • High-level streaming API build on Spark SQL Engine which extends DataFrames/ Datasets and includes all functions available in the past
  • Supports interactive & batch queries like aggregate data in a stream , change queries in runtime etc.

Tungsten phase 2.0

Phase 1 is basically for robustness and Memory Management for Tungsten

Tungsten phase 2.0 speed up Spark by 10X by using Spark as Compiler


Code demo3 Tungsten Phase 2 demo




2nd Week of May , Spark 2.0 preview version will release and May end or June will have official version of Spark 2.0











Thursday, September 10, 2015

Apache Spark 1.5 Released

Apache Spark 1.5 is released and now available to download 



http://spark.apache.org/downloads.html



Major features included-

  • First actual implementation of Project Tungsten  

  • Change in code generation 
  • performance improvement with Project Tungsten- 
  • As in my previous post , Spark introduced new visual for analyzing SQL and Dataframe .
  • Improvements and stability in Spark Streaming in the sense they actually tried to make batch processing and streaming closer. Python API for streaming machine learning algorithms: K-Means, linear regression, and logistic regression.

  • Include streaming storage in web UI. Kafka offsets of Direct Kafka streams available through Python API. Added Python API for Kinesis , MQTT and Flume.
  • Introduction for more algorithms for Machine learning and Analytics. Added more python Api for distributed matrices, streaming k-means and linear models, LDA, power iteration clustering, etc.
  • Find the release notes for Apache Spark -
    And now its time to use it more & actually use the Python API :-)

Sunday, May 10, 2015

HBase ignore comma while using bulk loading importTSV

HBase simply ignore Text while importing the same with CSV file, and the best part it didn't even inform you.
Entire job will be passed , but your HBase table won't have any data or partial data , like if any column has some values

"this text can be uploaded , but it has more" , then till uploaded it will be there in HBase Table cell , then rest of the contents are gone.
This is because I was importing TSV with seperator comma (,) and that lead to import engine to ignore comma among the csv cell.



It took 32 YARN jobs to figure out the actual issue.

Import CSV command -


create ‘bigdatatwitter’,’main’,’detail’,’other’


hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns="HBASE_ROW_KEY,other:contributors,main:truncated,main:text,main:in_reply_to_status_id,main:id,main:favorite_count,main:source,detail:retweeted,detail:coordinates,detail:entities,detail:in_reply_to_screen_name,detail:in_reply_to_user_id,detail:retweet_count,detail:id_str,detail:favorited,detail:retweeted_status,other:user,other:geo,other:in_reply_to_user_id_str,other:possibly_sensitive,other:lang,detail:created_at,other:in_reply_to_status_id_str,detail:place,detail:metadata" -Dimporttsv.separator="," bigdatatwitter file:///Users/abhishekchoudhary/PycharmProjects/DeepLearning/AllMix/bigdata3.csv

Monday, March 30, 2015

Internal of Hadoop Mapper Input to customize



Internal of Hadoop Mapper Input 

Well I just got a requirement to somehow change the input split size to the Mapper , and just by changing the configuration didn't help me lot, so I moved further and just tried to understand exactly whats inside -



So above the Job flow in Mapper and the 5 methods are seriously something to do with split size.

Following is the way , an input is being processed inside Mapper - 

- Input File is split by InputFormat class

- Key value pairs from the inputSplit is being generated for each Split using RecordReader




- All the generated Key Value pairs from the same split will be sent to the same Mapper, so a common unique mapper to handle all key value pairs from a specific Split

- All the result from each mapper is collected further in Partitioner

- The map method os called for each key-value pair once and output sent to the partitioner

- so now the above result in partioner is actually further taken into account by Reducer.


So Now I found the class InputFormat to just introduce my change and that is based on my requirement.



But further checking the exact class helped me more -

 @Deprecated  
 public interface InputSplit extends Writable {  
  /**  
   * Get the total number of bytes in the data of the <code>InputSplit</code>.  
   *   
   * @return the number of bytes in the input split.  
   * @throws IOException  
   */  
  long getLength() throws IOException;  
  /**  
   * Get the list of hostnames where the input split is located.  
   *   
   * @return list of hostnames where data of the <code>InputSplit</code> is  
   *     located as an array of <code>String</code>s.  
   * @throws IOException  
   */  
  String[] getLocations() throws IOException;  
 }  


Further there few more things to check like TextInputFormat , SequenceFileInputFormat and others

 Hold On.. We've RecordReader inbetween which splits the input in Key-value and what if I got something to do with it-

RecordReader.java Interface

We can find implementation of RecordReader in LineRecordReader or SequenceFileRecordReader.



Over there we can find that input split size crosses boundary sometimes, and such situation is being handled , so custom RecordReader must need to address the situation.