Kafka 连接器

概述

此连接器允许将 Apache Kafka 主题用作 Presto 中的表。每条消息在 Presto 中显示为一行。

主题可以是实时的:行会随着数据的到达而出现,并随着消息的删除而消失。如果在单个查询中多次访问同一个表(例如,执行自连接),这会导致奇怪的行为。

注意

支持 Apache Kafka 2.3.1+。

配置

要配置 Kafka 连接器,请创建一个目录属性文件 etc/catalog/kafka.properties,其中包含以下内容,并根据需要替换属性

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

多个 Kafka 集群

您可以根据需要拥有任意数量的目录,因此,如果您有额外的 Kafka 集群,只需将另一个属性文件添加到 etc/catalog 中,并使用不同的名称(确保其以 .properties 结尾)。例如,如果您将属性文件命名为 sales.properties,Presto 将使用配置的连接器创建一个名为 sales 的目录。

配置属性

以下是可用的配置属性

属性名称

描述

kafka.table-names

目录提供的所有表的列表

kafka.default-schema

表的默认模式名称

kafka.nodes

Kafka 集群中节点的列表

kafka.connect-timeout

连接到 Kafka 集群的超时时间

kafka.max-poll-records

每次轮询的最大记录数

kafka.max-partition-fetch-bytes

每次轮询从一个分区获取的最大字节数

kafka.table-description-dir

包含主题描述文件的目录

kafka.hide-internal-columns

控制内部列是否作为表模式的一部分

kafka.table-names

此目录提供的所有表的逗号分隔列表。表名可以是无限定的(简单名称),并将放入默认模式(见下文),或者限定为模式名(<schema-name>.<table-name>)。

对于此处定义的每个表,可能存在一个表描述文件(见下文)。如果没有表描述文件,表名将用作 Kafka 上的主题名称,并且没有数据列映射到表中。该表将仍然包含所有内部列(见下文)。

此属性是必需的;没有默认值,并且必须至少定义一个表。

kafka.default-schema

定义包含所有未限定模式名称的表的模式。

此属性是可选的;默认值为 default

kafka.nodes

Kafka 数据节点的 hostname:port 对的逗号分隔列表。

此属性是必需的;没有默认值,并且必须至少定义一个节点。

注意

即使仅指定了集群的一部分节点,Presto 也必须仍然能够连接到集群的所有节点,因为消息可能仅位于特定节点上。

kafka.connect-timeout

连接到数据节点的超时时间。繁忙的 Kafka 集群可能需要相当长的时间才能接受连接;如果看到由于超时而导致的查询失败,则增加此值是一个好策略。

此属性是可选的;默认值为 10 秒(10s)。

kafka.max-poll-records

从 Kafka 轮询() 的最大记录数。

此属性是可选的;默认值为 500

kafka.max-partition-fetch-bytes``

每次轮询从一个分区获取的最大字节数

此属性是可选的;默认值为 1MB

kafka.table-description-dir

引用 Presto 部署中的一个文件夹,其中包含一个或多个 JSON 文件(必须以 .json 结尾),这些文件包含表描述文件。

此属性是可选的;默认值为 etc/kafka

kafka.hide-internal-columns

除了在表描述文件中定义的数据列之外,连接器还为每个表维护一些额外的列。如果隐藏了这些列,它们仍然可以在查询中使用,但不会显示在 DESCRIBE <table-name>SELECT * 中。

此属性是可选的;默认值为 true

内部列

对于每个定义的表,连接器维护以下列

列名

类型

描述

_partition_id

BIGINT

包含此行的 Kafka 分区的 ID。

_partition_offset

BIGINT

此行在 Kafka 分区中的偏移量。

_message_corrupt

BOOLEAN

如果解码器无法为此行解码消息,则为 True。如果为 True,则应将从消息映射的数据列视为无效。

_message

VARCHAR

消息字节作为 UTF-8 编码的字符串。这仅适用于文本主题。

_message_length

BIGINT

消息中的字节数。

_key_corrupt

BOOLEAN

如果键解码器无法为此行解码键,则为 True。如果为 True,则应将从键映射的数据列视为无效。

_key

VARCHAR

键字节作为 UTF-8 编码的字符串。这仅适用于文本键。

_key_length

BIGINT

键中的字节数。

对于没有表定义文件的表,_key_corrupt_message_corrupt 列将始终为 false

表定义文件

Kafka 仅将主题维护为字节消息,并将如何解释消息的定义留给生产者和消费者。对于 Presto,必须将此数据映射到列中,以允许对数据进行查询。

注意

对于包含 JSON 数据的文本主题,完全可以不使用任何表定义文件,而是使用 Presto 的 JSON 函数和运算符 来解析 _message 列,该列包含映射到 UTF-8 字符串的字节。然而,这很麻烦,并且使编写 SQL 查询变得困难。

表定义文件包含表的 JSON 定义。文件名可以任意,但必须以 .json 结尾。

{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
       ]
    }
}

字段

