Accumulo 连接器¶
概述¶
Accumulo 连接器支持从 Apache Accumulo 读取和写入数据。请仔细阅读此页面,以了解连接器的功能和特性。
安装迭代器依赖项¶
Accumulo 连接器使用自定义 Accumulo 迭代器,以便将 SQL 谓词子句中的各种信息推送到 Accumulo 进行服务器端过滤,这被称为谓词下推。为了使服务器端迭代器正常工作,您需要将 presto-accumulo
jar 文件添加到每个 TabletServer 节点上的 Accumulo 的 lib/ext
目录中。
# For each TabletServer node:
scp $PRESTO_HOME/plugins/accumulo/presto-accumulo-*.jar [tabletserver_address]:$ACCUMULO_HOME/lib/ext
# TabletServer should pick up new JAR files in ext directory, but may require restart
请注意,这使用的是 Java 8。如果您的 Accumulo 集群使用的是 Java 7,那么当您尝试创建索引表时,您会在 TabletServer 日志中收到 Unsupported major.minor version 52.0
错误。您需要使用位于 https://github.com/bloomberg/presto-accumulo 的presto-accumulo-iterators jar 文件。
连接器配置¶
创建 etc/catalog/accumulo.properties
以将 accumulo
连接器作为 accumulo
目录挂载,根据需要替换 accumulo.xxx
属性
connector.name=accumulo
accumulo.instance=xxx
accumulo.zookeepers=xxx
accumulo.username=username
accumulo.password=password
配置变量¶
属性名称 |
默认值 |
必需 |
描述 |
---|---|---|---|
|
(无) |
是 |
Accumulo 实例的名称 |
|
(无) |
是 |
ZooKeeper 连接字符串 |
|
(无) |
是 |
Presto 的 Accumulo 用户 |
|
(无) |
是 |
用户的 Accumulo 密码 |
|
|
否 |
用于存储元数据的根 znode。仅在使用默认元数据管理器时相关 |
|
|
否 |
设置索引基数缓存的大小 |
|
|
否 |
设置基数缓存的过期持续时间。 |
不支持的功能¶
不支持以下功能
通过
ALTER TABLE
添加列:虽然您不能通过 SQL 添加列,但可以使用工具。有关更多详细信息,请参阅下面的 添加列 部分。DELETE
:连接器尚未实现行删除。
使用¶
只需开始使用 SQL 在 Accumulo 中创建新表,即可开始处理数据。默认情况下,表定义的第一列设置为 Accumulo 行 ID。这应该是您表的 主键,请记住,包含相同行 ID 的任何 INSERT
语句就相当于对 Accumulo 的 UPDATE 操作,因为单元格中的任何先前数据都将被覆盖。行 ID 可以是任何有效的 Presto 数据类型。如果第一列不是您的主键,则可以在表定义的 WITH
子句中使用 row_id
表属性设置行 ID 列。
只需发出 CREATE TABLE
语句即可创建新的 Presto/Accumulo 表
CREATE TABLE myschema.scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
);
DESCRIBE myschema.scientists;
Column | Type | Extra | Comment
-----------+---------+-------+---------------------------------------------------
recordkey | varchar | | Accumulo row ID
name | varchar | | Accumulo column name:name. Indexed: false
age | bigint | | Accumulo column age:age. Indexed: false
birthday | date | | Accumulo column birthday:birthday. Indexed: false
此命令将创建一个新的 Accumulo 表,其中 recordkey
列作为 Accumulo 行 ID。name、age 和 birthday 列映射到自动生成的列族和限定符值(实际上,两者都与 Presto 列名相同)。
使用 SQL 创建表时,您可以选择指定 column_mapping
表属性。此属性的值是三元组的逗号分隔列表,presto 列 : accumulo 列族 : accumulo 列限定符,每个非行 ID 列有一个三元组。这将设置 Presto 列名与其对应的 Accumulo 列族和列限定符的映射。
如果您没有指定 column_mapping
表属性,那么连接器将自动生成列名(尊重任何配置的地域组)。列名的自动生成仅适用于内部表,因此如果您的表是外部表,则必须指定 column_mapping 属性。
有关表属性的完整列表,请参阅 表属性。
例如
CREATE TABLE myschema.scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
)
WITH (
column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date'
);
DESCRIBE myschema.scientists;
Column | Type | Extra | Comment
-----------+---------+-------+-----------------------------------------------
recordkey | varchar | | Accumulo row ID
name | varchar | | Accumulo column metadata:name. Indexed: false
age | bigint | | Accumulo column metadata:age. Indexed: false
birthday | date | | Accumulo column metadata:date. Indexed: false
然后,您可以发出 INSERT
语句将数据放入 Accumulo 中。
注意
虽然发出 INSERT
语句很方便,但这种将数据加载到 Accumulo 的方法吞吐量很低。您需要使用 Accumulo API 直接将 Mutations
写入表。有关更多详细信息,请参阅有关 加载数据 的部分。
INSERT INTO myschema.scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
('row2', 'Alan Turing', 103, DATE '1912-06-23' );
SELECT * FROM myschema.scientists;
recordkey | name | age | birthday
-----------+--------------+-----+------------
row1 | Grace Hopper | 109 | 1906-12-09
row2 | Alan Turing | 103 | 1912-06-23
(2 rows)
正如您所期望的那样,通过 shell 或以编程方式插入到 Accumulo 中的行也会在查询时显示。(Accumulo shell 认为“ -5321”是选项,而不是数字……因此我们将 TBL 稍微年轻一点。)
$ accumulo shell -u root -p secret
root@default> table myschema.scientists
root@default myschema.scientists> insert row3 metadata name "Tim Berners-Lee"
root@default myschema.scientists> insert row3 metadata age 60
root@default myschema.scientists> insert row3 metadata date 5321
SELECT * FROM myschema.scientists;
recordkey | name | age | birthday
-----------+-----------------+-----+------------
row1 | Grace Hopper | 109 | 1906-12-09
row2 | Alan Turing | 103 | 1912-06-23
row3 | Tim Berners-Lee | 60 | 1984-07-27
(3 rows)
您也可以使用 DROP TABLE
删除表。此命令将删除元数据和表。有关内部表和外部表的更多详细信息,请参阅下面的 外部表 部分。
DROP TABLE myschema.scientists;
索引列¶
在内部,连接器创建一个 Accumulo Range
并将其打包到一个拆分中。该拆分将传递给 Presto Worker,以通过 BatchScanner
从 Range
中读取数据。当发出导致完全表扫描的查询时,每个 Presto Worker 会获得一个映射到表的单个数据片的单个 Range
。当发出带有谓词的查询(即 WHERE x = 10
子句)时,Presto 会将谓词中的值(10
)传递给连接器,以便它可以使用此信息来扫描更少的数据。当 Accumulo 行 ID 用于谓词子句时,这会缩小 Range
查找范围,以快速从 Accumulo 检索数据的子集。
但是其他列呢?如果经常对非行 ID 列进行查询,则应考虑使用 Accumulo 连接器内置的 **索引** 功能。此功能可以大幅减少从表中选择少量值的查询运行时间,并且在通过 Presto INSERT
语句加载数据时,繁重的工作将为您完成(不过,请记住,通过 INSERT
将数据写入 Accumulo 的吞吐量并不高)。
要启用索引,请添加 index_columns
表属性并指定要索引的 Presto 列名称的逗号分隔列表(我们在本示例中使用 string
序列化器来帮助进行此示例 - 您应该使用默认的 lexicoder
序列化器)。
CREATE TABLE myschema.scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
)
WITH (
serializer = 'string',
index_columns='name,age,birthday'
);
创建表后,我们看到还有另外两个 Accumulo 表来存储索引和指标。
root@default> tables
accumulo.metadata
accumulo.root
myschema.scientists
myschema.scientists_idx
myschema.scientists_idx_metrics
trace
插入数据后,我们可以查看索引表,并看到 name、age 和 birthday 列有索引值。连接器会查询此索引表
INSERT INTO myschema.scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09'),
('row2', 'Alan Turing', 103, DATE '1912-06-23');
root@default> scan -t myschema.scientists_idx
-21011 metadata_date:row2 []
-23034 metadata_date:row1 []
103 metadata_age:row2 []
109 metadata_age:row1 []
Alan Turing metadata_name:row2 []
Grace Hopper metadata_name:row1 []
在对索引列发出带有 WHERE
子句的查询时,连接器会在索引表中搜索包含谓词中值的 所有 行 ID。这些行 ID 会以单值 Range
对象的形式捆绑到 Presto 分区中(每个分区的行 ID 数量由 accumulo.index_rows_per_split
的值控制),并传递给 Presto 工作器,以便在扫描数据表的 BatchScanner
中配置。
SELECT * FROM myschema.scientists WHERE age = 109;
recordkey | name | age | birthday
-----------+--------------+-----+------------
row1 | Grace Hopper | 109 | 1906-12-09
(1 row)
加载数据¶
Accumulo 连接器支持通过 INSERT 语句加载数据,但是这种方法往往吞吐量较低,不应在吞吐量成为问题时依赖它。相反,连接器的用户应该使用 PrestoBatchWriter
工具,该工具作为 presto-accumulo 存储库 中的 presto-accumulo-tools 子项目的一部分提供。
PrestoBatchWriter
是典型 BatchWriter
的包装类,它利用 Presto/Accumulo 元数据将 Mutations 写入主数据表。特别是,它会处理对任何索引列的给定 Mutations 进行索引。该工具的使用方法在 存储库 中的 README 中提供。
外部表¶
默认情况下,使用 Presto 通过 SQL 语句创建的表是 **内部** 表,这意味着 Presto 管理 Presto 表元数据和 Accumulo 表。创建内部表时,也会创建 Accumulo 表。如果 Accumulo 表已经存在,您将收到错误消息。通过 Presto 删除内部表时,也会删除 Accumulo 表(以及任何索引表)。
要更改此行为,请在发出 CREATE
语句时将 external
属性设置为 true
。这将使表成为 **外部** 表,并且 DROP TABLE
命令将 **仅** 删除与表关联的元数据。如果 Accumulo 表尚不存在,连接器将创建它们。
创建外部表 **将** 设置任何配置的局部性组以及索引表和指标表的迭代器(如果表被索引)。简而言之,外部表和内部表之间的唯一区别是连接器将在发出 DROP TABLE
命令时删除 Accumulo 表。
外部表可能更难处理,因为数据以预期格式存储。如果数据存储不正确,那么您将面临糟糕的状况。用户必须在创建表时提供 column_mapping
属性。这会创建 Presto 列名称到表单元格的列族/限定符的映射。单元格的值存储在 Accumulo 键/值对的 Value
中。默认情况下,预期此值使用 Accumulo 的 *lexicoder* API 进行序列化。如果将值存储为字符串,则可以使用表的 serializer
属性指定不同的序列化器。有关更多信息,请参阅关于 表属性 的部分。
接下来,我们创建 Presto 外部表。
CREATE TABLE external_table (
a VARCHAR,
b BIGINT,
c DATE
)
WITH (
column_mapping = 'a:md:a,b:md:b,c:md:c',
external = true,
index_columns = 'b,c',
locality_groups = 'foo:b,c'
);
创建表后,表的用法照常继续
INSERT INTO external_table VALUES
('1', 1, DATE '2015-03-06'),
('2', 2, DATE '2015-03-07');
SELECT * FROM external_table;
a | b | c
---+---+------------
1 | 1 | 2015-03-06
2 | 2 | 2015-03-06
(2 rows)
DROP TABLE external_table;
删除表后,表仍将存在于 Accumulo 中,因为它是 *外部* 表。
root@default> tables
accumulo.metadata
accumulo.root
external_table
external_table_idx
external_table_idx_metrics
trace
如果要向表添加新列,可以再次创建表并指定新列。表中任何现有的行都将具有 NULL 值。此命令将重新配置 Accumulo 表,设置局部性组和迭代器配置。
CREATE TABLE external_table (
a VARCHAR,
b BIGINT,
c DATE,
d INTEGER
)
WITH (
column_mapping = 'a:md:a,b:md:b,c:md:c,d:md:d',
external = true,
index_columns = 'b,c,d',
locality_groups = 'foo:b,c,d'
);
SELECT * FROM external_table;
a | b | c | d
---+---+------------+------
1 | 1 | 2015-03-06 | NULL
2 | 2 | 2015-03-07 | NULL
(2 rows)
表属性¶
表属性用法示例
CREATE TABLE myschema.scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
)
WITH (
column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
index_columns = 'name,age'
);
属性名称 |
默认值 |
描述 |
---|---|---|
|
(生成的) |
列元数据的逗号分隔列表: |
|
(无) |
在该表的相应索引表中索引的 Presto 列的逗号分隔列表 |
|
|
如果为 true,Presto 将仅对表执行元数据操作。否则,Presto 将在适当的地方创建和删除 Accumulo 表。 |
|
(无) |
要在 Accumulo 表上设置的局部性组列表。仅对内部表有效。字符串格式为局部性组名称,冒号,组中列族的逗号分隔列表。组由管道分隔。例如: |
|
(第一列) |
映射到 Accumulo 行 ID 的 Presto 列名称。 |
|
|
Accumulo 数据编码的序列化器。可以是 |
|
(用户授权) |
在批处理扫描器上设置的扫描时授权。 |
会话属性¶
可以使用 SET SESSION 更改会话属性的默认值。请注意,会话属性以目录名称为前缀
SET SESSION accumulo.column_filter_optimizations_enabled = false;
属性名称 |
默认值 |
描述 |
---|---|---|
|
|
设置为 true 以启用对非索引扫描的数据局部性 |
|
|
设置为 true 以根据数据块分区拆分非索引查询。通常应为 true。 |
|
|
设置为 true 以启用在查询中使用辅助索引 |
|
|
打包到单个 Presto 分区中的 Accumulo 行 ID 数量 |
|
|
基于索引扫描的行数与总行数的比率 如果比率低于此阈值,则将使用索引。 |
|
|
将使用基数最低的列的阈值,而不是计算索引中范围的交集。必须启用辅助索引 |
|
|
设置为 true 以启用使用指标表来优化索引的使用 |
|
(配置) |
扫描表时要模拟的用户。此属性优先于 |
|
|
在任何列的基数小于最低基数阈值时,短路检索索引指标 |
|
|
设置基数缓存轮询持续时间,以短路检索索引指标 |
添加列¶
由于需要用于列工作的其他元数据,因此目前无法通过 ALTER TABLE [table] ADD COLUMN [name] [type]
向现有表添加新列;列族、限定符以及列是否被索引。
相反,可以使用 presto-accumulo-tools 子项目(位于 presto-accumulo
存储库中)中的某个实用程序。文档和用法可以在 README 中找到。
序列化器¶
用于 Accumulo 的 Presto 连接器具有一个可插入的序列化器框架,用于处理 Presto 和 Accumulo 之间的 I/O。这使最终用户能够以编程方式在 Accumulo 中序列化和反序列化其特殊数据格式,同时抽象化连接器本身的复杂性。
目前有两种类型的序列化器可用;一种是 string
序列化器,它将值视为 Java String
,另一种是 lexicoder
序列化器,它利用 Accumulo 的 Lexicoder API 来存储值。默认序列化器是 lexicoder
序列化器,因为此序列化器不需要在 String
对象和 Presto 类型之间进行昂贵的转换操作 - 单元格的值被编码为字节数组。
此外,lexicoder
序列化器对 BIGINT
或 TIMESTAMP
等数值类型进行正确的词典排序。这对于连接器在查询数据时正确利用辅助索引至关重要。
可以通过指定 serializer
表属性来更改默认序列化器,可以使用 default
(即 lexicoder
)、string
或 lexicoder
用于内置类型,或者可以提供自己的实现方法,方法是扩展 AccumuloRowSerializer
,将其添加到 Presto CLASSPATH
,并在连接器配置中指定完全限定的 Java 类名。
CREATE TABLE myschema.scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
)
WITH (
column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
serializer = 'default'
);
INSERT INTO myschema.scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
('row2', 'Alan Turing', 103, DATE '1912-06-23' );
root@default> scan -t myschema.scientists
row1 metadata:age [] \x08\x80\x00\x00\x00\x00\x00\x00m
row1 metadata:date [] \x08\x7F\xFF\xFF\xFF\xFF\xFF\xA6\x06
row1 metadata:name [] Grace Hopper
row2 metadata:age [] \x08\x80\x00\x00\x00\x00\x00\x00g
row2 metadata:date [] \x08\x7F\xFF\xFF\xFF\xFF\xFF\xAD\xED
row2 metadata:name [] Alan Turing
CREATE TABLE myschema.stringy_scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
)
WITH (
column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
serializer = 'string'
);
INSERT INTO myschema.stringy_scientists VALUES
('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
('row2', 'Alan Turing', 103, DATE '1912-06-23' );
root@default> scan -t myschema.stringy_scientists
row1 metadata:age [] 109
row1 metadata:date [] -23034
row1 metadata:name [] Grace Hopper
row2 metadata:age [] 103
row2 metadata:date [] -21011
row2 metadata:name [] Alan Turing
CREATE TABLE myschema.custom_scientists (
recordkey VARCHAR,
name VARCHAR,
age BIGINT,
birthday DATE
)
WITH (
column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
serializer = 'my.serializer.package.MySerializer'
);
元数据管理¶
Presto/Accumulo 表的元数据存储在 ZooKeeper 中。您可以在 Presto 中(并且应该)发出 SQL 语句来创建和删除表。这是创建连接器工作所需的元数据最简单的方法。最好不要修改元数据,但这里有关于如何存储它的详细信息。信息就是力量。
ZooKeeper 中的根节点包含所有映射,格式如下
/metadata-root/schema/table
其中 metadata-root
是配置文件中 zookeeper.metadata.root
的值(默认为 /presto-accumulo
),schema
是 Presto 模式(与 Accumulo 命名空间名称相同),table
是 Presto 表名(同样,与 Accumulo 名称相同)。table
ZooKeeper 节点的數據是序列化后的 AccumuloTable
Java 对象(位于连接器代码中)。此表包含模式(命名空间)名称、表名、列定义、要用于表的序列化器以及任何其他表属性。
如果您需要以编程方式操作 Accumulo 的 ZooKeeper 元数据,请查看 com.facebook.presto.accumulo.metadata.ZooKeeperMetadataManager
,它提供了一些 Java 代码来简化此过程。
将表从内部转换为外部¶
如果您的表是内部表,您可以将其转换为外部表,方法是删除 ZooKeeper 中相应的 znode,从 Presto 的角度来看,该表不再存在。然后,使用相同的 DDL 重新创建表,但添加 external = true
表属性。
例如
1. 我们从一个内部表 foo.bar
开始,该表是用下面的 DDL 创建的。如果您之前没有为 column_mapping
定义表属性(如本示例所示),请确保在删除元数据之前描述该表。在创建外部表时,我们需要列映射。
CREATE TABLE foo.bar (a VARCHAR, b BIGINT, c DATE)
WITH (
index_columns = 'b,c'
);
DESCRIBE foo.bar;
Column | Type | Extra | Comment
--------+---------+-------+-------------------------------------
a | varchar | | Accumulo row ID
b | bigint | | Accumulo column b:b. Indexed: true
c | date | | Accumulo column c:c. Indexed: true
2. 使用 ZooKeeper CLI 删除相应的 znode。注意,此示例使用默认的 ZooKeeper 元数据根目录 /presto-accumulo
$ zkCli.sh
[zk: localhost:2181(CONNECTED) 1] delete /presto-accumulo/foo/bar
3. 使用与之前相同的 DDL 重新创建表,但添加 external=true
属性。请注意,如果您之前没有定义 column_mapping,则需要将其添加到新的 DDL 中(外部表要求设置此属性)。列映射位于 DESCRIBE
语句的输出中。
CREATE TABLE foo.bar (
a VARCHAR,
b BIGINT,
c DATE
)
WITH (
column_mapping = 'a:a:a,b:b:b,c:c:c',
index_columns = 'b,c',
external = true
);