Overview
Task B identifies the top 10 most accessed pages by analyzing activity logs and joining with page information.
TaskBSimple
Package: circlenet.taskB
Class: TaskBSimple
Source: src/main/java/circlenet/taskB/TaskBSimple.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
Input path to the Activity Log CSV file
Input path to the Pages CSV file
Temporary output path for count results
Temporary output path for join results
Final output path for the top 10 results
Job 1: Count Access
Mapper: AccessCountMapper
Extracts page IDs from activity logs.
public static class AccessCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text pageId = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            pageId.set(f[2].trim());
            context.write(pageId, ONE);
        }
    }
}
Reducer: SumReducer
Sums access counts per page.
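SumReducer's source is not reproduced in this document; a minimal sketch of the standard summing reducer, assuming the Text/IntWritable signature implied by AccessCountMapper:

```java
public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get(); // add up the 1s (or combiner partial sums)
        }
        total.set(sum);
        context.write(key, total); // pageId -> total access count
    }
}
```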
Job 2: Join with Page Info
Mappers: PageMapper, CountMapper
Tags data sources and prepares for join.
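Neither mapper's source is shown; a sketch of the tagging scheme that the JoinReducer below expects ("P" for page records, "C" for count records). The CSV column layout and the tab-separated shape of Job 1's output are assumptions:

```java
// Emits: pageId -> "P,nick,job" (assumed page column layout: id, nick, job)
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outVal = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            outKey.set(f[0].trim());
            outVal.set("P," + f[1].trim() + "," + f[2].trim());
            context.write(outKey, outVal);
        }
    }
}

// Emits: pageId -> "C,count", reading Job 1's "pageId<TAB>count" output
public static class CountMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outVal = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = value.toString().split("\t");
        if (f.length >= 2) {
            outKey.set(f[0].trim());
            outVal.set("C," + f[1].trim());
            context.write(outKey, outVal);
        }
    }
}
```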
Reducer: JoinReducer
Joins page metadata with access counts.
public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    private final Text out = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = "";
        String job = "";
        int count = 0;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                // "P" records carry page metadata
                nick = p[1];
                job = p.length > 2 ? p[2] : "";
            } else if (p.length >= 2 && "C".equals(p[0])) {
                // "C" records carry the access count
                count = CsvUtils.toInt(p[1], 0);
            }
        }
        out.set(count + "," + key.toString() + "," + nick + "," + job);
        context.write(NullWritable.get(), out);
    }
}
Job 3: Top 10
Reducer: TopReducer
Uses a bounded TreeMap to keep only the top 10 pages by access count.
public static class TopReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    private final TreeMap<Long, String> top = new TreeMap<Long, String>();
    private long seq = 0L;

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text t : values) {
            String[] f = CsvUtils.split(t.toString());
            if (f.length >= 4) {
                int count = CsvUtils.toInt(f[0], 0);
                // seq breaks ties so pages with equal counts get distinct keys
                long keyRank = (count * 1000000L) + (seq++);
                top.put(keyRank, f[1] + "," + f[2] + "," + f[3] + "," + count);
                if (top.size() > 10) {
                    top.pollFirstEntry(); // evict the current smallest count
                }
            }
        }
        // Emit highest counts first
        for (Map.Entry<Long, String> e : top.descendingMap().entrySet()) {
            context.write(NullWritable.get(), new Text(e.getValue()));
        }
    }
}
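The capped-TreeMap idiom can be exercised outside Hadoop; a self-contained sketch with hypothetical data (class and method names are illustrative, not part of the project):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopNDemo {
    // Keep only the n largest counts: the TreeMap's smallest key is evicted
    // whenever the map grows past the cap. seq keeps equal counts distinct.
    static List<String> topN(int[] counts, String[] pages, int n) {
        TreeMap<Long, String> top = new TreeMap<Long, String>();
        long seq = 0L;
        for (int i = 0; i < counts.length; i++) {
            top.put(counts[i] * 1000000L + seq++, pages[i]);
            if (top.size() > n) {
                top.pollFirstEntry();
            }
        }
        List<String> result = new ArrayList<String>();
        for (Map.Entry<Long, String> e : top.descendingMap().entrySet()) {
            result.add(e.getValue()); // highest count first
        }
        return result;
    }

    public static void main(String[] args) {
        // e and b share count 9; e was inserted later, so its key is larger
        System.out.println(topN(new int[]{5, 9, 1, 7, 9},
                new String[]{"a", "b", "c", "d", "e"}, 3)); // [e, b, d]
    }
}
```

Note the tie-breaking consequence of `seq`: among equal counts, the record seen later gets the larger key and therefore survives eviction and sorts first.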
Example Usage
hadoop jar $JAR circlenet.taskB.TaskBSimple $ACTIVITY $PAGES \
$OUT/taskB/tmp_count_simple $OUT/taskB/tmp_join_simple $OUT/taskB/simple
TaskBOptimized
Package: circlenet.taskB
Class: TaskBOptimized
Source: src/main/java/circlenet/taskB/TaskBOptimized.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
Input path to the Activity Log CSV file
Input path to the Pages CSV file (loaded as distributed cache)
Temporary output path for count results
Final output path for the top 10 results
Job 1: Count Access
Combiner: Uses SumReducer as a combiner to pre-aggregate counts on the map side.
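The combiner wiring in the Job 1 driver would look roughly like this (the job name and variable names are illustrative):

```java
Job job1 = Job.getInstance(conf, "taskB-count");
job1.setJarByClass(TaskBOptimized.class);
job1.setMapperClass(AccessCountMapper.class);
job1.setCombinerClass(SumReducer.class); // partial sums on the map side
job1.setReducerClass(SumReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
```

Reusing SumReducer as the combiner is valid because integer addition is associative and commutative, so map-side partial sums do not change the final totals.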
Job 2: Top 10 with Map-Side Join
Reducer: TopJoinReducer
Loads page information from the distributed cache in setup() and performs a map-side join.
private final Map<String, String> pageInfo = new HashMap<String, String>();

@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (URI u : files) {
        BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(u.getPath()))));
        String line;
        while ((line = br.readLine()) != null) {
            String[] p = CsvUtils.split(line);
            if (p.length >= 3) {
                // page ID -> "nick,job"
                pageInfo.put(p[0].trim(), p[1].trim() + "," + p[2].trim());
            }
        }
        br.close();
    }
}
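With the page table in memory, the join itself reduces to a lookup. A sketch of the corresponding reduce() (the "pageId,count" record shape and the top/seq fields are assumptions, mirroring TopReducer in TaskBSimple):

```java
@Override
protected void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text t : values) {
        String[] f = CsvUtils.split(t.toString());
        if (f.length >= 2) {
            String meta = pageInfo.get(f[0].trim()); // "nick,job", or null
            if (meta == null) {
                continue; // skip counts with no matching page record
            }
            int count = CsvUtils.toInt(f[1], 0);
            long keyRank = (count * 1000000L) + (seq++);
            top.put(keyRank, f[0] + "," + meta + "," + count);
            if (top.size() > 10) {
                top.pollFirstEntry();
            }
        }
    }
    for (Map.Entry<Long, String> e : top.descendingMap().entrySet()) {
        context.write(NullWritable.get(), new Text(e.getValue()));
    }
}
```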
Optimizations
- Uses combiner to reduce shuffle
- Map-side join using distributed cache
- Reduces from 3 jobs to 2 jobs
Example Usage
hadoop jar $JAR circlenet.taskB.TaskBOptimized $ACTIVITY $PAGES \
$OUT/taskB/tmp_count_opt $OUT/taskB/optimized