In [1]:
:extension OverloadedStrings
:load KarpsDisplays KarpsDagDisplay


This notebook guides you through the first steps of using Karps.

In order to run it, you must start a spark server with the karps scala server loaded in it.

If you are familiar with Spark already, the Karps code is roughly organized the same way as Spark.

In [3]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Functions
import Spark.Core.Column
import Spark.Core.ColumnFunctions

Karps communicates with Spark through a session object, called a SparkSession. For the purpose of interactive exploration, an implicit session can be created for a notebook. This pattern is not recommended for production cases, but it lets you try things quickly in a notebook.

Create a configuration objet. You can specify the location of the Spark endpoint. Calling createSparkSessionDef will allocate a default session. Note that all the xxxDef functions also have a xxx equivalent that takes or returns a session.

In [4]:
let conf = defaultConf {
        confEndPoint = "",
        confRequestedSessionName = "session00_introduction" }
print conf
createSparkSessionDef conf
SparkSessionConf {confEndPoint = "", confPort = 8081, confPollingIntervalMillis = 500, confRequestedSessionName = "session00_introduction", confUseNodePrunning = False}
[Debug] Creating spark session at url: @(<unknown>:<unknown> <unknown>:0:0)

Let us run our first program on Spark. We are going to create a tiny dataset and compute the number of elements in this dataset.

Creating a sequence of Spark operations does not require a session: at this point, you declare the operations that you want to do.

The command to create a dataset from existing elements is (surprise) dataset:

In [5]:
-- Let's make a big dataset with four elements:
let ds = dataset ([1 ,2, 3, 4]::[Int])

In order to count the number of elements, we are just going to use the built-in count command.

Unlike Spark, this command is also declarative and lazy: no computation will happen when it is called. It will return an observable that we can combine with other nodes or evaluate.

In [6]:
let c = count ds
In [7]:
:type c
c :: LocalData Int

In order to query the value and execute the computation graph, you need to call one of the exec commands. This analyzes the computation graph for possible errors, sends it to Spark for execution, and returns the result.

In this notebook, we will use the default execution context, which is implicitly used when calling exec1Def. For production cases, you should pass your own context and use exec1.

You can only send observables. Dataframes cannot be evaluated directly.

In [8]:
mycount <- exec1Def c
[Debug] executeCommand1': computing observable /count_22182b@COUNT!int @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: nodes: [/distributedliteral_5c1023@org.spark.DistributedLiteral:int,/count_22182b@COUNT!int] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /count_22182b finished @(<unknown>:<unknown> <unknown>:0:0)

As expected, mycount is an integer with the value 4:

In [9]:
:t mycount
mycount :: Int


If you execute again the same code, you will find that Karps runs it much faster:

In [10]:
_ <- exec1Def c
[Debug] executeCommand1': computing observable /count_22182b@COUNT!int @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: nodes: [/distributedliteral_5c1023@org.spark.DistributedLiteral:int,/count_22182b@COUNT!int] @(<unknown>:<unknown> <unknown>:0:0)

Computations in Karps are completely deterministic: the same computation graph will always return the same exact result. Thanks to this property, Karps can aggressively cache final and intermediate results, and reuse them when they can remove some chunks of computations. Furthermore, since the graph of computations fully describes the computation, it can be saved along the data as a proof of how the result got generated, guaranteeing reproducible results.

Because some operations in Spark are intrisincally non-deterministic, this may require some changes from existing code. For example:

  • some operations such as collect always sort their results to maintain a result that is independent from the data layout
  • random is not available yet. Some strategies based on hashing will be available.
  • current_time will most probably never be available within Karps. However, the current time can be retrieved from the environment and passed as a constant.

Note this is a preview, so the caching is not complete yet.

For example, when distributing and collecting a dataset, the order of the initial data does not matter:

In [11]:
set = dataset ([1,2,3] :: [Int])
x = collect (asCol set)
exec1Def x
[Debug] executeCommand1': computing observable /collect_2846ce@org.spark.Collect![int] @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: nodes: [/distributedliteral_c4ea43@org.spark.DistributedLiteral:int,/collect_2846ce@org.spark.Collect![int]] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /collect_2846ce finished @(<unknown>:<unknown> <unknown>:0:0)
In [12]:
set = dataset ([3,2,1] :: [Int]) -- Data is reversed, but the output is the same.
x = collect (asCol set)
exec1Def x
[Debug] executeCommand1': computing observable /collect_58d68a@org.spark.Collect![int] @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: nodes: [/distributedliteral_296a27@org.spark.DistributedLiteral:int,/collect_58d68a@org.spark.Collect![int]] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /collect_58d68a finished @(<unknown>:<unknown> <unknown>:0:0)


This is the end of this first demonstration. From this point, you can explore different topics:

  • working with cached data
  • organizing computations with paths and scopes
In [ ]: