Was this helpful?
Data Aggregation Operator
Group Operator
Aggregating data involves computing aggregations on input fields across groups of the data. Groups are defined by key fields. Distinct combinations of key fields define a group. If no key fields are specified, an aggregation is over the whole data set.
The Group operator is used within DataFlow to aggregate data. The operator uses groups of consecutive equal keys ("key groups") to determine which data values to aggregate. The input data need not be sorted; if it is already sorted, performance will be optimal.
The Group operator supports setting the aggregations to apply in two ways:
Using static methods of the Aggregation class to create the wanted aggregators
Using a SQL-like syntax provided in textual form
Usage examples of both of these methods for setting aggregations are given below. For more information about the available aggregation functions and how to define them textually, see Aggregate Functions.
Code Examples
The following Java code example creates the same aggregations but uses the SQL-like support of the Group operator. This syntax can be more convenient and easier to read. The Group operator will parse the provided text into a list of aggregations to apply.
Using the Group operator in Java
// Create a group operator and set aggregations to perform.
// Note the use of "as" to specify the output name of an aggregation.
Group groupRatings = graph.add(new Group());
groupRatings.setKeys(new String[] {"userID"});
groupRatings.setAggregations(
        "count(rating) as ratingCount, " +
        "min(rating) as minRating, " +
        "max(rating) as maxRating, " +
        "avg(rating) as avgRating, " +
        "stddev(rating) as stddevRating");
Notes from this example:
The aggregations to apply are specified using the SQL-like syntax. Note that the individual statements are separated with commas. The string was broken into multiple parts for the sake of presentation.
The Group operator will parse the given text into the list of aggregations to apply. The results are the same. It is up to the user as to which method to use for specifying the aggregations.
Using the Group operator in RushScript
var aggsToApply =
    'count(rating) as ratingCount, ' +
    'min(rating) as minRating, ' +
    'max(rating) as maxRating, ' +
    'avg(rating) as avgRating, ' +
    'stddev(rating) as stddevRating';

var groupedData = dr.group(data, {keys:'userID', aggregations:aggsToApply});
The following Java code example performs the same aggregation of the input data. The difference is: static methods on the Aggregation class are used to build the list of aggregations to apply. This is an alternative to the more SQL-like syntax shown in the example above.
Using jthe Group operator with Aggregation
// Create a group operator and set aggregations to perform.
// Note the use of "as" to specify the output name of an aggregation.
Group groupRatings = graph.add(new Group());
groupRatings.setKeys(new String[] {"userID"});
groupRatings.setAggregations( new Aggregation[] {
        Aggregation.count().as("countRating"),
        Aggregation.min("rating").as("minRating"),
        Aggregation.max("rating").as("maxRating"),
        Aggregation.avg("rating").as("avgRating"),
        Aggregation.stddev("rating").as("stddevRating"),
        Aggregation.var("rating").as("varRating"),
});
Notes from the above code example:
The key field "userID" is set. The Group operator will output a row of aggregation results for each distinct value of the "userID" field.
The aggregations to apply are created. The static methods of the Aggregation class are used to create the aggregators. The input field to apply the aggregation to is provided. The function as() is used to specify directly the name to use for the output field of each aggregation. This is optional. The Group operator will generate a name if as() is not used.
The resultant data from executing the example is shown below:
userID,countRating,minRating,maxRating,avgRating,stddevRating,varRating
1,53,3,5,4.188679245283019,0.6745118087441317,0.4549661801352802
2,129,1,5,3.7131782945736433,0.9976235207438109,0.9952526891412768
3,51,1,5,3.9019607843137254,0.9752807933683049,0.9511726259131101
4,21,1,5,4.190476190476191,1.051939144494025,1.106575963718821
5,198,1,5,3.1464646464646466,1.1298353443381843,1.2765279053157836
6,71,1,5,3.9014084507042255,0.8248760824983851,0.6804205514778825
7,31,3,5,4.32258064516129,0.7355970484510563,0.5411030176899057
8,139,2,5,3.884892086330935,0.9219867248115556,0.8500595207287392
9,106,2,5,3.7358490566037736,0.8161331605238553,0.6660733357066569
10,201,2,5,4.074626865671642,0.8918665011063313,0.7954258557956496
To apply the aggregations over all of the input data, simply do not set the keys property. This is known as a "no-key" join. The aggregations are applied to all of the input data as a single group. Using the group operator without specifying keys results in a single row of data being written to the output. The row contains the aggregation values for all aggregators applied to the whole data set. The results using the same aggregations from the example above are shown below. Note that the "countRating" field is equal to 1000: this equals the number of rows of input data as expected.
countRating,minRating,maxRating,avgRating,stddevRating,varRating
1000,1,5,3.777,1.0066136299494461,1.0132710000000007
Properties
The Group operator provides the following properties.
Name
Type
Description
aggregations
Aggregation[] or String
The aggregations to apply to the data.
initialGroupCapacity
int
A hint as to the number of groups that are expected to be processed. When input data is unsorted, we will optimistically buffer input rows in order to attempt to reduce the amount of data to be sorted. This setting is ignored if input data is already sorted.
keyFieldPrefix
String
The prefix to add to key fields. Default: empty string.
keys
String[]
The names of the key fields. If empty, then all of the rows in the input are treated as one group.
maxGroupCapacity
int
The max number of groups to fit into internal memory buffers. A value of 0 means that this will grow unbounded. This setting is ignored if input data is already sorted.
fewGroupsHint
boolean
Can be set as a hint to the operator that the number of groups is expected to be small. If so, then the reduction step is performed as a non-parallel operation. This option has been shown to reduce overhead when running in distributed mode.
Ports
The Group operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
Aggregations will be applied to the input data from this port.
The Group operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
Contains the results of the aggregations being applied. The schema will consist of the key fields and one field per specified aggregation. A row is output for each key group or one row if no keys are specified.
Last modified date: 03/10/2025