Overview
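Task D counts the followers of each page by joining the Pages and Follows datasets on page ID. It has two implementations. TaskDSimple performs a reduce-side join in a single job. TaskDOptimized runs a counting job followed by a map-only, map-side join.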
TaskDSimple
Package: circlenet.taskD
Class: TaskDSimple
Source: src/main/java/circlenet/taskD/TaskDSimple.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
- args[0]: Input path to the Pages CSV file
- args[1]: Input path to the Follows CSV file
- args[2]: Output path for the joined results
Mapper: PageMapper
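Reads the Pages file. For each row with at least two fields, it emits the page ID as the key and the nickname tagged with “P” (value P,<nickname>).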
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outVal = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Pages row: field 0 = page ID, field 1 = nickname
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            outKey.set(f[0].trim());
            // Tag with "P" so the reducer can tell page records from follow records
            outVal.set("P," + f[1].trim());
            context.write(outKey, outVal);
        }
    }
}
Mapper: FollowsMapper
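Reads the Follows file. For each row with at least three fields, it emits the followed page ID (the third field) as the key and the constant value “F,1”. This produces one record per follow.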
public static class FollowsMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    // Constant tagged value: each follow record contributes a count of 1
    private static final Text OUT = new Text("F,1");
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Follows row: field 2 = ID of the followed page
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            outKey.set(f[2].trim());
            context.write(outKey, OUT);
        }
    }
}
Reducer: JoinReducer
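Groups all tagged values for a page ID. It takes the nickname from the “P” record and sums the “F” counts. A page is written only if it has a “P” record. As a result, pages with no followers appear with a count of 0, and follows that reference unknown page IDs are dropped. With the default TextOutputFormat, each output line has the form pageId,nickname followed by a tab and the count.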
public static class JoinReducer extends Reducer<Text, Text, Text, IntWritable> {
    private final IntWritable out = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        int count = 0;
        // Separate the page record ("P") from the follow records ("F") for this page ID
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
            } else if (p.length >= 2 && "F".equals(p[0])) {
                count += CsvUtils.toInt(p[1], 0);
            }
        }
        // Emit only pages present in Pages; a page with no followers gets 0
        if (nick != null) {
            out.set(count);
            context.write(new Text(key.toString() + "," + nick), out);
        }
    }
}
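You can trace the reduce-side join without a cluster. The sketch below is a plain-Java simulation, not Hadoop code, and its class and method names are hypothetical. It makes two assumptions: rows are comma-separated with no quoted fields, so String.split stands in for CsvUtils.split, and Integer.parseInt stands in for CsvUtils.toInt. Neither helper's behavior is shown here. The sketch groups the tagged map outputs by key, as the shuffle would, and then applies the same rules as JoinReducer. Only the third Follows column (the page ID) matters. The first two columns in the sample rows are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of TaskDSimple's reduce-side join (hypothetical names).
final class ReduceSideJoinSim {

    // Map + shuffle: tag each record and group the tagged values by page ID.
    // String.split(",") is a stand-in for CsvUtils.split (assumes no quoted fields).
    static Map<String, List<String>> mapAndShuffle(List<String> pages, List<String> follows) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : pages) {
            String[] f = line.split(",");
            if (f.length >= 2) {
                groups.computeIfAbsent(f[0].trim(), k -> new ArrayList<>()).add("P," + f[1].trim());
            }
        }
        for (String line : follows) {
            String[] f = line.split(",");
            if (f.length >= 3) {
                groups.computeIfAbsent(f[2].trim(), k -> new ArrayList<>()).add("F,1");
            }
        }
        return groups;
    }

    // Reduce: same rules as JoinReducer; emits "id,nick" -> count only if a P record exists.
    static Map<String, Integer> reduce(Map<String, List<String>> groups) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            String nick = null;
            int count = 0;
            for (String v : e.getValue()) {
                String[] p = v.split(",");
                if (p.length >= 2 && "P".equals(p[0])) {
                    nick = p[1];
                } else if (p.length >= 2 && "F".equals(p[0])) {
                    count += Integer.parseInt(p[1]);
                }
            }
            if (nick != null) {
                out.put(e.getKey() + "," + nick, count);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows; page 99 has follows but no Pages row.
        List<String> pages = List.of("1,alice", "2,bob", "3,carol");
        List<String> follows = List.of("10,2,1", "11,3,1", "12,1,2", "13,1,99");
        System.out.println(reduce(mapAndShuffle(pages, follows)));
        // prints {1,alice=2, 2,bob=1, 3,carol=0}
    }
}
```

The sample shows both edge cases handled by the nick != null check. Page 3 is reported with 0 followers, and the follow that points at page 99 is discarded.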
Example Usage
hadoop jar $JAR circlenet.taskD.TaskDSimple $PAGES $FOLLOWS $OUT/taskD/simple
TaskDOptimized
Package: circlenet.taskD
Class: TaskDOptimized
Source: src/main/java/circlenet/taskD/TaskDOptimized.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
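- args[0]: Input path to the Pages CSV file
- args[1]: Input path to the Follows CSV file
- args[2]: Temporary output path for the follower counts written by job 1
- args[3]: Output path for the final joined results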
Job 1: Count Followers
Mapper: CountMapper
Emits a follower count of 1 for each Follows record, keyed by the followed page ID.
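Reducer: SumReducer (sums the counts for each page ID)
Combiner: SumReducer is also registered as the combiner. This is safe because addition is associative and commutative, so summing partial sums yields the same totals.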
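The plain-Java sketch below illustrates why reusing SumReducer as the combiner does not change the result. It is a simulation with hypothetical names, not Hadoop code. It assumes that job 1's map output is one (pageId, 1) pair per follow. The followed page IDs are split across two hypothetical map tasks. Each split is pre-aggregated, as a combiner would do, and the partial sums are then merged, as SumReducer would do. The sketch also counts how many records would cross the shuffle with and without the combiner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Simulation of job 1 (hypothetical names): one summing function plays
// both the combiner role and the reducer role, like SumReducer.
final class CombinerSim {

    // Sum the values per key.
    static Map<String, Integer> sum(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> r : records) {
            totals.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return totals;
    }

    // One (pageId, 1) record per follow, as the counting mapper is assumed to emit.
    static List<Map.Entry<String, Integer>> ones(List<String> pageIds) {
        return pageIds.stream()
                .map(id -> Map.entry(id, 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical followed-page IDs read by two map tasks.
        List<String> split1 = List.of("1", "1", "2");
        List<String> split2 = List.of("1", "3", "3");
        // Without a combiner: every (pageId, 1) record crosses the shuffle.
        List<Map.Entry<String, Integer>> raw = new ArrayList<>(ones(split1));
        raw.addAll(ones(split2));
        // With a combiner: each map task shuffles only its partial sums.
        List<Map.Entry<String, Integer>> partials = new ArrayList<>(sum(ones(split1)).entrySet());
        partials.addAll(sum(ones(split2)).entrySet());
        System.out.println(sum(raw) + " == " + sum(partials));
        // prints {1=3, 2=1, 3=2} == {1=3, 2=1, 3=2}
        System.out.println(raw.size() + " vs " + partials.size() + " shuffled records");
        // prints 6 vs 4 shuffled records
    }
}
```

The savings grow with the number of repeated page IDs within each map task. Popular pages benefit most, because many of their 1s collapse into a single partial sum before the shuffle.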
Job 2: Map-Side Join
Mapper: JoinMapper
In setup(), it loads job 1's follower counts from the distributed cache into an in-memory map. In map(), it joins each Pages record against that map, so the join needs no shuffle.
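// Uses java.io.BufferedReader, java.io.InputStreamReader, java.net.URI,
// java.nio.charset.StandardCharsets, java.util.HashMap, java.util.Map,
// org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path.
// Page ID -> follower count, filled from job 1's output.
private final Map<String, Integer> followers = new HashMap<>();
@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    for (URI u : files) {
        // Resolve the file system from the cache URI instead of assuming the default one
        FileSystem fs = FileSystem.get(u, context.getConfiguration());
        // try-with-resources closes the reader even if a read fails
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(u.getPath())), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Job 1 output line: pageId<TAB>count
                String[] p = CsvUtils.splitTab(line);
                if (p.length >= 2) {
                    followers.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
                }
            }
        }
    }
}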
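Only setup() is shown for JoinMapper. The plain-Java sketch below simulates the whole map-side join, and every name in it is hypothetical. It assumes that job 1 writes lines of the form pageId, tab, count, which is the default TextOutputFormat layout that the use of splitTab suggests. It also assumes that map() emits pageId,nickname with the looked-up count and falls back to 0 for pages with no entry. That fallback is chosen to match TaskDSimple and is not taken from the actual map() body. String.split and a small parse helper stand in for CsvUtils.split and CsvUtils.toInt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of TaskDOptimized job 2 (hypothetical names, not Hadoop code).
final class MapSideJoinSim {

    // Hypothetical stand-in for CsvUtils.toInt: parse, or fall back to a default.
    static int toIntOr(String s, int dflt) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return dflt;
        }
    }

    // Mirrors setup(): build the pageId -> count table from "pageId<TAB>count" lines.
    static Map<String, Integer> loadCounts(List<String> countLines) {
        Map<String, Integer> followers = new HashMap<>();
        for (String line : countLines) {
            String[] p = line.split("\t");
            if (p.length >= 2) {
                followers.put(p[0].trim(), toIntOr(p[1], 0));
            }
        }
        return followers;
    }

    // Assumed map() behavior: one "pageId,nickname<TAB>count" line per page, 0 if absent.
    static List<String> joinPages(List<String> pageLines, Map<String, Integer> followers) {
        List<String> out = new ArrayList<>();
        for (String line : pageLines) {
            String[] f = line.split(",");  // stand-in for CsvUtils.split (no quoted fields)
            if (f.length >= 2) {
                String id = f[0].trim();
                out.add(id + "," + f[1].trim() + "\t" + followers.getOrDefault(id, 0));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical job 1 output (page 99 has no Pages row) and Pages rows.
        Map<String, Integer> counts = loadCounts(List.of("1\t2", "2\t1", "99\t1"));
        joinPages(List.of("1,alice", "2,bob", "3,carol"), counts).forEach(System.out::println);
    }
}
```

Each map task holds the full count table in memory. The trade-off is that this design only works while the number of distinct followed pages fits comfortably in a mapper's heap.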
Map-only job: job 2 calls job.setNumReduceTasks(0). Mapper output is therefore written directly to the output path, with no shuffle, sort, or reduce phase.
Optimizations
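- Combiner in job 1 pre-aggregates counts within each map task, reducing the number of records shuffled to reducers
- Map-side join using the distributed cache, so the Pages data is never shuffled (the follower-count table must fit in each mapper's memory)
- Map-only second job (no shuffle, sort, or reduce phase)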
Example Usage
hadoop jar $JAR circlenet.taskD.TaskDOptimized $PAGES $FOLLOWS \
$OUT/taskD/tmp_count_opt $OUT/taskD/optimized