Task F identifies “popular” CircleNetPage owners by finding those who have more followers than the average number of followers across all page owners.

Problem Statement

Report all owners of a CircleNetPage who are more popular than the average user: those who have more followers than the average follower count across all owners.

SQL Equivalent:
WITH FollowerCounts AS (
    SELECT p.ID, p.NickName, COUNT(f.ID1) as follower_count
    FROM CircleNetPage p
    LEFT JOIN Follows f ON p.ID = f.ID2
    GROUP BY p.ID, p.NickName
),
AvgFollowers AS (
    SELECT AVG(follower_count) as avg_count
    FROM FollowerCounts
)
SELECT fc.ID, fc.NickName, fc.follower_count
FROM FollowerCounts fc, AvgFollowers af
WHERE fc.follower_count > af.avg_count;
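To make the definition concrete, here is a minimal plain-Java sketch (toy data, no Hadoop; the nicknames are made up) of "more followers than the average." Note that owners with zero followers still count toward the average, mirroring the LEFT JOIN above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AboveAverageDemo {
    // Return the owners whose follower count strictly exceeds the average.
    static Map<String, Integer> aboveAverage(Map<String, Integer> followers) {
        double avg = followers.values().stream()
                .mapToInt(Integer::intValue).average().orElse(0.0);
        Map<String, Integer> out = new LinkedHashMap<>();
        followers.forEach((nick, n) -> { if (n > avg) out.put(nick, n); });
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> followers = new LinkedHashMap<>();
        followers.put("CoolNick789", 234);
        followers.put("QuietUser", 0);    // zero-follower owners still count in the denominator
        followers.put("TechGuru23", 567);
        // average = (234 + 0 + 567) / 3 = 267.0, so only TechGuru23 qualifies
        System.out.println(aboveAverage(followers));  // prints {TechGuru23=567}
    }
}
```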

Approaches

Simple Implementation (3 Jobs)

The simple approach uses three sequential MapReduce jobs.

Job 1: Join and Count Followers (TaskFSimple.java:25-74)
// Similar to Task D - reduce-side join keyed on page ID
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            // Tag page records with "P" and carry the nickname
            context.write(new Text(f[0].trim()),
                          new Text("P," + f[1].trim()));
        }
    }
}

public static class FollowMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            // Tag follow records with "F"; the followed page ID is the join key
            context.write(new Text(f[2].trim()), new Text("F,1"));
        }
    }
}

public static class JoinCountReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        int count = 0;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if ("P".equals(p[0])) {
                nick = p[1];
            } else if ("F".equals(p[0])) {
                count += CsvUtils.toInt(p[1], 0);
            }
        }
        // Pages with no follow records are still emitted with count 0,
        // matching the SQL LEFT JOIN semantics
        if (nick != null) {
            context.write(NullWritable.get(),
                          new Text(key + "," + nick + "," + count));
        }
    }
}
Job 2: Calculate Average (TaskFSimple.java:76-102)
public static class AvgMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final Text K = new Text("avg");
    private final IntWritable out = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            out.set(CsvUtils.toInt(f[2], 0));  // Extract follower count
            context.write(K, out);             // Single key funnels all counts to one reducer
        }
    }
}

public static class AvgReducer extends Reducer<Text, IntWritable, NullWritable, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long cnt = 0;
        for (IntWritable v : values) {
            sum += v.get();
            cnt++;
        }
        double avg = cnt == 0 ? 0.0 : (sum * 1.0) / cnt;
        context.write(NullWritable.get(), new DoubleWritable(avg));
    }
}
Read Average from HDFS (TaskFSimple.java:173-189):
FileSystem fs = FileSystem.get(conf);
List<String> lines = new ArrayList<String>();
BufferedReader br = new BufferedReader(
        new InputStreamReader(fs.open(new Path(args[3] + "/part-r-00000"))));
String line;
while ((line = br.readLine()) != null) {
    lines.add(line);
}
br.close();

double avg = 0.0;
if (!lines.isEmpty()) {
    String[] p = CsvUtils.splitTab(lines.get(0));
    try {
        avg = Double.parseDouble(p[p.length - 1].trim());
    } catch (Exception ignored) { }
}
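The last-token parse works because Job 2 writes a NullWritable key, so the value is the whole line; taking the last tab-separated token also tolerates output that does carry a key column. A minimal plain-Java sketch of that parsing logic (using String.split("\t") as a stand-in for CsvUtils.splitTab, which is an assumption about that helper):

```java
public class ParseAvgDemo {
    // Parse the average from the last tab-separated token, defaulting to 0.0.
    static double parseAvg(String line) {
        String[] p = line.split("\t");  // stand-in for CsvUtils.splitTab
        try {
            return Double.parseDouble(p[p.length - 1].trim());
        } catch (NumberFormatException e) {
            return 0.0;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseAvg("267.0"));       // value-only line, as Job 2 writes it
        System.out.println(parseAvg("avg\t267.0"));  // defensive: a key column is also handled
        System.out.println(parseAvg("garbage"));     // unparseable input falls back to 0.0
    }
}
```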
Job 3: Filter Above Average (TaskFSimple.java:104-124)
public static class FilterMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private double avg;
    private final Text out = new Text();

    @Override
    protected void setup(Context context) {
        // The driver stores the Job 2 result in the configuration before submitting Job 3
        avg = context.getConfiguration().getDouble("task.f.avg", 0.0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            int count = CsvUtils.toInt(f[2], 0);
            if (count > avg) {
                out.set(f[0] + "," + f[1] + "," + count);
                context.write(NullWritable.get(), out);
            }
        }
    }
}
Characteristics:
  • 3 sequential jobs with high I/O
  • Reduce-side join in Job 1
  • Driver reads intermediate result from HDFS
  • High shuffle overhead
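Most of that shuffle overhead comes from FollowMapper emitting one ("pageID", "F,1") record per follow edge. A combiner can pre-sum those records on the map side so each map task ships at most one partial count per distinct page. A minimal plain-Java sketch of that pre-aggregation (illustrative only, not the project's actual combiner class):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {
    // Simulate a combiner: collapse per-edge counts of 1 into one partial sum per page.
    static Map<String, Integer> combine(List<String> followedPageIds) {
        Map<String, Integer> partial = new LinkedHashMap<>();
        for (String id : followedPageIds) {
            partial.merge(id, 1, Integer::sum);
        }
        return partial;  // one record per distinct page instead of one per follow edge
    }

    public static void main(String[] args) {
        List<String> edges = List.of("42157", "42157", "89234", "42157");
        System.out.println(combine(edges));  // prints {42157=3, 89234=1}
    }
}
```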

Performance Comparison

| Metric              | Simple           | Optimized            | Improvement           |
|---------------------|------------------|----------------------|-----------------------|
| Number of Jobs      | 3                | 2                    | 33% reduction         |
| Job 1 Type          | Reduce-side join | Combiner aggregation | 99% shuffle reduction |
| Average Calculation | Separate job     | In driver            | 1 job saved           |
| Job 2/3 Type        | Reduce job       | Map-only             | 50% I/O saved         |
| Join Strategy       | Reduce-side      | Map-side             | Eliminates shuffle    |
| Execution Time      | Baseline         | 40-60% faster        | Significant           |
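The "In driver" row refers to skipping Job 2 entirely: since Job 1's output is one small CSV line per page, the driver can read it and compute the average itself. A minimal plain-Java sketch of that driver-side aggregation, assuming the `ID,NickName,count` record format shown above:

```java
import java.util.List;

public class DriverAvgSketch {
    // Optimized approach: compute the average from Job 1's per-page output
    // in the driver instead of launching a second MapReduce job.
    static double averageOfCounts(List<String> job1Lines) {
        long sum = 0, cnt = 0;
        for (String line : job1Lines) {
            String[] f = line.split(",");
            if (f.length >= 3) {
                sum += Long.parseLong(f[2].trim());
                cnt++;  // every page counts, including those with 0 followers
            }
        }
        return cnt == 0 ? 0.0 : (double) sum / cnt;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1,NickA,10", "2,NickB,0", "3,NickC,20");
        System.out.println(averageOfCounts(lines));  // prints 10.0
    }
}
```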

Running the Task

export JAR=/home/ds503/ds503_bdm-1.0-SNAPSHOT.jar
export PAGES=/circlenet/pages/CircleNetPage.csv
export FOLLOWS=/circlenet/follows/Follows.csv
export OUT=/circlenet/output

# Run simple version (3 jobs)
hadoop jar $JAR circlenet.taskF.TaskFSimple \
  $PAGES $FOLLOWS \
  $OUT/taskF/tmp_join_simple \
  $OUT/taskF/tmp_avg_simple \
  $OUT/taskF/simple

# View results
hdfs dfs -cat $OUT/taskF/simple/part-* | head -20

Sample Output

42157,CoolNick789,234
89234,TechGuru23,567
12456,DataNinja,345
...
Format: PageID,NickName,FollowerCount (only those above average)

Key Takeaways

  • Simple approach: 3 jobs with reduce-side join and separate average calculation
  • Optimization techniques:
    • Combiner: Pre-aggregate follower counts (99% reduction)
    • Driver computation: Calculate average in driver instead of separate job
    • Map-side join: Load counts into memory for filtering
    • Map-only job: Eliminate reduce phase in final filter
  • Average calculation: Include ALL pages (200K) in denominator, not just those with followers
  • Performance: 40-60% faster with optimized approach
  • Design pattern: Multi-stage aggregation with filtering
