In Assignment 2, you learned how supervised learning can be used to solve a real-world problem (i.e., sentiment analysis). In Assignment 3, you will apply unsupervised learning to address a practical problem. For simplicity, we will use intrusion detection as an example, but the same principle can be applied in many other fields, such as fraud detection and medical care.
After completing this assignment, you should be able to answer the following questions:
In practice, when you are facing an ML problem, the process of solving it basically consists of two phases: model development and model serving.
In model development, your job is to figure out which algorithms, features, and parameters should be chosen based on historical data. This is often an iterative, offline process.
Once you have developed a satisfactory model, you will need to use it to serve new requests and make predictions. This is often an online process, so you have to think about how to make predictions as fast as possible and how to update the model efficiently when new data arrive.
Suppose you want to develop a model that can detect anomalous connections to your company's server. The server log contains the information of all historical connections; your nice colleague has already transformed the raw log into a collection of feature vectors, where each feature vector characterizes a connection in 40 dimensions, e.g., the number of failed login attempts and the length (in seconds) of the connection. Here is one example feature vector:
[udp,SF,0,105,146,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,240,0.94,0.01,0.00,0.00,0.00,0.00,0.00,0.00]
Your task is to take these feature vectors as input and develop an unsupervised-learning model to detect anomalous connections. We went through this process in the lecture; in this assignment, you are going to implement it. Since you learned the KMeans API in the previous assignment, I have implemented the KMeans part for you (see the code below). Read the code first, and then implement the remaining two functions, cat2Num and addScore, by doing Task A and Task B, respectively.
# anomaly_detection.py
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator

# sc is the SparkContext provided by the pyspark shell / spark-submit driver
sqlCt = SQLContext(sc)

class AnomalyDetection():

    def readData(self, filename):
        self.rawDF = sqlCt.read.parquet(filename).cache()

    def cat2Num(self, df, indices):
        """
            Write your code!
        """

    def addScore(self, df):
        """
            Write your code!
        """

    def detect(self, k, t):
        # Encoding categorical features using one-hot.
        df1 = self.cat2Num(self.rawDF, [0, 1]).cache()
        df1.show()

        # Clustering points using KMeans
        features = df1.select("features").rdd.map(lambda row: row[0]).cache()
        model = KMeans.train(features, k, maxIterations=40, runs=10, initializationMode="random", seed=20)

        # Adding the prediction column to df1
        modelBC = sc.broadcast(model)
        predictUDF = udf(lambda x: modelBC.value.predict(x), StringType())
        df2 = df1.withColumn("prediction", predictUDF(df1.features)).cache()
        df2.show()

        # Adding the score column to df2; the higher the score, the more likely it is an anomaly
        df3 = self.addScore(df2).cache()
        df3.show()

        return df3.where(df3.score > t)


if __name__ == "__main__":
    ad = AnomalyDetection()
    ad.readData('logs-features-sample')
    anomalies = ad.detect(8, 0.97)
    print anomalies.count()
    anomalies.show()
To test your program, you can read a toy dataset:
def readToyData(self):
    data = [(0, ["http", "udt", 0.4]),
            (1, ["http", "udf", 0.5]),
            (2, ["http", "tcp", 0.5]),
            (3, ["ftp", "icmp", 0.1]),
            (4, ["http", "tcp", 0.4])]
    schema = ["id", "rawFeatures"]
    self.rawDF = sqlCt.createDataFrame(data, schema)
After calling anomalies = ad.detect(2, 0.9) in the main function, your program should output the following:
df1.show():
+---+----------------+-----------------------------------+
|id |rawFeatures |features |
+---+----------------+-----------------------------------+
|0 |[http, udt, 0.4]|[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.4]|
|1 |[http, udf, 0.5]|[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.5]|
|2 |[http, tcp, 0.5]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.5]|
|3 |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|
|4 |[http, tcp, 0.4]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.4]|
+---+----------------+-----------------------------------+
df2.show():
+---+----------------+-----------------------------------+----------+
|id |rawFeatures |features |prediction|
+---+----------------+-----------------------------------+----------+
|0 |[http, udt, 0.4]|[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.4]|0 |
|1 |[http, udf, 0.5]|[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.5]|0 |
|2 |[http, tcp, 0.5]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.5]|0 |
|3 |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|1 |
|4 |[http, tcp, 0.4]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.4]|0 |
+---+----------------+-----------------------------------+----------+
df3.show():
+---+----------------+-----------------------------------+----------+-----+
|id |rawFeatures |features |prediction|score|
+---+----------------+-----------------------------------+----------+-----+
|0 |[http, udt, 0.4]|[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.4]|0 |0.0 |
|1 |[http, udf, 0.5]|[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.5]|0 |0.0 |
|2 |[http, tcp, 0.5]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.5]|0 |0.0 |
|3 |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|1 |1.0 |
|4 |[http, tcp, 0.4]|[0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.4]|0 |0.0 |
+---+----------------+-----------------------------------+----------+-----+
anomalies.show():
+---+----------------+-----------------------------------+----------+-----+
|id |rawFeatures |features |prediction|score|
+---+----------------+-----------------------------------+----------+-----+
|3 |[ftp, icmp, 0.1]|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.1]|1 |1.0 |
+---+----------------+-----------------------------------+----------+-----+
Datasets can be downloaded from logs-features-sample and logs-features.
Note that you do not need to do feature scaling in this assignment, but you should make sure that you know how to do it (See StandardScaler for reference).
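For reference, here is a minimal sketch of how scaling could be done with MLlib's StandardScaler. It is purely illustrative (scaling is not required in this assignment) and assumes a features RDD of numerical vectors like the one built inside detect:

    from pyspark.mllib.feature import StandardScaler

    # Assumption: `features` is an RDD of numerical feature vectors, e.g.,
    # features = df1.select("features").rdd.map(lambda row: row[0])
    scaler = StandardScaler(withMean=True, withStd=True).fit(features)
    scaledFeatures = scaler.transform(features)   # each dimension now has zero mean and unit variance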
As you can see from the above feature vector, the first two dimensions of each feature vector are categorical features. For example, the first dimension can take one of the following values: “tcp”, “udp”, or “icmp”. You can represent these categorical features using a one-hot representation; in other words, the first dimension can be replaced with [1,0,0] for “tcp”, [0,1,0] for “udp”, and [0,0,1] for “icmp”.
In Task A, your job is to implement the cat2Num function. Note that in this function, you are not allowed to convert DataFrame to an RDD.
def cat2Num(self, df, indices):
    """
        Input: $df represents a DataFrame with two columns: "id" and "rawFeatures"
               $indices represents which dimensions in $rawFeatures are categorical features,
               e.g., indices = [0, 1] denotes that the first two dimensions are categorical features.
        Output: Return a new DataFrame that adds the "features" column into the input $df
        Comments: The difference between "features" and "rawFeatures" is that the former is obtained
                  from the latter by transforming all categorical features into numerical features
                  using a one-hot representation
    """
Hints. Please take a look at UDF and withColumn for transforming column values and adding a new column, respectively.
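To make the mechanics concrete, here is one possible sketch of cat2Num (not necessarily the expected solution). It relies on the imports in the skeleton above (udf, ArrayType, DoubleType), and the ordering of the one-hot dimensions it produces may differ from the toy output shown earlier:

    def cat2Num(self, df, indices):
        # Collect the distinct categorical values of each indexed dimension to the driver.
        categories = {}
        for i in indices:
            rows = df.select(df.rawFeatures[i]).distinct().collect()
            categories[i] = sorted([r[0] for r in rows])

        def oneHot(raw):
            result = []
            for i, value in enumerate(raw):
                if i in categories:
                    # Expand a categorical value into a 0/1 vector over its distinct values.
                    result.extend([1.0 if value == c else 0.0 for c in categories[i]])
                else:
                    result.append(float(value))
            return result

        oneHotUDF = udf(oneHot, ArrayType(DoubleType()))
        return df.withColumn("features", oneHotUDF(df.rawFeatures))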
Additional Advice.
I heard that some of you might prefer to learn more tools from the course. But there are hundreds of big data tools available right now, and you can never master all of them. My advice is to train your ability to learn quickly in this fast-changing world. To achieve this, a better strategy is to always keep the big picture in mind and learn two or three representative tools in depth. For example, if you know what a UDF is and how to use it in Spark SQL, you will learn much faster when you need to use a UDF in Hive.
In Hive, UDAFs are also widely used. However, as of 01/22/2017, UDAF support has not been added to PySpark (SPARK-10915). I suggest you take a look at UDAFs as well and understand when to use them.
As you may remember, the intuition behind our anomaly detection approach is that clusters with a small number of data points are likely to correspond to attacks or anomalies. We use this intuition to generate a confidence score from the clustering model's output. The confidence score reflects how strongly the clustering model believes that a data point is an attack. Let us assume $x$ is a data point describing a network connection. We can use:
$$score(x) = \frac{N_{max}-N_{x}}{N_{max}-N_{min}}$$
to score $x$ as being an anomaly. In this equation, $N_{max}$ and $N_{min}$ are the sizes of the largest and smallest clusters, respectively, and $N_{x}$ is the size of the cluster assigned to $x$. If you check the equation carefully, you will notice that $score(x) = 1$ when $x$ is assigned to the smallest cluster and $score(x) = 0$ when $x$ is assigned to the largest cluster.
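For example, in the toy dataset above, the large cluster contains four points and the small cluster contains one, so $N_{max} = 4$ and $N_{min} = 1$. The point in the small cluster gets $score = \frac{4-1}{4-1} = 1$, while each point in the large cluster gets $score = \frac{4-4}{4-1} = 0$, which matches the df3.show() output.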
In Task B, your job is to implement the addScore function. Note that in this function, you are not allowed to convert DataFrame to an RDD.
def addScore(self, df):
    """
        Input: $df represents a DataFrame with four columns: "id", "rawFeatures", "features", and "prediction"
        Output: Return a new DataFrame that adds the "score" column into the input $df

        To compute the score of a data point x, we use:

            score(x) = (N_max - N_x) / (N_max - N_min),

        where N_max and N_min represent the size of the largest and smallest clusters, respectively,
        and N_x represents the size of the cluster assigned to x
    """
As you have seen in Assignments 1 and 2, parameter tuning plays an important role in improving model quality for supervised learning. In this assignment, your task is to figure out how to tune parameters for unsupervised learning. There are two parameters that need to be tuned: the number of clusters $k$ and the score threshold $t$.
They are set to k = 8 and t = 0.97 in the above program. In fact, if you change them to different values, the result could be quite different. Thus, it is important for you to know how to tune $k$ and $t$ in practice.
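Although Task C below asks only for an email (no code), it may help to picture what tuning $k$ could look like. One common heuristic is the "elbow" method: train KMeans for a range of candidate $k$ values and watch how the clustering cost drops. The sketch below is illustrative only; it assumes the features RDD built inside detect, and the candidate range is arbitrary:

    # Illustration only: sweep k and record the within-cluster cost for each value.
    costs = {}
    for k in range(2, 16):                        # candidate range is an assumption
        m = KMeans.train(features, k, maxIterations=40, seed=20)
        costs[k] = m.computeCost(features)        # sum of squared distances to the closest centers
    # Pick the k where the cost stops dropping sharply (the "elbow"),
    # then choose t by inspecting the score distribution of the resulting clusters.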
In Task C, imagine yourself as a program manager (rather than a developer). Please write an email to tell a developer (named Nick) how you want to tune the parameters. YOU DO NOT NEED TO WRITE THE CODE!
Think about how you can deploy the model developed above in production. In this situation, the server log arrives as a data stream, and predictions have to be made in real time. To simulate such a stream, run the following commands:
$ cd your-data-path/stream-logs-features
$ ls part-r-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
The first command changes the current directory to the data directory; the second starts writing the data to port 9999 on your machine, file by file, with a 5-second pause between files. Streaming datasets can be downloaded from: stream-logs-features-sample and stream-logs-features.
Write a Spark Streaming script that reads data points from a socket and decides whether they are “normal” or “attack” connections. The following steps can guide your implementation:
Hints.
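For illustration, here is a minimal sketch of such a script. It assumes the model and per-cluster scores have already been computed offline; modelBC, clusterScores, the threshold 0.97, and the comma-separated input format are assumptions, and any categorical encoding of the streamed records is omitted:

    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 5)                    # 5-second micro-batches
    lines = ssc.socketTextStream("localhost", 9999)  # port used by the nc command above

    def classify(line):
        point = [float(v) for v in line.strip().split(",")]
        cluster = modelBC.value.predict(point)       # modelBC: broadcast KMeansModel (assumed)
        # clusterScores: dict of per-cluster anomaly scores computed offline (assumed)
        label = "attack" if clusterScores[cluster] > 0.97 else "normal"
        return (label, point)

    lines.map(classify).pprint()
    ssc.start()
    ssc.awaitTermination()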
Model serving is a hot research topic, and there are many interesting open problems. If you are interested in them, I highly recommend taking a look at Velox, a new system being developed in the AMPLab for serving machine learning predictions.
Implement the cat2Num and addScore functions in anomaly_detection.py. Submit your code file (anomaly_detection.py) and your email content (email.txt) to the CourSys activity Assignment 3.