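// Mappers for the two inputs read via MultipleInputs (Pages and UserMax). Each emits the first
// field as the join key and tags the value with its source: "P,<field>" for Pages and
// "M,<field>" for UserMax, so the reducer can tell the two sides apart.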
/**
 * Map side of the join for the Pages input.
 *
 * <p>Each line is split with {@code CsvUtils.split}. Lines with fewer than two fields are dropped.
 * The trimmed first field becomes the output key. The trimmed second field is emitted with a
 * {@code "P,"} prefix so the reducer can recognise it as coming from Pages.
 */
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Output holders reused across calls (standard Hadoop idiom); context.write consumes their
    // contents before the next map() call overwrites them.
    private final Text joinKey = new Text();
    private final Text taggedValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] fields = CsvUtils.split(value.toString());
        if (fields.length < 2) {
            return; // too few columns to produce a key/value pair
        }
        joinKey.set(fields[0].trim());
        taggedValue.set("P," + fields[1].trim());
        context.write(joinKey, taggedValue);
    }
}
/**
 * Map side of the join for the UserMax input.
 *
 * <p>Each line is split with {@code CsvUtils.splitTab}. Lines with fewer than two fields are
 * dropped. The trimmed first field becomes the output key. The trimmed second field is emitted with
 * an {@code "M,"} prefix so the reducer can recognise it as coming from UserMax.
 */
public static class MaxMapOutMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Output holders reused across calls (standard Hadoop idiom); context.write consumes their
    // contents before the next map() call overwrites them.
    private final Text joinKey = new Text();
    private final Text taggedValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = CsvUtils.splitTab(value.toString());
        if (columns.length < 2) {
            return; // too few columns to produce a key/value pair
        }
        joinKey.set(columns[0].trim());
        taggedValue.set("M," + columns[1].trim());
        context.write(joinKey, taggedValue);
    }
}
/**
 * Reduce side of the join between Pages ({@code "P,"}-tagged values) and UserMax
 * ({@code "M,"}-tagged values).
 *
 * <p>For each key, emits {@code "<key>,<nick>"} when the key has a Pages record and either has no
 * UserMax record or its UserMax value is below the threshold. The threshold is read from the
 * {@code task.g.threshold} configuration property and defaults to 0.
 */
public static class OutdatedReducer extends Reducer<Text, Text, NullWritable, Text> {
    private int threshold;
    // Reused output holder; context.write consumes it before the next reduce() call.
    private final Text outValue = new Text();

    @Override
    protected void setup(Context context) {
        threshold = context.getConfiguration().getInt("task.g.threshold", 0);
    }

    /**
     * Joins the tagged values for one key and writes the key/nick pair if it is "outdated".
     *
     * @param key the join key (the first field of both inputs)
     * @param values tagged values of the form {@code "<tag>,<payload>"}
     * @param context job context used for output
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        Integer last = null;
        for (Text t : values) {
            String s = t.toString();
            // Split on the FIRST comma only. The tag never contains a comma, but the payload
            // (e.g. a nickname) may; re-parsing it as CSV would truncate or mangle it, and an
            // empty payload could leave no second element to read.
            int comma = s.indexOf(',');
            if (comma < 0) {
                continue; // not a value in the "<tag>,<payload>" form our mappers emit
            }
            String tag = s.substring(0, comma);
            String payload = s.substring(comma + 1);
            if ("P".equals(tag)) {
                // If several Pages records share a key, the last one seen wins (value order is
                // not guaranteed by the framework).
                nick = payload;
            } else if ("M".equals(tag)) {
                // -1 is the fallback passed to CsvUtils.toInt (presumably used for unparseable
                // input). Keep the maximum so the result does not depend on which M record
                // happens to arrive last.
                int v = CsvUtils.toInt(payload, -1);
                last = (last == null) ? v : Math.max(last, v);
            }
        }
        // Output if the key has a page but no activity at or above the threshold.
        if (nick != null && (last == null || last < threshold)) {
            outValue.set(key + "," + nick);
            context.write(NullWritable.get(), outValue);
        }
    }
}