// Similar to Task D - reduce-side join
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] f = CsvUtils.split(value.toString());
if (f.length >= 2) {
context.write(new Text(f[0].trim()),
new Text("P," + f[1].trim()));
}
}
}
public static class FollowMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] f = CsvUtils.split(value.toString());
if (f.length >= 3) {
context.write(new Text(f[2].trim()), new Text("F,1"));
}
}
}
public static class JoinCountReducer extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String nick = null;
int count = 0;
for (Text t : values) {
String[] p = CsvUtils.split(t.toString());
if ("P".equals(p[0])) {
nick = p[1];
} else if ("F".equals(p[0])) {
count += CsvUtils.toInt(p[1], 0);
}
}
if (nick != null) {
context.write(NullWritable.get(),
new Text(key + "," + nick + "," + count));
}
}
}