Assignment 3: Anomaly Detection

Objective

In Assignment 2, you learned how supervised learning can be used to solve a real-world problem (i.e., sentiment analysis). In Assignment 3, you will apply unsupervised learning to address a practical problem. For simplicity, we will use intrusion detection as an example, but the principle applies to many other fields, such as fraud detection and medical care.

After completing this assignment, you should be able to answer the following questions:

  1. What's the difference between model development and model serving?
  2. How to transform categorical features to numerical features?
  3. What is User Defined Function (UDF)? And how to use it in SparkSQL?
  4. What is the difference between UDF and UDAF?
  5. How to derive anomalies from clustering results?
  6. How to tune parameters for unsupervised learning?

Overview

In practice, when you are facing an ML problem, the process of solving it basically consists of two phases: model development and model serving.

  • In model development, your job is to figure out which algorithms, features, and parameters should be chosen based on historical data. This is often an iterative, offline process.

  • Once you have developed a satisfactory model, you will need to use it to serve new requests and make predictions. This is often an online process, so you have to think about how to make the predictions as fast as possible and how to efficiently update the model when new data arrive.

Part 1: Model Development (Required)

Suppose you want to develop a model that can detect anomalous connections to your company's server. The server log contains all the information about historical connections; your nice colleague has already helped you transform the raw log into a collection of feature vectors, where each feature vector characterizes a connection in 40 dimensions, e.g., the number of failed login attempts and the length (in seconds) of the connection. Here is one example feature vector:

[udp,SF,0,105,146,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,240,0.94,0.01,0.00,0.00,0.00,0.00,0.00,0.00]

Your task is to take these feature vectors as input and develop an unsupervised-learning model to detect anomalous connections. In the lecture, we went through this process. In the assignment, you are going to implement it. Since you have learned the KMeans API in the previous assignment, I have implemented the KMeans part for you (see the code below). Read the code first, and then implement the remaining two functions, cat2Num and addScore, by doing Task A and Task B, respectively.

# anomaly_detection.py
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator

sqlCt = SQLContext(sc)  # 'sc' is the SparkContext provided by the pyspark shell / spark-submit

class AnomalyDetection():

    def readData(self, filename):
        self.rawDF = sqlCt.read.parquet(filename).cache()

    def cat2Num(self, df, indices):
        """
            Write your code!
        """

    def addScore(self, df):
        """
            Write your code!
        """

    def detect(self, k, t):
        #Encoding categorical features using one-hot.
        df1 = self.cat2Num(self.rawDF, [0, 1]).cache()
        df1.show()

        #Clustering points using KMeans
        features = df1.select("features").rdd.map(lambda row: row[0]).cache()
        model = KMeans.train(features, k, maxIterations=40, runs=10, initializationMode="random", seed=20)

        #Adding the prediction column to df1
        modelBC = sc.broadcast(model)
        predictUDF = udf(lambda x: modelBC.value.predict(x), StringType())
        df2 = df1.withColumn("prediction", predictUDF(df1.features)).cache()
        df2.show()

        #Adding the score column to df2; The higher the score, the more likely it is an anomaly 
        df3 = self.addScore(df2).cache()
        df3.show()    

        return df3.where(df3.score > t)


if __name__ == "__main__":
    ad = AnomalyDetection()
    ad.readData('logs-features-sample')
    anomalies = ad.detect(8, 0.97)
    print anomalies.count()
    anomalies.show()

A toy dataset for testing

To test your program, you can read a toy dataset:

def readToyData(self):
    data = [(0, ["http", "udt", 0.4]),
            (1, ["http", "udf", 0.5]),
            (2, ["http", "tcp", 0.5]),
            (3, ["ftp", "icmp", 0.1]),
            (4, ["http", "tcp", 0.4])]
    schema = ["id", "rawFeatures"]
    self.rawDF = sqlCt.createDataFrame(data, schema)

After calling anomalies = ad.detect(2, 0.9) in the main function, your program should produce the following output:

df1.show():

+---+----------------+-----------------------------------+
|id |rawFeatures     |features                           |
+---+----------------+-----------------------------------+
|0  |[http, udt, 0.4]|[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.4]|
|1  |[http, udf, 0.5]|[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.5]|
|2  |[http, tcp, 0.5]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.5]|
|3  |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|
|4  |[http, tcp, 0.4]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.4]|
+---+----------------+-----------------------------------+

df2.show():

+---+----------------+-----------------------------------+----------+
|id |rawFeatures     |features                           |prediction|
+---+----------------+-----------------------------------+----------+
|0  |[http, udt, 0.4]|[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.4]|0         |
|1  |[http, udf, 0.5]|[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.5]|0         |
|2  |[http, tcp, 0.5]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.5]|0         |
|3  |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|1         |
|4  |[http, tcp, 0.4]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.4]|0         |
+---+----------------+-----------------------------------+----------+

df3.show():

+---+----------------+-----------------------------------+----------+-----+
|id |rawFeatures     |features                           |prediction|score|
+---+----------------+-----------------------------------+----------+-----+
|0  |[http, udt, 0.4]|[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.4]|0         |0.0  |
|1  |[http, udf, 0.5]|[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.5]|0         |0.0  |
|2  |[http, tcp, 0.5]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.5]|0         |0.0  |
|3  |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|1         |1.0  |
|4  |[http, tcp, 0.4]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.4]|0         |0.0  |
+---+----------------+-----------------------------------+----------+-----+

anomalies.show():

+---+----------------+-----------------------------------+----------+-----+
|id |rawFeatures     |features                           |prediction|score|
+---+----------------+-----------------------------------+----------+-----+
|3  |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|1         |1.0  |
+---+----------------+-----------------------------------+----------+-----+

Real-world Dataset

Datasets can be downloaded from logs-features-sample and logs-features.

Note that you do not need to do feature scaling in this assignment, but you should make sure that you know how to do it (See StandardScaler for reference).

Task A. Categorical Features --> Numerical Features

As you can see from the above feature vector, the first two dimensions of each feature vector are categorical features. For example, the first dimension can be one of the following values: “tcp”, “udp”, or “icmp”. You can represent these categorical features using a one-hot representation; that is, the first dimension can be replaced with [1,0,0] for “tcp”, [0,1,0] for “udp”, and [0,0,1] for “icmp”.
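The idea can be sketched in plain Python, independent of Spark (the helper name `one_hot` and the fixed category list are illustrative; in your cat2Num implementation you will need to collect the distinct category values from the data itself):

```python
def one_hot(value, categories):
    """Return a one-hot list: 1.0 at the position of `value`, 0.0 elsewhere."""
    return [1.0 if c == value else 0.0 for c in categories]

# The first dimension of each raw feature vector is a protocol name.
protocols = ["tcp", "udp", "icmp"]

print(one_hot("tcp", protocols))   # [1.0, 0.0, 0.0]
print(one_hot("udp", protocols))   # [0.0, 1.0, 0.0]
print(one_hot("icmp", protocols))  # [0.0, 0.0, 1.0]
```

Note that the length of the one-hot vector equals the number of distinct values of that categorical feature, so encoding a dimension with many distinct values expands the feature space accordingly.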

In Task A, your job is to implement the cat2Num function. Note that in this function, you are not allowed to convert DataFrame to an RDD.

def cat2Num(self, df, indices):
        """ 
            Input: $df represents a DataFrame with two columns: "id" and "rawFeatures"
                   $indices represents which dimensions in $rawFeatures are categorical features, 
                    e.g., indices = [0, 1] denotes that the first two dimensions are categorical features.

            Output: Return a new DataFrame that adds the "features" column into the input $df

            Comments: The difference between "features" and "rawFeatures" is that 
            the former is obtained from the latter by transforming all categorical features 
            into numerical features using a one-hot representation
        """

Hints. Please take a look at UDF and withColumn for transforming column values and adding a new column, respectively.

Additional Advice.

  1. I heard that some of you might prefer to learn more tools in this course. But there are hundreds of big data tools available right now, and you can never master all of them. My advice is to train your ability to learn quickly in this fast-changing world. To achieve this, a better strategy is to always keep the big picture in mind and learn 2-3 representative tools in depth. For example, if you know what a UDF is and how to use it in SparkSQL, then you will learn much faster when you need to use a UDF in Hive.

  2. In Hive, UDAFs are also widely used. But, as of 01/22/2017, UDAF support had not been added to PySpark (SPARK-10915). I suggest you take a look at UDAFs as well and understand when to use them.

Task B. Adding Anomaly Score for Each Data Point

As you may remember, the intuition behind our anomaly detection approach is that clusters with a small number of data points will correspond to attacks or anomalies. We use this intuition to generate a confidence score from the clustering model’s output. The confidence score reflects how strongly the clustering model believes a data point is an attack. Let us assume $x$ is a data point describing a network connection. We can use:

$$score(x) = \frac{N_{max}-N_{x}}{N_{max}-N_{min}}$$

to score $x$ as being an anomaly. In this equation, $N_{max}$ and $N_{min}$ are the sizes of the largest and smallest clusters, respectively, and $N_{x}$ is the size of the cluster assigned to $x$. If you check the equation carefully, you will notice that $score(x) = 1$ when $x$ is assigned to the smallest cluster and $score(x) = 0$ when $x$ is assigned to the largest cluster.

In Task B, your job is to implement the addScore function. Note that in this function, you are not allowed to convert DataFrame to an RDD.

def addScore(self, df):
    """ 
        Input: $df represents a DataFrame with four columns: "id", "rawFeatures", "features", and "prediction"
        Output: Return a new DataFrame that adds the "score" column into the input $df

        To compute the score of a data point x, we use:

             score(x) = (N_max - N_x)/(N_max - N_min), 

        where N_max and N_min represent the size of the largest and smallest clusters, respectively,
              and N_x represents the size of the cluster assigned to x 
    """

Task C. Parameter Tuning for Unsupervised Learning

As you have seen from Assignments 1 and 2, parameter tuning plays an important role in improving the model quality of supervised learning. In this assignment, your task is to figure out how to tune parameters for unsupervised learning. There are two parameters that need to be tuned:

  • $k$: the number of clusters
  • $t$: the score threshold

They are set to k = 8 and t = 0.97 in the above program. In fact, if you change them to different values, the result could be quite different. Thus, it is important for you to know how to tune $k$ and $t$ in practice.

In Task C, imagine yourself as a program manager (rather than a developer). Please write an email to tell a developer (named Nick) how you want to tune the parameters. YOU DO NOT NEED TO WRITE THE CODE!

Part 2: Model Serving (Not Required)

Think about how you would deploy the model developed above in production. In this setting, the server log arrives as a data stream, and predictions have to be made in real time. To simulate such a stream, run:

$ cd your-data-path/stream-logs-features
$ ls part-r-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999

The above commands change the current directory to the data directory and then start writing data to a port on your machine, file by file, with a 5-second pause between files. Streaming datasets can be downloaded from: stream-logs-features-sample and stream-logs-features.

Write a Spark Streaming script that reads data points from a socket and decides whether they are “normal” or “attack” connections.

Hints.

  • Create a streaming K-means model. To do so, first create a StreamingKMeans object, then use the setInitialCenters method to set its cluster centers to those trained in Part 1.
  • As of 01/22/2017, Structured Streaming is still ALPHA. Like what you did for Spark MLlib vs. Spark ML, you have to make your own decision about which API (Spark Streaming vs. Structured Streaming) to choose in this task.

Model serving is a hot research topic, and there are many interesting open problems. If you are interested in them, I highly recommend you take a look at Velox, a new system being developed in the AMPLab for serving machine learning predictions.

Submission

Implement cat2Num and addScore functions in anomaly_detection.py. Submit your code file (anomaly_detection.py) and your email content (email.txt) to the CourSys activity Assignment 3.