必需

类型

描述

tableName

必需

字符串

此文件定义的 Presto 表名。

schemaName

可选

字符串

包含表的模式。如果省略,则使用默认模式名称。

topicName

必需

字符串

映射的 Kafka 主题。

key

可选

JSON 对象

映射到消息键的数据列的字段定义。

message

可选

JSON 对象

映射到消息本身的数据列的字段定义。

Kafka 中的键和消息

从 Kafka 0.8 开始,主题中的每条消息都可以有一个可选的键。表定义文件包含键和消息的两个部分,用于将数据映射到表列。

表定义中的每个 keymessage 字段都是一个 JSON 对象,必须包含两个字段

字段

必需

类型

描述

dataFormat

必需

字符串

选择此字段组的解码器。

fields

必需

JSON 数组

字段定义的列表。每个字段定义在 Presto 表中创建一个新列。

每个字段定义都是一个 JSON 对象

{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}

字段

必需

类型

描述

name

必需

字符串

Presto 表中列的名称。

type

必需

字符串

列的 Presto 类型。

dataFormat

可选

字符串

选择此字段的列解码器。默认情况下,使用此行数据格式和列类型的默认解码器。

dataSchema

可选

字符串

Avro 模式所在的路径或 URL。仅用于 Avro 解码器。

mapping

可选

字符串

列的映射信息。这是解码器特定的,请参见下文。

formatHint

可选

字符串

为列解码器设置列特定的格式提示。

hidden

可选

布尔值

隐藏 DESCRIBE <table name>SELECT * 中的列。默认为 false

comment

可选

字符串

添加列注释,该注释将与 DESCRIBE <table name> 一起显示。

键或消息的字段描述没有限制。

行解码

对于键和消息,解码器用于将消息和键数据映射到表列。

Kafka 连接器包含以下解码器

  • raw - 不解释 Kafka 消息,原始消息字节的范围映射到表列

  • csv - Kafka 消息被解释为逗号分隔的消息,字段被映射到表列

  • json - Kafka 消息被解析为 JSON,JSON 字段被映射到表列

  • avro - Kafka 消息根据 Avro 模式解析,Avro 字段被映射到表列

注意

如果表不存在表定义文件,则使用 dummy 解码器,该解码器不公开任何列。

raw 解码器

原始解码器支持从 Kafka 消息或键中读取原始(基于字节)值并将其转换为 Presto 列。

对于字段,支持以下属性

  • dataFormat - 选择要转换的数据类型的宽度

  • type - Presto 数据类型(有关支持的数据类型列表,请参见下表)

  • mapping - <start>[:<end>];要转换的字节的开始和结束位置(可选)

dataFormat 属性选择要转换的字节数。如果缺失,则假定为 BYTE。所有值都有符号。

支持的值为

  • BYTE - 一个字节

  • SHORT - 两个字节(大端)

  • INT - 四个字节(大端)

  • LONG - 八个字节(大端)

  • FLOAT - 四个字节(IEEE 754 格式)

  • DOUBLE - 八个字节(IEEE 754 格式)

type 属性定义映射值的 Presto 数据类型。

根据分配给列的 Presto 类型,可以使用不同的 dataFormat

Presto 数据类型

允许的 dataFormat

BIGINT

BYTE, SHORT, INT, LONG

INTEGER

BYTE, SHORT, INT

SMALLINT

BYTE, SHORT

TINYINT

BYTE

DOUBLE

DOUBLE, FLOAT

BOOLEAN

BYTE, SHORT, INT, LONG

VARCHAR / VARCHAR(x)

BYTE

mapping 属性指定用于解码的键或消息中字节的范围。它可以是一个或两个由冒号 (<start>[:<end>]) 分隔的数字。

如果只给出了开始位置

  • 对于固定宽度类型,列将使用为指定 dataFormat(见上文)指定的适当字节数。

  • 当解码 VARCHAR 值时,将使用从开始位置到消息末尾的所有字节。

如果给出了开始位置和结束位置,则

  • 对于固定宽度类型,大小必须等于指定 dataFormat 使用的字节数。

  • 对于 VARCHAR,使用开始(包含)和结束(不包含)之间的所有字节。

如果未指定 mapping 属性,则等效于将开始位置设置为 0 且结束位置未定义。

数值数据类型 (BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE) 的解码方案很简单。从输入消息中读取字节序列并根据以下两种方式之一进行解码

  • 大端编码(对于整数类型)

  • IEEE 754 格式(对于 DOUBLE)。

解码字节序列的长度由 dataFormat 隐含。

对于 VARCHAR 数据类型,字节序列根据 UTF-8 编码进行解释。

csv 解码器

CSV 解码器使用 UTF-8 编码将表示消息或键的字节转换为字符串,然后将结果解释为 CSV(逗号分隔值)行。

对于字段,必须定义 typemapping 属性

  • type - Presto 数据类型(有关支持的数据类型列表,请参见下表)

  • mapping - CSV 记录中字段的索引

