Was this helpful?
Performing SQL-Like Operations
Executing Relational Operations
DataFlow provides a set of operators to perform SQL or relational types of operations. Using DataFlow, data can be obtained from disparate sources and joined together, filtered, aggregated, or re-ordered as needed. This flexibility allows working with data using a common set of relational concepts and capabilities but without the constraints of having all data reside in one place.
The operators that implement relational functionality are composed and configured as other operators are within DataFlow. Some allow using a more SQL-like syntax for configuration. This allows for a more natural transition into using DataFlow.
Covered Relational Operations
Using the Group Operator to Compute Aggregations
Aggregating data involves computing aggregations on input fields across groups of the data. Groups are defined by key fields. Distinct combinations of key fields define a group. If no key fields are specified, an aggregation is over the whole data set.
The Group operator is used within DataFlow to aggregate data. The operator uses groups of consecutive equal keys ("key groups") to determine which data values to aggregate. The input data need not be sorted; if it is already sorted, performance will be optimal.
The Group operator supports setting the aggregations to apply in two ways:
Using static methods of the Aggregation class to create the wanted aggregators
Using a SQL-like syntax provided in textual form
Usage examples of both of these methods for setting aggregations are given below. For more information about the available aggregation functions and how to define them textually, see Aggregate Functions.
Code Examples
The following Java code example creates the same aggregations but uses the SQL-like support of the Group operator. This syntax can be more convenient and easier to read. The Group operator will parse the provided text into a list of aggregations to apply.
Using the Group operator in Java
// Create a group operator and set aggregations to perform.
// Note the use of "as" to specify the output name of an aggregation.
Group groupRatings = graph.add(new Group());
groupRatings.setKeys(new String[] {"userID"});
groupRatings.setAggregations(
        "count(rating) as ratingCount, " +
        "min(rating) as minRating, " +
        "max(rating) as maxRating, " +
        "avg(rating) as avgRating, " +
        "stddev(rating) as stddevRating");
Notes from this example:
The aggregations to apply are specified using the SQL-like syntax. Note that the individual statements are separated with commas. The string was broken into multiple parts for the sake of presentation.
The Group operator will parse the given text into the list of aggregations to apply. The results are the same. It is up to the user as to which method to use for specifying the aggregations.
Using the Group operator in RushScript
var aggsToApply =
    'count(rating) as ratingCount, ' +
    'min(rating) as minRating, ' +
    'max(rating) as maxRating, ' +
    'avg(rating) as avgRating, ' +
    'stddev(rating) as stddevRating';

