Column-based functions

When building a pipeline, it is often the case that the top-level dataframe is complex and changing, while the actual work focuses on transforming columns of this dataframe. Karps provides a way to express and compose complex functions on columns without having to run computations. Behind the scenes, Karps takes these functions and translates them into sequences of queries, without the user having to deal with the details of collecting, joining and broadcasting data.

As an example, we are going to build functions that take a numerical dataset and produce a centered version (the mean is subtracted) and a scaled version (the variance is scaled to 1). Of course, such basic operations are already built into Spark, but it is instructive to see how one would implement similar functions in practice.

We will see that, thanks to laziness and determinism, Karps is able to reuse computations while still providing a high-level, lazy API.

In [1]:
:load KarpsDisplays KarpsDagDisplay
:extension DeriveGeneric
:extension FlexibleContexts
:extension OverloadedStrings
:extension GeneralizedNewtypeDeriving
:extension FlexibleInstances
:extension MultiParamTypeClasses
In [2]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Column
import Spark.Core.ColumnFunctions
import Spark.Core.Functions
import Spark.Core.Row
import Spark.Core.Types
import Spark.Core.Try

import qualified Data.Vector as V
import qualified Data.Text as T
import Data.Text(Text)
import GHC.Generics

We will start with an extremely simple dataset:

In [3]:
let ds = dataset [-1, 1] :: Dataset Int
-- A column of data containing integers
let myData = asCol ds

Here is a first function that computes the mean of the data in a column. Giving some names to the elements is not necessary but helps when looking at the DAG of computations.

Note that we can use all the usual operators (+, /, etc.) even though the computation is lazy.

Also, note that all the operations are strongly typed: unlike SQL, casting is almost always explicit, since implicit conversions could otherwise lead to loss of precision (or worse).

In [4]:
myMean :: Column ref Int -> LocalData Double
myMean col =
  let
    cnt = asDouble (countCol col) @@ "mean_count"
    s = asDouble (sumCol col) @@ "mean_sum"
  in (s / cnt)

Now, if we apply it to our data, the result is rather anti-climactic: we just get a LocalData out:

In [5]:
myMean myData
/localdiv_815e11@org.spark.LocalDiv!double
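
Nothing has been computed at this point: this value is only a lazy handle on a node of the computation graph, and it composes like any other value. As a minimal sketch, reusing only the observable division and the @@ naming already seen in myMean (the name "my_mean" is purely illustrative), we could keep building on it:

let m = myMean myData @@ "my_mean"
-- Dividing two observables just creates another lazy node; no Spark job runs here.
let ratio = m / m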

Let's build on this to make the centering function, which simply subtracts the mean, and the scaling function, which builds on the other two:

Note that, again, we need to cast the column; it is not going to be done for us.

Note: due to a Haskell limitation, the - operation is replaced by .-. This is because Haskell does not allow mixing different types (here a column and an observable) in the same operator. This restriction is going to be lifted in the future.

In [6]:
myCenter :: Column ref Int -> Column ref Double
myCenter col =
  let m = (myMean col) @@ "center_mean"
  in (asDoubleCol col) .- m

myScaler :: Column ref Int -> Column ref Double
myScaler col =
  let cnt = asDouble (countCol col) @@ "count"
      centered = myCenter col
      stdDev = sumCol (centered * centered) / cnt @@ "std_dev"
  in centered ./ stdDev

What does the transform look like if we apply it? Let's run showGraph on our simple dataset:

In [7]:
-- make a new scaled column:
let scaled = myScaler myData
-- pack it into a dataset to visualize it:
let out = pack1 scaled
showGraph out

This graph is pretty complicated, and you should click around to see what each node corresponds to. A couple of points are noteworthy:

  • Karps automatically and seamlessly handles the broadcasting and the reduction of variables. In fact, Karps can broadcast pretty much anything that is understood by Spark dataframes.

  • Karps reuses computations as much as possible: even though we made no explicit attempt at it, the count of the dataset is shared between the calculation of the mean and that of the variance. This is only possible because of laziness (see the sketch after this list).

  • Thanks to naming, even when functions are nested, we can still quickly relate each operator to the function that generated it.
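
To see this reuse concretely, we can render the graph of the centering step on its own and compare it with the graph above. This sketch only reuses pack1 and showGraph from the earlier cells:

-- The graph of the centering step alone; comparing it with the full graph
-- above shows which nodes (notably the count feeding the mean) are shared.
let centeredOnly = myCenter myData
showGraph (pack1 centeredOnly)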

Now, let's execute all of that:

In [8]:
conf = defaultConf {
        confEndPoint = "http://10.0.2.2",
        confRequestedSessionName = "col_ops6" }

createSparkSessionDef conf
[Debug] Creating spark session at url: http://10.0.2.2:8081/sessions/col_ops6 @(<unknown>:<unknown> <unknown>:0:0)
In [9]:
exec1Def (collect scaled)
[Debug] executeCommand1': computing observable /collect_f09694@org.spark.Collect![double] @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: http://10.0.2.2:8081/computations/col_ops6/0/createwith nodes: [/distributedliteral_e9167d@org.spark.DistributedLiteral:int,/sum_148aaa@SUM!int,/mean_sum@org.spark.Select!double,/count_624031@COUNT!int,/mean_count@org.spark.Select!double,/center_mean@org.spark.LocalDiv!double,/localpack_12d6b8@org.spark.LocalPack!{_1:double _2:double},/broadcastpair_c502ed@org.spark.BroadcastPair:{_1:int _2:{_1:double _2:double}},/select_4800d7@org.spark.Select:double,/sum_9c5517@SUM!double,/std_dev@org.spark.LocalDiv!double,/localpack_bb6144@org.spark.LocalPack!{_1:double _2:double},/broadcastpair_59ae8f@org.spark.BroadcastPair:{_1:int _2:{_1:double _2:double}},/select_c905de@org.spark.Select:double,/collect_f09694@org.spark.Collect![double]] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /sum_148aaa running @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /count_624031 running @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /sum_148aaa finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /mean_sum finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /count_624031 finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /mean_count finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /center_mean finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /localpack_12d6b8 finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /sum_9c5517 running @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /std_dev running @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /localpack_bb6144 finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /collect_f09694 running @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /sum_9c5517 finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /std_dev finished @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /collect_f09694 finished @(<unknown>:<unknown> <unknown>:0:0)
[-1.0,1.0]
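
With the session in place, the intermediate observables defined earlier can also be evaluated on their own. As a hedged sketch (assuming exec1Def accepts any observable whose result type it can decode, as it did for collect above):

-- Evaluate just the mean of the original dataset.
exec1Def (myMean myData)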

As a preview of the next chapter, here is the function to display the RDDs generated by Spark when running this command.

Each element comes from the graph above. You can see which ones are missing (they have been optimized away by Spark). When you click on a box, you can see the sequence of RDDs that was generated in the process.

In [10]:
displayRDD "0"