Spark on Amazon's Cloud

Kevin Mader
15 October 2014

Spark on AWS EC2 Tutorial

Paul Scherrer Institut ETH Zurich 4Quant

Outline

  • Setting up AWS EC2
    • Access Key IDs
    • Login Key Pairs
  • Setting up Spark
  • Basic Tests

Useful Links

Interesting Big Data

  • Amazon's Library
    • web crawl data composed of over 5 billion web pages
    • Wikipedia Page Traffic Statistic
    • Google Books n-gram corpuses

Setting up AWS EC2

Access Key ID

This is a key refering to your Amazon account and allows the creation and destruction of new instances, S3 access, and so forth. It can be found in the AWS Management Console under Security Credentials. If you have not created one before, you will need to create a new one. If you already have 2, you will either have to use an old one, or delete one and create a new one.

  • The environmental variables must be set before running the script
export AWS_ACCESS_KEY_ID="???"
export AWS_SECRET_ACCESS_KEY="???"

Login Key Pair

The login key pair is used for logging into instances using SSH and SFTP. This can be created using the AWS console and creating a new key pair.

  • The key pair will be downloaded by the browser
  • The permissions for this file must be changed
chmod 600 ~/Downloads/my-key-pair.pem

Launching the Cluster

The first time a cluster is run, it needs to be launched. This is peformed using the script ec2/spark-ec2

The following command starts a cluster with 1 slave using the spark-key-pair.pem file just created in the Amazon Region eu-west-1 (this must be the same as where you created the key pair, and should be eu-west-1 if you are in Switzerland)

./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem -s 1 launch big-data-test-cluster --region=eu-west-1
  • Replace launch with start after the cluster has been created for the first time
  • Screencast of setup

Monitoring the Cluster

Once it is started you will see the following message

Connection to ec2-54-78-73-225.eu-west-1.compute.amazonaws.com closed.
Spark standalone cluster started at http://ec2-54-78-73-225.eu-west-1.compute.amazonaws.com:8080
Ganglia started at http://ec2-54-78-73-225.eu-west-1.compute.amazonaws.com:5080/ganglia
Done!

You can look at either of these URLs to see the Spark Console (port 8080) and the Ganglia Cluster monitoring tools (5080).

EC2 offers rudimentary monitoring using the EC2 console (mainly to check if machines are still running)

Stopping and Destorying the Cluster

After you are finished using the cluster you must shut it down or Amazon continues to charge your account even if the cluster is sitting idle

stop

either temporarily (if you plan on using it again)

./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem stop big-data-test-cluster --region=eu-west-1

destory

or destroy it (if you do not plan on using it soon)

./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem destory big-data-test-cluster --region=eu-west-1

Note

If you do not destory it, there are residual storage costs Amazon charges your account

Logging in

  • Screencast of logging in

  • To login you can use a very similar statement to the one to start the cluster (-s is no longer needed)

./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem login big-data-test-cluster --region=eu-west-1
  • Logging in gets you a bash console where you can start spark-shell by running
cd spark/bin/
MASTER=spark://localhost:7077;./spark-shell

Basic Tests in Spark Shell

Ensure the cluster is working correctly

val testdata = sc.parallelize(0 to 10000)
val testdata2 = testdata.cartesian(testdata)
val totcount = testdata2.count
val closecount = testdata2.map(ival => ival._1 - ival._2).filter(_<10).count

Interacting with S3

Spark has support for S3 and HDFS built in. The raw data can then be stored independently of cluster volumes and then read from and written to S3. Instead of reading

val localFiles = sc.textFile("*.txt")

They can be read directly from a bucket on S3 by

val remoteFiles = sc.textFile("s3n://BUCKET/cells/*.csv")

Getting data on S3

To get data onto S3 Screencast of setup

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","....")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","....")
  • load your data from the bucket
val cells = sc.textFile("s3n://spark-class-data/cell_colony.csv")

Playing with the data

  • Convert the data to numeric tuples
val pts = cells.map(_.split(",").map(_.toDouble))
  • Convert the tuples to case classes
case class ImPos(x: Double, y: Double, intensity: Double)
val ccpts = pts.map(inpt => ImPos(inpt(0),inpt(1),inpt(2)))
  • Show the first point, then count how many points are in a region of interest
ccpts.first
ccpts.filter(_.x>50).count

Using Spark SQL

  • Import SparkSQL Tools
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
  • Register ccpts as a table
ccpts.registerAsTable("Cells")
  • Perform a simple query
