Overview
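Task G identifies outdated pages: pages whose owners have no recorded activity within 2160 time units of the global maximum timestamp. An owner qualifies if their latest activity is earlier than (global max - 2160) or if they never appear in the activity log.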
TaskGSimple
Package: circlenet.taskG
Class: TaskGSimple
Source: src/main/java/circlenet/taskG/TaskGSimple.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
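args[0]: Input path to the Pages CSV file
args[1]: Input path to the Activity Log CSV file
args[2]: Temporary output path for the global max timestamp (Job 1)
args[3]: Temporary output path for the per-user max timestamps (Job 2)
args[4]: Final output path for the outdated pages (Job 3)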
Job 1: Find Global Max Timestamp
Mapper: GlobalMaxMapper
Extracts timestamps from activity log.
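The mapper's source is not shown here. As a rough, Hadoop-free sketch of the extraction step, the class below parses one activity-log line and returns its timestamp. It assumes a comma-separated log; the column index `TIMESTAMP_COLUMN` and the class name are hypothetical and not confirmed by this document. Malformed lines are skipped rather than failing the task.

```java
import java.util.OptionalInt;

/** Hadoop-free sketch of what a mapper like GlobalMaxMapper might do per line. */
final class TimestampExtractor {
    // Assumption: the timestamp's column index is hypothetical; adjust to the real log layout.
    static final int TIMESTAMP_COLUMN = 4;

    /** Returns the timestamp of one activity-log line, or empty if the line is malformed. */
    static OptionalInt extractTimestamp(String csvLine) {
        if (csvLine == null) {
            return OptionalInt.empty();
        }
        String[] fields = csvLine.split(",", -1);
        if (fields.length <= TIMESTAMP_COLUMN) {
            return OptionalInt.empty(); // too few columns
        }
        try {
            return OptionalInt.of(Integer.parseInt(fields[TIMESTAMP_COLUMN].trim()));
        } catch (NumberFormatException e) {
            return OptionalInt.empty(); // e.g. a header row or a corrupt value
        }
    }
}
```

In the real job, each extracted value would presumably be emitted as an IntWritable under a single NullWritable key, which matches the input types of MaxReducer.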
Reducer: MaxReducer
Finds the global maximum timestamp.
public static class MaxReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // All timestamps arrive under the single NullWritable key.
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            if (v.get() > max) {
                max = v.get();
            }
        }
        // Defensive fallback: report 0 if no value was seen.
        if (max == Integer.MIN_VALUE) {
            max = 0;
        }
        context.write(NullWritable.get(), new IntWritable(max));
    }
}
Job 2: Find User Max Timestamps
Mapper: UserMaxMapper
Extracts timestamps per user.
Reducer: UserMaxReducer
Finds max timestamp for each user.
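The per-user maximum can be illustrated without Hadoop. The sketch below takes the (user, timestamp) pairs a mapper might emit and keeps the largest timestamp per user, which is what the reducer does for each key. The class name and the pair representation are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hadoop-free sketch of the per-user maximum that Job 2 computes. */
final class UserMaxSketch {
    /** Each entry is a hypothetical {userId, timestamp} pair, as a mapper might emit it. */
    static Map<String, Integer> maxPerUser(List<String[]> emitted) {
        Map<String, Integer> maxByUser = new HashMap<>();
        for (String[] pair : emitted) {
            int ts = Integer.parseInt(pair[1]);
            // Keep the larger of the stored and the new timestamp for this user.
            maxByUser.merge(pair[0], ts, Integer::max);
        }
        return maxByUser;
    }
}
```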
Job 3: Identify Outdated Pages
Mappers: PageMapper, MaxMapOutMapper
Prepare page info and user max timestamps for join.
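OutdatedReducer tells its two inputs apart by a leading tag: "P" for page records and "M" for user max timestamps. A minimal sketch of how the two mappers could produce such tagged values, and how the values meet under one key after the shuffle, follows. The helper names and the sample data in the usage note are hypothetical; only the "P"/"M" tags and the comma-separated shape come from the reducer's parsing logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hadoop-free sketch of tagging records for a reduce-side join. */
final class TaggedJoinSketch {
    /** Value a page mapper could emit under the owner's id (hypothetical helper). */
    static String tagPage(String nick) {
        return "P," + nick;
    }

    /** Value a max-timestamp mapper could emit under the user's id (hypothetical helper). */
    static String tagMax(int lastTimestamp) {
        return "M," + lastTimestamp;
    }

    /** Simulates the shuffle: groups tagged values by key, as the framework does before reduce. */
    static Map<String, List<String>> group(List<String[]> keyValuePairs) {
        Map<String, List<String>> grouped = new HashMap<>();
        for (String[] kv : keyValuePairs) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }
}
```

For example, grouping the pairs ("7", tagPage("alice")) and ("7", tagMax(5000)) yields both values under key "7", so a reducer sees the nickname and the last timestamp together. A key that has only a "P" value belongs to an owner with no logged activity.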
Reducer: OutdatedReducer
Emits pages whose owners have a max timestamp below the threshold, or no recorded activity at all.
public static class OutdatedReducer extends Reducer<Text, Text, NullWritable, Text> {
    private int threshold;

    @Override
    protected void setup(Context context) {
        // The driver stores the cutoff (global max - 2160) in the job configuration.
        threshold = context.getConfiguration().getInt("task.g.threshold", 0);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;   // from the "P" (page) record
        Integer last = null;  // from the "M" (user max timestamp) record; null if absent
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
            } else if (p.length >= 2 && "M".equals(p[0])) {
                last = CsvUtils.toInt(p[1], -1);
            }
        }
        // Outdated: a page exists and its owner has no activity or only activity before the cutoff.
        if (nick != null && (last == null || last < threshold)) {
            context.write(NullWritable.get(), new Text(key.toString() + "," + nick));
        }
    }
}
Threshold Calculation
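After Job 1 completes, the driver reads the global max from its output and computes the threshold, which OutdatedReducer later reads from the task.g.threshold configuration property:
// line holds the line the driver read from Job 1's output, or null if that output is empty.
int globalMax = 0;
if (line != null) {
    String[] p = CsvUtils.splitTab(line);
    // The timestamp is the last tab-separated field.
    globalMax = CsvUtils.toInt(p[p.length - 1], 0);
}
int threshold = globalMax - 2160;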
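To make the cutoff arithmetic concrete, here is a small sketch with a made-up global max of 10000. The class and method names are hypothetical; the window of 2160 and the strict "less than" comparison mirror the driver and OutdatedReducer code.

```java
/** Sketch of the cutoff arithmetic and the "outdated" test used by OutdatedReducer. */
final class ThresholdSketch {
    static final int WINDOW = 2160; // window size from the task definition

    static int threshold(int globalMax) {
        return globalMax - WINDOW;
    }

    /** lastActivity is null when the owner never appears in the activity log. */
    static boolean isOutdated(Integer lastActivity, int threshold) {
        return lastActivity == null || lastActivity < threshold;
    }

    public static void main(String[] args) {
        int t = threshold(10_000);                 // 10000 - 2160
        System.out.println(t);                     // 7840
        System.out.println(isOutdated(7839, t));   // true: just before the cutoff
        System.out.println(isOutdated(7840, t));   // false: the comparison is strict
        System.out.println(isOutdated(null, t));   // true: no activity at all
    }
}
```

Note that when the global max is smaller than 2160 the threshold is negative, so only owners with no logged activity are reported.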
Example Usage
hadoop jar $JAR circlenet.taskG.TaskGSimple $PAGES $ACTIVITY \
$OUT/taskG/tmp_globalmax_simple $OUT/taskG/tmp_usermax_simple $OUT/taskG/simple
TaskGOptimized
Package: circlenet.taskG
Class: TaskGOptimized
Source: src/main/java/circlenet/taskG/TaskGOptimized.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
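args[0]: Input path to the Pages CSV file
args[1]: Input path to the Activity Log CSV file
args[2]: Temporary output path for the per-user max timestamps (Job 1)
args[3]: Final output path for the outdated pages (Job 2)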
Job 1: Find User Max Timestamps
Mapper: UserMaxMapper
Extracts timestamps per user.
Reducer: MaxReducer
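Combiner: MaxReducer, reused as the combiner. This is safe because taking a maximum is associative and commutative, so pre-aggregating on the map side does not change the result.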
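The following sketch illustrates why a max reducer can double as a combiner: the maximum of the per-split maxima equals the maximum over all values. The class name and the sample timestamps are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: the max of partial maxima equals the max over all values. */
final class CombinerSketch {
    static int max(List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            if (v > max) {
                max = v;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps for one user, seen by two different map tasks.
        List<Integer> split1 = Arrays.asList(100, 900, 300);
        List<Integer> split2 = Arrays.asList(750, 50);

        int combined = max(Arrays.asList(max(split1), max(split2))); // combiner, then reducer
        int direct = max(Arrays.asList(100, 900, 300, 750, 50));     // reducer only
        System.out.println(combined == direct); // true
    }
}
```

With the combiner, each map task forwards at most one value per user instead of one value per log entry, which is where the shuffle savings come from.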
Global Max Calculation
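Performed in the driver by scanning all user max values in Job 1's output:
// Each line has the form "user<TAB>maxTimestamp".
// Only part-r-00000 is read, so the scan is complete only if Job 1 writes a single reducer output file.
FileSystem fs = FileSystem.get(conf);
int globalMax = 0;
// try-with-resources closes the reader even if reading fails.
try (BufferedReader br = new BufferedReader(
        new InputStreamReader(fs.open(new Path(args[2] + "/part-r-00000"))))) {
    String line;
    while ((line = br.readLine()) != null) {
        String[] p = CsvUtils.splitTab(line);
        if (p.length >= 2) {
            int t = CsvUtils.toInt(p[1], 0);
            if (t > globalMax) {
                globalMax = t;
            }
        }
    }
}
int threshold = globalMax - 2160;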
Job 2: Map-Side Join and Filter
Mapper: OutdatedMapJoin
Loads the user max timestamps from the distributed cache into an in-memory map (userLast) during setup(), then filters page records against the threshold.
// threshold and userLast are fields of the mapper class (declarations not shown).
@Override
protected void setup(Context context) throws IOException {
    threshold = context.getConfiguration().getInt("task.g.threshold", 0);
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return; // nothing cached: every owner is treated as having no activity
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (URI u : files) {
        // try-with-resources closes the reader even if reading fails.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(u.getPath()))))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Each line has the form "user<TAB>maxTimestamp".
                String[] p = CsvUtils.splitTab(line);
                if (p.length >= 2) {
                    userLast.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
                }
            }
        }
    }
}
Map-Only Job: The driver calls setNumReduceTasks(0), so the job has no shuffle or reduce phase and the mapper output is written directly as the final output.
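The per-record work of the map-side join can be sketched without Hadoop as a lookup in the preloaded map followed by the threshold test. The class name is hypothetical, and the assumption that a page line starts with "id,nick" is not confirmed by this document; the real map() method is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hadoop-free sketch of the per-record work in a mapper like OutdatedMapJoin. */
final class MapJoinSketch {
    private final Map<String, Integer> userLast = new HashMap<>();
    private final int threshold;

    MapJoinSketch(Map<String, Integer> userLast, int threshold) {
        this.userLast.putAll(userLast); // stands in for the distributed-cache load in setup()
        this.threshold = threshold;
    }

    /**
     * Assumption: a page line starts with "id,nick"; the real column layout may differ.
     * Returns the output line, or empty if the page is current or the line is malformed.
     */
    Optional<String> map(String pageLine) {
        String[] p = pageLine.split(",", -1);
        if (p.length < 2) {
            return Optional.empty(); // malformed line
        }
        String id = p[0].trim();
        Integer last = userLast.get(id); // null: owner has no logged activity
        if (last == null || last < threshold) {
            return Optional.of(id + "," + p[1]);
        }
        return Optional.empty();
    }
}
```

Because the lookup table lives in each mapper's memory, this approach only works while the per-user max file fits comfortably in the task's heap.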
Optimizations
- Uses a combiner to reduce shuffle volume in Job 1
- Reduces the pipeline from 3 jobs to 2
- Computes the global max in the driver instead of in a separate job
- Performs a map-side join using the distributed cache
- Runs the second job as map-only (no shuffle or reduce phase)
Example Usage
hadoop jar $JAR circlenet.taskG.TaskGOptimized $PAGES $ACTIVITY \
$OUT/taskG/tmp_usermax_opt $OUT/taskG/optimized