Calculate follower count (popularity factor) for each CircleNet page owner
Task D computes the “popularity factor” for each CircleNetPage owner by counting how many people follow them, demonstrating join-optimization techniques between the CircleNetPage and Follows datasets.
For each CircleNetPage, report the owner’s nickname and the number of people that follow them. For page owners not listed in Follows, return a score of zero.

SQL Equivalent:
```sql
SELECT p.ID, p.NickName, COUNT(f.ID1) AS follower_count
FROM CircleNetPage p
LEFT JOIN Follows f ON p.ID = f.ID2
GROUP BY p.ID, p.NickName
ORDER BY follower_count DESC;
```
The simple approach uses a single-job reduce-side join with MultipleInputs.

Page Mapper (TaskDSimple.java:19-32):
```java
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outVal = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            outKey.set(f[0].trim());         // Page ID
            outVal.set("P," + f[1].trim());  // Tag with "P" for Page
            context.write(outKey, outVal);
        }
    }
}
```
Follows Mapper (TaskDSimple.java:34-46):
```java
public static class FollowsMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private static final Text OUT = new Text("F,1");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            outKey.set(f[2].trim());     // ID2 - who is being followed
            context.write(outKey, OUT);  // Tag with "F" for Follow
        }
    }
}
```
Join Reducer (TaskDSimple.java:48-68):
```java
public static class JoinReducer extends Reducer<Text, Text, Text, IntWritable> {
    private final IntWritable out = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        int count = 0;
        // Process all values for this page ID
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];                       // Extract nickname
            } else if (p.length >= 2 && "F".equals(p[0])) {
                count += CsvUtils.toInt(p[1], 0);  // Count followers
            }
        }
        if (nick != null) {
            out.set(count);
            context.write(new Text(key.toString() + "," + nick), out);
        }
    }
}
```
Job Configuration (TaskDSimple.java:76-80):
```java
// Use MultipleInputs to read from two sources
MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, PageMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, FollowsMapper.class);
job.setReducerClass(JoinReducer.class);
```
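To see the reduce-side join end to end without a cluster, here is a small plain-Java simulation of the same logic (no Hadoop dependencies; the class name and sample data are illustrative, not from the actual datasets): records are tagged "P,&lt;nick&gt;" or "F,1", grouped by page ID as the shuffle would, and reduced in one pass.

```java
import java.util.*;

public class ReduceSideJoinSim {
    public static void main(String[] args) {
        // Simulated shuffle output: page ID -> tagged values ("P,<nick>" or "F,1")
        Map<String, List<String>> grouped = new TreeMap<>();
        // Page records (ID, NickName); page 2 has no followers
        String[][] pages = {{"1", "alice"}, {"2", "bob"}};
        // Follows records (follower ID, followed ID)
        String[][] follows = {{"3", "1"}, {"4", "1"}};

        for (String[] p : pages)
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add("P," + p[1]);
        for (String[] f : follows)
            grouped.computeIfAbsent(f[1], k -> new ArrayList<>()).add("F,1");

        // Reduce: one pass over the tagged values per key
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            String nick = null;
            int count = 0;
            for (String v : e.getValue()) {
                String[] parts = v.split(",", 2);
                if ("P".equals(parts[0])) nick = parts[1];
                else if ("F".equals(parts[0])) count += Integer.parseInt(parts[1]);
            }
            if (nick != null)
                System.out.println(e.getKey() + "," + nick + "\t" + count);
        }
    }
}
```

Note how a page with no "F" values still emits a zero count, matching the LEFT JOIN semantics of the SQL above.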
The optimized version uses two jobs: the first aggregates followers with a combiner, then the second performs a map-side join.

Job 1: Count Followers with Combiner (TaskDOptimized.java:25-51)
```java
public static class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            outKey.set(f[2].trim());  // ID2 - who is being followed
            context.write(outKey, ONE);
        }
    }
}

public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable out = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int s = 0;
        for (IntWritable v : values) {
            s += v.get();
        }
        out.set(s);
        context.write(key, out);
    }
}

// Enable combiner for pre-aggregation
job1.setCombinerClass(SumReducer.class);
job1.setReducerClass(SumReducer.class);
```
Combiner Benefit: Reduces shuffle from 20M follow records to ~200K aggregated counts (one per followed page). This is a 99% reduction in shuffle I/O!
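The pre-aggregation effect can be sketched in plain Java (the sizes and key distribution below are illustrative, not the real dataset): a map task that would otherwise ship one (ID2, 1) pair per follow record instead ships one summed pair per distinct key it saw.

```java
import java.util.*;

public class CombinerSim {
    public static void main(String[] args) {
        // One map task's output: (followedId, 1) for each follow record in its split
        List<String> mapOutput = new ArrayList<>();
        Random rnd = new Random(42);
        int records = 100_000, distinctPages = 200;  // illustrative sizes
        for (int i = 0; i < records; i++)
            mapOutput.add("page" + rnd.nextInt(distinctPages));

        // Combiner: local sum per key before the shuffle
        Map<String, Integer> combined = new HashMap<>();
        for (String key : mapOutput)
            combined.merge(key, 1, Integer::sum);

        System.out.println("records shuffled without combiner: " + mapOutput.size());
        System.out.println("records shuffled with combiner: " + combined.size());
    }
}
```

This works only because SumReducer's operation (addition) is associative and commutative, which is what lets the same class serve as both combiner and reducer.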
Job 2: Map-Side Join (TaskDOptimized.java:53-88)
```java
public static class JoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private final Map<String, Integer> followers = new HashMap<String, Integer>();
    private final Text out = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // Load follower counts into memory from the distributed cache
        URI[] files = context.getCacheFiles();
        if (files != null) {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            for (URI u : files) {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(fs.open(new Path(u.getPath()))));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] p = CsvUtils.splitTab(line);
                    if (p.length >= 2) {
                        followers.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
                    }
                }
                br.close();
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            String id = f[0].trim();
            String nick = f[1].trim();
            // Look up the follower count in the in-memory map
            int cnt = followers.containsKey(id) ? followers.get(id) : 0;
            out.set(id + "," + nick + "," + cnt);
            context.write(NullWritable.get(), out);
        }
    }
}

// Configure as a map-only job
job2.setNumReduceTasks(0);
job2.addCacheFile(new Path(args[2] + "/part-r-00000").toUri());
```
Map-Only Optimization: Job 2 has no reducers (setNumReduceTasks(0)), performing the join entirely in the mapper. This eliminates shuffle and sort phases, saving significant I/O.
Optimization Benefits:

- Combiner reduces Job 1 shuffle by 99% (20M → ~200K records)
- Job 2 is map-only, eliminating its shuffle and sort phases entirely
Trade-off: the optimized version runs 2 jobs instead of 1, but the dramatic I/O reduction more than compensates. This demonstrates that adding jobs does not necessarily slow a pipeline down, as long as each job is individually optimized.
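The trade-off above can be put in rough numbers (plain Java; the 20M and ~200K figures come from this write-up, and full per-key aggregation by the combiner is an assumption of the sketch):

```java
public class ShuffleCostSketch {
    public static void main(String[] args) {
        long follows = 20_000_000L;  // follow records (figure from the write-up)
        long pages = 200_000L;       // distinct followed pages (~200K, per the write-up)

        // Simple version: every follow record plus every page record crosses the shuffle
        long simpleShuffle = follows + pages;

        // Optimized version: Job 1 shuffles ~one aggregated count per followed page
        // (assuming the combiner fully collapses each key); Job 2 is map-only,
        // so it shuffles nothing at all.
        long optimizedShuffle = pages;

        System.out.println("simple shuffle records: " + simpleShuffle);
        System.out.println("optimized shuffle records: " + optimizedShuffle);
        System.out.printf(java.util.Locale.ROOT, "reduction: %.1f%%%n",
                100.0 * (simpleShuffle - optimizedShuffle) / simpleShuffle);
    }
}
```

In practice the combiner output is one record per distinct key per map task, so the real reduction sits somewhere between this best case and no reduction at all, depending on how keys spread across splits.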