VeRDD API

The VeRDD API is a new API for offloading operations onto the Vector Engine. Because this more direct API skips many of the complexities of SQL, it can make more efficient use of the Vector Engine. The VeRDD API works by using Scala macros to translate Scala expressions into C++, which is then compiled and executed on the Vector Engine in much the same way as with the Spark SQL API.

Creating a VeRDD

Creating a VeRDD is as simple as calling .toVeRDD on the input RDD after importing the provided implicit classes. An example is shown below.

import com.nec.ve.VeRDD
import com.nec.ve.VeRDD._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("VeRDD Test")
val sc = new SparkContext(conf)

val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8))
val ve: VeRDD[Int] = rdd.toVeRDD

Calling toVeRDD causes the RDD to be evaluated and its contents loaded into the Vector Engine's memory.

There is also a veParallelize extension method on SparkContext that can create a VeRDD directly from a Range, as sketched below.
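
A minimal sketch, assuming a Long range as in the examples further below:

// Builds the VeRDD directly on the Vector Engine, skipping the intermediate RDD
val ve: VeRDD[Long] = sc.veParallelize(1L to 1000L)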

Executing map/filter/reduce

Thanks to Scala's macros, it is possible to use the same RDD APIs such as map, filter, and reduce.

Below is an example of the same operations on both an RDD and a VeRDD:

import com.nec.ve.VeRDD._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("VeRDD Test")
val sc = new SparkContext(conf)

val numbers = (1L to (500 * 1000000))
val rdd = sc.parallelize(numbers).cache()
rdd
  .filter((a: Long) => a % 3 == 0 && a % 5 == 0 && a % 15 == 0)
  .map((a: Long) => ((2 * a) + 12) - (a % 15))
  .reduce((a: Long, b: Long) => a + b)

val verdd = rdd.toVeRDD // Or sc.veParallelize(numbers)
verdd
  .filter((a: Long) => a % 3 == 0 && a % 5 == 0 && a % 15 == 0)
  .map((a: Long) => ((2 * a) + 12) - (a % 15))
  .reduce((a: Long, b: Long) => a + b)

As you can see, with this API it is possible to run code on the VE with no change other than the call to toVeRDD.

Not all RDD methods are supported on the VE. If an unimplemented method is called, VeRDD will automatically evaluate the current VeRDD operation DAG on the VE and then convert the result into a normal RDD[T], on which the remaining calls execute as they normally would.

If you'd like to convert back from an RDD[T] into a VeRDD[T], you can add a call to toVeRDD, as in the sketch below.
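
For illustration, assuming (hypothetically) that distinct is one of the methods without a VE implementation:

val back: VeRDD[Long] = verdd
  .distinct() // hypothetical unsupported method: the VE DAG is evaluated and the chain continues as a plain RDD[Long]
  .toVeRDD    // copies the result back into the Vector Engine's memory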

Limitations

There are limitations to the macro-based API that should be understood in order to take full advantage of the VeRDD API.

1) The compiler needs to know it is working with a VeRDD[T]

To generate C++ code from the defined lambda functions, a macro rewrites the invocation of filter, map, reduce, etc. into an Expr. Due to limitations of the compiler, it is not possible for VeRDD to override the definitions of many operations such as map or groupBy. Therefore, for the macro to run properly, the compiler must know it is working with a VeRDD[T] instance. Scala's type inference is usually sufficient, so it is generally not necessary to specify the type VeRDD[T] explicitly, unless, for example, you have a function like:

def foo(rdd: RDD[(Long, Long)]): RDD[(Long, Double)] = {
  val filtered = rdd.filter((t: (Long, Long)) => t._1 < 10)
  filtered.map((t: (Long, Long)) => (t._1, t._2 / 100.0))
}

In this case, even if you pass a VeRDD into this function (which will work, as VeRDD[T] implements RDD[T]), it will run the filter and map methods defined on RDD and not the ones defined on VeRDD. Since an RDD API is being called, the VeRDD passed in will copy the data out of the VE and continue as a normal RDD. To run this on the VE, it is necessary to overload or redefine this function as taking and returning a VeRDD, like so:

def foo(rdd: VeRDD[(Long, Long)]): VeRDD[(Long, Double)] = {
  val filtered = rdd.filter((t: (Long, Long)) => t._1 < 10)
  filtered.map((t: (Long, Long)) => (t._1, t._2 / 100.0))
}

2) Lambdas should have types for their parameters

While it would normally be possible to infer the types of the lambda parameters from the type of the RDD, the compiler unfortunately cannot pass that information into the Expr. To transpile the expression into C++, the lambda needs to be as self-contained as possible, so the types of its parameters must be written out explicitly, as sketched below.
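
A minimal sketch of the difference:

verdd.map((a: Long) => a * 2) // OK: the parameter type is written out
verdd.map(a => a * 2)         // cannot be transpiled: the type is only known through inference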

3) Lambdas currently only support a limited number of types

Currently, Int, Long, Double, Float, and Instant can be used as lambda parameter types. It is also necessary that no implicit conversions are used inside the lambda function, as those are not C++ compatible.
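
For example, a sketch of the implicit-conversion constraint:

// Arithmetic on a supported type with no implicit conversions translates cleanly
verdd.map((a: Long) => a * 2L + 1L)

// By contrast, `a -> a` builds a tuple through the implicit ArrowAssoc enrichment, which has no C++ counterpart; write the tuple literally instead
verdd.map((a: Long) => (a, a))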

4) The lambda function must contain the entire code

Calls to functions defined elsewhere are translated into function calls in C++. With the exception of some simple numerical functions, such a function will not in general be available on the VE. It is not currently possible to have Scala inline the function when reifying it into an Expr, although this may become possible in the future.
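
A sketch of the difference, where helper is a hypothetical user-defined function:

// Translatable: the entire computation lives inside the lambda
verdd.map((a: Long) => a * a + 1L)

// Not translatable: `helper` becomes a C++ call to a function that does not exist on the VE
def helper(x: Long): Long = x * x + 1L
verdd.map((a: Long) => helper(a))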

5) Custom classes or case classes are not supported

Currently it is necessary to use tuples when dealing with groups of values, as it is not yet possible to inline the definition of a case class. This limitation may be lifted in the future.
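
For example:

case class Pair(x: Long, y: Long)   // hypothetical case class: not usable inside VE lambdas
verdd.map((a: Long) => (a, a * 2L)) // use a tuple instead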

Explicit API

The RDD-compatible methods of the VeRDD API leverage macros to rewrite each method call: map, for example, is rewritten into vemap. There are corresponding methods vefilter, vegroupBy, vereduce, vesortBy, and more.

The example above can be converted to use the ve methods as follows:

import scala.reflect.runtime.universe._ // provides reify

val verdd = sc.veParallelize(numbers)
verdd
  .vefilter(reify { (a: Long) => a % 3 == 0 && a % 5 == 0 && a % 15 == 0 })
  .vemap(reify { (a: Long) => ((2 * a) + 12) - (a % 15) })
  .vereduce(reify { (a: Long, b: Long) => a + b })

In general, there is no advantage to using the ve versions of the API, other than being more explicit that the code will run on the VE.