Performing Data Matching
Data Matching
DataFlow Matching Operators
The DataFlow matching library provides operators to discover duplicates or links between records. This topic covers each of those operators and provides details on how to use them.
Matching Operators
Using the DiscoverDuplicates Operator to Discover Duplicates
The DiscoverDuplicates operator can be used to discover duplicate records within a single source using fuzzy matching operators.
The first step in a matching operation is to index the input data records into groups for processing by the configured phases of field comparisons, classifiers, and filters. This indexing can greatly reduce the number of record pairs that must be compared. The output of this step is a stream of record pairs to be compared, classified, and filtered.
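The blocking idea behind indexing can be illustrated with plain Java collections. The sketch below is a conceptual illustration only (the `BlockingSketch` class and its helper are hypothetical, not part of the DataFlow API): records are grouped by a blocking key, and candidate pairs are generated only within each group, so records in different blocks are never compared.

```java
import java.util.*;

public class BlockingSketch {
    // Generate candidate pairs only within blocks that share the same blocking key.
    static List<String[]> candidatePairs(Map<String, String> recordToKey) {
        // Group record ids by their blocking key value
        Map<String, List<String>> blocks = new HashMap<>();
        for (Map.Entry<String, String> e : recordToKey.entrySet()) {
            blocks.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        // Emit every unordered pair within each block
        List<String[]> pairs = new ArrayList<>();
        for (List<String> block : blocks.values()) {
            Collections.sort(block); // deterministic order for the example
            for (int i = 0; i < block.size(); i++) {
                for (int j = i + 1; j < block.size(); j++) {
                    pairs.add(new String[] {block.get(i), block.get(j)});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, String> recs = new HashMap<>();
        recs.put("A", "30301"); // blocked on zip code
        recs.put("B", "30301");
        recs.put("C", "30301");
        recs.put("D", "99501");
        // 3 within-block pairs instead of the 6 pairs a full cross-product would produce
        System.out.println(candidatePairs(recs).size());
    }
}
```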
Record pair comparisons happen in configured phases; a matching operation may consist of one or more phases. Each phase consists of a set of field comparisons, classifiers, and a filter. Field comparisons compare a field from each source using a fuzzy matching comparison operator. Each comparison outputs a field comparison score.
A classifier may be used to classify or aggregate multiple field scores into a single score. A classifier outputs a single value representing the composite score.
A phase may use zero or more classifiers, and a classifier can aggregate scores produced by other classifiers. A filter is the last step of a phase; it ensures that record pairs are pushed to the output stream only if they meet the filter criteria.
The output of the matching operation is a stream of record pairs that are deemed likely matches. Each record pair contains a record score indicating the strength of the match on a spectrum from zero to one: a score approaching zero is an unlikely match, while a score approaching one is a very likely match.
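The weighted average (WAVG) classifier reduces to simple arithmetic. The sketch below is illustrative only (the `WavgSketch` class is hypothetical, not DataFlow API code); it combines four field scores using the weights from the Java example that follows, where the last name score dominates the composite.

```java
public class WavgSketch {
    // Weighted average of field comparison scores; weights need not sum to 1.
    static double wavg(double[] scores, int[] weights) {
        double sum = 0.0;
        double totalWeight = 0.0;
        for (int i = 0; i < scores.length; i++) {
            sum += scores[i] * weights[i];
            totalWeight += weights[i];
        }
        return sum / totalWeight;
    }

    public static void main(String[] args) {
        // fn_score, ln_score, company_score, company_pqg_score
        double[] scores = {0.9, 1.0, 0.8, 0.7};
        int[] weights = {5, 20, 5, 2};
        // (0.9*5 + 1.0*20 + 0.8*5 + 0.7*2) / 32 = 29.9 / 32 = 0.934375
        System.out.println(wavg(scores, weights));
    }
}
```

With a filter threshold of 0.85, this pair would be kept: a strong last name match outweighs the weaker company scores.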
Code Example
This example uses the accounts data set to find duplicates using a set of fuzzy matching rules.
Using the DiscoverDuplicates operator in Java
import static com.pervasive.datarush.matching.ClassifierType.WAVG;
import static com.pervasive.datarush.matching.FilterType.GTE;
import static com.pervasive.datarush.matching.Index.PROP_KEYS;
import static java.util.Arrays.asList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.matching.Classifier;
import com.pervasive.datarush.matching.Comparison;
import com.pervasive.datarush.matching.ComparisonType;
import com.pervasive.datarush.matching.DiscoverDuplicates;
import com.pervasive.datarush.matching.Filter;
import com.pervasive.datarush.matching.Index;
import com.pervasive.datarush.matching.IndexType;
import com.pervasive.datarush.matching.Phase;
import com.pervasive.datarush.matching.Phase.CleanupMode;
import com.pervasive.datarush.matching.functions.StringEncodings;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.operators.record.DeriveFields;
import com.pervasive.datarush.operators.record.FieldDerivation;
import com.pervasive.datarush.operators.record.RemoveFields;
/**
* Use the DiscoverDuplicates operator to find possible duplicate accounts
*
*/
public class DiscoverDuplicateAccounts {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph app = LogicalGraphFactory.newLogicalGraph("DiscoverDuplicates");

        // Create a delimited text reader for the "accounts0.csv" file
        ReadDelimitedText reader = app.add(new ReadDelimitedText("data/accounts0.csv"));
        reader.setHeader(true);

        // Create a new city field to match with, using the phonetic form of the city name
        DeriveFields city = app.add(new DeriveFields(
            FieldDerivation.derive("city_soundex", StringEncodings.soundex("city"))));
        app.connect(reader.getOutput(), city.getInput());

        // Configure the indexing properties for the matcher
        Index index = new Index();
        index.setType(IndexType.BLOCK);
        index.addProperty(PROP_KEYS, new String[] {"zip", "city_soundex"});

        // Configure the matching criteria
        List<Comparison> comparisons = new ArrayList<Comparison>();
        comparisons.add(new Comparison("first_name", ComparisonType.LEVENSHTEIN, "fn_score"));
        comparisons.add(new Comparison("last_name", ComparisonType.JARO_WINKLER, "ln_score"));
        comparisons.add(new Comparison("company", ComparisonType.JARO, "company_score"));
        Map<String, Object> cprops = new HashMap<String, Object>();
        cprops.put("q", Integer.valueOf(3));
        cprops.put("maxDistance", Integer.valueOf(5));
        comparisons.add(new Comparison("company", ComparisonType.POSITIONAL_QGRAM, "company_pqg_score", cprops));

        // Configure the classifier the phase should use
        Map<String, Object> weights = new HashMap<String, Object>();
        weights.put("weights", new int[] {5, 20, 5, 2});
        Classifier classifier = new Classifier(
            WAVG,
            new String[] {"fn_score", "ln_score", "company_score", "company_pqg_score"},
            Classifier.DEFAULT_FIELD_NAME,
            weights);

        // Configure the filter to determine the minimum match requirement
        Filter filter = new Filter(GTE, Filter.DEFAULT_FIELD_NAME, "minValue", 0.85D);

        // Create a new phase for the operator
        Phase phase1 = new Phase(comparisons, classifier, filter, CleanupMode.INTERMEDIATE);

        // Initialize the DiscoverDuplicates operator
        DiscoverDuplicates dedup = app.add(new DiscoverDuplicates(index, Arrays.asList(phase1)));
        app.connect(city.getOutput(), dedup.getInput());

        // Remove the zip and derived city fields from the output
        RemoveFields rmField = app.add(new RemoveFields(asList("zip", "city_soundex")));
        app.connect(dedup.getOutput(), rmField.getInput());

        // Create a delimited text writer
        WriteDelimitedText writer = app.add(new WriteDelimitedText("results/dedup-accounts.txt", WriteMode.OVERWRITE));
        writer.setHeader(true);
        writer.setWriteSingleSink(true);
        app.connect(rmField.getOutput(), writer.getInput());

        // Compile and run the graph
        app.run();
    }
}
Using the DiscoverDuplicates operator in RushScript
// Create block index on zip code and soundex of city
var index = new Index();
index.setType(IndexType.BLOCK);
index.addProperty('keys', ['zip', 'city_soundex']);
// Create a phase with comparisons, weighted average classifier and a filter
var phase = new Phase();
phase.addComparison('first_name', 'first_name', ComparisonType.LEVENSHTEIN, 'fn_score');
phase.addComparison('last_name', 'last_name', ComparisonType.JARO_WINKLER, 'ln_score');
phase.addComparison('company', 'company', ComparisonType.JARO, 'company_score');
phase.addComparison('company', 'company', ComparisonType.POSITIONAL_QGRAM, 'company_pqg_score', {q:3, maxDistance:5});
phase.addClassifier(ClassifierType.WAVG, ['fn_score', 'ln_score', 'company_score', 'company_pqg_score'], Classifier.PROP_WEIGHTS, [5, 20, 5, 2]);
phase.setFilter(FilterType.GTE, Filter.PROPS_MINVALUE, 0.85);
// Discover duplicates in the account data
var results = dr.discoverDuplicates(data, {index:index, phases:[phase]});
Properties
The DiscoverDuplicates operator provides the following properties.
Ports
The DiscoverDuplicates operator provides a single input port.
The DiscoverDuplicates operator provides a single output port.
Using the DiscoverLinks Operator to Discover Links
The DiscoverLinks operator uses fuzzy matching operators to discover linked records from two data sources.
The first step in a matching operation is to index the input data records into groups for processing by the configured phases of field comparisons, classifiers, and filters. This indexing can greatly reduce the number of record pairs that must be compared. The output of this step is a stream of record pairs to be compared, classified, and filtered.
Record pair comparisons happen in configured phases; a matching operation may consist of one or more phases. Each phase consists of a set of field comparisons, classifiers, and a filter. Field comparisons compare a field from each source using a fuzzy matching comparison operator. Each comparison outputs a field comparison score. A classifier may be used to classify or aggregate multiple field scores into a single score. A classifier outputs a single value representing the composite score.
A phase may use zero or more classifiers, and a classifier can aggregate scores produced by other classifiers. A filter is the last step of a phase; it ensures that record pairs are pushed to the output stream only if they meet the filter criteria.
The output of the matching operation is a stream of record pairs that are deemed likely matches. Each record pair contains a record score indicating the strength of the match on a spectrum from zero to one: a score approaching zero is an unlikely match, while a score approaching one is a very likely match.
Code Example
This example uses fuzzy matching rules to find links between two account data sets.
Using the DiscoverLinks operator in Java
import static com.pervasive.datarush.matching.Classifier.PROP_WEIGHTS;
import static com.pervasive.datarush.matching.ClassifierType.WAVG;
import static com.pervasive.datarush.matching.ComparisonType.JARO_WINKLER;
import static com.pervasive.datarush.matching.ComparisonType.SHORTHAND;
import static com.pervasive.datarush.matching.Filter.PROPS_MINVALUE;
import static com.pervasive.datarush.matching.FilterType.GTE;
import static com.pervasive.datarush.matching.Index.PROP_LEFT_KEYS;
import static com.pervasive.datarush.matching.Index.PROP_RIGHT_KEYS;
import static java.util.Arrays.asList;
import java.util.HashMap;
import java.util.Map;
import com.pervasive.datarush.functions.Strings;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.matching.ComparisonType;
import com.pervasive.datarush.matching.DiscoverLinks;
import com.pervasive.datarush.matching.Index;
import com.pervasive.datarush.matching.IndexType;
import com.pervasive.datarush.matching.Phase;
import com.pervasive.datarush.matching.Phase.CleanupMode;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.operators.record.DeriveFields;
import com.pervasive.datarush.operators.record.FieldDerivation;
import com.pervasive.datarush.operators.record.RemoveFields;
/**
* Use the DiscoverLinks operator to find possible account links
*
*/
public class DiscoverLinkAccounts {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph app = LogicalGraphFactory.newLogicalGraph("DiscoverLinks");

        // Create a delimited text reader for the left input data set
        ReadDelimitedText readLeft = app.add(new ReadDelimitedText("data/accounts1.csv"));
        readLeft.setHeader(true);
        DeriveFields zipLeft = app.add(new DeriveFields(FieldDerivation.derive("zip", Strings.substr("zip", 0, 5))));
        app.connect(readLeft.getOutput(), zipLeft.getInput());

        // Create a delimited text reader for the right input data set
        ReadDelimitedText readRight = app.add(new ReadDelimitedText("data/accounts2.csv"));
        readRight.setHeader(true);
        DeriveFields zipRight = app.add(new DeriveFields(FieldDerivation.derive("zip", Strings.substr("zip", 0, 5))));
        app.connect(readRight.getOutput(), zipRight.getInput());

        // Configure the indexing properties for the matcher
        Index index = new Index();
        index.setType(IndexType.BLOCK);
        index.setLeftFieldPattern("left_{0}");
        index.setRightFieldPattern("right_{0}");
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(PROP_LEFT_KEYS, new String[] {"file_id"});
        props.put(PROP_RIGHT_KEYS, new String[] {"file_id"});
        index.setProperties(props);

        // Configure phase 1 of the matcher
        Phase phase1 = new Phase(CleanupMode.ALL);
        phase1.addComparison("zip", "zip", ComparisonType.EXACT_MATCH, "zip_score");
        phase1.setFilter(GTE, "zip_score", PROPS_MINVALUE, Double.valueOf(0.5));

        // Configure phase 2 of the matcher
        Phase phase2 = new Phase(CleanupMode.INTERMEDIATE);
        phase2.addComparison("first_name", "first_name", JARO_WINKLER, "jw_fname");
        phase2.addComparison("last_name", "last_name", JARO_WINKLER, "jw_lname");
        phase2.addClassifier(WAVG, new String[] {"jw_fname", "jw_lname"}, "name_score");
        phase2.addComparison("company", "company", JARO_WINKLER, "jw_co");
        phase2.addComparison("company", "company", SHORTHAND, "sh_co");
        phase2.addClassifier(WAVG, new String[] {"jw_co", "sh_co"}, "company_score", PROP_WEIGHTS, new int[] {43, 57});
        phase2.addComparison("street", "street", JARO_WINKLER, "jw_st");
        phase2.addComparison("street", "street", SHORTHAND, "sh_st");
        phase2.addClassifier(WAVG, new String[] {"jw_st", "sh_st"}, "street_score", PROP_WEIGHTS, new int[] {43, 57});
        phase2.addComparison("city", "city", JARO_WINKLER, "city_score");
        phase2.addClassifier(
            WAVG,
            new String[] {"name_score", "company_score", "street_score", "city_score"},
            PROP_WEIGHTS,
            new int[] {20, 25, 25, 30});
        phase2.setFilter(GTE, PROPS_MINVALUE, 0.7);

        // Initialize the DiscoverLinks operator
        DiscoverLinks link = app.add(new DiscoverLinks(index, asList(phase1, phase2)));
        app.connect(zipLeft.getOutput(), link.getLeft());
        app.connect(zipRight.getOutput(), link.getRight());

        // Remove the file_id field from the output
        RemoveFields rmField = app.add(new RemoveFields(asList("file_id")));
        app.connect(link.getOutput(), rmField.getInput());

        // Create a delimited text writer
        WriteDelimitedText writer = app.add(new WriteDelimitedText("results/link-accounts.txt", WriteMode.OVERWRITE));
        writer.setHeader(true);
        writer.setWriteSingleSink(true);
        app.connect(rmField.getOutput(), writer.getInput());

        // Compile and run the graph
        app.run();
    }
}
Using the DiscoverLinks operator in RushScript
// Create block index on zip code and soundex of city
var index = new Index();
index.setType(IndexType.BLOCK);
index.addProperty('leftKeys', ['zip', 'city_soundex']);
index.addProperty('rightKeys', ['zip', 'city_soundex']);
// Create a phase with comparisons, weighted average classifier and a filter
var phase = new Phase();
phase.addComparison('first_name', 'first_name', ComparisonType.LEVENSHTEIN, 'fn_score');
phase.addComparison('last_name', 'last_name', ComparisonType.JARO_WINKLER, 'ln_score');
phase.addComparison('company', 'company', ComparisonType.JARO, 'company_score');
phase.addComparison('company', 'company', ComparisonType.POSITIONAL_QGRAM, 'company_pqg_score', {q:3, maxDistance:5});
phase.addClassifier(ClassifierType.WAVG, ['fn_score', 'ln_score', 'company_score', 'company_pqg_score'], Classifier.PROP_WEIGHTS, [5, 20, 5, 2]);
phase.setFilter(FilterType.GTE, Filter.PROPS_MINVALUE, 0.85);
// Discover links in the account data
var results = dr.discoverLinks(splitData.output, splitData.rejects, {index:index, phases:[phase]});
Properties
The DiscoverLinks operator provides the following properties.
Ports
The DiscoverLinks operator provides the following input ports.
The DiscoverLinks operator provides a single output port.
Data Clustering
DataFlow Cluster Operators
The DataFlow matching library provides operators for clustering the results of duplicate or linkage discovery. This section covers each of those operators and provides details on how to use them.
Clustering Operators
Using the ClusterDuplicates Operator to Cluster Duplicates
The ClusterDuplicates operator transforms record pairs into clusters of like records, where the two sides of each pair are from the same source. The output of the DiscoverDuplicates operator is a stream of record pairs.
Each pair of records has passed the given qualifications for being a potential match. This operator takes the record pair input and finds clusters of records that are alike. For example, if one row contains records A and B and another row contains records B and C, this operator will create a cluster for records A, B, and C; generate a unique cluster identifier for the grouping; and output a row for each of those records with the generated cluster identifier.
A cluster may contain any number of records. Note that the original record pairings are lost, as are the scores.
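Conceptually, clustering is a transitive closure over the matched pairs. The following sketch (a hypothetical `ClusterSketch` class, not the operator's actual implementation) uses a union-find structure to merge the pairs A-B and B-C into a single cluster, while D-E forms a separate cluster.

```java
import java.util.*;

public class ClusterSketch {
    static Map<String, String> parent = new HashMap<>();

    // Find the representative (root) of a record's cluster
    static String find(String x) {
        parent.putIfAbsent(x, x);
        String p = parent.get(x);
        if (!p.equals(x)) {
            p = find(p);
            parent.put(x, p); // path compression
        }
        return p;
    }

    // Merge the clusters containing records a and b
    static void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    public static void main(String[] args) {
        // Matched record pairs from duplicate discovery
        union("A", "B");
        union("B", "C");
        union("D", "E");
        // Group records by their cluster root; the root acts as the cluster identifier
        Map<String, List<String>> clusters = new TreeMap<>();
        for (String rec : new TreeSet<>(parent.keySet())) {
            clusters.computeIfAbsent(find(rec), k -> new ArrayList<>()).add(rec);
        }
        System.out.println(clusters.values());
    }
}
```

As the text notes, only cluster membership survives this step; the pairings and their scores do not.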
Code Example
The following code fragment demonstrates how to set up the ClusterDuplicates operator and use it within a simple dataflow.
Using the ClusterDuplicates operator in Java
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph();
// Create a delimited text reader for the "dedup-accounts.csv" file
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/dedup-accounts.csv"));
reader.setHeader(true);
// Create a ClusterDuplicates operator and set the id field
ClusterDuplicates cluster = graph.add(new ClusterDuplicates());
cluster.setDataIdField("id");
// Create a delimited text writer for the results
WriteDelimitedText writer = graph.add(new WriteDelimitedText("results/cluster-accounts.csv", WriteMode.OVERWRITE));
writer.setHeader(true);
writer.setWriteSingleSink(true);
// Connect the graph
graph.connect(reader.getOutput(), cluster.getInput());
graph.connect(cluster.getOutput(), writer.getInput());
// Compile and run the graph
graph.run();
Using the ClusterDuplicates operator in RushScript
var results = dr.clusterDuplicates(data, {dataIdField:"id"});
Properties
The ClusterDuplicates operator provides the following properties.
Ports
The ClusterDuplicates operator provides a single input port.
The ClusterDuplicates operator provides a single output port.
Using the ClusterLinks Operator to Cluster Links
The ClusterLinks operator transforms record pairs into clusters of like records. The output of the DiscoverLinks operator is a stream of record pairs. Each pair of records has passed the given qualifications for being a potential match.
This operator takes the record pair input and finds clusters of records that are alike. For example, if one row contains records A and B and another row contains records B and C, this operator will create a cluster for records A, B, and C; generate a unique cluster identifier for the grouping; and output a row for each of those records with the generated cluster identifier.
A cluster may contain any number of records. Note that the original record pairings are lost, as are the scores.
Code Example
The following code fragment demonstrates how to set up the ClusterLinks operator and use it within a simple dataflow.
Using the ClusterLinks operator in Java
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph();
// Create a delimited text reader for the "link-accounts.csv" file
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/link-accounts.csv"));
reader.setHeader(true);
// Create a ClusterLinks operator and set the id field and field patterns
ClusterLinks cluster = graph.add(new ClusterLinks());
cluster.setDataIdField("id");
cluster.setLeftFieldPattern("left_{0}");
cluster.setRightFieldPattern("right_{0}");
// Create a delimited text writer for the results
WriteDelimitedText writer = graph.add(new WriteDelimitedText("results/cluster-accounts.csv", WriteMode.OVERWRITE));
writer.setHeader(true);
writer.setWriteSingleSink(true);
// Connect the graph
graph.connect(reader.getOutput(), cluster.getInput());
graph.connect(cluster.getOutput(), writer.getInput());
// Compile and run the graph
graph.run();
Using the ClusterLinks operator in RushScript
var results = dr.clusterLinks(data, {dataIdField:"id", leftFieldPattern:"left_{0}", rightFieldPattern:"right_{0}"});
Properties
The ClusterLinks operator provides the following properties.
Ports
The ClusterLinks operator provides a single input port.
The ClusterLinks operator provides a single output port.
Match Analysis
DataFlow Match Analysis
The DataFlow matching library provides operators to analyze data for approximate matching. This topic covers each of those operators and provides details on how to use them.
Match Analysis Operators
Using the AnalyzeDuplicateKeys Operator to Analyze Duplicates
The AnalyzeDuplicateKeys operator provides an analysis of the quality of a set of blocking keys over data to be deduplicated. Because each record in a given block must be compared to every other record in the block during deduplication, the smaller the block sizes, the better the performance.
The AnalyzeDuplicateKeys operator writes the results of its key analysis to the console.
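The performance argument can be made concrete: a block of n records requires n*(n-1)/2 pair comparisons, so comparison work grows quadratically with block size. The sketch below (a hypothetical `BlockCostSketch` class, not DataFlow API code) contrasts one large block with the same records split into many small blocks.

```java
public class BlockCostSketch {
    // Number of record pair comparisons required within one block of n records
    static long pairsInBlock(long n) {
        return n * (n - 1) / 2;
    }

    public static void main(String[] args) {
        // One block of 10,000 records: 49,995,000 comparisons
        System.out.println(pairsInBlock(10_000));
        // The same records split into 100 blocks of 100: 495,000 comparisons, ~100x less work
        System.out.println(100 * pairsInBlock(100));
    }
}
```

This is why a key analysis that reveals a few very large blocks usually points to a weak blocking key choice.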
Code Examples
The following code fragment demonstrates how to initialize the AnalyzeDuplicateKeys operator.
Using the AnalyzeDuplicateKeys operator in Java
AnalyzeDuplicateKeys analyzer = graph.add(new AnalyzeDuplicateKeys());
analyzer.setBlockingKeys(Arrays.asList("keyField1", "keyField2"));
graph.connect(reader.getOutput(), analyzer.getInput());
Using the AnalyzeDuplicateKeys operator in RushScript
var results = dr.analyzeDuplicateKeys(data, {blockingKeys:["keyField1", "keyField2"]});
Properties
The AnalyzeDuplicateKeys operator provides one property.
Ports
The AnalyzeDuplicateKeys operator provides a single input port.
Using the AnalyzeLinkKeys Operator to Analyze Links
The AnalyzeLinkKeys operator provides an analysis of the quality of a set of blocking keys over two data sets to be linked. Because each record in a given block on the left must be compared to every record in the same block on the right during linking, the smaller the block sizes, the better the performance.
The AnalyzeLinkKeys operator writes the results of its key analysis to the console.
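For linking, the comparison count for a block is the product of its left-side and right-side record counts, so a single skewed block can dominate the total cost. A small sketch illustrates this (the `LinkCostSketch` class is hypothetical, not DataFlow API code):

```java
public class LinkCostSketch {
    // Comparisons for a block: every left record against every right record
    static long pairsInBlock(long leftCount, long rightCount) {
        return leftCount * rightCount;
    }

    public static void main(String[] args) {
        // A balanced block: 100 x 100 = 10,000 comparisons
        System.out.println(pairsInBlock(100, 100));
        // A skewed block: 10,000 x 10,000 = 100 million comparisons
        System.out.println(pairsInBlock(10_000, 10_000));
    }
}
```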
Code Examples
The following code fragment demonstrates how to initialize the AnalyzeLinkKeys operator.
Using the AnalyzeLinkKeys operator in Java
AnalyzeLinkKeys analyzer = graph.add(new AnalyzeLinkKeys());
analyzer.setLeftBlockingKeys(Arrays.asList("keyField1"));
analyzer.setRightBlockingKeys(Arrays.asList("keyField1"));
graph.connect(leftReader.getOutput(), analyzer.getLeft());
graph.connect(rightReader.getOutput(), analyzer.getRight());
Using the AnalyzeLinkKeys operator in RushScript
var results = dr.analyzeLinkKeys(data1, data2, {leftBlockingKeys:["keyField1"], rightBlockingKeys:["keyField1"]});
Properties
The AnalyzeLinkKeys operator provides the following properties.
Ports
The AnalyzeLinkKeys operator provides the following input ports.