Scala UDFs
Scala UDFs must be configured before they can be used: the Spark provider container must be set up first via iisuspark, and remote UDF processing must be enabled in udf_engines.conf. For details, see Configuring User-defined Functions in the System Administrator Guide.
The function body must contain regular Scala 2 code, where the last expression is the result of the UDF (no return keyword). As for libraries, you can use what is shipped with the official Spark distribution (https://spark.apache.org/releases/spark-release-3-5-1.html). Additionally, you can access Azure, AWS, and GCS storage, Iceberg tables, and Ingres or Vector via JDBC and the Spark Vector Connector. Adding third-party libraries that are not contained in the container is currently not supported.
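For illustration, a minimal UDF might look as follows (hypothetical function name add_one, using the same CREATE FUNCTION syntax as the larger example later in this section); the last expression a + 1 is the result:

create or replace function add_one(a int) return(int) AS language scala source='
// Regular Scala 2 code; the last expression is the UDF result (no return keyword).
a + 1';\g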
Data Type Mapping
To write Scala functions, you must be aware of how SQL types map to Scala types and vice versa:
SQL Type                         Scala Type
-------------------------------  ---------------------------------------------------
BOOLEAN                          Boolean
INTEGER1                         Byte
SMALLINT                         Short
INTEGER                          Int
BIGINT                           Long
FLOAT4                           Float
FLOAT                            Double
MONEY                            Double
DECIMAL                          java.math.BigDecimal
CHAR                             String
VARCHAR                          String
NVARCHAR                         String
ANSIDATE                         java.sql.Date
TIME WITHOUT TIMEZONE            java.sql.Timestamp
TIME WITH LOCAL TIMEZONE         java.sql.Timestamp
TIME WITH TIMEZONE               java.sql.Timestamp
TIMESTAMP WITHOUT TIMEZONE       java.sql.Timestamp
TIMESTAMP WITH LOCAL TIMEZONE    java.sql.Timestamp
TIMESTAMP WITH TIMEZONE          java.sql.Timestamp
INTERVAL YEAR TO MONTH           String ("<num of years>-<num of months>")
INTERVAL DAY TO SECOND           String ("<num of days> <hours>:<minutes>:<seconds>")
IPv4                             String
IPv6                             String
UUID                             String
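As a sketch of the mapping in practice (hypothetical function and parameter names), a UDF declared with VARCHAR and DECIMAL parameters receives them in Scala as String and java.math.BigDecimal, per the table above:

create or replace function label_price(name varchar(50), price decimal(10,2)) return(varchar(100)) AS language scala source='
// name arrives as a String, price as a java.math.BigDecimal (see the table above).
val total = price.multiply(new java.math.BigDecimal(2))
name + ": " + total.toPlainString';\g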
In case of user-defined NULL handling, all parameters as well as the return value are wrapped in a Scala Option.
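As a minimal sketch of this Option wrapping (how NULL handling is enabled for a function is covered in the referenced guide), the body of a UDF declared with a single INTEGER parameter a could pattern-match as follows:

// With user-defined NULL handling, the INTEGER parameter arrives as Option[Int],
// and the result must be returned as an Option as well.
a match {
  case Some(x) => Some(x * 2) // non-NULL input: return twice the value
  case None    => Some(0)     // NULL input: return 0 instead of NULL
}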
Implementing UDF Functions
In addition to the regular Scala code, it is possible to keep state between the internal calls to the UDF for each vector in the table by implementing the pre-defined UdfCacheAccess trait within the Scala body of the UDF. The trait appears as shown below:
trait UdfCacheAccess extends Serializable {
  // Implemented by the UDF; called once to create the named objects to cache.
  def createCacheables(spark: SparkSession): Seq[(String, Any)]
}

Objects cached this way can later be looked up by name via retrieveCacheable.
This can be used by the UDF for one-time tasks, such as loading a pre-trained ML model, as shown below:
create or replace function scalaudf(a int) return(int) AS language scala source='
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import com.actian.spark_vector.provider.udf.UdfCacheAccess

val myCache = new UdfCacheAccess {
  override def createCacheables(spark: SparkSession): Seq[(String, Any)] = {
    val sc = spark.sparkContext
    // One-time task: load the pre-trained MLlib model from the mounted path.
    val model = LogisticRegressionModel.load(sc, "file:///opt/user_mount/udf/mllib_model")
    Seq(("lmodel", model))
  }
}

val v = Vectors.dense(a.toDouble)
val model = myCache.retrieveCacheable("lmodel").asInstanceOf[LogisticRegressionModel]
// The last expression is the UDF result.
model.predict(v).toInt',varfi=true;\g
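Once created, the UDF can be invoked like any scalar function, for example (assuming a table mytable with an INTEGER column a):

SELECT scalaudf(a) FROM mytable;\g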