Overview
Task F identifies pages whose follower count is above the average across all pages, combining counting, averaging, and filtering operations.
TaskFSimple
Package: circlenet.taskF
Class: TaskFSimple
Source: src/main/java/circlenet/taskF/TaskFSimple.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
Input path to the Pages CSV file
Input path to the Follows CSV file
Temporary output for join with counts
Temporary output for average calculation
Final output path
Job 1: Join and Count
Mappers: PageMapper, FollowMapper
Tag records from the Pages and Follows inputs ("P" and "F") so the reducer can join them by page ID.
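PageMapper and FollowMapper are not shown in full here; a minimal plain-Java sketch of the tagging they must perform (the CSV column layouts below are assumptions) matches the "P"/"F" tags that JoinCountReducer checks:

```java
import java.util.Arrays;

// Sketch of the reduce-side join tagging. Assumed layouts: Pages rows are
// "pageId,nickname,..." and Follows rows are "followerId,followedId".
public class TagJoinSketch {
    // PageMapper analogue: key on the page ID, tag the value with "P".
    static String[] tagPage(String pagesRow) {
        String[] f = pagesRow.split(",");
        return new String[] { f[0], "P," + f[1] };
    }

    // FollowMapper analogue: key on the followed page, tag with "F" and a
    // count of 1 so the reducer can sum followers per page.
    static String[] tagFollow(String followsRow) {
        String[] f = followsRow.split(",");
        return new String[] { f[1], "F,1" };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(tagPage("42,alice")));   // [42, P,alice]
        System.out.println(Arrays.toString(tagFollow("7,42")));     // [42, F,1]
    }
}
```

Because both mappers key on the page ID, all records for one page meet in a single reduce call.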
Reducer: JoinCountReducer
Joins pages with follower counts.
public static class JoinCountReducer extends Reducer<Text, Text, NullWritable, Text> {
    private final Text out = new Text();   // reusable output value

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String nick = null;
        int count = 0;
        for (Text t : values) {
            String[] p = CsvUtils.split(t.toString());
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];                        // page record, tagged "P"
            } else if (p.length >= 2 && "F".equals(p[0])) {
                count += CsvUtils.toInt(p[1], 0);   // follow record, tagged "F"
            }
        }
        if (nick != null) {                         // emit only pages present in the Pages file
            out.set(key.toString() + "," + nick + "," + count);
            context.write(NullWritable.get(), out);
        }
    }
}
Job 2: Calculate Average
Mapper: AvgMapper
Extracts follower counts.
Reducer: AvgReducer
Computes the average follower count.
public static class AvgReducer extends Reducer<Text, IntWritable, NullWritable, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long cnt = 0;
        for (IntWritable v : values) {
            sum += v.get();
            cnt++;
        }
        double avg = cnt == 0 ? 0.0 : (sum * 1.0) / cnt;
        context.write(NullWritable.get(), new DoubleWritable(avg));
    }
}
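AvgMapper's source is not shown; presumably it parses each Job 1 line ("pageId,nick,count") and emits the count under a single constant key, so one reduce call sees every count. The end-to-end effect of Job 2 can be modeled in plain Java (the line layout is taken from JoinCountReducer's output):

```java
import java.util.List;

// Plain-Java model of Job 2: pull the third field (follower count) out of
// each Job 1 output line and average over all lines.
public class AvgSketch {
    static double average(List<String> job1Lines) {
        long sum = 0, cnt = 0;
        for (String line : job1Lines) {
            String[] f = line.split(",");   // "pageId,nick,count"
            sum += Integer.parseInt(f[2]);
            cnt++;
        }
        return cnt == 0 ? 0.0 : (sum * 1.0) / cnt;
    }

    public static void main(String[] args) {
        System.out.println(average(List.of("1,a,2", "2,b,4")));  // 3.0
    }
}
```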
Job 3: Filter
Mapper: FilterMapper
Reads the average from configuration and filters pages above average.
public static class FilterMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private double avg;
    private final Text out = new Text();   // reusable output value

    @Override
    protected void setup(Context context) {
        avg = context.getConfiguration().getDouble("task.f.avg", 0.0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());   // Job 1 output: id,nick,count
        if (f.length >= 3) {
            int count = CsvUtils.toInt(f[2], 0);
            if (count > avg) {                           // keep strictly above-average pages
                out.set(f[0] + "," + f[1] + "," + count);
                context.write(NullWritable.get(), out);
            }
        }
    }
}
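Putting the three jobs together, the overall computation can be modeled in memory as follows (illustrative only; the real jobs stream over HDFS with the mappers and reducers above):

```java
import java.util.*;
import java.util.stream.*;

// In-memory model of the TaskFSimple pipeline: join+count, average, filter.
public class TaskFModel {
    static List<String> aboveAverage(Map<String, String> pages,   // pageId -> nickname
                                     List<String[]> follows) {    // {followerId, followedId}
        // Job 1: follower count per page; pages with no followers keep 0,
        // follows of unknown pages are dropped (nick == null in the reducer).
        Map<String, Integer> counts = new HashMap<>();
        for (String id : pages.keySet()) counts.put(id, 0);
        for (String[] f : follows)
            if (pages.containsKey(f[1])) counts.merge(f[1], 1, Integer::sum);

        // Job 2: average count over all pages.
        double avg = counts.values().stream().mapToInt(Integer::intValue).average().orElse(0.0);

        // Job 3: keep only pages strictly above the average.
        return counts.entrySet().stream()
                .filter(e -> e.getValue() > avg)
                .map(e -> e.getKey() + "," + pages.get(e.getKey()) + "," + e.getValue())
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> pages = Map.of("1", "a", "2", "b", "3", "c");
        List<String[]> follows = List.of(new String[]{"9", "1"},
                                         new String[]{"8", "1"},
                                         new String[]{"9", "2"});
        System.out.println(aboveAverage(pages, follows));  // [1,a,2]
    }
}
```

Note that the comparison is strict (`>`), so a page exactly at the average is excluded.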
Example Usage
hadoop jar $JAR circlenet.taskF.TaskFSimple $PAGES $FOLLOWS \
$OUT/taskF/tmp_join_simple $OUT/taskF/tmp_avg_simple $OUT/taskF/simple
TaskFOptimized
Package: circlenet.taskF
Class: TaskFOptimized
Source: src/main/java/circlenet/taskF/TaskFOptimized.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
Input path to the Pages CSV file
Input path to the Follows CSV file
Temporary output for follower counts
Final output path
Job 1: Count Followers
Mapper: CountMapper
Counts followers per page.
Combiner: SumReducer is reused as a combiner to pre-aggregate counts before the shuffle.
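The combiner's effect can be sketched in plain Java: it computes partial per-page sums on each mapper's output before anything crosses the network, so the reducer receives one partial sum per page per mapper instead of one record per follow edge (the row layout below is an assumption):

```java
import java.util.*;

// Model of local aggregation by the SumReducer-as-combiner: partial follower
// sums per followed page, computed before the shuffle.
public class CombinerSketch {
    static Map<String, Integer> partialSums(List<String> followsRows) {
        Map<String, Integer> partial = new HashMap<>();
        for (String row : followsRows) {
            String[] f = row.split(",");      // assumed layout: followerId,followedId
            partial.merge(f[1], 1, Integer::sum);
        }
        return partial;
    }

    public static void main(String[] args) {
        // three input records collapse to two partial sums
        System.out.println(partialSums(List.of("9,1", "8,1", "9,2")).size());  // 2
    }
}
```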
Average Calculation
Performed in driver code by reading the count output and dividing by total pages:
FileSystem fs = FileSystem.get(conf);

// Sum the per-page follower counts from Job 1's output.
BufferedReader br = new BufferedReader(
        new InputStreamReader(fs.open(new Path(args[2] + "/part-r-00000"))));
String line;
long sum = 0;
long ownersWithFollowers = 0;
while ((line = br.readLine()) != null) {
    String[] p = CsvUtils.splitTab(line);
    if (p.length >= 2) {
        sum += CsvUtils.toInt(p[1], 0);
        ownersWithFollowers++;
    }
}
br.close();

// Count every page, so pages with zero followers are included in the average.
long totalPages = 0;
BufferedReader brPages = new BufferedReader(
        new InputStreamReader(fs.open(new Path(args[0]))));
while (brPages.readLine() != null) {
    totalPages++;
}
brPages.close();

double avg = totalPages == 0 ? 0.0 : (sum * 1.0) / totalPages;
Job 2: Filter with Map-Side Join
Mapper: FilterMapper
Loads follower counts from distributed cache and filters based on average.
private final Map<String, Integer> counts = new HashMap<>();  // pageId -> follower count
private double avg;

@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files != null) {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        for (URI u : files) {
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(u.getPath()))));
            String line;
            while ((line = br.readLine()) != null) {
                String[] p = CsvUtils.splitTab(line);
                if (p.length >= 2) {
                    counts.put(p[0].trim(), CsvUtils.toInt(p[1], 0));
                }
            }
            br.close();
        }
    }
    avg = context.getConfiguration().getDouble("task.f.avg", 0.0);
}
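With the counts in memory, the map phase joins and filters in one pass with no shuffle at all. A plain-Java model of the per-row logic (the Pages column layout is an assumption):

```java
import java.util.*;

// Model of the map-side join filter: look up each page's follower count in
// the in-memory map loaded from the distributed cache, and keep the page
// only if its count is strictly above the precomputed average.
public class MapSideFilterSketch {
    static String filterRow(String pagesRow, Map<String, Integer> counts, double avg) {
        String[] f = pagesRow.split(",");                 // assumed: "pageId,nickname,..."
        int count = counts.getOrDefault(f[0].trim(), 0);  // absent => zero followers
        return count > avg ? f[0] + "," + f[1] + "," + count : null;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("42", 5);
        System.out.println(filterRow("42,alice", counts, 2.0));  // 42,alice,5
        System.out.println(filterRow("7,bob", counts, 2.0));     // null
    }
}
```

This works because the count table (one entry per page) is small enough to fit in each mapper's memory; the large Pages file is never shuffled.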
Optimizations
- Uses combiner to reduce shuffle
- Reduces from 3 jobs to 2 jobs
- Average calculated in driver (no separate job)
- Map-side join using distributed cache
Example Usage
hadoop jar $JAR circlenet.taskF.TaskFOptimized $PAGES $FOLLOWS \
$OUT/taskF/tmp_count_opt $OUT/taskF/optimized