Kevin Mader
October 2014, XRM Big Data Satellite Workshop
To develop an image analysis infrastructure to make complex problems simple.
The only technique which can do all
[1] Mokso et al., J. Phys. D, 46(49), 2013
Courtesy of M. Pistone at U. Bristol
\[ \downarrow \text{ Converted to Visible Light and Captured} \]
Filtered back-projection / Radon transform
Segmentation and Post-processing
Collaboration with Keith Cheng, Ian Foster, Xuying Xin, Patrick La Rivière, Steve Wang
1000s of samples of full adult animals, imaged at 0.74 \( \mu m \) resolution: Images 11500 x 2800 x 628 \( \longrightarrow \) 20-40 GVx / sample
Collaboration with Alberto Astolfo, Matthias Schneider, Bruno Weber, Marco Stampanoni
1 \( cm^3 \) scanned at 1 \( \mu m \) resolution: Images \( \longrightarrow \) 1000 GVx / sample
Ever more sensitive and faster hardware means image acquisition possibilities are growing rapidly
Images are only as useful as what you can do with them: the bottleneck is no longer measurement speed but analysis
Adapted from: Sboner A, et al., Genome Biology, 2011
A single absorption image can easily be represented as an array. Hundreds of existing tools will process this array and filter, segment, and quantify structures.
What took an entire PhD 3-4 years ago can now be measured in a weekend, or even in several seconds. Analysis tools have not kept up: they are difficult to customize and usually highly specific.
Data structures that were fast and efficient for computers with 640 KB of memory no longer make sense
CPUs are not getting much faster, but there are a lot more of them. Iterating through a huge array takes almost as long on 2014 hardware as on 2006 hardware
Google, Facebook, Yahoo, and Amazon had these, or very similar, problems years ago. They hired a lot of very competent computer science PhDs to solve them.
Cloud computing infrastructures are 10-100X cheaper than labor costs, usually more reliable than internal systems, and easily accessible from almost anywhere.
Run it on your laptop, local cluster, or 5000 machines at Amazon without rewriting anything
Readable, small code bases which focus on analysis rather than data management details
When a ton of heterogeneous data is coming in fast.
Performant, scalable, and flexible
Scaling to 10X, 100X, or 1000X the data is the same amount of effort
Michael Franklin, co-director of UC Berkeley's AMPLab, said their rate-limiting factor is always having enough interesting data
Google ran head-first into Big Data trying to create an index of the internet. Single computers cannot do everything, and writing parallel and distributed code is complicated, difficult to test, easy to break, and often non-deterministic. The resulting code is a tangled, inseparable mess of computation, data, and processing management.
A robust, fault-tolerant framework for handling distributed processing and data storage, on which complicated analyses can be run declaratively by specifying what rather than how. It reduced the data-processing vocabulary to two words: Map and Reduce. It was recreated and open-sourced by Yahoo as Hadoop.
MapReduce can be applied to many tasks, but it is often tricky and time-consuming to express complicated, iterative workflows using just map and reduce. Spark expanded the vocabulary of Hadoop with filter, flatMap, aggregate, join, groupBy, and fold, and incorporated many performance improvements through caching (very important for images) and an interactive (REPL) shell
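As a minimal sketch (assuming a running SparkContext sc; the data here is invented), the expanded vocabulary turns a multi-stage analysis into a short, readable chain:

// plain map + reduce: double every value and sum them
val values = sc.parallelize(1 to 1000)
val doubledSum = values.map(_ * 2).reduce(_ + _)

// the richer vocabulary: filter, groupBy, and fold in one readable chain
val groupTotals = values.
  filter(_ > 10).                                     // drop the small values
  groupBy(_ % 2).                                     // 0 -> even, 1 -> odd
  map { case (k, vs) => (k, vs.foldLeft(0)(_ + _)) }  // total per group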
The two frameworks provide a free, out-of-the-box solution for
The bottleneck is the filesystem connection: many nodes (10+) reading in parallel bring even a GPFS-based fiber system to a crawl
One of the central tenets of MapReduce™ is data-centric computation \( \rightarrow \) instead of moving the data to the computation, move the computation to the data.
These frameworks are really cool and Spark has a big vocabulary, but flatMap, filter, aggregate, join, groupBy, and fold still do not sound like anything I want to do to an image.
I want to
We have developed a number of commands in the Spark Imaging Layer (SIL) for handling standard image processing tasks
New components can be added directly or imported from ImageJ. The resulting analyses are then parallelized by the Spark engine and run on multiple cores/CPUs locally, on a cluster, on a supercomputer, or even on a virtual cluster in the cloud which can be started in seconds.
Fault tolerance is particularly tedious to add to existing tools; it must be built in from the first step. The Spark Imaging Layer is fault-tolerant and allows checkpointing with a single result.checkpoint() command, enabling long-running analyses to continue after failures
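A minimal sketch of the underlying mechanism (plain Spark checkpointing, with an assumed HDFS path; the SIL checkpoint call builds on the same idea):

sc.setCheckpointDir("hdfs:///checkpoints")                  // assumed reliable storage location
val result = sc.textFile("cell_colony.csv").map(_.split(",").length)
result.checkpoint()                                          // mark this stage to be saved
result.count()                                               // an action triggers the actual write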
Developing in Scala brings additional flexibility through types [1]. In microscopy the standard formats are 2-, 3-, and even 4- or higher-dimensional arrays or matrices, which can be iterated through quickly using CPU and GPU code. While that is still possible in Scala, there is far more flexibility in the data types, allowing anything to be stored as an image and then processed, as long as the basic functions make sense.
[1] M. Odersky, Fighting Bit Rot with Types (Experience Report: Scala Collections), FSTTCS 2009, December 2009
A collection of positions and values, and maybe more (not an array of doubles). Arrays are efficient for storage in computer memory, but are often a poor way of expressing scientific ideas and analyses.
combine information from nearby pixels
determine groups of pixels which are very similar to the desired result
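A minimal sketch of this position-and-value view (the names here are illustrative, not the actual SIL types):

// an image element carries its position and value explicitly
case class Position(x: Int, y: Int, z: Int)
case class Voxel(pos: Position, value: Double)

// an "image" is then just a distributed collection of such elements, so combining
// nearby pixels or grouping similar ones are ordinary collection operations
// rather than index arithmetic
// val image: org.apache.spark.rdd.RDD[Voxel] = ...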
Much simpler code, and combining different pieces of information is trivial: for example, calculating the distance to the nearest vessel and then determining the average distance of each cell
// distance from every point to the nearest vessel
cellDist = vesselImage.invert.calculateDistance
// label each connected cell
cellLabel = cellThreshold.makeLabel
// average vessel distance per cell
avgCellDist = cellLabel.join(cellDist).groupBy(LABEL).reduce(MEAN)
This analysis can be done easily since the types are flexible
Hyperspectral imaging is a rapidly growing area with the potential for massive datasets and a severe deficit of usable tools.
The scale of the data is large and standard image processing tools are ill-suited for handling it, although the ideas used in image processing (filtering, thresholding, segmentation, …) are equally applicable to hyperspectral data, and distributed, parallel approaches make even more sense on such massive datasets
Using SIL, hyperspectral images are treated the same way as normal images, and all of the operations you apply to a normal image are applicable, as long as the underlying operations are defined. A Gaussian filter might seem like a strange operation at first, but using information from each of a point's neighbors to reduce noise makes sense.
Identify cells from background by K-Means (2 groups) and then count the cells
// cluster each spectrum into one of 2 groups (cell vs background)
clusters = KMeans.train(specImage, 2)
// assign every point to its cluster and label the connected cells
labeledCells = specImage.map(clusters.identify(_)).makeLabels
println(labeledCells.distinct.count + " cells")
Scalable high-throughput imaging enables new realms of investigation and analysis
For many datasets, processing, segmentation, and morphological analysis extract all of the information needed. For many systems, however, like bone tissue, cellular tissues, cellular materials, and many others, the structure is just the beginning, and the most interesting results come from applying physical, chemical, or biological rules inside these structures.
\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]
Such systems can be easily represented by a graph, and analyzed using GraphX in a distributed, fault tolerant manner.
For cellular systems, a model called the Cellular Potts Model (CPM) is used to simulate growth, differentiation, coarsening, phase changes, and a number of similar properties of complex systems. Implementing such algorithms has traditionally been very tedious, requiring thousands of lines of optimized MPI code (which is difficult to program well and in a stable manner). For real imaging data, these simulations can exceed 14 billion elements with 378 billion edges, run over thousands of iterations
\[ E = \frac{1}{2} \sum_i\sum_{\langle j\rangle_i} J[1 - \delta(S_i - S_j)] \]
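As a minimal sketch (assuming Spark and GraphX on the classpath; the voxel labels and neighbor pairs are supplied by the caller, and this is not the actual SIL/CPM implementation), the Potts interaction energy above maps naturally onto the edges of a graph:

import org.apache.spark.SparkContext._
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// vertices: (voxelId, label S_i); edges: neighbor pairs <i,j>, given in both directions
def pottsEnergy(labels: RDD[(Long, Int)], neighbors: RDD[(Long, Long)], J: Double): Double = {
  val graph = Graph(labels, neighbors.map { case (i, j) => Edge(i, j, ()) })
  // each neighbor pair contributes J * (1 - delta(S_i, S_j))
  val perEdge = graph.triplets.map(t => if (t.srcAttr == t.dstAttr) 0.0 else J)
  // the factor 1/2 matches the double sum above, where every pair appears twice
  0.5 * perEdge.sum()
}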
Extensions of Spark coming out of AMPLab, like BlinkDB, are moving towards approximate results.
mean(volume)
mean(volume).within_time(5)
mean(volume).within_ci(0.95)
For real-time image processing, with tasks like component labeling and filtering, this approach could work well
It provides a much more robust solution than taking a small region of interest or scaling down
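A minimal sketch of the same idea using Spark's own approximate actions (not BlinkDB itself; volumes is an assumed RDD of per-object volumes filled with random data):

import org.apache.spark.SparkContext._   // Double RDD helpers on older Spark versions

val volumes = sc.parallelize(Seq.fill(1000000)(math.random * 10.0))
val exactMean = volumes.mean()                   // waits for every partition
val approxMean = volumes.meanApprox(5000, 0.95)  // answer within 5 s, 95% confidence
println(approxMean.initialValue)                 // mean with low/high bounds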
Scala code can be slow, but it is very high level and can be automatically translated to CPU or GPU code with projects like ScalaCL
// a local Scala collection, doubled and summed on the CPU
val myData = Array(1, 2, 3, 4, 5)
val doubleSum = myData.map(_ * 2).reduce(_ + _)
Using ScalaCL to run on GPUs with a 2-4X performance improvement
// the same collection converted to an OpenCL-backed array (with the ScalaCL imports in scope)
val clData = myData.cl
val doubleSum = clData.map(_ * 2).reduce(_ + _)
The Spark Imaging Layer is fully compatible with the notion of 'pipes', replacing pieces of code with other languages and tools.
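A minimal sketch of the underlying Spark mechanism (using the standard wc -l command simply as a stand-in for an external image tool):

// each partition's elements are streamed line-by-line through the external command
val lines = sc.textFile("cell_colony.csv")
val pipedCounts = lines.pipe("wc -l")        // one line count per partition
println(pipedCounts.collect.mkString(","))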
We can also automatically choose among (and validate) different tools for the same analysis to increase performance. Running on your laptop means you can profile code and see exactly where the bottlenecks are \( \rightarrow \) "Premature optimization is the root of all evil" - Donald Knuth
A spinoff - 4Quant: From images to insight
We are interested in partnerships and collaborations
We have prepared an introduction to running Spark on the Amazon Cloud: http://4quant.github.io/aws-spark-introduction/
./spark-ec2 -k spark-key -i ~/Downloads/spark-key.pem -s 50 launch big-data-test-cluster --region=ap-southeast-2
Taken from Serban's K-Means CUDA Project (https://github.com/serban/kmeans/blob/master/cuda_kmeans.cu)
checkCuda(cudaMalloc(&deviceObjects, numObjs*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceClusters, numClusters*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceMembership, numObjs*sizeof(int)));
checkCuda(cudaMalloc(&deviceIntermediates, numReductionThreads*sizeof(unsigned int)));
checkCuda(cudaMemcpy(deviceObjects, dimObjects[0],
numObjs*numCoords*sizeof(float), cudaMemcpyHostToDevice));
checkCuda(cudaMemcpy(deviceMembership, membership,
numObjs*sizeof(int), cudaMemcpyHostToDevice));
do {
checkCuda(cudaMemcpy(deviceClusters, dimClusters[0],
numClusters*numCoords*sizeof(float), cudaMemcpyHostToDevice));
find_nearest_cluster
<<< numClusterBlocks, numThreadsPerClusterBlock, clusterBlockSharedDataSize >>>
(numCoords, numObjs, numClusters,
deviceObjects, deviceClusters, deviceMembership, deviceIntermediates);
cudaDeviceSynchronize(); checkLastCudaError();
compute_delta <<< 1, numReductionThreads, reductionBlockSharedDataSize >>>
(deviceIntermediates, numClusterBlocks, numReductionThreads);
cudaDeviceSynchronize(); checkLastCudaError();
int d;
checkCuda(cudaMemcpy(&d, deviceIntermediates,
sizeof(int), cudaMemcpyDeviceToHost));
delta = (float)d;
checkCuda(cudaMemcpy(membership, deviceMembership,
numObjs*sizeof(int), cudaMemcpyDeviceToHost));
for (i=0; i<numObjs; i++) {
/* find the array index of nearest cluster center */
index = membership[i];
/* update new cluster centers : sum of objects located within */
newClusterSize[index]++;
for (j=0; j<numCoords; j++)
newClusters[j][index] += objects[i][j];
}
(random() < nPoints/4)

nearestCenter(x) = {
  for cCenter in Centers
    pointDist(cCenter) = dist(x, cCenter)
  end
  return (argmin(pointDist), x)
}

nearestCenter onto Points as NearestCenterList

library(jpeg)
in.img <- readJPEG("ext-figures/input_image.jpg")
# im.to.df: a custom helper (not part of the jpeg package) that converts the
# image matrix into a data frame of (x, y, val) rows
kv.img <- im.to.df(in.img)
# write the key-value image out as a headerless CSV for Spark to read
write.table(kv.img, "cell_colony.csv", row.names = F, col.names = F, sep = ",")
kable(head(kv.img))
| x | y | val |
|---|---|-----------|
| 1 | 1 | 0.6274510 |
| 2 | 1 | 0.7803922 |
| 3 | 1 | 0.8862745 |
| 4 | 1 | 0.8980392 |
| 5 | 1 | 0.9098039 |
| 6 | 1 | 0.9215686 |
The key is position \( \langle x, y \rangle \) and value is the intensity \( val \)
// load the headerless CSV as lines of text
val rawImage = sc.textFile("cell_colony.csv")
// split each line into its columns
val imgAsColumns = rawImage.map(_.split(","))
// key = (x, y) position, value = intensity
val imgAsKV = imgAsColumns.map(point => ((point(0).toInt, point(1).toInt), point(2).toDouble))
imgAsKV.count                           // number of pixels
imgAsKV.take(1)                         // look at the first element
imgAsKV.sample(true, 0.1, 0).collect    // a random 10% sample of the pixels
// keep the dark pixels (intensity below the threshold) as the segmented cells
val threshVal = 0.5
val labelImg = imgAsKV.filter(_._2 < threshVal)
// fraction of the image (in %) below the threshold
100.0*labelImg.count/(imgAsKV.count)
Take a region of interest between 0 and 100 in X and Y
def roiFun(pvec: ((Int,Int),Double)) =
  {pvec._1._1 >= 0 & pvec._1._1 < 100 & // X
   pvec._1._2 >= 0 & pvec._1._2 < 100 } // Y
val roiImg = imgAsKV.filter(roiFun)
// spread each voxel's (scaled) value to its neighbors within the window;
// summing the spread values with reduceByKey gives a uniform smoothing filter
def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind = (-windSize to windSize)
  val pos = pvec._1
  val scalevalue = pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind)
    yield ((pos._1+x, pos._2+y), scalevalue)
}
val filtImg = roiImg.
  flatMap(cvec => spread_voxels(cvec)).
  filter(roiFun).reduceByKey(_ + _)
// seed every thresholded voxel with a unique label derived from its position
val xWidth = 100
var newLabels = labelImg.map(pvec => (pvec._1, (pvec._1._1.toLong*xWidth + pvec._1._2 + 1, true)))

// spread each voxel's label to its neighbors; the flag marks the original (center) position
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind = (-windSize to windSize)
  val pos = pvec._1
  val label = pvec._2._1
  for(x<-wind; y<-wind)
    yield ((pos._1+x, pos._2+y), (label, (x==0 & y==0)))
}

var groupList = Array((0L,0))
var running = true
var iterations = 0
while (running) {
  // keep the minimum label seen at each position and only retain real (center) voxels
  newLabels = newLabels.
    flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1), a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList = newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep != groupList.deep)
  groupList = curGroupList
  iterations += 1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList
// average number of voxels per label (mean component size)
val labelSize = newLabels.
  map(pvec => (pvec._2._1, 1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
// group the positions belonging to each label
val labelPositions = newLabels.
  map(pvec => (pvec._2._1, pvec._1)).
  groupBy(_._1)
// center of volume of a single label
def posAvg(pvec: Iterable[(Long,(Int,Int))]): (Double,Double) = {
  val sumPt = pvec.map(_._2).reduce((a,b) => (a._1+b._1, a._2+b._2))
  (sumPt._1*1.0/pvec.size, sumPt._2*1.0/pvec.size)
}
print(labelPositions.map(pvec => posAvg(pvec._2)).collect.mkString(","))
The needs of advertising, marketing, and recommendation engines have powered the creation of many new machine learning tools capable of processing huge volumes of data. One of the most actively developed projects, MLlib, is a module of Spark and can be combined directly with the Imaging Layer.
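A minimal sketch of that combination (reusing the cell_colony.csv key-value image from the example above; the choice of k = 2 and 20 iterations is arbitrary):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// rebuild the (position, intensity) pairs
val imgAsKV = sc.textFile("cell_colony.csv").
  map(_.split(",")).
  map(p => ((p(0).toInt, p(1).toInt), p(2).toDouble))

// cluster the pixels by intensity: 2 groups (cells vs background), 20 iterations
val features = imgAsKV.map { case (_, v) => Vectors.dense(v) }.cache()
val model = KMeans.train(features, 2, 20)

// attach each pixel's cluster id back to its position
val clustered = imgAsKV.map { case (pos, v) => (pos, model.predict(Vectors.dense(v))) }
clustered.take(5)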