sql("SELECT SUM(x) FROM Cells").collect

Image Processing with Spark

To process data with Spark it needs to be a in a list or ideally a key-value pair format. For an image this is not trivial since they are normally represented by a 2- or 3D matrix which encodes information in both the value (intensity, absorption, etc) and the position (A\( [x][y][z] \)).

  • A key-value par can be made by having the key as the position \( (x,y,z) \) and the value as the intensity or whatever other signals are important.

  • This format is also better suited for multichannel (more then one value per position) and even hyperspectral imaginging since there is little difference between \( (I) \) and \( (I,J,K,U,V,W,\cdots) \)

Getting an image to Key-Value Format

library(jpeg)
in.img<-readJPEG("ext-figures/Cell_Colony.jpg")
kv.img<-im.to.df(in.img)
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x y val
1 1 0.6275
2 1 0.7804
3 1 0.8863
4 1 0.8980
5 1 0.9098
6 1 0.9216

The key is position \( \langle x, y \rangle \) and value is the intensity \( val \)

Loading the data into Spark (Scala)

val rawImage=sc.textFile("s3n://spark-class-data/cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
  • Count the number of pixels
imgAsKV.count
  • Get the first value
imgAsKV.take(1)
  • Sample 100 values from the data
imgAsKV.sample(true,0.1,0).collect

Perform a threshold

val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)

Get Volume Fraction

100.0*labelImg.count/(imgAsKV.count)

Region of Interest

Take a region of interest between 0 and 100 in X and Y

def roiFun(pvec: ((Int,Int),Double)) = 
 {pvec._1._1>=0 & pvec._1._1<100 & // X
  pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)

Perform a 3x3 box filter

def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val scalevalue=pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),scalevalue)
}

val filtImg=roiImg.
      flatMap(cvec => spread_voxels(cvec)).
      filter(roiFun).reduceByKey(_ + _)

Setting up Component Labeling

  • Create the first labels from a thresheld image as a mutable type
val xWidth=100
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
  • Spreading to Neighbor Function
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val label=pvec._2._1
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}

Running Component Labeling

var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
  newLabels=newLabels.
  flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep!=groupList.deep)
  groupList=curGroupList
  iterations+=1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList

Calculating From Images

  • Average Voxel Count
val labelSize = newLabels.
  map(pvec => (pvec._2._1,1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
  • Center of Volume for Each Label
val labelPositions = newLabels.
  map(pvec => (pvec._2._1,pvec._1)).
  groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
(sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))

Finite Element Modeling with Graphs in GraphX

For many systems like bone tissue, cellular tissues, cellular materials and many others, the structure is just the beginning and the most interesting results come from the application to physical, chemical, or biological rules inside of these structures.

\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]

Such systems can be easily represented by a graph, and analyzed using GraphX in a distributed, fault tolerant manner.

plot of chunk unnamed-chunk-2

Model for Graph

  • Each pixel has a mass (\( m \)) is a vertex and is connected to the neighoring pixels by edges.
  • These edges are compressible strings and exert force by Hooke's Law \[ \vec{F}=k (\vec{d}_{ij}-\vec{\text{restLength}}) \]

Basic types for the image graph (position)

case class D3int(x: Int, y: Int, z: Int)
case class D3float(x: Float, y: Float, z: Float)
case class ImageVertex(index: Int,pos: D3int = new D3int(0),value: Int = 0,original: Boolean = false)
case class ImageEdge(dist: Double, orientation: D3float, restLength: Double = 1.0)
case class ForceEdge(ie: ImageEdge, force: D3float)

Creating a graph

import org.apache.spark.graphx._
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.impl.GraphImpl

From an Array to a Graph

  def twoDArrayToGraph(sc: SparkContext, inArr: Array[Array[Int]]): Graph[ImageVertex, ImageEdge] = {
    val ywidth=inArr.length
    val xwidth=inArr(0).length
    val vertices = sc.parallelize(0 until xwidth*ywidth).map{
      idx => extractPoint(idx,inArr,xwidth,ywidth)
    }
    val fvertices: RDD[(VertexId, ImageVertex)] = vertices.map(cpt => (cpt.index,cpt))
    val edges = vertices.flatMap{
      cpt => spreadVertices(cpt,1)
    }.groupBy(_.pos).filter{
      // at least one original point
      ptList => ptList._2.map(_.original).reduce(_ || _)
    }.flatMap{ combPoint => {
        val pointList=combPoint._2
        val centralPoint = pointList.filter(_.original).head
        val neighborPoints = pointList.filter(pvec => !pvec.original)
        for(cNeighbor<-neighborPoints) 
          yield Edge[Unit](centralPoint.index,cNeighbor.index)
      }
    }
    Graph[ImageVertex, Unit](fvertices,edges).
    mapTriplets(triplet => triplet.srcAttr-triplet.dstAttr)
  }

