Scala UDFs
Scala UDFs must be configured before use; see "Configuring User-Defined Functions" in the System Administrator Guide. The Spark provider container must first be set up via iisuspark, and remote UDF processing must be enabled in udf_engines.conf.
The function body must contain regular Scala 2 code, where the last expression is the result of the UDF (no return keyword). As for libraries, you can use anything shipped with the official Spark distribution (https://spark.apache.org/releases/spark-release-3-5-1.html). In addition, you can access Azure, AWS, and GCS storage, Iceberg tables, and Ingres or Vector via JDBC and the Spark Vector Connector. Adding third-party libraries that are not contained in the container is currently not supported.
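The last-expression rule can be sketched in plain Scala. The object and method names here are illustrative only, not part of the product API; the sketch simulates a body whose argument is bound by the SQL function declaration:

```scala
object UdfBodyDemo {
  // Stand-in for the body of a UDF declared as f(a int) return(int):
  // the body is an expression block, and its final expression is the
  // result. No `return` keyword is used.
  def body(a: Int): Int = {
    val doubled = a * 2 // intermediate statements are allowed
    doubled + 1         // the last expression becomes the UDF result
  }
}
```

Here `body(20)` evaluates to 41; in an actual UDF, the argument `a` is bound by the SQL function declaration rather than a method parameter.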
Data Type Mapping
To write Scala functions, you must be aware of how SQL types map to Scala types and vice versa:
Implementing UDF Functions
In addition to regular Scala code, a UDF can keep state across the internal calls made for each vector in the table. This is done by implementing the pre-defined UdfCacheAccess interface within the Scala body of the UDF. The interface looks like this:
trait UdfCacheAccess extends Serializable {
  // Called once to produce the (name, value) pairs to cache;
  // cached values can later be looked up via retrieveCacheable(name).
  def createCacheables(spark: SparkSession): Seq[(String, Any)]
}
This can be used by the UDF for one-time tasks, such as loading a pre-trained ML model, as shown below:
create or replace function scalaudf(a int) return(int) AS language scala source='
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import com.actian.spark_vector.provider.udf.UdfCacheAccess

// One-time setup: load the pre-trained model and cache it under "lmodel".
val myCache = new UdfCacheAccess {
  override def createCacheables(spark: SparkSession): Seq[(String, Any)] = {
    val sc = spark.sparkContext
    val model = LogisticRegressionModel.load(sc, "file:///opt/user_mount/udf/mllib_model")
    Seq(("lmodel", model))
  }
}

// Per-call logic: retrieve the cached model and score the input.
val v = Vectors.dense(a.toDouble)
val model = myCache.retrieveCacheable("lmodel").asInstanceOf[LogisticRegressionModel]
model.predict(v).toInt', varfi=true;\g
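The caching contract used above can be sketched in plain Scala. This is a hypothetical, simplified stand-in for the provider's implementation: the SparkSession parameter is omitted, and all names other than createCacheables and retrieveCacheable are assumptions made for the sketch.

```scala
import scala.collection.mutable

// Simplified stand-in for the provider-side cache: createCacheables runs
// once, and retrieveCacheable serves the cached objects afterwards.
trait DemoCacheAccess {
  // One-time initialization; returns (name, value) pairs to cache.
  def createCacheables(): Seq[(String, Any)]

  // Built lazily on first lookup, then reused for every later call.
  private lazy val cache: mutable.Map[String, Any] =
    mutable.Map(createCacheables(): _*)

  def retrieveCacheable(name: String): Any = cache(name)
}

object DemoCacheAccessApp {
  var loads = 0 // counts how often the expensive initialization runs

  val myCache = new DemoCacheAccess {
    override def createCacheables(): Seq[(String, Any)] = {
      loads += 1 // stands in for loading a pre-trained model
      Seq(("lmodel", "pretend-model"))
    }
  }
}
```

Calling `myCache.retrieveCacheable("lmodel")` repeatedly returns the cached value while `loads` stays at 1, which is the point of the pattern: expensive setup such as model loading happens once, not per call.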
Last modified date: 12/19/2024