In the previous assignments, we learned how to build machine learning models to analyze data and make predictions. This is the cool part of being a data scientist. In reality, however, the data may not be at hand, so you need to know what data to collect and how to collect it. Once you have the data, you will often find that real-world datasets are dirty and come from multiple sources. If your career goal is to become a data scientist, you have to master some data cleaning and integration skills.
In Assignment 4, you will work through a solution to Entity Resolution (ER), a very common problem in data cleaning and integration. After completing this assignment, you should be able to answer the following questions:
ER is defined as finding different records that refer to the same real-world entity, e.g., "iPhone 4th generation" vs. "iPhone four". It is central to data integration and cleaning. In this assignment, you will learn how to apply ER in a data integration setting, but the program you are going to write can easily be extended to a data-cleaning setting, where it would be used to detect duplicate records (see $r_1$ and $r_3$ in Table 1).
Imagine that you want to help your company's customers buy products at a cheaper price. To do so, you decide to first collect product data from Amazon.com and Google Shopping, and then integrate the data. Since the same product may be represented differently on the two websites, you are facing an ER problem.
Existing ER techniques can be broadly divided into two categories: similarity-based and learning-based. In Part 1, you will be instructed to implement a similarity-based method. Later, in Part 2, you will find out how a learning-based technique should work.
Unlike a learning-based technique, a similarity-based technique (a.k.a. similarity join) does not need any labeled data. It first chooses a similarity function and a threshold, and then returns the record pairs whose similarity values are at or above the threshold. These returned record pairs are treated as matching pairs, i.e., pairs referring to the same real-world entity.
Depending on the application, you may need to choose different similarity functions. In this assignment, we will use Jaccard similarity, i.e., $\textsf{Jaccard}(r, s) = \frac{|r~\cap~s|}{|r~\cup~s|}$. Here is the formal definition of this problem.
Jaccard-Similarity Join: Given two DataFrames, R and S, and a threshold $\theta \in (0, 1]$, the Jaccard-similarity join problem aims to find all record pairs $(r,~s) \in R \times S$ such that $\textsf{Jaccard}(r, s) \geq \theta$.
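To make the definition concrete, here is a minimal plain-Python sketch (not Spark) of the naive join: it computes Jaccard for every pair in $R \times S$ and keeps those at or above $\theta$. The function names and toy records are hypothetical, and treating two empty sets as having similarity 0 is an assumption. This brute-force loop is exactly the $|R \times S|$ approach that the filtering step in Task B is meant to avoid.

```python
from itertools import product

def jaccard(r, s):
    """Jaccard similarity of two token collections, treated as sets."""
    r, s = set(r), set(s)
    union = r | s
    if not union:  # both empty: define similarity as 0 (an assumption)
        return 0.0
    return len(r & s) / float(len(union))

def naive_jaccard_join(R, S, theta):
    """Compare every (r, s) in R x S; R and S map record ids to token sets."""
    return [(rid, sid)
            for (rid, r), (sid, s) in product(R.items(), S.items())
            if jaccard(r, s) >= theta]

# Hypothetical toy data
R = {"a1": {"iphone", "4", "apple"}, "a2": {"galaxy", "s3", "samsung"}}
S = {"g1": {"iphone", "4", "apple", "black"}, "g2": {"nokia", "lumia"}}
print(naive_jaccard_join(R, S, 0.5))  # → [('a1', 'g1')]
```

Here a1 and g1 share 3 of 4 distinct tokens, so their similarity is 0.75; every other pair shares nothing.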
In order to implement similarity join, you need to address the following challenges:
Jaccard is used to quantify the similarity between two sets instead of two records. You need to convert each record to a set.
A naive implementation of similarity join computes Jaccard for all $|R \times S|$ possible pairs. Imagine R and S each have one million records. This requires $10^{12}$ pair comparisons, which is extremely expensive. Thus, you need a way to avoid $n^2$ comparisons.
The output of ER is a set of matching pairs, where each pair is considered as referring to the same real-world entity. You need to know how to evaluate the quality of an ER result.
Next, you will be guided through four tasks. After finishing them, I suggest you go over the above challenges again and make sure you understand how each one is addressed.
Read the code first, and then implement the remaining four functions: preprocessDF, filtering, verification, and evaluate by doing Tasks A-D, respectively.
```python
# entity_resolution.py
from __future__ import print_function  # keeps print() consistent on Python 2 and 3

import re
import operator
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()  # reuses the pyspark shell's context if one exists
sqlCt = SQLContext(sc)

class EntityResolution:
    def __init__(self, dataFile1, dataFile2, stopWordsFile):
        self.f = open(stopWordsFile, "r")
        self.stopWords = set(self.f.read().split("\n"))
        self.stopWordsBC = sc.broadcast(self.stopWords).value
        self.df1 = sqlCt.read.parquet(dataFile1).cache()
        self.df2 = sqlCt.read.parquet(dataFile2).cache()

    def preprocessDF(self, df, cols):
        """
        Write your code!
        """

    def filtering(self, df1, df2):
        """
        Write your code!
        """

    def verification(self, candDF, threshold):
        """
        Write your code!
        """

    def evaluate(self, result, groundTruth):
        """
        Write your code!
        """

    def jaccardJoin(self, cols1, cols2, threshold):
        newDF1 = self.preprocessDF(self.df1, cols1)
        newDF2 = self.preprocessDF(self.df2, cols2)
        print("Before filtering: %d pairs in total" % (self.df1.count() * self.df2.count()))
        candDF = self.filtering(newDF1, newDF2)
        print("After Filtering: %d pairs left" % candDF.count())
        resultDF = self.verification(candDF, threshold)
        print("After Verification: %d similar pairs" % resultDF.count())
        return resultDF

    def __del__(self):
        self.f.close()

if __name__ == "__main__":
    er = EntityResolution("Amazon_sample", "Google_sample", "stopwords.txt")
    amazonCols = ["title", "manufacturer"]
    googleCols = ["name", "manufacturer"]
    resultDF = er.jaccardJoin(amazonCols, googleCols, 0.5)

    # DataFrame.map was removed in Spark 2.0, so go through the underlying RDD
    result = resultDF.rdd.map(lambda row: (row.id1, row.id2)).collect()
    groundTruth = sqlCt.read.parquet("data/sample/Amazon_Google_perfectMapping_sample") \
        .rdd.map(lambda row: (row.idAmazon, row.idGoogle)).collect()
    print("(precision, recall, fmeasure) = ", er.evaluate(result, groundTruth))
```
Datasets can be downloaded from Amazon-Google-Sample and Amazon-Google.
The program will output the following when running on the sample data:
```
Before filtering: 256 pairs in total
After Filtering: 79 pairs left
After Verification: 6 similar pairs
(precision, recall, fmeasure) = (1.0, 0.375, 0.5454545454545454)
```
Since Jaccard needs to take two sets as input, your first job is to preprocess DataFrames by transforming each record into a set of tokens. Please implement the following function.
```python
def preprocessDF(self, df, cols):
    r"""
    Input: $df represents a DataFrame
           $cols represents the list of columns (in $df) that will be concatenated and tokenized

    Output: Return a new DataFrame that adds the "joinKey" column to the input $df

    Comments: The "joinKey" column is a list of tokens, which is generated as follows:
              (1) concatenate the $cols in $df;
              (2) apply the tokenizer to the concatenated string
    Here is how the tokenizer should work:
              (1) Use "re.split(r'\W+', string)" to split a string into a set of tokens
              (2) Convert each token to its lower-case
              (3) Remove stop words
    """
```
Hints.
If you mastered UDFs and withColumn in Assignment 3, you should have no trouble finishing this task.
For testing purposes, you can compare your outputs with newDF1 and newDF2, which can be found in the test folder of the Amazon-Google-Sample dataset.
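The tokenizer rules above can be sketched in plain Python as follows. In the assignment itself this logic would be wrapped in a UDF and attached with withColumn, as the hint suggests, with the stop words taken from self.stopWordsBC. Three details here are assumptions rather than requirements stated above: columns are joined with a single space, a missing (None) value is treated as an empty string, and the empty strings that `re.split` produces at the string boundaries are dropped. The helper names `tokenize` and `join_key` are hypothetical.

```python
import re

def tokenize(text, stop_words):
    """Split on non-word characters, lower-case, and drop stop words.

    Empty strings produced by re.split at the string boundaries are
    also dropped (an assumption; the spec does not say how to treat them).
    """
    tokens = (t.lower() for t in re.split(r'\W+', text))
    return [t for t in tokens if t and t not in stop_words]

def join_key(record, cols, stop_words):
    """Concatenate the chosen columns (None treated as "") and tokenize."""
    text = " ".join(record.get(c) or "" for c in cols)
    return tokenize(text, stop_words)

# Hypothetical record and stop-word list
stop_words = {"the", "for", "and"}
rec = {"title": "Apple iPhone 4 (Black) for AT&T", "manufacturer": None}
print(join_key(rec, ["title", "manufacturer"], stop_words))
# → ['apple', 'iphone', '4', 'black', 'at', 't']
```

Note that `\W+` splits "AT&T" into "at" and "t"; the specified tokenizer does this, so the sketch keeps that behavior.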
To avoid $n^2$ pair comparisons, ER algorithms often follow a filtering-and-verification framework. The basic idea is to first filter out obviously non-matching pairs and then verify only the remaining pairs.
In Task B, your job is to implement the filtering function. This function filters out all record pairs whose joinKeys do not share any token. By the definition of Jaccard, if two sets share no element (i.e., $r \cap s = \emptyset$), their Jaccard similarity must be zero, which is below any threshold $\theta \in (0, 1]$. Thus, we can safely remove them.
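One common way to find the pairs that share at least one token without a cartesian product is an inverted index from tokens to record ids. The plain-Python sketch below illustrates the idea on hypothetical dicts of id → joinKey; it is not the required Spark solution. In Spark, a comparable design would be to `explode` each joinKey into (id, token) rows, equi-join the two sides on the token, keep the distinct (id1, id2) pairs, and join back to attach joinKey1 and joinKey2. The function name `candidate_pairs` is hypothetical.

```python
from collections import defaultdict

def candidate_pairs(left, right):
    """Return the set of (id1, id2) pairs whose token lists share >= 1 token.

    left/right: dicts mapping record id -> list of tokens (the joinKey).
    Rather than comparing all |left| x |right| pairs, build an inverted
    index token -> ids over `right`, then probe it with each `left` token.
    """
    index = defaultdict(set)
    for rid, tokens in right.items():
        for t in set(tokens):
            index[t].add(rid)

    pairs = set()  # a set, so pairs sharing several tokens appear once
    for lid, tokens in left.items():
        for t in set(tokens):
            for rid in index.get(t, ()):
                pairs.add((lid, rid))
    return pairs

# Hypothetical joinKeys
left = {"a1": ["apple", "iphone", "4"], "a2": ["samsung", "galaxy"]}
right = {"g1": ["iphone", "case"], "g2": ["nokia", "lumia"],
         "g3": ["galaxy", "tab", "samsung"]}
print(sorted(candidate_pairs(left, right)))  # → [('a1', 'g1'), ('a2', 'g3')]
```

The work done is proportional to the sizes of the posting lists that are actually probed, so very frequent tokens still produce many candidates; removing stop words in Task A helps keep those lists short.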
```python
def filtering(self, df1, df2):
    """
    Input: $df1 and $df2 are two input DataFrames, where each of them
           has a 'joinKey' column added by the preprocessDF function

    Output: Return a new DataFrame $candDF with four columns: 'id1', 'joinKey1', 'id2', 'joinKey2',
            where 'id1' and 'joinKey1' are from $df1, and 'id2' and 'joinKey2' are from $df2.
            Intuitively, $candDF is the joined result between $df1 and $df2 on the condition that
            their joinKeys share at least one token.

    Comments: Since the goal of the "filtering" function is to avoid n^2 pair comparisons,
              you are NOT allowed to compute a cartesian join between $df1 and $df2 in the function.
              Please come up with a more efficient algorithm (see my hints below).
    """
```
Hints.
In the second phase of the filtering-and-verification framework, we compute the Jaccard similarity for each surviving pair and return those pairs whose Jaccard similarity values are no smaller than the specified threshold.
In Task C, your job is to implement the verification function. This task looks simple, but there are a few small "traps" (see the hints below).
```python
def verification(self, candDF, threshold):
    """
    Input: $candDF is the output DataFrame from the 'filtering' function.
           $threshold is a float value in (0, 1]

    Output: Return a new DataFrame $resultDF that represents the ER result.
            It has five columns: id1, joinKey1, id2, joinKey2, jaccard

    Comments: There are two differences between $candDF and $resultDF
              (1) $resultDF adds a new column, called jaccard, which stores the jaccard similarity
                  between $joinKey1 and $joinKey2
              (2) $resultDF removes the rows whose jaccard similarity is smaller than $threshold
    """
```
Hints.
You need to implement a function for computing the Jaccard similarity between two joinKeys. Since the function will be called many times, think about the most efficient way to implement it. You also need to consider some edge cases in the function.
For testing purposes, you can compare your output with resultDF, which can be found in the test folder of the Amazon-Google-Sample dataset.
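Below is a minimal plain-Python sketch of a Jaccard helper that handles some likely edge cases; which "traps" the hints have in mind is not spelled out, so the cases covered here are assumptions. A joinKey is a list and may repeat a token, so it is converted to a set first. Two empty joinKeys would divide by zero, so the sketch returns 0.0 for them. Under Python 2, `/` between ints truncates, so the numerator is cast to float. The union size is computed as $|r| + |s| - |r \cap s|$ to avoid building the union set. In the assignment, `jaccard` would be wrapped in a UDF and applied with withColumn followed by a filter; `verify` below only imitates that on a list of tuples. Both names are hypothetical.

```python
def jaccard(key1, key2):
    """Jaccard similarity between two joinKeys (lists of tokens)."""
    s1, s2 = set(key1), set(key2)  # joinKeys may contain repeated tokens
    if not s1 and not s2:          # empty union: avoid division by zero
        return 0.0
    inter = len(s1 & s2)
    # |s1 ∪ s2| = |s1| + |s2| - |s1 ∩ s2|; float() avoids integer division on Python 2
    return float(inter) / (len(s1) + len(s2) - inter)

def verify(cand_rows, threshold):
    """Append a jaccard value to each (id1, key1, id2, key2) row and keep rows >= threshold."""
    out = []
    for id1, key1, id2, key2 in cand_rows:
        sim = jaccard(key1, key2)
        if sim >= threshold:
            out.append((id1, key1, id2, key2, sim))
    return out

print(jaccard(["apple", "apple", "iphone"], ["apple", "iphone", "4"]))  # → 0.6666666666666666
```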
How should we evaluate an ER result? Before answering this question, let's first recall what an ER result looks like. The goal of ER is to identify all matching record pairs. Thus, the ER result is a set of identified matching pairs, denoted by $R$. One thing we want to know is what percentage of the pairs in $R$ are truly matching. This is what Precision tells us. Let $T$ denote the truly matching pairs in $R$. Precision is defined as: $$Precision = \frac{|T|}{|R|}$$
In addition to Precision, we also care about what percentage of the truly matching pairs are identified. This is what Recall tells us. Let $A$ denote the truly matching pairs in the entire dataset. Recall is defined as:
$$Recall = \frac{|T|}{|A|}$$
There is an interesting trade-off between Precision and Recall. As more pairs are identified as matching, Recall increases while Precision may decrease. In the extreme case, if we return all pairs as matching pairs, we get a perfect Recall (i.e., Recall = 100%), but Precision will be very low. Thus, to balance Precision and Recall, people often use FMeasure (the harmonic mean of the two) to evaluate an ER result:
$$FMeasure = \frac{2 \cdot Precision \cdot Recall}{Precision + Recall}$$
In Task D, you will be given an ER result as well as the ground truth that tells you which pairs are truly matching. Your job is to calculate Precision, Recall, and FMeasure for the result.
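As a worked check of these formulas, the sample run reports 6 similar pairs with precision 1.0 and recall 0.375. Plugging those numbers into the definitions (here $|A|$ is inferred from the reported values, not read from the data):

$$
\begin{aligned}
|R| &= 6, \quad Precision = \frac{|T|}{|R|} = 1.0 \;\Rightarrow\; |T| = 6 \\
Recall &= \frac{|T|}{|A|} = 0.375 \;\Rightarrow\; |A| = \frac{6}{0.375} = 16 \\
FMeasure &= \frac{2 \cdot 1.0 \cdot 0.375}{1.0 + 0.375} = \frac{0.75}{1.375} = \frac{6}{11} \approx 0.5455
\end{aligned}
$$

This agrees with the fmeasure of 0.5454545454545454 in the sample output.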
```python
def evaluate(self, result, groundTruth):
    """
    Input: $result is a list of matching pairs identified by the ER algorithm
           $groundTruth is a list of matching pairs labeled by humans

    Output: Compute precision, recall, and fmeasure of $result based on $groundTruth, and
            return the evaluation result as a triple: (precision, recall, fmeasure)
    """
    return (precision, recall, fmeasure)
```
Hints. $|R|$, $|A|$, or Precision + Recall may be zero, so please pay attention to these edge cases.
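A minimal plain-Python sketch of the evaluation, assuming pairs are hashable tuples such as (idAmazon, idGoogle). Returning 0.0 whenever a denominator is zero is one reasonable convention, not something the assignment prescribes. The usage line reproduces the sample numbers with hypothetical ids: 6 returned pairs, all correct, out of 16 true matches.

```python
def evaluate(result, ground_truth):
    """Return (precision, recall, fmeasure) for lists of (id1, id2) pairs.

    Zero denominators (empty result, empty ground truth, or
    precision + recall == 0) yield 0.0 here; that convention is an assumption.
    """
    R = set(result)
    A = set(ground_truth)
    T = R & A  # identified pairs that are truly matching
    precision = len(T) / float(len(R)) if R else 0.0
    recall = len(T) / float(len(A)) if A else 0.0
    denom = precision + recall
    fmeasure = 2 * precision * recall / denom if denom else 0.0
    return (precision, recall, fmeasure)

result = [(i, i) for i in range(6)]   # hypothetical ids
truth = [(i, i) for i in range(16)]
print(evaluate(result, truth))  # → (1.0, 0.375, 0.5454545454545454)
```

Converting both lists to sets makes the intersection a linear-time operation and removes any duplicate pairs before counting.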
Imagine you have implemented the above similarity-join algorithm and are using it in production, but you find that it is too slow on large datasets or that its result quality is unsatisfactory. You might then want to make it more efficient or make it return more accurate results. Here are a few things you can try.
In Task B, we filtered out only the record pairs that share zero tokens. This is a very conservative filtering strategy. For example, can we also filter out the record pairs that share only one token, i.e., $|r \cap s| = 1$? If so, what is the minimum number of tokens that two records must share for $\textsf{Jaccard}(r, s) \geq \theta$ to be possible? And once we know that number, how can we modify our filtering algorithm to use it? To answer these questions, I recommend reading the following paper of mine. The algorithm presented in the paper can run orders of magnitude faster than the algorithm you implemented in Part 1.
Jiannan Wang, Guoliang Li, Jianhua Feng. Can We Beat The Prefix Filtering? An Adaptive Framework for Similarity Join and Search. SIGMOD 2012:85-96.
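One standard way to approach the first question is to bound the overlap directly; this is a general observation about Jaccard, not necessarily the technique developed in the paper. Writing $o = |r \cap s|$ and using $|r \cup s| = |r| + |s| - o$:

$$
\begin{aligned}
\textsf{Jaccard}(r, s) \geq \theta
&\iff \frac{o}{|r| + |s| - o} \geq \theta \\
&\iff o\,(1 + \theta) \geq \theta\,(|r| + |s|) \\
&\iff o \geq \left\lceil \frac{\theta}{1 + \theta}\,(|r| + |s|) \right\rceil
\end{aligned}
$$

The ceiling in the last step holds because $o$ is an integer. So whether pairs sharing one token can be discarded depends on the set sizes: with $\theta = 0.5$ and $|r| = |s| = 4$, at least $\lceil 4 / 1.5 \rceil = 3$ shared tokens are needed (3 shared tokens give $3/5 = 0.6$, while 2 give $2/6 \approx 0.33$), whereas two single-token records that share their token have Jaccard 1.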
In this assignment, we only focus on the filtering strategy for similarity joins. If you want to know the filtering strategies for some other types of ER techniques, please take a look at the following survey paper.
P. Christen. A survey of indexing techniques for scalable record linkage and deduplication. TKDE 2012: 1537-1555.
TF-IDF. When we compute Jaccard similarity, every token is assumed to have the same weight. As we saw in Assignment 2, some tokens should be more important than others. Thus, a simple way to improve result quality is to compute the TF-IDF weight of each token and then use weighted Jaccard to quantify similarity, i.e., $\textsf{WJaccard}(r, s) = \frac{\textsf{wt}(r \cap s)}{\textsf{wt}(r \cup s)}$, where $\textsf{wt}(r\cap s)$ and $\textsf{wt}(r\cup s)$ denote the sum of the weights of the tokens in $r \cap s$ and $r \cup s$, respectively.
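A minimal sketch of $\textsf{WJaccard}$, assuming token weights are already available as a dict. The `idf_weights` helper is a hypothetical illustration that uses only the IDF part, $\log(N/\textsf{df}(t))$, on the assumption that in a token set every token occurs once, so the TF factor is constant. Giving unknown tokens weight 0 is also an assumption. The usage example uses hand-picked hypothetical weights in which the common brand token "apple" is cheap and the distinguishing tokens are expensive.

```python
import math

def idf_weights(docs):
    """Hypothetical IDF weighting: w(t) = log(N / df(t)) over a list of token sets."""
    n = len(docs)
    df = {}
    for doc in docs:
        for t in set(doc):
            df[t] = df.get(t, 0) + 1
    return {t: math.log(n / float(c)) for t, c in df.items()}

def weighted_jaccard(r, s, wt):
    """WJaccard(r, s) = wt(r ∩ s) / wt(r ∪ s); tokens missing from wt get weight 0."""
    r, s = set(r), set(s)
    num = sum(wt.get(t, 0.0) for t in r & s)
    den = sum(wt.get(t, 0.0) for t in r | s)
    return num / den if den else 0.0

wt = {"apple": 0.1, "iphone": 1.0, "4": 2.0, "case": 2.0}  # hypothetical weights
r = {"apple", "iphone", "4"}
s = {"apple", "iphone", "case"}
print(weighted_jaccard(r, s, wt))  # about 0.216, vs. unweighted Jaccard 2/4 = 0.5
```

The phone and the case share two of four tokens, but because the tokens that tell them apart carry most of the weight, their weighted similarity drops well below the unweighted value.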
Learning-based. You might notice that ER resembles a classification problem: you have a set of record pairs and want to classify each one as matching or non-matching. An important problem, then, is how to represent a record pair as a feature vector. The following paper describes a common way to do it. This method is used in many open-source data-cleaning systems, e.g., SampleClean and Dedup.
M. Bilenko and R. J. Mooney. Adaptive duplicate detection using learnable string similarity measures. KDD 2003: 39-48.
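To illustrate what "a record pair as a feature vector" means, here is a deliberately simple sketch with hand-crafted features: for each aligned attribute pair it emits a token-Jaccard score and an exact-token-match flag. This is not the learnable string-similarity approach of the paper above; the feature choice, attribute alignment, and helper names are hypothetical. Vectors like these, computed for labeled pairs, could then be fed to any binary classifier.

```python
import re

def tokens(text):
    """Lower-cased word tokens; None is treated as an empty string."""
    return {t.lower() for t in re.split(r'\W+', text or "") if t}

def jac(a, b):
    """Jaccard similarity of two token sets (0.0 when both are empty)."""
    u = a | b
    return len(a & b) / float(len(u)) if u else 0.0

def pair_features(rec1, rec2, attr_pairs):
    """One Jaccard score and one exact-match flag per aligned attribute pair."""
    feats = []
    for a1, a2 in attr_pairs:
        t1, t2 = tokens(rec1.get(a1)), tokens(rec2.get(a2))
        feats.append(jac(t1, t2))
        feats.append(1.0 if t1 and t1 == t2 else 0.0)
    return feats

# Hypothetical records using the column names from the assignment
amazon = {"title": "Apple iPhone 4 16GB", "manufacturer": "Apple"}
google = {"name": "iPhone 4 16GB Black", "manufacturer": "apple"}
attr_pairs = [("title", "name"), ("manufacturer", "manufacturer")]
print(pair_features(amazon, google, attr_pairs))  # → [0.6, 0.0, 1.0, 1.0]
```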
Crowdsourcing. What if you have tried all of these tricks and the quality is still far from perfect? Then you might need to turn to the crowd (e.g., Amazon Mechanical Turk, CrowdFlower) for help. The basic idea of crowdsourced entity resolution is to build a hybrid human-machine system (like Iron Man) to address the ER problem. This is a very active research topic. I recommend reading the following paper to get some basic ideas.
Jiannan Wang, Tim Kraska, Michael Franklin, Jianhua Feng. CrowdER: Crowdsourcing Entity Resolution. PVLDB 5(11):1483-1494 (2012).
Implement the preprocessDF, filtering, verification, and evaluate functions in entity_resolution.py. Submit your code file (entity_resolution.py) to the CourSys activity Assignment 4.