A combiner acts as a mini-reducer that runs locally on each mapper node before the shuffle phase. It performs local aggregation to reduce the amount of data sent across the network.
Task A counts the frequency of each favorite hobby. This is a perfect use case for combiners.
Simple version (no combiner):
TaskA.java
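```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TaskA {

    public static class TaskAMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reused across map() calls to avoid allocating new objects per record
        private final static IntWritable one = new IntWritable(1);
        private Text hobby = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            // Skip records that do not have exactly 5 comma-separated fields
            if (fields.length == 5) {
                hobby.set(fields[4]);
                context.write(hobby, one); // Emit (hobby, 1)
            }
        }
    }

    public static class TaskAReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all counts received for this hobby
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}
```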
Optimized version (with combiner):
TaskAOptimized.java
```java
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "TaskAOptimized");
    job.setJarByClass(TaskAOptimized.class);
    job.setMapperClass(MapperA.class);
    // Use the same reducer as combiner for local aggregation
    job.setCombinerClass(SumReducer.class); // Key optimization!
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
}
```
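With a combiner, if a mapper emits (Chess, 1) 1000 times, the combiner can collapse those records into a single (Chess, 1000) before they cross the network, cutting the shuffle data for that key from that mapper by up to 1000x. Reusing the reducer as the combiner is safe here because summation is associative and commutative; Hadoop may run the combiner zero, one, or several times, so the job must produce the same result in every case.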
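To make the effect of local aggregation concrete, here is a minimal plain-Java sketch of what a summing combiner does to one mapper's output. It does not use Hadoop at all; `CombinerSketch` and `combine` are hypothetical names chosen for illustration, and the "Hiking" records are made-up sample data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a summing combiner applied to one mapper's output.
// Hypothetical helper, not part of the CircleNet code base or the Hadoop API.
class CombinerSketch {

    // Collapse a mapper's (hobby, 1) records into one (hobby, partialSum) record per key.
    static Map<String, Integer> combine(List<String> emittedKeys) {
        Map<String, Integer> partialSums = new LinkedHashMap<>();
        for (String hobby : emittedKeys) {
            // Each emitted key stands for a (hobby, 1) record
            partialSums.merge(hobby, 1, Integer::sum);
        }
        return partialSums;
    }

    public static void main(String[] args) {
        // Simulated mapper output: 1000 x (Chess, 1) and 3 x (Hiking, 1)
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < 1000; i++) emitted.add("Chess");
        for (int i = 0; i < 3; i++) emitted.add("Hiking");

        Map<String, Integer> combined = combine(emitted);
        System.out.println("records before combining: " + emitted.size());
        System.out.println("records after combining:  " + combined.size());
        System.out.println(combined);
    }
}
```

In this simulation 1003 records shrink to 2, one per distinct hobby. The reducer still sums the partial counts from every mapper, which is why the final totals are unchanged.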
If your task requires no aggregation or joining, you can create a map-only job by setting the number of reduce tasks to zero. This saves I/O costs by eliminating the shuffle phase entirely.
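As a rough sketch of a map-only job, the driver below filters records and writes the mapper output directly, with `job.setNumReduceTasks(0)` disabling the reduce phase. The class names `MapOnlyFilter` and `FilterMapper`, the "Chess" filter, and the use of `args[0]` and `args[1]` as input and output paths are assumptions for illustration; the record layout (5 comma-separated fields, hobby last) is borrowed from the Task A mapper.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical map-only job: keeps only the records whose hobby is "Chess".
public class MapOnlyFilter {

    public static class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            // Assumes the same 5-field layout as Task A, with the hobby in the last field
            if (fields.length == 5 && "Chess".equals(fields[4])) {
                context.write(value, NullWritable.get()); // Emit the whole record, no value
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "MapOnlyFilter");
        job.setJarByClass(MapOnlyFilter.class);
        job.setMapperClass(FilterMapper.class);
        // Zero reducers: no shuffle or sort, mapper output goes straight to the output path
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // assumed: input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // assumed: output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

With zero reducers, each mapper writes its own output file, so the output directory contains one part file per map task rather than one per reducer.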
Each CircleNet Analytics task includes timing instrumentation:
```java
long totalStart = System.currentTimeMillis();
// ... run jobs ...
boolean ok = JobTimer.run(job, "TaskAOptimized");
JobTimer.total("TaskAOptimized", totalStart);
```
Timing results are logged to $CIRCLENET_TIMING_FILE for comparison:
```bash
# View all timing results
cat "$CIRCLENET_TIMING_FILE"

# Compare total times
rg ",total," "$CIRCLENET_TIMING_FILE"
```
Always verify correctness before comparing performance. An incorrect optimization is worthless!
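One possible way to check correctness is to compare the output of the simple and optimized jobs as key/count maps, ignoring line order. The sketch below assumes both outputs have been copied to local files (for example by merging the part files) and that each line has the default `key<TAB>value` text output layout; `OutputCheck`, `parse`, and `sameCounts` are hypothetical names, not part of the CircleNet code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for comparing two jobs' (key, count) outputs.
class OutputCheck {

    // Parse "key<TAB>count" lines into a map; line order is ignored.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) continue;
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                throw new IllegalArgumentException("no tab in line: " + line);
            }
            String key = line.substring(0, tab);
            long count = Long.parseLong(line.substring(tab + 1).trim());
            // A key appearing twice suggests the output was not fully reduced
            if (counts.put(key, count) != null) {
                throw new IllegalArgumentException("duplicate key: " + key);
            }
        }
        return counts;
    }

    // True when both outputs contain exactly the same keys with the same counts.
    static boolean sameCounts(List<String> baseline, List<String> optimized) {
        return parse(baseline).equals(parse(optimized));
    }

    public static void main(String[] args) throws IOException {
        // args[0], args[1]: assumed local copies of the two jobs' output
        boolean same = sameCounts(Files.readAllLines(Paths.get(args[0])),
                                  Files.readAllLines(Paths.get(args[1])));
        System.out.println(same ? "outputs match" : "outputs differ");
    }
}
```

Comparing parsed maps rather than raw files avoids false alarms when the two jobs write the same results in a different order.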