Creating a LogicalGraph
Within RushScript, LogicalGraph instances are created implicitly. When you enter the RushScript environment, a new LogicalGraph is created automatically and is available for composition. When the current graph is executed, it is discarded, and a new, empty LogicalGraph is created to become the current graph. Operators are always added to the current graph when they are created.
The dr variable provides several methods for setting engine properties. These properties affect the execution of the current graph: when the graph is executed, the engine settings are used to compile it into an executable form.
After the current graph is executed, a new current graph is created, and all state settings of the previous graph, including engine settings, are lost. Any engine settings required by the next graph must therefore be set again.
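The implicit graph lifecycle described above can be modeled in a few lines of plain JavaScript. This is an illustrative sketch, not the RushScript or DataFlow API: the GraphSession and ModelGraph classes, the setEngineProperty and addOperator methods, and the "parallelism" setting name are all hypothetical. It shows that operators and engine settings belong to the current graph and are discarded together with that graph when it is executed.

```javascript
// Hypothetical model of RushScript's implicit "current graph" lifecycle.
// None of these names are part of the DataFlow API.
class ModelGraph {
  constructor() {
    this.operators = [];      // operators added to this graph
    this.engineSettings = {}; // engine settings used to compile this graph
  }
}

class GraphSession {
  constructor() {
    // A graph exists as soon as the environment starts.
    this.current = new ModelGraph();
  }

  // Engine settings apply only to the current graph.
  setEngineProperty(name, value) {
    this.current.engineSettings[name] = value;
  }

  // Creating an operator always adds it to the current graph.
  addOperator(name) {
    this.current.operators.push(name);
  }

  // Executing uses the current graph and its engine settings, then discards
  // the graph and starts a new, empty current graph.
  execute() {
    const executed = {
      operators: [...this.current.operators],
      engineSettings: { ...this.current.engineSettings },
    };
    this.current = new ModelGraph();
    return executed;
  }
}

const session = new GraphSession();
session.setEngineProperty("parallelism", 4); // hypothetical setting name
session.addOperator("readRatings");
session.addOperator("filter");
const run = session.execute();

console.log(run.operators);                  // [ 'readRatings', 'filter' ]
console.log(session.current.operators);      // []
console.log(session.current.engineSettings); // {}
```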
Creating Operators
During initialization, discovered operators are added to the dr variable as JavaScript functions. The function name for each operator is taken from the JSONTypeName annotation on the operator's Java class. If this annotation is not present, the operator's class name is used, converted to camel case. For example, the Group operator is exposed in RushScript as the dr.group() function. A list of all supported operators and their function names is available at http://doc-server2/display/DF654/Special+Variables#SpecialVariables-operatornames.
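The naming rule can be sketched as a small function. The rushScriptFunctionName function below is hypothetical, and it assumes that "camel casing" means lowercasing the first character of the simple class name, which matches the Group to dr.group() example; the exact conversion for other class names is not spelled out here.

```javascript
// Hypothetical sketch of how an operator's RushScript function name is chosen.
// Assumption: "camel casing" lowercases the first character of the simple class name.
function rushScriptFunctionName(className, jsonTypeName) {
  // Prefer the JSONTypeName annotation value when one is present.
  if (typeof jsonTypeName === "string" && jsonTypeName.length > 0) {
    return jsonTypeName;
  }
  // Otherwise fall back to the class name in lower camel case.
  return className.charAt(0).toLowerCase() + className.slice(1);
}

console.log(rushScriptFunctionName("Group"));             // group
console.log(rushScriptFunctionName("ReadDelimitedText")); // readDelimitedText
console.log(rushScriptFunctionName("FilterRows"));        // filterRows
```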
To create a new instance of an operator and add it to the current LogicalGraph, invoke the operator's function on the dr variable. In general, operator functions take three kinds of parameters, in this order:
1. An optional name for the operator. The name helps distinguish operator instances when viewing profiling and debugging information. When used, it is always the first parameter; if it is omitted, a name is generated.
2. Result sets representing the output ports of upstream operators. Source operators such as file readers take no result set parameters. Operators with multiple input ports require one result set parameter per input port; see the documentation for each operator to determine the number and order of its input ports.
3. Properties, provided in JavaScript map notation. A map consists of name-value pairs, where each name is a property name and each value is the value to set for that property. See the documentation for each operator for the list of properties it supports.
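The positional convention (optional name, then result sets, then properties) can be illustrated with a hypothetical argument splitter. This is not how RushScript is implemented; the sketch assumes that a name is a string, that the properties map is a plain object passed last, and that result sets are objects marked with a hypothetical isResultSet flag so they can be told apart from property maps.

```javascript
// Hypothetical sketch of the positional convention for operator functions:
//   [name], resultSet..., [properties]
// Result sets are modeled as objects carrying an isResultSet flag (an assumption).
function splitOperatorArgs(args) {
  const rest = [...args];
  let name = null;
  let properties = {};

  // An optional name, when given, is always the first argument.
  if (typeof rest[0] === "string") {
    name = rest.shift();
  }

  // The properties map, when given, comes last.
  const last = rest[rest.length - 1];
  if (rest.length > 0 && last !== null && typeof last === "object" && !last.isResultSet) {
    properties = rest.pop();
  }

  // Whatever remains are result sets, in input-port order.
  return { name, inputs: rest, properties };
}

// Usage with stand-in result sets.
const movies = { isResultSet: true, label: "movies" };
const ratings = { isResultSet: true, label: "ratings" };

const reader = splitOperatorArgs(["readRatings", { source: "ratings.txt" }]);
const join = splitOperatorArgs([movies, ratings, { joinMode: "INNER", joinKeys: "movieID" }]);

console.log(reader.name, reader.inputs.length);          // readRatings 0
console.log(join.name, join.inputs.map((r) => r.label)); // null [ 'movies', 'ratings' ]
```

In this model, a named reader call yields a name and properties but no inputs, while the unnamed join call yields two inputs in input-port order.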
Besides creating the operator and setting its properties, the function also adds the operator to the current graph. You do not need to keep a reference to the current graph to add operators; this is done implicitly.
Operator functions return a result set that represents the output port(s) of the operator. If an operator has multiple output ports, each port can be referenced by name: the reference documentation for each operator lists its output ports and their names, and these names are used on the result set to access a particular port (for example, results.output).
The following example illustrates several of these usages.
Each operator is given a name (for example, 'readRatings' and 'filter'). These names are displayed in exceptions and in profiling and debugging information.
The dr.readDelimitedText() function creates a reader. It does not accept a result set because the ReadDelimitedText operator has no input ports.
After the operator name, the dr.filterRows() function takes a single result set as input, followed by a map of the operator's properties. The returned result set is assigned to the variable results.
The FilterRows operator (see Using the FilterRows Operator to Filter by Predicate) provides two output ports, so the result set returned by dr.filterRows() contains references to both ports. The documentation for FilterRows shows that one output port is named "output" and the other "rejects".
The port named "output" is referenced as "results.output" when connecting with the first delimited text writer.
The port named "rejects" is referenced as "results.rejects" when connecting with the second delimited text writer.
Creating operators
// Define ratings schema
var ratingschema = dr.schema()
    .nullable(true)
    .trimmed(true)
    .INT("userID")
    .INT("movieID")
    .DOUBLE("rating")
    .INT("timestamp");

// Read the ratings
var ratings = dr.readDelimitedText('readRatings', {source:'ratings.txt', schema:ratingschema, fieldSeparator:"::", header:true});

// Filter by rating
var results = dr.filterRows('filter', ratings, {predicate:'rating = 1 or rating = 3 or rating = 5'});

// Write filtered results
dr.writeDelimitedText('writeResults', results.output, {target:'ratings-filter-output.txt', mode:WriteMode.OVERWRITE, header:true, fieldDelimiter:""});

// Write rejects
dr.writeDelimitedText('writeRejects', results.rejects, {target:'ratings-filter-rejects.txt', mode:WriteMode.OVERWRITE, header:true, fieldDelimiter:""});

dr.execute("filter-ratings");
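The behavior of the two FilterRows output ports can be modeled on in-memory data. In the sketch below, filterRowsModel is a hypothetical function, the rows are made-up sample values using a subset of the ratings schema fields, and the predicate is a JavaScript callback standing in for the expression 'rating = 1 or rating = 3 or rating = 5'. In this model, rows for which the predicate is true go to output and all other rows go to rejects.

```javascript
// Plain-JavaScript model of FilterRows' two output ports (not the DataFlow API).
// Rows matching the predicate go to "output"; all other rows go to "rejects".
function filterRowsModel(rows, predicate) {
  const result = { output: [], rejects: [] };
  for (const row of rows) {
    (predicate(row) ? result.output : result.rejects).push(row);
  }
  return result;
}

// Made-up sample rows using a subset of the ratings schema fields.
const sampleRatings = [
  { movieID: 10, rating: 5 },
  { movieID: 11, rating: 4 },
  { movieID: 10, rating: 3 },
  { movieID: 12, rating: 2 },
];

// Same condition as the predicate 'rating = 1 or rating = 3 or rating = 5'.
const results = filterRowsModel(
  sampleRatings,
  (r) => r.rating === 1 || r.rating === 3 || r.rating === 5
);

console.log(results.output.map((r) => r.movieID));  // [ 10, 10 ]
console.log(results.rejects.map((r) => r.movieID)); // [ 11, 12 ]
```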
The following example demonstrates an operator that requires two input ports. The Join operator (see Using the Join Operator to Do Standard Relational Joins) takes a left input and a right input, so the dr.join() function requires two result sets. The result set parameters follow the order of the input ports as listed in the operator's documentation.
This example names some operators but not others, which shows that operator names are optional. When no name is provided for an operator, one is generated.
Two sources are read into the result sets named "movies" and "ratings", and the two sources are then joined. The documentation for the Join operator lists the left input port first and the right input port second, which determines the parameter order for dr.join(): the left input comes first, then the right input. In this case, "movies" is the left input and "ratings" is the right input.
Using an operator with two inputs
// Join two data sources and write the results
var movieschema = dr.schema()
    .nullable(true)
    .trimmed(true)
    .INT("movieID")
    .STRING("movieName")
    .STRING("genre");

var ratingschema = dr.schema()
    .nullable(true)
    .trimmed(true)
    .INT("userID")
    .INT("movieID")
    .DOUBLE("rating")
    .INT("timestamp");

// Read the movies
var movies = dr.readDelimitedText('readMovies', {source:'movies.txt', schema:movieschema, fieldSeparator:"::"});

// Read the ratings
var ratings = dr.readDelimitedText('readRatings', {source:'ratings.txt', schema:ratingschema, fieldSeparator:"::"});

// Join the two data sets
var results = dr.join(movies, ratings, {joinMode:'INNER', joinKeys:'movieID'});

// Write the results to a delimited text file.
dr.writeDelimitedText(results, {target:"movieratings-join.txt", mode:WriteMode.OVERWRITE, header:true, fieldDelimiter:""});

// Execute the graph
dr.execute("jointest");
Setting Properties
Operator properties are set when the operator is created, using JavaScript map (object literal) notation. A property map has the following structure:
Starts with an opening curly brace '{'
For each property:
The property name
Followed by a colon ':'
Followed by the property value, whose form depends on the type of the property
Each property except the last is followed by a comma ','
Ends with a closing curly brace '}'
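In plain JavaScript, the map notation above is an object literal. The sketch below builds the writer properties from the first example and enumerates their name-value pairs. Because WriteMode.OVERWRITE is a RushScript constant that does not exist in plain JavaScript, the string 'OVERWRITE' is used as a placeholder.

```javascript
// A property map written in JavaScript object-literal (map) notation.
// 'OVERWRITE' stands in for RushScript's WriteMode.OVERWRITE constant,
// which is not available in plain JavaScript.
const writerProperties = {
  target: "ratings-filter-output.txt", // string value
  mode: "OVERWRITE",                   // placeholder for WriteMode.OVERWRITE
  header: true,                        // boolean value
  fieldDelimiter: "",                  // empty string value
};

// Each entry is a name-value pair: the property name and the value to set.
for (const [name, value] of Object.entries(writerProperties)) {
  console.log(`${name} -> ${JSON.stringify(value)}`);
}
```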
The documentation for each operator lists the properties it supports, along with the name and type of each. Use this documentation to determine which properties are available for an operator.
Every operator supports two special properties, which are set in the same manner as other operator properties:
Property Name | Type | Description
disableParallelism | boolean | Setting this to true disables parallelism for the operator instance being created by setting the operator's parallelism to one, overriding the operator's default behavior. Setting it to false has no effect.
maxParallelism | int | Sets the parallelism level of the operator being created. By default, each operator inherits its parallelism level from the current job configuration; this setting overrides that level for this operator only. It is ignored by operators that do not support parallelism.
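Because the special properties are set like any other property, they can be added to an operator's property map, for example {keys:'class', aggregations:aggs, maxParallelism:2} for the Group operator used in the next example. The hypothetical function below models the resulting parallelism. It is a sketch under assumptions the table does not state: disableParallelism is assumed to take precedence when both properties are set, and an operator that does not support parallelism is assumed to run with a parallelism of one.

```javascript
// Hypothetical model of how the two special properties determine an operator's
// parallelism. Assumptions (not stated in the documentation):
// - disableParallelism takes precedence over maxParallelism when both are set;
// - an operator that does not support parallelism always runs with parallelism 1.
function effectiveParallelism(jobParallelism, properties = {}, supportsParallelism = true) {
  if (!supportsParallelism) {
    return 1; // maxParallelism is ignored by such operators
  }
  if (properties.disableParallelism === true) {
    return 1; // disabling parallelism sets the operator's parallelism to one
  }
  if (Number.isInteger(properties.maxParallelism) && properties.maxParallelism > 0) {
    return properties.maxParallelism; // per-operator override
  }
  return jobParallelism; // default: inherited from the job configuration
}

console.log(effectiveParallelism(8));                                // 8
console.log(effectiveParallelism(8, { disableParallelism: true }));  // 1
console.log(effectiveParallelism(8, { disableParallelism: false })); // 8
console.log(effectiveParallelism(8, { maxParallelism: 2 }));         // 2
```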
For example, the documentation for the Group operator shows a property named aggregations and another named keys. An example usage of the operator is shown below; it demonstrates the general method for setting properties on operators. Because the aggregations property accepts multiple aggregation definitions, an array of aggregations is passed in: JavaScript array syntax builds the array, which is then passed to the dr.group() function as the aggregations property.
The keys property also accepts multiple key names; however, the code passes in a single value, "class", as the only key field. In most cases where a property accepts multiple values, a single value also works. RushScript supports this as a convenience, so array notation is not required for every property that takes multiple values.
Setting properties
// Define the aggregations wanted
var aggs = [
    Aggregation.count("sepal length").as("count-sl"), 
    Aggregation.min("sepal length").as("min-sl"),
    Aggregation.max("sepal length").as("max-sl"),
    Aggregation.avg("sepal length").as("avg-sl"),
    Aggregation.stddev("sepal length").as("stddev-sl"),
    Aggregation.geoAvg("petal length").as("geoavg-pl"),
    Aggregation.skewness("petal length").as("skewness-pl")
];

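// Group by the given keys using the defined aggregations.
// Note: 'data' is not defined in this snippet; it stands for a result set
// produced by an upstream operator, such as a dr.readDelimitedText() reader.
var results = dr.group(data, {keys:'class', aggregations:aggs});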
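The single-value convenience described before this example amounts to normalizing a property value into a list. The asList function below is a hypothetical plain-JavaScript sketch of that normalization; it does not claim to reflect how RushScript implements it.

```javascript
// Hypothetical sketch of the RushScript convenience that lets a single value
// stand in for a one-element array of values.
function asList(value) {
  if (value === undefined || value === null) {
    return []; // property not set
  }
  return Array.isArray(value) ? value : [value];
}

// Both forms yield a list of key fields.
console.log(asList("class"));                   // [ 'class' ]
console.log(asList(["class", "sepal length"])); // [ 'class', 'sepal length' ]
```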
Connecting Ports
Ports are connected implicitly when an operator is created. In RushScript, ports are represented by result sets: most operator functions return a result set that represents the output ports of the operator being created, and a connection is made whenever an operator is created with a result set as a parameter.
When an operator that takes a result set is created, the following steps are taken:
1. The output port represented by the result set is found.
2. The input port of the created operator is found.
3. The two ports are connected using the current graph.
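The three connection steps can be modeled with a small, hypothetical graph class in plain JavaScript. PortGraph and its addOperator method are not part of the DataFlow API. The port names "left" and "right" come from the Join documentation mentioned in this section, while "output" for the readers and join and "input" for the writer are assumed names used only for illustration. The sketch also assumes that a single-output operator's result set can be passed directly, as the examples above do.

```javascript
// Hypothetical model of implicit port connection (not the DataFlow API).
class PortGraph {
  constructor() {
    this.connections = []; // [from, to] pairs of "operator.port" strings
  }

  // Creates an operator and connects each input result set, in order,
  // to the operator's input ports.
  addOperator(name, inputPortNames, outputPortNames, inputs = []) {
    if (inputs.length !== inputPortNames.length) {
      throw new Error(`${name} expects ${inputPortNames.length} input(s), got ${inputs.length}`);
    }
    inputs.forEach((resultSet, i) => {
      const from = resultSet.port;                // 1. find the upstream output port
      const to = `${name}.${inputPortNames[i]}`;  // 2. find this operator's input port
      this.connections.push([from, to]);          // 3. connect them in the graph
    });

    // The returned result set exposes each output port by name.
    const resultSet = {};
    for (const portName of outputPortNames) {
      resultSet[portName] = { port: `${name}.${portName}` };
    }
    // Assumption: a single-output result set can itself be passed as an input.
    if (outputPortNames.length === 1) {
      resultSet.port = `${name}.${outputPortNames[0]}`;
    }
    return resultSet;
  }
}

const graph = new PortGraph();
const movies = graph.addOperator("readMovies", [], ["output"]);
const ratings = graph.addOperator("readRatings", [], ["output"]);
const joined = graph.addOperator("join", ["left", "right"], ["output"], [movies, ratings]);
graph.addOperator("write", ["input"], [], [joined]);

console.log(graph.connections);
```

Passing the result sets in the wrong order would connect "ratings" to the left port instead, which is why the parameter order must follow the documented input-port order.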
If an operator defines multiple input ports, the creation function for the operator will define multiple result set parameters. The parameters are positional according to the order of definition of the input ports. See the documentation for each operator to determine the number and position of the operator’s input ports.
In the earlier example where two data sources are joined, dr.join() is called without an operator name, so its first and second parameters are the two result sets. The documentation for the Join operator shows that it has two input ports, named "left" and "right". From this you can deduce that dr.join() takes three parameters: the left input, the right input, and properties, optionally preceded by an operator name.