Overview

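Task G identifies outdated pages: pages whose owner has no recorded activity within 2160 time units of the global maximum timestamp. A page is reported when its owner's latest activity timestamp is below globalMax - 2160, or when the owner has no activity at all.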
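The rule can be sketched in plain Java without any MapReduce machinery. This is only an in-memory illustration: the class name OutdatedPagesSketch and the two map shapes (join id to nickname, join id to latest timestamp) are assumptions made for the example, not part of the circlenet code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** In-memory sketch of the Task G rule (hypothetical helper, not the real job). */
final class OutdatedPagesSketch {
    static final int WINDOW = 2160; // time units, as stated in the overview

    /**
     * pages:        join id -> nickname (assumed shape)
     * lastActivity: join id -> latest activity timestamp; a missing id means no activity
     */
    static List<String> outdated(Map<String, String> pages, Map<String, Integer> lastActivity) {
        // Global maximum over all known timestamps; 0 when there is no activity at all.
        int globalMax = 0;
        for (int t : lastActivity.values()) {
            globalMax = Math.max(globalMax, t);
        }
        int threshold = globalMax - WINDOW;

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : pages.entrySet()) {
            Integer last = lastActivity.get(e.getKey());
            // Outdated: no activity, or latest activity strictly below the threshold.
            if (last == null || last < threshold) {
                out.add(e.getKey() + "," + e.getValue());
            }
        }
        return out;
    }
}
```

With a global maximum of 5000 the threshold is 2840, so an owner whose latest timestamp is 2000 is reported, and so is an owner with no activity records.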

TaskGSimple

Package: circlenet.taskG
Class: TaskGSimple
Source: src/main/java/circlenet/taskG/TaskGSimple.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Pages CSV file
args[1] (string, required): Input path to the Activity Log CSV file
args[2] (string, required): Temporary output for global max timestamp
args[3] (string, required): Temporary output for user max timestamps
args[4] (string, required): Final output path

Job 1: Find Global Max Timestamp

Mapper: GlobalMaxMapper. Extracts timestamps from the activity log.
Reducer: MaxReducer. Finds the global maximum timestamp.
public static class MaxReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            if (v.get() > max) {
                max = v.get();
            }
        }
        if (max == Integer.MIN_VALUE) {
            max = 0;
        }
        context.write(NullWritable.get(), new IntWritable(max));
    }
}

Job 2: Find User Max Timestamps

Mapper: UserMaxMapper. Extracts timestamps per user.
Reducer: UserMaxReducer. Finds the max timestamp for each user.
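The per-user maximum that Job 2 computes can be illustrated in plain Java. The sketch below is an assumption-laden stand-in: the class UserMaxSketch is hypothetical, and each record is taken to be an already-extracted {userId, timestamp} pair, because the Activity Log column layout is not shown on this page.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the map + reduce pair of Job 2 (hypothetical helper). */
final class UserMaxSketch {
    /** Each record is {userId, timestamp}; the real column positions are an assumption. */
    static Map<String, Integer> userMax(List<String[]> records) {
        Map<String, Integer> max = new HashMap<>();
        for (String[] r : records) {
            int ts = Integer.parseInt(r[1].trim());
            // Keep the larger of the stored value and the new timestamp for this user.
            max.merge(r[0].trim(), ts, Integer::max);
        }
        return max;
    }
}
```

In the real job the grouping by user happens in the shuffle, and the reducer only has to take the maximum of the values it receives for one key.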

Job 3: Identify Outdated Pages

Mappers: PageMapper, MaxMapOutMapper. Prepare page info and user max timestamps for the join.
Reducer: OutdatedReducer. Identifies pages whose owners have a max timestamp below the threshold.
public static class OutdatedReducer extends Reducer<Text, Text, NullWritable, Text> {
    private int threshold;

    @Override
    protected void setup(Context context) {
        threshold = context.getConfiguration().getInt("task.g.threshold", 0);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        String nick = null;
        Integer last = null;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
            } else if (p.length >= 2 && "M".equals(p[0])) {
                last = CsvUtils.toInt(p[1], -1);
            }
        }
        if (nick != null && (last == null || last < threshold)) {
            context.write(NullWritable.get(), new Text(key.toString() + "," + nick));
        }
    }
}
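The reducer has three possible outcomes per key, which the following plain-Java sketch makes explicit. It is not the Hadoop class: OutdatedJoinSketch is a hypothetical name, and because CsvUtils is not shown on this page, a plain comma split and a small toInt helper stand in for it.

```java
import java.util.List;
import java.util.Optional;

/** Plain-Java sketch of the reduce-side join decision (hypothetical helper). */
final class OutdatedJoinSketch {
    /** Stand-in for CsvUtils.toInt: parse or fall back (assumed behavior). */
    static int toInt(String s, int fallback) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    /** values are tagged records: "P,<nickname>" from pages, "M,<maxTimestamp>" from Job 2. */
    static Optional<String> decide(String key, List<String> values, int threshold) {
        String nick = null;
        Integer last = null;
        for (String v : values) {
            String[] p = v.split(",");
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
            } else if (p.length >= 2 && "M".equals(p[0])) {
                last = toInt(p[1], -1);
            }
        }
        // No page record: nothing to report. No activity or stale activity: report.
        if (nick != null && (last == null || last < threshold)) {
            return Optional.of(key + "," + nick);
        }
        return Optional.empty();
    }
}
```

A key with a page record and a recent timestamp is dropped, a key with a page record and either a stale timestamp or no "M" record is emitted, and a key with only an "M" record is ignored because there is no page to report.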

Threshold Calculation

Between jobs, the driver reads the global max and computes the threshold:
int globalMax = 0;
if (line != null) {
    String[] p = CsvUtils.splitTab(line);
    globalMax = CsvUtils.toInt(p[p.length - 1], 0);
}
int threshold = globalMax - 2160;
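The fragment above takes the last tab-separated field of the line, which works whether or not the line carries a key column. A self-contained sketch of the same arithmetic follows. ThresholdSketch and its toInt helper are hypothetical stand-ins, since CsvUtils is not shown on this page.

```java
/** Plain-Java sketch of the driver's threshold computation (hypothetical helper). */
final class ThresholdSketch {
    static final int WINDOW = 2160;

    /** Stand-in for CsvUtils.toInt: parse or fall back (assumed behavior). */
    static int toInt(String s, int fallback) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    /** line is the first line of the Job 1 output, or null if the output is empty. */
    static int threshold(String line) {
        int globalMax = 0;
        if (line != null) {
            String[] p = line.split("\t");
            // Last field: the value itself, with or without a preceding key column.
            globalMax = toInt(p[p.length - 1], 0);
        }
        return globalMax - WINDOW;
    }
}
```

For example, a line containing 5000 gives a threshold of 2840. When the line is missing or unparsable, globalMax stays 0 and the threshold becomes -2160, so only owners with no activity record (or an unparsable one, which the reducer maps to -1 and which is still above -2160) are affected by the null check rather than the comparison.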

Example Usage

hadoop jar $JAR circlenet.taskG.TaskGSimple $PAGES $ACTIVITY \
  $OUT/taskG/tmp_globalmax_simple $OUT/taskG/tmp_usermax_simple $OUT/taskG/simple

TaskGOptimized

Package: circlenet.taskG
Class: TaskGOptimized
Source: src/main/java/circlenet/taskG/TaskGOptimized.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Pages CSV file
args[1] (string, required): Input path to the Activity Log CSV file
args[2] (string, required): Temporary output for user max timestamps
args[3] (string, required): Final output path

Job 1: Find User Max Timestamps

Mapper: UserMaxMapper. Extracts timestamps per user.
Reducer: MaxReducer.
Combiner: MaxReducer is also used as the combiner.
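Reusing the reducer as a combiner is safe here because taking a maximum is associative and commutative: the maximum of per-split maxima equals the maximum over all values. The sketch below simulates that in plain Java. CombinerSketch is a hypothetical name, and the nested lists stand in for the values one key receives from different map tasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of why max works as a combiner (hypothetical helper). */
final class CombinerSketch {
    /** Same logic for the combine step and the reduce step. */
    static int max(List<Integer> values) {
        int m = Integer.MIN_VALUE;
        for (int v : values) {
            m = Math.max(m, v);
        }
        return m;
    }

    /** Combine each split locally, then reduce over the partial results. */
    static int combineThenReduce(List<List<Integer>> splits) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> split : splits) {
            partials.add(max(split)); // one value per split crosses the shuffle
        }
        return max(partials);
    }
}
```

The benefit is that each map task forwards at most one timestamp per user instead of every activity record for that user.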

Global Max Calculation

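Performed in the driver by scanning the user max values written by Job 1. The code reads only part-r-00000, so it sees every user only if Job 1 writes a single reducer output file: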
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(
    new InputStreamReader(fs.open(new Path(args[2] + "/part-r-00000"))));
String line;
int globalMax = 0;
while ((line = br.readLine()) != null) {
    String[] p = CsvUtils.splitTab(line);
    if (p.length >= 2) {
        int t = CsvUtils.toInt(p[1], 0);
        if (t > globalMax) {
            globalMax = t;
        }
    }
}
br.close();

int threshold = globalMax - 2160;

Job 2: Map-Side Join and Filter

Mapper: OutdatedMapJoin. Loads user max timestamps from the distributed cache and performs the filtering.
@Override
protected void setup(Context context) throws IOException {
    threshold = context.getConfiguration().getInt("task.g.threshold", 0);
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (URI u : files) {
        BufferedReader br = new BufferedReader(
            new InputStreamReader(fs.open(new Path(u.getPath()))));
        String line;
        while ((line = br.readLine()) != null) {
            String[] p = CsvUtils.splitTab(line);
            if (p.length >= 2) {
                userLast.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
            }
        }
        br.close();
    }
}
Map-Only Job: The driver calls setNumReduceTasks(0), so the job runs without a shuffle or reduce phase and the mapper output is the final output.
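The map method of OutdatedMapJoin is not shown on this page. A minimal sketch of what a map-side lookup could look like, assuming it applies the same rule as OutdatedReducer in TaskGSimple, is given below in plain Java. MapJoinSketch, its constructor, and isOutdated are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of a map-side join filter (hypothetical helper, assumed rule). */
final class MapJoinSketch {
    private final Map<String, Integer> userLast = new HashMap<>();
    private final int threshold;

    /** cachedLines plays the role of the cached Job 1 output: "<id>\t<maxTimestamp>". */
    MapJoinSketch(Iterable<String> cachedLines, int threshold) {
        this.threshold = threshold;
        for (String line : cachedLines) {
            String[] p = line.split("\t");
            if (p.length >= 2) {
                userLast.put(p[0].trim(), toInt(p[1], 0));
            }
        }
    }

    /** Stand-in for CsvUtils.toInt: parse or fall back (assumed behavior). */
    private static int toInt(String s, int fallback) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    /** True when the owner has no cached timestamp or one below the threshold. */
    boolean isOutdated(String id) {
        Integer last = userLast.get(id.trim());
        return last == null || last < threshold;
    }
}
```

Because the lookup table is held in memory by every map task, this approach fits when the per-user max file is small enough to load in setup.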

Optimizations

  • Uses combiner to reduce shuffle
  • Reduces from 3 jobs to 2 jobs
  • Global max calculated in driver (no separate job)
  • Map-side join using distributed cache
  • Map-only second job

Example Usage

hadoop jar $JAR circlenet.taskG.TaskGOptimized $PAGES $ACTIVITY \
  $OUT/taskG/tmp_usermax_opt $OUT/taskG/optimized