Data Matching Operators
The DataFlow operator library includes several pre-built fuzzy matching operators, which can be used to identify duplicates or establish links between records. For more information, refer to the following topics:
DiscoverDuplicates Operator
The DiscoverDuplicates operator can be used to discover duplicate records within a single source using fuzzy matching operators.
The first step in a matching operation is to index the input records into groups for processing by the configured phases of field comparisons, classifiers, and filters. Indexing can reduce the number of record pairs that must be compared. The output of this step is a stream of candidate record pairs to be compared, classified, and filtered.
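The effect of indexing can be sketched in plain Java. This is only an illustration of block-style grouping, not the DataFlow `Index` implementation: the class name `BlockingSketch`, the method `candidatePairs`, and the sample key strings are all hypothetical. Records that share a key land in the same group, and pairs are generated only within a group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain Java, not the DataFlow Index implementation.
class BlockingSketch {

    /** Groups record ids by blocking key and returns every pair inside each group. */
    static List<int[]> candidatePairs(String[] blockingKeys) {
        Map<String, List<Integer>> blocks = new LinkedHashMap<>();
        for (int id = 0; id < blockingKeys.length; id++) {
            blocks.computeIfAbsent(blockingKeys[id], k -> new ArrayList<>()).add(id);
        }
        List<int[]> pairs = new ArrayList<>();
        for (List<Integer> block : blocks.values()) {
            for (int i = 0; i < block.size(); i++) {
                for (int j = i + 1; j < block.size(); j++) {
                    pairs.add(new int[] {block.get(i), block.get(j)});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical keys: a zip code joined with a phonetic code for the city
        String[] keys = {"78701|A235", "78701|A235", "10001|N620", "78701|A235", "10001|N620"};
        int allPairs = keys.length * (keys.length - 1) / 2;
        System.out.println(candidatePairs(keys).size() + " candidate pairs out of " + allPairs + " possible");
    }
}
```

Records whose keys differ are never paired, so the later phases only see pairs that already agree on the indexed fields.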
Record pair comparisons happen in configured phases. A matching operation consists of one or more phases. Each phase consists of a set of field comparisons, optional classifiers, and a filter. A field comparison compares a field from each record in the pair using a fuzzy matching comparison operator and outputs a field comparison score.
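As a rough illustration of what a field comparison produces, the sketch below turns a textbook Levenshtein edit distance into a score between zero and one. It is not the DataFlow `LEVENSHTEIN` comparison: the class `FieldComparisonSketch` is hypothetical, and dividing by the longer string's length is an assumed normalization chosen for the example.

```java
// Illustrative only: a textbook edit-distance score, not the DataFlow LEVENSHTEIN comparison.
class FieldComparisonSketch {

    /** Classic dynamic-programming Levenshtein distance using two rolling rows. */
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j;
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i;
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    /** Assumed normalization: 1 - distance / longer length, so identical strings score 1.0. */
    static double similarity(String a, String b) {
        int longest = Math.max(a.length(), b.length());
        if (longest == 0) {
            return 1.0;
        }
        return 1.0 - (double) levenshtein(a, b) / longest;
    }

    public static void main(String[] args) {
        System.out.println(similarity("Jon", "John"));
        System.out.println(similarity("Smith", "Smyth"));
    }
}
```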
A classifier may be used to classify or aggregate multiple field scores into a single score. A classifier outputs a single value representing the composite score.
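One way to picture a classifier is as a weighted average of field scores. The sketch below assumes the composite is the sum of each score times its weight, divided by the sum of the weights; whether the `WAVG` classifier normalizes in exactly this way is an assumption. The class name `WeightedAverageSketch` and the sample scores are hypothetical.

```java
// Illustrative only: a plain weighted mean, not the DataFlow WAVG classifier itself.
class WeightedAverageSketch {

    /** Returns sum(score[i] * weight[i]) / sum(weight[i]). */
    static double weightedAverage(double[] scores, int[] weights) {
        if (scores.length != weights.length) {
            throw new IllegalArgumentException("one weight is required per score");
        }
        double weightedSum = 0.0;
        long weightTotal = 0;
        for (int i = 0; i < scores.length; i++) {
            weightedSum += scores[i] * weights[i];
            weightTotal += weights[i];
        }
        if (weightTotal <= 0) {
            throw new IllegalArgumentException("weights must sum to a positive value");
        }
        return weightedSum / weightTotal;
    }

    public static void main(String[] args) {
        // Hypothetical field scores: first name, last name, company, company q-gram
        double[] scores = {0.9, 1.0, 0.8, 0.5};
        int[] weights = {5, 20, 5, 2};
        System.out.println(weightedAverage(scores, weights));
    }
}
```

Because the weights are relative, a heavily weighted field (the second score here) dominates the composite even when a lightly weighted field scores poorly.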
A phase may use zero or more classifiers, and a classifier can aggregate the scores produced by other classifiers. A filter is the last step of a phase. The filter ensures that record pairs are pushed to the output stream only if they meet the filter criteria.
The output of this matching operation is a stream of record pairs that are deemed to be likely matches. Each record pair contains a record score that indicates the strength of the match on a scale from zero to one. A score approaching zero is an unlikely match. A score approaching one is a very likely match.
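The filter step can be pictured as a simple threshold on the record score. The sketch below keeps only pairs whose score is greater than or equal to a minimum value, in the spirit of a "greater than or equal" filter. The class `FilterSketch`, its nested `ScoredPair` type, and the sample scores are hypothetical and do not reflect the DataFlow `Filter` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a threshold filter in plain Java, not the DataFlow Filter API.
class FilterSketch {

    /** A candidate pair of record ids with its composite score in the range 0 to 1. */
    static final class ScoredPair {
        final int left;
        final int right;
        final double score;

        ScoredPair(int left, int right, double score) {
            this.left = left;
            this.right = right;
            this.score = score;
        }
    }

    /** Keeps the pairs whose score is greater than or equal to minValue. */
    static List<ScoredPair> atLeast(List<ScoredPair> pairs, double minValue) {
        List<ScoredPair> kept = new ArrayList<>();
        for (ScoredPair pair : pairs) {
            if (pair.score >= minValue) {
                kept.add(pair);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<ScoredPair> scored = Arrays.asList(
                new ScoredPair(0, 1, 0.92),
                new ScoredPair(0, 3, 0.40),
                new ScoredPair(2, 4, 0.85));
        for (ScoredPair pair : atLeast(scored, 0.85)) {
            System.out.println(pair.left + " ~ " + pair.right + " score " + pair.score);
        }
    }
}
```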
Code Example
This example applies a set of fuzzy matching rules to the accounts data set to find likely duplicates.
Using the DiscoverDuplicates operator in Java
import static com.pervasive.datarush.matching.ClassifierType.WAVG;
import static com.pervasive.datarush.matching.FilterType.GTE;
import static com.pervasive.datarush.matching.Index.PROP_KEYS;
import static java.util.Arrays.asList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.matching.Classifier;
import com.pervasive.datarush.matching.Comparison;
import com.pervasive.datarush.matching.ComparisonType;
import com.pervasive.datarush.matching.DiscoverDuplicates;
import com.pervasive.datarush.matching.Filter;
import com.pervasive.datarush.matching.Index;
import com.pervasive.datarush.matching.IndexType;
import com.pervasive.datarush.matching.Phase;
import com.pervasive.datarush.matching.Phase.CleanupMode;
import com.pervasive.datarush.matching.functions.StringEncodings;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.operators.record.DeriveFields;
import com.pervasive.datarush.operators.record.FieldDerivation;
import com.pervasive.datarush.operators.record.RemoveFields;

/**
 * Use the DiscoverDuplicates operator to find possible duplicate accounts
 *
 */
public class DiscoverDuplicateAccounts {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph app = LogicalGraphFactory.newLogicalGraph("DiscoverDuplicates");
        
        // Create a delimited text reader for the "accounts0.csv" file
        ReadDelimitedText reader = app.add(new ReadDelimitedText("data/accounts0.csv"));
        reader.setHeader(true);
        
        // Create a new city field to match with using the phonetic form of the city's name
        DeriveFields city = app.add(new DeriveFields(FieldDerivation.derive("city_soundex", StringEncodings.soundex("city"))));
        app.connect(reader.getOutput(), city.getInput());
        
        // Configure the indexing properties for the matcher
        Index index = new Index();
        index.setType(IndexType.BLOCK);
        index.addProperty(PROP_KEYS, new String[] {"zip", "city_soundex"});
        
        // Configure the matching criteria
        List<Comparison> comparisons = new ArrayList<Comparison>();
        comparisons.add(new Comparison("first_name", ComparisonType.LEVENSHTEIN, "fn_score"));
        comparisons.add(new Comparison("last_name", ComparisonType.JARO_WINKLER, "ln_score"));
        comparisons.add(new Comparison("company", ComparisonType.JARO, "company_score"));
        Map<String, Object> cprops = new HashMap<String, Object>();
        cprops.put("q", Integer.valueOf(3));
        cprops.put("maxDistance", Integer.valueOf(5));
        comparisons.add(new Comparison("company", ComparisonType.POSITIONAL_QGRAM, "company_pqg_score", cprops));
        
        // Configure the classifier the phase should use
        Map<String, Object> weights = new HashMap<String, Object>();
        weights.put("weights", new int[] {5, 20, 5, 2});
        Classifier classifier = new Classifier(
                WAVG, 
                new String[] {"fn_score", "ln_score", "company_score", "company_pqg_score"},
                Classifier.DEFAULT_FIELD_NAME,
                weights);
        
        // Configure the filter to determine the minimum match requirement
        Filter filter = new Filter(GTE, Filter.DEFAULT_FIELD_NAME, "minValue", 0.85D);
        
        // Create a new phase for the operator
        Phase phase1 = new Phase(comparisons, classifier, filter, CleanupMode.INTERMEDIATE);
        
        // Initialize the DiscoverDuplicates operator
        DiscoverDuplicates dedup = app.add(new DiscoverDuplicates(index, Arrays.asList(phase1)));
        app.connect(city.getOutput(), dedup.getInput());
        
        // Remove the index key fields (zip and the derived city_soundex) from the output
        RemoveFields rmField = app.add(new RemoveFields(asList("zip", "city_soundex")));
        app.connect(dedup.getOutput(), rmField.getInput());
        
        // Create a delimited text writer
        WriteDelimitedText writer = app.add(new WriteDelimitedText("results/dedup-accounts.txt", WriteMode.OVERWRITE));
        writer.setHeader(true);
        writer.setWriteSingleSink(true);
        app.connect(rmField.getOutput(), writer.getInput());
        
        // Compile and run the graph
        app.run();
    }
}
Using the DiscoverDuplicates operator in RushScript
// Create block index on zip code and soundex of city
var index = new Index();
index.setType(IndexType.BLOCK);
index.addProperty('keys', ['zip', 'city_soundex']);

// Create a phase with comparisons, weighted average classifier and a filter
var phase = new Phase();
phase.addComparison('first_name', 'first_name', ComparisonType.LEVENSHTEIN, 'fn_score');
phase.addComparison('last_name', 'last_name', ComparisonType.LEVENSHTEIN, 'ln_score');
phase.addComparison('company', 'company', ComparisonType.JARO, 'company_score');
// The q-gram settings belong to the positional q-gram comparison, as in the Java example
phase.addComparison('company', 'company', ComparisonType.POSITIONAL_QGRAM, 'company_pqg_score', {q:3, maxDistance:5 });
phase.addClassifier(ClassifierType.WAVG, ['fn_score', 'ln_score', 'company_score', 'company_pqg_score'], Classifier.PROP_WEIGHTS, [5, 20, 5, 2]);
phase.setFilter(FilterType.GTE, Filter.PROPS_MINVALUE, 0.85);

// Discover duplicates in the account data
var results = dr.discoverDuplicates(data, {index:index, phases:[phase]});
Properties
The DiscoverDuplicates operator provides the following properties.
| Name | Type | Description |
| --- | --- | --- |
| index | Index | The pair generation method for determining initial candidate matches. |
| phases | List<Phase> | The phases of comparison, classifying and filtering used to determine matches. |
Ports
The DiscoverDuplicates operator provides a single input port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| input | | getInput() | The input data to check for duplicates. |
The DiscoverDuplicates operator provides a single output port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| output | | getOutput() | The record pairs that are considered likely matches. |
DiscoverLinks Operator
The DiscoverLinks operator uses fuzzy matching operators to discover linked records from two data sources.
The first step in a matching operation is to index the input records into groups for processing by the configured phases of field comparisons, classifiers, and filters. Indexing can reduce the number of record pairs that must be compared. The output of this step is a stream of candidate record pairs to be compared, classified, and filtered.
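When linking two sources, candidate pairs are formed across the sources, not within one. The following sketch shows one way this could look for block-style grouping, assuming a pair is generated whenever a left record and a right record share the same key. The class `LinkBlockingSketch`, the method `candidatePairs`, and the sample keys are hypothetical, and this is not the DataFlow `Index` implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: cross-source blocking in plain Java, not the DataFlow Index implementation.
class LinkBlockingSketch {

    /** Returns {leftId, rightId} for every left/right record pair that shares a blocking key. */
    static List<int[]> candidatePairs(String[] leftKeys, String[] rightKeys) {
        // Group the right-side record ids by key
        Map<String, List<Integer>> rightBlocks = new HashMap<>();
        for (int r = 0; r < rightKeys.length; r++) {
            rightBlocks.computeIfAbsent(rightKeys[r], k -> new ArrayList<>()).add(r);
        }
        // Pair each left record with the right records in the matching group
        List<int[]> pairs = new ArrayList<>();
        for (int l = 0; l < leftKeys.length; l++) {
            List<Integer> matches = rightBlocks.getOrDefault(leftKeys[l], Collections.<Integer>emptyList());
            for (int r : matches) {
                pairs.add(new int[] {l, r});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical keys taken from each side
        String[] left = {"78701", "10001", "60601"};
        String[] right = {"10001", "78701", "78701"};
        for (int[] pair : candidatePairs(left, right)) {
            System.out.println("left " + pair[0] + " <-> right " + pair[1]);
        }
    }
}
```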
Record pair comparisons happen in configured phases. A matching operation consists of one or more phases. Each phase consists of a set of field comparisons, optional classifiers, and a filter. A field comparison compares a field from each source using a fuzzy matching comparison operator and outputs a field comparison score. A classifier may be used to classify or aggregate multiple field scores into a single score. A classifier outputs a single value representing the composite score.
A phase may use zero or more classifiers, and a classifier can aggregate the scores produced by other classifiers. A filter is the last step of a phase. The filter ensures that record pairs are pushed to the output stream only if they meet the filter criteria.
The output of this matching operation is a stream of record pairs that are deemed to be likely matches. Each record pair contains a record score that indicates the strength of the match on a scale from zero to one. A score approaching zero is an unlikely match, while a score approaching one is a very likely match.
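Chaining phases lets a cheap comparison prune pairs before more expensive comparisons run. The sketch below illustrates that idea in plain Java with two phases: an exact zip check, then a stand-in fuzzy name score. Everything here is hypothetical (the class `PhaseSketch`, the `Pair` type, the prefix-based score, and the thresholds) and it does not use the DataFlow `Phase` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToDoubleFunction;

// Illustrative only: plain Java stand-ins, not the DataFlow Phase API.
class PhaseSketch {

    /** A candidate pair; each side is a hypothetical {zip, lastName} record. */
    static final class Pair {
        final String[] left;
        final String[] right;

        Pair(String[] left, String[] right) {
            this.left = left;
            this.right = right;
        }
    }

    /** One phase: score each pair and keep those scoring at least minValue. */
    static List<Pair> runPhase(List<Pair> pairs, ToDoubleFunction<Pair> scorer, double minValue) {
        List<Pair> kept = new ArrayList<>();
        for (Pair pair : pairs) {
            if (scorer.applyAsDouble(pair) >= minValue) {
                kept.add(pair);
            }
        }
        return kept;
    }

    /** Cheap comparison: 1.0 when the zip codes are identical, otherwise 0.0. */
    static double zipExact(Pair pair) {
        return pair.left[0].equals(pair.right[0]) ? 1.0 : 0.0;
    }

    /** Stand-in fuzzy comparison: shared prefix length divided by the longer name length. */
    static double namePrefix(Pair pair) {
        String a = pair.left[1];
        String b = pair.right[1];
        int longest = Math.max(a.length(), b.length());
        if (longest == 0) {
            return 1.0;
        }
        int shared = 0;
        while (shared < Math.min(a.length(), b.length()) && a.charAt(shared) == b.charAt(shared)) {
            shared++;
        }
        return (double) shared / longest;
    }

    /** Phase 1 prunes on zip; phase 2 scores only the survivors. */
    static List<Pair> match(List<Pair> candidates) {
        List<Pair> afterPhase1 = runPhase(candidates, PhaseSketch::zipExact, 0.5);
        return runPhase(afterPhase1, PhaseSketch::namePrefix, 0.7);
    }

    public static void main(String[] args) {
        List<Pair> candidates = Arrays.asList(
                new Pair(new String[] {"78701", "Johnson"}, new String[] {"78701", "Johnsen"}),
                new Pair(new String[] {"78701", "Smith"}, new String[] {"10001", "Smith"}),
                new Pair(new String[] {"78701", "Smith"}, new String[] {"78701", "Jones"}));
        System.out.println(match(candidates).size() + " pair(s) survive both phases");
    }
}
```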
Code Example
This example uses fuzzy matching rules to find links between two account data sets.
Using the DiscoverLinks operator in Java
import static com.pervasive.datarush.matching.Classifier.PROP_WEIGHTS;
import static com.pervasive.datarush.matching.ClassifierType.WAVG;
import static com.pervasive.datarush.matching.ComparisonType.JARO_WINKLER;
import static com.pervasive.datarush.matching.ComparisonType.SHORTHAND;
import static com.pervasive.datarush.matching.Filter.PROPS_MINVALUE;
import static com.pervasive.datarush.matching.FilterType.GTE;
import static com.pervasive.datarush.matching.Index.PROP_LEFT_KEYS;
import static com.pervasive.datarush.matching.Index.PROP_RIGHT_KEYS;
import static java.util.Arrays.asList;
import java.util.HashMap;
import java.util.Map;
import com.pervasive.datarush.functions.Strings;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.matching.ComparisonType;
import com.pervasive.datarush.matching.DiscoverLinks;
import com.pervasive.datarush.matching.Index;
import com.pervasive.datarush.matching.IndexType;
import com.pervasive.datarush.matching.Phase;
import com.pervasive.datarush.matching.Phase.CleanupMode;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.operators.record.DeriveFields;
import com.pervasive.datarush.operators.record.FieldDerivation;
import com.pervasive.datarush.operators.record.RemoveFields;

/**
 * Use the DiscoverLinks operator to find possible account links
 *
 */
public class DiscoverLinkAccounts {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph app = LogicalGraphFactory.newLogicalGraph("DiscoverLinks");
        
        // Create a delimited text reader for the left input data set
        ReadDelimitedText readLeft = app.add(new ReadDelimitedText("data/accounts1.csv"));
        readLeft.setHeader(true);
        DeriveFields zipLeft = app.add(new DeriveFields(FieldDerivation.derive("zip", Strings.substr("zip", 0, 5))));
        app.connect(readLeft.getOutput(), zipLeft.getInput());
        
        // Create a delimited text reader for the right input data set
        ReadDelimitedText readRight = app.add(new ReadDelimitedText("data/accounts2.csv"));
        readRight.setHeader(true);
        DeriveFields zipRight = app.add(new DeriveFields(FieldDerivation.derive("zip", Strings.substr("zip", 0, 5))));
        app.connect(readRight.getOutput(), zipRight.getInput());
        
        // Configure the indexing properties for the matcher
        Index index = new Index();
        index.setType(IndexType.BLOCK);
        index.setLeftFieldPattern("left_{0}");
        index.setRightFieldPattern("right_{0}");
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(PROP_LEFT_KEYS, new String[] {"file_id"});
        props.put(PROP_RIGHT_KEYS, new String[] {"file_id"});
        index.setProperties(props);
        
        // Configure phase 1 of the matcher
        Phase phase1 = new Phase(CleanupMode.ALL);
        phase1.addComparison("zip", "zip", ComparisonType.EXACT_MATCH, "zip_score");
        phase1.setFilter(GTE, "zip_score", PROPS_MINVALUE, Double.valueOf(0.5));
        
        // Configure phase 2 of the matcher
        Phase phase2 = new Phase(CleanupMode.INTERMEDIATE);
        phase2.addComparison("first_name", "first_name", JARO_WINKLER, "jw_fname");
        phase2.addComparison("last_name", "last_name", JARO_WINKLER, "jw_lname");
        phase2.addClassifier(WAVG, new String[] {"jw_fname", "jw_lname"}, "name_score");
        
        phase2.addComparison("company", "company", JARO_WINKLER, "jw_co");
        phase2.addComparison("company", "company", SHORTHAND, "sh_co");
        phase2.addClassifier(WAVG, new String[] {"jw_co", "sh_co"}, "company_score", PROP_WEIGHTS, new int[] {43, 57});
        
        phase2.addComparison("street", "street", JARO_WINKLER, "jw_st");
        phase2.addComparison("street", "street", SHORTHAND, "sh_st");
        phase2.addClassifier(WAVG, new String[] {"jw_st", "sh_st"}, "street_score", PROP_WEIGHTS, new int[] {43, 57});
        
        phase2.addComparison("city", "city", JARO_WINKLER, "city_score");
        
        phase2.addClassifier(
                WAVG, 
                new String[] {"name_score", "company_score", "street_score", "city_score"}, 
                PROP_WEIGHTS, 
                new int[] {20, 25, 25, 30});
        phase2.setFilter(GTE, PROPS_MINVALUE, 0.7);
        
        // Initialize the DiscoverLinks operator
        DiscoverLinks link = app.add(new DiscoverLinks(index, asList(phase1, phase2)));
        app.connect(zipLeft.getOutput(), link.getLeft());
        app.connect(zipRight.getOutput(), link.getRight());
        
        // Remove the file_id field from the output
        RemoveFields rmField = app.add(new RemoveFields(asList("file_id")));
        app.connect(link.getOutput(), rmField.getInput());
        
        // Create a delimited text writer
        WriteDelimitedText writer = app.add(new WriteDelimitedText("results/link-accounts.txt", WriteMode.OVERWRITE));
        writer.setHeader(true);
        writer.setWriteSingleSink(true);
        app.connect(rmField.getOutput(), writer.getInput());
        
        // Compile and run the graph
        app.run();
    }
}
Using the DiscoverLinks operator in RushScript
// Create block index on zip code and soundex of city
var index = new Index();
index.setType(IndexType.BLOCK);
index.addProperty('leftKeys', ['zip', 'city_soundex']);
index.addProperty('rightKeys', ['zip', 'city_soundex']);

// Create a phase with comparisons, weighted average classifier and a filter
var phase = new Phase();
phase.addComparison('first_name', 'first_name', ComparisonType.LEVENSHTEIN, 'fn_score');
phase.addComparison('last_name', 'last_name', ComparisonType.LEVENSHTEIN, 'ln_score');
phase.addComparison('company', 'company', ComparisonType.JARO, 'company_score');
// The q-gram settings belong to the positional q-gram comparison
phase.addComparison('company', 'company', ComparisonType.POSITIONAL_QGRAM, 'company_pqg_score', {q:3, maxDistance:5 });
phase.addClassifier(ClassifierType.WAVG, ['fn_score', 'ln_score', 'company_score', 'company_pqg_score'], Classifier.PROP_WEIGHTS, [5, 20, 5, 2]);
phase.setFilter(FilterType.GTE, Filter.PROPS_MINVALUE, 0.85);

// Discover links in the account data
var results = dr.discoverLinks(splitData.output, splitData.rejects, {index:index, phases:[phase]});
Properties
The DiscoverLinks operator provides the following properties.
| Name | Type | Description |
| --- | --- | --- |
| index | Index | The pair generation method for determining initial candidate matches. |
| phases | List<Phase> | The phases of comparison, classifying and filtering used to determine matches. |
Ports
The DiscoverLinks operator provides the following input ports.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| leftInput | | getLeft() | The left side input data for the operation. |
| rightInput | | getRight() | The right side input data for the operation. |
The DiscoverLinks operator provides a single output port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| output | | getOutput() | The record pairs that are considered likely matches. |
Last modified date: 03/10/2025