Spark Introduction

Kevin Mader
22 July 2014

IBM Data Science Connect Event

Paul Scherrer Institut / ETH Zurich / 4Quant

Outline

  • Motivation
  • What is Big Data?
  • Simple Example
  • What is Spark
  • How to get started?
  • Examples

Motivation

  • Data science, machine learning, and image processing are all computationally intensive tasks.
  • More data is available now than ever before
    • X-Ray Imaging: 8GB/s continuous images
    • Genomics: Full genomes can be analyzed for less than $1000 at rapidly increasing rates
    • Facebook, Twitter, Google collect and analyze petabytes of data per day
  • Standard tools on single machines cannot keep up

Big Data: Definition

Velocity, Volume, Variety

When a ton of heterogeneous data is coming in fast.

Performant, scalable, and flexible

When scaling isn't scary

10X, 100X, 1000X is the same amount of effort

When you are starving for enough data

Michael Franklin, Director of AMPLab, said their rate-limiting factor is always having enough interesting data.

0 'clicks' per sample

K-Means Clustering

A common method to automatically group large datasets into different groups.

[Figure: K-Means clustering example]

Example

Input Data

Each row represents a single measurement (in this case a student) and each column a piece of information about this student.

name grade
Joe 5.0
Jane 6.0
Bob 5.5
Kevin 3.0
Alice 4.0
Tim 3.5

K-Means Clustering

Along with the input we need to define a distance metric for comparing rows (students). Since it is difficult to define a distance between names in a meaningful way (is “Kevin” > “Tim”?), we just use the grades.

Group.1 Group.2
Joe Kevin
Jane Alice
Bob Tim
  • Group.1 probably contains the good students
  • Group.2 contains the more worrisome students
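As a small illustration, the grade-only distance described above can be written as a one-line Scala function (gradeDist and the (name, grade) tuple layout are just assumptions for this sketch, not code from the slides):

// Compare two students only by their grade; the name is ignored entirely.
def gradeDist(a: (String, Double), b: (String, Double)): Double =
  math.abs(a._2 - b._2)

gradeDist(("Joe", 5.0), ("Kevin", 3.0))  // 2.0, so Joe and Kevin land in different groups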

Growing up

Most data science / statistics problems require significantly more data. Here is an invented example with cancer risk (I am very tired of advertising and click-conversion examples).

drinks.per.day age smoker location cancer.risk
3 85.65 Yes Rome 26.734
7 56.22 No Rome 46.130
3 17.77 Sometimes Rome 6.718
8 28.22 Yes Beijing 30.220
3 39.25 Sometimes New York 11.470
4 27.74 Yes Beijing 23.215
5 38.07 No Beijing 27.110
1 14.48 Sometimes Rome 5.991
8 58.01 Yes New York 58.003
1 88.47 Sometimes New York 22.357

How do you analyze data like this?

drinks.per.day age smoker location cancer.risk
Min. :0.00 Min. :10.1 No :323 Beijing :349 Min. : 0.0
1st Qu.:2.00 1st Qu.:33.3 Sometimes:346 New York:339 1st Qu.:12.0
Median :4.00 Median :55.0 Yes :331 Rome :312 Median :26.2
Mean :4.47 Mean :55.2 NA NA Mean :33.8
3rd Qu.:7.00 3rd Qu.:77.9 NA NA 3rd Qu.:54.4
Max. :9.00 Max. :99.0 NA NA Max. :97.9

How do you analyze data like this?

[Figure: exploratory plot of the cancer-risk dataset]

Using K-Means to Understand

[Figure: K-Means cluster assignments on the cancer-risk dataset]

What happens when it gets really big?

  • 10,000 people
demo.kmeans(10000)
[1] "elapsed: 0.618"
  • 1,000,000 people
demo.kmeans(1e6)
[1] "elapsed: 11.88"

Beyond

[Figure: run time as the dataset grows further]

Solution?

The solution has traditionally been to buy a faster computer, but

  • processors aren't getting faster like before.
  • Transistor counts per chip are still doubling

    • clock speed hasn't progressed significantly beyond 3 GHz
    • the gains now come from multicore architectures instead

    \[ \longrightarrow \]

Divide and Conquer: Perform processing on multiple cores / computers at the same time!

How?

Imperative vs Declarative Soups

Imperative soup

  1. Buy {carrots, peas, tomatoes} at market
  2. then Buy meat at butcher
  3. then Chop carrots into pieces
  4. then Chop potatoes into pieces
  5. then Heat water
  6. then Wait until boiling then add chopped vegetables
  7. then Wait 5 minutes and add meat

Declarative soup

  • Buy {carrots, peas, tomatoes} at market \( \rightarrow shop_{veggies} \)
  • Buy meat at butcher \( \rightarrow shop_{meat} \)
  • Wait for \( shop_{veggies} \): Chop carrots into pieces \( \rightarrow chopped_{carrots} \)
  • Wait for \( shop_{veggies} \): Chop potatoes into pieces \( \rightarrow chopped_{potatoes} \)
  • Heat water \( \rightarrow boiling_{water} \)
  • Wait for \( boiling_{water} \), \( chopped_{carrots} \), \( chopped_{potatoes} \): Add chopped vegetables
    • Wait 5 minutes and add meat

Consequence

The imperative instructions are a list of very clear commands that must be run in a fixed, specified order.

Imperative

  • optimize specific tasks (chopping vegetables, mixing) so that many people can do it faster
    • Matlab/Python do this with fast Fourier transforms (automatically using many cores to compute faster)
  • make many soups at the same time (independent)
    • This leads us to cluster-based computing

The declarative instructions are a list of separate tasks that can be run in any order.

Declarative

  • run everything at once
  • each core (computer) takes a task and runs it
  • execution order does not matter
    • wait for portions to be available (dependency)

Lazy Evaluation

  • do not run anything at all
  • until something needs to be exported or saved
  • run only the tasks that are needed for the final result
    • never buy tomatoes since they are not in the final soup
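A minimal Spark sketch of this laziness (assuming sc is an available SparkContext and a summary.csv file, as in the demo later on):

// Each transformation only records what should happen; nothing is read yet.
val lines    = sc.textFile("summary.csv")
val columns  = lines.map(_.split(","))
val nonEmpty = columns.filter(_.nonEmpty)

// Only an action like count forces Spark to read the file and run the whole chain.
nonEmpty.count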

From Imperative to Declarative

  1. Requires rethinking many types of algorithms
  2. A coarse-grained approach
    • operations instead of loops (map)
    • selection instead of if/then (filter)
    • combination instead of complex logic (reduce)

From Imperative to Declarative (continued)

Map

  • Apply the exact same operation (function) to every item in a collection (list, array, tree, …)
  • Transforms a collection of type \( \mathcal{A} \) into an equally dimensioned collection of type \( \mathcal{B} \)

\[ f(a\text{ of type }\mathcal{A}) \rightarrow b\text{ of type }\mathcal{B} \]

Filter

  • Apply the same predicate (function) to every item in a collection and keep only the elements for which it is true
  • Prunes from the collection the elements where f is false

Group By

  • Divides a collection into subcollections using an operation to group the items

Reduce

  • Combine elements in a collection using a fixed operation
  • Reduces a collection to a single element
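All four operations can be tried out on plain Scala collections before moving to Spark; the toy grade list below is just an illustration:

val grades = List(("Joe", 5.0), ("Jane", 6.0), ("Kevin", 3.0), ("Tim", 3.5))

// map: apply the same function to every element (the collection keeps its size)
val rounded = grades.map { case (name, g) => (name, math.round(g)) }

// filter: keep only the elements where the predicate is true
val passing = grades.filter { case (_, g) => g >= 4.0 }

// group by: split the collection into sub-collections according to a key
val byPass  = grades.groupBy { case (_, g) => g >= 4.0 }

// reduce: combine all elements into a single value
val total   = grades.map(_._2).reduce(_ + _)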

K-Means Standard (4 groups)

  1. Starting list of points: Points
  2. Take 4 random points from the list \( \rightarrow \) Centers
  3. Calculate the distance from every point in the list to every center: CenterDistance
for cPoint in Points
  for cCenter in Centers
    CenterDistance(cPoint,cCenter)=dist(cPoint,cCenter)
  end
end
  4. Find the nearest center for each point
for cPoint in Points
  NearestCenter(cPoint) = argmin(CenterDistance(cPoint,*))
end

K-Means Continued

  5. Define the groups by the nearest center
  6. Calculate the mean position for the points in each group
for cCenter in Centers
  CurrentPoints = []
  for cPoint in Points
    if NearestCenter(cPoint) is cCenter
      CurrentPoints += cPoint
    end
  end
  NewCenter(cCenter) = mean(CurrentPoints)
end
  7. Repeat from step 3 using the new centers

K-Means Optimized (MPI/CUDA)

Taken from Serban's K-Means CUDA Project (https://github.com/serban/kmeans/blob/master/cuda_kmeans.cu)

checkCuda(cudaMalloc(&deviceObjects, numObjs*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceClusters, numClusters*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceMembership, numObjs*sizeof(int)));
checkCuda(cudaMalloc(&deviceIntermediates, numReductionThreads*sizeof(unsigned int)));
checkCuda(cudaMemcpy(deviceObjects, dimObjects[0],
  numObjs*numCoords*sizeof(float), cudaMemcpyHostToDevice));
checkCuda(cudaMemcpy(deviceMembership, membership,
  numObjs*sizeof(int), cudaMemcpyHostToDevice));
do {
  checkCuda(cudaMemcpy(deviceClusters, dimClusters[0],
                  numClusters*numCoords*sizeof(float), cudaMemcpyHostToDevice));
  find_nearest_cluster
            <<< numClusterBlocks, numThreadsPerClusterBlock, clusterBlockSharedDataSize >>>
            (numCoords, numObjs, numClusters,
             deviceObjects, deviceClusters, deviceMembership, deviceIntermediates);
  cudaDeviceSynchronize(); checkLastCudaError();
  compute_delta <<< 1, numReductionThreads, reductionBlockSharedDataSize >>>
            (deviceIntermediates, numClusterBlocks, numReductionThreads);
  cudaDeviceSynchronize(); checkLastCudaError();
  int d;
  checkCuda(cudaMemcpy(&d, deviceIntermediates,
            sizeof(int), cudaMemcpyDeviceToHost));
  delta = (float)d;
  checkCuda(cudaMemcpy(membership, deviceMembership,
            numObjs*sizeof(int), cudaMemcpyDeviceToHost));
  for (i=0; i<numObjs; i++) {
    /* find the array index of nearest cluster center */
    index = membership[i];

    /* update new cluster centers : sum of objects located within */
    newClusterSize[index]++;
    for (j=0; j<numCoords; j++)
      newClusters[j][index] += objects[i][j];
  }

K-Means Declarative

  1. Starting list of points: Points
  2. Take 4 random points from the list \( \rightarrow \) Centers
    • Apply a filter operation with the function (random() < 4/nPoints), which keeps each point with probability 4/nPoints and so returns roughly 4 points
  3. Create a function called nearest center which takes a point \( \vec{x} \) and returns the nearest center to the point and the point itself
nearestCenter(x) = {
  for cCenter in Centers
    pointDist(cCenter) = dist(x,cCenter)
  end
  return (argmin(pointDist),x)
}
  4. Map nearestCenter onto Points as NearestCenterList

K-Means Declarative Continued

  5. Group NearestCenterList by its first value (the index of the nearest center) as GroupedCenterList
  6. Reduce GroupedCenterList by calculating the average point for each group as AverageCenters
  7. Compare AverageCenters with Centers; if the difference is too large, replace Centers \( \leftarrow \) AverageCenters and repeat from step 4
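A minimal Spark sketch of one such iteration on one-dimensional data (the names points and centers are assumptions for this example: points would be an RDD[Double] of values and centers an Array[Double] of the current centers):

// Index of the nearest current center for a single value.
def nearestCenter(x: Double, centers: Array[Double]): Int =
  centers.indices.minBy(i => math.abs(x - centers(i)))

// map: attach the nearest-center index to every point
val assigned = points.map(x => (nearestCenter(x, centers), (x, 1)))

// reduce (per group): sum the values and counts for each center
val sums = assigned.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

// new centers = mean of each group; compare with the old centers and repeat if needed
val newCenters = sums.mapValues { case (sum, n) => sum / n }.collect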

The major differences

Instead of for, if, and other commands, the higher-level map, filter, group by, and reduce operations are used.

Downsides

  • Slightly more complicated to think through the steps
  • Some commands seem less natural than for and if

Upsides

  • Map, Filter, Group By, and Reduce are written by someone else (part of a framework)
    • If these commands are parallel, your program is parallel
  • Independent of underlying data structures

Spark / Resilient Distributed Datasets

Technical Specifications

  • Developed by the Algorithms, Machines, and People Lab at UC Berkeley in 2012
  • General tool for all Directed Acyclic Graph (DAG) workflows
  • Coarse-grained processing \( \rightarrow \) simple operations applied to entire sets
    • Map, reduce, join, group by, fold, foreach, filter,…
  • In-memory caching

Spark: Zaharia, M., et al. (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing.

Practical Specification

  • Distributed, parallel computing without logistics, libraries, or compiling
  • Declarative rather than imperative
    • Apply operation \( f \) to each image / block
    • NOT telling computer 3 to wait for an image from computer 2, perform operation \( f \), and send it to computer 1
    • Even scheduling is handled automatically
  • Results can be stored in memory, on disk, redundant or not
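For example, choosing where results live is a one-line decision (a small sketch, reusing the summary.csv file from the demo below):

import org.apache.spark.storage.StorageLevel

// Keep the RDD in memory and spill whatever does not fit to disk.
val summary = sc.textFile("summary.csv").persist(StorageLevel.MEMORY_AND_DISK)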

Spark (Demo)

Install Spark (Advanced)

cd /scratch
curl -o spark.tgz http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1-bin-hadoop1.tgz
tar -xvf spark.tgz
cd spark-0.9.1-bin-hadoop1/

Starting Spark

Spin up your own cluster in an hour. Here we use a single node acting as the master, scheduler, and worker, but normally these roles run on different computers.

  • Start the Spark-Shell ./bin/spark-shell
  • Start Spark-python ./bin/pyspark
    • Write code in Python

Basic Operations with Spark

  • Load in text data (a CSV file)
val inData=sc.textFile("summary.csv")

Output inData: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile

  • Read the first line
val header = inData.first.split(",")

Output Array[String] = Array(“file.name”,“LACUNA_NUMBER”,“POS_X”,“POS_Y”,…

  • Count lines
inData.count

Output Long = 1182

Format the data into case classes

  • Split into pieces using comma delimiter

    val headerLine = inData.first
    val colData = inData.filter(_ != headerLine).map(_.split(","))
    

    Output colData: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2]

  • Define a format

case class CellShape(sample: Long, PosX: Double, PosY: Double, PosZ: Double, Volume: Double)
val classedData = colData.map(line => CellShape(line(0).toLong,
  line(5).toDouble, line(6).toDouble, line(7).toDouble, line(31).toDouble)).cache

Find nearest neighbors of a point

  • Perform a Cartesian Product (Everything x Everything)
val crossData = classedData.cartesian(classedData).filter{dPair => dPair._1.sample != dPair._2.sample}
  • Calculate the distance between the points
val distData = crossData.map{dPair => 
   (dPair._1.sample,
    (sqrt(pow(dPair._1.PosX-dPair._2.PosX,2)+pow(dPair._1.PosY-dPair._2.PosY,2)+pow(dPair._1.PosZ-dPair._2.PosZ,2)),dPair._1,dPair._2)
    )
}
  • Group the data by point and take the smallest distance
val pointData = distData.groupByKey.map{cvals => 
     val minDist = cvals._2.map(_._1).min
     (cvals._1,cvals._2.filter(ival => ival._1==minDist).head) }
  • Calculate the nearest neighbor distance
pointData.map(_._2._1).reduce(_ + _)/pointData.count

Moving Data into SparkSQL

  • Import SparkSQL Tools
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
classedData.registerAsTable("Summary")
sql("SELECT SUM(PosX) from Summary").collect

Output

== Query Plan ==
Aggregate false, [], [SUM(PartialSum#12) AS c0#11]
 Exchange SinglePartition
  Aggregate true, [], [SUM(PosZ#8) AS PartialSum#12]
   Project [PosZ#8:2]
    ExistingRdd [sample#6,id#7L,PosZ#8], MapPartitionsRDD[19] at mapPartitions at basicOperators.scala:173

Automatically Detecting Format for SQL

From pull request 1351 (https://github.com/apache/spark/pull/1351/)

import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, AttributeReference, Row}
import org.apache.spark.sql.SchemaRDD
val fields = header.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
def castToType(value: Any, dataType: DataType): Any = dataType match {
  case StringType => value.asInstanceOf[String]
  case BooleanType => value.asInstanceOf[Boolean]
  case DoubleType => value.asInstanceOf[Double]
  case FloatType => value.asInstanceOf[Float]
  case IntegerType => value.asInstanceOf[Int]
  case LongType => value.asInstanceOf[Long]
  case ShortType => value.asInstanceOf[Short]
  case _ => null
}
def parseCSVLine(splitLine: Array[String], schema: StructType): Row = {
  val row = new GenericMutableRow(schema.fields.length)
  schema.fields.zipWithIndex.foreach {
    case (StructField(name, dataType, _), index) =>
    row.update(index, castToType(splitLine(index), dataType))
  }
  row
}
val asAttr = schema.fields.map(field => AttributeReference(field.name, field.dataType, nullable = true)())
val parsedCSV = colData.map(line => parseCSVLine(line,schema))
val sqlRdd = new SchemaRDD(sqlContext,SparkLogicalPlan(ExistingRdd(asAttr, parsedCSV)))
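Once constructed, the resulting SchemaRDD can be registered and queried like the earlier table (a short usage sketch; the table name csvTable is arbitrary):

sqlRdd.registerAsTable("csvTable")
sql("SELECT COUNT(*) FROM csvTable").collect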

Image Processing with Spark

To process data with Spark it needs to be in a list or, ideally, a key-value pair format. For an image this is not trivial, since images are normally represented by a 2D or 3D matrix which encodes information in both the value (intensity, absorption, etc.) and the position (\( A[x][y][z] \)).

  • A key-value pair can be made by using the position \( (x,y,z) \) as the key and the intensity (or whatever other signals are important) as the value.

  • This format is also better suited to multichannel (more than one value per position) and even hyperspectral imaging, since there is little difference between \( (I) \) and \( (I,J,K,U,V,W,\cdots) \).
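A minimal Scala sketch of this conversion for an image already held as an in-memory 2D array (arrayToKV and the Double pixel values are assumptions; the R route via a CSV file follows on the next slide):

import org.apache.spark.SparkContext

// Turn a 2D array into ((x, y), value) pairs that Spark can distribute.
def arrayToKV(sc: SparkContext, img: Array[Array[Double]]) =
  sc.parallelize(
    for {
      y <- img.indices
      x <- img(y).indices
    } yield ((x, y), img(y)(x))
  )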

Getting an image to Key-Value Format

library(jpeg)
in.img<-readJPEG("ext-figures/Cell_Colony.jpg")
kv.img<-im.to.df(in.img)   # im.to.df: helper that converts the image matrix to an (x, y, val) data frame
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x y val
1 1 0.6275
2 1 0.7804
3 1 0.8863
4 1 0.8980
5 1 0.9098
6 1 0.9216

The key is position \( \langle x, y \rangle \) and value is the intensity \( val \)

Loading the data into Spark (Scala)

val rawImage=sc.textFile("cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
  • Count the number of pixels
imgAsKV.count
  • Get the first value
imgAsKV.take(1)
  • Take a ~10% random sample of the values
imgAsKV.sample(true,0.1,0).collect

Perform a threshold

val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)
  • Runs on 1 core on your laptop or 1000 cores in the cloud or local cluster.
  • If one computer crashes or disconnects it automatically continues on another one.
  • If one part of the computation is taking too long it will be sent to other computers to finish
  • If a computer runs out of memory it writes the remaining results to disk and continues running (graceful drop-off in performance)

Get Volume Fraction

100.0*labelImg.count/(imgAsKV.count)

Region of Interest

Take a region of interest between 0 and 100 in X and Y

def roiFun(pvec: ((Int,Int),Double)) = 
 {pvec._1._1>=0 & pvec._1._1<100 & // X
  pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)

Perform a 3x3 box filter

def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val scalevalue=pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),scalevalue)
}

val filtImg=roiImg.
      flatMap(cvec => spread_voxels(cvec)).
      filter(roiFun).reduceByKey(_ + _)

Setting up Component Labeling

  • Create the first labels from a thresholded image as a mutable type
val xWidth=100
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
  • Spreading to Neighbor Function
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val label=pvec._2._1
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}

Running Component Labeling

var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
  newLabels=newLabels.
  flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep!=groupList.deep)
  groupList=curGroupList
  iterations+=1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList

Calculating From Images

  • Average Voxel Count
val labelSize = newLabels.
  map(pvec => (pvec._2._1,1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
  • Center of Volume for Each Label
val labelPositions = newLabels.
  map(pvec => (pvec._2._1,pvec._1)).
  groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
  val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
  (sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))

Finite Element Modeling with Graphs in GraphX

For many systems like bone tissue, cellular tissues, cellular materials, and many others, the structure is just the beginning, and the most interesting results come from applying physical, chemical, or biological rules inside of these structures.

\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]

Such systems can be easily represented by a graph, and analyzed using GraphX in a distributed, fault-tolerant manner.

[Figure: pixels as graph vertices connected to their neighbors]

Model for Graph

  • Each pixel, with mass \( m \), is a vertex and is connected to the neighboring pixels by edges.
  • These edges are compressible springs and exert force according to Hooke's Law \[ \vec{F}=k (\vec{d}_{ij}-\vec{\text{restLength}}) \]
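As a small worked sketch of this edge force in two dimensions (hookeForce and its tuple arguments are illustrative assumptions; the graph-based version follows below):

// Hooke's-law force exerted along the edge between two pixel positions.
def hookeForce(p1: (Double, Double), p2: (Double, Double),
               k: Double = 0.01, restLength: Double = 1.0): (Double, Double) = {
  val (dx, dy) = (p2._1 - p1._1, p2._2 - p1._2)
  val dist = math.sqrt(dx * dx + dy * dy)
  val magnitude = k * (dist - restLength)          // F = k (d - restLength)
  (magnitude * dx / dist, magnitude * dy / dist)   // directed along the edge
}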

Basic types for the image graph (position)

case class D3int(x: Int, y: Int, z: Int)
case class D3float(x: Float, y: Float, z: Float)
case class ImageVertex(index: Int, pos: D3int = D3int(0, 0, 0), value: Int = 0, original: Boolean = false)
case class ImageEdge(dist: Double, orientation: D3float, restLength: Double = 1.0)
case class ForceEdge(ie: ImageEdge, force: D3float)

Creating a graph

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.impl.GraphImpl

From an Array to a Graph

  def twoDArrayToGraph(sc: SparkContext, inArr: Array[Array[Int]]): Graph[ImageVertex, ImageEdge] = {
    val ywidth=inArr.length
    val xwidth=inArr(0).length
    val vertices = sc.parallelize(0 until xwidth*ywidth).map{
      idx => extractPoint(idx,inArr,xwidth,ywidth)
    }
    val fvertices: RDD[(VertexId, ImageVertex)] = vertices.map(cpt => (cpt.index,cpt))
    val edges = vertices.flatMap{
      cpt => spreadVertices(cpt,1)
    }.groupBy(_.pos).filter{
      // at least one original point
      ptList => ptList._2.map(_.original).reduce(_ || _)
    }.flatMap{ combPoint => {
        val pointList=combPoint._2
        val centralPoint = pointList.filter(_.original).head
        val neighborPoints = pointList.filter(pvec => !pvec.original)
        for(cNeighbor<-neighborPoints) 
          yield Edge[Unit](centralPoint.index,cNeighbor.index)
      }
    }
    Graph[ImageVertex, Unit](fvertices,edges).
    mapTriplets(triplet => triplet.srcAttr-triplet.dstAttr)
  }

Calculate Forces

  def calcForces(inGraph: Graph[ImageVertex, ImageEdge]) = {
    inGraph.mapEdges(
      rawEdge => {
        val edge: ImageEdge = rawEdge.attr
        val k = 0.01
        val force = k*(edge.restLength-edge.dist)   // Hooke's law with spring constant k
        new ForceEdge(edge, edge.orientation*force)
      })
  }
  def sumForces(mGraph: Graph[ImageVertex, ForceEdge]) = {
    mGraph.mapReduceTriplets[D3float](
        // map function
        triplet => {
           Iterator((triplet.srcId, triplet.attr.force),
                    (triplet.dstId, triplet.attr.force*(-1))
               )
        },
        // reduce function
        (force1: D3float,force2: D3float) => force1+force2
    ).join(mGraph.vertices)
  }

One iteration

val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(femGraph)
val forces = sumForces(forceGraph)

Making a Domain Specific Language

Scala allows for Implicit Conversions and Wrappers making DSL programming very easy

  • Instead of
val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(femGraph)
val forces = sumForces(forceGraph)
  • You can mix-in your own methods to existing structures
val forceGraph = testImg toGraph addForces
val nodeForces = forceGraph sumForces
val offsetForces = forceGraph + (0,0,1)

Implicit Classes and Methods

implicit class GraphLoader(inArray: Array[Array[Int]]) {
  def toGraph() = {
    twoDArrayToGraph(sc,inArray)
  }
}
implicit class ImageGraph(inGraph: Graph[ImageVertex, ImageEdge]) {
  def addForces() = {
    calcForces(inGraph)
  }
}
implicit class ForceGraph(inGraph: Graph[ImageVertex, ForceEdge]) {
  def sumForces() = {
    sumForces(inGraph)
  }
  def +(offset: (Int,Int,Int)) = {
    inGraph.mapEdges{edge => 
      ForceEdge(edge.attr.ie,edge.attr.force+offset)
    }
  }
}

Reality Check

Spark Shortcomings

  • Spark is not the most performant option \( \rightarrow \) dedicated, optimized CPU and GPU codes will perform anywhere from slightly to dramatically better when evaluated in data points per second per unit of processing power
    • these codes will in turn be wildly outperformed by dedicated hardware / FPGA solutions
  • Serialization overhead and network congestion are not negligible for large datasets

But

  • Scala / Python in Spark is substantially easier to write and test
    • Highly optimized codes are very inflexible
    • Human time is 400x more expensive than AWS time
    • Mistakes due to poor testing can be fatal
  • Spark scales smoothly to enormous datasets
    • GPUs rarely have more than a few gigabytes
    • Writing code that pages to disk is painful
  • Spark is hardware agnostic (no drivers or vendor lock-in)

Spark Error Messages

Exception in thread "main" java.lang.AssertionError: assertion failed: Tried to find '$line47' in '/var/folders/yq/w_mvh2xj7yzdzb6k4pknwzgc0000gn/T/spark-f8aac450-9c70-4232-a71f-e089e7bdd03b' but it is not a directory
  at scala.reflect.io.AbstractFile.subdirectoryNamed(AbstractFile.scala:254)
  at scala.tools.nsc.backend.jvm.BytecodeWriters$class.getFile(BytecodeWriters.scala:31)
    at scala.tools.nsc.backend.jvm.BytecodeWriters$class.scala$tools$nsc$backend$jvm$BytecodeWriters$$getFile(BytecodeWriters.scala:37)
    at scala.tools.nsc.backend.jvm.BytecodeWriters$ClassBytecodeWriter$class.writeClass(BytecodeWriters.scala:89)
    at scala.tools.nsc.backend.jvm.GenASM$AsmPhase$$anon$4.writeClass(GenASM.scala:67)
    at scala.tools.nsc.backend.jvm.GenASM$JBuilder.writeIfNotTooBig(GenASM.scala:459)
    at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1413)
    at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)
    at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)
    at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)
    at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)
    at org.apache.spark.repl.SparkIMain.compileSourcesKeepingRun(SparkIMain.scala:468)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.compileAndSaveRun(SparkIMain.scala:859)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.compile(SparkIMain.scala:815)
    at org.apache.spark.repl.SparkIMain$Request.compile$lzycompute(SparkIMain.scala:1009)
    at org.apache.spark.repl.SparkIMain$Request.compile(SparkIMain.scala:1004)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:644)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
    at org.apache.spark.repl.SparkILoop$$anonfun$replay$1.apply(SparkILoop.scala:634)
    at org.apache.spark.repl.SparkILoop$$anonfun$replay$1.apply(SparkILoop.scala:632)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.repl.SparkILoop.replay(SparkILoop.scala:632)
    at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:579)
    at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:566)
    at scala.runtime.AbstractPartialFunction$mcZL$sp.apply$mcZL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)

Acknowledgements

  • AIT at PSI
  • TOMCAT Group

We are interested in partnerships and collaborations

Learn more at