Task E identifies each CircleNetPage owner's "favorites" by computing two metrics: the total number of accesses/actions they have made, and the number of distinct pages they have accessed.

Problem Statement

For each CircleNetPage owner, determine:
  1. How many total accesses/actions to CircleNetPages they have made (from ActivityLog)
  2. How many distinct CircleNetPages they have accessed/interacted with in total
Return each page owner's ID with both metrics. SQL equivalent:
SELECT p.ID,
       COUNT(a.WhatPage) AS total_actions,
       COUNT(DISTINCT a.WhatPage) AS distinct_pages
FROM CircleNetPage p
LEFT JOIN ActivityLog a ON p.ID = a.ByWho
GROUP BY p.ID;
Note the use of COUNT(a.WhatPage) rather than COUNT(*): it skips the NULL row produced by the left join, so owners with no activity report 0 instead of 1, matching the sample output.

Implementation

Simple Approach (Reduce-Side Join)

The implementation uses a single-job reduce-side join with in-reducer aggregation and distinct counting. Activity Mapper (TaskESimple.java:21-38):
public static class ActivityMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private final IntWritable byWho = new IntWritable();
    private final Text page = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            int by = CsvUtils.toInt(f[1], -1);  // ByWho
            int p = CsvUtils.toInt(f[2], -1);   // WhatPage
            if (by > 0 && p > 0) {
                byWho.set(by);
                page.set("A," + p);  // Tag with "A" for Activity
                context.write(byWho, page);
            }
        }
    }
}
Page Owner Mapper (TaskESimple.java:40-55):
public static class PageOwnerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private final IntWritable owner = new IntWritable();
    private static final Text MARKER = new Text("P,1");

    @Override
    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 1) {
            int id = CsvUtils.toInt(f[0], -1);
            if (id > 0) {
                owner.set(id);
                context.write(owner, MARKER);  // Tag with "P" for Page
            }
        }
    }
}
Stats Reducer with Distinct Counting (TaskESimple.java:57-79):
public static class StatsReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private final Text out = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {
        boolean isOwner = false;
        int total = 0;
        Set<Integer> distinct = new HashSet<Integer>();
        
        for (Text v : values) {
            String[] p = CsvUtils.split(v.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                isOwner = true;  // User has a page
            } else if (p.length >= 2 && "A".equals(p[0])) {
                total++;  // Count total actions
                distinct.add(CsvUtils.toInt(p[1], -1));  // Track distinct pages
            }
        }
        
        // Only output page owners
        if (isOwner) {
            out.set(total + "," + distinct.size());
            context.write(key, out);
        }
    }
}
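The tagging-and-aggregation logic is easy to exercise outside Hadoop. The sketch below (a hypothetical StatsReducerLogic helper, not part of the repo, with String.split standing in for CsvUtils) replays the tagged values a reducer would receive for one user:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StatsReducerLogic {
    // Mirrors StatsReducer for a single key: given the tagged values
    // ("P,1" marks page ownership, "A,<page>" marks an activity),
    // return "total,distinct" if the user owns a page, else null.
    static String reduceValues(List<String> values) {
        boolean isOwner = false;
        int total = 0;
        Set<Integer> distinct = new HashSet<>();
        for (String v : values) {
            String[] p = v.split(",");
            if ("P".equals(p[0])) {
                isOwner = true;          // user has a page
            } else if ("A".equals(p[0])) {
                total++;                 // every activity counts once
                distinct.add(Integer.parseInt(p[1]));  // set dedupes pages
            }
        }
        return isOwner ? total + "," + distinct.size() : null;
    }

    public static void main(String[] args) {
        // Owner who accessed pages 2, 3, 2, 3: 4 actions, 2 distinct pages.
        System.out.println(reduceValues(
                List.of("P,1", "A,2", "A,3", "A,2", "A,3")));
    }
}
```

Non-owners (no "P" marker among their values) produce no output, mirroring the `if (isOwner)` guard in the real reducer.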
Job Configuration (TaskESimple.java:87-91):
// Use MultipleInputs for two data sources
MultipleInputs.addInputPath(job, new Path(args[0]), 
                           TextInputFormat.class, PageOwnerMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), 
                           TextInputFormat.class, ActivityMapper.class);
job.setReducerClass(StatsReducer.class);

Why No Combiner?

Combiner Cannot Be Used Here: This task requires counting distinct pages accessed, which cannot be safely pre-aggregated:
// Problem: User 1 accesses pages [2, 3, 2, 3]
// Mapper A emits: 1 -> [2, 3]     (distinct: 2)
// Mapper B emits: 1 -> [2, 3]     (distinct: 2)
// Combiner cannot merge these correctly!
// Reducer needs all values to compute true distinct: 2 (not 4)
Why it fails:
  • Combiner would compute local distinct counts
  • Reducer would receive pre-computed counts, not raw page IDs
  • Cannot determine global distinct count from local distinct counts
  • Example: merging the sets {2, 3} and {2, 3} gives 2 distinct pages, but adding the local counts gives 2 + 2 = 4!
Distinct counting requires seeing all values at the reducer.
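To make this concrete, the following standalone sketch (plain Java, independent of Hadoop) contrasts merging local counts, which is safe for plain COUNT, with merging local distinct counts, which is only correct if the sets themselves are merged first:

```java
import java.util.HashSet;
import java.util.Set;

public class DistinctMergeDemo {
    // Merging local COUNTs is safe: totals simply add up.
    static int mergeCounts(int a, int b) {
        return a + b;
    }

    // Merging local DISTINCT counts requires the underlying sets;
    // only their union's size is the true global distinct count.
    static int mergeDistinctSets(Set<Integer> a, Set<Integer> b) {
        Set<Integer> union = new HashSet<>(a);
        union.addAll(b);
        return union.size();
    }

    public static void main(String[] args) {
        // Mapper A and Mapper B each saw pages {2, 3} for user 1.
        Set<Integer> localA = Set.of(2, 3);
        Set<Integer> localB = Set.of(2, 3);

        int naive = localA.size() + localB.size();        // 4: wrong
        int correct = mergeDistinctSets(localA, localB);  // 2: right
        System.out.println(naive + " vs " + correct);
    }
}
```

A combiner can only forward local sizes (the `naive` path); the reducer needs the raw page IDs to take the union, which is exactly why the shuffle cannot be reduced here.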

Optimization Challenges

The run instructions note:
# optimized version is optional; latest timing is slower
This task is difficult to optimize because:
  1. Distinct counting prevents combiner use
  2. Large activity dataset (10M records) must all shuffle
  3. In-reducer state: HashSet per user requires memory
  4. No map-side join opportunity: Need all activities per user
Sometimes the simple approach is the best approach!

Performance Characteristics

Metric           Value                        Notes
Map Output       10M records                  All activities must shuffle
Reducer Memory   O(distinct pages per user)   HashSet in memory
Combiner         Not applicable               Distinct counting prevents use
Alternative      None practical               Simple is optimal
Alternative Optimization: A secondary sort pattern with custom partitioner could eliminate in-memory HashSet, but adds significant complexity for marginal gain.
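As a sketch of that alternative (hypothetical code, not from the repo, assuming a secondary sort has already arranged each user's page IDs in ascending order), the reducer could count distinct pages by watching for value changes in the sorted stream, using O(1) state instead of a HashSet:

```java
import java.util.List;

public class SortedDistinctCount {
    // Counts total and distinct values in one pass over an
    // already-sorted stream: a new distinct value is detected
    // whenever the current value differs from the previous one.
    static int[] countSorted(Iterable<Integer> sortedPages) {
        int total = 0;
        int distinct = 0;
        int prev = Integer.MIN_VALUE;  // sentinel: no page ID is this small
        for (int p : sortedPages) {
            total++;
            if (p != prev) {
                distinct++;
                prev = p;
            }
        }
        return new int[] { total, distinct };
    }

    public static void main(String[] args) {
        // Pages 2, 2, 3, 3, 5 -> 5 actions, 3 distinct pages.
        int[] r = countSorted(List.of(2, 2, 3, 3, 5));
        System.out.println(r[0] + "," + r[1]);
    }
}
```

Getting values to the reducer in sorted order, however, requires a composite key, a custom partitioner, and a grouping comparator, which is the "significant complexity" the note above refers to.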

Running the Task

export JAR=/home/ds503/ds503_bdm-1.0-SNAPSHOT.jar
export PAGES=/circlenet/pages/CircleNetPage.csv
export ACTIVITY=/circlenet/activitylog/ActivityLog.csv
export OUT=/circlenet/output

# Run simple version
hadoop jar $JAR circlenet.taskE.TaskESimple \
  $PAGES $ACTIVITY \
  $OUT/taskE/simple

# View results
hdfs dfs -cat $OUT/taskE/simple/part-r-00000 | head -20

Sample Output

1    145,23
2    0,0
3    892,67
4    23,12
...
Format: PageOwnerID<tab>TotalActions,DistinctPages

When Combiners Don’t Work

This task illustrates scenarios where combiners cannot be applied:
Operation        Combiner Safe?   Reason
SUM              Yes              Associative/commutative
COUNT            Yes              Associative/commutative
MAX/MIN          Yes              Associative/commutative
AVG              No               Needs both total sum and count
DISTINCT COUNT   No               Cannot merge sets correctly
MEDIAN           No               Requires all values
SET UNION        Maybe            Only if sets are disjoint
Design Principle: Combiners work when the operation is associative and commutative, meaning:
  • Order doesn’t matter
  • Grouping doesn’t matter
  • combine(combine(a,b), c) = combine(a, combine(b,c))
Distinct counting violates this because local distinct sets overlap.
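The property can be checked directly in plain Java. Below, combineSum satisfies the associativity law, while combineDistinctCounts (a deliberately broken "combine" that adds pre-computed local distinct counts) overcounts whenever the underlying sets overlap:

```java
import java.util.Set;

public class CombinerLaws {
    // SUM's combine: safe, since (a+b)+c == a+(b+c) and a+b == b+a.
    static int combineSum(int a, int b) {
        return a + b;
    }

    // Broken "combine" for distinct counting: local distinct counts
    // lose the identity of the elements, so overlaps are double-counted.
    static int combineDistinctCounts(int a, int b) {
        return a + b;
    }

    public static void main(String[] args) {
        // SUM obeys combine(combine(a,b), c) == combine(a, combine(b,c)).
        System.out.println(combineSum(combineSum(1, 2), 3)
                == combineSum(1, combineSum(2, 3)));

        // Two fully overlapping local sets {2, 3}: true distinct count
        // is 2, but combining the local counts yields 2 + 2 = 4.
        int local = Set.of(2, 3).size();
        System.out.println(combineDistinctCounts(local, local));
    }
}
```

The failure is not in the arithmetic but in the representation: once a mapper collapses its set to a size, the information needed to detect overlap is gone.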

Key Takeaways

  • Approach: Single-job reduce-side join with distinct counting
  • No combiner: Distinct counting requires all values at reducer
  • Memory usage: HashSet per user for tracking distinct pages
  • Optimization limitation: Sometimes simple approach is optimal
  • When combiners fail: Non-associative operations (DISTINCT, AVG, MEDIAN)
  • Alternative patterns: Secondary sort could optimize, but adds complexity
  • Design lesson: Not all MapReduce tasks benefit from optimization
