Presto Worker REST API¶
Presto 的协调器与 Presto worker 通信以执行查询片段并获取查询结果。Presto worker 之间互相通信以交换中间结果。本章档介绍了这些通信中使用的 REST API。
任务资源用于启动查询片段的执行、跟踪状态并获取结果。
控制平面¶
以下 HTTP 方法由协调器用于启动查询片段的执行并跟踪执行状态。
对
/v1/task/{taskId}
的POST
请求启动在POST
请求体中指定的查询片段的执行。请求可选地包含一组要处理的初始拆分。请求还指定如何对结果进行分区,例如,使用指定的输出列对指定数量的输出缓冲区进行哈希分区,或将所有结果组合到一个输出缓冲区中,或将组合结果广播到多个输出缓冲区中。后续对
/v1/task/{taskId}
的POST
请求可以提供更多要处理的拆分,并最终指定不再有新的拆分。对
/v1/task/{taskId}/status
的GET
请求返回一个TaskStatus
JSON 文档,描述当前执行状态。对
/v1/task/{taskId}
的GET
请求返回一个TaskInfo
JSON 文档,包含有关执行状态的扩展信息。对
/v1/task/{taskId}
的DELETE
请求删除已完成的任务或取消正在进行的任务。对
/v1/task
的GET
请求返回一个 JSON 文档,包含所有任务的TaskInfo
列表。
协调器发出的状态请求包含两个 HTTP 标头:X-Presto-Current-State
和 X-Presto-Max-Wait
。 X-Presto-Current-State
指定协调器已知的任务状态。如果 worker 上的任务状态不同,worker 将立即回复。如果 worker 上的任务状态与协调器上相同,worker 将等待任务状态更改后再回复。 X-Presto-Max-Wait
HTTP 标头指定最大等待时间。即使任务状态保持不变,worker 也将在该时间后回复。
该设计确保协调器能够及时收到任务状态变化,而无需以紧密循环的方式轮询 worker。
相同的设计也适用于通过对 /v1/task/{taskId}
的 GET
请求获取扩展任务信息。
数据平面¶
以下 HTTP 方法由协调器用于获取最终查询结果,或由下游 worker 用于从上游 worker 获取中间结果。
对
{taskId}/results/{bufferId}/{token}
的GET
请求从指定的输出缓冲区返回下一批结果。确认已收到前一批结果。对
{taskId}/results/{bufferId}/{token}/acknowledge
的GET
请求确认已收到结果,并允许 worker 删除它们。对
{taskId}/results/{bufferId}
的DELETE
请求删除指定输出缓冲区中的所有结果(在发生错误的情况下)。可选地,可以对
{taskId}/results/{bufferId}
发出HEAD
请求以检索任何与非数据页面序列相关的标头。使用此方法检查缓冲区是否已完成,或查看缓冲了多少数据(无需获取数据)。确认已收到前一批结果。
协调器和 worker 以块的形式获取结果。它们使用 X-Presto-Max-Size
HTTP 标头指定块的最大字节数。每个块都由一个单调递增的序列号标识,有时称为令牌。对结果的第一个请求指定序列号零。响应包含
请求的序列号,作为
X-Presto-Page-Sequence-Id
HTTP 标头。用于确认已收到块并请求下一个块的序列号,作为
X-Presto-Page-End-Sequence-Id
HTTP 标头。表示没有更多结果,作为
X-Presto-Buffer-Complete
HTTP 标头,其值为true
。输出缓冲区中剩余的缓冲字节数,作为
X-Presto-Buffer-Remaining-Bytes
HTTP 标头。这应该返回一个逗号分隔的列表,表示可以在下一次请求中返回的页面的字节数。这可以作为上游任务优化数据交换的提示。
响应的主体包含 序列化页面线格式 中的一系列页面。
收到第一块结果后,客户端使用 X-Presto-Page-End-Sequence-Id
序列号来请求下一块结果。请求下一块会自动确认已收到前一块。客户端会一直获取结果,直到收到 X-Presto-Buffer-Complete
HTTP 标头,其值为 true
。
当客户端决定不立即获取下一块数据时,它会使用对 {taskId}/results/{bufferId}/{token}/acknowledge
的 GET 请求发送显式确认。客户端将令牌设置为先前收到的 X-Presto-Page-End-Sequence-Id
标头的值。
如果 worker 超时填充响应,或者任务已失败或已中止,worker 将返回空结果。客户端可以尝试重试请求。在任务处于终端状态的情况下,假设控制平面最终将处理状态更改。
如果客户端错过了响应,它可以重复请求,worker 将再次发送结果。在收到对序列号的确认后,worker 将删除所有序列号小于该序列号的结果,客户端将无法再次获取这些结果。
下面是一个示例消息传递图表,用于从输出缓冲区零获取两块结果。
输出缓冲区¶
数据混洗涉及下游阶段的 worker 从上游阶段的 worker 获取结果。每个生成上游 worker 都将建立与下游阶段中 worker 数量相同的输出缓冲区。输出缓冲区由从零开始的连续数字标识。每个下游 worker 都分配了一个输出缓冲区,它使用该缓冲区从所有上游 worker 获取结果。
下图显示了 3 个下游 worker。这些 worker 分别分配了输出缓冲区编号 0、1 和 2。每个上游 worker 都有 3 个输出缓冲区。下游 worker #0 使用缓冲区编号 0 从所有上游 worker 获取结果。下游 worker #1 使用缓冲区编号 1 从所有上游 worker 获取结果。下游 worker #2 使用缓冲区编号 2 从所有上游 worker 获取结果。
故障处理¶
任务失败通过 TaskStatus
和 TaskInfo
更新报告给协调器。
当发现任务失败时,协调器会中止所有剩余的任务,并向客户端报告查询失败。当发生任务失败或收到中止请求时,所有后续处理将停止,所有剩余的任务输出将被丢弃。
失败或中止的任务将继续像往常一样响应数据平面请求,以防止级联故障。由于输出在失败时被完全丢弃,所有后续响应都为空。 X-Presto-Buffer-Complete
标头被设置为 false
以防止下游任务成功完成并生成错误的结果。
对于客户端来说,这些响应与健康任务的响应没有区别。为了避免请求突发,会在使用空结果集响应之前应用标准延迟。
诊断问题¶
HTTP 请求日志记录有助于诊断与协议相关的問題。
可以通过 config.properties
文件启用请求日志记录。
在 Presto 中
http-server.log.enabled=true
http-server.log.path=<request_log_file_path>
在 Prestissimo 中(日志写入标准日志)
http-server.enable-access-log=true
使用 grep 跟踪特定的协议交互。
一个交换
cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results'
I0402 15:33:06.928076 625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:06] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 0 57
I0402 15:33:07.181629 625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:07] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 94024 0
I0402 15:33:25.392717 675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/1 HTTP/1.1" 200 0 0
I0402 15:33:25.393162 675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "DELETE /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213 HTTP/1.1" 200 0 0
一个 TaskStatus
更新
cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status'
I0402 15:33:34.629278 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:34] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739 1000
I0402 15:33:35.636466 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:35] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739 1000
I0402 15:33:36.644189 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739 1000
I0402 15:33:36.768704 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 717 115
日志记录包含诸如响应状态、响应大小和响应时间之类的信息,这些信息可以帮助理解交互流程,包括延迟和超时,在检查它们时。