Caching and uncaching with Spark

This is not meant to be a full tutorial, but rather a brief presentation of how to cache datasets with Karps. A full Spark tutorial on caching can be found here: http://spark.apache.org/docs/latest/quick-start.html#caching

Normal caching

In [1]:
:load KarpsDisplays
:extension OverloadedStrings
import KarpsDisplays(showGraph)
import KarpsDagDisplay(computeGraphToDisplayGraph)
In [2]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Functions
import Spark.Core.Column
import Spark.Core.ColumnFunctions


conf = defaultConf {
        confEndPoint = "http://10.0.2.2",
        confRequestedSessionName = "session03_introduction" }

createSparkSessionDef conf
[Debug] Creating spark session at url: http://10.0.2.2:8081/sessions/session03_introduction @(<unknown>:<unknown> <unknown>:0:0)
In [3]:
-- Some internal utilities to show what is happening behind the scenes.
import Spark.Core.Internal.ContextStructures
import Spark.Core.Internal.ContextInternal
import Spark.Core.Internal.Utilities(forceRight)

Caching is a fundamental step when working repeatedly with large datasets. It is hard to get right, however, and it is a frequent source of performance issues for budding Spark users. Karps embeds some tools to help you diagnose caching issues.

Consider the following trivial program that creates a dataset and runs a few computations on it: it computes the number of elements as well as their sum, and adds the two results together (dividing one by the other would instead give the mean). Of course, Spark includes a mean operator, so this would not be recommended in practice, but it is instructive for studying caching issues.

In [4]:
ds = dataset ([1,2,3] :: [Int]) @@ "data"
-- Turns the dataset into a column and computes the sum of all the elements in this column
s1 = sumCol (asCol ds) @@ "sum"
-- Counts the element in the dataset
s2 = count ds @@ "count"
result = (s1 + s2) @@ "result"

Let us see what the computation graph looks like:

In [5]:
showGraph result

For large datasets, this would be inefficient: the original dataset data would be computed twice, once to perform the sum and once to perform the count.

We can instruct Spark to cache the dataset, using the cache operator. Caching consumes precious memory, however, so we also need to tell Spark to uncache the dataset once we are sure we are done with it. The operator to do that, unsurprisingly, is called uncache. Here is a first attempt, which is not going to work:

In [6]:
-- cache
cachedData = cache ds @@ "cache"
s1 = sumCol (asCol cachedData) @@ "sum"
s2 = count cachedData @@ "count"
uncachedData = uncache cachedData @@ "uncache"
x = (s1 + s2) @@ "result"

Let us look at the graph of computations: the uncaching operation is missing!

In [7]:
showGraph x

Because of laziness, only the operations that the result depends on are retained: since nothing depends on the uncaching node, it is skipped when Karps calculates the operations that need to be done. For the uncaching to take place, the result must depend on it.
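
To make this concrete, here is a tiny sketch (the node names are hypothetical; it reuses only the operators introduced above): a node that nothing depends on simply never shows up in the graph of the result.

-- 'orphanCount' has no consumer, so it is not an ancestor of 'onlySum'
-- and would be dropped from the computation graph:
orphanCount = count ds @@ "orphan_count"
onlySum = sumCol (asCol ds) @@ "only_sum"
-- showGraph onlySum   -- would display only the 'data' and 'only_sum' nodes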

Here is a second attempt, in which we explicitly add a logical dependency between the uncaching operation and the result: the uncaching must happen before we compute the result. Adding a logical dependency is expressed with the depends function.

In [8]:
:t depends
depends :: forall loc a. ComputeNode loc a -> [UntypedNode] -> ComputeNode loc a
In [9]:
x = ((s1 + s2) `depends` [untyped uncachedData]) @@ "result"

Now we see the uncaching operation in the graph of operations:

In [10]:
showGraph x

However, when we attempt to run it, it fails!

In [11]:
exec1Def x
Error {ePath = NPath(), eMessage = "Found some caching errors: [CachingFailure {cachingNode = cccd6.., uncachingNode = cb333.., escapingNode = 335d5..},CachingFailure {cachingNode = cccd6.., uncachingNode = cb333.., escapingNode = 301f9..}]"}

The message gives us a hint: Karps found some caching operations that are illegal.

When you look at the graph above, nothing indicates to Spark whether it should first uncache the data or first compute the sum and count operations. This ambiguity is in fact visible in the graph: the three nodes sum, count and uncache are represented on the same line. Which one should be done first?

Because of the lazy nature of Spark, it is a common mistake to uncache data while it is still going to be used later. Karps analyzes the graph of computations and will refuse to run operations if they could be misinterpreted by Spark. In this case, Karps sees that the sum or count operations could happen after the data is uncached, so it asks us to resolve the ambiguity.

How do we fix it? By ensuring that both sum and count run before the data is uncached, using logical dependencies again:

In [12]:
uncachedData = ((uncache cachedData) `depends` [untyped s1, untyped s2]) @@ "uncache"
x = ((s1 + s2) `depends` [untyped uncachedData]) @@ "result"

showGraph x

Now that the ambiguity is resolved, Spark can properly launch the computations when we run the program.

In [13]:
exec1Def x
[Debug] executeCommand1': computing observable /result@org.spark.LocalPlus!int @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: http://10.0.2.2:8081/computations/session03_introduction/0/createwith nodes: [/data@org.spark.DistributedLiteral:int,/cache@org.spark.Cache:int,/sum@SUM!int,/count@COUNT!int,/uncache@org.spark.Unpersist:int,/result@org.spark.LocalPlus!int] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /sum finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /count finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /result finished @(<unknown>:<unknown> <unknown>:0:0)
9

Autocache

This business of caching and uncaching is clearly tedious, and at the same time fairly mechanical. Can we automate it? Yes! Karps includes an autocache operator that acts as a hint: it marks a specific dataset as a candidate for caching. If Karps decides it is useful to cache this dataset, it will automatically insert the caching and uncaching instructions.

Here is our original program, but this time we automate the caching process: no uncaching, and caching is done using the autocache operator.

In [14]:
cachedData = autocache ds @@ "autocache"
s1 = sumCol (asCol cachedData) @@ "sum"
s2 = count cachedData @@ "count"
x = (s1 + s2) @@ "result"

This is all that is required. For the sake of demonstration, let us look under the hood at the transforms that have been applied, by inspecting the graph of computations. This step is not required for casual users; it only shows the computations that are going to take place:

In [15]:
import Data.Maybe(fromJust)

let cg = forceRight (buildComputationGraph x)
sess <- fromJust <$> currentSessionDef 
let cg' = forceRight (performGraphTransforms sess cg)
computeGraphToDisplayGraph cg'

This is very close to the transforms that we have done manually! Karps has added extra logical dependencies to ensure that operations are done in the correct order.

To conclude:

  • use autocache when possible
  • if you need more fine-grained control over the placement and the order of operations, you can manually cache datasets using cache and uncache; Karps will prevent you from making mistakes, however (a recap of both patterns is sketched below)
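
For reference, here is a recap of both patterns collected in one place. This is a sketch only: the primed names are hypothetical, and it simply restates the cells above.

-- Manual pattern: cache, compute, then uncache, with explicit dependencies
-- so that uncaching happens after both observables and before the result.
cachedData'   = cache ds @@ "cache"
s1'           = sumCol (asCol cachedData') @@ "sum"
s2'           = count cachedData' @@ "count"
uncachedData' = ((uncache cachedData') `depends` [untyped s1', untyped s2']) @@ "uncache"
result'       = ((s1' + s2') `depends` [untyped uncachedData']) @@ "result"

-- Automatic pattern: autocache marks the dataset as a caching candidate;
-- Karps inserts the caching/uncaching nodes and dependencies for you.
autoData'     = autocache ds @@ "autocache"
autoResult'   = ((sumCol (asCol autoData')) + (count autoData')) @@ "result"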