Accumulo 连接器

概述

Accumulo 连接器支持从 Apache Accumulo 读取和写入数据。请仔细阅读此页面,以了解连接器的功能和特性。

安装迭代器依赖项

Accumulo 连接器使用自定义 Accumulo 迭代器,以便将 SQL 谓词子句中的各种信息推送到 Accumulo 进行服务器端过滤,这被称为谓词下推。为了使服务器端迭代器正常工作,您需要将 presto-accumulo jar 文件添加到每个 TabletServer 节点上的 Accumulo 的 lib/ext 目录中。

# For each TabletServer node:
scp $PRESTO_HOME/plugins/accumulo/presto-accumulo-*.jar [tabletserver_address]:$ACCUMULO_HOME/lib/ext

# TabletServer should pick up new JAR files in ext directory, but may require restart

请注意,这使用的是 Java 8。如果您的 Accumulo 集群使用的是 Java 7,那么当您尝试创建索引表时,您会在 TabletServer 日志中收到 Unsupported major.minor version 52.0 错误。您需要使用位于 https://github.com/bloomberg/presto-accumulopresto-accumulo-iterators jar 文件。

连接器配置

创建 etc/catalog/accumulo.properties 以将 accumulo 连接器作为 accumulo 目录挂载,根据需要替换 accumulo.xxx 属性

connector.name=accumulo
accumulo.instance=xxx
accumulo.zookeepers=xxx
accumulo.username=username
accumulo.password=password

配置变量

属性名称

默认值

必需

描述

accumulo.instance

(无)

Accumulo 实例的名称

accumulo.zookeepers

(无)

ZooKeeper 连接字符串

accumulo.username

(无)

Presto 的 Accumulo 用户

accumulo.password

(无)

用户的 Accumulo 密码

accumulo.zookeeper.metadata.root

/presto-accumulo

用于存储元数据的根 znode。仅在使用默认元数据管理器时相关

accumulo.cardinality.cache.size

100000

设置索引基数缓存的大小

accumulo.cardinality.cache.expire.duration

5m

设置基数缓存的过期持续时间。

不支持的功能

不支持以下功能

  • 通过 ALTER TABLE 添加列:虽然您不能通过 SQL 添加列,但可以使用工具。有关更多详细信息,请参阅下面的 添加列 部分。

  • DELETE:连接器尚未实现行删除。

使用

只需开始使用 SQL 在 Accumulo 中创建新表,即可开始处理数据。默认情况下,表定义的第一列设置为 Accumulo 行 ID。这应该是您表的 主键,请记住,包含相同行 ID 的任何 INSERT 语句就相当于对 Accumulo 的 UPDATE 操作,因为单元格中的任何先前数据都将被覆盖。行 ID 可以是任何有效的 Presto 数据类型。如果第一列不是您的主键,则可以在表定义的 WITH 子句中使用 row_id 表属性设置行 ID 列。

只需发出 CREATE TABLE 语句即可创建新的 Presto/Accumulo 表

CREATE TABLE myschema.scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
);
DESCRIBE myschema.scientists;
  Column   |  Type   | Extra |                      Comment
-----------+---------+-------+---------------------------------------------------
 recordkey | varchar |       | Accumulo row ID
 name      | varchar |       | Accumulo column name:name. Indexed: false
 age       | bigint  |       | Accumulo column age:age. Indexed: false
 birthday  | date    |       | Accumulo column birthday:birthday. Indexed: false

此命令将创建一个新的 Accumulo 表,其中 recordkey 列作为 Accumulo 行 ID。name、age 和 birthday 列映射到自动生成的列族和限定符值(实际上,两者都与 Presto 列名相同)。

使用 SQL 创建表时,您可以选择指定 column_mapping 表属性。此属性的值是三元组的逗号分隔列表,presto 列 : accumulo 列族 : accumulo 列限定符,每个非行 ID 列有一个三元组。这将设置 Presto 列名与其对应的 Accumulo 列族和列限定符的映射。

如果您没有指定 column_mapping 表属性,那么连接器将自动生成列名(尊重任何配置的地域组)。列名的自动生成仅适用于内部表,因此如果您的表是外部表,则必须指定 column_mapping 属性。