不支持 dataFormatformatHint,必须省略。

下表列出了支持的 Presto 类型,这些类型可以在 type 中使用,并列出了解码方案

Presto 数据类型

解码规则

BIGINT
INTEGER
SMALLINT
TINYINT

使用 Java Long.parseLong() 解码

DOUBLE

使用 Java Double.parseDouble() 解码

BOOLEAN

“true” 字符序列映射到 true;其他字符序列映射到 false

VARCHAR / VARCHAR(x)

按原样使用

json 解码器

JSON 解码器根据 RFC 4627 将表示消息或键的字节转换为 JSON。请注意,消息或键 *必须* 转换为 JSON 对象,而不是数组或简单类型。

对于字段,支持以下属性

  • type - 列的 Presto 类型。

  • dataFormat - 要用于列的字段解码器。

  • mapping - 用于从 JSON 对象中选择字段的用斜杠分隔的字段名列表

  • formatHint - 仅适用于 custom-date-time,请参见下文

JSON 解码器支持多个字段解码器,其中 _default 用于标准表列,而一些解码器用于基于日期和时间的类型。

下表列出了可以用作 type 的 Presto 数据类型以及可以通过 dataFormat 属性指定的匹配字段解码器

Presto 数据类型

允许的 dataFormat

BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
BOOLEAN
VARCHAR
VARCHAR(x)

默认字段解码器(省略 dataFormat 属性)

TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIME
TIME WITH TIME ZONE

custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch

DATE

custom-date-time, iso8601, rfc2822,

默认字段解码器

这是支持所有 Presto 物理数据类型的标准字段解码器。字段值将通过 JSON 转换规则强制转换为布尔值、长整型、双精度浮点型或字符串值。对于非日期/时间类型的列,应使用此解码器。

日期和时间解码器

要将 JSON 对象中的值转换为 Presto DATETIMETIME WITH TIME ZONE`, ``TIMESTAMPTIMESTAMP WITH TIME ZONE 列,必须使用字段定义的 dataFormat 属性选择特殊的解码器。

  • iso8601 - 基于文本,将文本字段解析为 ISO 8601 时间戳。

  • rfc2822 - 基于文本,将文本字段解析为 RFC 2822 时间戳。

  • custom-date-time - 基于文本,根据 Joda 格式模式解析文本字段。

    通过 formatHint 属性指定。格式模式应符合 https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html

  • milliseconds-since-epoch - 基于数字,将文本或数字解释为自纪元以来的毫秒数。

  • seconds-since-epoch - 基于数字,将文本或数字解释为自纪元以来的秒数。

对于 TIMESTAMP WITH TIME ZONETIME WITH TIME ZONE 数据类型,如果解码后的值中存在时区信息,则它将用于 Presto 值。否则,结果时区将设置为 UTC

avro 解码器

Avro 解码器根据模式转换表示 Avro 格式消息或键的字节。消息必须包含嵌入的 Avro 模式。Presto 不支持无模式 Avro 解码。

对于键/消息,使用 avro 解码器,必须定义 dataSchema。这应该指向要解码的消息的有效 Avro 模式文件的路径。此路径可以是远程 Web 服务器(例如:dataSchema: 'http://example.org/schema/avro_data.avsc')或本地文件系统(例如:dataSchema: '/usr/local/schema/avro_data.avsc')。如果 Presto 协调器节点无法访问此位置,则解码器将失败。

对于字段,支持以下属性

  • name - Presto 表中列的名称。

  • type - 列的 Presto 类型。

  • mapping - 从 Avro 模式中选择字段的斜杠分隔的字段名称列表。如果 mapping 中指定的字段不存在于原始 Avro 模式中,则读取操作将返回 NULL。

下表列出了支持的 Presto 类型,这些类型可以在 type 中用于等效的 Avro 字段类型。

Presto 数据类型

允许的 Avro 数据类型

BIGINT

INTLONG

DOUBLE

DOUBLE, FLOAT

BOOLEAN

BOOLEAN

VARCHAR / VARCHAR(x)

STRING

VARBINARY

FIXEDBYTES

ARRAY

ARRAY

MAP

MAP

Avro 模式演变

Avro 解码器支持与向后兼容的模式演变功能。使用向后兼容性,可以使用较新的模式来读取使用较旧模式创建的 Avro 数据。Avro 模式中的任何更改也必须反映在 Presto 的主题定义文件中。新添加/重命名的字段必须在 Avro 模式文件中具有默认值。

模式演变行为如下

  • 在新模式中添加的列:使用旧模式创建的数据在表使用新模式时将生成默认值。

  • 在新模式中删除的列:使用旧模式创建的数据将不再输出已删除列中的数据。

  • 在新模式中重命名的列:这等效于删除列并添加新列,使用旧模式创建的数据在表使用新模式时将生成默认值。

  • 在新模式中更改列的类型:如果 Avro 支持类型强制转换,则会执行转换。对于不兼容的类型,将抛出错误。