Kafka 连接器¶
概述¶
此连接器允许将 Apache Kafka 主题用作 Presto 中的表。每条消息在 Presto 中显示为一行。
主题可以是实时的:行会随着数据的到达而出现,并随着消息的删除而消失。如果在单个查询中多次访问同一个表(例如,执行自连接),这会导致奇怪的行为。
注意
支持 Apache Kafka 2.3.1+。
配置¶
要配置 Kafka 连接器,请创建一个目录属性文件 etc/catalog/kafka.properties
,其中包含以下内容,并根据需要替换属性
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
多个 Kafka 集群¶
您可以根据需要拥有任意数量的目录,因此,如果您有额外的 Kafka 集群,只需将另一个属性文件添加到 etc/catalog
中,并使用不同的名称(确保其以 .properties
结尾)。例如,如果您将属性文件命名为 sales.properties
,Presto 将使用配置的连接器创建一个名为 sales
的目录。
配置属性¶
以下是可用的配置属性
属性名称 |
描述 |
---|---|
|
目录提供的所有表的列表 |
|
表的默认模式名称 |
|
Kafka 集群中节点的列表 |
|
连接到 Kafka 集群的超时时间 |
|
每次轮询的最大记录数 |
|
每次轮询从一个分区获取的最大字节数 |
|
包含主题描述文件的目录 |
|
控制内部列是否作为表模式的一部分 |
kafka.table-names
¶
此目录提供的所有表的逗号分隔列表。表名可以是无限定的(简单名称),并将放入默认模式(见下文),或者限定为模式名(<schema-name>.<table-name>
)。
对于此处定义的每个表,可能存在一个表描述文件(见下文)。如果没有表描述文件,表名将用作 Kafka 上的主题名称,并且没有数据列映射到表中。该表将仍然包含所有内部列(见下文)。
此属性是必需的;没有默认值,并且必须至少定义一个表。
kafka.default-schema
¶
定义包含所有未限定模式名称的表的模式。
此属性是可选的;默认值为 default
。
kafka.nodes
¶
Kafka 数据节点的 hostname:port
对的逗号分隔列表。
此属性是必需的;没有默认值,并且必须至少定义一个节点。
注意
即使仅指定了集群的一部分节点,Presto 也必须仍然能够连接到集群的所有节点,因为消息可能仅位于特定节点上。
kafka.connect-timeout
¶
连接到数据节点的超时时间。繁忙的 Kafka 集群可能需要相当长的时间才能接受连接;如果看到由于超时而导致的查询失败,则增加此值是一个好策略。
此属性是可选的;默认值为 10 秒(10s
)。
kafka.max-poll-records
¶
从 Kafka 轮询() 的最大记录数。
此属性是可选的;默认值为 500
。
kafka.max-partition-fetch-bytes``¶
每次轮询从一个分区获取的最大字节数
此属性是可选的;默认值为 1MB
。
kafka.table-description-dir
¶
引用 Presto 部署中的一个文件夹,其中包含一个或多个 JSON 文件(必须以 .json
结尾),这些文件包含表描述文件。
此属性是可选的;默认值为 etc/kafka
。
kafka.hide-internal-columns
¶
除了在表描述文件中定义的数据列之外,连接器还为每个表维护一些额外的列。如果隐藏了这些列,它们仍然可以在查询中使用,但不会显示在 DESCRIBE <table-name>
或 SELECT *
中。
此属性是可选的;默认值为 true
。
内部列¶
对于每个定义的表,连接器维护以下列
列名 |
类型 |
描述 |
---|---|---|
|
BIGINT |
包含此行的 Kafka 分区的 ID。 |
|
BIGINT |
此行在 Kafka 分区中的偏移量。 |
|
BOOLEAN |
如果解码器无法为此行解码消息,则为 True。如果为 True,则应将从消息映射的数据列视为无效。 |
|
VARCHAR |
消息字节作为 UTF-8 编码的字符串。这仅适用于文本主题。 |
|
BIGINT |
消息中的字节数。 |
|
BOOLEAN |
如果键解码器无法为此行解码键,则为 True。如果为 True,则应将从键映射的数据列视为无效。 |
|
VARCHAR |
键字节作为 UTF-8 编码的字符串。这仅适用于文本键。 |
|
BIGINT |
键中的字节数。 |
对于没有表定义文件的表,_key_corrupt
和 _message_corrupt
列将始终为 false
。
表定义文件¶
Kafka 仅将主题维护为字节消息,并将如何解释消息的定义留给生产者和消费者。对于 Presto,必须将此数据映射到列中,以允许对数据进行查询。
注意
对于包含 JSON 数据的文本主题,完全可以不使用任何表定义文件,而是使用 Presto 的 JSON 函数和运算符 来解析 _message
列,该列包含映射到 UTF-8 字符串的字节。然而,这很麻烦,并且使编写 SQL 查询变得困难。
表定义文件包含表的 JSON 定义。文件名可以任意,但必须以 .json
结尾。
{
"tableName": ...,
"schemaName": ...,
"topicName": ...,
"key": {
"dataFormat": ...,
"fields": [
...
]
},
"message": {
"dataFormat": ...,
"fields": [
...
]
}
}
字段 |
必需 |
类型 |
描述 |
---|---|---|---|
|
必需 |
字符串 |
此文件定义的 Presto 表名。 |
|
可选 |
字符串 |
包含表的模式。如果省略,则使用默认模式名称。 |
|
必需 |
字符串 |
映射的 Kafka 主题。 |
|
可选 |
JSON 对象 |
映射到消息键的数据列的字段定义。 |
|
可选 |
JSON 对象 |
映射到消息本身的数据列的字段定义。 |
Kafka 中的键和消息¶
从 Kafka 0.8 开始,主题中的每条消息都可以有一个可选的键。表定义文件包含键和消息的两个部分,用于将数据映射到表列。
表定义中的每个 key
和 message
字段都是一个 JSON 对象,必须包含两个字段
字段 |
必需 |
类型 |
描述 |
---|---|---|---|
|
必需 |
字符串 |
选择此字段组的解码器。 |
|
必需 |
JSON 数组 |
字段定义的列表。每个字段定义在 Presto 表中创建一个新列。 |
每个字段定义都是一个 JSON 对象
{
"name": ...,
"type": ...,
"dataFormat": ...,
"mapping": ...,
"formatHint": ...,
"hidden": ...,
"comment": ...
}
字段 |
必需 |
类型 |
描述 |
---|---|---|---|
|
必需 |
字符串 |
Presto 表中列的名称。 |
|
必需 |
字符串 |
列的 Presto 类型。 |
|
可选 |
字符串 |
选择此字段的列解码器。默认情况下,使用此行数据格式和列类型的默认解码器。 |
|
可选 |
字符串 |
Avro 模式所在的路径或 URL。仅用于 Avro 解码器。 |
|
可选 |
字符串 |
列的映射信息。这是解码器特定的,请参见下文。 |
|
可选 |
字符串 |
为列解码器设置列特定的格式提示。 |
|
可选 |
布尔值 |
隐藏 |
|
可选 |
字符串 |
添加列注释,该注释将与 |
键或消息的字段描述没有限制。
行解码¶
对于键和消息,解码器用于将消息和键数据映射到表列。
Kafka 连接器包含以下解码器
raw
- 不解释 Kafka 消息,原始消息字节的范围映射到表列csv
- Kafka 消息被解释为逗号分隔的消息,字段被映射到表列json
- Kafka 消息被解析为 JSON,JSON 字段被映射到表列avro
- Kafka 消息根据 Avro 模式解析,Avro 字段被映射到表列
注意
如果表不存在表定义文件,则使用 dummy
解码器,该解码器不公开任何列。
raw
解码器¶
原始解码器支持从 Kafka 消息或键中读取原始(基于字节)值并将其转换为 Presto 列。
对于字段,支持以下属性
dataFormat
- 选择要转换的数据类型的宽度type
- Presto 数据类型(有关支持的数据类型列表,请参见下表)mapping
-<start>[:<end>]
;要转换的字节的开始和结束位置(可选)
dataFormat
属性选择要转换的字节数。如果缺失,则假定为 BYTE
。所有值都有符号。
支持的值为
BYTE
- 一个字节SHORT
- 两个字节(大端)INT
- 四个字节(大端)LONG
- 八个字节(大端)FLOAT
- 四个字节(IEEE 754 格式)DOUBLE
- 八个字节(IEEE 754 格式)
type
属性定义映射值的 Presto 数据类型。
根据分配给列的 Presto 类型,可以使用不同的 dataFormat
值
Presto 数据类型 |
允许的 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mapping
属性指定用于解码的键或消息中字节的范围。它可以是一个或两个由冒号 (<start>[:<end>]
) 分隔的数字。
如果只给出了开始位置
对于固定宽度类型,列将使用为指定
dataFormat
(见上文)指定的适当字节数。当解码
VARCHAR
值时,将使用从开始位置到消息末尾的所有字节。
如果给出了开始位置和结束位置,则
对于固定宽度类型,大小必须等于指定
dataFormat
使用的字节数。对于
VARCHAR
,使用开始(包含)和结束(不包含)之间的所有字节。
如果未指定 mapping
属性,则等效于将开始位置设置为 0 且结束位置未定义。
数值数据类型 (BIGINT
, INTEGER
, SMALLINT
, TINYINT
, DOUBLE
) 的解码方案很简单。从输入消息中读取字节序列并根据以下两种方式之一进行解码
大端编码(对于整数类型)
IEEE 754 格式(对于
DOUBLE
)。
解码字节序列的长度由 dataFormat
隐含。
对于 VARCHAR
数据类型,字节序列根据 UTF-8 编码进行解释。
csv
解码器¶
CSV 解码器使用 UTF-8 编码将表示消息或键的字节转换为字符串,然后将结果解释为 CSV(逗号分隔值)行。
对于字段,必须定义 type
和 mapping
属性
type
- Presto 数据类型(有关支持的数据类型列表,请参见下表)mapping
- CSV 记录中字段的索引
不支持 dataFormat
和 formatHint
,必须省略。
下表列出了支持的 Presto 类型,这些类型可以在 type
中使用,并列出了解码方案
Presto 数据类型 |
解码规则 |
---|---|
BIGINT INTEGER SMALLINT TINYINT |
使用 Java |
|
使用 Java |
|
“true” 字符序列映射到 |
|
按原样使用 |
json
解码器¶
JSON 解码器根据 RFC 4627 将表示消息或键的字节转换为 JSON。请注意,消息或键 *必须* 转换为 JSON 对象,而不是数组或简单类型。
对于字段,支持以下属性
type
- 列的 Presto 类型。dataFormat
- 要用于列的字段解码器。mapping
- 用于从 JSON 对象中选择字段的用斜杠分隔的字段名列表formatHint
- 仅适用于custom-date-time
,请参见下文
JSON 解码器支持多个字段解码器,其中 _default
用于标准表列,而一些解码器用于基于日期和时间的类型。
下表列出了可以用作 type
的 Presto 数据类型以及可以通过 dataFormat
属性指定的匹配字段解码器
Presto 数据类型 |
允许的 |
---|---|
BIGINT INTEGER SMALLINT TINYINT DOUBLE BOOLEAN VARCHAR VARCHAR(x) |
默认字段解码器(省略 |
TIMESTAMP TIMESTAMP WITH TIME ZONE TIME TIME WITH TIME ZONE |
|
|
|
默认字段解码器¶
这是支持所有 Presto 物理数据类型的标准字段解码器。字段值将通过 JSON 转换规则强制转换为布尔值、长整型、双精度浮点型或字符串值。对于非日期/时间类型的列,应使用此解码器。
日期和时间解码器¶
要将 JSON 对象中的值转换为 Presto DATE
、TIME
、TIME WITH TIME ZONE`, ``TIMESTAMP
或 TIMESTAMP WITH TIME ZONE
列,必须使用字段定义的 dataFormat
属性选择特殊的解码器。
iso8601
- 基于文本,将文本字段解析为 ISO 8601 时间戳。rfc2822
- 基于文本,将文本字段解析为 RFC 2822 时间戳。custom-date-time
- 基于文本,根据 Joda 格式模式解析文本字段。通过
formatHint
属性指定。格式模式应符合 https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html。
milliseconds-since-epoch
- 基于数字,将文本或数字解释为自纪元以来的毫秒数。seconds-since-epoch
- 基于数字,将文本或数字解释为自纪元以来的秒数。
对于 TIMESTAMP WITH TIME ZONE
和 TIME WITH TIME ZONE
数据类型,如果解码后的值中存在时区信息,则它将用于 Presto 值。否则,结果时区将设置为 UTC
。
avro
解码器¶
Avro 解码器根据模式转换表示 Avro 格式消息或键的字节。消息必须包含嵌入的 Avro 模式。Presto 不支持无模式 Avro 解码。
对于键/消息,使用 avro
解码器,必须定义 dataSchema
。这应该指向要解码的消息的有效 Avro 模式文件的路径。此路径可以是远程 Web 服务器(例如:dataSchema: 'http://example.org/schema/avro_data.avsc'
)或本地文件系统(例如:dataSchema: '/usr/local/schema/avro_data.avsc'
)。如果 Presto 协调器节点无法访问此位置,则解码器将失败。
对于字段,支持以下属性
name
- Presto 表中列的名称。type
- 列的 Presto 类型。mapping
- 从 Avro 模式中选择字段的斜杠分隔的字段名称列表。如果mapping
中指定的字段不存在于原始 Avro 模式中,则读取操作将返回 NULL。
下表列出了支持的 Presto 类型,这些类型可以在 type
中用于等效的 Avro 字段类型。
Presto 数据类型 |
允许的 Avro 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Avro 模式演变¶
Avro 解码器支持与向后兼容的模式演变功能。使用向后兼容性,可以使用较新的模式来读取使用较旧模式创建的 Avro 数据。Avro 模式中的任何更改也必须反映在 Presto 的主题定义文件中。新添加/重命名的字段必须在 Avro 模式文件中具有默认值。
模式演变行为如下
在新模式中添加的列:使用旧模式创建的数据在表使用新模式时将生成默认值。
在新模式中删除的列:使用旧模式创建的数据将不再输出已删除列中的数据。
在新模式中重命名的列:这等效于删除列并添加新列,使用旧模式创建的数据在表使用新模式时将生成默认值。
在新模式中更改列的类型:如果 Avro 支持类型强制转换,则会执行转换。对于不兼容的类型,将抛出错误。