有关表属性的完整列表,请参阅 表属性

例如

CREATE TABLE myschema.scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date'
);
DESCRIBE myschema.scientists;
  Column   |  Type   | Extra |                    Comment
-----------+---------+-------+-----------------------------------------------
 recordkey | varchar |       | Accumulo row ID
 name      | varchar |       | Accumulo column metadata:name. Indexed: false
 age       | bigint  |       | Accumulo column metadata:age. Indexed: false
 birthday  | date    |       | Accumulo column metadata:date. Indexed: false

然后,您可以发出 INSERT 语句将数据放入 Accumulo 中。

注意

虽然发出 INSERT 语句很方便,但这种将数据加载到 Accumulo 的方法吞吐量很低。您需要使用 Accumulo API 直接将 Mutations 写入表。有关更多详细信息,请参阅有关 加载数据 的部分。

INSERT INTO myschema.scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
('row2', 'Alan Turing', 103, DATE '1912-06-23' );
SELECT * FROM myschema.scientists;
 recordkey |     name     | age |  birthday
-----------+--------------+-----+------------
 row1      | Grace Hopper | 109 | 1906-12-09
 row2      | Alan Turing  | 103 | 1912-06-23
(2 rows)

正如您所期望的那样,通过 shell 或以编程方式插入到 Accumulo 中的行也会在查询时显示。(Accumulo shell 认为“ -5321”是选项,而不是数字……因此我们将 TBL 稍微年轻一点。)

$ accumulo shell -u root -p secret
root@default> table myschema.scientists
root@default myschema.scientists> insert row3 metadata name "Tim Berners-Lee"
root@default myschema.scientists> insert row3 metadata age 60
root@default myschema.scientists> insert row3 metadata date 5321
SELECT * FROM myschema.scientists;
 recordkey |      name       | age |  birthday
-----------+-----------------+-----+------------
 row1      | Grace Hopper    | 109 | 1906-12-09
 row2      | Alan Turing     | 103 | 1912-06-23
 row3      | Tim Berners-Lee |  60 | 1984-07-27
(3 rows)

您也可以使用 DROP TABLE 删除表。此命令将删除元数据和表。有关内部表和外部表的更多详细信息,请参阅下面的 外部表 部分。

DROP TABLE myschema.scientists;

索引列

在内部,连接器创建一个 Accumulo Range 并将其打包到一个拆分中。该拆分将传递给 Presto Worker,以通过 BatchScannerRange 中读取数据。当发出导致完全表扫描的查询时,每个 Presto Worker 会获得一个映射到表的单个数据片的单个 Range。当发出带有谓词的查询(即 WHERE x = 10 子句)时,Presto 会将谓词中的值(10)传递给连接器,以便它可以使用此信息来扫描更少的数据。当 Accumulo 行 ID 用于谓词子句时,这会缩小 Range 查找范围,以快速从 Accumulo 检索数据的子集。

但是其他列呢?如果经常对非行 ID 列进行查询,则应考虑使用 Accumulo 连接器内置的 **索引** 功能。此功能可以大幅减少从表中选择少量值的查询运行时间,并且在通过 Presto INSERT 语句加载数据时,繁重的工作将为您完成(不过,请记住,通过 INSERT 将数据写入 Accumulo 的吞吐量并不高)。

要启用索引,请添加 index_columns 表属性并指定要索引的 Presto 列名称的逗号分隔列表(我们在本示例中使用 string 序列化器来帮助进行此示例 - 您应该使用默认的 lexicoder 序列化器)。

CREATE TABLE myschema.scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  serializer = 'string',
  index_columns='name,age,birthday'
);

创建表后,我们看到还有另外两个 Accumulo 表来存储索引和指标。

root@default> tables
accumulo.metadata
accumulo.root
myschema.scientists
myschema.scientists_idx
myschema.scientists_idx_metrics
trace

插入数据后,我们可以查看索引表,并看到 name、age 和 birthday 列有索引值。连接器会查询此索引表

INSERT INTO myschema.scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09'),
('row2', 'Alan Turing', 103, DATE '1912-06-23');
root@default> scan -t myschema.scientists_idx
-21011 metadata_date:row2 []
-23034 metadata_date:row1 []
103 metadata_age:row2 []
109 metadata_age:row1 []
Alan Turing metadata_name:row2 []
Grace Hopper metadata_name:row1 []

