
Overview

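Task D counts the number of followers for each page by joining the Pages and Follows datasets. It has two implementations. TaskDSimple uses a single job with a reduce-side join. TaskDOptimized counts followers in one job and then joins the counts with Pages in a map-only second job.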

TaskDSimple

Package: circlenet.taskD
Class: TaskDSimple
Source: src/main/java/circlenet/taskD/TaskDSimple.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Pages CSV file
args[1] (string, required): Input path to the Follows CSV file
args[2] (string, required): Output path for the joined results

Mapper: PageMapper

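For each Pages record with at least two fields, emits the page ID (first field) as the key and "P,<nickname>" (built from the second field) as the value.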
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outVal = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            outKey.set(f[0].trim());
            outVal.set("P," + f[1].trim());
            context.write(outKey, outVal);
        }
    }
}

Mapper: FollowsMapper

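For each Follows record with at least three fields, emits the third field (the ID of the followed page) as the key and the constant "F,1" as the value. Each value represents one follow.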
public static class FollowsMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private static final Text OUT = new Text("F,1");

    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            outKey.set(f[2].trim());
            context.write(outKey, OUT);
        }
    }
}
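The two mappers differ only in which field becomes the key and which tag goes into the value. The plain-Java sketch below models their map() logic one line at a time, so you can check the emitted pairs without a cluster. It is illustrative only. The class and method names are hypothetical, the sample lines are made up, and a naive String.split(",", -1) stands in for the project's CsvUtils.split, so quoted fields are not handled.

```java
import java.util.Optional;

// Hypothetical plain-Java model of TaskDSimple's two mappers, one input line at a time.
// Assumption: a naive comma split stands in for CsvUtils.split, so quoted fields are not handled.
public class TaskDMapSketch {

    // PageMapper: key = field 0 (page ID), value = "P," + field 1 (nickname).
    static Optional<String[]> mapPage(String line) {
        String[] f = line.split(",", -1);
        if (f.length < 2) {
            return Optional.empty(); // too few fields: nothing is emitted
        }
        return Optional.of(new String[] {f[0].trim(), "P," + f[1].trim()});
    }

    // FollowsMapper: key = field 2 (ID of the followed page), value = the constant "F,1".
    static Optional<String[]> mapFollow(String line) {
        String[] f = line.split(",", -1);
        if (f.length < 3) {
            return Optional.empty(); // too few fields: nothing is emitted
        }
        return Optional.of(new String[] {f[2].trim(), "F,1"});
    }

    public static void main(String[] args) {
        // Made-up sample lines; only the field positions matter here.
        String[] page = mapPage("42, alice ,x,y").get();
        String[] follow = mapFollow("1001,7, 42 ,x").get();
        System.out.println(page[0] + " -> " + page[1]);      // 42 -> P,alice
        System.out.println(follow[0] + " -> " + follow[1]);  // 42 -> F,1
        System.out.println(mapFollow("1001,7").isPresent()); // false
    }
}
```

Both sample pairs share the key 42, so the shuffle delivers them to the same JoinReducer call.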

Reducer: JoinReducer

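Receives every tagged value for one page ID. It takes the nickname from the “P” value and adds up the “F” values, then writes "<pageId>,<nickname>" as the key and the follower count as the value. Output is written only when a “P” value is present. As a result, pages with no follows appear with a count of 0, and follows whose page ID has no Pages record are dropped.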
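public static class JoinReducer extends Reducer<Text, Text, Text, IntWritable> {
    private final Text outKey = new Text();
    private final IntWritable out = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        int count = 0;
        for (Text t : values) {
            // Values are "P,<nickname>" or "F,<n>". Split on the first comma only,
            // so a nickname that itself contains a comma is kept whole.
            String s = t.toString();
            int comma = s.indexOf(',');
            if (comma < 0) {
                continue;
            }
            String tag = s.substring(0, comma);
            String payload = s.substring(comma + 1);
            if ("P".equals(tag)) {
                nick = payload;
            } else if ("F".equals(tag)) {
                count += CsvUtils.toInt(payload, 0);
            }
        }
        // Write only pages that exist in Pages; follows for unknown page IDs are dropped,
        // and pages with no follows are written with a count of 0.
        if (nick != null) {
            out.set(count);
            outKey.set(key.toString() + "," + nick);
            context.write(outKey, out);
        }
    }
}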
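Because JoinReducer writes a row only when a “P” value is present, the join keeps every page and drops follows that point at unknown page IDs. The plain-Java sketch below models the shuffle (grouping pairs by key) and one reduce() call per key to make that behaviour concrete. The names and sample pairs are hypothetical. Tag parsing splits on the first comma, and the tab separator in the output assumes the driver keeps Hadoop's default TextOutputFormat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of TaskDSimple's shuffle and JoinReducer.
public class JoinReduceSketch {

    // One reduce() call. Returns "<pageId>,<nickname>\t<count>", or null if no "P" value arrived.
    // The tab assumes the driver keeps TextOutputFormat's default key/value separator.
    static String reduce(String key, List<String> values) {
        String nick = null;
        int count = 0;
        for (String v : values) {
            int comma = v.indexOf(',');
            if (comma < 0) {
                continue; // not a tagged value
            }
            String tag = v.substring(0, comma);
            String payload = v.substring(comma + 1); // split on the first comma only
            if (tag.equals("P")) {
                nick = payload;
            } else if (tag.equals("F")) {
                count += Integer.parseInt(payload);
            }
        }
        return nick == null ? null : key + "," + nick + "\t" + count;
    }

    // The shuffle: group (key, value) pairs by key in sorted key order, then reduce each group.
    static List<String> run(List<String[]> mapOutput) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : mapOutput) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            String row = reduce(e.getKey(), e.getValue());
            if (row != null) {
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = Arrays.asList(
            new String[] {"42", "F,1"},
            new String[] {"42", "P,alice"}, // "P" need not arrive first
            new String[] {"42", "F,1"},
            new String[] {"7", "P,bob"},    // page with no follows
            new String[] {"99", "F,1"});    // follow of a page missing from Pages
        // Prints "42,alice<TAB>2" and then "7,bob<TAB>0"; key 99 produces no row.
        run(mapOutput).forEach(System.out::println);
    }
}
```

Hadoop does not order the values inside a reduce call unless a secondary sort is configured, and this page shows none. For that reason the reducer holds both the nickname and the running count until the loop ends, rather than expecting the “P” value to come first.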

Example Usage

hadoop jar $JAR circlenet.taskD.TaskDSimple $PAGES $FOLLOWS $OUT/taskD/simple

TaskDOptimized

Package: circlenet.taskD
Class: TaskDOptimized
Source: src/main/java/circlenet/taskD/TaskDOptimized.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Pages CSV file
args[1] (string, required): Input path to the Follows CSV file
args[2] (string, required): Intermediate output path for the per-page follower counts produced by Job 1
args[3] (string, required): Output path for the final joined results

Job 1: Count Followers

Mapper: CountMapper, which counts followers per page
Reducer: SumReducer
Combiner: SumReducer (the reducer class is reused as the combiner)
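SumReducer can also serve as the combiner because addition is associative and commutative, so summing partial sums gives the same totals as summing the raw counts. This holds as long as SumReducer's output key/value types equal its input types, which Hadoop requires of a combiner. The plain-Java sketch below models Job 1 under two assumptions this page does not confirm: CountMapper emits (pageId, 1) per Follows record, and the combiner runs exactly once per map task. In practice Hadoop may run it zero or more times, which is safe for the same reason. Class names, method names, and the sample splits are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of Job 1: CountMapper emits (pageId, 1) per Follows record,
// SumReducer runs once per map task as the combiner and again as the reducer.
public class CombinerSketch {

    // Combiner: inside one map task, collapse the (pageId, 1) pairs into per-page partial sums.
    static Map<String, Integer> combine(List<String> followedPageIds) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String page : followedPageIds) {
            partial.merge(page, 1, Integer::sum);
        }
        return partial;
    }

    // Reducer: sum the partial sums that arrive from every map task.
    static Map<String, Integer> countFollowers(List<List<String>> splits) {
        Map<String, Integer> totals = new TreeMap<>();
        for (List<String> split : splits) {
            combine(split).forEach((page, partialSum) -> totals.merge(page, partialSum, Integer::sum));
        }
        return totals;
    }

    // Records crossing the shuffle when the combiner runs exactly once per map task.
    static int shuffledRecords(List<List<String>> splits) {
        int n = 0;
        for (List<String> split : splits) {
            n += combine(split).size();
        }
        return n;
    }

    public static void main(String[] args) {
        // Two made-up map tasks, each listing the followed page ID of every Follows record it read.
        List<List<String>> splits = Arrays.asList(
            Arrays.asList("42", "42", "7", "42"),
            Arrays.asList("7", "42"));
        System.out.println(countFollowers(splits));  // {42=4, 7=2}
        System.out.println(shuffledRecords(splits)); // 4, versus 6 without the combiner
    }
}
```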

Job 2: Map-Side Join

Mapper: JoinMapper, which loads the follower counts from the distributed cache in setup() and joins each Pages record against them on the map side.
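// followers: JoinMapper's page ID -> follower count map (declared elsewhere in the class).
@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    Configuration conf = context.getConfiguration();
    for (URI u : files) {
        // Resolve the file system from the cached file's own URI instead of
        // assuming the default file system.
        Path path = new Path(u);
        FileSystem fs = path.getFileSystem(conf);
        // try-with-resources closes the stream even if a read fails.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Job 1 output is tab-separated: page ID, then follower count.
                String[] p = CsvUtils.splitTab(line);
                if (p.length >= 2) {
                    followers.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
                }
            }
        }
    }
}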
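The point of the map-side join is that each mapper holds the entire count table in memory, so Pages records never have to be shuffled. The plain-Java sketch below models this. loadCounts mirrors the setup() loop, and joinPage is a guess at the per-record map() step, which this page does not show. The 0 default for pages missing from the table and the "<pageId>,<nickname>\t<count>" output shape are assumptions chosen to match TaskDSimple. All names are hypothetical, and toIntOrZero is only a stand-in for CsvUtils.toInt.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of JoinMapper's map-side join.
public class MapSideJoinSketch {

    // Stand-in for CsvUtils.toInt(s, 0): parse an int, falling back to 0 (assumed behaviour).
    static int toIntOrZero(String s) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    // Mirrors setup(): read "<pageId>\t<count>" lines into an in-memory map.
    static Map<String, Integer> loadCounts(BufferedReader br) {
        Map<String, Integer> followers = new HashMap<>();
        br.lines().forEach(line -> {
            String[] p = line.split("\t", -1);
            if (p.length >= 2) {
                followers.put(p[0].trim(), toIntOrZero(p[1]));
            }
        });
        return followers;
    }

    // Assumed map() step (not shown on this page): look up the page's count, default 0,
    // and format the row like TaskDSimple's "<pageId>,<nickname>\t<count>".
    static String joinPage(String pagesLine, Map<String, Integer> followers) {
        String[] f = pagesLine.split(",", -1);
        if (f.length < 2) {
            return null; // malformed Pages line: nothing emitted
        }
        String id = f[0].trim();
        return id + "," + f[1].trim() + "\t" + followers.getOrDefault(id, 0);
    }

    public static void main(String[] args) {
        // Made-up Job 1 output, read from a string instead of a cached HDFS file.
        Map<String, Integer> followers =
            loadCounts(new BufferedReader(new StringReader("42\t4\n7\t2\n")));
        System.out.println(joinPage("42,alice,x", followers)); // 42,alice<TAB>4
        System.out.println(joinPage("8,carol,x", followers));  // 8,carol<TAB>0
    }
}
```

A hash lookup per Pages record replaces the shuffle. The cost is that every mapper must hold the whole table in its heap, though the table has at most one entry per page no matter how many Follows records there are.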
Map-Only Job: Job 2 calls setNumReduceTasks(0), so it has no shuffle, sort, or reduce phase. Each map task writes its output directly to the final output path.

Optimizations

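  • Combiner: SumReducer pre-aggregates follower counts inside each map task, so Job 1 shuffles fewer records
  • Map-side join: the follower counts reach every mapper through the distributed cache and are kept in an in-memory map, so Pages records are never shuffled (this requires the count table to fit in mapper memory)
  • Map-only second job: Job 2 has no reduce phase, so it skips the shuffle and sort entirely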
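A rough shuffle-volume comparison follows. It assumes every input line is well formed and that CountMapper (whose code is not shown here) emits one record per Follows line. TaskDSimple shuffles one record per Pages line plus one per Follows line. TaskDOptimized shuffles nothing in Job 2. In Job 1, without the combiner it shuffles one record per Follows line. If the combiner runs once per map task $t$, it shuffles one record per distinct followed page ID in that task, $D_t$:

$$
R_{\text{simple}} = |P| + |F|, \qquad R_{\text{opt}} = \sum_{t} D_t \le |F|
$$

Here $|P|$ and $|F|$ are the numbers of Pages and Follows lines. The bound holds because a map task cannot contain more distinct page IDs than it has Follows lines. The trade-off is that the count table is copied to every node through the distributed cache.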

Example Usage

hadoop jar $JAR circlenet.taskD.TaskDOptimized $PAGES $FOLLOWS \
  $OUT/taskD/tmp_count_opt $OUT/taskD/optimized