var groupedData = dr.group(data, {keys:'userID', aggregations:aggsToApply});
The following Java code example performs the same aggregation of the input data. The difference is: static methods on the Aggregation class are used to build the list of aggregations to apply. This is an alternative to the more SQL-like syntax shown in the example above.
Using jthe Group operator with Aggregation
// Create a group operator and set aggregations to perform.
// Note the use of "as" to specify the output name of an aggregation.
Group groupRatings = graph.add(new Group());
groupRatings.setKeys(new String[] {"userID"});
groupRatings.setAggregations( new Aggregation[] {
        Aggregation.count().as("countRating"),
        Aggregation.min("rating").as("minRating"),
        Aggregation.max("rating").as("maxRating"),
        Aggregation.avg("rating").as("avgRating"),
        Aggregation.stddev("rating").as("stddevRating"),
        Aggregation.var("rating").as("varRating"),
});
Notes from the above code example:
The key field "userID" is set. The Group operator will output a row of aggregation results for each distinct value of the "userID" field.
The aggregations to apply are created. The static methods of the Aggregation class are used to create the aggregators. The input field to apply the aggregation to is provided. The function as() is used to specify directly the name to use for the output field of each aggregation. This is optional. The Group operator will generate a name if as() is not used.
The resultant data from executing the example is shown below:
userID,countRating,minRating,maxRating,avgRating,stddevRating,varRating
1,53,3,5,4.188679245283019,0.6745118087441317,0.4549661801352802
2,129,1,5,3.7131782945736433,0.9976235207438109,0.9952526891412768
3,51,1,5,3.9019607843137254,0.9752807933683049,0.9511726259131101
4,21,1,5,4.190476190476191,1.051939144494025,1.106575963718821
5,198,1,5,3.1464646464646466,1.1298353443381843,1.2765279053157836
6,71,1,5,3.9014084507042255,0.8248760824983851,0.6804205514778825
7,31,3,5,4.32258064516129,0.7355970484510563,0.5411030176899057
8,139,2,5,3.884892086330935,0.9219867248115556,0.8500595207287392
9,106,2,5,3.7358490566037736,0.8161331605238553,0.6660733357066569
10,201,2,5,4.074626865671642,0.8918665011063313,0.7954258557956496
To apply the aggregations over all of the input data, simply do not set the keys property. This is known as a "no-key" join. The aggregations are applied to all of the input data as a single group. Using the group operator without specifying keys results in a single row of data being written to the output. The row contains the aggregation values for all aggregators applied to the whole data set. The results using the same aggregations from the example above are shown below. Note that the "countRating" field is equal to 1000: this equals the number of rows of input data as expected.
countRating,minRating,maxRating,avgRating,stddevRating,varRating
1000,1,5,3.777,1.0066136299494461,1.0132710000000007
Properties
The Group operator provides the following properties.
Name
Type
Description
aggregations
Aggregation[] or String
The aggregations to apply to the data.
initialGroupCapacity
int
A hint as to the number of groups that are expected to be processed. When input data is unsorted, we will optimistically buffer input rows in order to attempt to reduce the amount of data to be sorted. This setting is ignored if input data is already sorted.
keyFieldPrefix
String
The prefix to add to key fields. Default: empty string.
keys
String[]
The names of the key fields. If empty, then all of the rows in the input are treated as one group.
maxGroupCapacity
int
The max number of groups to fit into internal memory buffers. A value of 0 means that this will grow unbounded. This setting is ignored if input data is already sorted.
fewGroupsHint
boolean
Can be set as a hint to the operator that the number of groups is expected to be small. If so, then the reduction step is performed as a non-parallel operation. This option has been shown to reduce overhead when running in distributed mode.
Ports
The Group operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
Aggregations will be applied to the input data from this port.
The Group operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
Contains the results of the aggregations being applied. The schema will consist of the key fields and one field per specified aggregation. A row is output for each key group or one row if no keys are specified.
Filtering and Sampling Data Sets
Filtering Data
DataFlow provides multiple ways of reducing the size of a data set. Data sets can be filtered based on criteria placed on the records themselves, similar to using a WHERE clause in a SQL SELECT statement. Alternatively, records can by selected from a set based on external criteria, such as taking the first N or a random sample.
Covered Filtering and Sampling Operations
Using the FilterRows Operator to Filter by Predicate
The FilterRows operator filters input data by a given set of predicates. Row selection is controlled by evaluation of each predicate against each input row. Rows for which the predicates evaluate to true are emitted on the output flow. A secondary flow consisting of any rows for which the predicates are false or null is also produced.
The predicates to apply within the FilterRows operator may be built in one of two ways:
Using static methods on the Predicates class. Each predicate function provides at least one static method within the Predicates class that can be used to create the predicate.
Using a SQL-like syntax for building a predicate expression. The expression is parsed resulting in a predicate function to apply.
Either method of building the predicate to apply in FilterRows can be used. The list of valid expressions for building a predicate function follows:
field_name function <field_name | constant_value>
field_name is [ not ] null
field_name [ not ] like string_literal
field_name [ not ] in (constant_value, constant_value, ...)
For more information about the expression language syntax and functions, see Expression Language. When used for filtering rows, the top-level expression must return a Boolean (true or false) value.
Code Example
The following code example uses the FilterRows operator applied to the movie ratings data. Only movies with a certain value in the ratings field pass the filter and are pushed to the output port. All other rows are written to the reject port.
Using the FilterRows operator in Java
// Retain rows with rating values between 2 and 4 inclusive
FilterRows filter = graph.add(new FilterRows());
filter.setPredicate("rating >= 2 and rating <= 4");
Notes from the FilterRows example:
The SQL-like syntax is used to specify the predicate to execute. The value of the rating field must be between 2 and 4 (inclusive) to pass the filter.
Note that the filter predicate could also have been written as "rating in (2, 3, 4)". This implies that the ratings are whole numbers. If the ratings can be floating point (that is, a rating of 3.5 is valid), then using the "in" clause would not work.
The FilterRows operator supports two output ports named output and rejects. The output port contains the data that passed the filter. The rejects port contains data that failed the filter. The respective output ports of the operator are obtained to link to operators downstream.
Output ports are always optional in the sense that they do not have to be consumed. In the above example, if the rejected data is not important, the rejects port can be ignored and not connected.
Using the FilterRows operator in RushScript
var filteredData = dr.filterRows(data, {predicate:'rating >= 2 and rating <= 4'});
Within RushScript, operators that support multiple outputs return a variable that can be used to access the outputs by name. For example, the FilterRows operator has two outputs, named "output" and "rejects." The returned variable contains member variables of the same name that can be accessed directly.
Properties
The FilterRows operator has one property.
Name
Type
Description
predicate
The predicate to apply to the input data. The predicate can be expressed as a scalar function that returns a boolean value or as a SQL-like text expression that will be compiled into a scalar valued function.
Ports
The FilterRows operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
The predicate will be applied to data from the input port
The FilterRows operator provides the following output ports.
Name
Type
Get Method
Description
output
getOutput()
Contains input data rows that passed the predicate filter (the predicate function returned true).
rejects
getRejects()
Contains input data rows that failed the predicate filter (the predicate function returned false).
Using the FilterExistingRows Operator to Filter by Data Set Membership
The FilterExistingRows operator filters a dataset based on the intersection with another dataset using one or more key values, similar to how EXISTS and NOT EXISTS predicates behave in SQL. Rows from the left that also exist on the right side are emitted on the output flow. A secondary output flow containing rows from the left which do not exist on the right is also produced. Existence is defined as having key field values that are equal. In terms of relational algebra, this operator performs both a left semi-join and left anti-join.
Depending on the value of the useHashJoinHint property, one of two procedures is used:
1. If useHashJoinHint is false, input data will be sorted and hash partitioned by the specified keys (if not already sorted according to upstream metadata). Once sorted and partitioned, data is then combined in a streaming fashion. Note that in the case that a join condition is specified, this will require buffering on the right side, increasing memory requirements if the right has a large number of records with duplicate keys.
2. If useHashJoinHint is true, a full copy of the data from the right will be distributed to the cluster and loaded into memory within each node in the cluster. The left side will not be sorted or partitioned. When enabling this behavior the right side data should always be relatively small.
Note:  This operator replaces the now deprecated SemiJoin operator, calculating the results from both its modes of operation simultaneously.
Code Example
The following code example uses the user ratings and movie information datasets. The ratings are joined with the movie listings. The results from the output will include those rows that have corresponding user ratings for the movie, while the results from the rejected output will be those rows that do not have a corresponding listing in the movie data.
Using the FilterExistingRows operator in Java
// Using FilterExistingRows
FilterExistingRows joiner = graph.add(new FilterExistingRows());
joiner.setJoinKeys(JoinKey.keys("movieID"));    // Join on movieID field
joiner.setUseHashJoinHint(true);                // Hash join if possible
Using the FilterExistingRows operator in RushScript
// Use a FilterExistingRows with hashing
var matchData = dr.filterExistingRows(leftData, rightData, {joinKeys:'movieID', useHashJoinHint:true});
Properties
The FilterExistingRows operator has the following properties.
Name
Type
Description
joinCondition
ScalarValuedFunction(predicate)
If non-null, the condition is evaluated for every possible matching row. Only those rows for which the keys match and the condition returns true will be considered a match. Note that, if specified, the left fields and right fields must be distinct from each other. This is because the function will be evaluated in a namespace containing all fields from the left and right.
useHashJoinHint
boolean
Controls whether to perform the join as a hash join. If so, all rows on the right side will be loaded into memory and joined with the left. This is an optimization for the case where the right side is small enough to fit into memory since it implies that the left side need not be sorted or repartitioned.
joinKeys
List of keys to use for the join. Each element of the array contains the pair (leftName, rightName), where leftName is the name of the key field on the left and rightName is the name of the corresponding key field on the right.
Ports
The FilterExistingRows operator provides the following input ports.
Name
Type
Get Method
Description
left
getLeft()
The data to be filtered.
right
getRight()
The data used as the filtering criteria.
The FilterExistingRows operator provides the following output ports.
Name
Type
Get Method
Description
output
getOutput()
Data from the left matching at least one record on the right by key values.
rejects
getRejects()
Data from the left matching no records on the right by key values.
Using the LimitRows Operator to Limit Output Rows
The LimitRows operator truncates its input flow to a fixed number of output records. The output is a limited "window" on the original input. This operator is most often used on the output of a sort, reducing the original unsorted data to the top N rows (as measured by the sort criteria). By default, all records are passed through unless explicitly specified.
Code Example
The following code example uses the LimitRows operator to skip the first 5 rows of its input and then output the next 10 rows. A LogRows operator is connected to the output of LimitRows to dump out the resultant data.
Using the LimitRows operator in Java
// Skip the first 5 rows. Output the next 10.
LimitRows limitRows = graph.add(new LimitRows());
limitRows.setSkip(5);
limitRows.setCount(10);
Notes from the example:
The number of rows to output is specified. Note that the LimitRows operator can be run in parallel mode. The specified number of rows will be output per stream. For example, with the row count set to 10 and parallelism set to 8, a total of 80 rows will be output, 10 rows per replicate of the LimitRows operators.
The number of initial rows to skip is set. This is an optional setting.
LimitRows can be used with any operator that outputs record ports or consumes record ports.
Using the LimitRows operator in RushScript
var limitedData = dr.limitRows(data, {skip:5, count:10});
Properties
The LimitRows operator has the following properties.
Name
Type
Description
count
long
The maximum number of records that will be output. If not set, all input rows will be output.
skip
long
The number of input records to skip before outputting records. Default: 0
Ports
The LimitRows operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
A limited window of this input data will be pushed to the output.
The LimitRows operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
Contains a limited window of rows of data from the input. The output port has the same schema as the input.
Using the SampleRandomRows Operator to Sample Data
The SampleRandomRows operator is used to randomly select a subset of rows from an input data set.
The schema of the output data matches that of the input data. The output data usually contains fewer rows than the input. The number of rows in the output varies depending on the value of the percent or sampleSize property.
The sampling can be executed in one of two mutually exclusive modes:
BY_PERCENT: the specified percentage of rows will be output.
BY_SIZE: the rows output depend on the given sample size and the total number of rows in the input data.
For example, using BY_PERCENT mode with 10000 input rows and percent set to 0.25, you can expect approximately 2500 rows of output. This value is not exact. It will vary with different settings of the seed property.
In contrast, using BY_SIZE mode with any input data size and sampleSize set to 2500, you can expect approximately 2500 rows of output. This value is not exact. It will vary with different settings of the seed property. Use BY_SIZE when you want to have a specific number of rows in the output. The sampleSize property sets an upper limit on the number of rows that will be output.
The seed property is set to the current time (System.currentTimeMillis()) by default. Override this value to specify the random seed to use.
Code Example
The following code example uses the SampleRandomRows operator to randomly sample approximately 25% of the ratings input data.
Using the SampleRandomRows operator in Java
// Sample 25% of the data
SampleRandomRows sample = graph.add(new SampleRandomRows());
sample.setMode(SampleMode.BY_PERCENT);
sample.setPercent(0.25);
sample.setSeed(123456789L);
Using the SampleRandomRows operator in RushScript
// Sample 25% of the given data
var sampledData = dr.sample(data, {mode:'BY_PERCENT', percent:0.25, seed:123456789});
Properties
The SampleRandomRows operator has the following properties.
Name
Type
Description
mode
The mode to use when applying the random sampling. This can be set to one of two possible enums:
BY_PERCENT: Output a percentage of the input data.
BY_SIZE: Output a given sample size of the input data.
percent
double
Sets the percentage of input data wanted. This value must be in the range 0 < percent < 1.0. This value is only used if the sample mode is set to BY_PERCENT.
sampleSize
long
Sets the desired sample size in rows. The output will contain approximately this many rows. This value is only used if the sample mode is set to BY_SIZE.
seed
long
Sets the random number generator’s seed value. This value is used to randomly select rows that will be added to the output data.
Ports
The SampleRandomRows operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
The input data that will be sampled.
The SampleRandomRows operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
The randomly sampled data.
Merging Data Sets
Variations of SQL Join
DataFlow supports multiple ways of merging data sets together. The most commonly used is the standard relational join. This is similar to a SQL join statement in that it joins two data sets based on key fields. Four join modes are supported: inner, full outer, left outer, and right outer. See the section on the standard relational join for more information.
A non-key-based join, also known as a cross join, produces a full Cartesian product of the input data sets. Unlike a standard relational join, no key fields need to be specified. The resultant data will join each row in the left side of the data with every row in the right side of the data. This results in an exponential increase of the input data and so should be used with caution.
In certain conditions data sets need to be merged together into a single set without using a relational or cross-product join. This can be accomplished by the union of the two data sets. This is similar to the SQL union statement. See the section on data unions for more information.
Covered Joining Operations
Using the Join Operator to Do Standard Relational Joins
The Join operator performs a relational equi-join on two input data sets by a specified set of keys. The four standard join modes are supported. See the documentation on the joinMode property below for more information on join modes.
The join can also be optionally constrained by a predicate expression. When a predicate expression is used with join, rows will only match if their key field values are equal and the given predicate expression returns true. This allows the join user to specify flexible constraints on how to join the input data.
The predicate expression can involve fields from both input data sets. Be aware that the predicate expression is run against the data within the namespace of the joined data. At this point the fields from the left side and right side data set are merged together and any name conflicts have been resolved. Predicate expressions for joins are optional. If a predicate expression is not specified, the join condition is the default: match on key field values.
Depending on the value of the hashJoinHint property, one of two procedures is used:
1. If hashJoinHint is false, input data will be sorted and hash partitioned by the specified keys (if not already sorted and partitioned). Once sorted and partitioned, data is is then combined in a streaming fashion. Note that if a key group contains multiple rows on both sides, then all combinations of left and right rows in the key group appear in the output for that key group. In this case, memory consumption is lower if the side with more key repetitions is linked to the left input.
2. If hashJoinHint is true, a full copy of the data from the right will be distributed to the cluster and loaded into memory within each node in the cluster. The left side will not be sorted or partitioned. When enabling the hashJoinHint, the right side data should be relatively small since hashjoin’s memory consumption increases with the size of the right data set. This can lead to out-of-memory errors if there is not enough memory available on the partition to keep the entire right side's hash table in memory during the join.
Tips:
When using a very small right data set with a hash join it may improve performance if parallelism is disabled for the reader on the right side if no other data manipulations are being performed.
Depending on the memory used during the hash joins on the cluster, it may be useful to adjust the Garbage Collector settings on the worker JVMs to avoid the out-of-memory errors when it is caused occasionally. Use the option -XX:+UseG1GC to improve the performance in a few cases. If out of memory errors occur frequently with the hash joins, you must increase the memory available on the workers or reduce the size of the data read on the right side.
Note:  hashJoinHint is currently ignored if performing a full outer or right outer join.
Code Example
The following example performs a join between two data sets. One contains user ratings of movies and the other contains movie information.
The application reads the input data, joins the data using the "movieID" field, and then writes the results. A hash join is used by enabling the hashJoinHint property. This implies the movie information data set will be loaded into memory to speed the join processing. Because a left outer join is being used, all of the rows of user ratings will be included in the output. User ratings without a corresponding movie record will have null values for all movie data set fields.
Using the Join operator in Java
// Join two datasets by the "movieID" field
Join joiner = graph.add(new Join());
joiner.setJoinKeys(JoinKey.keys("movieID"));    // Join on movieID field
joiner.setJoinMode(JoinMode.LEFT_OUTER);        // Use a left outer join
joiner.setUseHashJoinHint(true);                // Enable hash join
joiner.setMergeLeftAndRightKeys(true);          // Only output one set of key values
Using the Join operator in RushScript
// Two data sources have to be passed into the join operator
var joinedData = dr.join(leftData, rightData, {joinKeys:'movieID', joinMode:'LEFT_OUTER', useHashJoinHint:true, mergeLeftAndRightKeys:true});
The following Java code snippet can be used to set a predicate expression that will be used during the join processing. Rows will only be joined if their key field values match and the predicate evaluates to true.
Using a Predicate Expression in Join
// Use the SQL-like predicate syntax for specifying the join condition
joiner.setJoinCondition("rating = 5 and genre like \"Drama\"");
Properties
The Join operator has the following properties.
Name
Type
Description
joinCondition
If non-null, the condition is evaluated for every possible matching row. Only those rows for which the keys match and the condition returns true will be output as an inner join. When determining the additional set of rows to output for an outer join, we output the set for which the join condition never evaluated to true. Note that, if specified, the left fields and right fields must be distinct from each other. This is because the function will be evaluated in a namespace containing all fields from the left and right.
useHashJoinHint
boolean
Controls whether to perform the join as a hash join. If so, all rows on the right side will be loaded into memory and joined with the left. This is an optimization for the case where the right side is small enough to fit into memory since it implies that the left side need not be sorted or repartitioned.
joinKeys
List of keys to use for the join. Each element of the array contains the pair (leftName, rightName), where leftName is the name of the field on the left and rightName is the name of the field to join on the right.
mergeLeftAndRightKeys
boolean
Controls whether to merge the left and right key fields into a single field for output. If true, output keys will be named after the left. If false, left and right keys will be output independently. Default: false.
joinMode
Specifies how the operator should handle rows with unmatched keys. These are rows from one of the inputs of the join that do not join with any row from the other input. Such rows may be dropped from the join, or retained by padding with nulls to populate fields where data is not available. Possible values are:
INNER(default): Performs an inner join: discard input rows with unmatched keys.
FULL_OUTER: Performs a full outer join: retains rows with unmatched keys.
LEFT_OUTER: Performs a left outer join: retains rows from the left input with unmatched keys, but discards such rows from the right input.
RIGHT_OUTER: Performs a right outer join: retains rows from the right input with unmatched keys, but discards such rows from the left input.
Ports
The Join operator provides the following input ports.
Name
Type
Get Method
Description
left
getLeft()
The left side of the join.
right
getRight()
The right side of the join. This should generally be connected to the smaller of the two inputs.
The Join operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
The joined data.
Using the CrossJoin Operator to Cross Products
The CrossJoin operator produces the Cartesian product of two sets of records. The output is typed using the merge of the two input types; if the right type contains a field already named in the left type, it will be renamed to avoid collision.
To generate the output record pairs, the right side data is temporarily stored to disk for multiple passes. The left side data is read into memory in chunks, which are then used to generate a full set of pairs with the iterable right side data. This process is repeated until the left side data is exhausted.
No guarantee is made in respect to the ordering of the output data.
Caution!  The CrossJoin operator produces a product of its two input data sets. Even with fairly small data sets the amount of output can be large. Use with caution.
Code Example
The following code example reads user ratings and movie listings. The two data sets are joined using the CrossJoin operator. The results are written to a local file.
Using the CrossJoin operator in Java
// Use CrossJoin to create the Cartesian product
CrossJoin joiner = graph.add(new CrossJoin());
joiner.setBufferRows(100000);                   // Specify the buffer size in rows
Using the CrossJoin operator in RushScript
var joinedData = dr.crossJoin(leftData, rightData, {bufferRows:100000});
Properties
The CrossJoin operator has the following property.
Name
Type
Description
bufferRows
int
The size (in rows) of the memory buffer used to cache input rows for join processing. Larger values can increase performance due to decreased intermediate file buffering. Default: 10,000 rows.
Ports
The CrossJoin operator provides the following input ports.
Name
Type
Get Method
Description
left
getLeftInput()
The left side of the join.
right
getRightInput()
The right side of the join. This should generally be connected to the smaller of the two inputs.
The CrossJoin operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
The joined data.
Using the UnionAll Operator to Create Union of Data Sets
The UnionAll operator provides an unordered union of two input data sources. The output will contain all of the rows of both input data sets. This is similar to the UNION ALL construct available in standard SQL. The input data is usually consumed as it becomes available and as such, order of the output data is non-deterministic. However, if the data order specified in the metadata of both inputs match, then the operator will perform a sorted merge preserving the sort order of the input. The metadata specifying the data order can be set through Using the Sort Operator to Sort Data Sets or Using the AssertSorted Operator to Assert Data Ordering.
The type of the output is determined by setting the union mode. If the outputMapping setting is set to MAPBYSCHEMA, then a schema must be provided that will define the output type. Otherwise the output type can be automatically determined by setting MAPBYPOSITION or MAPBYNAME, which will determine an appropriate output type by mapping the two inputs by position or name, respectively.
In the case where a target schema is provided, the input fields are mapped to output fields based on field name. If a field in the output is not contained in the target schema, the field is dropped. If a field is contained in the output schema, but not in the input, the output field will contain NULL values. Input values are converted into the specified output field type where possible.
For example, if the left input contains fields {a:int, b:int, c:string} and the right input contains fields {a:long, b:double, d:string} and the target schema specifies fields {a:double, c:string, d:date, e:string} then the following is true:
The output port schema will contain the fields {a:double, c:string, d:date, e:string}.
Field b is ignored from both inputs.
When obtaining data from the left input port, the output fields {d, e} will contain null values.
When obtaining data from the right input port, the output fields {c, e} will contain null values.
Field a from the left input will be converted from an integer type to a double type.
Field a from the right input will be converted from a long type to a double type.
Field d from the right input will be converted from a string type to a date type. The format pattern specified in the target schema will be used for the conversion, if specified.
In the case where a target schema is not provided, the two input ports must have compatible types. In this case the operator will try to determine valid output fields based on the left and right input and two settings.
The outputMapping setting can be set to MAPBYNAME if the left and right side should be matched by field name. Otherwise the operator can use MAPBYPOSITION and they will be matched by position.
Additionally the includeExtraFields setting can be set to true if fields that are only present on one side of the input should be retained. If the field is not present in one of the inputs it will contain NULL values. If extra fields are present in one of the inputs and includeExtraFields is set to false, the extra fields will be dropped.
Code Examples
The following example demonstrates creating a UnionAll operator within a logical graph. The outputs from two separate operators are connected to the left and right inputs of the union operator. The creation and the graph and the source operators are not shown. The output of the union operator can be accessed using the getOutput() method.
Using the UnionAll operator in Java
// Use UnionAll to create the union of the two datasets
UnionAll unionOp = graph.add(new UnionAll());
graph.connect(source1.getOutput(), unionOp.getLeft());
graph.connect(source2.getOutput(), unionOp.getRight());
The following example demonstrates using the UnionAll operator within RushScript. This example is setting the schema property of UnionAll to coerce the two inputs into the provided target schema.
Using the UnionAll operator in RushScript
// Define a schema that is usable by the union operator
var irisschema = dr.schema()
    .nullable(true)
    .trimmed(true)
    .nullIndicator("?")
    .DOUBLE("sepal length")
    .DOUBLE("sepal width")
    .DOUBLE("petal length")
    .DOUBLE("petal width")
    .STRING("class", "Iris-setosa", "Iris-versicolor", "Iris-virginica");
 
