Presto Worker REST API

Presto 的协调器与 Presto worker 通信以执行查询片段并获取查询结果。Presto worker 之间互相通信以交换中间结果。本章档介绍了这些通信中使用的 REST API。

任务资源用于启动查询片段的执行、跟踪状态并获取结果。

控制平面

以下 HTTP 方法由协调器用于启动查询片段的执行并跟踪执行状态。

  • /v1/task/{taskId}POST 请求启动在 POST 请求体中指定的查询片段的执行。请求可选地包含一组要处理的初始拆分。请求还指定如何对结果进行分区,例如,使用指定的输出列对指定数量的输出缓冲区进行哈希分区,或将所有结果组合到一个输出缓冲区中,或将组合结果广播到多个输出缓冲区中。

  • 后续对 /v1/task/{taskId}POST 请求可以提供更多要处理的拆分,并最终指定不再有新的拆分。

  • /v1/task/{taskId}/statusGET 请求返回一个 TaskStatus JSON 文档,描述当前执行状态。

  • /v1/task/{taskId}GET 请求返回一个 TaskInfo JSON 文档,包含有关执行状态的扩展信息。

  • /v1/task/{taskId}DELETE 请求删除已完成的任务或取消正在进行的任务。

  • /v1/taskGET 请求返回一个 JSON 文档,包含所有任务的 TaskInfo 列表。

协调器发出的状态请求包含两个 HTTP 标头:X-Presto-Current-StateX-Presto-Max-WaitX-Presto-Current-State 指定协调器已知的任务状态。如果 worker 上的任务状态不同,worker 将立即回复。如果 worker 上的任务状态与协调器上相同,worker 将等待任务状态更改后再回复。 X-Presto-Max-Wait HTTP 标头指定最大等待时间。即使任务状态保持不变,worker 也将在该时间后回复。

该设计确保协调器能够及时收到任务状态变化,而无需以紧密循环的方式轮询 worker。

相同的设计也适用于通过对 /v1/task/{taskId}GET 请求获取扩展任务信息。

数据平面

以下 HTTP 方法由协调器用于获取最终查询结果,或由下游 worker 用于从上游 worker 获取中间结果。

  • {taskId}/results/{bufferId}/{token}GET 请求从指定的输出缓冲区返回下一批结果。确认已收到前一批结果。

  • {taskId}/results/{bufferId}/{token}/acknowledgeGET 请求确认已收到结果,并允许 worker 删除它们。

  • {taskId}/results/{bufferId}DELETE 请求删除指定输出缓冲区中的所有结果(在发生错误的情况下)。

  • 可选地,可以对 {taskId}/results/{bufferId} 发出 HEAD 请求以检索任何与非数据页面序列相关的标头。使用此方法检查缓冲区是否已完成,或查看缓冲了多少数据(无需获取数据)。确认已收到前一批结果。

协调器和 worker 以块的形式获取结果。它们使用 X-Presto-Max-Size HTTP 标头指定块的最大字节数。每个块都由一个单调递增的序列号标识,有时称为令牌。对结果的第一个请求指定序列号零。响应包含

  • 请求的序列号,作为 X-Presto-Page-Sequence-Id HTTP 标头。

  • 用于确认已收到块并请求下一个块的序列号,作为 X-Presto-Page-End-Sequence-Id HTTP 标头。

  • 表示没有更多结果,作为 X-Presto-Buffer-Complete HTTP 标头,其值为 true

  • 输出缓冲区中剩余的缓冲字节数,作为 X-Presto-Buffer-Remaining-Bytes HTTP 标头。这应该返回一个逗号分隔的列表,表示可以在下一次请求中返回的页面的字节数。这可以作为上游任务优化数据交换的提示。

响应的主体包含 序列化页面线格式 中的一系列页面。

收到第一块结果后,客户端使用 X-Presto-Page-End-Sequence-Id 序列号来请求下一块结果。请求下一块会自动确认已收到前一块。客户端会一直获取结果,直到收到 X-Presto-Buffer-Complete HTTP 标头,其值为 true

当客户端决定不立即获取下一块数据时,它会使用对 {taskId}/results/{bufferId}/{token}/acknowledge 的 GET 请求发送显式确认。客户端将令牌设置为先前收到的 X-Presto-Page-End-Sequence-Id 标头的值。

如果 worker 超时填充响应,或者任务已失败或已中止,worker 将返回空结果。客户端可以尝试重试请求。在任务处于终端状态的情况下,假设控制平面最终将处理状态更改。

如果客户端错过了响应,它可以重复请求,worker 将再次发送结果。在收到对序列号的确认后,worker 将删除所有序列号小于该序列号的结果,客户端将无法再次获取这些结果。

下面是一个示例消息传递图表,用于从输出缓冲区零获取两块结果。

../_images/worker-protocol-results.png

输出缓冲区

数据混洗涉及下游阶段的 worker 从上游阶段的 worker 获取结果。每个生成上游 worker 都将建立与下游阶段中 worker 数量相同的输出缓冲区。输出缓冲区由从零开始的连续数字标识。每个下游 worker 都分配了一个输出缓冲区,它使用该缓冲区从所有上游 worker 获取结果。

下图显示了 3 个下游 worker。这些 worker 分别分配了输出缓冲区编号 0、1 和 2。每个上游 worker 都有 3 个输出缓冲区。下游 worker #0 使用缓冲区编号 0 从所有上游 worker 获取结果。下游 worker #1 使用缓冲区编号 1 从所有上游 worker 获取结果。下游 worker #2 使用缓冲区编号 2 从所有上游 worker 获取结果。

../_images/worker-protocol-output-buffers.png

故障处理

任务失败通过 TaskStatusTaskInfo 更新报告给协调器。

当发现任务失败时,协调器会中止所有剩余的任务,并向客户端报告查询失败。当发生任务失败或收到中止请求时,所有后续处理将停止,所有剩余的任务输出将被丢弃。

失败或中止的任务将继续像往常一样响应数据平面请求,以防止级联故障。由于输出在失败时被完全丢弃,所有后续响应都为空。 X-Presto-Buffer-Complete 标头被设置为 false 以防止下游任务成功完成并生成错误的结果。

对于客户端来说,这些响应与健康任务的响应没有区别。为了避免请求突发,会在使用空结果集响应之前应用标准延迟。

诊断问题

HTTP 请求日志记录有助于诊断与协议相关的問題。

可以通过 config.properties 文件启用请求日志记录。

在 Presto 中

http-server.log.enabled=true
http-server.log.path=<request_log_file_path>

在 Prestissimo 中(日志写入标准日志)

http-server.enable-access-log=true

使用 grep 跟踪特定的协议交互。

一个交换

cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results'
I0402 15:33:06.928076   625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:06] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 0   57
I0402 15:33:07.181629   625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:07] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 94024   0
I0402 15:33:25.392717   675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/1 HTTP/1.1" 200 0   0
I0402 15:33:25.393162   675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "DELETE /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213 HTTP/1.1" 200 0   0

一个 TaskStatus 更新

cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status'
I0402 15:33:34.629278   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:34] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:35.636466   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:35] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:36.644189   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:36.768704   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 717   115

日志记录包含诸如响应状态、响应大小和响应时间之类的信息,这些信息可以帮助理解交互流程,包括延迟和超时,在检查它们时。