Overview
Task H identifies page owners who follow at least one page in the same region that does not follow them back (an asymmetric, one-way relationship).
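As a plain-Java illustration of the definition above (no Hadoop involved), the sketch below computes the same result on a tiny in-memory data set. The class name, method name, page IDs, and region codes are all hypothetical and chosen only for the example.

```java
import java.util.*;

public class AsymmetricFollowSketch {
    /** Returns IDs of pages that follow a same-region page that does not follow back. */
    static Set<String> oneWayFollowers(Map<String, Integer> regionById, List<String[]> follows) {
        // Index every edge as "follower->followed" so the reverse edge can be looked up.
        Set<String> edges = new HashSet<>();
        for (String[] e : follows) {
            edges.add(e[0] + "->" + e[1]);
        }
        Set<String> result = new TreeSet<>();
        for (String[] e : follows) {
            Integer ra = regionById.get(e[0]);
            Integer rb = regionById.get(e[1]);
            boolean sameRegion = ra != null && ra.equals(rb);
            boolean followedBack = edges.contains(e[1] + "->" + e[0]);
            if (sameRegion && !e[0].equals(e[1]) && !followedBack) {
                result.add(e[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> regionById = new HashMap<>();
        regionById.put("1", 10);
        regionById.put("2", 10);
        regionById.put("3", 10);
        regionById.put("4", 20);
        List<String[]> follows = Arrays.asList(
            new String[] {"1", "2"},   // one-way, same region: page 1 qualifies
            new String[] {"2", "3"},   // mutual with the next edge
            new String[] {"3", "2"},
            new String[] {"4", "1"});  // one-way, but the regions differ
        System.out.println(oneWayFollowers(regionById, follows)); // prints [1]
    }
}
```

The MapReduce implementations described below reach the same answer without holding the whole edge set in memory on one machine.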
TaskHSimple
Package: circlenet.taskH
Class: TaskHSimple
Source: src/main/java/circlenet/taskH/TaskHSimple.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
Input path to the Pages CSV file (loaded as distributed cache)
Input path to the Follows CSV file
Temporary output path for asymmetric edges (written by Job 1, read by Job 2)
Final output path for the joined results
Job 1: Find Same-Region Asymmetric Relationships
Mapper: SameRegionEdgeMapper
Loads page regions from distributed cache and identifies same-region follow relationships.
@Override
protected void setup(Context context) throws IOException {
    URI[] files = context.getCacheFiles();
    if (files == null) {
        return;
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (URI u : files) {
        // try-with-resources closes the reader even if a read fails
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(u.getPath()))))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] p = CsvUtils.split(line);
                // Column 0 is the page ID, column 3 is parsed as the region
                if (p.length >= 4) {
                    regionById.put(p[0].trim(), CsvUtils.toInt(p[3], -1));
                }
            }
        }
    }
}
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] f = CsvUtils.split(value.toString());
    if (f.length >= 3) {
        String a = f[1].trim();  // follower
        String b = f[2].trim();  // followed page
        Integer ra = regionById.get(a);
        Integer rb = regionById.get(b);
        // Keep only edges between two distinct pages in the same region
        if (ra != null && rb != null && ra.intValue() == rb.intValue() && !a.equals(b)) {
            // Canonical key: both directions of a pair reach the same reducer
            String min = a.compareTo(b) < 0 ? a : b;
            String max = a.compareTo(b) < 0 ? b : a;
            outKey.set(min + "," + max);
            // The value records the actual direction of this edge
            outVal.set(a + "->" + b);
            context.write(outKey, outVal);
        }
    }
}
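The canonical key used by the mapper can be tried in isolation. The following is a minimal sketch, with a hypothetical `pairKey` helper that is not part of the project, showing that both directions of a relationship map to the same key.

```java
public class PairKeySketch {
    /** Builds a direction-independent key: the lexicographically smaller ID comes first. */
    static String pairKey(String a, String b) {
        return a.compareTo(b) < 0 ? a + "," + b : b + "," + a;
    }

    public static void main(String[] args) {
        // Both directions of the same relationship share one key.
        System.out.println(pairKey("7", "12"));  // prints 12,7
        System.out.println(pairKey("12", "7"));  // prints 12,7
    }
}
```

Because the IDs are compared as strings, the order is lexicographic rather than numeric ("12" sorts before "7"). That is harmless here: the key only has to be consistent for a given pair, not numerically sorted.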
Reducer: AsymmetricReducer
Detects asymmetric relationships by checking if only one direction exists.
public static class AsymmetricReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The key is the canonical pair "min,max" built by the mapper
        String[] pair = CsvUtils.split(key.toString());
        if (pair.length < 2) {
            return;
        }
        String a = pair[0];
        String b = pair[1];
        boolean ab = false;  // saw the edge a->b
        boolean ba = false;  // saw the edge b->a
        for (Text t : values) {
            String v = t.toString();
            if ((a + "->" + b).equals(v)) {
                ab = true;
            } else if ((b + "->" + a).equals(v)) {
                ba = true;
            }
        }
        // XOR is true only when exactly one direction exists
        if (ab ^ ba) {
            String follower = ab ? a : b;
            context.write(new Text(follower), new Text("F"));
        }
    }
}
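The XOR decision in the reducer can be exercised without a cluster. This is a sketch only: it uses `String.split` in place of the project's `CsvUtils.split`, and the class and method names are hypothetical.

```java
import java.util.*;

public class AsymmetricReduceSketch {
    /** Mirrors the reducer's decision: returns the one-way follower, or null if none. */
    static String oneWayFollower(String key, List<String> values) {
        String[] pair = key.split(",");
        if (pair.length < 2) {
            return null;
        }
        String a = pair[0];
        String b = pair[1];
        boolean ab = values.contains(a + "->" + b);
        boolean ba = values.contains(b + "->" + a);
        // Exactly one direction present means the relationship is asymmetric.
        if (ab ^ ba) {
            return ab ? a : b;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(oneWayFollower("1,2", Arrays.asList("1->2")));          // prints 1
        System.out.println(oneWayFollower("1,2", Arrays.asList("2->1")));          // prints 2
        System.out.println(oneWayFollower("1,2", Arrays.asList("1->2", "2->1"))); // prints null
    }
}
```

A mutual follow yields `ab == ba == true`, so the XOR is false and nothing is emitted for that pair.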
Job 2: Join with Page Nicknames
Mappers: FollowerMapper, PageMapper
Prepare asymmetric follower IDs and page info for join.
Reducer: JoinReducer
Joins to get nicknames for the identified pages.
public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        boolean hasFollower = false;
        String nick = null;
        for (Text t : values) {
            String v = t.toString();
            if ("F".equals(v)) {
                // Marker from Job 1: this page is an asymmetric follower
                hasFollower = true;
            } else {
                // Page record tagged "P", carrying the nickname
                String[] p = CsvUtils.split(v);
                if (p.length >= 2 && "P".equals(p[0])) {
                    nick = p[1];
                }
            }
        }
        // Inner join: emit only pages that have both a marker and a nickname
        if (hasFollower && nick != null) {
            context.write(NullWritable.get(), new Text(key.toString() + "," + nick));
        }
    }
}
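The two mappers are not shown above, so the sketch below infers their tagged values from what JoinReducer checks: an `F` marker for follower records and `P,<nickname>` for page records. It simulates the grouping step in memory; the nicknames, IDs, and names such as `TaggedJoinSketch` are hypothetical, and the nickname is extracted with `substring` rather than the project's `CsvUtils.split`.

```java
import java.util.*;

public class TaggedJoinSketch {
    /** Applies the join rule to one key's values; returns "id,nick" or null. */
    static String join(String id, List<String> values) {
        boolean hasFollower = false;
        String nick = null;
        for (String v : values) {
            if ("F".equals(v)) {
                hasFollower = true;
            } else if (v.startsWith("P,")) {
                nick = v.substring(2);
            }
        }
        return (hasFollower && nick != null) ? id + "," + nick : null;
    }

    public static void main(String[] args) {
        // Values grouped by page ID, as the shuffle would deliver them.
        Map<String, List<String>> grouped = new TreeMap<>();
        grouped.put("1", Arrays.asList("P,alice", "F", "F")); // follower in two pairs
        grouped.put("2", Arrays.asList("P,bob"));             // page only, no marker
        grouped.put("3", Arrays.asList("F"));                 // marker only, no page record
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            String row = join(e.getKey(), e.getValue());
            if (row != null) {
                System.out.println(row); // prints 1,alice
            }
        }
    }
}
```

Repeated `F` markers for the same page collapse into a single output row, which is why Job 1 does not need to deduplicate followers itself.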
Example Usage
hadoop jar $JAR circlenet.taskH.TaskHSimple $PAGES $FOLLOWS \
$OUT/taskH/tmp_edges_simple $OUT/taskH/simple
TaskHOptimized
Package: circlenet.taskH
Class: TaskHOptimized
Source: src/main/java/circlenet/taskH/TaskHOptimized.java
Main Method
public static void main(String[] args) throws Exception
Command-Line Arguments
Input path to the Pages CSV file (loaded as distributed cache)
Input path to the Follows CSV file
Output path for the final results
Single Job Approach
Mapper: SameRegionGraphMapper
Loads page regions from distributed cache and emits both outgoing and incoming edges.
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] f = CsvUtils.split(value.toString());
    if (f.length >= 3) {
        String a = f[1].trim();  // follower
        String b = f[2].trim();  // followed page
        Integer ra = regionById.get(a);
        Integer rb = regionById.get(b);
        if (ra != null && rb != null && ra.intValue() == rb.intValue() && !a.equals(b)) {
            // Outgoing edge, keyed by the follower
            outKey.set(a);
            outVal.set("O," + b);
            context.write(outKey, outVal);
            // Incoming edge, keyed by the followed page
            outKey.set(b);
            outVal.set("I," + a);
            context.write(outKey, outVal);
        }
    }
}
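To see what each reducer call receives, the sketch below replays the mapper's two emits per edge and groups them by key in memory. It is illustrative only: `shuffle` is a hypothetical stand-in for Hadoop's shuffle phase, the edges are assumed to have already passed the same-region filter, and the order of values within a key is not guaranteed in a real job.

```java
import java.util.*;

public class EdgeEmitSketch {
    /** Emits "O,<followed>" under the follower and "I,<follower>" under the followed page. */
    static Map<String, List<String>> shuffle(List<String[]> sameRegionEdges) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] e : sameRegionEdges) {
            String a = e[0];
            String b = e[1];
            grouped.computeIfAbsent(a, k -> new ArrayList<>()).add("O," + b);
            grouped.computeIfAbsent(b, k -> new ArrayList<>()).add("I," + a);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> edges = Arrays.asList(
            new String[] {"1", "2"},
            new String[] {"2", "1"},
            new String[] {"1", "3"});
        // Page 1 sees O,2 / I,2 / O,3; page 2 sees I,1 / O,1; page 3 sees only I,1.
        for (Map.Entry<String, List<String>> e : shuffle(edges).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Keying by page rather than by pair means every record a page needs for the one-way check arrives in a single reducer call.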
Reducer: OneWayReducer
Loads page nicknames from distributed cache and detects one-way relationships.
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    Set<String> outSet = new HashSet<String>();  // pages this page follows
    Set<String> inSet = new HashSet<String>();   // pages that follow this page
    for (Text t : values) {
        String[] p = CsvUtils.split(t.toString());
        if (p.length >= 2 && "O".equals(p[0])) {
            outSet.add(p[1]);
        } else if (p.length >= 2 && "I".equals(p[0])) {
            inSet.add(p[1]);
        }
    }
    // One unreciprocated follow is enough, so stop at the first match
    boolean oneWayFound = false;
    for (String followed : outSet) {
        if (!inSet.contains(followed)) {
            oneWayFound = true;
            break;
        }
    }
    if (oneWayFound) {
        String id = key.toString();
        // Fall back to an empty nickname when the ID is not in the cache
        String nick = nickById.containsKey(id) ? nickById.get(id) : "";
        context.write(NullWritable.get(), new Text(id + "," + nick));
    }
}
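The one-way check amounts to asking whether the set difference "followed minus followers" is non-empty. A minimal sketch of that idea follows, using `String.split` instead of the project's `CsvUtils.split` and hypothetical names throughout.

```java
import java.util.*;

public class OneWaySketch {
    /** Returns true if at least one followed page does not follow back. */
    static boolean hasOneWayFollow(List<String> values) {
        Set<String> outSet = new HashSet<>();
        Set<String> inSet = new HashSet<>();
        for (String v : values) {
            String[] p = v.split(",", 2);
            if (p.length < 2) {
                continue;
            }
            if ("O".equals(p[0])) {
                outSet.add(p[1]);
            } else if ("I".equals(p[0])) {
                inSet.add(p[1]);
            }
        }
        // After removeAll, outSet holds only pages that never follow back.
        outSet.removeAll(inSet);
        return !outSet.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(hasOneWayFollow(Arrays.asList("O,2", "I,2", "O,3"))); // prints true
        System.out.println(hasOneWayFollow(Arrays.asList("I,1", "O,1")));        // prints false
        System.out.println(hasOneWayFollow(Arrays.asList("I,1")));               // prints false
    }
}
```

The reducer's explicit loop with `break` gives the same answer as `removeAll` but can stop early, which helps for pages that follow many others.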
Optimizations
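- Runs one MapReduce job instead of two, so no temporary edge output is written and read back
- Replaces the reduce-side nickname join with distributed-cache lookups: regions are looked up in the mapper and nicknames in the reducer
- Groups edges by page rather than by pair, so each reducer call compares a page's outgoing and incoming same-region edges and stops at the first unreciprocated follow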
Example Usage
hadoop jar $JAR circlenet.taskH.TaskHOptimized $PAGES $FOLLOWS $OUT/taskH/optimized