An approach for measuring, stitching, and analyzing 50 terabytes of data
K. Mader, A. Patera, A. Astolfo, M. Schneider, B. Weber, M. Stampanoni (4quant.com/SRI2015)
fps1000 - (640 x 480 x 840 fps) for $400 \( \rightarrow \) 780 MB/s \( \rightarrow \) 66 TB/day
How the time spent on science breaks down into component tasks has changed over the last years, and we expect it to keep shifting through 2020.
The primary trends to focus on are the rapid decrease in measurement time and the increasing importance of post-processing.
Thank you Alessandra and Alberto!
The first challenge with these data is stitching them together.
Algorithmically it is very simple:
\( \textrm{Overlap}_{A \rightarrow B} = \textrm{argmax}_{\vec{x}}\left[\mathcal{FT}^{-1} \left\{\mathcal{FT}\{A\}\mathcal{FT}\{B\}^{*}\right\} \right] \)
We need to stitch each scan with all of its neighbors
We have a few more parameters (\( \theta \)), and we know the initial positions, so we optimize over a limited range rather than the entire image.
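As a minimal illustration of this search, here is a 1D brute-force sketch (rather than the FFT form above) that slides one signal over the other within a limited offset range and keeps the shift with the highest correlation; bestOverlap and the sine-wave test signals are purely illustrative.

import scala.math.sin

// brute-force cross-correlation: try every shift in [-maxShift, maxShift]
def bestOverlap(a: Array[Double], b: Array[Double], maxShift: Int): Int = {
  def xcorr(shift: Int): Double =
    a.indices.collect {
      case i if i + shift >= 0 && i + shift < b.length => a(i) * b(i + shift)
    }.sum
  (-maxShift to maxShift).maxBy(xcorr)
}

val a = Array.tabulate(64)(i => sin(i / 5.0))
val b = Array.tabulate(64)(i => sin((i + 3) / 5.0))   // the same signal, shifted by 3 samples
println(bestOverlap(a, b, maxShift = 10))             // prints -3: b leads a by 3 samples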
Now that our task is clear we just execute \( \rightarrow \)
The machine being used by the user, responsible for creating the jobs that need to run
Distributes tasks across multiple machines and coordinates communication between them
The nodes where the computation is actually done
tell machine 1 to load images 0 to 2
tell machine 2 to load images 3 to 5
then share the images to calculate the overlap
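A minimal sketch of what this division of labor looks like from the driver's side, assuming a hypothetical loadTile helper that reads one scan from shared storage; Spark's scheduler decides which worker actually executes each partition.

import org.apache.spark.SparkContext

// placeholder for whatever actually reads a scan from shared storage
def loadTile(i: Int): Array[Byte] = Array[Byte]()

def distributeLoading(sc: SparkContext, nTiles: Int = 6) = {
  // two partitions ~ "machine 1 gets tiles 0-2, machine 2 gets tiles 3-5"
  val tileIds = sc.parallelize(0 until nTiles, numSlices = 2)
  tileIds.map(i => (i, loadTile(i)))   // the loading itself runs on the workers
}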
The most important job for any piece of analysis is to be correct.
Almost all image processing tasks require a number of people to evaluate and implement them, and they are almost always moving targets.
The last of the major priorities is speed, which covers scalability, raw performance, and development time.
When a ton of heterogeneous data is coming in fast.
Performant, scalable, and flexible
10X, 100X, 1000X is the same amount of effort
The director of the AMPLab said that their rate-limiting factor is always having enough interesting data.
You conduct the orchestra, you don't spend time telling each person what to do
These frameworks are really cool and Spark has a big vocabulary, but flatMap, filter, aggregate, join, groupBy, and fold still do not sound like anything I want to do to an image.
I want to
We have developed a number of commands for the Spark Imaging Layer (SIL) that handle standard image processing tasks
Fully extensible with
First we start our cluster:
spark-ec2.py -s 50 launch 4quant-image-cluster
Load all of the samples (56TB of data)
loadImage(
"s3:/../brain_*_*_*/rec.tif"
)
\[ \downarrow \] A Resilient Distributed Dataset (RDD) \[ \textrm{Images}: \textrm{RDD}[((x,y,z),Img[Double])] =\\ \left[(\vec{x},\textrm{Img}),\cdots\right] \]
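In Scala the layout above could be sketched roughly as follows, assuming a Breeze DenseMatrix slice stands in for the SIL's Img[Double] payload:

import breeze.linalg.DenseMatrix
import org.apache.spark.rdd.RDD

type Pos = (Int, Int, Int)                       // (x, y, z) position of a scan on the grid
type ImageRDD = RDD[(Pos, DenseMatrix[Double])]  // corresponds to RDD[((x,y,z), Img[Double])]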
// Gaussian-filter every image in the RDD (kernel size 1, sigma 0.5)
filteredImages = Images.gaussianFilter(1, 0.5)
// threshold, then count (above-threshold voxels, total voxels) per image and sum over all images
volFraction = Images.threshold(0.5).
  map { keyImg =>
    (sum(keyImg.img), keyImg.size)
  }.
  reduce((a, b) => (a._1 + b._1, a._2 + b._2))
We have all of the filtered images and we want to stitch them together in a smart way.
pairImages = Images.
  cartesian(Images).
  filter { case (im1, im2) => dist(im1.pos, im2.pos) < 1 }   // keep only neighboring scans
The cross-correlation can then be executed on each pair of images in the new RDD (pairImages) by using the map command.
displacementField = pairImages.
  map { case ((posA, imgA), (posB, imgB)) =>
    // start the search at the known offset between the two scan positions
    xcorr(imgA, imgB, in = posB - posA)
  }
The displacement field is then refined using the updated information provided by the cross-correlations and by applying appropriate smoothing criteria across windows.
// smooth the displacements over a 3x3x3 window of neighboring scans
smoothField = displacementField.
  window(3, 3, 3).
  map(gaussianSmoothFunction)
This also ensures the original data is left unaltered and all analysis is reversible.
The final stitching can then be done by:
alignImages.
  filter { case (x, img) => abs(x - tPos) < img.size }.
  map { case (x, img) =>
    // place each image at its aligned position inside the target volume
    new Image(tSize).copy(img, x, tPos)
  }.
  combineImages()
New detectors certainly provide a substantial amount of new data, but what about old data?
Look at this new technique from a famous journal; let's try it on all of our old data.
It seems to give the same results
Well what about this technique?
allData = loadPSIImages("s3://psi-data/femur-bones/*.tif") ++
loadAPSImages("s3://aps-data/bone-images/*.hdf5") ++
loadESRFImages("s3://esrf-data/femur-bones/*.tif")
allData.registerImageMetaTable("BoneImages")
How many of the bone samples measured had a cortical thickness larger than 30mm? \[ \downarrow \]
SELECT COUNT(*) FROM (
  SELECT THICKNESS(
           SEGMENT_CORTICAL(GAUSSFILT(imgData))
         ) AS thk
  FROM BoneImages
) WHERE thk > 30
Is there a correlation between cortical thickness and mineral content?
\[ \downarrow \]
SELECT THICKNESS(cortical),
       IMG_AVG(cortical)
FROM (
  SELECT SEGMENT_CORTICAL(GAUSSFILT(imgData)) AS cortical
  FROM BoneImages
)
CREATE TABLE FilteredBones AS
  SELECT boneId, GaussianFilter(image) AS image FROM BoneImages
CREATE TABLE ThresholdImages AS
  SELECT boneId, ApplyThreshold(image, OTSU) AS image FROM FilteredBones
CREATE TABLE CellImages AS
  SELECT boneId, ComponentLabel(image) AS image FROM ThresholdImages
CREATE TABLE CellAnalysis AS
  SELECT boneId, AnalyzeShape(image) FROM CellImages GROUP BY boneId
WRITE TABLE CellAnalysis "s3n://bones/cells.csv"
The query is processed by the SQL parsing engine
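As a hedged illustration of how such image operators can be made visible to that engine, Spark SQL lets ordinary functions be registered as UDFs; gaussianBlur below is a hypothetical stand-in for the real GAUSSFILT, with the image modeled as a Seq[Double] purely for brevity.

// placeholder for the real filter implementation
def gaussianBlur(img: Seq[Double]): Seq[Double] = img

// register the function so the SQL parser can resolve it like any built-in
sqlContext.udf.register("GAUSSFILT", (img: Seq[Double]) => gaussianBlur(img))

val blurred = sqlContext.sql("SELECT GAUSSFILT(imgData) FROM BoneImages")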
In the biological imaging community, the open source tools of ImageJ2 and Fiji are widely accepted and have a large number of readily available plugins and tools.
We can integrate the functionality directly into Spark and perform operations on much larger datasets than a single machine could hold in memory. Additionally, these analyses can be performed on streaming data.
// attach a webcam receiver to a Spark Streaming context
val wr = new WebcamReceiver()
val ssc = sc.toStreaming(strTime)
val imgList = ssc.receiverStream(wr)
// median-filter every incoming frame with the ImageJ "Median..." plugin
val filtImgs = imgList.mapValues(_.run("Median...", "radius=3"))
// estimate the background as the pixel-wise mean of the input images
val totImgs = inImages.count()
val bgImage = inImages.reduce(_ add _).multiply(1.0 / totImgs)
val eventImages = filtImgs.
  transform { inImages =>
    // subtract the background and key every frame by its mean deviation
    inImages.map { case (inTime, inImage) =>
      val corImage = inImage.subtract(bgImage)
      (corImage.getImageStatistics().mean, (inTime, corImage))
    }
  }
eventImages.filter(iv => Math.abs(iv._1)>20).
foreachRDD(showResultsStr("outlier",_))
A spinoff - 4Quant: From images to insight
While our framework is commercial, we build on top of and integrate with open-source tools so that the research is reproducible by anyone, anywhere, on any machine (it just might take a bit longer).
We also try to publish as much code, educational material, and samples as possible on our github account.
We are interested in partnerships and collaborations
Big Data is a very popular term today. There is a ton of hype, and money seems to be thrown at any project that includes these words. First, to clear up the biggest misconception: big data isn't about how much data you have, it is about how you work with it.
Before we can think about Big Data we need to define what it's replacing, Small Data.
The run-of-the-mill ("0815") approach: standard tools and lots of clicking, one image at a time.
If you looked at one 1000 x 1000 image every second, it would take you 139 hours to browse through a terabyte of data.
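The arithmetic behind that number, assuming 16-bit pixels so that each 1000 x 1000 image is roughly 2 MB:

\[ \frac{1\,\textrm{TB}}{2\,\textrm{MB / image}} = 500000\ \textrm{images} \rightarrow 500000\,\textrm{s} \approx 139\ \textrm{hours} \]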
| Year | Time to 1 TB | Manpower to keep up | Salary Costs / Month |
|---|---|---|---|
| 2000 | 4096 min | 2 people | 25 kCHF |
| 2008 | 1092 min | 8 people | 95 kCHF |
| 2014 | 32 min | 260 people | 3255 kCHF |
| 2016 | 2 min | 3906 people | 48828 kCHF |
The standard approach to producing data is top-down.
In a study with 80 samples with a variability of 10% and a 5% difference between them, we can afford only 1 test and maintain the commonly accepted \( p<0.05 \)
We thus need smart people making and choosing the best models so that we can preserve the significance.
The Big Data era suggests a new approach. First, given an abundance of data, we can afford to run many tests.
In a study with 800 samples with a variability of 10% and a 5% difference between them, we can afford over 10M tests and maintain the commonly accepted \( p<0.05 \)
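A sketch of where these counts come from, assuming a two-sample comparison (difference \( \Delta \), variability \( \sigma \), \( n \) samples per group) with a Bonferroni-style correction: the test statistic grows as \( \sqrt{n} \), the single-test p-value shrinks accordingly, and the number of tests we can afford at an overall \( p<0.05 \) is roughly the ratio of the two.

\[ t \approx \frac{\Delta}{\sigma\sqrt{2/n}} \propto \sqrt{n}, \qquad N_{\textrm{tests}} \approx \frac{0.05}{p_{\textrm{single}}(t)} \]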
For knowledge creation this means a switch
\[ \textrm{Transistors} \propto 2^{T/(\textrm{18 months})} \]
Based on data from https://gist.github.com/humberto-ortiz/de4b3a621602b78bf90d
There are now many more transistors inside a single computer, but clock speeds (and thus single-core performance) have stopped increasing. How can this be?
The range of cloud costs (determined by peak usage) compared to a local workstation with utilization shown as the average number of hours the computer is used each week.
1.5 man-years
For many traditional scientific studies the sample count is fairly low (10-100); for such low counts, we can look at the costs, estimated from our experience with cortical bone microstructure and rheology.
As studies get bigger (>100 samples), we see that these costs begin to radically shift.
\[ \downarrow \textrm{Translate to SQL} \]
SELECT COUNT(*) FROM (
  SELECT SHAPE_ANALYSIS(LABEL_NUCLEI(pathology_slide)) FROM patients
  WHERE disease LIKE 'myeloma'
) WHERE anisotropy > 0.75
\[ \downarrow \textrm{Load Myeloma Data Subset} \]
\[ \downarrow \textrm{Perform analysis on every image} \]
\[ \downarrow \textrm{Filter out the most anisotropic cells} \]
Find all strongly reflective large objects within 1 km of the Paul Scherrer Institute, Villigen, CH
\[ \downarrow \textrm{Translate to SQL} \]
SELECT contour FROM (
  SELECT COMPONENT_LABEL(THRESHOLD(tile, 200)) FROM esriTiles
  WHERE DIST(LAT, 47.53000992328762, LONG, 8.215198516845703) < 1
) WHERE area > 200
We can then visualize these contours easily
or apply them back to the original map
The bottleneck is the filesystem connection: many nodes (10+) reading in parallel bring even a GPFS-based InfiniBand system to a crawl.
One of the central tenets of MapReduce™ is data-centric computation \( \rightarrow \) instead of moving the data to the computation, move the computation to the data.
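A minimal sketch of the difference, using a hypothetical expensiveFilter applied to an RDD of keyed image blocks:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// hypothetical per-image operation
def expensiveFilter(img: Array[Double]): Array[Double] = img

def processWhereTheDataLives(images: RDD[((Int, Int, Int), Array[Double])]) = {
  // Anti-pattern: images.collect() would drag every block over the filesystem
  // connection to the driver before any work starts.
  // Data-centric: ship the (small) function to the nodes that already hold the blocks.
  images.mapValues(expensiveFilter)
}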
Apache Spark is a brilliant platform, and with GraphX, MLlib, and other packages there are nearly unlimited possibilities.
Combining many different components inside the Spark Shell, IPython, or Zeppelin makes it easier to assemble workflows.
// read the shape-analysis results and expose them to Spark SQL
val cells = sqlContext.csvFile("work/f2_bones/*/cells.csv")
cells.registerTempTable("cells")
val avgVol = sqlContext.sql("SELECT SAMPLE, AVG(VOLUME) FROM cells GROUP BY SAMPLE")
// keep samples with an average cell volume above 1000 and join them back to the image data
avgVol.filter(_._2 > 1000).map(sampleToPath).joinByKey(bones)
| Task | Single Core Time | Spark Time (40 cores) |
|---|---|---|
| Load and Preprocess | 360 minutes | 10 minutes |
| Single Column Average | 4.6 s | 400 ms |
| 1 K-means Iteration | 2 minutes | 1 s |
// spread each voxel's value evenly over a (2*windSize+1)^2 neighborhood
def spread_voxels(pvec: ((Int, Int), Double), windSize: Int = 1) = {
  val wind = -windSize to windSize
  val pos = pvec._1
  val scalevalue = pvec._2 / (wind.length * wind.length)
  for (x <- wind; y <- wind)
    yield ((pos._1 + x, pos._2 + y), scalevalue)
}
// apply the spreading as a flatMap and sum the contributions arriving at each voxel
val filtImg = roiImg.
  flatMap(cvec => spread_voxels(cvec)).
  filter(roiFun).
  reduceByKey(_ + _)
Here we use a KNIME-based workflow and our Spark Imaging Layer extensions to create a workflow without any Scala or programming knowledge, with an easily visible flow from one block to the next and without the performance overhead of using other tools.