Task G identifies “outdated” CircleNetPages: pages whose users have not accessed CircleNet for 90 days, defined as 90 days × 24 hours = 2,160 time units before the maximum timestamp in the ActivityLog.

Problem Statement

Return the IDs and nicknames of users who have not accessed CircleNet in the last 90 days (i.e., who have no ActivityLog entries in that window). ActivityLog timestamps have hourly granularity. SQL equivalent:
WITH GlobalMax AS (
    SELECT MAX(ActionTime) as max_time
    FROM ActivityLog
),
UserLastAccess AS (
    SELECT ByWho, MAX(ActionTime) as last_access
    FROM ActivityLog
    GROUP BY ByWho
)
SELECT p.ID, p.NickName
FROM CircleNetPage p
LEFT JOIN UserLastAccess u ON p.ID = u.ByWho
CROSS JOIN GlobalMax g
WHERE u.last_access IS NULL 
   OR u.last_access < (g.max_time - 2160);  -- 90 days * 24 hours
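The WHERE clause above boils down to a single predicate. A minimal plain-Java sketch of that predicate, assuming integer hour-granularity timestamps (class and method names here are illustrative, not from the actual codebase):

```java
// Minimal sketch of the SQL filter above, assuming integer hour-granularity
// timestamps. Class and method names are illustrative, not from TaskGSimple.
public class OutdatedPredicate {
    static final int WINDOW = 90 * 24;  // 2,160 hours

    // A null lastAccess models the LEFT JOIN producing NULL: the user has a
    // page but no ActivityLog entries at all.
    static boolean isOutdated(Integer lastAccess, int globalMax) {
        return lastAccess == null || lastAccess < globalMax - WINDOW;
    }

    public static void main(String[] args) {
        int max = 10_000;                            // threshold = 7,840
        System.out.println(isOutdated(7_839, max));  // true  (just below threshold)
        System.out.println(isOutdated(7_840, max));  // false (exactly at threshold)
        System.out.println(isOutdated(null, max));   // true  (never accessed)
    }
}
```

Note the strict `<`: a user whose last access equals the threshold exactly is still considered active, matching the SQL's `u.last_access < (g.max_time - 2160)`.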

Approaches

Simple Implementation (3 Jobs)

The simple approach uses three sequential MapReduce jobs: find the global maximum timestamp, find each user's last access, and filter outdated users.

Job 1: Find Global Maximum Timestamp (TaskGSimple.java:24-53)
public static class GlobalMaxMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
    private final IntWritable out = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 5) {
            out.set(CsvUtils.toInt(f[4], -1));  // ActionTime
            if (out.get() >= 0) {
                context.write(NullWritable.get(), out);
            }
        }
    }
}

public static class MaxReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            if (v.get() > max) {
                max = v.get();
            }
        }
        context.write(NullWritable.get(),
                new IntWritable(max == Integer.MIN_VALUE ? 0 : max));
    }
}
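Because MAX is associative and commutative, the same MaxReducer class can typically also be registered as a combiner (via Hadoop's `job.setCombinerClass`) so that each mapper pre-aggregates a single local maximum before the shuffle; the 98% shuffle reduction cited later comes from this. A plain-Java sanity check of why that is safe (no Hadoop; class name and data are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java check (no Hadoop) that MAX commutes with partitioning, which is
// why MaxReducer can safely double as a combiner: the max of per-split local
// maxima equals the max over all records.
public class MaxCombinerCheck {
    static int maxOf(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).max().orElse(Integer.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Simulated ActionTime values seen by two mapper splits (illustrative data).
        List<Integer> split1 = Arrays.asList(120, 980, 445);
        List<Integer> split2 = Arrays.asList(2150, 77);

        // Combiner path: local max per split, then max of the local maxima.
        int combined = Math.max(maxOf(split1), maxOf(split2));

        // Direct path: single max over the union of all records.
        List<Integer> all = new ArrayList<>(split1);
        all.addAll(split2);
        int direct = maxOf(all);

        System.out.println(combined + " " + direct);  // 2150 2150
    }
}
```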
Job 2: Find Last Access Per User (TaskGSimple.java:55-85)
public static class UserMaxMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text outKey = new Text();
    private final IntWritable outVal = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 5) {
            int by = CsvUtils.toInt(f[1], -1);  // ByWho
            int t = CsvUtils.toInt(f[4], -1);   // ActionTime
            if (by > 0 && t >= 0) {
                outKey.set(String.valueOf(by));
                outVal.set(t);
                context.write(outKey, outVal);
            }
        }
    }
}

public static class UserMaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            if (v.get() > max) {
                max = v.get();
            }
        }
        context.write(key,
                new IntWritable(max == Integer.MIN_VALUE ? 0 : max));
    }
}
Read Global Max and Calculate Threshold (TaskGSimple.java:180-189):
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(
        new InputStreamReader(fs.open(new Path(args[2] + "/part-r-00000"))));
String line = br.readLine();
br.close();
int globalMax = 0;
if (line != null) {
    String[] p = CsvUtils.splitTab(line);
    globalMax = CsvUtils.toInt(p[p.length - 1], 0);
}
int threshold = globalMax - 2160;  // 90 days * 24 hours
Job 3: Filter Outdated Users (TaskGSimple.java:87-141)
// Use MultipleInputs to read Pages and UserMax
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            context.write(new Text(f[0].trim()),
                    new Text("P," + f[1].trim()));
        }
    }
}

public static class MaxMapOutMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.splitTab(value.toString());
        if (f.length >= 2) {
            context.write(new Text(f[0].trim()),
                    new Text("M," + f[1].trim()));
        }
    }
}

public static class OutdatedReducer extends Reducer<Text, Text, NullWritable, Text> {
    private int threshold;

    @Override
    protected void setup(Context context) {
        threshold = context.getConfiguration().getInt("task.g.threshold", 0);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        Integer last = null;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if ("P".equals(p[0])) {
                nick = p[1];
            } else if ("M".equals(p[0])) {
                last = CsvUtils.toInt(p[1], -1);
            }
        }
        // Output if the user has a page but no recent activity
        if (nick != null && (last == null || last < threshold)) {
            context.write(NullWritable.get(), new Text(key + "," + nick));
        }
    }
}
Characteristics:
  • 3 sequential jobs with high overhead
  • Separate job to find global max
  • Reduce-side join in Job 3
  • Multiple HDFS reads by driver

Performance Comparison

| Metric                 | Simple       | Optimized             | Improvement        |
|------------------------|--------------|-----------------------|--------------------|
| Number of Jobs         | 3            | 2                     | 33% reduction      |
| Global Max Calculation | Separate job | In driver             | 1 job saved        |
| Job 1 Shuffle          | 10M records  | ~200K (with combiner) | 98% reduction      |
| Join Strategy          | Reduce-side  | Map-side              | Eliminates shuffle |
| Job 2 Type             | Reduce job   | Map-only              | 50% I/O saved      |
| Execution Time         | Baseline     | 40-50% faster         | Significant        |

Running the Task

export JAR=/home/ds503/ds503_bdm-1.0-SNAPSHOT.jar
export PAGES=/circlenet/pages/CircleNetPage.csv
export ACTIVITY=/circlenet/activitylog/ActivityLog.csv
export OUT=/circlenet/output

# Run simple version (3 jobs)
hadoop jar $JAR circlenet.taskG.TaskGSimple \
  $PAGES $ACTIVITY \
  $OUT/taskG/tmp_globalmax_simple \
  $OUT/taskG/tmp_usermax_simple \
  $OUT/taskG/simple

# View results
hdfs dfs -cat $OUT/taskG/simple/part-r-00000 | head -20

Sample Output

12456,InactiveUser1
34567,DormantPage
78901,OldAccount
...
Format: UserID,NickName (users with no activity in last 90 days)

Understanding the 90-Day Calculation

ActionTime granularity: hours
90 days = 90 * 24 hours = 2,160 time units
Threshold = GlobalMaxTime - 2160

Examples:
- GlobalMax = 1,000,000
- Threshold = 997,840
- User last access = 997,500 → Outdated (< threshold)
- User last access = 998,000 → Active (≥ threshold)
- User with no access → Outdated (NULL < threshold)
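The worked example above can be checked in a few lines of plain Java (illustrative only, not part of TaskGSimple):

```java
// Reproduces the 90-day worked example above (plain Java, not part of TaskGSimple).
public class NinetyDayExample {
    public static void main(String[] args) {
        int globalMax = 1_000_000;
        int threshold = globalMax - 90 * 24;     // 1,000,000 - 2,160
        System.out.println(threshold);           // 997840
        System.out.println(997_500 < threshold); // true  -> outdated
        System.out.println(998_000 < threshold); // false -> active
    }
}
```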

Key Takeaways

  • Simple approach: 3 jobs with separate global max computation
  • Optimization techniques:
    • Combiner for MAX: Pre-compute local max (98% shuffle reduction)
    • Driver computation: Calculate global max in driver instead of separate job
    • Map-side join: Load user timestamps into memory
    • Map-only job: Eliminate reduce phase for filtering
  • Design pattern: When intermediate result is small, compute in driver vs. launching new job
  • Performance: 40-50% faster with optimized approach
  • Output filename: Map-only jobs produce part-m-* instead of part-r-*
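The map-side join and map-only filtering listed above can be illustrated without Hadoop. The sketch below mirrors the idea under the assumption that the per-user last-access table fits in memory; all names and data are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the map-side join behind the optimized version: the
// per-user last-access table is small, so it can be held in memory (in Hadoop
// it would be loaded in a mapper's setup(), e.g. from the distributed cache)
// and each page record filtered during the map phase, with no shuffle at all.
public class MapSideJoinSketch {
    public static void main(String[] args) {
        int threshold = 997_840;  // globalMax - 2160, computed in the driver

        // Small side: userId -> last access time (illustrative data).
        Map<Integer, Integer> lastAccess = new HashMap<>();
        lastAccess.put(12456, 990_000);  // stale
        lastAccess.put(34567, 999_500);  // recent

        // Big side, streamed one record at a time as a mapper would see it.
        int[] pageUserIds = {12456, 34567, 78901};  // 78901 has no activity
        for (int id : pageUserIds) {
            Integer last = lastAccess.get(id);
            if (last == null || last < threshold) {
                System.out.println(id);  // emit outdated user
            }
        }
    }
}
```

Because the filtering happens entirely in the map phase, the corresponding Hadoop job can run with zero reducers, which is why its output files are named part-m-* rather than part-r-*.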
