Overview

Task B identifies the top 10 most accessed pages by analyzing activity logs and joining with page information.

TaskBSimple

Package: circlenet.taskB
Class: TaskBSimple
Source: src/main/java/circlenet/taskB/TaskBSimple.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Activity Log CSV file
args[1] (string, required): Input path to the Pages CSV file
args[2] (string, required): Temporary output path for count results
args[3] (string, required): Temporary output path for join results
args[4] (string, required): Final output path

Job 1: Count Access

Mapper: AccessCountMapper. Extracts the page ID from each activity-log record and emits it with a count of 1.
public static class AccessCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text pageId = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            pageId.set(f[2].trim());
            context.write(pageId, ONE);
        }
    }
}
Reducer: SumReducer. Sums the access counts emitted for each page.
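The source for SumReducer is not reproduced here; its core behavior (summing the 1s emitted per page ID) can be sketched without Hadoop types. Class and method names below are illustrative, not from the project:

```java
import java.util.*;

public class SumDemo {
    // Sum values per key, mirroring what a Hadoop sum-reducer does for each
    // (pageId, [1, 1, ...]) group. Plain-Java sketch, not the actual
    // SumReducer source.
    static Map<String, Integer> sumPerKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> totals = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            totals.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> recs = Arrays.asList(
            Map.entry("page1", 1), Map.entry("page2", 1), Map.entry("page1", 1));
        System.out.println(sumPerKey(recs)); // page1 -> 2, page2 -> 1
    }
}
```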

Job 2: Join with Page Info

Mappers: PageMapper, CountMapper. Tag each record with its source ("P" for page metadata, "C" for counts) so the reducer can tell them apart.
Reducer: JoinReducer. Joins page metadata with access counts.
public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    private final Text out = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        String nick = "";
        String job = "";
        int count = 0;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if (p.length >= 2 && "C".equals(p[0])) {
                count = CsvUtils.toInt(p[1], 0);
            }
        }
        out.set(count + "," + key.toString() + "," + nick + "," + job);
        context.write(NullWritable.get(), out);
    }
}
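The mappers themselves are not shown, but the "P,…"/"C,…" value layout they produce can be read off the reducer above. A plain-Java sketch of the per-key join logic (illustrative names; field layout taken from JoinReducer):

```java
import java.util.*;

public class JoinDemo {
    // Reduce-side join for one key: values are tagged "P,<nick>,<job>"
    // (page metadata from PageMapper) or "C,<count>" (access count from
    // CountMapper). Mirrors JoinReducer's logic without Hadoop types.
    static String joinForKey(String pageId, List<String> taggedValues) {
        String nick = "";
        String job = "";
        int count = 0;
        for (String v : taggedValues) {
            String[] p = v.split(",");
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if (p.length >= 2 && "C".equals(p[0])) {
                count = Integer.parseInt(p[1]);
            }
        }
        return count + "," + pageId + "," + nick + "," + job;
    }

    public static void main(String[] args) {
        System.out.println(joinForKey("page42",
            Arrays.asList("P,alice,engineer", "C,17")));
        // prints 17,page42,alice,engineer
    }
}
```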

Job 3: Top 10

Reducer: TopReducer. Maintains the top 10 pages by access count in a TreeMap keyed by a composite (count, sequence) value.
public static class TopReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    private final TreeMap<Long, String> top = new TreeMap<Long, String>();
    private long seq = 0L;

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        for (Text t : values) {
            String[] f = CsvUtils.split(t.toString());
            if (f.length >= 4) {
                int count = CsvUtils.toInt(f[0], 0);
                // Composite key: count in the high digits, a sequence number
                // in the low digits so equal counts do not overwrite each other
                long keyRank = (count * 1000000L) + (seq++);
                top.put(keyRank, f[1] + "," + f[2] + "," + f[3] + "," + count);
                if (top.size() > 10) {
                    top.pollFirstEntry(); // evict the current smallest count
                }
            }
        }
        for (Map.Entry<Long, String> e : top.descendingMap().entrySet()) {
            context.write(NullWritable.get(), new Text(e.getValue()));
        }
    }
}
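The composite-key trick can be demonstrated outside Hadoop. A minimal sketch (class and method names are illustrative), assuming fewer than 1,000,000 input rows so the sequence number never spills into the count digits:

```java
import java.util.*;

public class TopKDemo {
    // Keep the K largest rows using the (count, sequence) packing from
    // TopReducer: ties get distinct keys, and pollFirstEntry() always
    // drops the entry with the smallest count (earliest on ties).
    static List<String> topK(List<Map.Entry<String, Integer>> rows, int k) {
        TreeMap<Long, String> top = new TreeMap<>();
        long seq = 0L;
        for (Map.Entry<String, Integer> row : rows) {
            top.put(row.getValue() * 1000000L + seq++, row.getKey());
            if (top.size() > k) {
                top.pollFirstEntry(); // evict the smallest composite key
            }
        }
        // Highest count first, as in TopReducer's final emit loop
        return new ArrayList<>(top.descendingMap().values());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = Arrays.asList(
            Map.entry("a", 5), Map.entry("b", 9), Map.entry("c", 9), Map.entry("d", 1));
        System.out.println(topK(rows, 2)); // prints [c, b]
    }
}
```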

Example Usage

hadoop jar $JAR circlenet.taskB.TaskBSimple $ACTIVITY $PAGES \
  $OUT/taskB/tmp_count_simple $OUT/taskB/tmp_join_simple $OUT/taskB/simple

TaskBOptimized

Package: circlenet.taskB
Class: TaskBOptimized
Source: src/main/java/circlenet/taskB/TaskBOptimized.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Activity Log CSV file
args[1] (string, required): Input path to the Pages CSV file (loaded via the distributed cache)
args[2] (string, required): Temporary output path for count results
args[3] (string, required): Final output path

Job 1: Count Access

Combiner: Yes; SumReducer is reused as the combiner, which is safe because summation is associative and commutative.
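Reusing the reducer as the combiner is a single job setting. A hedged driver fragment (the job name and surrounding driver code are assumptions about this project; only setCombinerClass is the point):

```java
// Job 1 driver fragment: pre-aggregate counts on the map side
// before the shuffle. Combiner and reducer share SumReducer.
Job job1 = Job.getInstance(conf, "taskB-count");
job1.setMapperClass(AccessCountMapper.class);
job1.setCombinerClass(SumReducer.class); // combiner = reducer class
job1.setReducerClass(SumReducer.class);
```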

Job 2: Top 10 with Map-Side Join

Reducer: TopJoinReducer. Loads page information from the distributed cache into memory in setup(), then joins the counts against it in memory, eliminating the separate join job.
@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (URI u : files) {
        // pageInfo is a Map<String, String> field: pageId -> "nick,job".
        // try-with-resources ensures the reader is closed even on error.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(u.getPath()))))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] p = CsvUtils.split(line);
                if (p.length >= 3) {
                    pageInfo.put(p[0].trim(), p[1].trim() + "," + p[2].trim());
                }
            }
        }
    }
}
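The lookup-map construction and the in-memory join it enables can be exercised without Hadoop. A plain-Java sketch (illustrative names; the real class streams the lines from HDFS via the distributed cache):

```java
import java.util.*;

public class CacheJoinDemo {
    // Mirrors TopJoinReducer's setup(): each cached pages line is
    // "pageId,nick,job"; build an in-memory lookup map keyed by pageId.
    static Map<String, String> loadPageInfo(List<String> lines) {
        Map<String, String> pageInfo = new HashMap<>();
        for (String line : lines) {
            String[] p = line.split(",");
            if (p.length >= 3) {
                pageInfo.put(p[0].trim(), p[1].trim() + "," + p[2].trim());
            }
        }
        return pageInfo;
    }

    public static void main(String[] args) {
        Map<String, String> pageInfo =
            loadPageInfo(Arrays.asList("page1,alice,engineer", "page2,bob,teacher"));
        // In-memory join: attach cached metadata to a counted page.
        System.out.println(7 + ",page2," + pageInfo.get("page2"));
        // prints 7,page2,bob,teacher
    }
}
```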

Optimizations

  • Uses combiner to reduce shuffle
  • Map-side join using distributed cache
  • Reduces from 3 jobs to 2 jobs

Example Usage

hadoop jar $JAR circlenet.taskB.TaskBOptimized $ACTIVITY $PAGES \
  $OUT/taskB/tmp_count_opt $OUT/taskB/optimized