Performing SQL-Like Operations
Executing Relational Operations
DataFlow provides a set of operators to perform SQL or relational types of operations. Using DataFlow, data can be obtained from disparate sources and joined together, filtered, aggregated, or re-ordered as needed. This flexibility allows working with data using a common set of relational concepts and capabilities but without the constraints of having all data reside in one place.
The operators that implement relational functionality are composed and configured as other operators are within DataFlow. Some allow using a more SQL-like syntax for configuration. This allows for a more natural transition into using DataFlow.
Covered Relational Operations
Using the Group Operator to Compute Aggregations
Aggregating data involves computing aggregations on input fields across groups of the data. Groups are defined by key fields. Distinct combinations of key fields define a group. If no key fields are specified, an aggregation is over the whole data set.
The Group operator is used within DataFlow to aggregate data. The operator uses groups of consecutive equal keys ("key groups") to determine which data values to aggregate. The input data need not be sorted; if it is already sorted, performance will be optimal.
The Group operator supports setting the aggregations to apply in two ways:
• Using static methods of the Aggregation class to create the wanted aggregators
• Using a SQL-like syntax provided in textual form
Usage examples of both methods are given below. For more information about the available aggregation functions and how to define them textually, see Expression Language for Aggregate Functions.
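To make the grouping semantics concrete, the following is a minimal plain-Java sketch of keyed aggregation. It does not use the DataFlow API; the class GroupSketch, its Stats helper, and the {userID, rating} row layout are hypothetical names chosen for illustration. It shows that distinct key values define the groups, and that omitting the key places every row in a single group.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of keyed aggregation; not the DataFlow API.
final class GroupSketch {

    // Running aggregates for one key group (hypothetical helper).
    static final class Stats {
        long count;
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        long sum;

        void add(int rating) {
            count++;
            min = Math.min(min, rating);
            max = Math.max(max, rating);
            sum += rating;
        }

        double avg() {
            return count == 0 ? Double.NaN : (double) sum / count;
        }
    }

    // Each row is {userID, rating}. With useKey == false, every row falls
    // into one group (stored under key 0), mirroring an aggregation with
    // no key fields.
    static Map<Integer, Stats> aggregate(int[][] rows, boolean useKey) {
        Map<Integer, Stats> groups = new LinkedHashMap<>();
        for (int[] row : rows) {
            int key = useKey ? row[0] : 0;
            groups.computeIfAbsent(key, k -> new Stats()).add(row[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        int[][] rows = { {1, 3}, {1, 5}, {2, 1}, {2, 4}, {2, 4} };
        // One output line per distinct userID.
        aggregate(rows, true).forEach((user, s) ->
            System.out.println(user + "," + s.count + "," + s.min + "," + s.max + "," + s.avg()));
    }
}
```

The sketch keeps only running totals per group, which is why aggregations such as count, min, max, and avg can be computed in a single pass over the data.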
Code Examples
The following Java code example creates aggregations using the SQL-like support of the Group operator. This syntax can be more convenient and easier to read. The Group operator parses the provided text into a list of aggregations to apply.
Using the Group operator in Java
// Create a group operator and set aggregations to perform.
// Note the use of "as" to specify the output name of an aggregation.
Group groupRatings = graph.add(new Group());
groupRatings.setKeys(new String[] {"userID"});
groupRatings.setAggregations(
"count(rating) as ratingCount, " +
"min(rating) as minRating, " +
"max(rating) as maxRating, " +
"avg(rating) as avgRating, " +
"stddev(rating) as stddevRating");
Notes from this example:
• The aggregations to apply are specified using the SQL-like syntax. Note that the individual statements are separated with commas. The string was broken into multiple parts for the sake of presentation.
• The Group operator will parse the given text into the list of aggregations to apply. The results are the same as when the aggregations are built with the Aggregation class; which method to use is up to the user.
Using the Group operator in RushScript
var aggsToApply =
'count(rating) as ratingCount, ' +
'min(rating) as minRating, ' +
'max(rating) as maxRating, ' +
'avg(rating) as avgRating, ' +
'stddev(rating) as stddevRating';
var groupedData = dr.group(data, {keys:'userID', aggregations:aggsToApply});
The following Java code example performs the same aggregation of the input data. The difference is that static methods on the Aggregation class are used to build the list of aggregations to apply. This is an alternative to the more SQL-like syntax shown in the example above.
Using the Group operator with Aggregation
// Create a group operator and set aggregations to perform.
// Note the use of as() to specify the output name of an aggregation.
Group groupRatings = graph.add(new Group());
groupRatings.setKeys(new String[] {"userID"});
groupRatings.setAggregations( new Aggregation[] {
Aggregation.count().as("countRating"),
Aggregation.min("rating").as("minRating"),
Aggregation.max("rating").as("maxRating"),
Aggregation.avg("rating").as("avgRating"),
Aggregation.stddev("rating").as("stddevRating"),
Aggregation.var("rating").as("varRating"),
});
Notes from the above code example:
• The key field "userID" is set. The Group operator will output a row of aggregation results for each distinct value of the "userID" field.
• The aggregations to apply are created. The static methods of the Aggregation class are used to create the aggregators. The input field to apply the aggregation to is provided. The function as() is used to specify directly the name to use for the output field of each aggregation. This is optional. The Group operator will generate a name if as() is not used.
The resultant data from executing the example is shown below:
userID,countRating,minRating,maxRating,avgRating,stddevRating,varRating
1,53,3,5,4.188679245283019,0.6745118087441317,0.4549661801352802
2,129,1,5,3.7131782945736433,0.9976235207438109,0.9952526891412768
3,51,1,5,3.9019607843137254,0.9752807933683049,0.9511726259131101
4,21,1,5,4.190476190476191,1.051939144494025,1.106575963718821
5,198,1,5,3.1464646464646466,1.1298353443381843,1.2765279053157836
6,71,1,5,3.9014084507042255,0.8248760824983851,0.6804205514778825
7,31,3,5,4.32258064516129,0.7355970484510563,0.5411030176899057
8,139,2,5,3.884892086330935,0.9219867248115556,0.8500595207287392
9,106,2,5,3.7358490566037736,0.8161331605238553,0.6660733357066569
10,201,2,5,4.074626865671642,0.8918665011063313,0.7954258557956496
To apply the aggregations over all of the input data, simply do not set the keys property. This is known as a "no-key" aggregation. The aggregations are applied to all of the input data as a single group, so the Group operator writes a single row to the output. The row contains the aggregation values for all aggregators applied to the whole data set. The results using the same aggregations from the example above are shown below. Note that the "countRating" field is equal to 1000, which is the number of rows of input data, as expected.
countRating,minRating,maxRating,avgRating,stddevRating,varRating
1000,1,5,3.777,1.0066136299494461,1.0132710000000007
Properties
The Group operator provides the following properties.
Ports
The Group operator provides a single input port.
The Group operator provides a single output port.
Filtering and Sampling Data Sets
Filtering Data
DataFlow provides multiple ways of reducing the size of a data set. Data sets can be filtered based on criteria placed on the records themselves, similar to using a WHERE clause in a SQL SELECT statement. Alternatively, records can be selected from a set based on external criteria, such as taking the first N or a random sample.
Covered Filtering and Sampling Operations
Using the FilterRows Operator to Filter by Predicate
The FilterRows operator filters input data by a given set of predicates. Row selection is controlled by evaluation of each predicate against each input row. Rows for which the predicates evaluate to true are emitted on the output flow. A secondary flow consisting of any rows for which the predicates are false or null is also produced.
The predicates to apply within the FilterRows operator may be built in one of two ways:
• Using static methods on the Predicates class. Each predicate function provides at least one static method within the Predicates class that can be used to create the predicate.
• Using a SQL-like syntax for building a predicate expression. The expression is parsed resulting in a predicate function to apply.
Either method of building the predicate to apply in FilterRows can be used. The list of valid expressions for building a predicate function follows:
• field_name function <field_name | constant_value>
• field_name is [ not ] null
• field_name [ not ] like string_literal
• field_name [ not ] in (constant_value, constant_value, ...)
For more information about the expression language syntax and functions, see Expression Language. When used for filtering rows, the top-level expression must return a Boolean (true or false) value.
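The split into an output flow and a rejects flow can be sketched in plain Java as follows. This is an illustration of the semantics described above, not the DataFlow API; FilterSketch, Split, and between2And4 are hypothetical names. The point it shows is that only a true result passes the filter, while both false and null results are sent to the rejects flow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of predicate filtering with a rejects flow;
// not the DataFlow API.
final class FilterSketch {

    // Holds the two flows produced by the filter (hypothetical helper).
    static final class Split {
        final List<Integer> output = new ArrayList<>();
        final List<Integer> rejects = new ArrayList<>();
    }

    // The predicate returns TRUE, FALSE, or null (unknown). Only TRUE
    // passes; FALSE and null both go to the rejects flow.
    static Split filter(List<Integer> ratings, Function<Integer, Boolean> predicate) {
        Split split = new Split();
        for (Integer rating : ratings) {
            if (Boolean.TRUE.equals(predicate.apply(rating))) {
                split.output.add(rating);
            } else {
                split.rejects.add(rating);
            }
        }
        return split;
    }

    // Stand-in for "rating >= 2 and rating <= 4"; a null input yields null.
    static Boolean between2And4(Integer rating) {
        return rating == null ? null : rating >= 2 && rating <= 4;
    }

    public static void main(String[] args) {
        Split s = filter(Arrays.asList(1, 2, null, 4, 5), FilterSketch::between2And4);
        System.out.println("output=" + s.output + " rejects=" + s.rejects);
    }
}
```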
Code Example
The following code example applies the FilterRows operator to the movie ratings data. Only rows whose rating field is between 2 and 4 (inclusive) pass the filter and are pushed to the output port. All other rows are written to the rejects port.
Using the FilterRows operator in Java
// Retain rows with rating values between 2 and 4 inclusive
FilterRows filter = graph.add(new FilterRows());
filter.setPredicate("rating >= 2 and rating <= 4");
Notes from the FilterRows example:
• The SQL-like syntax is used to specify the predicate to execute. The value of the rating field must be between 2 and 4 (inclusive) to pass the filter.
• Note that the filter predicate could also have been written as "rating in (2, 3, 4)". This implies that the ratings are whole numbers. If the ratings can be floating point (that is, a rating of 3.5 is valid), then using the "in" clause would not work.
• The FilterRows operator supports two output ports named output and rejects. The output port contains the data that passed the filter. The rejects port contains data that failed the filter. The respective output ports of the operator are obtained to link to operators downstream.
• Output ports are always optional in the sense that they do not have to be consumed. In the above example, if the rejected data is not important, the rejects port can be ignored and not connected.
Using the FilterRows operator in RushScript
var filteredData = dr.filterRows(data, {predicate:'rating >= 2 and rating <= 4'});
Within RushScript, operators that support multiple outputs return a variable that can be used to access the outputs by name. For example, the FilterRows operator has two outputs, named "output" and "rejects." The returned variable contains member variables of the same name that can be accessed directly.
Properties
The FilterRows operator has one property.
Ports
The FilterRows operator provides a single input port.
The FilterRows operator provides the following output ports.
Using the FilterExistingRows Operator to Filter by Data Set Membership
The FilterExistingRows operator filters a dataset based on the intersection with another dataset using one or more key values, similar to how EXISTS and NOT EXISTS predicates behave in SQL. Rows from the left that also exist on the right side are emitted on the output flow. A secondary output flow containing rows from the left which do not exist on the right is also produced. Existence is defined as having key field values that are equal. In terms of relational algebra, this operator performs both a left semi-join and left anti-join.
Depending on the value of the useHashJoinHint property, one of two procedures is used:
1. If useHashJoinHint is false, input data will be sorted and hash partitioned by the specified keys (if not already sorted according to upstream metadata). Once sorted and partitioned, data is then combined in a streaming fashion. Note that in the case that a join condition is specified, this will require buffering on the right side, increasing memory requirements if the right has a large number of records with duplicate keys.
2. If useHashJoinHint is true, a full copy of the data from the right will be distributed to the cluster and loaded into memory within each node in the cluster. The left side will not be sorted or partitioned. When enabling this behavior the right side data should always be relatively small.
Note: This operator replaces the now deprecated SemiJoin operator, calculating the results from both its modes of operation simultaneously.
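The combined semi-join and anti-join behavior can be sketched in plain Java with an in-memory key set, loosely corresponding to the case where the right side is small enough to hold in memory. This is not the DataFlow API; ExistsSketch and the {movieID, rating} row layout are hypothetical. Note that each left row is emitted exactly once, even when its key appears several times on the right.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration of a combined left semi-join and left anti-join
// using an in-memory key set; not the DataFlow API.
final class ExistsSketch {

    // Left rows are {movieID, rating}; rightKeys are the movieIDs found on
    // the right side. Returns two lists: index 0 holds left rows whose key
    // exists on the right, index 1 holds left rows whose key does not.
    static List<List<int[]>> split(List<int[]> left, List<Integer> rightKeys) {
        Set<Integer> lookup = new HashSet<>(rightKeys);
        List<int[]> matched = new ArrayList<>();
        List<int[]> unmatched = new ArrayList<>();
        for (int[] row : left) {
            (lookup.contains(row[0]) ? matched : unmatched).add(row);
        }
        return Arrays.asList(matched, unmatched);
    }

    public static void main(String[] args) {
        List<int[]> left = Arrays.asList(
            new int[] {10, 5}, new int[] {11, 3}, new int[] {10, 4}, new int[] {12, 2});
        List<List<int[]>> result = split(left, Arrays.asList(10, 10, 12));
        System.out.println("output rows: " + result.get(0).size()
            + ", rejected rows: " + result.get(1).size());
    }
}
```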
Code Example
The following code example uses the user ratings and movie information datasets. The ratings are joined with the movie listings. The output contains the rating rows that have a corresponding listing in the movie data, while the rejects output contains the rating rows that do not.
Using the FilterExistingRows operator in Java
// Using FilterExistingRows
FilterExistingRows joiner = graph.add(new FilterExistingRows());
joiner.setJoinKeys(JoinKey.keys("movieID")); // Join on movieID field
joiner.setUseHashJoinHint(true); // Hash join if possible
Using the FilterExistingRows operator in RushScript
// Use a FilterExistingRows with hashing
var matchData = dr.filterExistingRows(leftData, rightData, {joinKeys:'movieID', useHashJoinHint:true});
Properties
The FilterExistingRows operator has the following properties.
Ports
The FilterExistingRows operator provides the following input ports.
The FilterExistingRows operator provides the following output ports.
Using the LimitRows Operator to Limit Output Rows
The LimitRows operator truncates its input flow to a fixed number of output records. The output is a limited "window" on the original input. This operator is most often used on the output of a sort, reducing the original unsorted data to the top N rows (as measured by the sort criteria). By default, all records are passed through unless a limit is explicitly specified.
Code Example
The following code example uses the LimitRows operator to skip the first 5 rows of its input and then output the next 10 rows. A LogRows operator is connected to the output of LimitRows to dump out the resultant data.
Using the LimitRows operator in Java
// Skip the first 5 rows. Output the next 10.
LimitRows limitRows = graph.add(new LimitRows());
limitRows.setSkip(5);
limitRows.setCount(10);
Notes from the example:
• The number of rows to output is specified. Note that the LimitRows operator can be run in parallel mode. The specified number of rows will be output per stream. For example, with the row count set to 10 and parallelism set to 8, a total of 80 rows will be output, 10 rows per replicate of the LimitRows operator.
• The number of initial rows to skip is set. This is an optional setting.
• LimitRows can be used with any operator that outputs record ports or consumes record ports.
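The skip and count behavior, and the per-stream arithmetic from the notes above, can be sketched in plain Java. This is not the DataFlow API; LimitSketch and its methods are hypothetical names. The parallel case below uses a skip of 0, since the notes only state how the row count behaves per stream.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of skip/count windowing; not the DataFlow API.
final class LimitSketch {

    // Skip the first `skip` rows, then keep at most `count` rows.
    // Both arguments are assumed to be small, non-negative values.
    static <T> List<T> limit(List<T> rows, int skip, int count) {
        int from = Math.min(skip, rows.size());
        int to = Math.min(from + count, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }

    // Apply the same limit independently to each partition, as a parallel
    // run would, and return the total number of rows emitted.
    static int totalAcrossPartitions(List<List<Integer>> partitions, int skip, int count) {
        int total = 0;
        for (List<Integer> partition : partitions) {
            total += limit(partition, skip, count).size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            rows.add(i);
        }
        System.out.println(limit(rows, 5, 10)); // rows 5 through 14

        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < 8; p++) {
            partitions.add(rows);
        }
        // 8 streams, 10 rows each
        System.out.println(totalAcrossPartitions(partitions, 0, 10));
    }
}
```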
Using the LimitRows operator in RushScript
var limitedData = dr.limitRows(data, {skip:5, count:10});
Properties
The LimitRows operator has the following properties.
Ports
The LimitRows operator provides a single input port.
The LimitRows operator provides a single output port.
Using the SampleRandomRows Operator to Sample Data
The SampleRandomRows operator is used to randomly select a subset of rows from an input data set.
The schema of the output data matches that of the input data. The output data usually contains fewer rows than the input. The number of rows in the output varies depending on the value of the percent or sampleSize property.
The sampling can be executed in one of two mutually exclusive modes:
• BY_PERCENT: the specified percentage of rows will be output.
• BY_SIZE: the rows output depend on the given sample size and the total number of rows in the input data.
For example, using BY_PERCENT mode with 10000 input rows and percent set to 0.25, you can expect approximately 2500 rows of output. This value is not exact. It will vary with different settings of the seed property.
In contrast, using BY_SIZE mode with any input data size and sampleSize set to 2500, you can expect approximately 2500 rows of output. This value is not exact. It will vary with different settings of the seed property. Use BY_SIZE when you want to have a specific number of rows in the output. The sampleSize property sets an upper limit on the number of rows that will be output.
The seed property is set to the current time (System.currentTimeMillis()) by default. Override this value to specify the random seed to use.
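The two modes can be approximated in plain Java with a seeded java.util.Random. This is a sketch under stated assumptions, not the operator's actual algorithm: SampleSketch is a hypothetical class, BY_PERCENT is modeled as an independent keep-or-drop decision per row, and BY_SIZE is modeled by deriving the keep probability from sampleSize divided by the total row count and stopping once sampleSize rows have been kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Plain-Java illustration of seeded random sampling; not the DataFlow API.
final class SampleSketch {

    // BY_PERCENT-style: keep each row independently with probability `percent`.
    static List<Integer> byPercent(List<Integer> rows, double percent, long seed) {
        Random random = new Random(seed);
        List<Integer> out = new ArrayList<>();
        for (Integer row : rows) {
            if (random.nextDouble() < percent) {
                out.add(row);
            }
        }
        return out;
    }

    // BY_SIZE-style (assumed approach): the keep probability is
    // sampleSize / totalRows, and sampling stops once sampleSize rows
    // have been kept, so sampleSize acts as an upper limit.
    static List<Integer> bySize(List<Integer> rows, int sampleSize, long seed) {
        Random random = new Random(seed);
        double p = rows.isEmpty() ? 0.0 : Math.min(1.0, (double) sampleSize / rows.size());
        List<Integer> out = new ArrayList<>();
        for (Integer row : rows) {
            if (out.size() >= sampleSize) {
                break;
            }
            if (random.nextDouble() < p) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            rows.add(i);
        }
        System.out.println("BY_PERCENT rows: " + byPercent(rows, 0.25, 123456789L).size());
        System.out.println("BY_SIZE rows: " + bySize(rows, 2500, 123456789L).size());
    }
}
```

Reusing the same seed reproduces the same sample, which is the practical reason to override the time-based default when repeatable results are needed.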
Code Example
The following code example uses the SampleRandomRows operator to randomly sample approximately 25% of the ratings input data.
Using the SampleRandomRows operator in Java
// Sample 25% of the data
SampleRandomRows sample = graph.add(new SampleRandomRows());
sample.setMode(SampleMode.BY_PERCENT);
sample.setPercent(0.25);
sample.setSeed(123456789L);
Using the SampleRandomRows operator in RushScript
// Sample 25% of the given data
var sampledData = dr.sample(data, {mode:'BY_PERCENT', percent:0.25, seed:123456789});
Properties
The SampleRandomRows operator has the following properties.
Ports
The SampleRandomRows operator provides a single input port.
The SampleRandomRows operator provides a single output port.
Merging Data Sets
Variations of SQL Join
DataFlow supports multiple ways of merging data sets together. The most commonly used is the standard relational join. This is similar to a SQL join statement in that it joins two data sets based on key fields. Four join modes are supported: inner, full outer, left outer, and right outer. See the section on the standard relational join for more information.
A non-key-based join, also known as a cross join, produces a full Cartesian product of the input data sets. Unlike a standard relational join, no key fields need to be specified. The resultant data will join each row in the left side of the data with every row in the right side of the data. The output row count is the product of the two input row counts, so the data grows multiplicatively and this join should be used with caution.
In certain conditions data sets need to be merged together into a single set without using a relational or cross-product join. This can be accomplished by the union of the two data sets. This is similar to the SQL union statement. See the section on data unions for more information.
Covered Joining Operations
Using the Join Operator to Do Standard Relational Joins
The Join operator performs a relational equi-join on two input data sets by a specified set of keys. The four standard join modes are supported. See the documentation on the joinMode property below for more information on join modes.
The join can also be optionally constrained by a predicate expression. When a predicate expression is used with join, rows will only match if their key field values are equal and the given predicate expression returns true. This allows the join user to specify flexible constraints on how to join the input data.
The predicate expression can involve fields from both input data sets. Be aware that the predicate expression is run against the data within the namespace of the joined data. At this point the fields from the left side and right side data set are merged together and any name conflicts have been resolved. Predicate expressions for joins are optional. If a predicate expression is not specified, the join condition is the default: match on key field values.
Depending on the value of the useHashJoinHint property, one of two procedures is used:
1. If useHashJoinHint is false, input data will be sorted and hash partitioned by the specified keys (if not already sorted and partitioned). Once sorted and partitioned, data is then combined in a streaming fashion. Note that if a key group contains multiple rows on both sides, then all combinations of left and right rows in the key group appear in the output for that key group. In this case, memory consumption is lower if the side with more key repetitions is linked to the left input.
2. If useHashJoinHint is true, a full copy of the data from the right will be distributed to the cluster and loaded into memory within each node in the cluster. The left side will not be sorted or partitioned. When enabling useHashJoinHint, the right side data should be relatively small, since the memory consumption of a hash join increases with the size of the right data set. This can lead to out-of-memory errors if there is not enough memory available on the partition to keep the entire right side's hash table in memory during the join.
Tips:
• When using a very small right data set with a hash join, performance may improve if parallelism is disabled for the reader on the right side, provided no other data manipulations are being performed.
• Depending on the memory used during hash joins on the cluster, it may be useful to adjust the garbage collector settings on the worker JVMs to avoid occasional out-of-memory errors. The -XX:+UseG1GC option may improve performance in some cases. If out-of-memory errors occur frequently with hash joins, increase the memory available on the workers or reduce the size of the data read on the right side.
Note: useHashJoinHint is currently ignored if performing a full outer or right outer join.
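The hash join procedure can be sketched in plain Java as a build phase over the right side followed by a probe phase over the left side. This is an illustration only, not the DataFlow API; HashJoinSketch and its row layout are hypothetical, and the sketch covers the left outer mode without a predicate expression.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a left outer hash join; not the DataFlow API.
final class HashJoinSketch {

    // Left rows are {movieID, rating}. The right side is given as parallel
    // arrays of movieID and title and is assumed small enough for memory.
    static List<String> leftOuterJoin(List<int[]> ratings, int[] movieIds, String[] titles) {
        // Build phase: hash the right side by key.
        Map<Integer, List<String>> table = new HashMap<>();
        for (int i = 0; i < movieIds.length; i++) {
            table.computeIfAbsent(movieIds[i], k -> new ArrayList<>()).add(titles[i]);
        }
        // Probe phase: stream the left side in its original order, unsorted.
        List<String> out = new ArrayList<>();
        for (int[] row : ratings) {
            List<String> matches = table.get(row[0]);
            if (matches == null) {
                // Left outer: keep the unmatched row with a null right field.
                out.add(row[0] + "," + row[1] + ",null");
            } else {
                for (String title : matches) {
                    out.add(row[0] + "," + row[1] + "," + title);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> ratings = Arrays.asList(
            new int[] {10, 5}, new int[] {11, 3}, new int[] {10, 4});
        leftOuterJoin(ratings, new int[] {10, 12}, new String[] {"Alien", "Brazil"})
            .forEach(System.out::println);
    }
}
```

Because the whole right side sits in the hash table for the duration of the probe, memory use scales with the right input, which matches the sizing advice above.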
Code Example
The following example performs a join between two data sets. One contains user ratings of movies and the other contains movie information.
The application reads the input data, joins the data using the "movieID" field, and then writes the results. A hash join is used by enabling the useHashJoinHint property. This implies the movie information data set will be loaded into memory to speed the join processing. Because a left outer join is being used, all of the rows of user ratings will be included in the output. User ratings without a corresponding movie record will have null values for all movie data set fields.
Using the Join operator in Java
// Join two datasets by the "movieID" field
Join joiner = graph.add(new Join());
joiner.setJoinKeys(JoinKey.keys("movieID")); // Join on movieID field
joiner.setJoinMode(JoinMode.LEFT_OUTER); // Use a left outer join
joiner.setUseHashJoinHint(true); // Enable hash join
joiner.setMergeLeftAndRightKeys(true); // Only output one set of key values
Using the Join operator in RushScript
// Two data sources have to be passed into the join operator
var joinedData = dr.join(leftData, rightData, {joinKeys:'movieID', joinMode:'LEFT_OUTER', useHashJoinHint:true, mergeLeftAndRightKeys:true});
The following Java code snippet can be used to set a predicate expression that will be used during the join processing. Rows will only be joined if their key field values match and the predicate evaluates to true.
Using a Predicate Expression in Join
// Use the SQL-like predicate syntax for specifying the join condition
joiner.setJoinCondition("rating = 5 and genre like \"Drama\"");
Properties
The Join operator has the following properties.
Ports
The Join operator provides the following input ports.
The Join operator provides a single output port.
Using the CrossJoin Operator to Create Cross Products
The CrossJoin operator produces the Cartesian product of two sets of records. The output is typed using the merge of the two input types; if the right type contains a field already named in the left type, it will be renamed to avoid collision.
To generate the output record pairs, the right side data is temporarily stored to disk for multiple passes. The left side data is read into memory in chunks, which are then used to generate a full set of pairs with the iterable right side data. This process is repeated until the left side data is exhausted.
No guarantee is made in respect to the ordering of the output data.
Caution! The CrossJoin operator produces a product of its two input data sets. Even with fairly small data sets the amount of output can be large. Use with caution.
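The chunked pairing process described above can be sketched in plain Java. This is not the DataFlow API; CrossJoinSketch is a hypothetical class, an in-memory list stands in for the right side that the operator stores on disk, and treating bufferRows as the number of left rows held per chunk is an assumption. The sketch also makes the output size visible: 3 left rows and 4 right rows yield 12 pairs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of a chunked Cartesian product; not the DataFlow API.
final class CrossJoinSketch {

    // Pair every left row with every right row, reading the left side in
    // chunks of `bufferRows` and re-scanning the right side once per chunk.
    static List<String> crossJoin(List<String> left, List<String> right, int bufferRows) {
        if (bufferRows <= 0) {
            throw new IllegalArgumentException("bufferRows must be positive");
        }
        List<String> out = new ArrayList<>();
        for (int start = 0; start < left.size(); start += bufferRows) {
            List<String> chunk = left.subList(start, Math.min(start + bufferRows, left.size()));
            for (String r : right) {          // one pass over the right side per chunk
                for (String l : chunk) {
                    out.add(l + "|" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> pairs = crossJoin(
            Arrays.asList("a", "b", "c"), Arrays.asList("w", "x", "y", "z"), 2);
        System.out.println(pairs.size() + " pairs: " + pairs);
    }
}
```

A larger chunk means fewer passes over the right side at the cost of more memory, which is the trade-off a buffer size setting of this kind controls.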
Code Example
The following code example reads user ratings and movie listings. The two data sets are joined using the CrossJoin operator. The results are written to a local file.
Using the CrossJoin operator in Java
// Use CrossJoin to create the Cartesian product
CrossJoin joiner = graph.add(new CrossJoin());
joiner.setBufferRows(100000); // Specify the buffer size in rows
Using the CrossJoin operator in RushScript
var joinedData = dr.crossJoin(leftData, rightData, {bufferRows:100000});
Properties
The CrossJoin operator has the following property.
Ports
The CrossJoin operator provides the following input ports.
The CrossJoin operator provides a single output port.
Using the UnionAll Operator to Create Union of Data Sets
The UnionAll operator provides an unordered union of two input data sources. The output will contain all of the rows of both input data sets. This is similar to the UNION ALL construct available in standard SQL. The input data is usually consumed as it becomes available and as such, the order of the output data is non-deterministic. However, if the data order specified in the metadata of both inputs matches, then the operator will perform a sorted merge preserving the sort order of the input. The metadata specifying the data order can be set as described in Using the Sort Operator to Sort Data Sets or Using the AssertSorted Operator to Assert Data Ordering.
The type of the output is determined by setting the union mode. If the outputMapping setting is set to MAPBYSCHEMA, then a schema must be provided that will define the output type. Otherwise the output type can be automatically determined by setting MAPBYPOSITION or MAPBYNAME, which will determine an appropriate output type by mapping the two inputs by position or name, respectively.
In the case where a target schema is provided, the input fields are mapped to output fields based on field name. If a field in the input is not contained in the target schema, the field is dropped. If a field is contained in the target schema, but not in the input, the output field will contain NULL values. Input values are converted into the specified output field type where possible.
For example, if the left input contains fields {a:int, b:int, c:string} and the right input contains fields {a:long, b:double, d:string} and the target schema specifies fields {a:double, c:string, d:date, e:string} then the following is true:
• The output port schema will contain the fields {a:double, c:string, d:date, e:string}.
• Field b is ignored from both inputs.
• When obtaining data from the left input port, the output fields {d, e} will contain null values.
• When obtaining data from the right input port, the output fields {c, e} will contain null values.
• Field a from the left input will be converted from an integer type to a double type.
• Field a from the right input will be converted from a long type to a double type.
• Field d from the right input will be converted from a string type to a date type. The format pattern specified in the target schema will be used for the conversion, if specified.
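The mapping rules in this example can be traced with a small plain-Java sketch. It is not the DataFlow API; UnionSchemaSketch is a hypothetical class, rows are modeled as maps from field name to value, and the ISO date format used for field d is an assumption made only so the conversion can be shown.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of mapping input rows onto a target schema by
// field name; not the DataFlow API.
final class UnionSchemaSketch {

    // Target field names in output order: {a:double, c:string, d:date, e:string}.
    static final List<String> TARGET = Arrays.asList("a", "c", "d", "e");

    // Map one input row (field name -> value) onto the target schema.
    // Fields absent from TARGET (such as b) are dropped; target fields
    // absent from the input are filled with null.
    static Map<String, Object> toTarget(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : TARGET) {
            Object value = row.get(field);
            if (value != null && field.equals("a")) {
                value = ((Number) value).doubleValue();   // int or long -> double
            } else if (value != null && field.equals("d")) {
                value = LocalDate.parse((String) value);  // string -> date (ISO format assumed)
            }
            out.put(field, value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> left = new HashMap<>();
        left.put("a", 1);
        left.put("b", 2);
        left.put("c", "x");

        Map<String, Object> right = new HashMap<>();
        right.put("a", 7L);
        right.put("b", 2.5);
        right.put("d", "2020-01-31");

        System.out.println(toTarget(left));
        System.out.println(toTarget(right));
    }
}
```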
In the case where a target schema is not provided, the two input ports must have compatible types. In this case the operator will try to determine valid output fields based on the left and right input and two settings.
The outputMapping setting can be set to MAPBYNAME if the left and right side should be matched by field name. Otherwise the operator can use MAPBYPOSITION and they will be matched by position.
Additionally the includeExtraFields setting can be set to true if fields that are only present on one side of the input should be retained. If the field is not present in one of the inputs it will contain NULL values. If extra fields are present in one of the inputs and includeExtraFields is set to false, the extra fields will be dropped.
Code Examples
The following example demonstrates creating a UnionAll operator within a logical graph. The outputs from two separate operators are connected to the left and right inputs of the union operator. The creation of the graph and the source operators is not shown. The output of the union operator can be accessed using the getOutput() method.
Using the UnionAll operator in Java
// Use UnionAll to create the union of the two datasets
UnionAll unionOp = graph.add(new UnionAll());
graph.connect(source1.getOutput(), unionOp.getLeft());
graph.connect(source2.getOutput(), unionOp.getRight());
The following example demonstrates using the UnionAll operator within RushScript. This example sets the schema property of UnionAll to coerce the two inputs into the provided target schema.
Using the UnionAll operator in RushScript
// Define a schema that is usable by the union operator
var irisschema = dr.schema()
.nullable(true)
.trimmed(true)
.nullIndicator("?")
.DOUBLE("sepal length")
.DOUBLE("sepal width")
.DOUBLE("petal length")
.DOUBLE("petal width")
.STRING("class", "Iris-setosa", "Iris-versicolor", "Iris-virginica");
// Apply union to two data sets using the defined schema.
var results = dr.unionAll(data1, data2, {schema:irisschema});
Properties
The UnionAll operator supports the following properties.
Ports
The UnionAll operator provides the following input ports.
The UnionAll operator provides a single output port. The output port will contain all rows from the two inputs. The schema of the output port will match the provided target schema. If one is not provided, the output port type will match the type of the left hand side input port.
Using the Sort Operator to Sort Data Sets
The Sort operator is used to sort input data. It will order the data according to one or more specified sort keys.
The Sort operator is parallelized so as to be able to scale to large amounts of data possibly residing on multiple nodes. In order to scale, the Sort operator itself performs a partial sort. A partial sort is a sort in which each partition of the data is sorted but there exists no ordered relationship among the rows of different partitions. If the source data is already partitioned, the existing partitions will be sorted. If the source data is not partitioned, the data will first be partitioned in a round-robin fashion and then sorted.
Any operations that require a gather of input data will preserve sort order. A gather refers to bringing all the data to a single node. Therefore the data will represent a total ordering following a gather. A total ordering is one where all elements are sorted with respect to each other. Because any data sinks (writers) that write to a single file perform a gather, those data sinks will produce a total ordering. In the example that follows, we write to a single file and thus produce a total ordering. Note that a single sink should only be used in cases where the output data is known to be relatively small, because gathering the data is inherently nonscalable.
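The relationship between a partial sort and a total ordering can be sketched in plain Java. This is not the DataFlow API; PartialSortSketch is a hypothetical class, and modeling the order-preserving gather as a k-way merge of the sorted partitions is an assumption about one way such a gather could work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration of a partial sort followed by an order-preserving
// gather; not the DataFlow API.
final class PartialSortSketch {

    // Distribute rows round-robin over `partitions` lists and sort each one.
    // Rows in different partitions have no ordering relationship.
    static List<List<Integer>> partialSort(List<Integer> rows, int partitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            parts.get(i % partitions).add(rows.get(i));
        }
        for (List<Integer> part : parts) {
            Collections.sort(part);
        }
        return parts;
    }

    // Gather: merge the sorted partitions into one totally ordered list.
    static List<Integer> gather(List<List<Integer>> parts) {
        // Queue entries are {value, partition index, position in partition}.
        PriorityQueue<int[]> heads = new PriorityQueue<>((x, y) -> Integer.compare(x[0], y[0]));
        for (int p = 0; p < parts.size(); p++) {
            if (!parts.get(p).isEmpty()) {
                heads.add(new int[] {parts.get(p).get(0), p, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            out.add(head[0]);
            List<Integer> part = parts.get(head[1]);
            int next = head[2] + 1;
            if (next < part.size()) {
                heads.add(new int[] {part.get(next), head[1], next});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = partialSort(Arrays.asList(9, 3, 7, 1, 8, 2, 6), 3);
        System.out.println("partitions: " + parts);
        System.out.println("gathered:   " + gather(parts));
    }
}
```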
The Sort operator is special because the framework often uses it implicitly, since most operators explicitly specify data distribution and data ordering. For example, the Join operator declares that its inputs must be sorted, and the framework automatically does that sorting, so you do not need to insert a sort before a join. However, you may need a sort before operators such as Run JavaScript, where execution of JavaScript code may have a data order dependency that the DataFlow execution environment is not aware of.
Here are other cases where you may want to use the Sort operator:
• When results are written to a file. Sort can be used just before the final writer operator to achieve the needed output order. If the data is already ordered according to the declared metadata, the framework automatically skips the Sort.
• To control performance settings for a sort that would otherwise be implicit. By using the Sort operator and choosing its settings (for example, the sortBufferSize property), you override the implicit sort. If you want your settings used everywhere, the EngineConfig class provides a way to globally configure implicit sorts. See Engine Configuration Settings.
Code Example
The following example demonstrates the usage of the Sort operator. The data input to the operator is sorted by the field named LAST_NAME in ascending order and the field GRADE in descending order. By default, data is sorted in ascending order.
Using the Sort operator in Java
// Sort the input data by the LAST_NAME field in ascending order
// and the GRADE field in descending order.
Sort sorter = graph.add(new Sort());
sorter.setSortKeys(new SortKey[] {SortKey.asc("LAST_NAME"), SortKey.desc("GRADE")});
Using the Sort operator in RushScript
// Sort the input data by the LAST_NAME field in ascending order
// and the GRADE field in descending order.
var sortedData = dr.sort(data, {sortKeys:['LAST_NAME asc', 'GRADE desc']});
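For readers more familiar with standard Java collections, the same two-key ordering can be expressed with java.util.Comparator. This is only an analogy for the ordering that results, not the DataFlow API; SortKeySketch and the {LAST_NAME, GRADE} row layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of a two-key ordering (first key ascending,
// second key descending); not the DataFlow API.
final class SortKeySketch {

    // Rows are {LAST_NAME, GRADE} pairs, with GRADE held as a numeric string.
    static List<String[]> sort(List<String[]> rows) {
        Comparator<String[]> byLastNameAsc = Comparator.comparing(row -> row[0]);
        Comparator<String[]> byGradeDesc =
            Comparator.comparing((String[] row) -> Integer.parseInt(row[1])).reversed();
        List<String[]> sorted = new ArrayList<>(rows);
        // The second key only breaks ties among rows with equal LAST_NAME.
        sorted.sort(byLastNameAsc.thenComparing(byGradeDesc));
        return sorted;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"Smith", "80"}, new String[] {"Adams", "70"},
            new String[] {"Smith", "95"}, new String[] {"Adams", "90"});
        for (String[] row : sort(rows)) {
            System.out.println(row[0] + "," + row[1]);
        }
    }
}
```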
By default, data is sorted in ascending order. When you supply only one key field and the order is ascending, shortcuts can be taken. The following example demonstrates using a single sort key in default order.
Using a single, default order key in Java
Sort sorter = graph.add(new Sort());
sorter.setSortKeys("LAST_NAME");
Properties
The Sort operator provides the following properties.
Note: The sort tuning properties (maxMerge, sortBufferSize, and ioBufferSize) can be controlled globally through the EngineConfig class. See Engine Configuration Settings for more information.
Ports
The Sort operator provides a single input port.
The Sort operator provides a single output port.
Using the ProcessByGroup Operator to Process Data by Distinct Key Groups
The ProcessByGroup operator segregates data into groups by the distinct values of the provided key fields. Each data group is streamed into a separate DataFlow application spawned by the operator. The spawned application is composed by the provided RushScript script. The script is executed once for each distinct key group and is passed a context object containing information about the current data group. The graph composed by the script is then run, with the data for that group streamed into it.
The application composed by the provided RushScript is given a result set containing the data of the current data group. The application can read other data sources as needed. No constraints are placed on the composed application; it can use the full breadth of DataFlow operators.
By default, the ProcessByGroup operator evaluates the given RushScript text or source file. The code contained within the source composes the DataFlow application to execute. Optionally, a function name can be provided. This function is invoked after all sources are evaluated and can perform further composition of the application. If a file name is provided for the RushScript source file, the name must resolve to a valid file or be found in one of the configured include directories. The include directories can be configured only when composing a DataFlow application using RushScript, or they can be created and passed in as script options.
The ProcessByGroup operator may itself be composed by RushScript. In this case, the environment of the composing RushScript is inherited by the RushScript engines spawned for each group. The inherited environment includes the following items:
• Include directories
• Whether script extensions are enabled or disabled
• Strictness mode
Context information is passed to the RushScript script in a variable that is named "context" by default. The name of the variable is controlled by the contextVariableName property. The context variable contains the following data members:
Warning! It may be tempting to embed the provided key values in items such as file names. However, care should be taken. The key values may contain characters that are not valid for file names or other purposes. To generate a unique file name for each key group, the context.partitionID and context.groupID variables can be used. See the following examples.
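As a rough illustration of why this warning matters, the following plain-Java sketch checks a raw key value for characters that are commonly rejected in file names and builds a file name from the two IDs instead. Nothing here is part of DataFlow: the GroupFileNameSketch class, the set of characters treated as unsafe, the "model_" prefix, and the assumption that both IDs can be rendered as integers are all hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of DataFlow: shows why the partition and
// group IDs are a safer basis for file names than raw key values.
final class GroupFileNameSketch {

    // Characters treated as unsafe in this sketch (path separators, wildcards,
    // quotes, whitespace). The real set depends on the target file system.
    private static final Pattern UNSAFE = Pattern.compile("[\\\\/:*?\"<>|\\s]");

    // True if the raw key value contains a character this sketch treats as unsafe.
    static boolean hasUnsafeChars(String keyValue) {
        return UNSAFE.matcher(keyValue).find();
    }

    // Builds a per-group file name from the two IDs only.
    // Assumption: both IDs can be rendered as integers.
    static String modelFileName(String outputDir, int partitionID, int groupID) {
        return outputDir + "/model_" + partitionID + "_" + groupID + ".pmml";
    }

    public static void main(String[] args) {
        // A key value such as this one would break a path if embedded directly.
        System.out.println(hasUnsafeChars("N/A: unknown"));   // prints true
        System.out.println(hasUnsafeChars("sunny"));          // prints false
        System.out.println(modelFileName("models", 0, 3));    // prints models/model_0_3.pmml
    }
}
```

Because the name is derived only from the IDs, it stays valid no matter what the key fields contain, at the cost of the file name no longer describing the group by its key values.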
Code Examples
Let’s say we want to train a classification model for each unique value of a field within our input data. We can use the ProcessByGroup operator to segregate the data by distinct values of this field and to compose and execute an application that builds the classification model for each data group. We can later combine the resulting decision tree models into an ensemble model to apply to data.
Here’s a snippet of data that we will want to process. The file is in ARFF format. We will segregate the data by the "outlook" field and build a model using the "windy" field as the target variable. Since we are segregating the data by the "outlook" field, we will expect three models to be built, one for each distinct value of "outlook" ("sunny", "overcast", and "rainy").
@relation weather
@attribute outlook {sunny, overcast, rainy}
@attribute temperature real
@attribute humidity real
@attribute windy {TRUE, FALSE}
@attribute play {yes, no}
@data
sunny,85,85,FALSE,no
sunny,80,90,TRUE,no
overcast,83,86,FALSE,yes
rainy,70,96,FALSE,yes
rainy,68,80,FALSE,yes
rainy,65,70,TRUE,no
overcast,64,65,TRUE,yes
sunny,72,95,FALSE,no
sunny,69,70,FALSE,yes
rainy,75,80,FALSE,yes
sunny,75,70,TRUE,yes
overcast,72,90,TRUE,yes
overcast,81,75,FALSE,yes
rainy,71,91,TRUE,no
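To show how the sample rows fall into key groups, here is a small plain-Java sketch that groups the @data rows by their first column ("outlook") and reports the size of each group. It only mimics the grouping idea in memory and does not use or reproduce the ProcessByGroup implementation; KeyGroupSketch and groupByColumn are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; KeyGroupSketch is a hypothetical name,
// not a DataFlow class.
final class KeyGroupSketch {

    // The @data rows of the sample file, copied verbatim.
    static final String[] ROWS = {
        "sunny,85,85,FALSE,no",
        "sunny,80,90,TRUE,no",
        "overcast,83,86,FALSE,yes",
        "rainy,70,96,FALSE,yes",
        "rainy,68,80,FALSE,yes",
        "rainy,65,70,TRUE,no",
        "overcast,64,65,TRUE,yes",
        "sunny,72,95,FALSE,no",
        "sunny,69,70,FALSE,yes",
        "rainy,75,80,FALSE,yes",
        "sunny,75,70,TRUE,yes",
        "overcast,72,90,TRUE,yes",
        "overcast,81,75,FALSE,yes",
        "rainy,71,91,TRUE,no"
    };

    // Groups rows by the value in the given column, keeping first-seen key order.
    static Map<String, List<String>> groupByColumn(String[] rows, int column) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String row : rows) {
            String key = row.split(",")[column];
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        // One entry per distinct "outlook" value; one model would be built per entry.
        Map<String, List<String>> groups = groupByColumn(ROWS, 0);
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " rows");
        }
        // prints:
        // sunny -> 5 rows
        // overcast -> 4 rows
        // rainy -> 5 rows
    }
}
```

The three map entries correspond to the three models expected from this data set.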
The RushScript application that we want to execute follows. Note that the input data is provided in the "context.source" variable.
RushScript script to execute
// Remove the "outlook" field. Do not want to include it in the model.
// Note the use of context.source as the input data set to the removeFields function.
var data = dr.removeFields(context.source, {fieldNames:'outlook'});
// Train a decision tree model using the "windy" field as the target variable.
var results = dr.decisionTreeLearner(data, {targetColumn:'windy'});
// Store the PMML file containing the model for this data group.
// Note the usage of a passed-in variable (outputDir) and the creation of a unique file name.
dr.writePMML(results, {targetPathName:outputDir + '/windy-model_' + context.partitionID + '_' + context.groupID + '.pmml'});
The following code demonstrates how to invoke the ProcessByGroup operator on the source data using the above script.
Using the ProcessByGroup operator in Java
// Create the operator in a graph
ProcessByGroup pbg = graph.add(new ProcessByGroup());
pbg.setKeys("outlook");
// Use the script that trains a decision tree model
pbg.setScriptFile(new File("decisiontree.js"));
// Set a variable that will be available in the RushScript environment
String targetDir = ... // where the output models should be written
pbg.addVariable("outputDir", targetDir);
Using the ProcessByGroup operator in RushScript
// Want to specify the output directory programmatically
var targetDir = ...;
// Invoke the process by group operator function. It has no return value.
// The "data" variable is assumed to result from a previous operator function call.
dr.processByGroup(data, {keys:'outlook', variables:{outputDir:targetDir}, scriptFile:'decisiontree.js'});
Properties
The ProcessByGroup operator has the following properties.
Ports
The ProcessByGroup operator provides a single input port.
The ProcessByGroup operator has no output ports.