hadoop reducer 不考虑两个相等的自定义可写对象相等

Posted

技术标签:

【中文标题】hadoop reducer 不考虑两个相等的自定义可写对象相等【英文标题】:hadoop reducer not considering two equal custom writable objects as equal 【发布时间】:2021-01-17 19:00:34 【问题描述】:

我正在尝试编写一个 map reduce 程序来检查共同的朋友。 我使用自定义可写 (FriendPair) 作为键。

给定以下输入

Tom Jerry,John
John Jerry,Sarah,Tom

它应该输出 Jerry 作为 Tom 和 John 的共同朋友

[John,Tom]    Jerry
[John,Sarah]    
[John,Jerry]
[Tom,Jerry] 

相反,map reduce 正在输出以下内容

[John,Tom]  
[John,Sarah]    
[John,Jerry]    
[Tom,John]  
[Tom,Jerry]

键 [John,Tom] 和 [Tom,John] 被认为是不相等的。

下面是代码

自定义可写

    public class FriendPair implements WritableComparable<FriendPair> 
        
        Text friend1;
        Text friend2;
        
        public FriendPair() 
            this.friend1 = new Text("");
            this.friend2 = new Text("");
        
        
        public FriendPair(Text friend1, Text friend2) 
            this.friend1 = friend1;
            this.friend2 = friend2;
        
        
        public Text getFriend1() 
            return friend1;
        
        public void setFriend1(Text friend1) 
            this.friend1 = friend1;
        
        public Text getFriend2() 
            return friend2;
        
        public void setFriend2(Text friend2) 
            this.friend2 = friend2;
        
    
        @Override
        public void write(DataOutput out) throws IOException 
            friend1.write(out);
            friend2.write(out);
        
    
        @Override
        public void readFields(DataInput in) throws IOException 
            friend1.readFields(in);
            friend2.readFields(in);
        
    
        @Override
        public int compareTo(FriendPair pair2) 
            return ((friend1.compareTo(pair2.getFriend2()) == 0 && friend2.compareTo(pair2.getFriend1()) == 0)
                   || (friend1.compareTo(pair2.getFriend1()) == 0 && friend2.compareTo(pair2.getFriend2()) == 0)) ? 0 : -1;
        
    
        @Override
        public boolean equals(Object o) 
            FriendPair pair2 = (FriendPair) o;
            return (friend1.equals(pair2.getFriend2()) && friend2.equals(pair2.getFriend1()) 
                    || friend1.equals(pair2.getFriend1()) && friend2.equals(pair2.getFriend2()));
        
        
        @Override
        public String toString() 
            return "[" + friend1 + "," + friend2 + "]";
        
        
        @Override
        public int hashCode() 
            return friend1.hashCode() + friend2.hashCode();
        
    
    

映射器

public class MutualFriendsMapper extends Mapper<LongWritable, Text, FriendPair, Text> 

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 

        String line = value.toString();
        String[] items = line.split("\t");

        String name = items[0];
        String friendsList = items[1];
        String[] friends = friendsList.split(",");
        for (String friend : friends) 
            FriendPair fp = new FriendPair(new Text(name), new Text(friend));
            FriendPair fp2 = new FriendPair(new Text(friend), new Text(name));
            context.write(fp, new Text(friendsList));
        
    

减速器

public class MutualFriendsReducer extends Reducer<FriendPair, Text, FriendPair, FriendArray> 

    @Override
    public void reduce(FriendPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        
        List<String> allFriends = new ArrayList<String>();
        for(Text value : values) 
            String[] valueArray = value.toString().split(",");
            allFriends.addAll(Arrays.asList(valueArray));
        
        List<Text> commonFriends = new ArrayList<Text>();
        Set<String> uniqueFriendSet = new HashSet<String>(allFriends);
        for(String friend : uniqueFriendSet) 
            int frequency = Collections.frequency(allFriends, friend);
            if(frequency > 1) 
                commonFriends.add(new Text(friend));
            
        
        
        context.write(key, new FriendArray(Text.class, commonFriends.toArray(new Text[commonFriends.size()])));
    

FriendArray(输出)

public class FriendArray extends ArrayWritable 

    public FriendArray(Class<? extends Writable> valueClass, Writable[] values) 
        super(valueClass, values);
    
    
    public FriendArray(Class<? extends Writable> valueClass) 
        super(valueClass);
    
    
    public FriendArray() 
        super(Text.class);
    

    @Override
    public Text[] get() 
        return (Text[]) super.get();
    
    
    @Override
    public void write(DataOutput data) throws IOException 
        for(Text t : get()) 
            t.write(data);
        
    
    
    @Override
    public String toString() 
        Text[] friendArray = Arrays.copyOf(get(), get().length, Text[].class);
        String print="";
        
        for(Text f : friendArray) 
            print+=f+",";
        
        return print;
    

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

在“排序”阶段,Hadoop 不对 java 对象进行操作,而仅对它们的字节表示(FriendPair.write() 方法的输出)进行操作,因此它不能调用FriendPair.equals()。因此,为了让 Hadoop 理解键 [John,Tom] 和 [Tom,John] 相等,您必须确保它们的 write 输出相同。实现此目的的一种方法是强制执行配对中朋友的顺序,例如按字母顺序对它们进行排序(然后两个配对看起来都是 [John,Tom])。

【讨论】:

这行得通,但我仍然不明白为什么当 [Tom, John] 和 [John, Tom] 对相等时 compareTo 方法没有返回 0。它们不应该被发送到同一个reducer吗? 嗯,你说对了一部分。它们被发送到同一个 reducer 实例,因为它们具有相同的哈希码。但 reducer 将它们视为不同的键,因为它们的字节表示不同。

以上是关于hadoop reducer 不考虑两个相等的自定义可写对象相等的主要内容,如果未能解决你的问题,请参考以下文章

比较两个集合的相等性,而不考虑其中项目的顺序

[Hadoop]MapReducer工作过程

Hadoop优化

Hadoop企业优化

Hadoop:Reducer 不会发出正确的计算

等效于 mongo 的 out:reduce 选项在 hadoop