Kevin Mader
1 July 2014, Spark Summit
A modern image analysis infrastructure to make complex problems simple and handle the flood of data from current and next generation instruments
Ever more sensitive and faster hardware means image acquisition possibilities are growing rapidly
Images are only as useful as what you can do with them; the bottleneck isn't measurement speed, but analysis
Adapted from: Sboner A, et al., Genome Biology, 2011
The only technique which can do all
[1] Mokso et al., J. Phys. D, 46(49),2013
Courtesy of M. Pistone at U. Bristol
\[ \downarrow \text{ Converted to Visible Light and Captured} \]
Filtered back-projection / Radon transform
Segmentation and Post-processing
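For reference, the reconstruction step above is the standard parallel-beam formulation: the measured projections are the Radon transform of the sample, and filtered back-projection inverts it by back-projecting the ramp-filtered projections.

\[ p_\theta(s) = \iint f(x,y)\,\delta(x\cos\theta + y\sin\theta - s)\,dx\,dy \]

\[ f(x,y) = \int_0^{\pi} \left(p_\theta \ast h\right)(x\cos\theta + y\sin\theta)\,d\theta \]

where \( f \) is the attenuation map, \( p_\theta \) the projection at angle \( \theta \), and \( h \) the ramp-filter kernel.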
Collaboration with Keith Cheng, Ian Foster, Xuying Xin, Patrick La Raviere, Steve Wang
1000s of samples of full adult animals, imaged at 0.74 \( \mu m \) resolution: Images 11500 x 2800 x 628 \( \longrightarrow \) 20-40 GVx / sample
Collaboration with Alberto Astolfo, Matthias Schneider, Bruno Weber, Marco Stampanoni
1 \( cm^3 \) scanned at 1 \( \mu m \) resolution: Images \( \longrightarrow \) 1000 GVx / sample
There are three different types of problems that we will run into.
Developed at 4Quant, ETH Zurich, and Paul Scherrer Institut
SIL is a flexible framework for image processing and testing
Report Generation
Standard problem: asking the questions you can (which are easy), rather than the questions you should
Phenotype | Within-Sample | Between-Sample | Ratio (%) |
---|---|---|---|
Length | 36.97 | 4.279 | 864.1 |
Volume | 67.85 | 12.479 | 543.7 |
Nearest Canal Distance | 70.35 | 333.395 | 21.1 |
Density (Lc.Te.V) | 144.40 | 27.658 | 522.1 |
Nearest Neighbors (Lc.DN) | 31.86 | 1.835 | 1736.1 |
Stretch (Lc.St) | 13.98 | 2.360 | 592.5 |
The first two columns of the table show the within-sample and between-sample variation for selected phenotypes; the third column gives the ratio of the within-sample to the between-sample value.
How does this result look visually? Each line shows the mean \( \pm \) standard deviation for each sample. The range within a single sample is clearly much larger than the range between samples.
Instead of taking the standard metrics, we can search within the 65 million samples, across a number of different phenotypes, for measures that reduce the variation within single samples.
With the ability to scale to millions of samples, there is no need to condense. We can analyze the entire dataset in real time and try to identify groups or trends within the whole data instead of just precalculated views of it.
1276 comma-separated text files with 56 columns of data and 15-60K rows each
Task | Single Core Time | Spark Time (40 cores) |
---|---|---|
Load and Preprocess | 360 minutes | 10 minutes |
Single Column Average | 4.6s | 400ms |
1 K-means Iteration | 2 minutes | 1s |
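As a sketch of what such an analysis looks like (the file path and column indices here are placeholders, not the real dataset layout), the k-means step timed in the table can be expressed in a few lines with MLlib:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
// read the per-sample phenotype tables (hypothetical path), keep a few numeric columns
val rows = sc.textFile("phenotypes/*.csv").
  map(_.split(",")).
  map(cols => Vectors.dense(cols(3).toDouble, cols(4).toDouble, cols(5).toDouble)).
  cache()
// cluster into 5 groups with up to 20 iterations
val model = KMeans.train(rows, 5, 20)
// count how many rows fall into each cluster
val clusterSizes = rows.map(v => (model.predict(v), 1)).reduceByKey(_ + _)
clusterSizes.collect.foreach(println)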
We found several composite phenotypes which are more consistent within samples than between samples
Extensions of Spark from AMPLab, like BlinkDB, are moving towards approximate results.
mean(volume)
mean(volume).within_time(5)
mean(volume).within_ci(0.95)
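The lines above are pseudocode for what such an interface could look like, not an existing API. The underlying idea can already be approximated in plain Spark by computing statistics on a random sample instead of the full dataset (the path and column index are again placeholders):

// hypothetical volume column pulled from the phenotype tables
val volumes = sc.textFile("phenotypes/*.csv").map(_.split(",")(4).toDouble)
val exactMean = volumes.mean()                         // full pass over all rows
val quickMean = volumes.sample(false, 0.01, 42).mean() // ~1% sample: faster, approximate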
For real-time image processing, with tasks like component labeling and filtering, this approach could work well.
It provides a much more robust solution than taking a small region of interest or scaling the data down.
For many datasets, processing, segmentation, and morphological analysis extract all the information that is needed. For many systems, however, like bone tissue, cellular tissues, cellular materials, and many others, the structure is just the beginning: the most interesting results come from applying physical, chemical, or biological rules inside of these structures.
\[ \sum F_{i} = 0 \]
Such systems can be easily represented by a graph and analyzed using GraphX in a distributed, fault-tolerant manner.
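A minimal sketch of this idea (the identifiers and numbers are illustrative, not real measurements): each segmented object becomes a vertex carrying a property such as its volume, each physical contact becomes an edge weighted by its contact area, and GraphX operations then run over the whole structure in parallel.

import org.apache.spark.graphx.{Edge, Graph}
// vertices: (object id, volume); edges: contact area between touching objects
val objects = sc.parallelize(Seq((1L, 0.5), (2L, 1.2), (3L, 0.8)))
val contacts = sc.parallelize(Seq(Edge(1L, 2L, 0.1), Edge(2L, 3L, 0.3)))
val structureGraph = Graph(objects, contacts)
// simple distributed queries over the structure
val contactsPerObject = structureGraph.degrees               // number of neighbors per object
val totalContactArea  = structureGraph.edges.map(_.attr).reduce(_ + _)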
For cellular systems, a model called the Cellular Potts Model (CPM) is used to simulate growth, differentiation, coarsening, phase changes, and a number of similar properties for complex systems. Implementing such algorithms has traditionally been very tedious, requiring thousands of lines of optimized MPI code (which is difficult to program well and in a stable manner). For real imaging data, the number of elements in these simulations can exceed 14 billion elements with 378 billion edges, run over thousands of iterations.
\[ E = \frac{1}{2} \sum_i \sum_{j \in \mathcal{N}(i)} J\left[1 - \delta(S_i, S_j)\right] \]
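Here the inner sum runs over the neighbours \( \mathcal{N}(i) \) of site \( i \) and \( \delta \) is the Kronecker delta, so only mismatched neighbouring spins contribute. A minimal sketch of evaluating this boundary term on the same \( ((x,y), \text{value}) \) layout used below (the grid, \( J \), and the 4-connected neighbourhood are illustrative assumptions):

val J = 1.0
// toy spin configuration: two domains meeting at x = 50
val spins = sc.parallelize(for (x <- 0 until 100; y <- 0 until 100)
  yield ((x, y), if (x < 50) 1 else 2))
// send each spin to its 4 neighbours ...
val neighbourSpins = spins.flatMap { case ((x, y), s) =>
  Seq(((x + 1, y), s), ((x - 1, y), s), ((x, y + 1), s), ((x, y - 1), s))
}
// ... and compare it with the spin actually sitting at that position
val mismatchEnergy = spins.join(neighbourSpins).
  map { case (_, (si, sj)) => if (si == sj) 0.0 else J }
// every neighbouring pair is counted twice, hence the factor 1/2
val totalEnergy = 0.5 * mismatchEnergy.reduce(_ + _)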
Spark provides a very flexible solution for large, fast image processing tasks.
We are interested in partnerships and collaborations
library(jpeg)
library(knitr) # for kable
# assumed helper (following the talk's layout): flatten an image matrix into (x, y, val) rows
im.to.df<-function(img) data.frame(x=rep(1:nrow(img),ncol(img)),
                                   y=rep(1:ncol(img),each=nrow(img)),
                                   val=as.vector(img))
in.img<-readJPEG("ext-figures/input_image.jpg") # grayscale input image
kv.img<-im.to.df(in.img)                        # convert to key-value rows
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x | y | val |
---|---|---|
1 | 1 | 0.6275 |
2 | 1 | 0.7804 |
3 | 1 | 0.8863 |
4 | 1 | 0.8980 |
5 | 1 | 0.9098 |
6 | 1 | 0.9216 |
The key is position \( \langle x, y \rangle \) and value is the intensity \( val \)
// load the CSV as lines of text
val rawImage=sc.textFile("cell_colony.csv")
// split each line into its columns
val imgAsColumns=rawImage.map(_.split(","))
// key-value pairs: the position (x,y) is the key, the intensity is the value
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
// quick checks: pixel count, first pixel, and a 10% sample
imgAsKV.count
imgAsKV.take(1)
imgAsKV.sample(true,0.1,0).collect
// threshold: keep only the (dark) pixels below 0.5 as the labeled region
val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)
// fraction of the image (in %) below the threshold
100.0*labelImg.count/(imgAsKV.count)
Take a region of interest between 0 and 100 in X and Y
def roiFun(pvec: ((Int,Int),Double)) =
{pvec._1._1>=0 & pvec._1._1<100 & // X
pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)
// spread each voxel's value evenly over its (2*windSize+1)^2 neighborhood
def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val scalevalue=pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind)
    yield ((pos._1+x,pos._2+y),scalevalue)
}
// summing the spread values at each position gives a uniform (box) filter
val filtImg=roiImg.
  flatMap(cvec => spread_voxels(cvec)).
  filter(roiFun).reduceByKey(_ + _)
val xWidth=100
// give every voxel a unique initial label based on its position
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
// propagate a voxel's label to its neighborhood, remembering which entry is the original voxel
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val label=pvec._2._1
  for(x<-wind; y<-wind)
    yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}
var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
  // each voxel takes the smallest label in its neighborhood
  newLabels=newLabels.
    flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep!=groupList.deep)
  groupList=curGroupList
  iterations+=1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList
// average size (in voxels) of a connected component
val labelSize = newLabels.
  map(pvec => (pvec._2._1,1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
// center of volume (mean position) of each connected component
val labelPositions = newLabels.
  map(pvec => (pvec._2._1,pvec._1)).
  groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
  val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
  (sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))
Ideally Spark is used on a dedicated cluster or with Mesos, but many existing clusters run Sun Grid Engine or similar frameworks and are unlikely to change in the near future (legacy codes). Spark can, however, be run very efficiently inside these environments.
Start the master node on one of the machines (locally)
start-master.sh
Submit the code to run to the queue management system
qsub mysparkjob.sge -master=spark://masternode.me:7077
(replace masternode.me accordingly)
for i in {1..100};
do
qsub sparkworker.sge spark://masternode.me:7077;
done
As worker jobs are scheduled by Sun Grid Engine, they connect to the master and work on whatever needs to be done. Workers will not quit until they exceed the maximum running time (s_rt).