Skip to main content

Overview

Task H identifies page owners who follow someone in the same region without being followed back (asymmetric relationship).

TaskHSimple

Package: circlenet.taskH
Class: TaskHSimple
Source: src/main/java/circlenet/taskH/TaskHSimple.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0]
string
required
Input path to the Pages CSV file (loaded as distributed cache)
args[1]
string
required
Input path to the Follows CSV file
args[2]
string
required
Temporary output for asymmetric edges
args[3]
string
required
Final output path

Job 1: Find Same-Region Asymmetric Relationships

Mapper: SameRegionEdgeMapper Loads page regions from distributed cache and identifies same-region follow relationships.
@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (URI u : files) {
        BufferedReader br = new BufferedReader(
            new InputStreamReader(fs.open(new Path(u.getPath()))));
        String line;
        while ((line = br.readLine()) != null) {
            String[] p = CsvUtils.split(line);
            if (p.length >= 4) {
                regionById.put(p[0].trim(), CsvUtils.toInt(p[3], -1));
            }
        }
        br.close();
    }
}

@Override
protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
    String[] f = CsvUtils.split(value.toString());
    if (f.length >= 3) {
        String a = f[1].trim();
        String b = f[2].trim();
        Integer ra = regionById.get(a);
        Integer rb = regionById.get(b);
        if (ra != null && rb != null && ra.intValue() == rb.intValue() && !a.equals(b)) {
            String min = a.compareTo(b) < 0 ? a : b;
            String max = a.compareTo(b) < 0 ? b : a;
            outKey.set(min + "," + max);
            outVal.set(a + "->" + b);
            context.write(outKey, outVal);
        }
    }
}
Reducer: AsymmetricReducer Detects asymmetric relationships by checking if only one direction exists.
public static class AsymmetricReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        String[] pair = CsvUtils.split(key.toString());
        if (pair.length < 2) {
            return;
        }
        String a = pair[0];
        String b = pair[1];
        boolean ab = false;
        boolean ba = false;
        for (Text t : values) {
            String v = t.toString();
            if ((a + "->" + b).equals(v)) {
                ab = true;
            } else if ((b + "->" + a).equals(v)) {
                ba = true;
            }
        }
        if (ab ^ ba) {
            String follower = ab ? a : b;
            context.write(new Text(follower), new Text("F"));
        }
    }
}

Job 2: Join with Page Nicknames

Mappers: FollowerMapper, PageMapper Prepare asymmetric follower IDs and page info for join. Reducer: JoinReducer Joins to get nicknames for the identified pages.
public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        boolean hasFollower = false;
        String nick = null;
        for (Text t : values) {
            String v = t.toString();
            if ("F".equals(v)) {
                hasFollower = true;
            } else {
                String[] p = CsvUtils.split(v);
                if (p.length >= 2 && "P".equals(p[0])) {
                    nick = p[1];
                }
            }
        }
        if (hasFollower && nick != null) {
            context.write(NullWritable.get(), new Text(key.toString() + "," + nick));
        }
    }
}

Example Usage

hadoop jar $JAR circlenet.taskH.TaskHSimple $PAGES $FOLLOWS \
  $OUT/taskH/tmp_edges_simple $OUT/taskH/simple

TaskHOptimized

Package: circlenet.taskH
Class: TaskHOptimized
Source: src/main/java/circlenet/taskH/TaskHOptimized.java

Main Method

public static void main(String[] args) throws Exception

Command-Line Arguments

args[0]
string
required
Input path to the Pages CSV file (loaded as distributed cache)
args[1]
string
required
Input path to the Follows CSV file
args[2]
string
required
Final output path

Single Job Approach

Mapper: SameRegionGraphMapper Loads page regions from distributed cache and emits both outgoing and incoming edges.
@Override
protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
    String[] f = CsvUtils.split(value.toString());
    if (f.length >= 3) {
        String a = f[1].trim();
        String b = f[2].trim();
        Integer ra = regionById.get(a);
        Integer rb = regionById.get(b);
        if (ra != null && rb != null && ra.intValue() == rb.intValue() && !a.equals(b)) {
            outKey.set(a);
            outVal.set("O," + b);
            context.write(outKey, outVal);

            outKey.set(b);
            outVal.set("I," + a);
            context.write(outKey, outVal);
        }
    }
}
Reducer: OneWayReducer Loads page nicknames from distributed cache and detects one-way relationships.
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {
    Set<String> outSet = new HashSet<String>();
    Set<String> inSet = new HashSet<String>();
    
    for (Text t : values) {
        String[] p = CsvUtils.split(t.toString());
        if (p.length >= 2 && "O".equals(p[0])) {
            outSet.add(p[1]);
        } else if (p.length >= 2 && "I".equals(p[0])) {
            inSet.add(p[1]);
        }
    }

    boolean oneWayFound = false;
    for (String followed : outSet) {
        if (!inSet.contains(followed)) {
            oneWayFound = true;
            break;
        }
    }

    if (oneWayFound) {
        String id = key.toString();
        String nick = nickById.containsKey(id) ? nickById.get(id) : "";
        context.write(NullWritable.get(), new Text(id + "," + nick));
    }
}

Optimizations

  • Reduces from 2 jobs to 1 job
  • Map-side join using distributed cache for both regions and nicknames
  • More efficient graph processing by checking both directions in reducer

Example Usage

hadoop jar $JAR circlenet.taskH.TaskHOptimized $PAGES $FOLLOWS $OUT/taskH/optimized

Build docs developers (and LLMs) love