Calculate Forces

 def calcForces(inGraph: Graph[ImageVertex, ImageEdge]) = {
    inGraph.mapEdges(
     rawEdge => {
      val edge: ImageEdge = rawEdge.attr
    val k = 0.01
      val force = (edge.restLength-edge.dist)
      new ForceEdge(edge,edge.orientation*force)
    })
  }
  def sumForces(mGraph: Graph[ImageVertex, ForceEdge]) = {
    mGraph.mapReduceTriplets[D3float](
        // map function
        triplet => {
           Iterator((triplet.srcId, triplet.attr.force),
                    (triplet.dstId, triplet.attr.force*(-1))
               )
        },
        // reduce function
        (force1: D3float,force2: D3float) => force1+force2
    ).join(mGraph.vertices)
  }

One iteration

val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(myGraph)
val forces = sumForces(forceGraph)

Making a Domain Specific Language

Scala allows for Implicit Conversions and Wrappers making DSL programming very easy

  • Instead of
val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(myGraph)
val forces = sumForces(forceGraph)
  • You can mix-in your own methods to existing structures
val forceGraph = testImg toGraph addForces
val nodeForces = forceGraph sumForces
val offsetForces = forceGraph + (0,0,1)

Implicit Classes and Methods

implicit class GraphLoader(inArray: Array[Array[Int]]) {
  def createGraph() = {
    twoDArrayToGraph(sc,inArray)
  }
}
implicit class ImageGraph(inGraph: Graph[ImageVertex, ImageEdge]) {
  def addForces() = {
    calcForces(inGraph)
  }
}
implicit class ForceGraph(inGraph: Graph[ImageVertex, ForceEdge]) {
  def sumForces() = {
    sumForces(inGraph)
  }
  def +(offset: (Int,Int,Int)) = {
    inGraph.mapEdges{edge => 
      ForceEdge(edge.attr.ie,edge.attr.force+offset)
    }
  }
}

Reality Check

Spark Shortcomings

  • Spark is not performant \( \rightarrow \) dedicated, optimized CPU and GPU codes will perform slightly to much much better when evaulated by data points per second per processing power unit
    • these codes will be wildly outperformed by dedicated hardware / FPGA solutions
  • Serialization overhead and network congestion are not neglible for large datasets

But

  • Scala / Python in Spark is substantially easier to write and test
    • Highly optimized codes are very inflexible
    • Human time is 400x more expensive than AWS time
    • Mistakes due to poor testing can be fatal
  • Spark scales smoothly to enormous datasets
    • GPUs rarely have more than a few gigabytes
    • Writing code that pages to disk is painful
  • Spark is hardware agnostic (no drivers or vendor lock-in)

SQL Automatically Detecting Format for more complex data

From pull request 1351 (https://github.com/apache/spark/pull/1351/)

import org.apache.spark.sql.catalyst.types._
val fields = header.map( fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, AttributeReference, Row}
def castToType(value: Any, dataType: DataType): Any = dataType match {
  case StringType => value.asInstanceOf[String]
  case BooleanType => value.asInstanceOf[Boolean]
  case DoubleType => value.asInstanceOf[Double]
  case FloatType => value.asInstanceOf[Float]
  case IntegerType => value.asInstanceOf[Int]
  case LongType => value.asInstanceOf[Long]
  case ShortType => value.asInstanceOf[Short]
  case _ => null
}
def parseCSV(iter: Iterator[String],
delimiter: String, schema: StructType): Iterator[Row] = {
  val row = new GenericMutableRow(schema.fields.length)
  iter.map { line =>
    val tokens = line.split(delimiter)
    schema.fields.zipWithIndex.foreach {
      case (StructField(name, dataType, _), index) =>
      row.update(index, castToType(tokens(index), dataType))
    }
    row
  }
}
import org.apache.spark.sql.SchemaRDD
val asAttr = schema.fields.map(field => AttributeReference(field.name, field.dataType, nullable = true)())
val parsedCSV = test.mapPartitions(line => parseCSV(line,",",schema))
val newRDD = new SchemaRDD(inData,SparkLogicalPlan(ExistingRdd(asAttr, parsedCSV)))