Task H identifies users who follow someone in their same region but are not followed back, demonstrating graph analysis patterns in MapReduce.

Problem Statement

Identify all people who follow a CircleNetPage in their own RegionCode but are not followed back. Report the user ID and nickname of everyone who has at least one such one-way follow relationship.

SQL Equivalent:
WITH SameRegionFollows AS (
    SELECT f.ID1 as follower, f.ID2 as followed,
           p1.RegionCode as r1, p2.RegionCode as r2
    FROM Follows f
    JOIN CircleNetPage p1 ON f.ID1 = p1.ID
    JOIN CircleNetPage p2 ON f.ID2 = p2.ID
    WHERE p1.RegionCode = p2.RegionCode
),
OneWay AS (
    SELECT f1.follower
    FROM SameRegionFollows f1
    WHERE NOT EXISTS (
        SELECT 1 FROM SameRegionFollows f2
        WHERE f2.follower = f1.followed
          AND f2.followed = f1.follower
    )
)
SELECT DISTINCT p.ID, p.NickName
FROM CircleNetPage p
JOIN OneWay o ON p.ID = o.follower;
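
For small test inputs, the same semantics can be checked in memory before running the jobs. The following is a minimal sketch, not part of the task code: the class name `TaskHReference` and its method are hypothetical, and it assumes IDs never contain the string `->`.

```java
import java.util.*;

/** In-memory reference for the query above; meant only for small test inputs. */
final class TaskHReference {
    /**
     * @param regionById page ID -> RegionCode
     * @param follows    each element is {followerId, followedId}
     * @return sorted IDs that have at least one same-region follow that is not returned
     */
    static SortedSet<String> oneWayFollowers(Map<String, Integer> regionById,
                                             List<String[]> follows) {
        // Keep only same-region edges (the SameRegionFollows CTE).
        Set<String> sameRegion = new HashSet<>();
        for (String[] e : follows) {
            Integer ra = regionById.get(e[0]);
            Integer rb = regionById.get(e[1]);
            if (ra != null && ra.equals(rb) && !e[0].equals(e[1])) {
                sameRegion.add(e[0] + "->" + e[1]);
            }
        }
        // An edge is one-way when its reverse is absent (the NOT EXISTS clause).
        SortedSet<String> result = new TreeSet<>();
        for (String edge : sameRegion) {
            String[] ab = edge.split("->");
            if (!sameRegion.contains(ab[1] + "->" + ab[0])) {
                result.add(ab[0]);
            }
        }
        return result;
    }
}
```

Joining the returned IDs with nicknames is left out here; the sketch covers only the filtering logic that the MapReduce jobs must reproduce.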

Approaches

Simple Implementation (2 Jobs)

The simple approach uses two jobs: the first filters same-region edges and detects asymmetry, and the second joins the result with page metadata.

Job 1: Same-Region Edge Detection (TaskHSimple.java:26-68)
public static class SameRegionEdgeMapper extends Mapper<LongWritable, Text, Text, Text> {
private final Map<String, Integer> regionById = new HashMap<String, Integer>();
private final Text outKey = new Text();
private final Text outVal = new Text();

@Override
protected void setup(Context context) throws IOException {
    // Load page regions from distributed cache
    URI[] files = context.getCacheFiles();
    if (files != null) {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        for (URI u : files) {
            Path cached = new Path(u.getPath());
            // try-with-resources closes the reader even if a read throws
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(cached)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] p = CsvUtils.split(line);
                    // p[0] is the page ID, p[3] the RegionCode
                    if (p.length >= 4) {
                        regionById.put(p[0].trim(), CsvUtils.toInt(p[3], -1));
                    }
                }
            }
        }
    }
}

@Override
protected void map(LongWritable key, Text value, Context context) 
    throws IOException, InterruptedException {
    String[] f = CsvUtils.split(value.toString());
    if (f.length >= 3) {
        String a = f[1].trim();  // ID1 - follower
        String b = f[2].trim();  // ID2 - followed
        Integer ra = regionById.get(a);
        Integer rb = regionById.get(b);
        
        // Filter same-region follows only
        if (ra != null && rb != null && ra.intValue() == rb.intValue() 
            && !a.equals(b)) {
            // Key by canonical pair (min, max)
            String min = a.compareTo(b) < 0 ? a : b;
            String max = a.compareTo(b) < 0 ? b : a;
            outKey.set(min + "," + max);
            outVal.set(a + "->" + b);  // Track direction
            context.write(outKey, outVal);
        }
    }
}
}
Asymmetric Reducer (TaskHSimple.java:70-94):
public static class AsymmetricReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) 
    throws IOException, InterruptedException {
    String[] pair = CsvUtils.split(key.toString());
    if (pair.length < 2) return;
    
    String a = pair[0];
    String b = pair[1];
    boolean ab = false;  // Does a follow b?
    boolean ba = false;  // Does b follow a?
    
    // Check both directions
    for (Text t : values) {
        String v = t.toString();
        if ((a + "->" + b).equals(v)) {
            ab = true;
        } else if ((b + "->" + a).equals(v)) {
            ba = true;
        }
    }
    
    // Output follower if one-way (XOR)
    if (ab ^ ba) {
        String follower = ab ? a : b;
        context.write(new Text(follower), new Text("F"));
    }
}
}
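Asymmetry Detection: Keying each edge by its canonical pair (min, max) sends both directions of a relationship to the same reducer call, so a single pass over the values sets ab and ba. Because the mapper only emits edges that exist, at least one flag is always set, and XOR (^) is true exactly when the follow is one-way. The ordering is lexicographic (String.compareTo), which is sufficient because the key only has to be identical for both directions.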
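
The grouping and XOR check can be tried without a cluster. The sketch below imitates the shuffle with a HashMap and the reducer with a loop; the class `CanonicalPairSketch` is hypothetical and the input is assumed to be already filtered to same-region edges.

```java
import java.util.*;

final class CanonicalPairSketch {
    /** Order-independent key, so a->b and b->a land in the same group. */
    static String canonicalKey(String a, String b) {
        return a.compareTo(b) < 0 ? a + "," + b : b + "," + a;
    }

    /** Returns the followers of one-way edges; each edge is {follower, followed}. */
    static SortedSet<String> oneWayFollowers(List<String[]> edges) {
        // "Shuffle": group direction markers by canonical pair.
        Map<String, Set<String>> groups = new HashMap<>();
        for (String[] e : edges) {
            groups.computeIfAbsent(canonicalKey(e[0], e[1]), k -> new HashSet<>())
                  .add(e[0] + "->" + e[1]);
        }
        // "Reduce": exactly one direction present means the follow is one-way.
        SortedSet<String> followers = new TreeSet<>();
        for (Map.Entry<String, Set<String>> g : groups.entrySet()) {
            String[] pair = g.getKey().split(",");
            boolean ab = g.getValue().contains(pair[0] + "->" + pair[1]);
            boolean ba = g.getValue().contains(pair[1] + "->" + pair[0]);
            if (ab ^ ba) {
                followers.add(ab ? pair[0] : pair[1]);
            }
        }
        return followers;
    }
}
```

Note that `canonicalKey("9", "10")` and `canonicalKey("10", "9")` both yield `10,9`: string comparison puts "10" before "9", which looks odd numerically but is harmless for grouping.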
Job 2: Join with Page Metadata (TaskHSimple.java:96-145)
public static class FollowerMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) 
    throws IOException, InterruptedException {
    String[] f = CsvUtils.splitTab(value.toString());
    if (f.length >= 1) {
        context.write(new Text(f[0].trim()), new Text("F"));
    }
}
}

public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) 
    throws IOException, InterruptedException {
    String[] f = CsvUtils.split(value.toString());
    if (f.length >= 2) {
        context.write(new Text(f[0].trim()), 
                     new Text("P," + f[1].trim()));
    }
}
}

public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) 
    throws IOException, InterruptedException {
    boolean hasFollower = false;
    String nick = null;
    for (Text t : values) {
        String v = t.toString();
        if ("F".equals(v)) {
            hasFollower = true;
        } else {
            String[] p = CsvUtils.split(v);
            if (p.length >= 2 && "P".equals(p[0])) {
                nick = p[1];
            }
        }
    }
    if (hasFollower && nick != null) {
        context.write(NullWritable.get(), new Text(key + "," + nick));
    }
}
}
Characteristics:
  • 2 jobs with map-side region lookup
  • Edge canonicalization for asymmetry detection
  • Reduce-side join in Job 2
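
The reduce-side join in Job 2 relies on tagging: both mappers emit the user ID as key, and the reducer tells the sources apart by the value's prefix. A minimal plain-Java sketch of one reduce call follows. The class `TaggedJoinSketch` is hypothetical, and unlike JoinReducer above it takes the nickname with substring rather than CsvUtils.split, an assumed variation that keeps nicknames containing commas intact.

```java
import java.util.*;

final class TaggedJoinSketch {
    /**
     * Mimics one reduce() call of a reduce-side join: all values for a key arrive
     * together, tagged "F" (follower marker) or "P,<nickname>" (page record).
     * Returns "id,nickname" when both sides are present, otherwise null.
     */
    static String joinOne(String id, Iterable<String> taggedValues) {
        boolean hasFollower = false;
        String nick = null;
        for (String v : taggedValues) {
            if ("F".equals(v)) {
                hasFollower = true;          // repeated markers collapse to one flag
            } else if (v.startsWith("P,")) {
                nick = v.substring(2);       // everything after the tag, commas included
            }
        }
        return (hasFollower && nick != null) ? id + "," + nick : null;
    }
}
```

Because several "F" markers for the same user set a single flag, each user is written at most once, which matches the DISTINCT in the SQL.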

Performance Comparison

Metric              Simple              Optimized            Improvement
Number of Jobs      2                   1                    50% reduction
Region Lookup       Map-side (cache)    Map-side (cache)     Same
Nickname Join       Job 2 reduce-side   In-reducer (cache)   Eliminates job
Shuffle Operations  2                   1                    50% reduction
Execution Time      Baseline            30-40% faster        Significant

Running the Task

export JAR=/home/ds503/ds503_bdm-1.0-SNAPSHOT.jar
export PAGES=/circlenet/pages/CircleNetPage.csv
export FOLLOWS=/circlenet/follows/Follows.csv
export OUT=/circlenet/output

# Run simple version (2 jobs)
hadoop jar $JAR circlenet.taskH.TaskHSimple \
  $PAGES $FOLLOWS \
  $OUT/taskH/tmp_edges_simple \
  $OUT/taskH/simple

# View results
hdfs dfs -cat $OUT/taskH/simple/part-r-00000 | head -20

Sample Output

1234,UserOne
5678,SocialButterfly
9012,ActiveFollower
...
Format: UserID,NickName (users with at least one same-region one-way follow)

Graph Algorithm Patterns

This task demonstrates common graph patterns in MapReduce:

Edge Representation

// Simple: Canonical pairs
Key: "1,2" (min,max)
Value: "1->2" or "2->1"

// Optimized: Adjacency list
Key: 1
Values: ["O,2", "O,3", "I,4", "I,5"]
// User 1 follows 2,3 and is followed by 4,5

Asymmetry Detection

// Simple: XOR on canonical pairs
if (ab ^ ba) { /* one-way */ }

// Optimized: Set difference
for (String followed : outSet) {
    if (!inSet.contains(followed)) { /* one-way */ }
}
Both approaches correctly handle the directionality of follows. The optimized version is easier to extend: to count how many one-way follows each user has, the loop over outSet only needs to increment a counter.
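
The set-difference pattern, including the per-user count, could look like the sketch below. The optimized job's source is not reproduced on this page, so this is an assumption about its shape rather than the actual code: the class `AdjacencySketch` is hypothetical, and the "O,"/"I," tags follow the Edge Representation example.

```java
import java.util.*;

final class AdjacencySketch {
    /**
     * Mimics one reduce() call keyed by user ID. Values are tagged "O,<id>" for
     * users this user follows and "I,<id>" for users who follow this user.
     * Returns how many of the user's follows are not returned.
     */
    static int countOneWay(Iterable<String> taggedValues) {
        Set<String> outSet = new HashSet<>();
        Set<String> inSet = new HashSet<>();
        for (String v : taggedValues) {
            if (v.startsWith("O,")) {
                outSet.add(v.substring(2));
            } else if (v.startsWith("I,")) {
                inSet.add(v.substring(2));
            }
        }
        outSet.removeAll(inSet); // set difference: followed but not following back
        return outSet.size();
    }
}
```

For the values `["O,2", "O,3", "I,4", "I,5"]` the count is 2, since neither 2 nor 3 follows back. A user belongs in the Task H output whenever the count is greater than zero.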

Key Takeaways

  • Simple approach: 2 jobs with canonical edge pairs and reduce-side join
  • Optimization technique: Combine into 1 job by replacing both joins with distributed-cache lookups (region in the mapper, nickname in the reducer)
  • Graph pattern: Build local adjacency lists at reducers
  • Distributed cache: Load reference data (pages) into memory at both mapper and reducer
  • Edge representation: Outgoing vs. incoming edges for asymmetry detection
  • Performance: 30-40% faster with single-job approach
  • Design consideration: Cache-based joins work only when the reference data fits in each task's memory; the 200K-page dataset does
