MapReduce in Practice: YouTube Data Analysis

Introduction

This post walks through analyzing YouTube data with Hadoop MapReduce.
We run some analysis over the dataset and extract useful information, such as the 10 categories in which the most videos were uploaded.

Data

Data download

Data preview

Data description
Column 1: Video ID (11 characters).
Column 2: Uploader of the video.
Column 3: Interval between the founding of YouTube and the date the video was uploaded.
Column 4: Category of the video.
Column 5: Length of the video.
Column 6: Number of views of the video.
Column 7: Rating of the video.
Column 8: Number of ratings given to the video.
Column 9: Number of comments on the video.
Column 10: IDs of videos related to the uploaded video.
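
For illustration, a record in this layout might look like the following line (tab-separated in the real file; all values below are made up for this example, and the real dataset may list several related video IDs at the end):

qR4uF2mZ7cA  someUploader  1131  Music  213  104823  4.53  2310  541  aB3dE5fG7hI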

Problem Statement

Find the Top N categories with the most videos.

MapReduce Implementation

Mapper

Each input line is split on tabs, and the occurrences of each video category (Column 4) are counted. Some records in the dataset are incomplete, so lines that split into five or fewer fields are ignored.

public static class CategoryMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1); // every occurrence counts as 1
    private Text category = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        String[] attributeArray = value.toString().split("\t"); // split the record on tabs
        if (attributeArray.length > 5) // skip malformed records that split into five or fewer fields
        {
            category.set(attributeArray[3]); // Column 4: video category
            context.write(category, one);
        }
    }
}
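
Since the mapper body is plain string handling, it can be sanity-checked without a Hadoop cluster. Below is a minimal standalone sketch (the class name is hypothetical, and it reuses the made-up record from above) that applies the same split-and-filter logic as CategoryMapper:

public class CategoryExtractDemo
{
    public static void main(String[] args)
    {
        // Hypothetical record in the dataset's tab-separated layout
        String line = "qR4uF2mZ7cA\tsomeUploader\t1131\tMusic\t213\t104823\t4.53\t2310\t541\taB3dE5fG7hI";
        String[] attributeArray = line.split("\t");
        if (attributeArray.length > 5) // same filter as CategoryMapper
        {
            System.out.println(attributeArray[3]); // prints "Music" (Column 4)
        }
    }
}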

Combiner

The combiner performs a local merge of the map output before it is shipped to the reducers, cutting down the volume of data transferred between map and reduce nodes and improving network I/O; it is one of MapReduce's standard optimizations.
In essence, a combiner is a reducer that first runs locally on the map side.

public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
        {
            sum += val.get(); // accumulate the partial counts for this category
        }
        context.write(key, new IntWritable(sum));
    }
}
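
Two caveats when reusing a reducer as a combiner like this: the framework may invoke the combiner zero, one, or several times for a given map output, so the operation must be associative and commutative (summing counts is both), and the combiner's input and output key/value types must match the map output types, which SumReducer satisfies.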

Reducer

Because [category, count] pairs need to be sorted and compared, we first define a two-tuple class whose first and second fields hold the video category and the video count respectively. It implements the Comparable interface so that the tuples can be sorted later.

public static class TwoTuple implements Comparable<TwoTuple>
{
    public String first;  // video category
    public int second;    // video count

    public TwoTuple(String a, int b)
    {
        first = a;
        second = b;
    }

    @Override
    public String toString()
    {
        return "(" + first + ", " + second + ")";
    }

    @Override
    public int compareTo(TwoTuple tt)
    {
        // Ascending order by count. Integer.compare avoids the overflow
        // that the "second - tt.second" subtraction idiom can suffer.
        return Integer.compare(second, tt.second);
    }
}
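
A throwaway sketch (hypothetical class name, arbitrary values) to confirm the ordering: Arrays.sort arranges TwoTuple instances in ascending order of second, which is exactly what the Top N logic below relies on. It assumes TwoTuple is visible from the demo class.

import java.util.Arrays;

public class TwoTupleSortDemo
{
    public static void main(String[] args)
    {
        TwoTuple[] top = { new TwoTuple("Music", 40), new TwoTuple("Comedy", 10), new TwoTuple("Sports", 60) };
        Arrays.sort(top); // ascending by second, per compareTo
        System.out.println(Arrays.toString(top)); // [(Comedy, 10), (Music, 40), (Sports, 60)]
    }
}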

The Top N extraction algorithm is implemented in the Reducer.

First, a note on the setup() and cleanup() methods: unlike reduce(), which is called once per key, each of them runs exactly once per task. (Both Mapper and Reducer provide these hooks; here we override them in the Reducer.)

setup() is invoked by the MapReduce framework exactly once, before the task processes any input, and is the place to initialize variables and resources in one go. If that initialization were done inside map() or reduce() instead, it would be repeated for every input record, which is redundant and slows the program down.

cleanup() is likewise invoked exactly once, after the task has processed all of its input, and is the place to release variables and resources. Releasing them inside map() or reduce() would mean tearing them down after every record and re-initializing them before the next one, again hurting efficiency.

Algorithm overview
In setup(), read the N of "Top N" from the job configuration and initialize the top[] array;
In reduce(), compute the total video count for each category, overwrite top[0] (the current minimum) with it, and re-sort the array, as traced below;
In cleanup(), write the repeatedly overwritten and sorted top array to the output.
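
To make the replacement step concrete, suppose N = 3 (so top has four slots, all starting as ("null", 0)) and reduce() sees Music=40, News=25, Sports=60, Comedy=10 in that order. After the four overwrite-and-sort rounds the array reads, ascending: (Comedy, 10), (News, 25), (Music, 40), (Sports, 60). cleanup() then writes indices 3 down to 1, i.e. Sports, Music, News in descending order, and top[0] (Comedy, which missed the top 3) is discarded.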

public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    int len;
    TwoTuple[] top;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        len = context.getConfiguration().getInt("N", 10); // read N from the configuration, defaulting to 10
        top = new TwoTuple[len + 1]; // one extra slot: top[0] serves as the insertion point
        for (int i = 0; i <= len; i++)
        {
            top[i] = new TwoTuple("null", 0);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        // Emit the top N in descending order; top[0] holds the (N+1)-th value and is dropped
        for (int i = len; i > 0; i--)
        {
            context.write(new Text(top[i].first), new IntWritable(top[i].second));
        }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
        {
            sum += val.get(); // total video count for this category
        }
        add(key.toString(), sum);
    }

    private void add(String key, int val)
    {
        top[0].first = key;   // overwrite the current minimum
        top[0].second = val;
        Arrays.sort(top);     // re-sort ascending so top[0] is again the minimum
    }
}
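
Note that each reduce task maintains its own top array, so the output is a global Top N only when the job runs with a single reduce task. One reducer is the default, but it does no harm to make the requirement explicit in the driver with job.setNumReduceTasks(1).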

main() and configuration

public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    conf.addResource("classpath:/hadoop/core-site.xml");
    conf.addResource("classpath:/hadoop/hdfs-site.xml");
    conf.addResource("classpath:/hadoop/mapred-site.xml");
    conf.setInt("N", 5); // ask for the top 5 categories
    // String[] otherArgs = new GenericOptionsParser(conf,
    // args).getRemainingArgs();
    String[] otherArgs = { "/youtube", "/youtube_category_Top5" };
    if (otherArgs.length != 2)
    {
        System.err.println("Usage: FindMaxCategory <in> <out>");
        System.exit(2);
    }
    Job job = Job.getInstance(conf, "youtube"); // the Job(conf, name) constructor is deprecated
    job.setJarByClass(FindMaxCategory.class);
    job.setMapperClass(CategoryMapper.class);
    job.setCombinerClass(SumReducer.class);
    // job.setReducerClass(SumReducer.class); // would output the total for every category
    job.setReducerClass(TopNReducer.class); // keeps only the Top N categories
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
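
Assuming the classes above live in a driver class named FindMaxCategory and are packaged into a jar (the jar name below is hypothetical), the job could be submitted with something like: hadoop jar youtube-analysis.jar FindMaxCategory. With conf.setInt("N", 5), the output directory /youtube_category_Top5 should then contain the five categories with the most videos, one per line, in descending order of count.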

Output

result

Full code

Have a nice day!