Parquet学习笔记

简介

Apache Parquet是Hadoop生态圈中一种新型列式存储格式,它可以兼容Hadoop生态圈中大多数计算框架(Hadoop、Spark等),被多种查询引擎支持(Hive、Impala、Drill等),并且它是语言和平台无关的。

Definition level & Repetition level

定义

definition Level

Definition level指明该列的路径上多少个可选field被定义了。

definition Level是该路径上有定义的repeated field 和 optional field的个数,不包括required field,因为required field是必须有定义的。

Repetition levels

Repetition level指明该值在路径中哪个repeated field重复。

DL和RL的计算

parquet

我们用深度0表示一个纪录的开头(虚拟的根节点),深度的计算忽略非重复字段(标签不是repeated的字段都不算在深度里)。所以在Name.Language.Code这个路径中,包含两个重复字段,Name和Language,如果在Name处重复,重复深度为1(虚拟的根节点是0,下一级就是1),在Language处重复就是2,不可能在Code处重复,它是required类型,表示有且仅有一个;同样的,在路径Links.Forward中,Links是optional的,不参与深度计算(不可能重复),Forward是repeated的,因此只有在Forward处重复时重复深度为1。

Parquet Java example

来源
pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>0.23.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
</dependencies>

WriteParquet.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class WriteParquet {
public static void main(String[] args) throws IllegalArgumentException, IOException {

List<Field> fields = new ArrayList<Field>();
Object defaultValue = null;
fields.add(new Field("x", Schema.create(Type.INT), "x", defaultValue));
fields.add(new Field("y", Schema.create(Type.INT), "y", defaultValue));

Schema schema = Schema.createRecord("name", "doc", "namespace", false, fields);

try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
new Path("my-file.parquet")).withSchema(schema).withCompressionCodec(CompressionCodecName.SNAPPY)
.build()) {

// 模拟10000行数据
for (int r = 0; r < 10000; ++r) {
Record record = new Record(schema);
record.put(0, r);
record.put(1, r * 3);
writer.write(record);
}
}
}
}

ReadParquet.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadParquet {
public static void main(String[] args) throws IllegalArgumentException, IOException {

ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path("my-file.parquet"))
.build();
GenericRecord record;

while ((record = reader.read()) != null) {
System.out.println(record);
}
}
}

Have a nice day!