函数

插件实现

函数框架用于实现 SQL 函数。Presto 包含许多内置函数。为了实现新函数,您可以编写一个插件,该插件从 getFunctions() 返回一个或多个函数。

public class ExampleFunctionsPlugin
        implements Plugin
{
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                .add(ExampleNullFunction.class)
                .add(IsNullFunction.class)
                .add(IsEqualOrNullFunction.class)
                .add(ExampleStringFunction.class)
                .add(ExampleAverageFunction.class)
                .build();
    }
}

请注意,ImmutableSet 类是来自 Guava 的一个实用程序类。 getFunctions() 方法包含我们将在此教程中实现的函数的所有类。

有关代码库中的完整示例,请参阅 Presto 源代码根目录中的 presto-ml 模块(用于机器学习函数)或 presto-teradata-functions 模块(用于 Teradata 兼容函数)。

标量函数实现

函数框架使用注释来指示有关函数的相关信息,包括名称、描述、返回类型和参数类型。下面是一个实现 is_null 的示例函数。

public class ExampleNullFunction
{
    @ScalarFunction("is_null", calledOnNullInput = true)
    @Description("Returns TRUE if the argument is NULL")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNull(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice string)
    {
        return (string == null);
    }
}

函数 is_null 接受一个 VARCHAR 参数,并返回一个 BOOLEAN,指示参数是否为 NULL。请注意,函数的参数类型为 SliceVARCHAR 使用 Slice,它本质上是 byte[] 的包装器,而不是 String 作为其原生容器类型。

  • @SqlType:

    @SqlType 注释用于声明返回类型和参数类型。请注意,Java 代码的返回类型和参数必须与相应注释的原生容器类型匹配。

  • @SqlNullable:

    @SqlNullable 注释指示参数可能是 NULL。如果没有此注释,框架将假设所有函数在任何参数为 NULL 时都返回 NULL。当使用具有原始原生容器类型的 Type(例如 BigintType)时,在使用 @SqlNullable 时使用原生容器类型的对象包装器。如果方法在参数为非空时可能返回 NULL,则必须使用 @SqlNullable 注释该方法。

参数化标量函数

具有类型参数的标量函数会有一些额外的复杂性。为了使我们之前的示例与任何类型一起使用,我们需要以下内容

