In [1]:
:load KarpsDisplays KarpsDagDisplay
In [2]:
:extension DeriveGeneric
:extension FlexibleContexts
:extension OverloadedStrings
:extension GeneralizedNewtypeDeriving
:extension FlexibleInstances
:extension MultiParamTypeClasses
In [3]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Column
import Spark.Core.ColumnFunctions
import Spark.Core.Functions
import Spark.Core.Row
import Spark.Core.Types
import Spark.Core.Try

import qualified Data.Vector as V
import qualified Data.Text as T
import GHC.Generics

Typed and untyped data with Karps

It is common to start data exploration with untyped data, to see how things fit together, and then to refine the pipeline with more and more types as it moves into production.

Karps offers a typed and an untyped API, reflecting different tradeoffs:

  • the untyped API is easier to use, but most mistakes are only surfaced right before the computation runs. Do not worry though: you can still check the intermediate computations.
  • the typed API requires a bit more machinery and performs more checks using the Haskell compiler, so the error messages may be less convenient. In exchange, it offers stronger guarantees that the operations are chained in a legitimate manner, and it can be enforced across a large codebase without having to run any code first.
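To make this tradeoff concrete, here is a small pure-Haskell sketch (independent of the Karps API; all names are hypothetical) contrasting a dynamic, string-keyed field lookup that fails at run time with a typed record access that the compiler checks:

```haskell
import qualified Data.Map as M

-- Dynamic style: fields are addressed by name; a typo is a run-time error.
dynLookup :: M.Map String Int -> String -> Either String Int
dynLookup row name =
  maybe (Left ("missing field: " ++ name)) Right (M.lookup name row)

-- Typed style: the field set is fixed by the type; a typo is a compile error.
data TreeRow = TreeRow { rowId :: Int, rowWidth :: Int }

main :: IO ()
main = do
  let row = M.fromList [("treeId", 1), ("treeWidth", 2)]
  print (dynLookup row "treeId")        -- Right 1
  print (dynLookup row "missingColumn") -- Left "missing field: missingColumn"
  print (rowWidth (TreeRow 1 2))        -- 2
```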

We are going to explore these tradeoffs on a small task of filtering data.

You are given a dataset about a nearby forest, with one record per observed tree:

  • tree id
  • tree width
  • tree height

And we are going to compute various statistics about these trees.

In [4]:
rawData = [(1, 3, 2)] :: [(Int, Int, Int)]

For the sake of this example, we manually build up the content in a form that Karps can digest. In practice, though, this data would come from one of Spark's many input sources.

In [5]:
let fun (id', width, height) = RowArray (V.fromList [IntElement id', IntElement width, IntElement height])
dataCells = fun <$> rawData
[RowArray (fromList [IntElement 1,IntElement 3,IntElement 2])]

Dynamic operations with Dataframes

The first way to build a dataframe is to combine a datatype with a list of cells. We can build the datatype by hand, and then call the dataframe function:

In [6]:
-- The API is not stable yet, importing some internals
import Spark.Core.Internal.TypesFunctions
dt = structType [
        structField "treeId" intType,
        structField "treeWidth" intType,
        structField "treeHeight" intType]

-- The 'dataframe' function builds a new dataframe from a type and some content.
treesDF = dataframe dt dataCells
Right /distributedliteral_2e6742@org.spark.DistributedLiteral:{treeId:int treeWidth:int treeHeight:int}

As one can see from above, this operation succeeded, and lets us manipulate the data. We can extract columns of data, combine these columns, etc.

In [7]:
-- The '/-' operator extracts a column of data, using the field name as a string.
idCol = treesDF /- "treeId"
Right treeId{int}->/distributedliteral_2e6742
In [8]:
widthCol = treesDF /- "treeWidth"
doubleWidthCol = (widthCol + widthCol) @@ "doubleWidth"
Right doubleWidth{int}->/distributedliteral_2e6742

At the end of the day, all the resulting columns can be packed again in a new dataframe:

In [9]:
outputDF = pack' [idCol, doubleWidthCol]
Right /select_f28d60@org.spark.Select:{treeId:int doubleWidth:int}

Of course, this API does not prevent us from doing nonsensical operations:

In [10]:
-- What does that mean?
weirdCol = idCol + widthCol
weirdCol2 = idCol + 1

And Haskell will not help us if the schema of the dataframe changes:

In [11]:
treesDF /- "missingColumn"
Left (Error {ePath = NPath(), eMessage = "unsafeStaticProjection: Cannot find the field missingColumn in type {treeId:int treeWidth:int treeHeight:int}"})

Simple typing operations with Datasets

We can enforce some level of typing by making the structure of the data available to Karps. Here is how to associate a data structure with data handled by Karps. We start with a simple representation that uses raw types:

In [12]:
data Tree = Tree {
  treeId :: Int,
  treeWidth :: Int,
  treeHeight :: Int } deriving (Generic, Show) -- You need these two classes at least

-- Automatically derives the Spark datatype corresponding to the
-- Haskell representation.
instance SQLTypeable Tree
-- Automatically derives the converters between the Spark data format
-- and the Haskell in-memory representation.
instance ToSQL Tree

-- These accessors must be written by hand for now, but they could be
-- generated in the future using Template Haskell.
treeId' :: StaticColProjection Tree Int
treeId' = unsafeStaticProjection buildType "treeId"
treeWidth' :: StaticColProjection Tree Int
treeWidth' = unsafeStaticProjection buildType "treeWidth"
treeHeight' :: StaticColProjection Tree Int
treeHeight' = unsafeStaticProjection buildType "treeHeight"
instance TupleEquivalence Tree (Int, Int, Int) where
  tupleFieldNames = NameTuple ["treeId", "treeWidth", "treeHeight"]

We can now take a dataframe and attempt to cast it to a (typed) dataset. Since this operation can fail, it is wrapped with a Try.
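Under the hood, a Try is essentially an Either with an error on the left. Here is a minimal pure-Haskell sketch of the pattern (the names are illustrative, not the actual Karps definitions):

```haskell
-- A sketch of the Try pattern: computations that may fail carry their
-- error in the Left branch instead of throwing an exception.
type Try a = Either String a

asPositive :: Int -> Try Int
asPositive x
  | x >= 0    = Right x
  | otherwise = Left ("negative value: " ++ show x)

main :: IO ()
main = do
  print (asPositive 3)     -- Right 3
  print (asPositive (-1))  -- Left "negative value: -1"
```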

In [13]:
tTreesDS = asDS treesDF :: Try (Dataset Tree)
Right /distributedliteral_2e6742@org.spark.DistributedLiteral:{treeId:int treeWidth:int treeHeight:int}
In [14]:
asDS outputDF :: Try (Dataset Tree)
Left (Error {ePath = NPath(), eMessage = "castType: Casting error: dataframe has type {treeId:int doubleWidth:int} incompatible with type {treeId:int treeWidth:int treeHeight:int}"})

Since we know that this is going to work and since we are doing exploratory analysis, we are going to unwrap the Try and look at the dataset directly. That code will throw an exception if the types are not compatible:

In [15]:
-- To import `forceRight`.
import Spark.Core.Internal.Utilities(forceRight)
treesDS = forceRight (asDS treesDF) :: Dataset Tree
/distributedliteral_2e6742@org.spark.DistributedLiteral:{treeId:int treeWidth:int treeHeight:int}

All the operations on the dataframes can now be checked by the compiler:

In [16]:
col1 = treesDS // treeId'
:t col1
col1 :: Column Tree Int

We can still do some dynamic matching if we prefer, but then we get a dynamic column instead.

In [17]:
col1' = treesDS /- "treeId"
:t col1'
col1' :: DynColumn
Right treeId{int}->/distributedliteral_2e6742

After manipulating columns, all the data can be packed into a tuple, or into some other type. The following operations are fully type-checked. Try changing the types to see what happens:

In [18]:
outputDS = pack (col1, treesDS//treeWidth') :: Dataset (Int, Int)
-- Or we can get our trees back
outputDS2 = pack (treesDS//treeId', treesDS//treeWidth', treesDS//treeHeight') :: Dataset Tree

This still lets us perform some bogus operations, because we use primitive types to represent the data:

In [19]:
-- Some curious operation
curious = (treesDS//treeWidth') + (treesDS//treeId')
:t curious
curious :: Column Tree Int
treeWidth + treeId{int}->/distributedliteral_2e6742

Domain-specific static typing

Of course, just as in regular Haskell code, we do not want to mix different data types together. Using newtypes, we can tell Karps to use distinct types in Haskell while keeping the same datatype representation in Spark.

In [20]:
-- Ids no longer support arithmetic operations; they can only be printed (Show)
newtype MyId = MyId Int deriving (Generic, Show)
instance SQLTypeable MyId
instance ToSQL MyId

-- We allow the new Length type to do some operations (Num)
newtype Length = Length Int deriving (Generic, Num, Show)
instance SQLTypeable Length
instance ToSQL Length
typeForLength :: SQLType Length
typeForLength = buildType
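Independently of Karps, the effect of GeneralizedNewtypeDeriving can be seen in a small pure-Haskell sketch: Length inherits the Num instance of Int, while MyId does not, so adding two ids is rejected at compile time:

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

newtype MyId = MyId Int deriving (Eq, Show)
newtype Length = Length Int deriving (Eq, Num, Show)

main :: IO ()
main = do
  print (Length 3 + Length 2)  -- Length 5
  -- MyId 1 + MyId 2  -- rejected: no Num instance for MyId
```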

Let us define our new, 'safer' tree structure. Because of Haskell's limitations with record field names, and because all the structures live in the same notebook, we have to pick different names for the fields. In practice, these structures would not get mixed up.

In [21]:
data STree = STree {
  sTreeId :: MyId,
  sTreeWidth :: Length,
  sTreeHeight :: Length } deriving (Generic, Show)

instance SQLTypeable STree
instance ToSQL STree

-- These accessors must be written by hand for now, but they could be
-- generated in the future using Template Haskell.
sTreeId' :: StaticColProjection STree MyId
sTreeId' = unsafeStaticProjection buildType "sTreeId"
sTreeWidth' :: StaticColProjection STree Length
sTreeWidth' = unsafeStaticProjection buildType "sTreeWidth"
sTreeHeight' :: StaticColProjection STree Length
sTreeHeight' = unsafeStaticProjection buildType "sTreeHeight"
instance TupleEquivalence STree (MyId, Length, Length) where
  tupleFieldNames = NameTuple ["sTreeId", "sTreeWidth", "sTreeHeight"]

Because of the name change, we cannot directly cast our previous dataframe to this dataset: the field names do not match.

NOTE: this behaviour may change in the future by checking only the types and dropping the name checks.

In [22]:
forceRight (asDS treesDF) :: Dataset STree
/distributedliteral_2e6742@org.spark.DistributedLiteral:{sTreeId:int sTreeWidth:int sTreeHeight:int}

We are going to do some gymnastics with the columns. There are two choices: either we build a dataframe first and then type-check it, or we type-check each column of the dataframe first and then combine the checked columns in a safe manner.

Here is the first option:

In [23]:
-- We can build a structure first and convert it to a dataframe:
str = struct' [ (treesDF/-"treeId") @@ "sTreeId",
                (treesDF/-"treeWidth") @@ "sTreeWidth",
                (treesDF/-"treeHeight") @@ "sTreeHeight"]
treesDF2 = pack' str
treesDS2 = forceRight (asDS treesDF2) :: Dataset STree
:t treesDS2
treesDS2 :: Dataset STree
/select_611827@org.spark.Select:{sTreeId:int sTreeWidth:int sTreeHeight:int}

And here is the second option, using typed columns. The do...return block wraps all the possible failures when extracting the typed columns.

In [24]:
tTreesDS2 = do
  idCol <- castCol' (buildType::SQLType MyId) (treesDF/-"treeId")
  widthCol <- castCol' (buildType::SQLType Length) (treesDF/-"treeWidth")
  heightCol <- castCol' (buildType::SQLType Length) (treesDF/-"treeHeight")
  -- This operation is type-safe
  let s = pack (idCol, widthCol, heightCol) :: Dataset STree
  return s
treesDS2 = forceRight tTreesDS2
:t treesDS2
treesDS2 :: Dataset STree
/select_97c4b5@org.spark.Select:{sTreeId:int sTreeWidth:int sTreeHeight:int}
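The do block relies on Either's Monad instance, which short-circuits on the first Left: if any of the casts fails, the remaining steps are skipped and the failure is returned. A minimal pure-Haskell sketch of that behaviour:

```haskell
check :: Int -> Either String Int
check x
  | x >= 0    = Right x
  | otherwise = Left ("negative: " ++ show x)

-- The second step fails, so the third step never runs.
combined :: Either String Int
combined = do
  a <- check 1
  b <- check (-2)
  c <- check 3
  return (a + b + c)

main :: IO ()
main = print combined  -- Left "negative: -2"
```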
In [25]:
let widthCol' = treesDF/-"treeWidth"
let widthCol = forceRight (castCol' typeForLength widthCol')

Now all the data can be manipulated in a type-safe manner. Under the hood, all these types will be unwrapped to Spark's primitive types.
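On the Haskell side this unwrapping is free as well: a newtype shares the runtime representation of the type it wraps, which Data.Coerce makes explicit. A small sketch, independent of Karps:

```haskell
import Data.Coerce (coerce)

newtype Length = Length Int deriving (Eq, Show)

-- coerce converts between a newtype and its underlying type at zero cost,
-- mirroring how Length columns are stored as plain ints in Spark.
toRaw :: [Length] -> [Int]
toRaw = coerce

main :: IO ()
main = print (toRaw [Length 3, Length 2])  -- [3,2]
```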

In [26]:
idCol = treesDS2 // sTreeId'
:t idCol
idCol :: Column STree MyId
In [27]:
-- This will not work anymore:
idCol + idCol
No instance for (Num MyId) arising from a use of ‘+’
In the expression: idCol + idCol
In an equation for ‘it’: it = idCol + idCol
In [28]:
-- But this will still work:
widthCol = treesDS2//sTreeWidth'
heightCol = treesDS2//sTreeHeight'
volumeCol = (widthCol + heightCol) @@ "volume"
:t volumeCol
volumeCol :: Column STree Length

Potentially illegal casting operations will not work:

In [29]:
pack (idCol, volumeCol) :: Dataset (Int, Int)
No instance for (karps- STree (karps- STree Length) Int)
arising from a use of ‘pack’
In the expression: pack (idCol, volumeCol) :: Dataset (Int, Int)
In an equation for ‘it’: it = pack (idCol, volumeCol) :: Dataset (Int, Int)
In [30]:
pack (idCol, volumeCol) :: Dataset (MyId, Length)
/select_8c6946@org.spark.Select:{_1:int _2:int}

And of course, the final result can always be converted back to a dataframe if it is more convenient:

In [31]:
pack' [untypedCol idCol, untypedCol volumeCol]
Right /select_1a963e@org.spark.Select:{sTreeId:int volume:int}

To conclude, Karps lets you use Haskell's type checking as an opt-in compile-time check. You can still mix and match both styles whenever that is more convenient.