Applying Big Data Solutions to Big Tomographic Problems

Workshop on Large Scale Tomography, 26 January 2016
Kevin Mader, CTO 4Quant / Lecturer ETH Zurich

Outline

  • Imaging in 2016
  • The Problem(s)

Solutions

  • Big Data Approaches
  • Spark Imaging Layer
    • 3D Imaging
    • Hyperspectral Imaging
    • Interactive Analysis / Streaming

The Challenges / Science

(Example challenges: mammography, satellite images, brain tumors, throat cancer)

Image Science in 2016: More and faster

X-Ray

  • Swiss Light Source (SRXTM) images at (>1000fps) \( \rightarrow \) 8GB/s, diffraction patterns (cSAXS) at 30GB/s
  • Nanoscopium (Soleil), 10TB/day, 10-500GB file sizes, very heterogeneous data

Optical

  • Light-sheet microscopy (see talk of Jeremy Freeman) produces images \( \rightarrow \) 500MB/s
  • High-speed confocal images at (>200fps) \( \rightarrow \) 78Mb/s

Geospatial

  • New satellite projects (Skybox, etc) will measure hundreds of terabytes to petabytes of images a year

Personal

  • GoPro 4 Black - 60MB/s (3840 x 2160 x 30fps) for $600
  • fps1000 - 400MB/s (640 x 480 x 840 fps) for $400

Synchrotron X-Ray Microscopy

Recent improvements allow you to:

  • peer deep into large samples
  • achieve \( \mathbf{<1\mu m} \) isotropic spatial resolution
    • with 1.8mm field of view
  • achieve >10 Hz temporal resolution
  • 8GB/s of images

SLS and TOMCAT

[1] Mokso et al., J. Phys. D, 46(49),2013

Courtesy of M. Pistone at U. Bristol

How much is a TB, really?

If you looked at one 1000 x 1000 sized image every second


It would take you 139 hours to browse through a terabyte of data.
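The arithmetic, assuming 16-bit pixels:

\[ \frac{10^{12}\ \textrm{bytes}}{10^{6}\ \textrm{pixels/s} \times 2\ \textrm{bytes/pixel}} = 5 \times 10^{5}\ \textrm{s} \approx 139\ \textrm{hours} \]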

Year   Time to 1 TB   Manpower to keep up   Salary costs / month
2000   4096 min       2 people              25 kCHF
2008   1092 min       8 people              95 kCHF
2014   32 min         260 people            3255 kCHF
2016   2 min          3906 people           48828 kCHF

Challenge mouse brain: Full Vascular Imaging

  • Measure a full mouse brain (1cm\( ^3 \)) with cellular resolution (1 \( \mu m \))
  • 10 x 10 x 10 scans at 2560 x 2560 x 2160 \( \rightarrow \) 14 TVoxels
  • A single slice represents just 0.000004% of the entire dataset
  • 14TVoxels = 56TB
  • Each scan needs to be registered and aligned together
  • There are no computers with 56TB of memory
  • Analysis of the stitched data is also of interest (segmentation, vessel analysis, distribution and network connectivity)
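The storage figure follows if one assumes 32-bit (4-byte) voxels:

\[ 14\ \textrm{TVoxels} \times 4\ \textrm{bytes/voxel} = 56\ \textrm{TB} \]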

The raw data


Let's Stitch

The first challenge with these data is stitching them together.

  • Algorithmically it is very simple:

  • \( \textrm{Overlap}_{A \rightarrow B} = \textrm{argmax}_{\vec{x}}\left[\mathcal{FT}^{-1} \left\{\mathcal{FT}\{A\}\,\mathcal{FT}\{B\}^{*}\right\} \right] \)
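A minimal brute-force sketch of the same idea in Scala (production code would use the FFT form above; bestOverlap and the array-based image here are illustrative, not the real pipeline):

def bestOverlap(a: Array[Array[Double]], b: Array[Array[Double]], maxShift: Int): (Int, Int) = {
  // correlation score of b shifted by (dx, dy) against a
  def score(dx: Int, dy: Int): Double = {
    var s = 0.0
    for (x <- a.indices; y <- a(0).indices) {
      val (bx, by) = (x + dx, y + dy)
      if (bx >= 0 && bx < b.length && by >= 0 && by < b(0).length)
        s += a(x)(y) * b(bx)(by)
    }
    s
  }
  // argmax over all candidate shifts
  val shifts = for (dx <- -maxShift to maxShift; dy <- -maxShift to maxShift) yield (dx, dy)
  shifts.maxBy { case (dx, dy) => score(dx, dy) }
}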

Computationally

We need to stitch each scan with all of its neighbors

  • \( x=\pm 1 \)
  • \( y=\pm 1 \)
  • \( z=\pm 1 \)
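As a small sketch, the candidate neighbor offsets for each tile are all combinations of these shifts (in practice only a subset, e.g. the face neighbors, may be registered):

def neighbors(x: Int, y: Int, z: Int): Seq[(Int, Int, Int)] =
  for (dx <- -1 to 1; dy <- -1 to 1; dz <- -1 to 1
       if (dx, dy, dz) != (0, 0, 0))  // skip the tile itself
  yield (x + dx, y + dy, z + dz)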


How does that look?


But it gets better

We have a few more parameters (\( \theta \)) and we know the initial positions, so we optimize over a limited range rather than the entire image.

  • \( \rightarrow \) we need to add another loop
  • Iterate as long as the offset vectors are changing (see the sketch below)
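A hedged sketch of that outer loop (the types and the reRegister step are stand-ins for the real registration):

type Vec3 = (Double, Double, Double)

def converge(initial: Map[Int, Vec3],
             reRegister: Map[Int, Vec3] => Map[Int, Vec3],
             tol: Double): Map[Int, Vec3] = {
  var current = initial
  var moved = Double.MaxValue
  while (moved > tol) {
    val updated = reRegister(current)   // re-run registration in a limited range
    moved = current.keys.map { k =>     // largest movement of any offset vector
      val (ax, ay, az) = current(k)
      val (bx, by, bz) = updated(k)
      math.sqrt(math.pow(ax - bx, 2) + math.pow(ay - by, 2) + math.pow(az - bz, 2))
    }.max
    current = updated
  }
  current
}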


Execute

Now that our task is clear we just execute \( \rightarrow \)

Super workstation

  • 160GB RAM
  • awesome-sauce graphics card
  • 40 cores with hyperthreading
  • SSD drives

Matlab + FIJI

  • Premade algorithms
  • Just click and go
  • 10x10x10 scans x 8 neighbors \( \rightarrow \) 8,000 registrations per iteration
  • 20 - 40 minutes per registration
  • 1000-4000 hours \( \rightarrow \) 40-160 days per iteration

Problems

  • Every task on every machine must be explicitly specified
  • Substantial effort in coordination
  • Huge amounts of data traffic between shared storage and machines
  • Not flexible or elastic


No Go's

  • Image Definitions: What is an image?
  • Too Strict

    • OpenCV: Mat with type CV_8UC1: a 1D array of unsigned bytes
    • ImageJ: Img<T extends RealType<T> & NativeType<T>>: recursive generic ND arrays with cursor access
    • ITK: Image<TPixel, VImageDimension>: templated C++
  • Too Loose

    • Matlab: imread \( \rightarrow \) could be 2D, maybe 3D, sometimes even 4D
    • Python: numpy \( \rightarrow \) dimensions and types are only determined at runtime
    • Does adding a spectrum to an image make sense?
    • Does multiplying two images make sense, if so how?
  • Untested Algorithms
    • 5 different implementations (GPU, cluster-version, local version, Matlab prototype version)

Other Data

  • Many independent components
  • Complicated workflows
  • Fragile glue

Parallel Challenges

Coordination

Parallel computing requires a significant amount of coordination between computers for tasks that are not easily parallelizable.

Mutability

The second major issue is mutability: if two cores or computers try to write the same information at the same time, the result is no longer deterministic (not good), as the sketch below shows.
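A minimal Scala sketch of the hazard (the counter and iteration counts are illustrative):

var counter = 0
val threads = (1 to 2).map(_ => new Thread(new Runnable {
  // both threads read-modify-write the same variable with no coordination
  def run(): Unit = for (_ <- 1 to 100000) counter += 1
}))
threads.foreach(_.start())
threads.foreach(_.join())
println(counter)  // usually less than 200000: updates were silently lost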

Blocking

The simple act of taking turns and waiting for every independent process to take its turn can completely negate the benefits of parallel computing

Distributed Challenges

Inherits all of the problems of parallel programming with a whole variety of new issues.

Sending Instructions / Data Afar

Fault Tolerance

If you have 1000 computers working on solving a problem and one fails, you do not want your whole job to crash

Data Storage

How can you access and process data from many different computers quickly, without very expensive infrastructure?

A Cluster?

Basic Topology

  • Driver Node

The machine used by the user, responsible for creating the jobs which need to run

  • Master Node

Distributes task across multiple machines, coordinates communication between machines

  • Worker Nodes

The nodes where the computation is actually done


Using a cluster

  • \( \rightarrow \) Tell the driver to load all images
  • The driver determines which data need to be loaded
  • Driver \( \rightarrow \) Master: exactly which data to load


Steps

  • tell machine 1 to load images 0 to 2
  • tell machine 2 to load images 3 to 5


Exchanging Images

Share the images to calculate the overlap:

  • \( \rightarrow \) tell machine 1 to send image 0 to machine 2
  • \( \rightarrow \) tell machine 2 to calculate: \( \textrm{Overlap}(A\rightarrow B) \)


Computing has changed: Parallel

Moore's Law

\[ \textrm{Transistors} \propto 2^{T/(\textrm{18 months})} \]

Based on trends from Wikipedia and Intel; data from https://gist.github.com/humberto-ortiz/de4b3a621602b78bf90d

There are now many more transistors inside a single computer but the processing speed hasn't increased. How can this be?

  • Multiple Cores
    • Many machines have multiple cores per processor which can perform tasks independently
  • Multiple CPUs
    • More than one chip is commonly present
  • New modalities
    • GPUs provide many cores which operate at slower clock speeds

Parallel Code is important

What is Cloud?

Storage

Buckets

  • 8TB for 2 years is \( \approx \) $5000
  • From SLS to Cloud \( \rightarrow \) 231MB/s
  • Standard HD would cost $1200
  • K. Mader, et al, Moving image analysis to the cloud: A case study with genome-scale tomographic data, AIP Conference Proceedings, X-Ray Microscopy

Processing Power

Elastic, On-Demand

What is Imperative Programming?

Directly coordinating tasks on a computer.

  • Languages like C, C++, Java, Matlab
  • Exact orders are given (implicit time ordering)
  • Data management is manually controlled
  • Job and task scheduling is manual
  • Potential to tweak and optimize performance

Making a soup

  1. Buy vegetables at market
  2. then Buy meat at butcher
  3. then Chop carrots into pieces
  4. then Chop potatoes into pieces
  5. then Heat water
  6. then Wait until boiling then add chopped vegetables
  7. then Wait 5 minutes and add meat

Declarative

  • Languages like SQL, Erlang, Haskell, Scala, Python, R can be declarative
  • Deep learning tools are declarative: TensorFlow, Theano, Torch
  • Goals are stated rather than specific details
  • Data is automatically managed and copied
  • Scheduling is automatic but not always efficient

Making a soup

  • Buy vegetables at market \( \rightarrow shop_{veggies} \)
  • Buy meat at butcher \( \rightarrow shop_{meat} \)
  • Wait for \( shop_{veggies} \): Chop carrots into pieces \( \rightarrow chopped_{carrots} \)
  • Wait for \( shop_{veggies} \): Chop potatoes into pieces \( \rightarrow chopped_{potatoes} \)
  • Heat water
  • Wait for \( \textrm{boiling water} \), \( chopped_{carrots} \), \( chopped_{potatoes} \): Add chopped vegetables
    • Wait 5 minutes and add meat
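A minimal sketch of the declarative recipe in Scala (Futures stand in for the scheduler; the string values are placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val shopVeggies = Future { "veggies" }          // independent tasks start immediately
val shopMeat    = Future { "meat" }
val choppedCarrots  = shopVeggies.map(_ => "chopped carrots")   // waits on shopVeggies
val choppedPotatoes = shopVeggies.map(_ => "chopped potatoes")
val boiling = Future { "boiling water" }
val soup = for {                                // declares dependencies, not an order
  w <- boiling; c <- choppedCarrots; p <- choppedPotatoes; m <- shopMeat
} yield s"soup($w, $c, $p, $m)"
println(Await.result(soup, 10.seconds))

The runtime, not the programmer, decides which tasks run in parallel and in what order.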

Results

Imperative

  • optimize specific tasks (chopping vegetables, mixing) so that many people can do it faster
    • Matlab/Python do this with fast Fourier transforms (automatically using many cores to compute faster)
  • make many soups at the same time (independent)
    • This leads us to cluster-based computing

Declarative

  • run everything at once
  • each core (computer) takes a task and runs it
  • execution order does not matter
    • wait for portions to be available (dependency)

Lazy Evaluation

  • do not run anything at all
  • until something needs to be exported or saved
  • run only the tasks that are needed for the final result
    • never buy tomatoes since they are not in the final soup
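In Scala this is exactly what lazy val gives you (a toy sketch; Spark transformations behave the same way):

lazy val choppedCarrots  = { println("chopping carrots");  Seq("carrot pieces") }
lazy val choppedTomatoes = { println("chopping tomatoes"); Seq("tomato pieces") }

// Only what the final soup needs is ever evaluated:
val soup = choppedCarrots ++ Seq("meat")  // prints "chopping carrots" only
// choppedTomatoes is never touched, so the tomatoes are never bought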

The Problem

There is a flood of new data

What took an entire PhD 3-4 years ago, can now be measured in a weekend, or even several seconds. Analysis tools have not kept up, are difficult to customize, and usually highly specific.

Optimized Data-Structures do not fit

Data-structures that were fast and efficient for computers with 640kb of memory do not make sense anymore

Single-core computing is too slow

CPUs are not getting much faster, but there are many more of them. Iterating through a huge array takes almost as long on 2014 hardware as on 2006 hardware.

Exploratory Image Analysis Priorities

Correctness

The most important job for any piece of analysis is to be correct.

  • A powerful testing framework is essential
  • Avoid repetition of code which leads to inconsistencies
  • Use compilers to find mistakes rather than users

Easily understood, changed, and used

Almost all image processing tasks require a number of people to evaluate and implement them, and they are almost always moving targets.

  • Flexible, modular structure that enables replacing specific pieces

Fast

The last of the major priorities is speed, which covers scalability, raw performance, and development time.

  • Long waits for processing discourage exploration
  • Manual access to data on separate disks is a huge speed barrier
  • Real-time image processing requires millisecond latencies
  • Implementing new ideas can be done quickly

The Framework First

  • Rather than building an analysis as quickly as possible and then trying to hack it to scale up to large datasets:
    • choose the framework first
    • then start making the necessary tools
  • Google, Amazon, Yahoo, and many other companies have made huge inroads into these problems
  • The real need is a fast, flexible framework for robustly, scalably performing complicated analyses, a sort of Excel for big imaging data.

Streaming Analytics

  • You can analyze streaming tweets during World Cup matches to see goals before you see them on television

  • You can process a petabyte of clicks to optimize ad-campaigns

  • Powerful tools

  • Distributed computation

  • Interactive computation

Boring problems

Hadoop

Hadoop is the open-source implementation of MapReduce, developed at Yahoo and released as an Apache project. It provides the underlying infrastructure and filesystem that handles storing and distributing data, so that each machine stores some of the data locally and processing jobs run where the data is stored.

  • Non-local data is copied over the network.
  • Storage is automatically expanded with processing power.
  • It's how Amazon, Microsoft, Yahoo, Facebook, … deal with exabytes of data

Apache Spark

  • “The Ultimate Scala Collections”- Martin Odersky (EPFL / Creator of Scala)

  • “MapReduce on Steroids”

  • Developed by the Algorithms, Machines, and People Lab at UC Berkeley in 2012

  • General tool for all Directed Acyclical Graph (DAG) workflows

  • Coarse-grained processing \( \rightarrow \) simple operations applied to entire sets

    • Map, reduce, join, group by, fold, foreach, filter,…
  • In-memory caching

Zaharia, M., et. al (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing

Ultimate Collections?

  • Standard Processing
    • In Java
// copy only the positive values; everything else stays 0
int[] newImage = new int[inputImage.length];
for (int i = 0; i < inputImage.length; i++) {
  if (inputImage[i] > 0) {
    newImage[i] = inputImage[i];
  }
}
  • In Spark
val newImage = inputImage.map(v => if (v > 0) v else 0)

The Spark version is performed on a cluster: the computation is not only divided between all available nodes, it is also locality-aware, meaning the computation is moved to where the data is.

Image Query and Analysis Engine

These frameworks are really cool and Apache Spark has a lot of functionality, but I don't want to map-reduce a collection.

I want to

  • filter out noise, segment, choose regions of interest
  • contour, component label
  • measure, count, and analyze

Image Query and Analysis Engine (IQAE)

  • Developed at 4Quant, ETH Zurich, and Paul Scherrer Institut
  • The IQAE is a Domain Specific Language for microscopy built on Spark.
  • It converts common imaging tasks into coarse-grained Spark operations


IQAE

We have developed a number of commands for IQAE handling standard image processing tasks


Fully extensible with Spark languages


Algorithmic Quality Control


My PhD Thesis on 2000 mouse femurs in 9 lines

CREATE TABLE FilteredBones AS
SELECT sampleId,GaussianFilter(image) FROM BoneImages
CREATE TABLE ThresholdImages AS
SELECT boneId,ApplyThreshold(image,OTSU) FROM FilteredBones
CREATE TABLE CellImages AS
SELECT boneId,ComponentLabel(image) FROM ThresholdImages
CREATE TABLE CellAnalysis AS
SELECT boneId,AnalyzeShape(CellImages) FROM CellImages GROUP BY boneId
WRITE TABLE CellAnalysis "s3n://bones/cells.csv"

Time Comparison

Cloud Case

Distribution and Analysis

The query is processed by the SQL parsing engine

  • sent to the database where it can be combined with locally available private data
  • sent to the image layer for all image processing components to be integrated
  • sent to the Spark Core so specific map, reduce, fold operations can be generated
  • sent to the master node where it can be distributed across the entire cluster / cloud


Distribution and Analysis


Many Nodes


How do we get started?

  • First we start our cluster:
    spark-ec2.py -s 50 launch 4quant-image-cluster

  • Load all of the samples (56TB of data)

loadImage(
  "s3:/../brain_*_*_*/rec.tif"
  )

\[ \downarrow \] A Resilient Distributed Dataset (RDD) \[ \textrm{Images}: \textrm{RDD}[((x,y,z),Img[Double])] =\\ \left[(\vec{x},\textrm{Img}),\cdots\right] \]

  • Now start processing: run a Gaussian filter on all the images to reduce the noise
filteredImages = Images.gaussianFilter(1,0.5)
  • Calculate the volume fraction at a threshold of 50%
volFraction = filteredImages.threshold(0.5).
  map(keyImg => (sum(keyImg.img), keyImg.size)).
  reduce((a, b) => (a._1 + b._1, a._2 + b._2))

Stitching?

We have all of the filtered images and we want to stitch them together in a smart way.

  • Start simple and match the images up with their neighbors (form all image pairs and keep only the proximal ones)
pairImages = Images.
  cartesian(Images).
  filter { case (im1, im2) => dist(im1.pos, im2.pos) < 1 }


Cross Correlating

The cross correlation can then be executed on each pair of images from the new RDD (pairImages) by using the map command

displacementField = pairImages.
  map { case ((posA, imgA), (posB, imgB)) =>
    // search only near the offset expected from the stage positions
    xcorr(imgA, imgB, in = posB - posA)
  }


From Matching to Stitching

The tile positions are then updated using the information provided by the cross correlations, applying appropriate smoothing criteria across windows.

smoothField = displacementField.
    window(3,3,3).
    map(gaussianSmoothFunction)

This also ensures the original data is left unaltered and all analysis is reversible.

Viewing Regions

The final stitching can then be done by

alignImages.
  filter { case (x, img) => abs(x - tPos) < img.size }.
  map { case (x, img) =>
    new Image(tSize).copy(img, x, tPos)
  }.combineImages()


Exploring the data

Flexibility through Types

Developing in Scala brings additional flexibility through types [1]. In microscopy the standard formats are 2-, 3-, and even 4- or more dimensional arrays or matrices, which can be iterated through quickly using CPU and GPU code. While still possible in Scala, there is a great deal more flexibility in data types, allowing anything to be stored as an image and then processed, as long as the basic functions make sense.

[1] Fighting Bit Rot with Types (Experience Report: Scala Collections), M Odersky, FSTTCS 2009, December 2009

What is an image?

A collection of positions and values, and maybe more (not just an array of doubles). Arrays are efficient for storing in computer memory, but often a poor way of expressing scientific ideas and analyses.

  • Filter Noise?

combine information from nearby pixels

  • Find objects

determine groups of pixels which are very similar to the desired result
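A hedged sketch of this view in Scala (PixelImage and its methods are illustrative, not the actual SIL classes; D3int mirrors the position type used below):

case class D3int(x: Int, y: Int, z: Int) {
  def mag: Double = math.sqrt(x * x + y * y + z * z)
}

// an image as a collection of (position, value) pairs rather than a bare array
case class PixelImage[T](pixels: Seq[(D3int, T)]) {
  def mapValues[U](f: T => U): PixelImage[U] =
    PixelImage(pixels.map { case (p, v) => (p, f(v)) })
  // "combine information from nearby pixels": everything within radius of a point
  def neighbors(center: D3int, radius: Double): Seq[(D3int, T)] =
    pixels.filter { case (p, _) =>
      D3int(p.x - center.x, p.y - center.y, p.z - center.z).mag <= radius
    }
}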

Making Coding Simpler with Types

trait BasicMathSupport[T] extends Serializable {
    def plus(a: T, b: T): T
    def times(a: T, b: T): T
    def scale(a: T, b: Double): T
    def negate(a: T): T = scale(a,-1)
    def invert(a: T): T
    def abs(a: T): T
    def minus(a: T, b: T): T = plus(a, negate(b))
    def divide(a: T, b: T): T = times(a, invert(b))
    def compare(a: T, b: T): Int
}

Continuing with Types

  • Simple filter implementation

    def SimpleFilter[T](inImage: Image[T])
      (implicit wst: BasicMathSupport[T]) = {
      val width: Double = 1.0
      // weight each neighboring value by a Gaussian of its distance
      val kernel = (pos: D3int, value: T) =>
        wst.scale(value, math.exp(-math.pow(pos.mag / width, 2)))
      val kernelReduce = (ptA: T, ptB: T) => wst.scale(wst.plus(ptA, ptB), 0.5)
      runFilter(inImage, kernel, kernelReduce)
    }
    
  • Spectra (or 2D diffraction patterns) are supported types as well

implicit val SpectraBMS = new BasicMathSupport[Array[Double]] {
    def plus(a: Array[Double], b: Array[Double]) =
      a.zip(b).map { case (x, y) => x + y }
...
    def scale(a: Array[Double], b: Double) =
      a.map(_ * b)
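Hypothetical usage of the type class above (specA and specB are example spectra, not part of the original API):

val specA = Array(1.0, 2.0, 3.0)
val specB = Array(3.0, 2.0, 1.0)
// the mean spectrum, expressed purely through the generic operations
val meanSpec = SpectraBMS.scale(SpectraBMS.plus(specA, specB), 0.5)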

Removing Bottlenecks with Hadoop Filesystem

The bottleneck is the filesystem connection; many nodes (10+) reading in parallel bring even a GPFS-based InfiniBand system to a crawl.


One of the central tenets of MapReduce™ is data-centric computation \( \rightarrow \) instead of moving data to the computation, move the computation to the data.

  • Use fast local storage for storing everything redundantly \( \rightarrow \) less transfer and fault tolerance
  • Largest file size: 512 yottabytes; Yahoo has a 14 petabyte filesystem in use


Spark / Resilient Distributed Datasets

Practical Specification

  • Distributed, parallel computing without logistics, libraries, or compiling
  • Declarative rather than imperative
    • Apply operation \( f \) to each image / block
    • NOT tell computer 3 to wait for an image from computer 2, perform operation \( f \), and send it to computer 1
    • Even scheduling is handled automatically
  • Results can be stored in memory, on disk, redundant or not

We have a cool tool, but what does this mean for me?

ETH Spinoff - 4Quant: From images to insight

  • Quantitative Search Machine for Images
    • Make analyzing huge sets of image data like a database search
  • Custom Analysis Solutions
    • Custom-tailored software to solve your problems
    • Streaming / Acquisition Solutions
    • Integration with Image Database Systems
    • Cloud Solutions
  • Use our API for your projects

Education / Training

First Steps: Scalable Image Storage

Comparison


Easily Interacting with Large Datasets


Searching?

New detectors certainly provide a substantial amount of new data, but what about old data?

  • At PSI we have years of tape archives with thousands of interesting experiments.
  • At hospitals and satellite imaging companies the volumes are even larger

PI:

Look at this new technique from famous journal, let's try it on all of our old data

PhD Student:

  1. Find old data
  2. Copy old data one disk at a time to current machine
  3. Read in old data, Run analysis, Save results
  4. Repeat from 2.

Small Data

2 years later

It seems to give the same results

PI

Well what about this technique?

Repeat ad nauseam

It doesn't need to be like this

  • Amazon provides cheap, universally accessible (but secure) storage
  • Amazon provides cheap, powerful computational resources
  • Spark / Spark Image Layer provides the glue to match the data to the computation to the science
allData = loadPSIImages("s3://psi-data/femur-bones/*.tif") ++
  loadAPSImages("s3://aps-data/bone-images/*.hdf5") ++
  loadESRFImages("s3://esrf-data/femur-bones/*.tif")
allData.registerImageMetaTable("BoneImages")

IQAE: Pathological Slices

  • Count the number of highly anisotropic nuclei in myeloma patients

\[ \downarrow \textrm{Translate to SQL} \]

SELECT COUNT(*) FROM 
  (SELECT SHAPE_ANALYSIS(LABEL_NUCLEI(pathology_slide)) FROM patients 
  WHERE disease LIKE "myeloma")
  WHERE anisotropy > 0.75

IQAE: Pathological Slices

\[ \downarrow \textrm{Load Myeloma Data Subset} \]

IQAE: Pathological Slices

\[ \downarrow \textrm{Perform analysis on every image} \]

\[ \downarrow \textrm{Filter out the most anisotropic cells} \]

IQAE: Maps


Find all strongly reflective large objects within 1km of the Paul Scherrer Institute, Villigen, CH

\[ \downarrow \textrm{Translate to SQL} \]

SELECT contour FROM (
  SELECT COMPONENT_LABEL(THRESHOLD(tile,200)) FROM esriTiles 
  WHERE DIST(LAT,47.53000992328762,LONG,8.215198516845703)<1
  ) WHERE area>200

IQAE: Maps

We can then visualize these contours easily


or apply them back to the original map


We support Open, Reproducible Science

While our framework is commercial, we build on top of and integrate into open-source tools, so that the research is reproducible by anyone, anywhere, using any machine (it just might take a bit longer).

We also try to publish as much code, educational material, and as many samples as possible on our GitHub account.

Github

Acknowledgements: ETH and PSI

We are interested in partnerships and collaborations

Learn more at
