Kevin Mader
22 July 2014
When a ton of heterogeneous data is coming in fast.
Performant, scalable, and flexible
Scaling to 10X, 100X, or 1000X should take the same amount of effort
Michael Franklin, Director of AMPLab, said their rate-limiting factor is always having enough interesting data
A common method for automatically partitioning large datasets into distinct groups.
Input Data
Each row represents a single measurement (in this case a student) and each column a piece of information about that student.
name | grade |
---|---|
Joe | 5.0 |
Jane | 6.0 |
Bob | 5.5 |
Kevin | 3.0 |
Alice | 4.0 |
Tim | 3.5 |
K-Means Clustering
Along with the input, we need to define a distance metric for comparing rows (students). Since it is difficult to define a distance between names in a meaningful way (is “Kevin” > “Tim”?), we just use the grades.
Group.1 | Group.2 |
---|---|
Joe | Kevin |
Jane | Alice |
Bob | Tim |
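A grouping like the one above takes only a few lines to compute; here is a minimal 1-D k-means sketch in Scala (my own illustration, not the code behind the slide), clustering the students by grade with two centers:
// Tiny 1-D k-means on the grade column of the example table
val students = Seq("Joe" -> 5.0, "Jane" -> 6.0, "Bob" -> 5.5,
                   "Kevin" -> 3.0, "Alice" -> 4.0, "Tim" -> 3.5)
def kmeans1d(data: Seq[(String, Double)], centers: Seq[Double], iters: Int = 10) = {
  var cents = centers
  var groups: Map[Int, Seq[(String, Double)]] = Map()
  for (_ <- 1 to iters) {
    // assign each student to the nearest center (distance = difference in grade)
    groups = data.groupBy { case (_, g) => cents.indices.minBy(i => math.abs(g - cents(i))) }
    // move each center to the mean grade of its members
    cents = cents.indices.map(i => groups.get(i).map(m => m.map(_._2).sum / m.size).getOrElse(cents(i)))
  }
  groups.map { case (k, members) => k -> members.map(_._1) }
}
println(kmeans1d(students, centers = Seq(3.0, 6.0)))
// e.g. Map(0 -> List(Kevin, Alice, Tim), 1 -> List(Joe, Jane, Bob))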
Most data science / statistics problems require significantly more data. Here is an invented example with cancer risk (I am very tired of advertising and click-conversion examples).
drinks.per.day | age | smoker | location | cancer.risk |
---|---|---|---|---|
3 | 85.65 | Yes | Rome | 26.734 |
7 | 56.22 | No | Rome | 46.130 |
3 | 17.77 | Sometimes | Rome | 6.718 |
8 | 28.22 | Yes | Beijing | 30.220 |
3 | 39.25 | Sometimes | New York | 11.470 |
4 | 27.74 | Yes | Beijing | 23.215 |
5 | 38.07 | No | Beijing | 27.110 |
1 | 14.48 | Sometimes | Rome | 5.991 |
8 | 58.01 | Yes | New York | 58.003 |
1 | 88.47 | Sometimes | New York | 22.357 |
drinks.per.day | age | smoker | location | cancer.risk |
---|---|---|---|---|
Min. :0.00 | Min. :10.1 | No :323 | Beijing :349 | Min. : 0.0 |
1st Qu.:2.00 | 1st Qu.:33.3 | Sometimes:346 | New York:339 | 1st Qu.:12.0 |
Median :4.00 | Median :55.0 | Yes :331 | Rome :312 | Median :26.2 |
Mean :4.47 | Mean :55.2 |  |  | Mean :33.8 |
3rd Qu.:7.00 | 3rd Qu.:77.9 |  |  | 3rd Qu.:54.4 |
Max. :9.00 | Max. :99.0 |  |  | Max. :97.9 |
demo.kmeans(10000)
[1] "elapsed: 0.618"
demo.kmeans(1e6)
[1] "elapsed: 11.88"
Beyond
The solution has traditionally been to buy a faster computer, but while the number of transistors per chip is still doubling, the extra transistors now arrive as additional cores rather than as faster single cores.
\[ \longrightarrow \]
Divide and Conquer: perform the processing on multiple cores / computers at the same time!
Imperative instructions are a list of explicit commands which must be run in a fixed, specified order.
Declarative instructions are a list of separate tasks which can be run in any order.
\[ f(a\text{ of type }\mathcal{A}) \rightarrow b\text{ of type }\mathcal{B} \]
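As a tiny concrete illustration of the two styles (an example of mine, not from the slides), here is the sum of squared values written both ways in Scala:
// Imperative: explicit ordering and mutation
val values = Array(1.0, 2.0, 3.0, 4.0)
var total = 0.0
var i = 0
while (i < values.length) {
  total += values(i) * values(i)
  i += 1
}
// Declarative: describe the transformation and let the runtime decide how to execute it;
// this independence is what lets frameworks like Spark split the work across machines
val total2 = values.map(v => v * v).sum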
for cPoint in Points
  for cCenter in Centers
    CenterDistance(cPoint,cCenter) = dist(cPoint,cCenter)
  end
end
for cPoint in Points
  NearestCenter(cPoint) = argmin(CenterDistance(cPoint,*))
end
for cCenter in Centers
  CurrentPoints = []
  for cPoint in Points
    if NearestCenter(cPoint) is cCenter
      CurrentPoints += cPoint
    end
  end
  NewCenter(cCenter) = mean(CurrentPoints)
end
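The pseudocode above translates almost line for line into runnable Scala; the sketch below (my own, with hypothetical 1-D points and centers standing in for real feature vectors) performs one imperative k-means iteration and makes the fixed ordering and mutable state explicit.
// One imperative k-means iteration in plain Scala, mirroring the pseudocode above
val points = Array(1.0, 1.5, 5.0, 5.5, 9.0)
val centers = Array(1.0, 5.0, 9.0)
// 1. assign every point to its nearest center
val nearestCenter = new Array[Int](points.length)
for (p <- points.indices) {
  var best = 0
  for (c <- centers.indices) {
    if (math.abs(points(p) - centers(c)) < math.abs(points(p) - centers(best))) best = c
  }
  nearestCenter(p) = best
}
// 2. move each center to the mean of the points assigned to it
for (c <- centers.indices) {
  val assigned = points.indices.filter(i => nearestCenter(i) == c).map(i => points(i))
  if (assigned.nonEmpty) centers(c) = assigned.sum / assigned.length
}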
Taken from Serban's K-Means CUDA Project (https://github.com/serban/kmeans/blob/master/cuda_kmeans.cu)
checkCuda(cudaMalloc(&deviceObjects, numObjs*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceClusters, numClusters*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceMembership, numObjs*sizeof(int)));
checkCuda(cudaMalloc(&deviceIntermediates, numReductionThreads*sizeof(unsigned int)));
checkCuda(cudaMemcpy(deviceObjects, dimObjects[0],
numObjs*numCoords*sizeof(float), cudaMemcpyHostToDevice));
checkCuda(cudaMemcpy(deviceMembership, membership,
numObjs*sizeof(int), cudaMemcpyHostToDevice));
do {
checkCuda(cudaMemcpy(deviceClusters, dimClusters[0],
numClusters*numCoords*sizeof(float), cudaMemcpyHostToDevice));
find_nearest_cluster
<<< numClusterBlocks, numThreadsPerClusterBlock, clusterBlockSharedDataSize >>>
(numCoords, numObjs, numClusters,
deviceObjects, deviceClusters, deviceMembership, deviceIntermediates);
cudaDeviceSynchronize(); checkLastCudaError();
compute_delta <<< 1, numReductionThreads, reductionBlockSharedDataSize >>>
(deviceIntermediates, numClusterBlocks, numReductionThreads);
cudaDeviceSynchronize(); checkLastCudaError();
int d;
checkCuda(cudaMemcpy(&d, deviceIntermediates,
sizeof(int), cudaMemcpyDeviceToHost));
delta = (float)d;
checkCuda(cudaMemcpy(membership, deviceMembership,
numObjs*sizeof(int), cudaMemcpyDeviceToHost));
for (i=0; i<numObjs; i++) {
/* find the array index of nearest cluster center */
index = membership[i];
/* update new cluster centers : sum of objects located within */
newClusterSize[index]++;
for (j=0; j<numCoords; j++)
newClusters[j][index] += objects[i][j];
}
(random()<nPoints/4)
nearestCenter(x) = {
for cCenter in Centers
pointDist(cCenter) = dist(x,cCenter)
end
return (argmin(pointDist),x)
}
nearestCenter onto Points as NearestCenterList
Instead of for, if, and other commands, the more complicated map, filter, group by, and reduce operations are used.
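In Scala the same assignment-and-update step can be written directly with those operations; a minimal sketch (mine, again using hypothetical 1-D points and centers):
// Declarative version: no explicit loops or mutable state
val points = Seq(1.0, 1.5, 5.0, 5.5, 9.0)
val centers = Seq(1.0, 5.0, 9.0)
// map each point to (nearest center, point)
val nearestCenterList = points.map(p => (centers.minBy(c => math.abs(p - c)), p))
// group by the chosen center and reduce each group to its mean, giving the new centers
val newCenters = nearestCenterList.groupBy(_._1).map { case (_, pts) => pts.map(_._2).sum / pts.size }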
Zaharia, M., et al. (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing.
cd /scratch
curl -o spark.tgz http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1-bin-hadoop1.tgz
tar -xvf spark.tgz
cd spark-0.9.1-bin-hadoop1/
Spin up your own cluster in an hour (here we only use it on one node acting as the master, scheduler, and worker, but normally it is run on several different computers).
./bin/spark-shell
./bin/pyspark
val inData=sc.textFile("summary.csv")
Output inData: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile
val header = inData.first.split(",")
Output Array[String] = Array(“file.name”,“LACUNA_NUMBER”,“POS_X”,“POS_Y”,…
inData.count
Output Long = 1182
Drop the header line (compared as the raw string, not the split Array above) and split each remaining line using the comma delimiter
val headerStr = inData.first
val colData = inData.filter(_ != headerStr).map(_.split(","))
Output colData: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2]
Define a format
case class CellShape(sample: Long,PosX: Double, PosY: Double, PosZ: Double,Volume: Double)
val classedData = colData.map(line => CellShape(line(0).toLong,line(5).toDouble,line(6).toDouble,line(7).toDouble,line(31).toDouble)).cache
val crossData = classedData.cartesian(classedData).filter{dPair => dPair._1.sample != dPair._2.sample}
import scala.math.{sqrt, pow}
val distData = crossData.map{dPair =>
  (dPair._1.sample,
   (sqrt(pow(dPair._1.PosX-dPair._2.PosX,2)+pow(dPair._1.PosY-dPair._2.PosY,2)+pow(dPair._1.PosZ-dPair._2.PosZ,2)),
    dPair._1, dPair._2)
  )
}
val pointData = distData.groupByKey.map{cvals =>
val minDist = cvals._2.map(_._1).min
(cvals._1,cvals._2.filter(ival => ival._1==minDist).head) }
Compute the average nearest-neighbor distance
pointData.map(_._2._1).reduce(_ + _)/pointData.count
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
classedData.registerAsTable("Summary")
sql("SELECT SUM(PosX) from Summary").collect
Output
== Query Plan ==
Aggregate false, [], [SUM(PartialSum#12) AS c0#11]
Exchange SinglePartition
Aggregate true, [], [SUM(PosZ#8) AS PartialSum#12]
Project [PosZ#8:2]
ExistingRdd [sample#6,id#7L,PosZ#8], MapPartitionsRDD[19] at mapPartitions at basicOperators.scala:173
From pull request 1351 (https://github.com/apache/spark/pull/1351/)
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, AttributeReference, Row}
import org.apache.spark.sql.SchemaRDD
val fields = headerLine.map( fieldName => StructField(fieldName, StringType, nullable = true)) // headerLine: the Array[String] of column names split from the CSV header
val schema = StructType(fields)
def castToType(value: Any, dataType: DataType): Any = dataType match {
case StringType => value.asInstanceOf[String]
case BooleanType => value.asInstanceOf[Boolean]
case DoubleType => value.asInstanceOf[Double]
case FloatType => value.asInstanceOf[Float]
case IntegerType => value.asInstanceOf[Int]
case LongType => value.asInstanceOf[Long]
case ShortType => value.asInstanceOf[Short]
case _ => null
}
def parseCSVLine(splitLine: Array[String], schema: StructType): Row = {
val row = new GenericMutableRow(schema.fields.length)
schema.fields.zipWithIndex.foreach {
case (StructField(name, dataType, _), index) =>
row.update(index, castToType(splitLine(index), dataType))
}
row
}
val asAttr = schema.fields.map(field => AttributeReference(field.name, field.dataType, nullable = true)())
val parsedCSV = rowData.map(_.split(",")).map(line => parseCSVLine(line,schema)) // rowData: the RDD[String] of CSV data lines (header removed)
val sqlRdd = new SchemaRDD(sqlContext,SparkLogicalPlan(ExistingRdd(asAttr, parsedCSV)))
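Once wrapped as a SchemaRDD, the hand-built RDD can be registered and queried just like the case-class table earlier; a short usage sketch (the table name csvTable is mine), assuming the same Spark 1.0-era API used above:
sqlRdd.registerAsTable("csvTable")
sql("SELECT COUNT(1) FROM csvTable").collect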
To process data with Spark it needs to be in a list or, ideally, a key-value pair format. For an image this is not trivial since images are normally represented by a 2- or 3-D matrix which encodes information in both the value (intensity, absorption, etc.) and the position (\( A[x][y][z] \)).
A key-value pair can be made by using the position \( (x,y,z) \) as the key and the intensity (or whatever other signals are important) as the value.
This format is also better suited for multichannel (more than one value per position) and even hyperspectral imaging, since there is little difference between \( (I) \) and \( (I,J,K,U,V,W,\cdots) \).
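A minimal Scala sketch of the same flattening (the R code below does the equivalent for a real JPEG), turning a small 2-D intensity matrix into ((x, y), value) records:
// key = (x, y) position, value = intensity
val img: Array[Array[Double]] = Array(
  Array(0.63, 0.78),
  Array(0.89, 0.90))
val kvImg = for {
  y <- img.indices
  x <- img(y).indices
} yield ((x + 1, y + 1), img(y)(x)) // 1-based indices, matching the table below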
library(jpeg)
in.img<-readJPEG("ext-figures/Cell_Colony.jpg")
kv.img<-im.to.df(in.img)
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x | y | val |
---|---|---|
1 | 1 | 0.6275 |
2 | 1 | 0.7804 |
3 | 1 | 0.8863 |
4 | 1 | 0.8980 |
5 | 1 | 0.9098 |
6 | 1 | 0.9216 |
The key is the position \( \langle x, y \rangle \) and the value is the intensity \( val \)
val rawImage=sc.textFile("cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
imgAsKV.count
imgAsKV.take(1)
imgAsKV.sample(true,0.1,0).collect
val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)
100.0*labelImg.count/(imgAsKV.count)
Take a region of interest between 0 and 100 in X and Y
def roiFun(pvec: ((Int,Int),Double)) =
{pvec._1._1>=0 & pvec._1._1<100 & // X
pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)
// spread each voxel's scaled value to its neighborhood; summing with reduceByKey below yields a box (mean) filter
def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
val wind=(-windSize to windSize)
val pos=pvec._1
val scalevalue=pvec._2/(wind.length*wind.length)
for(x<-wind; y<-wind)
yield ((pos._1+x,pos._2+y),scalevalue)
}
val filtImg=roiImg.
flatMap(cvec => spread_voxels(cvec)).
filter(roiFun).reduceByKey(_ + _)
val xWidth=100
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
// spread each voxel's label to its neighborhood; the flag marks whether the entry is the original (central) voxel
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
val wind=(-windSize to windSize)
val pos=pvec._1
val label=pvec._2._1
for(x<-wind; y<-wind)
yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}
var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
newLabels=newLabels.
flatMap(spread_voxels(_,1)).
reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
filter(_._2._2)
// make a list of each label and how many voxels are in it
val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
reduceByKey(_ + _).sortByKey(true).collect
// if the list isn't the same as before, continue running since we need to wait for swaps to stop
running = (curGroupList.deep!=groupList.deep)
groupList=curGroupList
iterations+=1
print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList
val labelSize = newLabels.
map(pvec => (pvec._2._1,1)).
reduceByKey((a,b) => (a+b)).
map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
val labelPositions = newLabels.
map(pvec => (pvec._2._1,pvec._1)).
groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
(sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))
For many systems, such as bone, cellular tissues, and cellular materials, the structure is just the beginning; the most interesting results come from applying physical, chemical, or biological rules inside these structures.
\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]
Such systems can be easily represented as a graph and analyzed using GraphX in a distributed, fault-tolerant manner.
Basic types for the image graph (position)
case class D3int(x: Int, y: Int, z: Int)
case class D3float(x: Float, y: Float, z: Float)
case class ImageVertex(index: Int, pos: D3int = D3int(0, 0, 0), value: Int = 0, original: Boolean = false)
case class ImageEdge(dist: Double, orientation: D3float, restLength: Double = 1.0)
case class ForceEdge(ie: ImageEdge, force: D3float)
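The force code further down assumes that D3float supports element-wise arithmetic (orientation * force, force1 + force2, force + offset). Those operators come from the original project and are not shown in this excerpt; a minimal sketch of what is being assumed might look like:
// Hypothetical element-wise operators for D3float; only a sketch of the assumed behaviour
implicit class D3floatOps(a: D3float) {
  def +(b: D3float) = D3float(a.x + b.x, a.y + b.y, a.z + b.z)
  def +(t: (Int, Int, Int)) = D3float(a.x + t._1, a.y + t._2, a.z + t._3)
  def *(s: Double) = D3float((a.x * s).toFloat, (a.y * s).toFloat, (a.z * s).toFloat)
}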
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.impl.GraphImpl
def twoDArrayToGraph(sc: SparkContext, inArr: Array[Array[Int]]): Graph[ImageVertex, ImageEdge] = {
val ywidth=inArr.length
val xwidth=inArr(0).length
val vertices = sc.parallelize(0 until xwidth*ywidth).map{
idx => extractPoint(idx,inArr,xwidth,ywidth)
}
val fvertices: RDD[(VertexId, ImageVertex)] = vertices.map(cpt => (cpt.index,cpt))
val edges = vertices.flatMap{
cpt => spreadVertices(cpt,1)
}.groupBy(_.pos).filter{
// at least one original point
ptList => ptList._2.map(_.original).reduce(_ || _)
}.flatMap{ combPoint => {
val pointList=combPoint._2
val centralPoint = pointList.filter(_.original).head
val neighborPoints = pointList.filter(pvec => !pvec.original)
for(cNeighbor<-neighborPoints)
yield Edge[Unit](centralPoint.index,cNeighbor.index)
}
}
Graph[ImageVertex, Unit](fvertices,edges).
mapTriplets(triplet => triplet.srcAttr-triplet.dstAttr)
}
def calcForces(inGraph: Graph[ImageVertex, ImageEdge]) = {
inGraph.mapEdges(
rawEdge => {
val edge: ImageEdge = rawEdge.attr
val k = 0.01
val force = (edge.restLength-edge.dist)
new ForceEdge(edge,edge.orientation*force)
})
}
def sumForces(mGraph: Graph[ImageVertex, ForceEdge]) = {
mGraph.mapReduceTriplets[D3float](
// map function
triplet => {
Iterator((triplet.srcId, triplet.attr.force),
(triplet.dstId, triplet.attr.force*(-1))
)
},
// reduce function
(force1: D3float,force2: D3float) => force1+force2
).join(mGraph.vertices)
}
val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(femGraph)
val forces = sumForces(forceGraph)
Scala allows for Implicit Conversions and Wrappers making DSL programming very easy
val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(femGraph)
val forces = sumForces(forceGraph)
val forceGraph = testImg toGraph addForces
val nodeForces = forceGraph sumForces
val offsetForces = forceGraph + (0,0,1)
implicit class GraphLoader(inArray: Array[Array[Int]]) {
def createGraph() = {
twoDArrayToGraph(sc,inArray)
}
}
implicit class ImageGraph(inGraph: Graph[ImageVertex, ImageEdge]) {
def addForces() = {
calcForces(inGraph)
}
}
implicit class ForceGraph(inGraph: Graph[ImageVertex, ForceEdge]) {
def sumForces() = {
sumForces(inGraph)
}
def +(offset: (Int,Int,Int)) = {
inGraph.mapEdges{edge =>
ForceEdge(edge.attr.ie,edge.attr.force+offset)
}
}
}
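The mechanism behind these wrappers is ordinary implicit resolution; a tiny standalone example (mine, not from the talk) of how the compiler applies an implicit class:
// The compiler rewrites 5.squared as new IntOps(5).squared automatically
implicit class IntOps(i: Int) {
  def squared: Int = i * i
}
println(5.squared) // 25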
Exception in thread "main" java.lang.AssertionError: assertion failed: Tried to find '$line47' in '/var/folders/yq/w_mvh2xj7yzdzb6k4pknwzgc0000gn/T/spark-f8aac450-9c70-4232-a71f-e089e7bdd03b' but it is not a directory
at scala.reflect.io.AbstractFile.subdirectoryNamed(AbstractFile.scala:254)
at scala.tools.nsc.backend.jvm.BytecodeWriters$class.getFile(BytecodeWriters.scala:31)
at scala.tools.nsc.backend.jvm.BytecodeWriters$class.scala$tools$nsc$backend$jvm$BytecodeWriters$$getFile(BytecodeWriters.scala:37)
at scala.tools.nsc.backend.jvm.BytecodeWriters$ClassBytecodeWriter$class.writeClass(BytecodeWriters.scala:89)
at scala.tools.nsc.backend.jvm.GenASM$AsmPhase$$anon$4.writeClass(GenASM.scala:67)
at scala.tools.nsc.backend.jvm.GenASM$JBuilder.writeIfNotTooBig(GenASM.scala:459)
at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1413)
at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)
at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)
at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)
at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)
at org.apache.spark.repl.SparkIMain.compileSourcesKeepingRun(SparkIMain.scala:468)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.compileAndSaveRun(SparkIMain.scala:859)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.compile(SparkIMain.scala:815)
at org.apache.spark.repl.SparkIMain$Request.compile$lzycompute(SparkIMain.scala:1009)
at org.apache.spark.repl.SparkIMain$Request.compile(SparkIMain.scala:1004)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:644)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop$$anonfun$replay$1.apply(SparkILoop.scala:634)
at org.apache.spark.repl.SparkILoop$$anonfun$replay$1.apply(SparkILoop.scala:632)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.repl.SparkILoop.replay(SparkILoop.scala:632)
at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:579)
at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:566)
at scala.runtime.AbstractPartialFunction$mcZL$sp.apply$mcZL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
We are interested in partnerships and collaborations