在对索引列发出带有 WHERE 子句的查询时,连接器会在索引表中搜索包含谓词中值的 所有 行 ID。这些行 ID 会以单值 Range 对象的形式捆绑到 Presto 分区中(每个分区的行 ID 数量由 accumulo.index_rows_per_split 的值控制),并传递给 Presto 工作器,以便在扫描数据表的 BatchScanner 中配置。

SELECT * FROM myschema.scientists WHERE age = 109;
 recordkey |     name     | age |  birthday
-----------+--------------+-----+------------
 row1      | Grace Hopper | 109 | 1906-12-09
(1 row)

加载数据

Accumulo 连接器支持通过 INSERT 语句加载数据,但是这种方法往往吞吐量较低,不应在吞吐量成为问题时依赖它。相反,连接器的用户应该使用 PrestoBatchWriter 工具,该工具作为 presto-accumulo 存储库 中的 presto-accumulo-tools 子项目的一部分提供。

PrestoBatchWriter 是典型 BatchWriter 的包装类,它利用 Presto/Accumulo 元数据将 Mutations 写入主数据表。特别是,它会处理对任何索引列的给定 Mutations 进行索引。该工具的使用方法在 存储库 中的 README 中提供。

外部表

默认情况下,使用 Presto 通过 SQL 语句创建的表是 **内部** 表,这意味着 Presto 管理 Presto 表元数据和 Accumulo 表。创建内部表时,也会创建 Accumulo 表。如果 Accumulo 表已经存在,您将收到错误消息。通过 Presto 删除内部表时,也会删除 Accumulo 表(以及任何索引表)。

要更改此行为,请在发出 CREATE 语句时将 external 属性设置为 true。这将使表成为 **外部** 表,并且 DROP TABLE 命令将 **仅** 删除与表关联的元数据。如果 Accumulo 表尚不存在,连接器将创建它们。

创建外部表 **将** 设置任何配置的局部性组以及索引表和指标表的迭代器(如果表被索引)。简而言之,外部表和内部表之间的唯一区别是连接器将在发出 DROP TABLE 命令时删除 Accumulo 表。

外部表可能更难处理,因为数据以预期格式存储。如果数据存储不正确,那么您将面临糟糕的状况。用户必须在创建表时提供 column_mapping 属性。这会创建 Presto 列名称到表单元格的列族/限定符的映射。单元格的值存储在 Accumulo 键/值对的 Value 中。默认情况下,预期此值使用 Accumulo 的 *lexicoder* API 进行序列化。如果将值存储为字符串,则可以使用表的 serializer 属性指定不同的序列化器。有关更多信息,请参阅关于 表属性 的部分。

接下来,我们创建 Presto 外部表。

CREATE TABLE external_table (
  a VARCHAR,
  b BIGINT,
  c DATE
)
WITH (
  column_mapping = 'a:md:a,b:md:b,c:md:c',
  external = true,
  index_columns = 'b,c',
  locality_groups = 'foo:b,c'
);

创建表后,表的用法照常继续

INSERT INTO external_table VALUES
('1', 1, DATE '2015-03-06'),
('2', 2, DATE '2015-03-07');
SELECT * FROM external_table;
 a | b |     c
---+---+------------
 1 | 1 | 2015-03-06
 2 | 2 | 2015-03-06
(2 rows)
DROP TABLE external_table;

删除表后,表仍将存在于 Accumulo 中,因为它是 *外部* 表。

root@default> tables
accumulo.metadata
accumulo.root
external_table
external_table_idx
external_table_idx_metrics
trace

如果要向表添加新列,可以再次创建表并指定新列。表中任何现有的行都将具有 NULL 值。此命令将重新配置 Accumulo 表,设置局部性组和迭代器配置。

CREATE TABLE external_table (
  a VARCHAR,
  b BIGINT,
  c DATE,
  d INTEGER
)
WITH (
  column_mapping = 'a:md:a,b:md:b,c:md:c,d:md:d',
  external = true,
  index_columns = 'b,c,d',
  locality_groups = 'foo:b,c,d'
);

