Find the most accessed CircleNet pages using activity logs
Task B identifies the 10 most popular CircleNetPages by analyzing ActivityLog access counts and joining with page metadata to return ID, NickName, JobTitle, and access count.
Find the 10 most popular CircleNetPages based on ActivityLog access counts. Return the page ID, nickname, job title, and total access count.

SQL Equivalent:
```sql
SELECT p.ID, p.NickName, p.JobTitle, COUNT(*) AS access_count
FROM ActivityLog a
JOIN CircleNetPage p ON a.WhatPage = p.ID
GROUP BY p.ID, p.NickName, p.JobTitle
ORDER BY access_count DESC
LIMIT 10;
```
The simple approach uses a reduce-side join with three sequential MapReduce jobs.

Job 1: Count Page Accesses (TaskBSimple.java:23-49)
```java
public static class AccessCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text pageId = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            pageId.set(f[2].trim()); // WhatPage field
            context.write(pageId, ONE);
        }
    }
}

public static class SumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int s = 0;
        for (IntWritable v : values) {
            s += v.get();
        }
        context.write(key, new IntWritable(s));
    }
}
```
Job 2: Reduce-Side Join (TaskBSimple.java:51-101)
```java
// Tag page metadata with "P,"
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            context.write(new Text(f[0].trim()),
                    new Text("P," + f[1].trim() + "," + f[2].trim()));
        }
    }
}

// Tag counts with "C,"
public static class CountMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.splitTab(value.toString());
        if (f.length >= 2) {
            context.write(new Text(f[0].trim()), new Text("C," + f[1].trim()));
        }
    }
}

// Join by page ID
public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = "", job = "";
        int count = 0;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if ("P".equals(p[0])) {
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if ("C".equals(p[0])) {
                count = CsvUtils.toInt(p[1], 0);
            }
        }
        context.write(NullWritable.get(),
                new Text(count + "," + key + "," + nick + "," + job));
    }
}
```
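The core idea of this reduce-side join is the "P,"/"C," tagging: both mappers emit the page ID as the key, and the shuffle delivers the metadata record and the count record for the same page to one reduce call. The following plain-Java sketch (no Hadoop dependencies; class and method names are illustrative, not from the task code) simulates that grouping and merge step:

```java
import java.util.*;

// Plain-Java sketch of the reduce-side join. Values from two sources are
// tagged ("P," for page metadata, "C," for access counts), grouped by page ID
// as the shuffle would group them, then merged in a single reduce pass.
public class ReduceSideJoinSketch {

    // Simulates the shuffle: every tagged value lands in the list for its key.
    static Map<String, List<String>> shuffle(List<String[]> taggedPairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : taggedPairs) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    // Simulates the join reducer: one pass over the tagged values per key,
    // picking fields out of whichever record type each value carries.
    static String join(String key, List<String> values) {
        String nick = "", job = "";
        int count = 0;
        for (String v : values) {
            String[] p = v.split(",", -1);
            if ("P".equals(p[0])) {
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if ("C".equals(p[0])) {
                count = Integer.parseInt(p[1]);
            }
        }
        return count + "," + key + "," + nick + "," + job;
    }
}
```

Because the reducer cannot control the order in which tagged values arrive, it must buffer the metadata fields and the count in local variables until the pass is complete.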
The optimized version uses a map-side join with the distributed cache and adds a combiner.

Job 1: Count with Combiner (TaskBOptimized.java:26-52)
```java
public static class AccessCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            context.write(new Text(f[2].trim()), ONE);
        }
    }
}
```

In the driver, the same `SumReducer` is registered as the combiner, so each mapper pre-aggregates its counts locally before the shuffle:

```java
// Combiner pre-aggregates at the mapper
job1.setCombinerClass(SumReducer.class);
job1.setReducerClass(SumReducer.class);
```
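What the combiner buys can be shown without Hadoop: each mapper's split is summed locally first, so only one (key, partial-sum) record per distinct page crosses the shuffle instead of one record per access, while the final totals are unchanged. A minimal sketch (illustrative names and made-up split contents):

```java
import java.util.*;

// Sketch of combiner pre-aggregation: per-split partial sums shrink the data
// crossing the shuffle, and the final reduce sums the partial sums.
public class CombinerSketch {

    // Local pre-aggregation within one mapper's split (the combiner step).
    static Map<String, Integer> combine(List<String> pageIdsInSplit) {
        Map<String, Integer> partial = new HashMap<>();
        for (String id : pageIdsInSplit) {
            partial.merge(id, 1, Integer::sum);
        }
        return partial;
    }

    // Final reduce: sum the partial sums produced by every split.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new HashMap<>();
        for (Map<String, Integer> p : partials) {
            p.forEach((k, v) -> totals.merge(k, v, Integer::sum));
        }
        return totals;
    }
}
```

For two splits holding 3 and 4 raw accesses over two pages, only 4 partial-sum records are shuffled instead of 7 raw ones; on real activity logs with many accesses per page, the reduction is far larger.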
Job 2: Top-10 with Map-Side Join (TaskBOptimized.java:61-111)
```java
public static class TopJoinReducer
        extends Reducer<NullWritable, Text, NullWritable, Text> {

    private final TreeMap<Long, String> top = new TreeMap<Long, String>();
    private final Map<String, String> pageInfo = new HashMap<String, String>();
    private long seq = 0; // tie-breaker: keeps TreeMap keys unique when counts are equal

    @Override
    protected void setup(Context context) throws IOException {
        // Load page metadata into memory from the distributed cache
        URI[] files = context.getCacheFiles();
        if (files != null) {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            for (URI u : files) {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(fs.open(new Path(u.getPath()))));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] p = CsvUtils.split(line);
                    if (p.length >= 3) {
                        // pageInfo: ID -> "NickName,JobTitle"
                        pageInfo.put(p[0].trim(), p[1].trim() + "," + p[2].trim());
                    }
                }
                br.close();
            }
        }
    }

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Select the top 10 and join with the in-memory page data
        for (Text v : values) {
            String[] f = CsvUtils.splitTab(v.toString());
            if (f.length >= 2) {
                int count = CsvUtils.toInt(f[1], 0);
                String id = f[0].trim();
                long keyRank = (count * 1000000L) + (seq++);
                top.put(keyRank, id + "," + count);
                if (top.size() > 10) {
                    top.pollFirstEntry();
                }
            }
        }
        // Output with the joined nickname/job title
        for (Map.Entry<Long, String> e : top.descendingMap().entrySet()) {
            String[] pair = CsvUtils.split(e.getValue());
            String id = pair[0];
            String info = pageInfo.getOrDefault(id, ",");
            String[] pj = CsvUtils.split(info);
            String nick = pj.length > 0 ? pj[0] : "";
            String job = pj.length > 1 ? pj[1] : "";
            String count = pair.length > 1 ? pair[1] : "0";
            context.write(NullWritable.get(),
                    new Text(id + "," + nick + "," + job + "," + count));
        }
    }
}
```

```java
// In the driver: add the page file to the distributed cache
job2.addCacheFile(new Path(args[1]).toUri());
```
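The `keyRank = (count * 1000000L) + seq` composite key deserves a note: a `TreeMap` silently overwrites entries with equal keys, so two pages with the same count would collide if the count alone were the key. Adding a running sequence number keeps keys unique while preserving count order, and `pollFirstEntry()` always evicts the current smallest count. A standalone sketch of the same trick (illustrative class name, not the task code):

```java
import java.util.*;

// Bounded top-N via TreeMap with a composite (count, sequence) key. The
// sequence number breaks ties so equal counts never overwrite each other;
// pollFirstEntry() keeps the map at N entries by dropping the smallest.
public class TopNSketch {
    static List<String> topN(Map<String, Integer> counts, int n) {
        TreeMap<Long, String> top = new TreeMap<>();
        long seq = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            top.put(e.getValue() * 1_000_000L + seq++, e.getKey());
            if (top.size() > n) {
                top.pollFirstEntry(); // evict the smallest-count entry
            }
        }
        List<String> result = new ArrayList<>();
        for (String id : top.descendingMap().values()) { // largest count first
            result.add(id);
        }
        return result;
    }
}
```

The multiplier caps the usable sequence range per count at 1,000,000; for inputs where one count value could recur more often than that, a `PriorityQueue` of (count, id) pairs avoids the limit.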
Map-Side Join Optimization: Loading the CircleNetPage dataset (200K records, ~5MB) into memory is feasible and eliminates the entire reduce-side shuffle that the join would otherwise require, saving significant sort-and-transfer I/O.
When to Use Map-Side Joins: When one dataset fits in memory (~100MB or less) and you’re joining with a much larger dataset. Perfect for dimension table lookups.