Analyze user activity to determine total actions and distinct pages accessed
Task E determines which CircleNetPage owners have “favorites” by computing two metrics for each owner:
How many total accesses/actions to CircleNetPages they have made (from ActivityLog)
How many distinct CircleNetPages they have accessed/interacted with in total
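Return page owner IDs with both metrics.
SQL Equivalent:
-- COUNT(a.WhatPage) ignores the NULL row produced for owners with no activity,
-- so those owners report 0 actions, matching the reducer output.
SELECT p.ID,
       COUNT(a.WhatPage) AS total_actions,
       COUNT(DISTINCT a.WhatPage) AS distinct_pages
FROM CircleNetPage p
LEFT JOIN ActivityLog a ON p.ID = a.ByWho
GROUP BY p.ID;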
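The implementation uses a single-job reduce-side join with in-reducer aggregation and distinct counting. Both mappers key their output by user ID and tag each value with its source, so one reduce call sees a user's owner marker and activity records together.
Activity Mapper (TaskESimple.java:21-38):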
public static class ActivityMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private final IntWritable byWho = new IntWritable();
    private final Text page = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            int by = CsvUtils.toInt(f[1], -1); // ByWho
            int p = CsvUtils.toInt(f[2], -1);  // WhatPage
            if (by > 0 && p > 0) {
                byWho.set(by);
                page.set("A," + p); // Tag with "A" for Activity
                context.write(byWho, page);
            }
        }
    }
}
Page Owner Mapper (TaskESimple.java:40-55):
public static class PageOwnerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private final IntWritable owner = new IntWritable();
    private static final Text MARKER = new Text("P,1");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 1) {
            int id = CsvUtils.toInt(f[0], -1);
            if (id > 0) {
                owner.set(id);
                context.write(owner, MARKER); // Tag with "P" for Page
            }
        }
    }
}
Stats Reducer with Distinct Counting (TaskESimple.java:57-79):
public static class StatsReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private final Text out = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        boolean isOwner = false;
        int total = 0;
        Set<Integer> distinct = new HashSet<Integer>();

        for (Text v : values) {
            String[] p = CsvUtils.split(v.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                isOwner = true; // User has a page
            } else if (p.length >= 2 && "A".equals(p[0])) {
                total++; // Count total actions
                distinct.add(CsvUtils.toInt(p[1], -1)); // Track distinct pages
            }
        }

        // Only output page owners
        if (isOwner) {
            out.set(total + "," + distinct.size());
            context.write(key, out);
        }
    }
}
Job Configuration (TaskESimple.java:87-91):
// Use MultipleInputs for two data sources
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PageOwnerMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ActivityMapper.class);
job.setReducerClass(StatsReducer.class);
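To make the data flow concrete, the map, shuffle, and reduce steps can be simulated in plain Java without a Hadoop cluster. The following is a minimal sketch, not the project's code: the class name ReduceSideJoinSketch and its methods are hypothetical, String.split(",") stands in for CsvUtils.split, and a TreeMap stands in for the shuffle that groups values by user ID.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java simulation of the tagged reduce-side join (hypothetical helper, no Hadoop). */
final class ReduceSideJoinSketch {

    /** Mirrors the StatsReducer logic: "total,distinct" for owners, null for non-owners. */
    static String reduce(List<String> taggedValues) {
        boolean isOwner = false;
        int total = 0;
        Set<Integer> distinct = new HashSet<>();
        for (String v : taggedValues) {
            String[] p = v.split(",");
            if (p.length >= 2 && "P".equals(p[0])) {
                isOwner = true;                       // owner marker from the page file
            } else if (p.length >= 2 && "A".equals(p[0])) {
                total++;                              // one activity record
                distinct.add(Integer.parseInt(p[1])); // page ID seen by this user
            }
        }
        return isOwner ? total + "," + distinct.size() : null;
    }

    /** Simulates both mappers, the shuffle, and the reducer over in-memory records. */
    static Map<Integer, String> run(int[] ownerIds, int[][] activity) {
        // Shuffle stand-in: group tagged values by user ID.
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (int id : ownerIds) {
            groups.computeIfAbsent(id, k -> new ArrayList<>()).add("P,1");
        }
        for (int[] a : activity) { // a[0] = ByWho, a[1] = WhatPage
            groups.computeIfAbsent(a[0], k -> new ArrayList<>()).add("A," + a[1]);
        }
        Map<Integer, String> out = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> e : groups.entrySet()) {
            String result = reduce(e.getValue());
            if (result != null) {
                out.put(e.getKey(), result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] owners = {1, 2, 3};
        int[][] activity = {{1, 10}, {1, 10}, {1, 20}, {2, 30}, {9, 10}};
        System.out.println(run(owners, activity));
    }
}
```

With this sample input, owner 1 gets "3,2" (three actions on two pages), owner 3 has no activity and gets "0,0", and user 9 is dropped because no owner marker arrives for that key.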
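Alternative Optimization: A secondary sort pattern (composite key with a custom partitioner and grouping comparator) could eliminate the in-reducer HashSet, but it adds significant complexity for marginal gain, since the set only ever holds the distinct pages accessed by a single user.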
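The core idea behind that alternative can be sketched independently of Hadoop: if a user's page IDs reach the reducer already sorted, distinct pages can be counted by comparing each value with the previous one, using constant memory. The class SortedDistinctSketch below is hypothetical, and Arrays.sort stands in for the ordering that a secondary sort would have the framework provide; the composite key, partitioner, and grouping comparator themselves are not shown.

```java
import java.util.Arrays;

/** Constant-memory distinct counting over sorted input (hypothetical helper). */
final class SortedDistinctSketch {

    /** Returns {total, distinct} for page IDs that arrive in sorted order. */
    static int[] countSorted(int[] sortedPages) {
        int total = 0;
        int distinct = 0;
        int previous = 0; // only meaningful once total > 0
        for (int page : sortedPages) {
            // In sorted input, a new distinct value always differs from its predecessor.
            if (total == 0 || page != previous) {
                distinct++;
            }
            previous = page;
            total++;
        }
        return new int[] {total, distinct};
    }

    public static void main(String[] args) {
        int[] pages = {20, 10, 10, 30, 20};
        Arrays.sort(pages); // stand-in for the framework's secondary sort
        System.out.println(Arrays.toString(countSorted(pages))); // prints [5, 3]
    }
}
```

The trade-off is visible even in this sketch: the counting loop is simpler than the HashSet version, but it is only correct when the input order is guaranteed, which is the part that requires the extra job configuration.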