Iceberg 连接器¶
概述¶
Iceberg 连接器允许查询存储在 Iceberg 表中的数据。
元数据存储¶
Iceberg 表将大多数元数据存储在元数据文件中,以及文件系统上的数据,但它仍然需要一个中央位置来查找表的当前元数据指针的当前位置。这个中央位置称为 Iceberg Catalog
。Presto Iceberg 连接器支持不同类型的 Iceberg 目录:HIVE
、NESSIE
、REST
和 HADOOP
。
要配置 Iceberg 连接器,请创建一个目录属性文件 etc/catalog/iceberg.properties
。要定义目录类型,iceberg.catalog.type
属性是必需的,以及以下内容,用以下属性值替换:
Hive 元数据存储目录¶
Iceberg 连接器支持与 Hive 连接器相同的配置 HMS。
connector.name=iceberg
hive.metastore.uri=hostname:port
iceberg.catalog.type=hive
Glue 目录¶
Iceberg 连接器支持与 Hive 连接器相同的配置 Glue。
connector.name=iceberg
hive.metastore=glue
iceberg.catalog.type=hive
使用 Hive 或 Glue 目录配置的 Iceberg 连接器时,可以使用其他配置。
属性名称 |
描述 |
默认 |
---|---|---|
|
要使用 Thrift 协议连接到的 Hive 元数据存储的 URI。如果提供了多个 URI,则默认使用第一个 URI,其余 URI 为备用元数据存储。 示例: 如果 |
|
|
从 Hive 元数据存储中使用的统计信息(以逗号分隔)的列表,用于覆盖 Iceberg 表统计信息。可用值为 注意:仅当 Iceberg 连接器使用 Hive 配置时才有效。 |
|
|
刷新表元数据时重试之间休眠的最小时间。 |
100ms |
|
刷新表元数据时重试之间休眠的最大时间。 |
5s |
|
在失败的表元数据刷新操作之前,所有重试所需的最大时间。 |
1min |
|
使用 Hive 元数据存储刷新表元数据时,遇到错误后重试的次数。 |
20 |
|
用于缩放后续重试之间等待时间的倍数。 |
4.0 |
Nessie 目录¶
要使用 Nessie 目录,请将目录类型配置为 iceberg.catalog.type=nessie
。
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.catalog.warehouse=/tmp
iceberg.nessie.uri=https://127.0.0.1:19120/api/v1
Nessie 目录支持的其他属性
属性名称 |
描述 |
---|---|
|
用于 Nessie 的分支/标签,默认为 |
|
Nessie API 终结点 URI(必需)。示例: |
|
要使用的身份验证类型。可用值为 注意:Nessie BASIC 身份验证类型已弃用,将在即将发布的版本中删除 |
|
用于 |
|
用于 |
|
用于 |
|
对 Nessie 服务器的请求的读取超时时间(以毫秒为单位)。示例: |
|
对 Nessie 服务器的连接请求的连接超时时间(以毫秒为单位)。示例: |
|
对 Nessie 服务器的请求是否启用压缩的配置,默认为 |
|
要使用的自定义 ClientBuilder 实现类的配置。 |
使用 Docker 设置 Nessie¶
要使用 Docker 映像在本地设置 Nessie 实例,请参阅 设置 Nessie。Docker 实例启动并运行后,您应该看到类似以下示例的日志
2023-09-05 13:11:37,905 INFO [io.quarkus] (main) nessie-quarkus 0.69.0 on JVM (powered by Quarkus 3.2.4.Final) started in 1.921s. Listening on: http://0.0.0.0:19120
2023-09-05 13:11:37,906 INFO [io.quarkus] (main) Profile prod activated.
2023-09-05 13:11:37,906 INFO [io.quarkus] (main) Installed features: [agroal, amazon-dynamodb, cassandra-client, cdi, google-cloud-bigtable, hibernate-validator, jdbc-postgresql, logging-sentry, micrometer, mongodb-client, narayana-jta, oidc, opentelemetry, reactive-routes, resteasy, resteasy-jackson, security, security-properties-file, smallrye-context-propagation, smallrye-health, smallrye-openapi, swagger-ui, vertx]
如果与 Nessie 的 OpenTelemetry 收集器相关的日志消息类似于以下示例,则可以使用配置选项 quarkus.otel.sdk.disabled=true
禁用 OpenTelemetry。
2023-08-27 11:10:02,492 INFO [io.qua.htt.access-log] (executor-thread-1) 172.17.0.1 - - [27/Aug/2023:11:10:02 +0000] "GET /api/v1/config HTTP/1.1" 200 62
2023-08-27 11:10:05,007 SEVERE [io.ope.exp.int.grp.OkHttpGrpcExporter] (OkHttp https://127.0.0.1:4317/...) Failed to export spans. The request could not be executed. Full error message: Failed to connect to localhost/127.0.0.1:4317
例如,使用以下命令启动 Docker 映像:docker run -p 19120:19120 -e QUARKUS_OTEL_SDK_DISABLED=true ghcr.io/projectnessie/nessie
有关此配置选项和其他相关选项的更多信息,请参阅 OpenTelemetry 配置参考。
有关排查 OpenTelemetry 跟踪问题的更多信息,请参阅 排查跟踪问题。
如果显示类似以下示例的错误,这可能是因为您正在与 http 服务器交互,而不是 https 服务器。您需要将 iceberg.nessie.uri
设置为 https://127.0.0.1:19120/api/v1
。
Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message
at sun.security.ssl.SSLSocketInputRecord.handleUnknownRecord(SSLSocketInputRecord.java:448)
at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:174)
at sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1320)
at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1233)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:417)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:389)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:558)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:201)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.executeRequest(UrlConnectionRequest.java:71)
... 42 more
REST 目录¶
要使用 REST 目录,请将目录类型配置为 iceberg.catalog.type=rest
。最小配置包括
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest.uri=https://127.0.0.1:8181
REST 目录支持的额外属性
属性名称 |
描述 |
---|---|
|
REST API 端点 URI(必需)。例如: |
|
要使用的身份验证类型。可用值是 |
|
用于 OAUTH2 身份验证的凭据。例如: |
|
用于 OAUTH2 身份验证的 Bearer 令牌。例如: |
|
与 REST 目录通信时要使用的会话类型。可用值是 |
|
Iceberg 表的目录仓库根路径(可选)。例如: |
Hadoop 目录¶
要使用 Hadoop 目录,请将目录类型配置为 iceberg.catalog.type=hadoop
。最小配置包括
connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=hdfs://hostname:port
Hadoop 目录配置属性
属性名称 |
描述 |
默认 |
---|---|---|
|
Iceberg 表的目录仓库根路径。 例如: |
|
|
要缓存的 Iceberg 目录数量。如果 |
|
配置属性¶
注意
Iceberg 连接器支持 Amazon S3 作为 Hive 连接器的配置选项。
以下配置属性可用于所有目录类型
属性名称 |
描述 |
默认 |
---|---|---|
|
Iceberg 表的目录类型。可用值为 |
|
|
Hadoop 配置资源的路径。 例如: |
|
|
Iceberg 表的存储文件格式。可用值为 |
|
|
写入文件时要使用的压缩编解码器。可用值为 |
|
|
每个写入器处理的最大分区数。 |
|
|
范围为 (0, 1] 的小数,用作分配给每个拆分的权重的最小值。较低的值可能会提高对具有小文件的表的性能。较高的值可能会提高高度倾斜聚合或联接的查询的性能。 |
|
|
启用读取使用合并读取进行更新的基本表。 |
|
|
启用时,将平等删除行过滤应用为与平等删除文件的数据的联接。 |
|
|
启用 parquet 解引用下推。 |
|
|
在选择统计信息时计算最接近快照时,总记录数差异的重要性。值为 1 表示单个记录等效于 1 毫秒的时间差异。 |
|
|
实验性:启用 Iceberg 的过滤器下推。这仅在使用 Native Worker 时受支持。 |
|
|
Iceberg 表中允许使用元数据优化查询的最大分区数。如果 Iceberg 表的分区数超过此阈值,则跳过元数据优化。 设置为 |
|
|
用于生成 Iceberg 拆分的线程数。 |
|
|
在当前元数据日志中保留的旧元数据文件最大数量。 |
|
|
设置为 |
|
表属性¶
表属性为基础表设置元数据。这对于 CREATE TABLE/CREATE TABLE AS 语句至关重要。表属性使用 WITH 子句传递给连接器
CREATE TABLE tablename
WITH (
property_name = property_value,
...
)
以下表属性可用,它们是 Presto Iceberg 连接器特有的
属性名称 |
描述 |
默认 |
---|---|---|
|
可选地指定表数据文件的格式,可以是 |
|
|
可选地指定表分区。如果表按列 |
|
|
可选地指定表的 FileSystem 位置 URI。 |
|
|
可选地指定要用于新表的 Iceberg 规范的格式版本,可以是 |
|
|
确定在出现并发 upsert 请求的情况下提交元数据的尝试次数,在失败之前。 |
|
|
可选地指定要用于新表的 Iceberg 规范的写删除模式,可以是 |
|
|
可选地指定在当前元数据日志中保留的旧元数据文件最大数量。 |
|
|
设置为 |
|
以下表定义指定格式 ORC
,按列 c1
和 c2
分区,以及 FileSystem 位置 s3://test_bucket/test_schema/test_table
CREATE TABLE test_table (
c1 bigint,
c2 varchar,
c3 double
)
WITH (
format = 'ORC',
partitioning = ARRAY['c1', 'c2'],
location = 's3://test_bucket/test_schema/test_table')
)
会话属性¶
会话属性为在给定会话中执行的查询设置行为更改。
属性名称 |
描述 |
---|---|
|
在当前会话中覆盖连接器属性 |
|
在当前会话中覆盖连接器属性 |
|
在当前会话中覆盖连接器属性 |
缓存支持¶
清单文件缓存¶
从 Iceberg 1.1.0 版本开始,Apache Iceberg 提供了一种机制,可以在内存中缓存 Iceberg 清单文件的内容。此功能有助于减少对来自远程存储的较小的 Iceberg 清单文件的重复读取。
注意
目前,清单文件缓存支持 Presto Iceberg 连接器中的 Hadoop 和 Nessie 目录。
以下配置属性可用
属性名称 |
描述 |
默认 |
---|---|---|
|
启用或禁用清单缓存功能。此功能仅在 |
|
|
要在目录中使用的自定义 FileIO 实现。必须设置它才能启用清单缓存。 |
|
|
缓存大小的最大值(以字节为单位)。 |
|
|
条目保留在清单缓存中的最大时间长度(以毫秒为单位)。 |
|
|
要考虑缓存的清单文件最大长度(以字节为单位)。长度超过此大小的清单文件不会被缓存。 |
|
Alluxio 数据缓存¶
Presto 工作器将其原始形式(压缩的,可能已加密)的远程存储数据缓存到本地 SSD 上,以供读取。
以下配置属性需要在 Iceberg 目录文件(catalog/iceberg.properties)中设置
cache.enabled=true
cache.base-directory=file:///mnt/flash/data
cache.type=ALLUXIO
cache.alluxio.max-cache-size=1600GB
hive.node-selection-strategy=SOFT_AFFINITY
用于获取指标并验证缓存使用的 JMX 查询
SELECT * FROM jmx.current."com.facebook.alluxio:name=client.cachehitrate,type=gauges";
SELECT * FROM jmx.current."com.facebook.alluxio:name=client.cachebytesreadcache,type=meters";
SHOW TABLES FROM jmx.current like '%alluxio%';
文件和条带页脚缓存¶
在叶子工作器内存中缓存打开的文件描述符和条带或文件页脚信息。这些数据在读取文件时最常被访问。
以下配置属性需要在 Iceberg 目录文件(catalog/iceberg.properties)中设置
# scheduling
hive.node-selection-strategy=SOFT_AFFINITY
# orc
iceberg.orc.file-tail-cache-enabled=true
iceberg.orc.file-tail-cache-size=100MB
iceberg.orc.file-tail-cache-ttl-since-last-access=6h
iceberg.orc.stripe-metadata-cache-enabled=true
iceberg.orc.stripe-footer-cache-size=100MB
iceberg.orc.stripe-footer-cache-ttl-since-last-access=6h
iceberg.orc.stripe-stream-cache-size=300MB
iceberg.orc.stripe-stream-cache-ttl-since-last-access=6h
# parquet
iceberg.parquet.metadata-cache-enabled=true
iceberg.parquet.metadata-cache-size=100MB
iceberg.parquet.metadata-cache-ttl-since-last-access=6h
用于获取指标并验证缓存使用的 JMX 查询
SELECT * FROM jmx.current."com.facebook.presto.hive:name=iceberg_parquetmetadata,type=cachestatsmbean";
元数据存储缓存¶
元数据存储缓存仅缓存模式、表和表统计信息。缓存在 tableCache 中的表对象仅用于读取表元数据位置和表属性,其余表元数据从文件系统/对象存储元数据位置获取。
注意
仅当 Presto Iceberg 连接器使用 Hive Catalog 时,元数据存储缓存才适用。
hive.metastore-cache-ttl=2d
hive.metastore-refresh-interval=3d
hive.metastore-cache-maximum-size=10000000
额外的隐藏元数据列¶
Iceberg 连接器公开额外的隐藏元数据列。您可以在 SQL 查询中将它们包含在 SELECT 语句中,作为查询的一部分。
$path
列¶
$path
: 此行的文件完整文件系统路径名称
SELECT "$path", regionkey FROM "ctas_nation";
$path | regionkey
---------------------------------+-----------
/full/path/to/file/file.parquet | 2
$data_sequence_number
列¶
$data_sequence_number
: 添加此行的 Iceberg 数据序列号
SELECT "$data_sequence_number", regionkey FROM "ctas_nation";
$data_sequence_number | regionkey
----------------------------------+------------
2 | 3
额外的隐藏元数据表¶
Iceberg 连接器公开额外的隐藏元数据表。您可以在 SQL 查询中将它们包含在 SELECT 语句中,作为查询的一部分。
$properties
表¶
$properties
: 给定表的通用属性
SELECT * FROM "ctas_nation$properties";
key | value
----------------------+---------
write.format.default | PARQUET
$history
表¶
$history
: 表状态更改的历史记录
SELECT * FROM "ctas_nation$history";
made_current_at | snapshot_id | parent_id | is_current_ancestor
--------------------------------------+---------------------+-----------+---------------------
2022-11-25 20:56:31.784 Asia/Kolkata | 7606232158543069775 | NULL | true
$snapshots
表¶
$snapshots
: 有关表快照的详细信息。有关更多信息,请参阅 Iceberg 表规范中的 快照。
SELECT * FROM "ctas_nation$snapshots";
committed_at | snapshot_id | parent_id | operation | manifest_list | summary
--------------------------------------+---------------------+-----------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-11-25 20:56:31.784 Asia/Kolkata | 7606232158543069775 | NULL | append | s3://my-bucket/ctas_nation/metadata/snap-7606232158543069775-1-395a2cad-b244-409b-b030-cc44949e5a4e.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=25, total-position-deletes=0, added-files-size=1648, total-delete-files=0, total-files-size=1648, total-records=25, total-data-files=1}
$manifests
表¶
$manifests
: 有关不同表快照清单的详细信息。有关更多信息,请参阅 Iceberg 表规范中的 清单。
SELECT * FROM "ctas_nation$manifests";
path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partitions
---------------------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+-----------
s3://my-bucket/ctas_nation/metadata/395a2cad-b244-409b-b030-cc44949e5a4e-m0.avro | 5957 | 0 | 7606232158543069775 | 1 | 0 | 0 | []
$partitions
表¶
$partitions
: 表的详细分区信息
SELECT * FROM "ctas_nation$partitions";
row_count | file_count | total_size | nationkey | name | regionkey | comment
-----------+------------+------------+-------------------------------+------------------------------------------+------------------------------+------------------------------------------------------------
25 | 1 | 1648 | {min=0, max=24, null_count=0} | {min=ALGERIA, max=VIETNAM, null_count=0} | {min=0, max=4, null_count=0} | {min= haggle. careful, max=y final packaget, null_count=0}
$files
表¶
$files
: 表当前快照中数据文件的概述
SELECT * FROM "ctas_nation$files";
content | file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids
---------+------------------------------------------------------------------------------+-------------+--------------+--------------------+-----------------------------+--------------------------+----------------------+------------------+-------------------------------------------+--------------------------------------------+--------------+---------------+-------------
0 | s3://my-bucket/ctas_nation/data/9f889274-6f74-4d28-8164-275eef99f660.parquet | PARQUET | 25 | 1648 | {1=52, 2=222, 3=105, 4=757} | {1=25, 2=25, 3=25, 4=25} | {1=0, 2=0, 3=0, 4=0} | NULL | {1=0, 2=ALGERIA, 3=0, 4= haggle. careful} | {1=24, 2=VIETNAM, 3=4, 4=y final packaget} | NULL | NULL | NULL
$changelog
表¶
此表使您可以查看随着时间的推移,以特定顺序对表进行了哪些行级更改。 $changelog
表表示对表的更改历史记录,同时还使数据可供通过查询进行处理。
更改日志查询的结果始终返回具有四个列的静态模式
operation
: (VARCHAR
) 指示行是插入、更新还是删除。ordinal
: (int
) 一个数字,表示相对于所有其他更改,需要将特定更改应用于表的相对顺序。snapshotid
: (bigint
) 表示进行行级更改的快照。rowdata
: (row(T)
) 包括特定行的數據。此类型的内部值与父表的模式匹配。
可以使用以下名称格式查询更改日志表
... FROM "<table>[@<begin snapshot ID>]$changelog[@<end snapshot ID>]"
<table>
是表的名称。<begin snapshot ID>
是您要开始查看更改的表的快照。此参数是可选的。如果不存在,则使用最旧的可用快照。<end snapshot ID>
是您要查看更改的最后一个快照。此参数是可选的。如果不存在,则使用表的最新快照。
$changelog
表的一个用途是查找何时将记录插入或从表中删除。为此,可以将 $changelog
表与 $snapshots
表结合使用。首先,从 $snapshots
表中选择快照 ID 以选择起点。
SELECT * FROM "orders$snapshots";
committed_at | snapshot_id | parent_id | operation | manifest_list | summary
---------------------------------------------+---------------------+---------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2023-09-26 08:45:20.930 America/Los_Angeles | 2423571386296047175 | NULL | append | file:/var/folders/g_/6_hxl7r16qdddw7956j_r88h0000gn/T/PrestoTest8140889264166671718/catalog/tpch/ctas_orders/metadata/snap-2423571386296047175-1-3f288b1c-95a9-406b-9e17-9cfe31a11b48.avro | {changed-partition-count=1, added-data-files=4, total-equality-deletes=0, added-records=100, total-position-deletes=0, added-files-size=9580, total-delete-files=0, total-files-size=9580, total-records=100, total-data-files=4}
2023-09-26 08:45:36.942 America/Los_Angeles | 8702997868627997320 | 2423571386296047175 | append | file:/var/folders/g_/6_hxl7r16qdddw7956j_r88h0000gn/T/PrestoTest8140889264166671718/catalog/tpch/ctas_orders/metadata/snap-8702997868627997320-1-a2e1c714-7eed-4e2c-b144-dae4147ebaa4.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=1, total-position-deletes=0, added-files-size=1687, total-delete-files=0, total-files-size=11267, total-records=101, total-data-files=5}
2023-09-26 08:45:39.866 America/Los_Angeles | 7615903782581283889 | 8702997868627997320 | append | file:/var/folders/g_/6_hxl7r16qdddw7956j_r88h0000gn/T/PrestoTest8140889264166671718/catalog/tpch/ctas_orders/metadata/snap-7615903782581283889-1-d94c2114-fd22-4de2-9ab5-c0b5bf67282f.avro | {changed-partition-count=1, added-data-files=3, total-equality-deletes=0, added-records=3, total-position-deletes=0, added-files-size=4845, total-delete-files=0, total-files-size=16112, total-records=104, total-data-files=8}
2023-09-26 08:45:48.404 America/Los_Angeles | 677209275408372885 | 7615903782581283889 | append | file:/var/folders/g_/6_hxl7r16qdddw7956j_r88h0000gn/T/PrestoTest8140889264166671718/catalog/tpch/ctas_orders/metadata/snap-677209275408372885-1-ad69e208-1440-459b-93e8-48e61f961758.avro | {changed-partition-count=1, added-data-files=3, total-equality-deletes=0, added-records=5, total-position-deletes=0, added-files-size=4669, total-delete-files=0, total-files-size=20781, total-records=109, total-data-files=11}
现在我们知道了可以在更改日志中查询的快照,我们可以看到自表创建以来对表进行了哪些更改。具体来说,此示例使用最早的快照 ID: 2423571386296047175
SELECT * FROM "ctas_orders@2423571386296047175$changelog" ORDER BY ordinal;
operation | ordinal | snapshotid | rowdata
-----------+---------+---------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INSERT | 0 | 8702997868627997320 | {orderkey=37504, custkey=1291, orderstatus=O, totalprice=165509.83, orderdate=1996-03-04, orderpriority=5-LOW, clerk=Clerk#000000871, shippriority=0, comment=c theodolites alongside of the fluffily bold requests haggle quickly against }
INSERT | 1 | 7615903782581283889 | {orderkey=12001, custkey=739, orderstatus=F, totalprice=138635.75, orderdate=1994-07-07, orderpriority=2-HIGH, clerk=Clerk#000000863, shippriority=0, comment=old, even theodolites. regular, special theodolites use furio}
INSERT | 1 | 7615903782581283889 | {orderkey=17989, custkey=364, orderstatus=F, totalprice=133669.05, orderdate=1994-01-17, orderpriority=4-NOT SPECIFIED, clerk=Clerk#000000547, shippriority=0, comment=ously express excuses. even theodolit}
INSERT | 1 | 7615903782581283889 | {orderkey=37504, custkey=1291, orderstatus=O, totalprice=165509.83, orderdate=1996-03-04, orderpriority=5-LOW, clerk=Clerk#000000871, shippriority=0, comment=c theodolites alongside of the fluffily bold requests haggle quickly against }
INSERT | 2 | 677209275408372885 | {orderkey=17991, custkey=92, orderstatus=O, totalprice=20732.51, orderdate=1998-07-09, orderpriority=4-NOT SPECIFIED, clerk=Clerk#000000636, shippriority=0, comment= the quickly express accounts. iron}
INSERT | 2 | 677209275408372885 | {orderkey=17989, custkey=364, orderstatus=F, totalprice=133669.05, orderdate=1994-01-17, orderpriority=4-NOT SPECIFIED, clerk=Clerk#000000547, shippriority=0, comment=ously express excuses. even theodolit}
INSERT | 2 | 677209275408372885 | {orderkey=17990, custkey=458, orderstatus=O, totalprice=218031.58, orderdate=1998-03-18, orderpriority=3-MEDIUM, clerk=Clerk#000000340, shippriority=0, comment=ounts wake final foxe}
INSERT | 2 | 677209275408372885 | {orderkey=18016, custkey=403, orderstatus=O, totalprice=174070.99, orderdate=1996-03-19, orderpriority=1-URGENT, clerk=Clerk#000000629, shippriority=0, comment=ly. quickly ironic excuses are furiously. carefully ironic pack}
INSERT | 2 | 677209275408372885 | {orderkey=18017, custkey=958, orderstatus=F, totalprice=203091.02, orderdate=1993-03-26, orderpriority=1-URGENT, clerk=Clerk#000000830, shippriority=0, comment=sleep quickly bold requests. slyly pending pinto beans haggle in pla}
过程¶
使用 CALL 语句执行数据操作或管理任务。过程在目录的 system
模式中可用。
注册表¶
可以在目录中注册表数据和元数据已存在于文件系统中的 Iceberg 表。在目录的 system
模式上使用 register_table
过程,并提供目标模式、所需的表名以及表元数据的存储位置
CALL iceberg.system.register_table('schema_name', 'table_name', 'hdfs://127.0.0.1:9000/path/to/iceberg/table/metadata/dir')
注意
如果在指定位置存在多个同一版本的元数据文件,则使用修改时间最新的那个。
可以选择将元数据文件作为参数包含在 register_table
中,其中特定元数据文件包含目标表状态
CALL iceberg.system.register_table('schema_name', 'table_name', 'hdfs://127.0.0.1:9000/path/to/iceberg/table/metadata/dir', '00000-35a08aed-f4b0-4010-95d2-9d73ef4be01c.metadata.json')
注意
Iceberg REST 目录可能不支持表注册,具体取决于后备目录的类型。
注意
在使用 Hive 元数据存储注册表时,调用该过程的用户将被设置为表的拥有者,并对该表具有 SELECT
、INSERT
、UPDATE
和 DELETE
权限。可以使用 GRANT
和 REVOKE
命令更改这些权限。
注意
在使用 Hive 目录时,尝试使用 Hive 连接器读取已注册的 Iceberg 表将失败。
注销表¶
可以使用目录的 system
模式上的 unregister_table
过程从目录注销 Iceberg 表
CALL iceberg.system.unregister_table('schema_name', 'table_name')
注意
仅当使用 Hive 目录时,在调用 unregister_table
后,表数据和元数据将保留在文件系统中。这类似于为 DROP TABLE 命令列出的行为。
回滚到快照¶
将表回滚到特定快照 ID。Iceberg 可以通过在 Iceberg 的 system
模式上使用 rollback_to_snapshot
过程来回滚到特定快照 ID
CALL iceberg.system.rollback_to_snapshot('table_name', 'snapshot_id');
以下参数可用
参数名称 |
必需 |
类型 |
描述 |
---|---|---|---|
|
✔️ |
string |
要更新的表的名称 |
|
✔️ |
long |
要回滚到的快照 ID |
到期快照¶
Iceberg 中的每个 DML(数据操作语言)操作都会生成一个新的快照,同时保留旧数据和元数据以进行快照隔离和时间旅行。使用 expire_snapshots 删除旧的快照及其文件。
此过程删除旧快照及其相应的文件,并且永远不会删除非过期快照所需的任何文件。
以下参数可用
参数名称 |
必需 |
类型 |
描述 |
---|---|---|---|
|
✔️ |
string |
要更新的表的模式 |
|
✔️ |
string |
要更新的表的名称 |
|
timestamp |
在此时间戳之前将删除快照(默认值:5 天前) |
|
|
int |
无论 older_than 如何,都要保留的祖先快照数量(默认为 1) |
|
|
array of long |
要过期的快照 ID 数组 |
示例
删除早于特定日期和时间的快照,但保留最近 10 个快照
CALL iceberg.system.expire_snapshots('schema_name', 'table_name', TIMESTAMP '2023-08-31 00:00:00.000', 10);
删除快照 ID 为 10001 和 10002 的快照(请注意,这些快照 ID 不应是当前快照)
CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);
删除孤立文件¶
用于删除任何 Iceberg 表的元数据文件中未引用的文件。
以下参数可用
参数名称 |
必需 |
类型 |
描述 |
---|---|---|---|
|
✔️ |
string |
要清理的表的模式 |
|
✔️ |
string |
要清理的表的名称 |
|
timestamp |
删除在此时间戳之前创建的孤立文件(默认值:3 天前) |
示例
删除任何对表 db.sample 不知晓且早于指定时间戳的文件
CALL iceberg.system.remove_orphan_files('db', 'sample', TIMESTAMP '2023-08-31 00:00:00.000');
删除任何对表 db.sample 不知晓且在 3 天前(默认情况下)创建的文件
CALL iceberg.system.remove_orphan_files(schema => 'db', table_name => 'sample');
SQL 支持¶
Iceberg 连接器支持查询和操作 Iceberg 表和模式(数据库)。以下是 Presto 支持的 SQL 操作的一些示例
创建模式¶
创建一个名为 web
的新 Iceberg 模式,该模式将表存储在名为 my-bucket
的 S3 存储桶中
CREATE SCHEMA iceberg.web
WITH (location = 's3://my-bucket/')
创建表¶
在 web
模式中创建一个名为 page_views
的新 Iceberg 表,该表使用 ORC 文件格式存储,并按 ds
和 country
进行分区
CREATE TABLE iceberg.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
ds date,
country varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['ds', 'country']
)
创建一个使用 Iceberg 格式版本 2 的 Iceberg 表
CREATE TABLE iceberg.web.page_views_v2 (
view_time timestamp,
user_id bigint,
page_url varchar,
ds date,
country varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['ds', 'country'],
format_version = '2'
)
分区列转换¶
除了选择某些特定列进行分区外,您还可以使用 transform
函数并按列的转换值对表进行分区。
Presto Iceberg 连接器中的可用转换包括
Bucket
(使用哈希函数将数据分区到指定数量的桶中)Truncate
(根据字段的截断值对表进行分区,并且可以指定截断值的宽度)Identity
(使用未修改的源值对数据进行分区)Year
(使用整数值对数据进行分区,方法是提取日期或时间戳年份,作为 1970 年以来的年份)Month
(使用整数进行数据分区,通过提取日期或时间戳的月份,以 1970-01-01 后的月份数表示)Day
(使用整数进行数据分区,通过提取日期或时间戳的日期,以 1970-01-01 后的天数表示)Hour
(使用整数进行数据分区,通过提取时间戳的小时,以 1970-01-01 00:00:00 后的小时数表示)
创建一个 Iceberg 表,将其分区为 8 个大小相等的桶
CREATE TABLE players (
id int,
name varchar,
team varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['bucket(team, 8)']
);
创建一个 Iceberg 表,根据 team
字段的首字母进行分区
CREATE TABLE players (
id int,
name varchar,
team varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['truncate(team, 1)']
);
创建一个 Iceberg 表,根据 ds
进行分区
CREATE TABLE players (
id int,
name varchar,
team varchar,
ds date
)
WITH (
format = 'ORC',
partitioning = ARRAY['year(ds)']
);
创建一个 Iceberg 表,根据 ts
进行分区
CREATE TABLE players (
id int,
name varchar,
team varchar,
ts timestamp
)
WITH (
format = 'ORC',
partitioning = ARRAY['hour(ts)']
);
CREATE VIEW¶
Iceberg 连接器支持在 Hive 和 Glue 元数据存储中创建视图。要为在 CREATE TABLE 示例中创建的 iceberg.web.page_views
表创建名为 view_page_views
的视图
CREATE VIEW iceberg.web.view_page_views AS SELECT user_id, country FROM iceberg.web.page_views;
INSERT INTO¶
将数据插入 page_views
表
INSERT INTO iceberg.web.page_views VALUES(TIMESTAMP '2023-08-12 03:04:05.321', 1, 'https://example.com', current_date, 'country');
CREATE TABLE AS SELECT¶
从现有表 page_views
创建一个新的表 page_views_new
CREATE TABLE iceberg.web.page_views_new AS SELECT * FROM iceberg.web.page_views
SELECT¶
连接器支持对 Iceberg 格式版本 1 和版本 2 进行 SELECT 表操作
SELECT * FROM iceberg.web.page_views;
SELECT * FROM iceberg.web.page_views_v2;
包含删除文件的表¶
Iceberg V2 表支持行级删除。有关更多信息,请参阅 Iceberg 表规范中的 行级删除。Presto 支持读取删除文件,包括位置删除文件和相等性删除文件。在读取时,Presto 会合并这些删除文件以读取最新的结果。
ALTER TABLE¶
Iceberg 连接器支持 ALTER TABLE 操作
ALTER TABLE iceberg.web.page_views ADD COLUMN zipcode VARCHAR;
ALTER TABLE iceberg.web.page_views RENAME COLUMN zipcode TO location;
ALTER TABLE iceberg.web.page_views DROP COLUMN location;
要将新列添加为分区列,请标识该列的转换函数。表将按该列的转换值进行分区
ALTER TABLE iceberg.web.page_views ADD COLUMN zipcode VARCHAR WITH (partitioning = 'identity');
ALTER TABLE iceberg.web.page_views ADD COLUMN location VARCHAR WITH (partitioning = 'truncate(2)');
ALTER TABLE iceberg.web.page_views ADD COLUMN location VARCHAR WITH (partitioning = 'bucket(8)');
ALTER TABLE iceberg.web.page_views ADD COLUMN dt date WITH (partitioning = 'year');
ALTER TABLE iceberg.web.page_views ADD COLUMN ts timestamp WITH (partitioning = 'month');
ALTER TABLE iceberg.web.page_views ADD COLUMN dt date WITH (partitioning = 'day');
ALTER TABLE iceberg.web.page_views ADD COLUMN ts timestamp WITH (partitioning = 'hour');
TRUNCATE¶
Iceberg 连接器可以使用 TRUNCATE TABLE
从表中删除所有数据,而不会从元数据目录中删除表。
TRUNCATE TABLE nation;
TRUNCATE TABLE;
SELECT * FROM nation;
nationkey | name | regionkey | comment
-----------+------+-----------+---------
(0 rows)
DELETE¶
Iceberg 连接器可以使用 DELETE FROM
从表中删除数据。例如,要从表 lineitem
中删除数据
DELETE FROM lineitem;
DELETE FROM lineitem WHERE linenumber = 1;
DELETE FROM lineitem WHERE linenumber not in (1, 3, 5, 7) and linestatus in ('O', 'F');
注意
过滤后的列仅支持比较运算符,例如 EQUALS、LESS THAN 或 LESS THAN EQUALS。
删除必须仅发生在最新的快照中。
对于 V1 表,Iceberg 连接器只能删除一个或多个完整分区中的数据。过滤器中的列必须全部是目标表的身份转换分区列。
DROP TABLE¶
删除表 page_views
DROP TABLE iceberg.web.page_views
使用 Hive Metastore 和 Glue 目录删除 Iceberg 表只会从元数据存储中删除元数据。
使用 Hadoop 和 Nessie 目录删除 Iceberg 表会删除表中的所有数据和元数据。
DROP VIEW¶
删除视图 view_page_views
DROP VIEW iceberg.web.view_page_views;
DROP SCHEMA¶
删除架构 iceberg.web
DROP SCHEMA iceberg.web
SHOW CREATE TABLE¶
使用 SHOW CREATE TABLE
显示创建指定 Iceberg 表的 SQL 语句。
例如,从分区 Iceberg 表 customer
中 SHOW CREATE TABLE
SHOW CREATE TABLE customer;
CREATE TABLE iceberg.tpch_iceberg.customer (
"custkey" bigint,
"name" varchar,
"address" varchar,
"nationkey" bigint,
"phone" varchar,
"acctbal" double,
"mktsegment" varchar,
"comment" varchar
)
WITH (
delete_mode = 'copy-on-write',
format = 'PARQUET',
format_version = '2',
location = 's3a://tpch-iceberg/customer',
partitioning = ARRAY['mktsegment']
)
(1 row)
SHOW CREATE TABLE
从非分区 Iceberg 表 region
SHOW CREATE TABLE region;
CREATE TABLE iceberg.tpch_iceberg.region (
"regionkey" bigint,
"name" varchar,
"comment" varchar
)
WITH (
delete_mode = 'copy-on-write',
format = 'PARQUET',
format_version = '2',
location = 's3a://tpch-iceberg/region'
)
(1 row)
SHOW COLUMNS¶
使用 SHOW COLUMNS
列出表中的列及其数据类型和其他属性。
例如,从分区 Iceberg 表 customer
中 SHOW COLUMNS
SHOW COLUMNS FROM customer;
Column | Type | Extra | Comment
------------+---------+---------------+---------
custkey | bigint | |
name | varchar | |
address | varchar | |
nationkey | bigint | |
phone | varchar | |
acctbal | double | |
mktsegment | varchar | partition key |
comment | varchar | |
(8 rows)
SHOW COLUMNS
从非分区 Iceberg 表 region
SHOW COLUMNS FROM region;
Column | Type | Extra | Comment
-----------+---------+-------+---------
regionkey | bigint | |
name | varchar | |
comment | varchar | |
(3 rows)
DESCRIBE¶
使用 DESCRIBE
列出表中的列及其数据类型和其他属性。 DESCRIBE
是 SHOW COLUMNS
的别名。
例如,从分区 Iceberg 表 customer
中 DESCRIBE
DESCRIBE customer;
Column | Type | Extra | Comment
------------+---------+---------------+---------
custkey | bigint | |
name | varchar | |
address | varchar | |
nationkey | bigint | |
phone | varchar | |
acctbal | double | |
mktsegment | varchar | partition key |
comment | varchar | |
(8 rows)
DESCRIBE
从非分区 Iceberg 表 region
DESCRIBE region;
Column | Type | Extra | Comment
-----------+---------+-------+---------
regionkey | bigint | |
name | varchar | |
comment | varchar | |
(3 rows)
架构演变¶
Iceberg 和 Presto Iceberg 连接器支持就地表演变,也称为架构演变,例如添加、删除和重命名列。使用架构演变,用户可以在启用 Presto Iceberg 连接器后使用 SQL 演变表架构。
Parquet 编写器版本¶
Presto 现在支持 Iceberg 目录的 Parquet 编写器版本 V1 和 V2。可以使用会话属性 parquet_writer_version
和配置属性 hive.parquet.writer.version
切换它。这些属性的有效值为 PARQUET_1_0
和 PARQUET_2_0
。默认值为 PARQUET_1_0
。
示例查询¶
让我们创建一个名为 ctas_nation 的 Iceberg 表,该表由 TPCH nation 表创建。该表有四个列:nationkey、name、regionkey 和 comment。
USE iceberg.tpch;
CREATE TABLE IF NOT EXISTS ctas_nation AS (SELECT * FROM nation);
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
(4 rows)
我们可以简单地使用 ALTER TABLE 语句将新列添加到 Iceberg 表中。以下查询将一个名为 zipcode 的新列添加到表中。
ALTER TABLE ctas_nation ADD COLUMN zipcode VARCHAR;
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
zipcode | varchar | |
(5 rows)
我们还可以将新列重命名为另一个名称 address
ALTER TABLE ctas_nation RENAME COLUMN zipcode TO address;
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
address | varchar | |
(5 rows)
最后,我们可以删除新列。表列将恢复到原始状态。
ALTER TABLE ctas_nation DROP COLUMN address;
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
(4 rows)
时间旅行¶
Iceberg 和 Presto Iceberg 连接器通过表快照(由唯一的快照 ID 标识)支持时间旅行。快照 ID 存储在 $snapshots
元数据表中。您可以将表的狀態回滚到以前的快照 ID。它还支持使用 VERSION (SYSTEM_VERSION) 和 TIMESTAMP (SYSTEM_TIME) 选项进行时间旅行查询。
示例查询¶
与 架构演变 中的示例查询类似,从 TPCH nation 表创建一个名为 ctas_nation 的 Iceberg 表
USE iceberg.tpch;
CREATE TABLE IF NOT EXISTS ctas_nation AS (SELECT * FROM nation);
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
(4 rows)
我们可以从 $snapshots 元数据表中找到 Iceberg 表的快照 ID。
SELECT snapshot_id FROM iceberg.tpch."ctas_nation$snapshots" ORDER BY committed_at;
snapshot_id
---------------------
5837462824399906536
(1 row)
现在,由于我们刚刚创建了表,所以只有一个快照 ID。让我们将一行插入表中,看看快照 ID 的变化。
INSERT INTO ctas_nation VALUES(25, 'new country', 1, 'comment');
SELECT snapshot_id FROM iceberg.tpch."ctas_nation$snapshots" ORDER BY committed_at;
snapshot_id
---------------------
5837462824399906536
5140039250977437531
(2 rows)
现在,由于将新行插入表中,所以创建了一个新的快照 (5140039250977437531)。可以通过运行以下命令验证新行
SELECT * FROM ctas_nation WHERE name = 'new country';
nationkey | name | regionkey | comment
-----------+-------------+-----------+---------
25 | new country | 1 | comment
(1 row)
使用时间旅行功能,我们可以通过调用 iceberg.system.rollback_to_snapshot 回滚到没有新行的先前状态
CALL iceberg.system.rollback_to_snapshot('tpch', 'ctas_nation', 5837462824399906536);
现在,如果我们再次检查该表,我们会发现新插入的行不再存在,因为我们已经回滚到了先前状态。
SELECT * FROM ctas_nation WHERE name = 'new country';
nationkey | name | regionkey | comment
-----------+------+-----------+---------
(0 rows)
使用 VERSION (SYSTEM_VERSION) 和 TIMESTAMP (SYSTEM_TIME) 进行时间旅行¶
使用 Iceberg 连接器访问表的歷史数据。您可以查看表在某个时间点的样子,即使数据自那时起已更改或删除。
// snapshot ID 5300424205832769799
INSERT INTO ctas_nation VALUES(10, 'united states', 1, 'comment');
// snapshot ID 6891257133877048303
INSERT INTO ctas_nation VALUES(20, 'canada', 2, 'comment');
// snapshot ID 705548372863208787
INSERT INTO ctas_nation VALUES(30, 'mexico', 3, 'comment');
// snapshot ID for first record
SELECT * FROM ctas_nation FOR VERSION AS OF 5300424205832769799;
// snapshot ID for first record using SYSTEM_VERSION
SELECT * FROM ctas_nation FOR SYSTEM_VERSION AS OF 5300424205832769799;
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
(1 row)
// snapshot ID for second record using BEFORE clause to retrieve previous state
SELECT * FROM ctas_nation FOR SYSTEM_VERSION BEFORE 6891257133877048303;
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
(1 row)
在上面的示例中,SYSTEM_VERSION 可用作 VERSION 的别名。
您可以使用 FOR TIMESTAMP AS OF TIMESTAMP 访问表的歷史数据。该查询使用最接近指定时间戳的表快照返回表的狀態。在此示例中,SYSTEM_TIME 可用作 TIMESTAMP 的别名。
// In following query, timestamp string is matching with second inserted record.
SELECT * FROM ctas_nation FOR TIMESTAMP AS OF TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
// Same example using SYSTEM_TIME as an alias for TIMESTAMP
SELECT * FROM ctas_nation FOR SYSTEM_TIME AS OF TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
20 | canada | 2 | comment
(2 rows)
FOR TIMESTAMP AS OF 后面的选项可以接受任何返回带有时区值的 timestamp 的表达式。例如,TIMESTAMP ‘2023-10-17 13:29:46.822 America/Los_Angeles’ 是表达式的常量字符串。在以下查询中,表达式 CURRENT_TIMESTAMP 返回带有时区值的当前时间戳。
SELECT * FROM ctas_nation FOR TIMESTAMP AS OF CURRENT_TIMESTAMP;
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
20 | canada | 2 | comment
30 | mexico | 3 | comment
(3 rows)
// In following query, timestamp string is matching with second inserted record.
// BEFORE clause returns first record which is less than timestamp of the second record.
SELECT * FROM ctas_nation FOR TIMESTAMP BEFORE TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
(1 row)
类型映射¶
PrestoDB 和 Iceberg 具有对方不支持的数据类型。在使用 Iceberg 读取或写入数据时,Presto 会将每个 Iceberg 数据类型更改为相应的 Presto 数据类型,并将每个 Presto 数据类型更改为可比较的 Iceberg 数据类型。以下表格详细说明了 PrestoDB 和 Iceberg 之间的特定类型映射。
Iceberg 到 PrestoDB 类型映射¶
Iceberg 类型与相关 PrestoDB 类型映射
Iceberg 类型 |
PrestoDB 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
不支持其他类型。
PrestoDB 到 Iceberg 类型映射¶
PrestoDB 类型与相关 Iceberg 类型映射
PrestoDB 类型 |
Iceberg 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
不支持其他类型。