// Apply union to two data sets using the defined schema.
var results = dr.unionAll(data1, data2, {schema:irisschema});
Properties
The UnionAll operator supports the following properties.
Name
Type
Description
includeExtraFields
boolean
Whether or not fields that are only present on one side of the input will be retained when automatically determining the output through MAPBYPOSITION or MAPBYNAME mode. Default: false.
mapByName
UnionMode
Determines the output type of the operator. Can be set to MAPBYPOSITION, MAPBYNAME, or MAPBYSCHEMA. If the first two options are used, the output type will be automatically determined by mapping the fields in the two inputs positionally or by name, respectively. Otherwise if MAPBYSCHEMA is used the schema must be set. Default: MAPBYPOSITION.
schema
RecordTextSchema
The target schema that defines the type of the output port of the union operator. When provided, the input data will be converted into the form specified by this schema. This is an optional property that uses MAPBYSCHEMA mode.
Ports
The UnionAll operator provides the following input ports.
Name
Type
Get Method
Description
left
getLeft()
The left side of the union.
right
getRight()
The right side of the union.
The UnionAll operator provides a single output port. The output port will contain all rows from the two inputs. The schema of the output port will match the provided target schema. If one is not provided, the output port type will match the type of the left hand side input port.
Name
Type
Get Method
Description
output
getOutput()
The joined data.
Using the Sort Operator to Sort Data Sets
The Sort operator is used to sort input data. It will order the data according to one or more specified sort keys.
The Sort operator is parallelized so as to be able to scale to large amounts of data possibly residing on multiple nodes. In order to scale, the Sort operator itself performs a partial sort. A partial sort is a sort in which each partition of the data is sorted but there exists no ordered relationship among the rows of different partitions. If the source data is already partitioned, the existing partitions will be sorted. If the source data is not partitioned, the data will first be partitioned in a round-robin fashion and then sorted.
Any operations that require a gather of input data will preserve sort order. A gather refers to bringing all the data to a single node. Therefore the data will represent a total ordering following a gather. A total ordering is one where all elements are sorted with respect to each other. Because any data sinks (writers) that write to a single file perform a gather, those data sinks will produce a total ordering. In the example that follows, we write to a single file and thus produce a total ordering. Note that the use of a single sink should only be used in cases where the output data is known to be relatively small. This is because gathering the data is inherently nonscalable.
The Sort operator is special because the framework often uses it implicitly, since most operators explicitly specify data distribution and data ordering. For example, the Join operator declares that its inputs must be sorted, and the framework automatically does that sorting, so you do not need to insert a sort before a join. However, you may need a sort before operators such as Run JavaScript, where execution of JavaScript code may have a data order dependency that the DataFlow execution environment is not aware of.
Here are other cases where you may want to use the Sort operator:
When results are written to a file. Sort can be used just before the final writer operator to achieve the needed output order. If the data is already ordered according to the declared metadata, the framework automatically skips the Sort.
To control performance settings for a sort that would otherwise be implicit. By using the Sort operator and choosing its settings (for example, sortBufferSize property), you override the implicit sort. If you want your settings used everywhere, the EngineConfig class provides a way to globally configure implicit sorts. See Engine Configuration Settings.
Code Example
The following example demonstrates the usage of the Sort operator. The data input to the operator is sorted by the field named LAST_NAME in ascending order and the field GRADE in descending order. By default, data is sorted in ascending order.
Using the Sort operator in Java
// Sort the input data by the LAST_NAME field in ascending order
// and the GRADE field in descending order.
Sort sorter = graph.add(new Sort());
sorter.setSortKeys(new SortKey[] {SortKey.asc("LAST_NAME"), SortKey.desc("GRADE")});
Using the Sort operator in RushScript
// Sort the input data by the LAST_NAME field in ascending order
// and the GRADE field in descending order.
var sortedData = dr.sort(data, {sortKeys:['LAST_NAME asc', 'GRADE desc']});
By default, data is sorted in ascending order. When you supply only one key field and the order is ascending, shortcuts can be taken. The following example demonstrates using a single sort key in default order.
Using a single, default order key in Java
Sort sorter = graph.add(new Sort());
sorter.setSortKeys("LAST_NAME");
Properties
The Sort operator provides the following properties.
Name
Type
Description
sortKeys
String...
The list of fields to sort by. In the case of multiple fields, data is ordered first by the first key, then by the second, and so on.
sortKeys
The list of sort keys to sort by. A SortKey contains the pair (fieldName, ordering) where fieldName is the name of a field in the input and ordering indicates whether ascending or descending order is required. In the case of multiple fields, data is ordered first by the first key, then by the second, and so on.
maxMerge
int
Controls the maximum number of run files to merge at one time. Small values will reduce memory usage during the merge phase but can force multiple merge passes over the runs. Large values decrease the number of passes, but at a cost of memory; each run being merged will require a buffer of ioBufferSize bytes. If not set or set to zero, this first defaults to EngineConfig.sort.maxMerge. If that is also unspecified or set to zero, maxMerge is calculated by the following formula:
maxMerge = sortBufferSize / ioBufferSize
sortBuffer
String
Specifies an approximate cap on the amount of memory used by the sort. If the data is unable to fit into memory, it will be written to intermediate storage as required.
Values are supplied as number strings, supporting an optional size suffix. The common suffixes K, M, and G are supported, having the expected meaning; suffixes are case-insensitive, so you may use lowercase. Omitting the suffix indicates the value is in bytes. Values are limited to the range from 1M to 2G. Values outside of this range will be adjusted to the nearest limit.
If not specified or the value is zero, this first defaults to EngineConfig.sort.getSortBufferSize(). If that is also unspecified or the value is zero, this defaults to 100M.
sortBufferSize
long
Similar to sortBuffer, the only difference is that the value is specified as a long byte count rather than a string.
ioBuffer
String
Specifies the size of the memory buffers used for I/O operations on run files during sorting. Note that one buffer is required for each run being merged in the merge phase; this value impacts the total memory used during this phase.
Values are supplied as number strings, supporting an optional size suffix. The common suffixes K, M, and G are supported, having the expected meaning; suffixes are case-insensitive. Omitting the suffix indicates the value is in bytes.
If not specified or the value is zero, this first defaults to EngineConfig.sort.getIOBufferSize(). If that is also unspecified or its value is zero, this defaults to 64K.
ioBufferSize
long
Similar to ioBuffer, the only difference is that the value is specified as a long byte count rather than a string.
Note:  The sort tuning properties (maxMerge, sortBufferSize, and ioBufferSize) can be controlled globally through the EngineConfig class. See Engine Configuration Settings for more information.
Ports
The Sort operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
The data to be sorted.
The Sort operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
The sorted data.
Using the ProcessByGroup Operator to Process Data by Distinct Key Groups
The ProcessByGroup operator segregates data into groups by the distinct values of the provided key fields. Each data group is streamed into another Dataflow application spawned by the operator. The spawned application of the operator is composed using the provided RushScript script. The RushScript is executed for each distinct key group and is passed a context object containing information about the current data group. The graph composed by RushScript is run and streamed the data for the data group.
The application composed by the provided RushScript is provided a result set containing the data of the current data group. The application can read other data sources as needed. There are no constraints placed on the composed application. The composed application can use the full breadth of Dataflow operators.
By default the ProcessByGroup operator will evaluate the given RushScript text or source file. The code contained within the source will compose the Dataflow application to execute. Optionally, a function name can be provided. This function will be invoked after all sources are evaluated. The function can provide further composition of the application. If a file name is provided to the RushScript source file, the name must resolve to a valid file name or be found in the include directories configured. The include directories can only be configured when composing a DataFlow application using RushScript or can be created and passed in as script options.
The ProcessByGroup operator may itself be composed by RushScript. In this case, the environment of the composing RushScript will be inherited by the spawned RushScript engines for each group. The inherited environment includes the following items:
Include directories
Whether script extensions are enabled or disabled
Strictness mode
Context information is passed to the RushScript script in the form of a variable named "context". The name of the variable is controlled by the contextVariableName property. The context variable contains the following data members:
Variable name
JavaScript Type
Description
source
ResultSet
Contains the ResultSet that can be used to access the data of the current data group. This variable can be passed as a result set to any operators composed within RushScript. See the code examples for how the result set can be used.
keys
Array (String)
Contains the current set of distinct key values. These values will differ for each key group. These are provided as a convenience to the RushScript as the key values may be used to affect downstream composition.
partitionID
Number
The ProcessByGroup operator supports parallel execution. The partitionID variable uniquely identifies which partition the current data group is being processed within. It can be used in conjunction with the groupID variable to uniquely identify the current data grouping. For example, the partitionID and groupID can be used within a file name to create a unique output file (or file set) for each data group.
partitionCount
Number
Provides the total number of parallel data streams being created for the job containing this ProcessByGroup operator.
groupID
Number
Uniquely defines a data group within the parallel data stream processing the data group. When used in conjunction with the partitionID, it uniquely defines the data group for the application.
WARNING!  It may be tempting to embed the provided key values in items such as file names. However, care should be taken. The key values may contain characters that are not valid for file names or other purposes. To generate a unique file name for each key group, the context.partitionID and context.groupID variables can be used. See the following examples.
Code Examples
Let’s say we want to train a classification model for each unique value of a field within our input data. We can use the ProcessByGroup operator to segregate the data by distinct values of this field and compose and execute an application to build the classification model for each data group. We can then later use the multiple decision tree models in an ensemble model to apply to data.
Here’s a snippet of data that we will want to process. The file is in ARFF format. We will segregate the data by the "outlook" field and build a model using the "windy" field as the target variable. Since we are segregating the data by the "outlook" field, we will expect three models to be built, one for each distinct value of "outlook" ("sunny", "overcast", and "rainy").
@relation weather
@attribute outlook {sunny, overcast, rainy}
@attribute temperature real
@attribute humidity real
@attribute windy {TRUE, FALSE}
@attribute play {yes, no}
@data
sunny,85,85,FALSE,no
sunny,80,90,TRUE,no
overcast,83,86,FALSE,yes
rainy,70,96,FALSE,yes
rainy,68,80,FALSE,yes
rainy,65,70,TRUE,no
overcast,64,65,TRUE,yes
sunny,72,95,FALSE,no
sunny,69,70,FALSE,yes
rainy,75,80,FALSE,yes
sunny,75,70,TRUE,yes
overcast,72,90,TRUE,yes
overcast,81,75,FALSE,yes
rainy,71,91,TRUE,no
The RushScript application that we want to execute follows. Note that the input data is provided in the "context.source" variable.
RushScript script to execute
// Remove the "outlook field". Do not want to include it in the model.
// Note the use of context.source as the input data set to the removeFields function.
var data = dr.removeFields(context.source, {fieldNames:'outlook'});

