This guide covers MapReduce optimization techniques used in CircleNet Analytics to improve performance and reduce I/O overhead.

Understanding MapReduce Execution Flow

Before diving into optimizations, it’s important to understand how Hadoop processes jobs:
  1. Read from HDFS: input data is read from the Hadoop Distributed File System.
  2. Run Mappers: the user-defined mapper transforms each input record into intermediate key-value pairs.
  3. Shuffle and Sort: the MapReduce engine automatically shuffles and sorts intermediate data across the network (the user has no control over this phase).
  4. Run Reducers: the user-defined reducer aggregates the values for each key.
  5. Write to HDFS: the final output is written back to HDFS.
The shuffle and sort phase is the most expensive operation in MapReduce. Most optimizations focus on reducing data transferred during this phase.
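The five phases above can be sketched as a single-process simulation in plain Java (illustrative only, not Hadoop API code): map lines to key-value pairs, group by key as a stand-in for shuffle and sort, then reduce each group. The word-count example and class name are assumptions for illustration.

```java
import java.util.*;

// Illustrative single-process simulation of the five MapReduce phases (word count);
// real Hadoop distributes each phase across a cluster.
public class MiniMapReduce {
    static Map<String, Integer> wordCount(List<String> lines) {
        // Map: each line -> (word, 1) pairs
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines)
            for (String w : line.split(" "))
                mapped.add(Map.entry(w, 1));

        // Shuffle and sort: group intermediate pairs by key (TreeMap keeps keys sorted)
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapped)
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());

        // Reduce: sum the values for each key
        Map<String, Integer> out = new TreeMap<>();
        grouped.forEach((w, ones) ->
            out.put(w, ones.stream().mapToInt(Integer::intValue).sum()));
        return out;
    }

    public static void main(String[] args) {
        // "Read from HDFS" / "write to HDFS" stand-ins: an in-memory list and stdout
        wordCount(List.of("chess hiking", "chess chess"))
            .forEach((w, n) -> System.out.println(w + "\t" + n));
    }
}
```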

Combiner Optimization

What is a Combiner?

A combiner acts as a mini-reducer that runs locally on each mapper node before the shuffle phase. It performs local aggregation to reduce the amount of data sent across the network.

When to Use Combiners

Safe for Combiner

Operations that are associative and commutative:
  • SUM: (a + b) + c = a + (b + c)
  • MAX/MIN: max(max(a, b), c) = max(a, max(b, c))
  • COUNT: partial counts can simply be summed
Unsafe for Combiner:
  • AVG (average) - not associative; averaging partial averages gives the wrong answer (emit (sum, count) pairs instead)
  • DISTINCT counting - requires seeing all values for a key together
  • Any operation where order or grouping matters
Using a combiner incorrectly can silently produce wrong results! Hadoop may run the combiner zero, one, or several times, so correctness must never depend on it running.
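To see why AVG breaks, here is a small stand-alone check (illustrative numbers, not CircleNet data; the class and method names are made up): pre-averaging on one mapper skews the result, while carrying (sum, count) pairs keeps it exact.

```java
// Why AVG is combiner-unsafe: averaging partial averages is wrong,
// but combining (sum, count) pairs is associative and therefore safe.
public class AvgCombinerPitfall {
    static double avg(double... xs) {
        double s = 0;
        for (double x : xs) s += x;
        return s / xs.length;
    }

    public static void main(String[] args) {
        // Mapper 1 saw values 1 and 2; mapper 2 saw value 9.
        double naive = avg(avg(1, 2), 9);  // a "combiner" pre-averaged mapper 1's values
        double truth = avg(1, 2, 9);
        System.out.println("naive=" + naive + " correct=" + truth);  // 5.25 vs 4.0

        // Safe pattern: emit (sum, count) pairs; sums and counts combine freely.
        double sum = (1 + 2) + 9, count = 2 + 1;
        System.out.println("sum/count=" + (sum / count));  // 4.0
    }
}
```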

Implementation Example: Task A (Hobby Frequency)

Task A counts the frequency of each favorite hobby. This is a perfect use case for combiners.

Simple version (no combiner):
TaskA.java
public class TaskA {
    public static class TaskAMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text hobby = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException{
            String[] fields = value.toString().split(",");
            if(fields.length == 5){
                hobby.set(fields[4]);
                context.write(hobby, one);  // Emit (hobby, 1)
            }
        }
    }

    public static class TaskAReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
                throws IOException, InterruptedException{
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}
Optimized version (with combiner):
TaskAOptimized.java
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "TaskAOptimized");
    job.setJarByClass(TaskAOptimized.class);
    job.setMapperClass(MapperA.class);

    // Use the same reducer as combiner for local aggregation
    job.setCombinerClass(SumReducer.class);  // Key optimization!
    job.setReducerClass(SumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
By adding a combiner, if a mapper emits ("Chess", 1) a thousand times, the combiner collapses those pairs into a single ("Chess", 1000) before anything crosses the network, cutting the shuffle data for that key by roughly 1000x!
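The effect can be simulated outside Hadoop. In this illustrative sketch (class and helper names are made up), a local SUM "combiner" collapses one mapper's (hobby, 1) pairs before they would reach the shuffle:

```java
import java.util.*;

// Illustrative: a sum "combiner" applied to one mapper's output.
// 1000 (Chess, 1) records collapse into a single (Chess, 1000) pair.
public class CombinerEffect {
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapperOutput) {
        Map<String, Integer> combined = new HashMap<>();
        for (Map.Entry<String, Integer> e : mapperOutput)
            combined.merge(e.getKey(), e.getValue(), Integer::sum);  // local SUM per key
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapperOutput = new ArrayList<>();
        for (int i = 0; i < 1000; i++) mapperOutput.add(Map.entry("Chess", 1));
        mapperOutput.add(Map.entry("Hiking", 1));

        Map<String, Integer> combined = combine(mapperOutput);
        System.out.println("records before combiner: " + mapperOutput.size());  // 1001
        System.out.println("records after combiner:  " + combined.size());     // 2
        System.out.println("Chess -> " + combined.get("Chess"));               // 1000
    }
}
```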

Map-Only Jobs

When to Skip the Reducer

If your task requires no aggregation or joining, you can create a map-only job by setting the number of reduce tasks to zero. This saves I/O costs by eliminating the shuffle phase entirely.

Implementation Example: Task C (Filter by Hobby)

Task C finds all users whose hobby matches a specific value (e.g., “PodcastBinging”). This is a simple filter operation with no aggregation needed.
TaskCOptimized.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "TaskC Optimized - Map Only");
    job.setJarByClass(TaskCOptimized.class);
    job.setMapperClass(FilterMapper.class);

    // Skip reducer entirely - map-only job!
    job.setNumReduceTasks(0);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Benefits:
  • No shuffle phase = faster execution
  • No network transfer overhead
  • Ideal for filtering, transforming, or formatting data
Combiners have no effect in map-only jobs since there’s no reduce phase.
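The FilterMapper itself is not shown above; its heart is just a per-record predicate. A stand-alone sketch of that logic, assuming the same five-field, hobby-last record layout as Task A (the class name and sample rows are illustrative):

```java
// Stand-alone sketch of the predicate a map-only FilterMapper would apply.
// Assumed layout: 5 comma-separated fields with the hobby in the last field.
public class HobbyFilter {
    static boolean matches(String line, String hobby) {
        String[] fields = line.split(",");
        return fields.length == 5 && fields[4].equals(hobby);
    }

    public static void main(String[] args) {
        String[] records = {
            "1,alice,Engineer,NYC,PodcastBinging",
            "2,bob,Chef,LA,Chess"
        };
        for (String r : records)
            if (matches(r, "PodcastBinging"))
                System.out.println(r);  // emitted unchanged; no shuffle, no reducer
    }
}
```

Inside the actual mapper, a matching record would simply be emitted with `context.write(NullWritable.get(), value)`.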

Map-Side Joins with Distributed Cache

The Problem with Reduce-Side Joins

Traditional reduce-side joins require:
  1. Both datasets to be shuffled across the network
  2. All related records grouped by key in reducers
  3. High memory usage in reducers

Solution: Map-Side Join

When one dataset is small enough to fit in memory, load it into each mapper using the Distributed Cache and perform the join locally.

Implementation Example: Task B (Top 10 Pages)

Task B finds the 10 most accessed pages and returns their ID, nickname, and job title.

Optimized approach:
TaskBOptimized.java
public static class TopJoinReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    private final Map<String, String> pageInfo = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException {
        // Load CircleNetPage.csv into memory from distributed cache
        URI[] files = context.getCacheFiles();
        if (files != null) {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            for (URI u : files) {
                BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(u.getPath()))));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] p = CsvUtils.split(line);
                    if (p.length >= 3) {
                        // Cache: ID -> (nickname, jobTitle)
                        pageInfo.put(p[0].trim(), p[1].trim() + "," + p[2].trim());
                    }
                }
                br.close();
            }
        }
    }

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join with cached page data
        for (Text v : values) {
            String[] f = CsvUtils.splitTab(v.toString());
            String id = f[0].trim();
            String info = pageInfo.get(id);  // Lookup from cache
            // ... process and emit results
        }
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job2 = Job.getInstance(conf, "TaskB Optimized - Top10 Join");
    job2.setJarByClass(TaskBOptimized.class);

    // Add CircleNetPage.csv to distributed cache
    job2.addCacheFile(new Path(args[1]).toUri());

    job2.setReducerClass(TopJoinReducer.class);
    // ... remaining mapper and output configuration ...
    System.exit(job2.waitForCompletion(true) ? 0 : 1);
}

Benefits of Map-Side Joins

  • Eliminates one full MapReduce job
  • Reduces network shuffle for large datasets
  • Faster execution when reference data fits in memory
Use map-side joins when one dataset is small (a few hundred MB or less) and the other is large (GBs or TBs).
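The lookup at the heart of a cache-backed join is a plain in-memory hash probe. A minimal stand-alone sketch (the class name, helper, and row values are made up for illustration):

```java
import java.util.*;

// Minimal hash-join sketch: the small side (page info) lives in a HashMap,
// and each record from the large side is joined with a single O(1) lookup.
public class HashJoinSketch {
    // Join one large-side record against the cached small side; null if no match.
    static String joinOne(Map<String, String> pageInfo, String id, String count) {
        String info = pageInfo.get(id);  // O(1) probe replaces a network shuffle
        return info == null ? null : id + "," + info + "," + count;
    }

    public static void main(String[] args) {
        // Small side, as the distributed cache provides it: ID -> "nickname,jobTitle"
        Map<String, String> pageInfo = Map.of("42", "chessfan,Engineer", "7", "hiker99,Chef");

        // Large side: (pageId, accessCount) records streaming past
        String[][] accesses = { {"42", "900"}, {"7", "350"}, {"99", "10"} };
        for (String[] rec : accesses) {
            String joined = joinOne(pageInfo, rec[0], rec[1]);
            if (joined != null) System.out.println(joined);  // unmatched IDs are dropped
        }
    }
}
```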

Optimization Strategy Summary

| Optimization | When to Use | Example Tasks | Performance Gain |
| --- | --- | --- | --- |
| Combiner | Associative/commutative operations (SUM, MAX, MIN, COUNT) | Task A, Task B (count), Task D (count) | High - reduces shuffle data |
| Map-Only Job | No aggregation needed (filter, transform) | Task C (filter), Task G (filter with threshold) | High - eliminates shuffle phase |
| Map-Side Join | One small dataset + one large dataset | Task B (pages lookup), Task D (follower counts), Task G (user info) | Medium-High - eliminates job stages |
| Distributed Cache | Small reference data needed by all mappers | All join operations | Medium - avoids reduce-side join |

Performance Comparison Methodology

Each CircleNet Analytics task includes timing instrumentation:
long totalStart = System.currentTimeMillis();
// ... run jobs ...
boolean ok = JobTimer.run(job, "TaskAOptimized");
JobTimer.total("TaskAOptimized", totalStart);
Timing results are logged to $CIRCLENET_TIMING_FILE for comparison:
# View all timing results
cat $CIRCLENET_TIMING_FILE

# Compare total times
rg ",total," $CIRCLENET_TIMING_FILE
Always verify correctness before comparing performance. An incorrect optimization is worthless!

Best Practices

  1. Start with a simple, correct solution - Optimize only after you have working code
  2. Understand your data - Know which datasets are small enough for caching
  3. Profile before optimizing - Measure to identify actual bottlenecks
  4. Test correctness - Compare outputs of simple vs optimized versions
  5. Document tradeoffs - Explain why each optimization was chosen
Not all optimizations help every task:
  • Task C optimized version showed no gain over simple version
  • Task E optimized version was actually slower
  • Always benchmark your specific use case!