SELECT * FROM external_table;
 a | b |     c      |  d
---+---+------------+------
 1 | 1 | 2015-03-06 | NULL
 2 | 2 | 2015-03-07 | NULL
(2 rows)

表属性

表属性用法示例

CREATE TABLE myschema.scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
  index_columns = 'name,age'
);

属性名称

默认值

描述

column_mapping

(生成的)

列元数据的逗号分隔列表:col_name:col_family:col_qualifier,[...]。外部表必须提供。未设置此属性会导致自动生成列名。

index_columns

(无)

在该表的相应索引表中索引的 Presto 列的逗号分隔列表

external

false

如果为 true,Presto 将仅对表执行元数据操作。否则,Presto 将在适当的地方创建和删除 Accumulo 表。

locality_groups

(无)

要在 Accumulo 表上设置的局部性组列表。仅对内部表有效。字符串格式为局部性组名称,冒号,组中列族的逗号分隔列表。组由管道分隔。例如:group1:famA,famB,famC|group2:famD,famE,famF|etc...

row_id

(第一列)

映射到 Accumulo 行 ID 的 Presto 列名称。

serializer

default

Accumulo 数据编码的序列化器。可以是 defaultstringlexicoder 或 Java 类名。默认值为 default,即来自 AccumuloRowSerializer.getDefault() 的值,即 lexicoder

scan_auths

(用户授权)

在批处理扫描器上设置的扫描时授权。

会话属性

可以使用 SET SESSION 更改会话属性的默认值。请注意,会话属性以目录名称为前缀

SET SESSION accumulo.column_filter_optimizations_enabled = false;

属性名称

默认值

描述

optimize_locality_enabled

true

设置为 true 以启用对非索引扫描的数据局部性

optimize_split_ranges_enabled

true

设置为 true 以根据数据块分区拆分非索引查询。通常应为 true。

optimize_index_enabled

true

设置为 true 以启用在查询中使用辅助索引

index_rows_per_split

10000

打包到单个 Presto 分区中的 Accumulo 行 ID 数量

index_threshold

0.2

基于索引扫描的行数与总行数的比率 如果比率低于此阈值,则将使用索引。

index_lowest_cardinality_threshold

0.01

将使用基数最低的列的阈值,而不是计算索引中范围的交集。必须启用辅助索引

index_metrics_enabled

true

设置为 true 以启用使用指标表来优化索引的使用

scan_username

(配置)

扫描表时要模拟的用户。此属性优先于 scan_auths 表属性

index_short_circuit_cardinality_fetch

true

在任何列的基数小于最低基数阈值时,短路检索索引指标

index_cardinality_cache_polling_duration

10ms

设置基数缓存轮询持续时间,以短路检索索引指标

添加列

由于需要用于列工作的其他元数据,因此目前无法通过 ALTER TABLE [table] ADD COLUMN [name] [type] 向现有表添加新列;列族、限定符以及列是否被索引。

相反,可以使用 presto-accumulo-tools 子项目(位于 presto-accumulo 存储库中)中的某个实用程序。文档和用法可以在 README 中找到。

序列化器

用于 Accumulo 的 Presto 连接器具有一个可插入的序列化器框架,用于处理 Presto 和 Accumulo 之间的 I/O。这使最终用户能够以编程方式在 Accumulo 中序列化和反序列化其特殊数据格式,同时抽象化连接器本身的复杂性。

目前有两种类型的序列化器可用;一种是 string 序列化器,它将值视为 Java String,另一种是 lexicoder 序列化器,它利用 Accumulo 的 Lexicoder API 来存储值。默认序列化器是 lexicoder 序列化器,因为此序列化器不需要在 String 对象和 Presto 类型之间进行昂贵的转换操作 - 单元格的值被编码为字节数组。

此外,lexicoder 序列化器对 BIGINTTIMESTAMP 等数值类型进行正确的词典排序。这对于连接器在查询数据时正确利用辅助索引至关重要。

可以通过指定 serializer 表属性来更改默认序列化器,可以使用 default(即 lexicoder)、stringlexicoder 用于内置类型,或者可以提供自己的实现方法,方法是扩展 AccumuloRowSerializer,将其添加到 Presto CLASSPATH,并在连接器配置中指定完全限定的 Java 类名。

