Overview
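Task E computes activity statistics for each page owner: the total number of page accesses that owner made, and the number of distinct pages they accessed. Owners with no recorded activity are still reported, with both counts set to 0.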
TaskESimple
Package: circlenet.taskE
Class: TaskESimple
Source: src/main/java/circlenet/taskE/TaskESimple.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
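Arguments are positional, in the order used under Example Usage:
- Input path to the Pages CSV file
- Input path to the Activity Log CSV file
- Output directory for the job results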
Mapper: ActivityMapper
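Emits one record per valid activity log row, keyed by the ID of the user who performed the activity (second column), with the value "A,<page_id>" built from the third column. Rows with fewer than three fields, or whose IDs do not yield a positive integer, are skipped.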
    public static class ActivityMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        // Reused across map() calls to avoid allocating per record.
        private final IntWritable byWho = new IntWritable();
        private final Text page = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] f = CsvUtils.split(value.toString());
            if (f.length >= 3) {
                int by = CsvUtils.toInt(f[1], -1); // user who performed the activity
                int p = CsvUtils.toInt(f[2], -1);  // page that was accessed
                if (by > 0 && p > 0) {
                    byWho.set(by);
                    page.set("A," + p); // "A" tags the value as an activity record
                    context.write(byWho, page);
                }
            }
        }
    }
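The per-line logic of ActivityMapper can be tried without a Hadoop cluster. The sketch below is a plain-Java illustration, not the project's code. CsvUtils is not shown in this document, so split and toInt here are hypothetical stand-ins that assume a simple comma-separated line with no quoted fields and a fallback value on parse failure. The sample rows and their column names are also made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

class ActivityLineSketch {

    // Hypothetical stand-in for CsvUtils.split: plain comma split, no quoting support.
    static String[] split(String line) {
        return line.split(",", -1);
    }

    // Hypothetical stand-in for CsvUtils.toInt: returns the fallback when parsing fails.
    static int toInt(String s, int fallback) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Mirrors the mapper's filter: yields (byWho, "A,<page>") or empty for rejected lines.
    static Optional<Map.Entry<Integer, String>> mapLine(String line) {
        String[] f = split(line);
        if (f.length >= 3) {
            int by = toInt(f[1], -1);
            int p = toInt(f[2], -1);
            if (by > 0 && p > 0) {
                Map.Entry<Integer, String> pair = new SimpleEntry<>(by, "A," + p);
                return Optional.of(pair);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A well-formed row: user 42 accessed page 7.
        System.out.println(mapLine("1001,42,7,viewed,2024-01-01")); // Optional[42=A,7]
        // A header row fails integer parsing and is dropped.
        System.out.println(mapLine("ActionId,ByWho,WhatPage"));     // Optional.empty
    }
}
```

Under these assumptions a header row needs no special handling, because its non-numeric fields fall back to -1 and fail the positive-ID check.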
Mapper: PageOwnerMapper
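Emits one record per row of the Pages file, keyed by the ID in the first column and carrying the constant value "P,1". The "P" tag marks the key as a page owner.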
    public static class PageOwnerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        private final IntWritable owner = new IntWritable();
        // Constant value: only the "P" tag matters to the reducer.
        private static final Text MARKER = new Text("P,1");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] f = CsvUtils.split(value.toString());
            if (f.length >= 1) {
                int id = CsvUtils.toInt(f[0], -1); // first column is the owner ID
                if (id > 0) {
                    owner.set(id);
                    context.write(owner, MARKER);
                }
            }
        }
    }
Reducer: StatsReducer
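Joins the two tagged streams on user ID. For each key it counts the "A" records (total accesses) and collects their page IDs in a set (distinct pages), then writes a result only if a "P" record was seen, so activity from IDs that are absent from the Pages file is dropped.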
    public static class StatsReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        private final Text out = new Text();

        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            boolean isOwner = false; // set once a "P" record is seen for this key
            int total = 0;           // number of "A" records
            Set<Integer> distinct = new HashSet<Integer>(); // distinct accessed page IDs
            for (Text v : values) {
                String[] p = CsvUtils.split(v.toString());
                if (p.length >= 2 && "P".equals(p[0])) {
                    isOwner = true;
                } else if (p.length >= 2 && "A".equals(p[0])) {
                    total++;
                    distinct.add(CsvUtils.toInt(p[1], -1));
                }
            }
            // Emit only for keys that appear in the Pages file.
            if (isOwner) {
                out.set(total + "," + distinct.size());
                context.write(key, out);
            }
        }
    }
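To make the join behaviour concrete, here is a minimal plain-Java sketch of what the reducer does for a single key, with the Hadoop types replaced by a list of strings. The class and method names are hypothetical, and a plain comma split stands in for CsvUtils.split, which is not shown in this document.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class StatsReduceSketch {

    // Mirrors StatsReducer for one key: "total,distinct" for owners, null otherwise.
    static String reduceOne(List<String> values) {
        boolean isOwner = false;
        int total = 0;
        Set<Integer> distinct = new HashSet<>();
        for (String v : values) {
            String[] p = v.split(",", -1); // stand-in for CsvUtils.split (assumption)
            if (p.length >= 2 && "P".equals(p[0])) {
                isOwner = true;
            } else if (p.length >= 2 && "A".equals(p[0])) {
                total++;
                distinct.add(Integer.parseInt(p[1]));
            }
        }
        return isOwner ? total + "," + distinct.size() : null;
    }

    public static void main(String[] args) {
        // Owner with three accesses, two of them to page 7.
        System.out.println(reduceOne(Arrays.asList("A,7", "P,1", "A,9", "A,7"))); // 3,2
        // Owner with no activity still produces a row.
        System.out.println(reduceOne(Arrays.asList("P,1")));                      // 0,0
        // Activity from an ID with no Pages row produces nothing.
        System.out.println(reduceOne(Arrays.asList("A,7", "A,8")));               // null
    }
}
```

The order in which "P" and "A" values arrive does not matter, since the decision to emit is made only after all values for the key have been consumed.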
Each line of the reducer output contains:
- Page owner ID
- Total access count
- Distinct page count
<owner_id>\t<total_accesses>,<distinct_pages>
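For downstream consumers, a line in this format can be parsed as sketched below. This is an illustrative helper, not part of the project: the class name TaskEOutputLine and its fields are hypothetical, and it assumes the tab and comma separators shown above with no surrounding whitespace.

```java
class TaskEOutputLine {
    final int ownerId;
    final int totalAccesses;
    final int distinctPages;

    TaskEOutputLine(int ownerId, int totalAccesses, int distinctPages) {
        this.ownerId = ownerId;
        this.totalAccesses = totalAccesses;
        this.distinctPages = distinctPages;
    }

    // Parses "<owner_id>\t<total_accesses>,<distinct_pages>".
    static TaskEOutputLine parse(String line) {
        String[] keyValue = line.split("\t", 2);
        if (keyValue.length != 2) {
            throw new IllegalArgumentException("Missing tab separator: " + line);
        }
        String[] stats = keyValue[1].split(",", 2);
        if (stats.length != 2) {
            throw new IllegalArgumentException("Missing comma separator: " + line);
        }
        return new TaskEOutputLine(
                Integer.parseInt(keyValue[0]),
                Integer.parseInt(stats[0]),
                Integer.parseInt(stats[1]));
    }

    public static void main(String[] args) {
        TaskEOutputLine row = parse("42\t3,2");
        // prints: owner=42 total=3 distinct=2
        System.out.println("owner=" + row.ownerId
                + " total=" + row.totalAccesses
                + " distinct=" + row.distinctPages);
    }
}
```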
Example Usage
hadoop jar $JAR circlenet.taskE.TaskESimple $PAGES $ACTIVITY $OUT/taskE/simple
Notes
- An optimized version was implemented but performed slower than the simple version
- The simple version is recommended for production use
- The reducer tracks distinct pages in an in-memory HashSet, so memory use per key grows with the number of distinct pages a single user accessed