
Spark DataFrames – From machine-readable to human-fathomable code

Background

Apache Spark, an in-memory cluster compute platform that can run certain applications over 100x faster than traditional MapReduce, has hit home with data analysts and developers alike on the promise of succinct, much simpler code. Continuing this tradition of speed and simplicity, Spark 1.3 introduces a new abstraction called DataFrames (so long, SchemaRDDs).

DataFrames enable developers to create and run Spark programs faster, writing less code and reading only the required data through the high-level operations the DataFrame API provides.

What's a DataFrame?

A DataFrame, like an RDD, is a distributed collection of rows, but with named columns. A DataFrame carries a lot of schema/structure information, which lets the Spark engine exploit the structure of the underlying data to run your Spark jobs faster.

The DataFrame concept in Spark isn't new; it is heavily borrowed from R and from Python's data analysis library, pandas. These libraries provide very simple abstractions for filtering, selecting columns, aggregation, plotting, and so on.
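Spark's DataFrame API offers the same flavor of operations. Here is a minimal sketch (df is a hypothetical DataFrame over the flight data used in the example below):

// df is assumed to be a DataFrame over the flight data used later in this post
df.filter(df("Origin") === "LGA")        // filter rows
  .select("Origin", "DepDelayMinutes")   // select columns
  .groupBy("Origin")                     // aggregate per airport
  .count()
  .show()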

The SchemaRDD of Spark 1.2 has been rechristened DataFrame in 1.3, along with an API unification across Java, Scala and Python. From 1.3 onwards, a DataFrame is technically no longer a SchemaRDD: the inheritance relationship has been removed from its class hierarchy. More importantly, the Spark SQL module has its alpha/experimental tag removed in 1.3, guaranteeing API stability across the entire 1.3 stack.
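For most programs the switch is transparent. A minimal before/after sketch (assuming a temp table OTP has been registered, as in the example below):

// Spark 1.2: Spark SQL queries returned a SchemaRDD
//   val results: SchemaRDD = sqlContext.sql("SELECT * FROM OTP")

// Spark 1.3: the same call returns a DataFrame (SchemaRDD is gone from the hierarchy)
val results: DataFrame = sqlContext.sql("SELECT * FROM OTP")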

Example

Enough talk – let's look at an example. The goal is to find the average gate departure delay at every airport in the US for the year 2014. We will first calculate this average the traditional way, and then use DataFrames on the same data set, joining it to two look-up tables: a geo-code file and an airport-description file. The public data set we have picked is the on-time performance data for all US airlines for the year 2014.

Traditionally, you would write your MapReduce-style code as shown in Listing 1. Without an understanding of the metadata of the underlying dataset, this esoteric code is a little unwieldy to read.


Listing 1

scala> val csvSplit = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)"  // split on commas that fall outside quoted fields
scala> val delays = deptDelays_2014.map(x => (x.split(csvSplit)(14), x.split(csvSplit)(32).toDouble))  // (Origin, DepDelayMinutes)
scala> delays.mapValues((_, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues { case (sum, count) => (1.0 * sum) / count }.collectAsMap()  // per-airport average
res29: scala.collection.Map[String,Double] = Map("LGA" -> 13.85575588183916, "OME" -> 7.442953020134228, "ORF" -> 13.401497938231087, "SLC" -> 8.274310461894189, "AZO" -> 8.56232427366448, "BQN" -> 15.299703264094955, "SUN" -> 11.503620273531778, "EWR" -> 16.740703068140963, "PNS" -> 10.681928001932834, "STL" -> 14.069748674198754, "AVP" -> 17.43314651721377, "GEG" -> 8.873259355961705, "IAH" -> 14.513608037290533, "DHN" -> 7.834855938158819, "GUC" -> 22.937888198757765, "HYA" -> 3.925, "LWS" -> 3.499261447562777, "BZN" -> 14.819143310417768, "RST" -> 10.848408710217756, "TPA" -> 13.669393898749647, "BTV" -> 18.7355476673428, "HOU" -> 13.982479718102105, "SNA" -> 11.619507073938909, "BMI" -> 13.972002635046113, "LNK" -> 22.61370202228642, "EGE" -> 24.9670510708402, "CMH" -> 15.344372072...

Now contrast this with Listing 2, which uses a DataFrame. Note the average delay at LGA: about 13.86 minutes, matching the result above.

Listing 2

String query2 = "SELECT avg(CAST(DepDelayMinutes as float)) as delay, o.Origin, a.Description, g.Latitude, g.Longitude "
    + "FROM OTP o, GEOCODE g, AIRPORT_L a "
    + "WHERE g.locationID = o.Origin AND a.Code = o.Origin "
    + "GROUP BY o.Origin, a.Description, g.Latitude, g.Longitude";

// On-time performance data for the year, registered as a temp table
DataFrame df = getDataFrameForaYear(sqlCtx, year);
df.registerTempTable("OTP");
df.printSchema();

// Look-up table: airport code -> latitude/longitude
DataFrame dfGeo = getGeoCodeDF(sqlCtx);
dfGeo.registerTempTable("GEOCODE");
dfGeo.printSchema();

// Look-up table: airport code -> description
DataFrame dfAirPrt = getAirportDF(sqlCtx);
dfAirPrt.registerTempTable("AIRPORT_L");
dfAirPrt.printSchema();

// Print the query plans Catalyst produces (see Listing 3)
sqlCtx.sql(query2).explain(true);

// Run the query, sort by average delay and cache the result
Column sortCol = new Column("delay");
DataFrame dfPers = sqlCtx.sql(query2).sort(sortCol).persist();

As Listing 2 shows, DataFrames work with SQL-like syntax, and the resulting code is very lucid. Spark SQL is the lingua franca for data analysis and BI on Spark, but it is more than just SQL: you can mix and match Spark SQL with a regular Spark program, and it has rich language bindings in Scala, Python and Java, just like the rest of the Spark stack. Logic that doesn't lend itself to SQL can be moved into custom code. Also, through its query optimization framework, Catalyst, Spark optimizes the query (e.g. predicate pushdown) to speed up the data processing pipeline.
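As a rough sketch of that mix-and-match style, the same query can be expressed with DataFrame operators instead of a SQL string. The sketch below assumes a Scala SQLContext named sqlCtx and the Databricks spark-csv package (which the plans in Listing 3 reference as CsvRelation); the file paths are illustrative:

import org.apache.spark.sql.functions._

// Load the three CSV files as DataFrames via the spark-csv package
val otp = sqlCtx.load("com.databricks.spark.csv",
  Map("path" -> "On_Time_On_Time_Performance_2014_2.csv", "header" -> "true"))
val geo = sqlCtx.load("com.databricks.spark.csv",
  Map("path" -> "Airport_Codes_mapped_to_Latitude_Longitude.csv", "header" -> "true"))
val airports = sqlCtx.load("com.databricks.spark.csv",
  Map("path" -> "L_AIRPORT.csv", "header" -> "true"))

// The same aggregation as query2, written with DataFrame operators
val delays = otp
  .join(geo, otp("Origin") === geo("locationID"))
  .join(airports, otp("Origin") === airports("Code"))
  .groupBy(otp("Origin"), airports("Description"), geo("Latitude"), geo("Longitude"))
  .agg(avg(otp("DepDelayMinutes").cast("float")).as("delay"))
  .sort("delay")

Catalyst compiles both forms down to the same kind of optimized plan, so you can pick whichever reads better.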

Phases of query planning in Spark SQL. Rounded rectangles represent Catalyst trees

Catalyst takes advantage of Scala's powerful language features, such as pattern matching and runtime metaprogramming, to let developers concisely specify complex relational optimizations. Most analysts prefer Spark SQL as the gateway to the data stored in HDFS. Here are the different plans for the above query.

Listing 3

== Parsed Logical Plan ==
'Aggregate ['o.Origin,'a.Description,'g.Latitude,'g.Longitude], 
  [AVG(CAST('DepDelayMinutes, FloatType)) 
   AS delay#225,'o.Origin,'a.Description,'g.Latitude,'g.Longitude]
 'Filter (('g.locationID = 'o.Origin) && ('a.Code = 'o.Origin))
  'Join Inner, None
   'Join Inner, None
    'UnresolvedRelation [OTP], Some(o)
    'UnresolvedRelation [GEOCODE], Some(g)
   'UnresolvedRelation [AIRPORT_L], Some(a)

== Analyzed Logical Plan ==
Aggregate [Origin#14,Description#224,Latitude#221,Longitude#222], 
	[AVG(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS delay#225,
	Origin#14,Description#224,Latitude#221,Longitude#222]
 Filter ((locationID#220 = Origin#14) && (Code#223 = Origin#14))
  Join Inner, None
   Join Inner, None
    Subquery o
     Subquery OTP
      Union
       Relation[Year#0,Quarter#1,Month#2,..........]
CsvRelation(On_Time_On_Time_Performance_2014_2.csv,true,,,",\,PERMISSIVE,null)
    Subquery g
     Subquery GEOCODE
      Relation[locationID#220,Latitude#221,Longitude#222] 
       CsvRelation(Airport_Codes_mapped_to_Latitude_Longitude.csv,true,,,",\,PERMISSIVE,null)
   Subquery a
    Subquery AIRPORT_L
     Relation[Code#223,Description#224] CsvRelation(/L_AIRPORT.csv_,true,,,",\,PERMISSIVE,null)

== Optimized Logical Plan ==
Aggregate [Origin#14,Description#224,Latitude#221,Longitude#222], 
  [AVG(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS delay#225,Origin#14,Description#224,Latitude#221,Longitude#222]
 Project [Longitude#222,DepDelayMinutes#32,Origin#14,Description#224,Latitude#221]
  Join Inner, Some((Code#223 = Origin#14))
   Project [Origin#14,DepDelayMinutes#32,Latitude#221,Longitude#222]
    Join Inner, Some((locationID#220 = Origin#14))
     Union
      Project [Origin#14,DepDelayMinutes#32]

== Physical Plan ==
Aggregate false, [Origin#14,Description#224,Latitude#221,Longitude#222], 
		[(CAST(SUM(PartialSum#229), DoubleType) / CAST(SUM(PartialCount#230L), DoubleType)) AS delay#225,
		Origin#14,Description#224,Latitude#221,Longitude#222]
 Aggregate true, [Origin#14,Description#224,Latitude#221,Longitude#222], [Origin#14,Description#224,
 	Latitude#221,Longitude#222,COUNT(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS PartialCount#230L,
 		SUM(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS PartialSum#229]
  Project [Longitude#222,DepDelayMinutes#32,Origin#14,Description#224,Latitude#221]
   ShuffledHashJoin [Origin#14], [Code#223], BuildRight
    Project [Origin#14,DepDelayMinutes#32,Latitude#221,Longitude#222]
     ShuffledHashJoin [Origin#14], [locationID#220], BuildRight
      Exchange (HashPartitioning [Origin#14], 200)
       Union [Project [Origin#14,DepDelayMinutes#32]
 PhysicalRDD 

Here is the JSON representation for airports LGA and FAT, generated from the DataFrame.

{
  "abbrev": "LGA",
  "parentState": "LGA",
  "airport": "New York, NY: LaGuardia",
  "lat": "40.7772",
  "lon": "-73.8725",
  "avg": "13.86"
},
{
  "abbrev": "FAT",
  "parentState": "FAT",
  "airport": "Fresno, CA: Fresno Yosemite International",
  "lat": "36.7761",
  "lon": "-119.7181",
  "avg": "13.91"
},
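One way such per-row JSON can be produced is with DataFrame.toJSON, which returns an RDD of JSON strings. A sketch (delays is the DataFrame from the sketch above; the column renames needed to match the field names shown here are assumed):

// One JSON document per row; renaming columns (e.g. Origin -> abbrev)
// to match the field names above is assumed
delays.withColumnRenamed("Origin", "abbrev").toJSON.collect().foreach(println)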

Feeding the JSON data for all the airports into HighMaps gives you a complete perspective on which airport to pick the next time you want to fly out.

Here is the output represented using HighCharts. To view a bigger map, click here.