CREATE TABLE myschema.scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
  serializer = 'default'
);
INSERT INTO myschema.scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
('row2', 'Alan Turing', 103, DATE '1912-06-23' );
root@default> scan -t myschema.scientists
row1 metadata:age []    \x08\x80\x00\x00\x00\x00\x00\x00m
row1 metadata:date []    \x08\x7F\xFF\xFF\xFF\xFF\xFF\xA6\x06
row1 metadata:name []    Grace Hopper
row2 metadata:age []    \x08\x80\x00\x00\x00\x00\x00\x00g
row2 metadata:date []    \x08\x7F\xFF\xFF\xFF\xFF\xFF\xAD\xED
row2 metadata:name []    Alan Turing
CREATE TABLE myschema.stringy_scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
  serializer = 'string'
);
INSERT INTO myschema.stringy_scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
('row2', 'Alan Turing', 103, DATE '1912-06-23' );
root@default> scan -t myschema.stringy_scientists
row1 metadata:age []    109
row1 metadata:date []    -23034
row1 metadata:name []    Grace Hopper
row2 metadata:age []    103
row2 metadata:date []    -21011
row2 metadata:name []    Alan Turing
CREATE TABLE myschema.custom_scientists (
  recordkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
  serializer = 'my.serializer.package.MySerializer'
);

元数据管理

Presto/Accumulo 表的元数据存储在 ZooKeeper 中。您可以在 Presto 中(并且应该)发出 SQL 语句来创建和删除表。这是创建连接器工作所需的元数据最简单的方法。最好不要修改元数据,但这里有关于如何存储它的详细信息。信息就是力量。

ZooKeeper 中的根节点包含所有映射,格式如下

/metadata-root/schema/table

其中 metadata-root 是配置文件中 zookeeper.metadata.root 的值(默认为 /presto-accumulo),schema 是 Presto 模式(与 Accumulo 命名空间名称相同),table 是 Presto 表名(同样,与 Accumulo 名称相同)。table ZooKeeper 节点的數據是序列化后的 AccumuloTable Java 对象(位于连接器代码中)。此表包含模式(命名空间)名称、表名、列定义、要用于表的序列化器以及任何其他表属性。

如果您需要以编程方式操作 Accumulo 的 ZooKeeper 元数据,请查看 com.facebook.presto.accumulo.metadata.ZooKeeperMetadataManager,它提供了一些 Java 代码来简化此过程。

将表从内部转换为外部

如果您的表是内部表,您可以将其转换为外部表,方法是删除 ZooKeeper 中相应的 znode,从 Presto 的角度来看,该表不再存在。然后,使用相同的 DDL 重新创建表,但添加 external = true 表属性。

例如

1. 我们从一个内部表 foo.bar 开始,该表是用下面的 DDL 创建的。如果您之前没有为 column_mapping 定义表属性(如本示例所示),请确保在删除元数据之前描述该表。在创建外部表时,我们需要列映射。

CREATE TABLE foo.bar (a VARCHAR, b BIGINT, c DATE)
WITH (
    index_columns = 'b,c'
);
DESCRIBE foo.bar;
 Column |  Type   | Extra |               Comment
--------+---------+-------+-------------------------------------
 a      | varchar |       | Accumulo row ID
 b      | bigint  |       | Accumulo column b:b. Indexed: true
 c      | date    |       | Accumulo column c:c. Indexed: true

2. 使用 ZooKeeper CLI 删除相应的 znode。注意,此示例使用默认的 ZooKeeper 元数据根目录 /presto-accumulo

$ zkCli.sh
[zk: localhost:2181(CONNECTED) 1] delete /presto-accumulo/foo/bar

3. 使用与之前相同的 DDL 重新创建表,但添加 external=true 属性。请注意,如果您之前没有定义 column_mapping,则需要将其添加到新的 DDL 中(外部表要求设置此属性)。列映射位于 DESCRIBE 语句的输出中。

CREATE TABLE foo.bar (
  a VARCHAR,
  b BIGINT,
  c DATE
)
WITH (
  column_mapping = 'a:a:a,b:b:b,c:c:c',
  index_columns = 'b,c',
  external = true
);