Performing Data Cleansing
DataFlow Cleansing Operators
The DataFlow operator library contains several prebuilt data cleansing operators. This topic covers each of those operators and provides details on how to use them.
Covered Data Cleansing Operations
Using the RemoveDuplicates Operator to Remove Duplicates
Using the ReplaceMissingValues Operator to Replace Missing Values
Using the RemoveDuplicates Operator to Remove Duplicates
The RemoveDuplicates operator removes duplicate rows based on a specified set of group keys. For each distinct combination of key values, only the "first" record is pushed to the output; all other records with the same key values are discarded. The "first" record of a key group is determined by sorting the rows of each group by the specified SortKeys. If no SortKeys are specified, an arbitrary row from each group is output.
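The selection rule can be modeled in plain Java to make the semantics concrete. This is a minimal conceptual sketch, not the DataFlow implementation (which runs partitioned and in parallel) and not part of the DataFlow API: the `Rating` record and the `firstPerKey` helper are hypothetical names, and a `Comparator` stands in for the SortKeys. Rows are grouped on the key, and within each group the row that sorts first is kept.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical row type mirroring the ratings schema used in the code example.
record Rating(int userID, int movieID, int rating, String timestamp) {}

final class DedupSketch {
    // Conceptual model of RemoveDuplicates (not the DataFlow API): for each group key,
    // keep the row that sorts first under the comparator standing in for the SortKeys.
    static <T, K> List<T> firstPerKey(List<T> rows, Function<T, K> groupKey,
                                      Comparator<? super T> sortKeys) {
        Map<K, T> firsts = new LinkedHashMap<>(); // groups stay in first-seen order
        for (T row : rows) {
            // On a key collision keep the row that sorts first; on a tie keep the earlier row.
            firsts.merge(groupKey.apply(row), row,
                    (kept, candidate) -> sortKeys.compare(candidate, kept) < 0 ? candidate : kept);
        }
        return new ArrayList<>(firsts.values());
    }

    public static void main(String[] args) {
        List<Rating> rows = List.of(
                new Rating(1, 10, 5, "978300760"),
                new Rating(1, 20, 3, "978300275"),
                new Rating(2, 10, 4, "978298413"));
        // Group on userID and sort ascending on timestamp, compared as strings
        List<Rating> result = firstPerKey(rows, Rating::userID,
                Comparator.comparing(Rating::timestamp));
        result.forEach(System.out::println);
    }
}
```

In this sketch, rows that compare equal keep whichever was seen first; the operator itself makes no such guarantee, so specify SortKeys whenever the choice of surviving row matters.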
Code Example
This example uses the ratings data set to remove all records with a duplicate "userID" value. From each set of duplicates, it keeps the record with the lowest timestamp. Because the timestamp field is read as a STRING, timestamps are compared as text.
Using the RemoveDuplicates operator in Java
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.operators.group.RemoveDuplicates;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

public class RemoveDuplicateRatings {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("RemoveDuplicates");

        // Create a delimited text reader for the "ratings.txt" file
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(INT("userID"), INT("movieID"), INT("rating"), STRING("timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));
       
        // Remove duplicates of the userID field and 
        // keep the record with the lowest timestamp
        RemoveDuplicates remDup = graph.add(new RemoveDuplicates());
        remDup.setGroupKeys(new String[] {"userID"});
        remDup.setSortKeys("timestamp");
        
        // Connect reader to remove duplicates
        graph.connect(reader.getOutput(), remDup.getInput());
        
        // Create a delimited text writer
        WriteDelimitedText writer = graph.add(new WriteDelimitedText("results/dedup-ratings.txt", WriteMode.OVERWRITE));
        writer.setFieldDelimiter("");       // empty delimiter: field values are not enclosed in quotes
        writer.setHeader(true);
        writer.setWriteSingleSink(true);    // want a single file output
        
        // Connect remove duplicates to writer
        graph.connect(remDup.getOutput(), writer.getInput());
        
        // Compile and run the graph
        graph.run();
    }
}
Using the RemoveDuplicates operator in RushScript
var results = dr.removeDuplicates(data, {groupKeys:["userID"], sortKeys:["timestamp"]});
Properties
The RemoveDuplicates operator provides the following properties.
Name | Type | Description
groupKeys | String[] | The keys by which to deduplicate.
sortKeys | String... | Additional ascending sort applied within each group, giving finer control over which record is output. The operator outputs the first record of each group defined by groupKeys; use this property to control which record in the group comes first.
sortKeys | SortKey[] | Same behavior, with additional control over the sort direction (ascending or descending) of each key within the group.
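The effect of sort direction on the surviving record can be sketched in plain Java. This is an illustration only, assuming a hypothetical `RatingRow` record with a numeric timestamp; it uses `Comparator.reversed()` to play the role of a descending SortKey and `thenComparingInt` for a secondary key, and does not use the DataFlow `SortKey` class. Reversing the timestamp order changes the surviving row from the oldest to the newest.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical row type: user, movie, and a numeric timestamp.
record RatingRow(int userID, int movieID, long timestamp) {}

final class SortDirectionSketch {
    // Keep, per userID, the row that sorts first under the given ordering.
    static Map<Integer, RatingRow> survivors(List<RatingRow> rows, Comparator<RatingRow> order) {
        return rows.stream().collect(Collectors.toMap(
                RatingRow::userID,
                Function.identity(),
                BinaryOperator.minBy(order), // on a key collision, the row that sorts first wins
                TreeMap::new));              // ordered by userID for readable printing
    }

    public static void main(String[] args) {
        List<RatingRow> rows = List.of(
                new RatingRow(1, 10, 100L),
                new RatingRow(1, 20, 300L),
                new RatingRow(1, 30, 300L));
        // Ascending timestamp: the oldest row survives.
        Comparator<RatingRow> ascending = Comparator.comparingLong(RatingRow::timestamp);
        // Descending timestamp, then ascending movieID as a tie-breaker:
        // the newest row survives, and ties resolve to the lowest movieID.
        Comparator<RatingRow> newestFirst = ascending.reversed()
                .thenComparingInt(RatingRow::movieID);
        System.out.println(survivors(rows, ascending));
        System.out.println(survivors(rows, newestFirst));
    }
}
```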
Ports
The RemoveDuplicates operator provides a single input port.
Name | Get Method | Description
input | getInput() | The input data to remove duplicates from.
The RemoveDuplicates operator provides a single output port.
Name | Get Method | Description
output | getOutput() | The de-duplicated data.
Using the ReplaceMissingValues Operator to Replace Missing Values
The ReplaceMissingValues operator replaces missing values in the input data according to the replacement specifications provided. Each specification defines an action to take and the fields it affects.
Some actions require an initial pass through the data to calculate column statistics such as the minimum value, maximum value, mean, or most frequent value. If any of these actions are specified, the data is read once to calculate the required values.
A second pass over the data then applies the specified replacements using the calculated values. Alternatively, the statistics can be supplied through an optional input port (statisticsInput), in which case they are not calculated from the data.
The order of the input data is preserved wherever possible. However, when the action that skips records with missing data is used, records may be reordered because of how the data is partitioned for parallel processing.
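The two-pass approach can be sketched for a single numeric column in plain Java. This is a simplified model under stated assumptions, not the operator's implementation: `TwoPassReplaceSketch`, its `Action` enum, and the use of `null` to represent a missing value are all hypothetical. Pass 1 derives the replacement value from the non-missing values; pass 2 substitutes it for each missing entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class TwoPassReplaceSketch {
    // Hypothetical actions, loosely mirroring the min, max, mean, and freq specifications.
    enum Action { MIN, MAX, MEAN, FREQ }

    // Pass 1: read the non-missing (non-null) values and derive the replacement value.
    static Double statistic(List<Double> column, Action action) {
        List<Double> present = column.stream().filter(Objects::nonNull).toList();
        if (present.isEmpty()) {
            return null; // no values to derive a statistic from
        }
        switch (action) {
            case MIN:
                return Collections.min(present);
            case MAX:
                return Collections.max(present);
            case MEAN:
                return present.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
            default: // FREQ: the most frequent value (ties broken arbitrarily)
                Map<Double, Integer> counts = new HashMap<>();
                for (Double v : present) {
                    counts.merge(v, 1, Integer::sum);
                }
                return Collections.max(counts.entrySet(), Map.Entry.comparingByValue()).getKey();
        }
    }

    // Pass 2: copy the column, substituting the replacement for each missing value.
    static List<Double> replace(List<Double> column, Double replacement) {
        List<Double> out = new ArrayList<>(column.size());
        for (Double v : column) {
            out.add(v == null ? replacement : v);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> sepalLength = Arrays.asList(5.1, null, 4.7, 5.0);
        Double min = statistic(sepalLength, Action.MIN); // first pass
        System.out.println(replace(sepalLength, min));   // second pass
    }
}
```

Supplying statistics through the optional input port corresponds, in this model, to skipping `statistic` and passing a precomputed value straight to `replace`.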
A PMML model is created that contains statistics about the number of records skipped and the number of field values replaced. This model is similar to the one created by the SummaryStatistics operator.
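The skip action and the kind of tallies the model reports can also be illustrated in plain Java. This sketch is hypothetical throughout and does not produce PMML: rows are `String[]` arrays with `null` marking a missing value, a missing value in one chosen field triggers the skip action, and other fields are filled with constants. It counts skipped records and replaced values per field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SkipAndCountSketch {
    // Outcome of one replacement pass: surviving rows plus the tallies a report would carry.
    record Result(List<String[]> rows, int skippedRecords, Map<Integer, Integer> replacedPerField) {}

    // skipField: records missing this field are dropped (the "skip" action).
    // constants: field index -> constant used to fill a missing value.
    static Result apply(List<String[]> input, int skipField, Map<Integer, String> constants) {
        List<String[]> kept = new ArrayList<>();
        Map<Integer, Integer> replaced = new HashMap<>();
        int skipped = 0;
        for (String[] row : input) {
            if (row[skipField] == null) {
                skipped++; // count the record and drop it
                continue;
            }
            String[] copy = row.clone();
            for (Map.Entry<Integer, String> c : constants.entrySet()) {
                if (copy[c.getKey()] == null) {
                    copy[c.getKey()] = c.getValue();
                    replaced.merge(c.getKey(), 1, Integer::sum); // count each filled value
                }
            }
            kept.add(copy);
        }
        return new Result(kept, skipped, replaced);
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[] {"5.1", null, "setosa"},
                new String[] {"4.9", "0.2", null},
                new String[] {"6.3", null, "virginica"});
        // Skip records missing field 2 (class); fill field 1 (petal_width) with "0.0"
        Result r = apply(input, 2, Map.of(1, "0.0"));
        System.out.println("skipped=" + r.skippedRecords() + " replaced=" + r.replacedPerField());
        r.rows().forEach(row -> System.out.println(Arrays.toString(row)));
    }
}
```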
Code Example
The example below demonstrates replacing missing values in an input data set. It specifies the replacement rules using a list of ReplaceSpecification objects.
Using the ReplaceMissingValues operator in Java
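import java.util.ArrayList;
import java.util.List;
// Also requires imports of ReplaceMissingValues and ReplaceSpecification, and static
// imports of the ReplaceSpecification factory methods used below.
// "graph" is an existing LogicalGraph; connect a source operator to replace.getInput().

// Define the replacement specifications, one per field
List<ReplaceSpecification> specs = new ArrayList<>();
specs.add(min("sepal_length"));             // use the column minimum
specs.add(max("sepal_width"));              // use the column maximum
specs.add(mean("petal_length"));            // use the column mean
specs.add(constant("petal_width", "0.0"));  // use a fixed value
specs.add(freq("class"));                   // use the most frequent value

// Create the ReplaceMissingValues operator
ReplaceMissingValues replace = graph.add(new ReplaceMissingValues());
replace.setReplaceSpecifications(specs);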
Using the ReplaceMissingValues operator in RushScript
// Define the replacement specifications
var replaceRules = [
    ReplaceSpecification.min('sepal_length'),
    ReplaceSpecification.max('sepal_width'),
    ReplaceSpecification.mean('petal_length'),
    ReplaceSpecification.constant('petal_width', '0.0'),
    ReplaceSpecification.freq('class')
];

// Apply the replacement rules to the input data
var rmvResult = dr.replaceMissingValues( input, {replaceSpecifications: replaceRules} );
Properties
The ReplaceMissingValues operator provides one property.
Name | Type | Description
replaceSpecifications | List<ReplaceSpecification> | A list of the value replacement specifications to apply to the input data.
Ports
The ReplaceMissingValues operator provides the following input ports.
Name | Get Method | Description
input | getInput() | The data with missing values to be replaced.
statisticsInput | getStatisticsInput() | Optional model port that supplies the column statistics required by statistics-based replacement specifications. If this port is not connected and some replacements depend on statistics, the statistics are calculated automatically in a first pass through the data. Do not use this port if the supplied model contains only some of the required statistics.
The ReplaceMissingValues operator provides the following output ports.
Name | Get Method | Description
output | getOutput() | The input data with missing values replaced as specified.
model | getModel() | A PMML model containing the skip and replacement statistics.