《Hadoop权威指南》ch5 Compression

Compression (压缩)

压缩的好处

  • 减少文件所需空间
  • 加速数据的在网络和磁盘上的传输

    Codec (Code-Decode)

    Codec是Hadoop中的压缩-解压算法的实现。在hadoop中,实现CompressionCodec接口的类代表一个codec,下表列举了Hadoop实现的codec。

Hadoop_Compression_format

CompressionCodec

CompressionCode包含两个函数,用于压缩和解压缩数据。

  • 压缩:通过createOutputStream(OutputStream out)方法获得CompressionOutputStream对象
  • 解压:通过createInputStream(InputStream in)方法获得CompressionInputStream对象
    压缩的示例代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    package com.sweetop.styhadoop;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;

    public class StreamCompressor {
    public static void main(String[] args) throws Exception {
    String codecClassName = args[0];
    Class<?> codecClass = Class.forName(codecClassName);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish();
    }
    }

从命令行接受一个CompressionCodec实现类的参数,然后通过ReflectionUtils把实例化这个类,调用CompressionCodec的接口方法对标准输出流进行封装,封装成一个压缩流,通过IOUtils类的copyBytes方法把标准输入流拷贝到压缩流中,最后调用CompressionCodec的finish方法,完成压缩。

CompressionCodecFactory

如果你想读取一个被压缩的文件的话,首先你得先通过扩展名判断该用哪种codec。如前面的表可得,若文件名后缀为.gz,则需要用GzipCodec来读取。

也可以使用CompressionCodecFactorygetCodec()方法,通过传入一个Path,即可获得相应得codec。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);

Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.out.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

InputStream in = null;
OutputStream out = null;

try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in,out,conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}

注意看下removeSuffix方法,这是一个静态方法,它可以将文件的后缀去掉,然后我们将这个路径做为解压的输出路径。

CompressionCodecFactory能找到的code默认只有三种

  • org.apache.hadoop.io.compress.GzipCodec
  • org.apache.hadoop.io.compress.BZip2Codec
  • org.apache.hadoop.io.compress.DefaultCodec

如果想添加其他的codec你需要更改io.compression.codecs属性,并注册codec。

Native libraries(原生类库)

通常情况下,使用Native libraries来实现解压缩会获得更高的性能。所有的解压缩格式都有原生类库,但并非都有java实现。如下表所示。
hadoop压缩原生类库

默认情况下,Hadoop会根据自身运行的平台搜索原生代码库并自动加载,无需要做配置。当需要禁用原生代码库时,需要将io.native.lib.available设置为false

CodePool

如果使用原生代码库执行大量解压缩,可以使用CodePool,支持反复使用,从而减少了创建对象的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class PooledStreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
Compressor compressor = null;
try {
compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
finally {
CodecPool.returnCompressor(compressor);
}
}
}

Have a nice day!