// Train a decision tree model using the "windy" field as the target variable.
var results = dr.decisionTreeLearner(data, {targetColumn:'windy'});

// Store the PMML file containing the model for this data group.
// Note the usage of a passed in variable (outputDir) and the creation of a unique file name
dr.writePMML(results, {targetPathName:outputDir + '/windy-model_' + context.partitionID + '_' + context.groupID + '.pmml'});
The following code demonstrates how to invoke the ProcessByGroup operator on the source data using the above script.
Using the ProcessByGroup operator in Java
// Create the operator in a graph
ProcessByGroup pbg = graph.add(new ProcessByGroup());
pbg.setKeys("outlook");

// Use the script that trains a decision tree model
pbg.setScriptFile(new File("decisiontree.js"));

// Set a variable that will be available in the RushScript environment
String targetDir = ... // where the output models should be written
pbg.addVariable("outputDir", targetDir);
Using the ProcessByGroup operator in RushScript
// Want to specify the output directory programatically
var targetDir = ...;

// Invoke the process by group operator function. It has no return value.
// The "data" variable is assumed to result from a previous operator function call.
dr.processByGroup(data, {keys:'outlook', variables:{outputDir:targetDir}, scriptFile:'decisiontree.js'});
Properties
The ProcessByGroup operator has the following properties.
Name
Type
Description
contextVariableName
String
(Optional) The name of the variable injected into the RushScript environment that contains context of the current sub-graph composition and execution. The default value is "context".
functionName
String
The name of a JavaScript function to invoke after the JavaScript source has been evaluated. This function can provide (additional) composition of the Dataflow application. This is useful when a single source file has many functions that provide composition of different applications. In this way, the same script file can be used to compose and run different applications. This is an optional property.
keys
String[] or List<String>
The list of field names used to define key groups. At least one field name must be provided.
script
String
The text of the RushScript code executed for each distinct key group. This RushScript code is used to compose the application that processes each data group. This property is required. A convenience method named setScriptFile() is provided that accepts a java.io.File object. When used, the script text will be read from the provided file.
scriptFileName
String
The pathname to the source file containing the RushScript code to execute for each distinct key group. If the file name is not an absolute path and not found in the current directory, the file will be searched for within the configured include directories. The include directories are only valid if the application containing this operator was composed using RushScript. If the file is not found in the include directories, an exception is issued. The file must exist on the machine where the application is composed.
scriptOptions
The scripting command line options applicable to operators that may spawn a script itself, such as the ProcessByGroup operator. This property is used internally by the scripting framework and usually does not have to be used by end users. It can be used to artificially set up the scripting options wanted to be passed on to scripting environments set up by the ProcessByGroup operator.
variables
Map<String, Object>
(Optional) A map of variable names to their values. The variable names must be valid JavaScript variable names. These variables will be injected into the RushScript environment during the composition phase of each sub-graph executed for data groups. A convenience method named addVariable() can be used to set a single variable.
Ports
The ProcessByGroup operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
Input data
The ProcessByGroup operator has no output ports.
Last modified date: 01/06/2023