Identify all people who follow a CircleNetPage in their own RegionCode but are not followed back. Report the user IDs and nicknames of those who have at least one such one-way follow relationship.

SQL Equivalent:
WITH SameRegionFollows AS (
    SELECT f.ID1 AS follower, f.ID2 AS followed,
           p1.RegionCode AS r1, p2.RegionCode AS r2
    FROM Follows f
    JOIN CircleNetPage p1 ON f.ID1 = p1.ID
    JOIN CircleNetPage p2 ON f.ID2 = p2.ID
    WHERE p1.RegionCode = p2.RegionCode
),
OneWay AS (
    SELECT f1.follower
    FROM SameRegionFollows f1
    WHERE NOT EXISTS (
        SELECT 1
        FROM SameRegionFollows f2
        WHERE f2.follower = f1.followed
          AND f2.followed = f1.follower
    )
)
SELECT DISTINCT p.ID, p.NickName
FROM CircleNetPage p
JOIN OneWay o ON p.ID = o.follower;
The simple approach uses two jobs: the first filters same-region edges and detects asymmetry; the second joins the result with page metadata.

Job 1: Same-Region Edge Detection (TaskHSimple.java:26-68)
public static class SameRegionEdgeMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Map<String, Integer> regionById = new HashMap<String, Integer>();
    private final Text outKey = new Text();
    private final Text outVal = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // Load page regions from distributed cache
        URI[] files = context.getCacheFiles();
        if (files != null) {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            for (URI u : files) {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(fs.open(new Path(u.getPath()))));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] p = CsvUtils.split(line);
                    if (p.length >= 4) {
                        regionById.put(p[0].trim(), CsvUtils.toInt(p[3], -1));
                    }
                }
                br.close();
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 3) {
            String a = f[1].trim(); // ID1 - follower
            String b = f[2].trim(); // ID2 - followed
            Integer ra = regionById.get(a);
            Integer rb = regionById.get(b);
            // Filter same-region follows only
            if (ra != null && rb != null && ra.intValue() == rb.intValue() && !a.equals(b)) {
                // Key by canonical pair (min, max)
                String min = a.compareTo(b) < 0 ? a : b;
                String max = a.compareTo(b) < 0 ? b : a;
                outKey.set(min + "," + max);
                outVal.set(a + "->" + b); // Track direction
                context.write(outKey, outVal);
            }
        }
    }
}
Asymmetric Reducer (TaskHSimple.java:70-94):
public static class AsymmetricReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] pair = CsvUtils.split(key.toString());
        if (pair.length < 2) return;
        String a = pair[0];
        String b = pair[1];
        boolean ab = false; // Does a follow b?
        boolean ba = false; // Does b follow a?
        // Check both directions
        for (Text t : values) {
            String v = t.toString();
            if ((a + "->" + b).equals(v)) {
                ab = true;
            } else if ((b + "->" + a).equals(v)) {
                ba = true;
            }
        }
        // Output follower if one-way (XOR)
        if (ab ^ ba) {
            String follower = ab ? a : b;
            context.write(new Text(follower), new Text("F"));
        }
    }
}
Asymmetry Detection: By grouping edges by canonical pair (min, max), we can check both directions in a single reducer call. XOR (^) detects one-way relationships.
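The grouping-then-XOR idea can be tried without a cluster. The following is a minimal in-memory sketch, not the code from TaskHSimple.java: the class name `CanonicalPairSketch` and method `oneWayFollowers` are hypothetical, a `HashMap` stands in for the shuffle, and the input edges are assumed to have already passed the same-region filter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of TaskHSimple.java.
class CanonicalPairSketch {

    /** Each edge is {follower, followed}; edges are assumed to be same-region already. */
    static Set<String> oneWayFollowers(List<String[]> edges) {
        // "Map + shuffle": group direction markers under the canonical (min,max) key.
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] e : edges) {
            String a = e[0];
            String b = e[1];
            if (a.equals(b)) {
                continue; // ignore self-follows, as the mapper does
            }
            String min = a.compareTo(b) < 0 ? a : b;
            String max = a.compareTo(b) < 0 ? b : a;
            groups.computeIfAbsent(min + "," + max, k -> new ArrayList<>())
                  .add(a + "->" + b);
        }

        // "Reduce": one pass per pair sees both possible directions.
        Set<String> followers = new TreeSet<>();
        for (Map.Entry<String, List<String>> g : groups.entrySet()) {
            String[] pair = g.getKey().split(",");
            String a = pair[0];
            String b = pair[1];
            boolean ab = false;
            boolean ba = false;
            for (String v : g.getValue()) {
                if (v.equals(a + "->" + b)) {
                    ab = true;
                } else if (v.equals(b + "->" + a)) {
                    ba = true;
                }
            }
            if (ab ^ ba) { // exactly one direction present
                followers.add(ab ? a : b);
            }
        }
        return followers;
    }

    public static void main(String[] args) {
        List<String[]> edges = new ArrayList<>();
        edges.add(new String[] {"1", "2"});
        edges.add(new String[] {"2", "1"}); // mutual with the edge above
        edges.add(new String[] {"3", "1"}); // not reciprocated
        System.out.println(oneWayFollowers(edges)); // prints [3]
    }
}
```

Because the key is ordered with plain string comparison, "10" sorts before "2"; that is harmless here, since the only requirement is that both directions of a pair map to the same key.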
Job 2: Join with Page Metadata (TaskHSimple.java:96-145)
public static class FollowerMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.splitTab(value.toString());
        if (f.length >= 1) {
            context.write(new Text(f[0].trim()), new Text("F"));
        }
    }
}

public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = CsvUtils.split(value.toString());
        if (f.length >= 2) {
            context.write(new Text(f[0].trim()), new Text("P," + f[1].trim()));
        }
    }
}

public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        boolean hasFollower = false;
        String nick = null;
        for (Text t : values) {
            String v = t.toString();
            if ("F".equals(v)) {
                hasFollower = true;
            } else {
                String[] p = CsvUtils.split(v);
                if (p.length >= 2 && "P".equals(p[0])) {
                    nick = p[1];
                }
            }
        }
        if (hasFollower && nick != null) {
            context.write(NullWritable.get(), new Text(key + "," + nick));
        }
    }
}
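The tagged reduce-side join in Job 2 can also be illustrated in memory. This is a hedged sketch only: `TaggedJoinSketch` and `join` are hypothetical names, a `TreeMap` stands in for the shuffle, and nicknames are assumed to contain no commas. It shows one property of the design: several "F" records for the same ID collapse into a single output row, which matches the `SELECT DISTINCT` in the SQL form.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of TaskHSimple.java.
class TaggedJoinSketch {

    /** Joins Job 1 follower IDs with an ID -> nickname map using "F" / "P,<nick>" tags. */
    static List<String> join(List<String> followerIds, Map<String, String> nickById) {
        // Simulated shuffle: all tagged values for one ID end up in one list.
        Map<String, List<String>> byId = new TreeMap<>();
        for (String id : followerIds) {
            byId.computeIfAbsent(id, k -> new ArrayList<>()).add("F");
        }
        for (Map.Entry<String, String> page : nickById.entrySet()) {
            byId.computeIfAbsent(page.getKey(), k -> new ArrayList<>())
                .add("P," + page.getValue());
        }

        // Simulated reduce: emit "id,nick" only when both sides are present.
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : byId.entrySet()) {
            boolean hasFollower = false;
            String nick = null;
            for (String v : group.getValue()) {
                if ("F".equals(v)) {
                    hasFollower = true;
                } else if (v.startsWith("P,")) {
                    nick = v.substring(2);
                }
            }
            if (hasFollower && nick != null) {
                rows.add(group.getKey() + "," + nick);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> followers = new ArrayList<>();
        followers.add("3");
        followers.add("3"); // the same follower emitted for two different pairs
        Map<String, String> nicks = new TreeMap<>();
        nicks.put("1", "ann");
        nicks.put("3", "cy");
        System.out.println(join(followers, nicks)); // prints [3,cy]
    }
}
```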
// Simple: Canonical pairs
Key: "1,2" (min,max)
Value: "1->2" or "2->1"

// Optimized: Adjacency list
Key: 1
Values: ["O,2", "O,3", "I,4", "I,5"]
// User 1 follows 2,3 and is followed by 4,5
// Simple: XOR on canonical pairs
if (ab ^ ba) { /* one-way */ }

// Optimized: Set difference
for (String followed : outSet) {
    if (!inSet.contains(followed)) { /* one-way */ }
}
Both approaches correctly handle the directionality of follows. The optimized version is more extensible: if we needed to count how many one-way follows each user has, only the set-difference loop would have to change.
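As an illustration of that extensibility, the set-difference idea can be turned into a per-user count with a few lines. The optimized job's actual code is not shown here, so this is only a sketch under assumptions: `AdjacencySketch` and `oneWayCounts` are hypothetical names, the out/in sets are built in memory rather than from "O,"/"I," tagged reducer values, and the edges are assumed to be same-region already.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for illustration; not the optimized job's implementation.
class AdjacencySketch {

    /** Each edge is {follower, followed}. Returns follower -> number of unreciprocated follows. */
    static Map<String, Integer> oneWayCounts(List<String[]> edges) {
        Map<String, Set<String>> outSets = new HashMap<>(); // whom each user follows
        Map<String, Set<String>> inSets = new HashMap<>();  // who follows each user
        for (String[] e : edges) {
            if (e[0].equals(e[1])) {
                continue; // ignore self-follows
            }
            outSets.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
            inSets.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }

        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> user : outSets.entrySet()) {
            Set<String> inSet = inSets.getOrDefault(user.getKey(), Collections.emptySet());
            int oneWay = 0;
            for (String followed : user.getValue()) {
                if (!inSet.contains(followed)) {
                    oneWay++; // followed does not follow back
                }
            }
            if (oneWay > 0) {
                counts.put(user.getKey(), oneWay);
            }
        }
        return counts;
    }
}
```

With the edges 1->2, 2->1, 1->3, 4->1 and 5->1, users 1, 4 and 5 would each be reported with a count of 1, while user 2 is omitted because its only follow is mutual.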