Kevin Mader
15 October 2014
This is a key referring to your Amazon account that allows the creation and destruction of instances, S3 access, and so forth. It can be found in the AWS Management Console under Security Credentials. If you have not created one before, you will need to create a new one. If you already have two, you will either have to use an old one or delete one and create a new one.
export AWS_ACCESS_KEY_ID="???"
export AWS_SECRET_ACCESS_KEY="???"
The login key pair is used for logging into instances using SSH and SFTP. It can be created in the AWS Console by creating a new key pair.
chmod 600 ~/Downloads/my-key-pair.pem
The first time a cluster is run, it needs to be launched. This is performed using the script ec2/spark-ec2.
The following command starts a cluster with 1 slave using the spark-key-pair.pem file just created, in the Amazon region eu-west-1 (this must be the same region where you created the key pair, and should be eu-west-1 if you are in Switzerland).
./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem -s 1 launch big-data-test-cluster --region=eu-west-1
Replace launch with start after the cluster has been created for the first time.
Once it is started you will see the following message:
Connection to ec2-54-78-73-225.eu-west-1.compute.amazonaws.com closed.
Spark standalone cluster started at http://ec2-54-78-73-225.eu-west-1.compute.amazonaws.com:8080
Ganglia started at http://ec2-54-78-73-225.eu-west-1.compute.amazonaws.com:5080/ganglia
Done!
You can look at either of these URLs to see the Spark console (port 8080) and the Ganglia cluster-monitoring tools (port 5080).
EC2 offers rudimentary monitoring using the EC2 console (mainly to check if machines are still running)
After you are finished using the cluster you must shut it down, or Amazon continues to charge your account even if the cluster is sitting idle. You can either stop it temporarily (if you plan on using it again)
./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem stop big-data-test-cluster --region=eu-west-1
or destroy it (if you do not plan on using it soon)
./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem destroy big-data-test-cluster --region=eu-west-1
If you do not destroy it, Amazon charges your account residual storage costs.
Screencast of logging in
To log in you can use a very similar command to the one used to start the cluster (-s is no longer needed)
./spark-ec2 -k spark-key-pair -i ~/Downloads/spark-key-pair.pem login big-data-test-cluster --region=eu-west-1
cd spark/bin/
MASTER=spark://localhost:7077;./spark-shell
Ensure the cluster is working correctly
val testdata = sc.parallelize(0 to 10000)
val testdata2 = testdata.cartesian(testdata) // all pairs (i,j)
val totcount = testdata2.count
// count the pairs whose values are within 10 of each other
val closecount = testdata2.map(ival => math.abs(ival._1 - ival._2)).filter(_ < 10).count
Spark has built-in support for S3 and HDFS. The raw data can thus be stored independently of the cluster volumes and read from and written to S3 directly. Instead of reading
val localFiles = sc.textFile("*.txt")
They can be read directly from a bucket on S3 by
val remoteFiles = sc.textFile("s3n://BUCKET/cells/*.csv")
To get data onto S3
Screencast of setup
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","....")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","....")
val cells = sc.textFile("s3n://spark-class-data/cell_colony.csv")
val pts = cells.map(_.split(",").map(_.toDouble))
case class ImPos(x: Double, y: Double, intensity: Double)
val ccpts = pts.map(inpt => ImPos(inpt(0),inpt(1),inpt(2)))
ccpts.first
ccpts.filter(_.x>50).count
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
Register ccpts as a table
ccpts.registerAsTable("Cells")
sql("SELECT SUM(x) FROM Cells").collect
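The filter-and-count done above with ccpts.filter(_.x>50).count can also be expressed as a query (a small illustrative example; the column names come from the ImPos case class):
sql("SELECT COUNT(x) FROM Cells WHERE x > 50").collect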
To process data with Spark it needs to be in a list or, ideally, a key-value pair format. For an image this is not trivial since images are normally represented by a 2- or 3-D matrix which encodes information in both the value (intensity, absorption, etc.) and the position (\( A[x][y][z] \)).
A key-value pair can be made by having the key as the position \( (x,y,z) \) and the value as the intensity or whatever other signals are important.
This format is also better suited for multichannel (more than one value per position) and even hyperspectral imaging, since there is little difference between \( (I) \) and \( (I,J,K,U,V,W,\cdots) \)
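As a minimal Scala sketch of this idea (assuming a small in-memory Array[Array[Double]] of intensities; the file-based pipeline used below produces the same structure from a CSV):
// a tiny hypothetical 2D intensity matrix
val inImg = Array(Array(0.63, 0.78, 0.89), Array(0.90, 0.91, 0.92))
// flatten it into key-value pairs: key = (x,y) position, value = intensity
val imgPairs = for (y <- inImg.indices; x <- inImg(y).indices) yield ((x, y), inImg(y)(x))
val imgKV = sc.parallelize(imgPairs)
imgKV.take(3)
The cell colony image used in the rest of the examples is converted to this format offline in R: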
library(jpeg)
in.img<-readJPEG("ext-figures/Cell_Colony.jpg")
# im.to.df (a course helper) converts the image matrix into a data frame with x, y, and val columns
kv.img<-im.to.df(in.img)
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x | y | val |
---|---|---|
1 | 1 | 0.6275 |
2 | 1 | 0.7804 |
3 | 1 | 0.8863 |
4 | 1 | 0.8980 |
5 | 1 | 0.9098 |
6 | 1 | 0.9216 |
The key is position \( \langle x, y \rangle \) and value is the intensity \( val \)
val rawImage=sc.textFile("s3n://spark-class-data/cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
imgAsKV.count
imgAsKV.take(1)
imgAsKV.sample(true,0.1,0).collect
val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)
100.0*labelImg.count/(imgAsKV.count)
Take a region of interest between 0 and 100 in X and Y
def roiFun(pvec: ((Int,Int),Double)) =
{pvec._1._1>=0 & pvec._1._1<100 & // X
pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)
def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
val wind=(-windSize to windSize)
val pos=pvec._1
val scalevalue=pvec._2/(wind.length*wind.length)
for(x<-wind; y<-wind)
yield ((pos._1+x,pos._2+y),scalevalue)
}
val filtImg=roiImg.
flatMap(cvec => spread_voxels(cvec)).
filter(roiFun).reduceByKey(_ + _)
val xWidth=100
// give every voxel a unique initial label based on its position
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
val wind=(-windSize to windSize)
val pos=pvec._1
val label=pvec._2._1
for(x<-wind; y<-wind)
yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}
var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
// spread each voxel's label to its neighbors and keep the smallest label at each original voxel
newLabels=newLabels.
flatMap(spread_voxels(_,1)).
reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
filter(_._2._2)
// make a list of each label and how many voxels are in it
val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
reduceByKey(_ + _).sortByKey(true).collect
// if the list isn't the same as before, continue running since we need to wait for swaps to stop
running = (curGroupList.deep!=groupList.deep)
groupList=curGroupList
iterations+=1
print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList
val labelSize = newLabels.
map(pvec => (pvec._2._1,1)).
reduceByKey((a,b) => (a+b)).
map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
val labelPositions = newLabels.
map(pvec => (pvec._2._1,pvec._1)).
groupBy(_._1)
def posAvg(pvec: Iterable[(Long,(Int,Int))]): (Double,Double) = {
val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
(sumPt._1*1.0/pvec.size,sumPt._2*1.0/pvec.size)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).collect.mkString(","))
For many systems, like bone tissue, cellular tissues, cellular materials, and many others, the structure is just the beginning; the most interesting results come from applying physical, chemical, or biological rules inside these structures.
\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]
Such systems can be easily represented by a graph and analyzed using GraphX in a distributed, fault-tolerant manner.
Basic types for the image graph (position)
case class D3int(x: Int, y: Int, z: Int)
case class D3float(x: Float, y: Float, z: Float)
case class ImageVertex(index: Int, pos: D3int = D3int(0,0,0), value: Int = 0, original: Boolean = false)
case class ImageEdge(dist: Double, orientation: D3float, restLength: Double = 1.0)
case class ForceEdge(ie: ImageEdge, force: D3float)
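The code below uses arithmetic on these types (+ and * on D3float, and taking the difference of two ImageVertex values to build an ImageEdge); those operators are defined elsewhere in the course code, but a hypothetical sketch of what they might look like is:
// hedged sketch: plausible operator definitions, not the course's actual implementations
implicit class D3floatOps(a: D3float) {
  def +(b: D3float) = D3float(a.x + b.x, a.y + b.y, a.z + b.z)
  def +(b: (Int,Int,Int)) = D3float(a.x + b._1, a.y + b._2, a.z + b._3)
  def *(s: Double) = D3float((a.x*s).toFloat, (a.y*s).toFloat, (a.z*s).toFloat)
}
implicit class ImageVertexOps(a: ImageVertex) {
  // the difference of two neighboring vertices becomes an edge with a distance and orientation
  def -(b: ImageVertex): ImageEdge = {
    val dx = a.pos.x - b.pos.x; val dy = a.pos.y - b.pos.y; val dz = a.pos.z - b.pos.z
    val dist = math.sqrt(dx*dx + dy*dy + dz*dz)
    ImageEdge(dist, D3float(dx, dy, dz))
  }
}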
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.impl.GraphImpl
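The helpers extractPoint and spreadVertices used in twoDArrayToGraph are also not shown in this excerpt; a hypothetical sketch, following the same pattern as spread_voxels above, could be:
// hedged sketch of the missing helpers, assuming row-major indexing of the 2D array
def extractPoint(idx: Int, inArr: Array[Array[Int]], xwidth: Int, ywidth: Int): ImageVertex = {
  val x = idx % xwidth
  val y = idx / xwidth
  ImageVertex(idx, D3int(x, y, 0), inArr(y)(x), original = true)
}
// copy a vertex to each neighboring position; only the zero-offset copy keeps original = true
def spreadVertices(cpt: ImageVertex, windSize: Int = 1): Seq[ImageVertex] = {
  val wind = -windSize to windSize
  for (x <- wind; y <- wind)
    yield ImageVertex(cpt.index, D3int(cpt.pos.x + x, cpt.pos.y + y, cpt.pos.z), cpt.value, x == 0 && y == 0)
}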
def twoDArrayToGraph(sc: SparkContext, inArr: Array[Array[Int]]): Graph[ImageVertex, ImageEdge] = {
val ywidth=inArr.length
val xwidth=inArr(0).length
val vertices = sc.parallelize(0 until xwidth*ywidth).map{
idx => extractPoint(idx,inArr,xwidth,ywidth)
}
val fvertices: RDD[(VertexId, ImageVertex)] = vertices.map(cpt => (cpt.index,cpt))
val edges = vertices.flatMap{
cpt => spreadVertices(cpt,1)
}.groupBy(_.pos).filter{
// at least one original point
ptList => ptList._2.map(_.original).reduce(_ || _)
}.flatMap{ combPoint => {
val pointList=combPoint._2
val centralPoint = pointList.filter(_.original).head
val neighborPoints = pointList.filter(pvec => !pvec.original)
for(cNeighbor<-neighborPoints)
yield Edge[Unit](centralPoint.index,cNeighbor.index)
}
}
Graph[ImageVertex, Unit](fvertices,edges).
mapTriplets(triplet => triplet.srcAttr-triplet.dstAttr)
}
def calcForces(inGraph: Graph[ImageVertex, ImageEdge]) = {
inGraph.mapEdges(
rawEdge => {
val edge: ImageEdge = rawEdge.attr
val k = 0.01 // spring constant
// Hooke's law: the force is proportional to the deviation from the rest length
val force = k*(edge.restLength-edge.dist)
new ForceEdge(edge,edge.orientation*force)
})
}
def sumForces(mGraph: Graph[ImageVertex, ForceEdge]) = {
mGraph.mapReduceTriplets[D3float](
// map function
triplet => {
Iterator((triplet.srcId, triplet.attr.force),
(triplet.dstId, triplet.attr.force*(-1))
)
},
// reduce function
(force1: D3float,force2: D3float) => force1+force2
).join(mGraph.vertices)
}
val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(femGraph)
val forces = sumForces(forceGraph)
Scala allows for Implicit Conversions and Wrappers, which make DSL-style programming very easy. Instead of writing
val femGraph = twoDArrayToGraph(sc,testImg)
val forceGraph = calcForces(femGraph)
val forces = sumForces(forceGraph)
the same pipeline can be expressed as
val forceGraph = testImg toGraph addForces
val nodeForces = forceGraph sumForces
val offsetForces = forceGraph + (0,0,1)
implicit class GraphLoader(inArray: Array[Array[Int]]) {
// allows an Array[Array[Int]] to be converted with toGraph, as used above
def toGraph = {
twoDArrayToGraph(sc,inArray)
}
}
implicit class ImageGraph(inGraph: Graph[ImageVertex, ImageEdge]) {
def addForces() = {
calcForces(inGraph)
}
}
implicit class ForceGraph(inGraph: Graph[ImageVertex, ForceEdge]) {
def sumForces() = {
sumForces(inGraph)
}
def +(offset: (Int,Int,Int)) = {
inGraph.mapEdges{edge =>
ForceEdge(edge.attr.ie,edge.attr.force+offset)
}
}
}
From pull request 1351 (https://github.com/apache/spark/pull/1351/)
import org.apache.spark.sql.catalyst.types._
// header is assumed to be the sequence of column names, e.g. the first line of the CSV split on the delimiter
val fields = header.map( fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, AttributeReference, Row}
// the tokens from the CSV are Strings, so numeric types are parsed rather than cast
def castToType(value: Any, dataType: DataType): Any = dataType match {
case StringType => value.asInstanceOf[String]
case BooleanType => value.asInstanceOf[String].toBoolean
case DoubleType => value.asInstanceOf[String].toDouble
case FloatType => value.asInstanceOf[String].toFloat
case IntegerType => value.asInstanceOf[String].toInt
case LongType => value.asInstanceOf[String].toLong
case ShortType => value.asInstanceOf[String].toShort
case _ => null
}
def parseCSV(iter: Iterator[String],
delimiter: String, schema: StructType): Iterator[Row] = {
val row = new GenericMutableRow(schema.fields.length)
iter.map { line =>
val tokens = line.split(delimiter)
schema.fields.zipWithIndex.foreach {
case (StructField(name, dataType, _), index) =>
row.update(index, castToType(tokens(index), dataType))
}
row
}
}
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
val asAttr = schema.fields.map(field => AttributeReference(field.name, field.dataType, nullable = true)())
// test is the raw RDD of CSV lines (e.g. from sc.textFile)
val parsedCSV = test.mapPartitions(line => parseCSV(line,",",schema))
val newRDD = new SchemaRDD(sqlContext,SparkLogicalPlan(ExistingRdd(asAttr, parsedCSV)))
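The resulting SchemaRDD can then be registered and queried just like the cell table earlier (a sketch; the table name here is illustrative):
newRDD.registerAsTable("CsvData")
sql("SELECT * FROM CsvData LIMIT 5").collect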