
Overview

Task F identifies pages whose follower count is above the average; the pipeline combines counting, averaging, and filtering operations.

TaskFSimple

Package: circlenet.taskF
Class: TaskFSimple
Source: src/main/java/circlenet/taskF/TaskFSimple.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Pages CSV file
args[1] (string, required): Input path to the Follows CSV file
args[2] (string, required): Temporary output for the join with counts
args[3] (string, required): Temporary output for the average calculation
args[4] (string, required): Final output path

Job 1: Join and Count

Mappers: PageMapper and FollowMapper prepare data from both sources for the join.
Reducer: JoinCountReducer joins pages with their follower counts.
public static class JoinCountReducer extends Reducer<Text, Text, NullWritable, Text> {
    private final Text out = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        String nick = null; // set once the "P" (page) record is seen
        int count = 0;      // accumulated from "F" (follower) records
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
            } else if (p.length >= 2 && "F".equals(p[0])) {
                count += CsvUtils.toInt(p[1], 0);
            }
        }
        // Emit only keys that matched a page record.
        if (nick != null) {
            out.set(key.toString() + "," + nick + "," + count);
            context.write(NullWritable.get(), out);
        }
    }
}
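The two mappers themselves are not shown above; their job is just to tag each record with its source so the reducer can tell the streams apart. A minimal sketch of that tagging convention, stripped of Hadoop boilerplate (the CSV column positions here are assumptions for illustration, not taken from the source):

```java
// Hypothetical sketch of the tag-for-join convention used by PageMapper and
// FollowMapper. Column positions in the CSVs are assumptions.
public class TagSketch {
    // PageMapper: for a Pages row "id,nick,...", emit key = id, value = "P,nick".
    static String tagPage(String csvLine) {
        String[] f = csvLine.split(",");
        return "P," + f[1];
    }

    // FollowMapper: every Follows row contributes one follower to the followed
    // page, so the value is always "F,1" (keyed by the followed page's id).
    static String tagFollow(String csvLine) {
        return "F,1";
    }

    public static void main(String[] args) {
        System.out.println(tagPage("7,alice,us")); // P,alice
        System.out.println(tagFollow("3,7"));      // F,1
    }
}
```

This matches what JoinCountReducer expects: a "P" prefix carries the nickname, an "F" prefix carries a count to accumulate.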

Job 2: Calculate Average

Mapper: AvgMapper extracts the follower counts.
Reducer: AvgReducer computes the average follower count.
public static class AvgReducer extends Reducer<Text, IntWritable, NullWritable, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
        long sum = 0;
        long cnt = 0;
        for (IntWritable v : values) {
            sum += v.get();
            cnt++;
        }
        double avg = cnt == 0 ? 0.0 : (sum * 1.0) / cnt;
        context.write(NullWritable.get(), new DoubleWritable(avg));
    }
}
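Stripped of the MapReduce scaffolding, the reducer's work is a single average over the per-page counts (AvgMapper presumably emits every count under one constant key so a single reduce call sees all values; that keying detail is an assumption):

```java
import java.util.List;

// Plain-Java sketch of AvgReducer's arithmetic.
public class AvgSketch {
    // Average of the per-page follower counts; 0.0 when there are no values,
    // mirroring the reducer's cnt == 0 guard.
    static double average(List<Integer> counts) {
        long sum = 0;
        long cnt = 0;
        for (int v : counts) {
            sum += v;
            cnt++;
        }
        return cnt == 0 ? 0.0 : (sum * 1.0) / cnt;
    }

    public static void main(String[] args) {
        System.out.println(average(List.of(2, 4, 6))); // 4.0
    }
}
```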

Job 3: Filter

Mapper: FilterMapper reads the average from the job configuration and keeps only the pages above it.
public static class FilterMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private final Text out = new Text();
    private double avg;

    @Override
    protected void setup(Context context) {
        avg = context.getConfiguration().getDouble("task.f.avg", 0.0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            int count = CsvUtils.toInt(f[2], 0);
            if (count > avg) { // strictly above the average
                out.set(f[0] + "," + f[1] + "," + count);
                context.write(NullWritable.get(), out);
            }
        }
    }
}
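The filter itself is a one-line predicate over each joined "id,nick,count" line. A plain-Java sketch (note the strict > comparison, so pages exactly at the average are dropped):

```java
// Sketch of FilterMapper's predicate over a joined "id,nick,count" line.
public class FilterSketch {
    static boolean aboveAverage(String joinedLine, double avg) {
        String[] f = joinedLine.split(",");
        if (f.length < 3) {
            return false; // malformed line: skipped, as the mapper does
        }
        return Integer.parseInt(f[2]) > avg; // strict comparison
    }

    public static void main(String[] args) {
        System.out.println(aboveAverage("7,alice,12", 4.5)); // true
        System.out.println(aboveAverage("9,bob,3", 4.5));    // false
    }
}
```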

Example Usage

hadoop jar $JAR circlenet.taskF.TaskFSimple $PAGES $FOLLOWS \
  $OUT/taskF/tmp_join_simple $OUT/taskF/tmp_avg_simple $OUT/taskF/simple

TaskFOptimized

Package: circlenet.taskF
Class: TaskFOptimized
Source: src/main/java/circlenet/taskF/TaskFOptimized.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0] (string, required): Input path to the Pages CSV file
args[1] (string, required): Input path to the Follows CSV file
args[2] (string, required): Temporary output for follower counts
args[3] (string, required): Final output path

Job 1: Count Followers

Mapper: CountMapper counts followers per page.
Combiner: SumReducer is reused as the combiner.
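In effect, CountMapper emits (followedPageId, 1) and SumReducer sums the ones, with the same class run map-side as a combiner to pre-aggregate before the shuffle. A plain-Java sketch of what the job computes end to end (the Follows column layout is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of Job 1's net result: follower counts per page.
public class CountSketch {
    static Map<String, Integer> countFollowers(String[] followLines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : followLines) {
            String[] f = line.split(",");
            // Assumed layout: "followerId,followedId".
            counts.merge(f[1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = countFollowers(new String[]{"1,7", "2,7", "1,9"});
        System.out.println(c.get("7")); // 2
    }
}
```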

Average Calculation

Performed in driver code by reading the count output and dividing by total pages:
FileSystem fs = FileSystem.get(conf);
// Read job 1's output; assumes a single reducer, hence one part file.
BufferedReader br = new BufferedReader(
    new InputStreamReader(fs.open(new Path(args[2] + "/part-r-00000"))));
String line;
long sum = 0;
long ownersWithFollowers = 0;
while ((line = br.readLine()) != null) {
    String[] p = CsvUtils.splitTab(line);
    if (p.length >= 2) {
        sum += CsvUtils.toInt(p[1], 0);
        ownersWithFollowers++;
    }
}
br.close();

// Count every page (assumes the Pages input path is a single file).
long totalPages = 0;
BufferedReader brPages = new BufferedReader(
    new InputStreamReader(fs.open(new Path(args[0]))));
while (brPages.readLine() != null) {
    totalPages++;
}
brPages.close();
double avg = totalPages == 0 ? 0.0 : (sum * 1.0) / totalPages;
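Note the divisor: sum only includes pages that received at least one follower, but the division uses totalPages, so zero-follower pages still pull the average down. As a tiny checkable function:

```java
// The driver's average: total followers divided by ALL pages, not just
// the pages that appear in the count output.
public class DriverAvgSketch {
    static double avg(long sumOfCounts, long totalPages) {
        return totalPages == 0 ? 0.0 : (sumOfCounts * 1.0) / totalPages;
    }

    public static void main(String[] args) {
        // 10 followers spread over 4 pages, one of which has none.
        System.out.println(avg(10, 4)); // 2.5
    }
}
```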

Job 2: Filter with Map-Side Join

Mapper: FilterMapper loads the follower counts from the distributed cache and filters pages against the average.
private final Map<String, Integer> counts = new HashMap<>();
private double avg;

@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files != null) {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        for (URI u : files) {
            BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(u.getPath()))));
            String line;
            while ((line = br.readLine()) != null) {
                String[] p = CsvUtils.splitTab(line);
                if (p.length >= 2) {
                    counts.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
                }
            }
            br.close();
        }
    }
    avg = context.getConfiguration().getDouble("task.f.avg", 0.0);
}

Optimizations

  • Uses combiner to reduce shuffle
  • Reduces from 3 jobs to 2 jobs
  • Average calculated in driver (no separate job)
  • Map-side join using distributed cache

Example Usage

hadoop jar $JAR circlenet.taskF.TaskFOptimized $PAGES $FOLLOWS \
  $OUT/taskF/tmp_count_opt $OUT/taskF/optimized
