函数¶
插件实现¶
函数框架用于实现 SQL 函数。Presto 包含许多内置函数。为了实现新函数,您可以编写一个插件,该插件从 getFunctions()
返回一个或多个函数。
public class ExampleFunctionsPlugin
implements Plugin
{
@Override
public Set<Class<?>> getFunctions()
{
return ImmutableSet.<Class<?>>builder()
.add(ExampleNullFunction.class)
.add(IsNullFunction.class)
.add(IsEqualOrNullFunction.class)
.add(ExampleStringFunction.class)
.add(ExampleAverageFunction.class)
.build();
}
}
请注意,ImmutableSet
类是来自 Guava 的一个实用程序类。 getFunctions()
方法包含我们将在此教程中实现的函数的所有类。
有关代码库中的完整示例,请参阅 Presto 源代码根目录中的 presto-ml
模块(用于机器学习函数)或 presto-teradata-functions
模块(用于 Teradata 兼容函数)。
标量函数实现¶
函数框架使用注释来指示有关函数的相关信息,包括名称、描述、返回类型和参数类型。下面是一个实现 is_null
的示例函数。
public class ExampleNullFunction
{
@ScalarFunction("is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNull(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice string)
{
return (string == null);
}
}
函数 is_null
接受一个 VARCHAR
参数,并返回一个 BOOLEAN
,指示参数是否为 NULL
。请注意,函数的参数类型为 Slice
。 VARCHAR
使用 Slice
,它本质上是 byte[]
的包装器,而不是 String
作为其原生容器类型。
@SqlType
:@SqlType
注释用于声明返回类型和参数类型。请注意,Java 代码的返回类型和参数必须与相应注释的原生容器类型匹配。@SqlNullable
:@SqlNullable
注释指示参数可能是NULL
。如果没有此注释,框架将假设所有函数在任何参数为NULL
时都返回NULL
。当使用具有原始原生容器类型的Type
(例如BigintType
)时,在使用@SqlNullable
时使用原生容器类型的对象包装器。如果方法在参数为非空时可能返回NULL
,则必须使用@SqlNullable
注释该方法。
参数化标量函数¶
具有类型参数的标量函数会有一些额外的复杂性。为了使我们之前的示例与任何类型一起使用,我们需要以下内容
@ScalarFunction(name = "is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
public final class IsNullFunction
{
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullSlice(@SqlNullable @SqlType("T") Slice value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullLong(@SqlNullable @SqlType("T") Long value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullDouble(@SqlNullable @SqlType("T") Double value)
{
return (value == null);
}
// ...and so on for each native container type
}
@TypeParameter
:@TypeParameter
注释用于声明一个类型参数,该参数可以在参数类型@SqlType
注释或函数的返回类型中使用。它也可以用于注释类型为Type
的参数。在运行时,引擎将绑定具体类型到此参数。可选地,可以通过向@TypeParameter
提供boundedBy
类型类来将类型参数约束为特定类型的后代。@OperatorDependency
可用于声明需要针对给定类型参数进行操作的附加函数。例如,以下函数只会绑定到定义了等于函数的类型
@ScalarFunction(name = "is_equal_or_null", calledOnNullInput = true)
@Description("Returns TRUE if arguments are equal or both NULL")
public final class IsEqualOrNullFunction
{
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isEqualOrNullSlice(
@OperatorDependency(operator = OperatorType.EQUAL, returnType = StandardTypes.BOOLEAN, argumentTypes = {"T", "T"}) MethodHandle equals,
@SqlNullable @SqlType("T") Slice value1,
@SqlNullable @SqlType("T") Slice value2)
{
if (value1 == null && value2 == null) {
return true;
}
if (value1 == null || value2 == null) {
return false;
}
return (boolean) equals.invokeExact(value1, value2);
}
// ...and so on for each native container type
}
另一个标量函数示例¶
lowercaser
函数接受一个 VARCHAR
参数,并返回一个 VARCHAR
,它是转换为小写的参数。
public class ExampleStringFunction
{
@ScalarFunction("lowercaser")
@Description("converts the string to alternating case")
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
{
String argument = slice.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}
请注意,对于大多数常见的字符串函数(包括将字符串转换为小写),Slice 库还提供直接在底层 byte[]
上工作的实现,这些实现具有更好的性能。此函数没有 @SqlNullable
注释,这意味着如果参数为 NULL
,则结果将自动为 NULL
(不会调用该函数)。
代码生成标量函数实现¶
标量函数也可以在字节码中实现,这使我们能够根据 @TypeParameter
特化和优化函数。
@CodegenScalarFunction
:@CodegenScalarFunction
注释用于声明一个在字节码中实现的标量函数。@SqlType
注释用于声明返回类型。它接受Type
作为参数,这些参数也具有@SqlType
注释。返回类型是MethodHandle
,它是代码生成函数方法。
public class CodegenArrayLengthFunction
{
@CodegenScalarFunction("array_length", calledOnNullInput = true)
@SqlType(StandardTypes.INTEGER)
@TypeParameter("K")
public static MethodHandle arrayLength(@SqlType("array(K)") Type arr)
{
CallSiteBinder binder = new CallSiteBinder();
ClassDefinition classDefinition = new ClassDefinition(a(Access.PUBLIC, FINAL), makeClassName("ArrayLength"), type(Object.class));
classDefinition.declareDefaultConstructor(a(PRIVATE));
Parameter inputBlock = arg("inputBlock", Block.class);
MethodDefinition method = classDefinition.declareMethod(a(Access.PUBLIC, STATIC), "array_length", type(Block.class), ImmutableList.of(inputBlock));
BytecodeBlock body = method.getBody();
body.append(inputBlock.invoke("getPositionCount", int.class).ret());
Class<?> clazz = defineClass(classDefinition, Object.class, binder.getBindings(), CodegenArrayLengthFunction.class.getClassLoader());
return new methodHandle(clazz, "array_length", Block.class), Optional.of();
}
}
聚合函数实现¶
聚合函数使用与标量函数类似的框架,但稍微复杂一些。
AccumulatorState
:所有聚合函数都会将输入行累积到一个状态对象中;此对象必须实现
AccumulatorState
。对于简单的聚合,只需将AccumulatorState
扩展到一个新的接口,其中包含所需的 getter 和 setter,框架将为您生成所有实现和序列化程序。如果您需要更复杂的状态对象,则需要实现AccumulatorStateFactory
和AccumulatorStateSerializer
,并通过AccumulatorStateMetadata
注释提供这些对象。
以下代码实现了聚合函数 avg_double
,它计算 DOUBLE
列的平均值。
@AggregationFunction("avg_double")
public class AverageAggregation
{
@InputFunction
public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
{
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}
@CombineFunction
public static void combine(LongAndDoubleState state, LongAndDoubleState otherState)
{
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}
@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out)
{
long count = state.getLong();
if (count == 0) {
out.appendNull();
}
else {
double value = state.getDouble();
DOUBLE.writeDouble(out, value / count);
}
}
}
平均值有两个部分:列中每行 DOUBLE
的总和以及所见行的 LONG
计数。 LongAndDoubleState
是一个扩展 AccumulatorState
的接口。
public interface LongAndDoubleState
extends AccumulatorState
{
long getLong();
void setLong(long value);
double getDouble();
void setDouble(double value);
}
如上所述,对于简单的 AccumulatorState
对象,只需定义具有 getter 和 setter 的接口就足够了,框架将为您生成实现。
深入了解与编写聚合函数相关的各种注释如下
@InputFunction
:使用
@InputFunction
注解声明接受输入行并将它们存储在AccumulatorState
中的函数。与标量函数类似,您必须使用@SqlType
注解参数。请注意,与上面的标量示例中使用Slice
来保存VARCHAR
不同,输入函数使用基本类型double
来表示参数。在本示例中,输入函数只是跟踪行的运行计数(通过setLong()
)和运行总和(通过setDouble()
)。@CombineFunction
:使用
@CombineFunction
注解声明用于合并两个状态对象的函数。此函数用于合并所有部分聚合状态。它接受两个状态对象,并将结果合并到第一个状态对象中(在上面的示例中,只需将它们加在一起)。@OutputFunction
:在计算聚合时,
@OutputFunction
是最后一个被调用的函数。它接受最终状态对象(合并所有部分状态的结果),并将结果写入BlockBuilder
中。序列化在哪里发生,什么是
GroupedAccumulatorState
?通常
@InputFunction
在与@CombineFunction
不同的工作节点上运行,因此状态对象被序列化并通过聚合框架在这些工作节点之间传输。GroupedAccumulatorState
用于执行GROUP BY
聚合,并且如果您没有指定AccumulatorStateFactory
,则会自动为您生成实现。
高级用例¶
原始块输入¶
标量和聚合函数注解都允许您定义在原生类型上操作的方法。在 Java 中,这些原生类型是 boolean
、Slice
和 long
。对于参数化实现或参数类型,无法使用标准 Java 类型,因为它们无法表示输入数据。
要定义一个可以接受任何类型的方法句柄,请将 @BlockPosition
与 @BlockIndex
参数结合使用。与 @SqlNullable
注解类似,使用 @NullablePosition
注解表示当块位置为 NULL
时应该调用函数。
这适用于标量和聚合函数实现。
@ScalarFunction("example")
public static Block exampleFunction(
@BlockPosition @NullablePosition @SqlType("array(int)") Block block,
@BlockIndex int index) { /* ...implementation */ }
使用 @BlockPosition
应用泛型类型¶
当函数使用 @TypeParameter
注解定义时,使用 @BlockPosition
语法的函数签名能够在泛型类型上操作。在 @BlockPosition
参数中添加一个额外的 @SqlType("T")
注解,表示它接受与泛型类型相对应的参数。这适用于标量和聚合函数实现。
@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
@BlockPosition @SqlType("T") Block block,
@BlockIndex int index) { /* ...implementation */ }
使用 @TypeParameter
检索泛型类型¶
在函数参数列表的开头添加 @TypeParameter
注解,以允许实现执行特定于类型的逻辑。将使用 @TypeParameter
注解的 Type
类型参数添加为函数签名的第一个参数,以访问 Type
。这适用于标量和聚合函数。
@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
@TypeParameter("T") Type type,
@BlockPosition @SqlType("T") Block block,
@BlockIndex int index) { /* ...implementation */ }