@ScalarFunction(name = "is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
public final class IsNullFunction
{
    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNullSlice(@SqlNullable @SqlType("T") Slice value)
    {
        return (value == null);
    }

    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNullLong(@SqlNullable @SqlType("T") Long value)
    {
        return (value == null);
    }

    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNullDouble(@SqlNullable @SqlType("T") Double value)
    {
        return (value == null);
    }

    // ...and so on for each native container type
}
  • @TypeParameter:

    @TypeParameter 注释用于声明一个类型参数,该参数可以在参数类型 @SqlType 注释或函数的返回类型中使用。它也可以用于注释类型为 Type 的参数。在运行时,引擎将绑定具体类型到此参数。可选地,可以通过向 @TypeParameter 提供 boundedBy 类型类来将类型参数约束为特定类型的后代。 @OperatorDependency 可用于声明需要针对给定类型参数进行操作的附加函数。例如,以下函数只会绑定到定义了等于函数的类型

@ScalarFunction(name = "is_equal_or_null", calledOnNullInput = true)
@Description("Returns TRUE if arguments are equal or both NULL")
public final class IsEqualOrNullFunction
{
    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isEqualOrNullSlice(
            @OperatorDependency(operator = OperatorType.EQUAL, returnType = StandardTypes.BOOLEAN, argumentTypes = {"T", "T"}) MethodHandle equals,
            @SqlNullable @SqlType("T") Slice value1,
            @SqlNullable @SqlType("T") Slice value2)
    {
        if (value1 == null && value2 == null) {
            return true;
        }
        if (value1 == null || value2 == null) {
            return false;
        }
        return (boolean) equals.invokeExact(value1, value2);
    }

    // ...and so on for each native container type
}

另一个标量函数示例

lowercaser 函数接受一个 VARCHAR 参数,并返回一个 VARCHAR,它是转换为小写的参数。

public class ExampleStringFunction
{
    @ScalarFunction("lowercaser")
    @Description("converts the string to alternating case")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
    {
        String argument = slice.toStringUtf8();
        return Slices.utf8Slice(argument.toLowerCase());
    }
}

请注意,对于大多数常见的字符串函数(包括将字符串转换为小写),Slice 库还提供直接在底层 byte[] 上工作的实现,这些实现具有更好的性能。此函数没有 @SqlNullable 注释,这意味着如果参数为 NULL,则结果将自动为 NULL(不会调用该函数)。

代码生成标量函数实现

标量函数也可以在字节码中实现,这使我们能够根据 @TypeParameter 特化和优化函数。

  • @CodegenScalarFunction:

    @CodegenScalarFunction 注释用于声明一个在字节码中实现的标量函数。 @SqlType 注释用于声明返回类型。它接受 Type 作为参数,这些参数也具有 @SqlType 注释。返回类型是 MethodHandle,它是代码生成函数方法。

public class CodegenArrayLengthFunction
{
    @CodegenScalarFunction("array_length", calledOnNullInput = true)
    @SqlType(StandardTypes.INTEGER)
    @TypeParameter("K")
    public static MethodHandle arrayLength(@SqlType("array(K)") Type arr)
    {
        CallSiteBinder binder = new CallSiteBinder();
        ClassDefinition classDefinition = new ClassDefinition(a(Access.PUBLIC, FINAL), makeClassName("ArrayLength"), type(Object.class));
        classDefinition.declareDefaultConstructor(a(PRIVATE));

        Parameter inputBlock = arg("inputBlock", Block.class);
        MethodDefinition method = classDefinition.declareMethod(a(Access.PUBLIC, STATIC), "array_length", type(Block.class), ImmutableList.of(inputBlock));
        BytecodeBlock body = method.getBody();
        body.append(inputBlock.invoke("getPositionCount", int.class).ret());

        Class<?> clazz = defineClass(classDefinition, Object.class, binder.getBindings(), CodegenArrayLengthFunction.class.getClassLoader());
        return new methodHandle(clazz, "array_length", Block.class), Optional.of();
    }
}

聚合函数实现

聚合函数使用与标量函数类似的框架,但稍微复杂一些。

  • AccumulatorState:

    所有聚合函数都会将输入行累积到一个状态对象中;此对象必须实现 AccumulatorState。对于简单的聚合,只需将 AccumulatorState 扩展到一个新的接口,其中包含所需的 getter 和 setter,框架将为您生成所有实现和序列化程序。如果您需要更复杂的状态对象,则需要实现 AccumulatorStateFactoryAccumulatorStateSerializer,并通过 AccumulatorStateMetadata 注释提供这些对象。

以下代码实现了聚合函数 avg_double,它计算 DOUBLE 列的平均值。

@AggregationFunction("avg_double")
public class AverageAggregation
{
    @InputFunction
    public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
    {
        state.setLong(state.getLong() + 1);
        state.setDouble(state.getDouble() + value);
    }

    @CombineFunction
    public static void combine(LongAndDoubleState state, LongAndDoubleState otherState)
    {
        state.setLong(state.getLong() + otherState.getLong());
        state.setDouble(state.getDouble() + otherState.getDouble());
    }

    @OutputFunction(StandardTypes.DOUBLE)
    public static void output(LongAndDoubleState state, BlockBuilder out)
    {
        long count = state.getLong();
        if (count == 0) {
            out.appendNull();
        }
        else {
            double value = state.getDouble();
            DOUBLE.writeDouble(out, value / count);
        }
    }
}

平均值有两个部分:列中每行 DOUBLE 的总和以及所见行的 LONG 计数。 LongAndDoubleState 是一个扩展 AccumulatorState 的接口。

public interface LongAndDoubleState
        extends AccumulatorState
{
    long getLong();

    void setLong(long value);

    double getDouble();

    void setDouble(double value);
}

如上所述,对于简单的 AccumulatorState 对象,只需定义具有 getter 和 setter 的接口就足够了,框架将为您生成实现。

深入了解与编写聚合函数相关的各种注释如下

  • @InputFunction:

    使用 @InputFunction 注解声明接受输入行并将它们存储在 AccumulatorState 中的函数。与标量函数类似,您必须使用 @SqlType 注解参数。请注意,与上面的标量示例中使用 Slice 来保存 VARCHAR 不同,输入函数使用基本类型 double 来表示参数。在本示例中,输入函数只是跟踪行的运行计数(通过 setLong())和运行总和(通过 setDouble())。

  • @CombineFunction:

    使用 @CombineFunction 注解声明用于合并两个状态对象的函数。此函数用于合并所有部分聚合状态。它接受两个状态对象,并将结果合并到第一个状态对象中(在上面的示例中,只需将它们加在一起)。

  • @OutputFunction:

    在计算聚合时,@OutputFunction 是最后一个被调用的函数。它接受最终状态对象(合并所有部分状态的结果),并将结果写入 BlockBuilder 中。

  • 序列化在哪里发生,什么是 GroupedAccumulatorState

    通常 @InputFunction 在与 @CombineFunction 不同的工作节点上运行,因此状态对象被序列化并通过聚合框架在这些工作节点之间传输。 GroupedAccumulatorState 用于执行 GROUP BY 聚合,并且如果您没有指定 AccumulatorStateFactory,则会自动为您生成实现。

高级用例

原始块输入

标量和聚合函数注解都允许您定义在原生类型上操作的方法。在 Java 中,这些原生类型是 booleanSlicelong。对于参数化实现或参数类型,无法使用标准 Java 类型,因为它们无法表示输入数据。

要定义一个可以接受任何类型的方法句柄,请将 @BlockPosition@BlockIndex 参数结合使用。与 @SqlNullable 注解类似,使用 @NullablePosition 注解表示当块位置为 NULL 时应该调用函数。

这适用于标量和聚合函数实现。

@ScalarFunction("example")
public static Block exampleFunction(
        @BlockPosition @NullablePosition @SqlType("array(int)") Block block,
        @BlockIndex int index) { /* ...implementation */ }

使用 @BlockPosition 应用泛型类型

当函数使用 @TypeParameter 注解定义时,使用 @BlockPosition 语法的函数签名能够在泛型类型上操作。在 @BlockPosition 参数中添加一个额外的 @SqlType("T") 注解,表示它接受与泛型类型相对应的参数。这适用于标量和聚合函数实现。

@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
        @BlockPosition @SqlType("T") Block block,
        @BlockIndex int index) { /* ...implementation */ }

使用 @TypeParameter 检索泛型类型

在函数参数列表的开头添加 @TypeParameter 注解,以允许实现执行特定于类型的逻辑。将使用 @TypeParameter 注解的 Type 类型参数添加为函数签名的第一个参数,以访问 Type。这适用于标量和聚合函数。

@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
        @TypeParameter("T") Type type,
        @BlockPosition @SqlType("T") Block block,
        @BlockIndex int index) { /* ...implementation */ }