如何遍历两次Reduce中的Iterable<Text>values

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何遍历两次Reduce中的Iterable<Text>values相关的知识,希望对你有一定的参考价值。

参考技术A package com.test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class T

public static void main(String[] args)

// 只要实现了Iterable接口的对象都可以使用for-each循环。
// Iterable接口只由iterator方法构成,
// iterator()方法是java.lang.Iterable接口,被Collection继承。
/*public interface Iterable<T>
Iterator<T> iterator();
*/
Iterable<String> iter = new Iterable<String>()
public Iterator<String> iterator()
List<String> l = new ArrayList<String>();
l.add("aa");
l.add("bb");
l.add("cc");
return l.iterator();

;
for(int count : new int[] 1, 2)
for (String item : iter)
System.out.println(item);

System.out.println("---------->> " + count + " END.");




结果当然是很正常的完整无误的打印了两遍 Iterable 的值。那究竟是什么原因导致了 reduce 阶段的 Iterable 只能被遍历一次呢?
我们先看一段测试代码:
测试数据:
?

1
2
3
4
5
6
7

a 3
a 4
b 50
b 60
a 70
b 8
a 9
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestIterable

public static class M1 extends Mapper<Object, Text, Text, Text>
private Text oKey = new Text();
private Text oVal = new Text();
String[] lineArr;

public void map(Object key, Text value, Context context) throws IOException, InterruptedException
lineArr = value.toString().split(" ");
oKey.set(lineArr[0]);
oVal.set(lineArr[1]);
context.write(oKey, oVal);



public static class R1 extends Reducer<Text, Text, Text, Text>
List<String> valList = new ArrayList<String>();
List<Text> textList = new ArrayList<Text>();
String strAdd;
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException
valList.clear();
textList.clear();
strAdd = "";
for (Text val : values)
valList.add(val.toString());
textList.add(val);


// 坑之 1 :为神马输出的全是最后一个值?why?
for(Text text : textList)
strAdd += text.toString() + ", ";

System.out.println(key.toString() + "\t" + strAdd);
System.out.println(".......................");

// 我这样干呢?对了吗?
strAdd = "";
for(String val : valList)
strAdd += val + ", ";

System.out.println(key.toString() + "\t" + strAdd);
System.out.println("----------------------");

// 坑之 2 :第二次遍历的时候为什么得到的都是空?why?
valList.clear();
strAdd = "";
for (Text val : values)
valList.add(val.toString());

for(String val : valList)
strAdd += val + ", ";

System.out.println(key.toString() + "\t" + strAdd);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>");



public static void main(String[] args) throws Exception
Configuration conf = new Configuration();
conf.set("mapred.job.queue.name", "regular");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);

System.out.println("------------------------");
Job job = new Job(conf, "TestIterable");
job.setJarByClass(TestIterable.class);
job.setMapperClass(M1.class);
job.setReducerClass(R1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);



在 Eclipse 控制台中的结果如下:
a 9, 9, 9, 9,
.......................
a 3, 4, 70, 9,
----------------------
a
>>>>>>>>>>>>>>>>>>>>>>
b 8, 8, 8,
.......................
b 50, 60, 8,
----------------------
b
>>>>>>>>>>>>>>>>>>>>>>

关于第 1 个坑:对象重用( objects reuse )
reduce方法的javadoc中已经说明了会出现的问题:
The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.
也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。
看到这里,我想你会恍然大悟:这不是刚毕业找工作,面试官常问的问题:String 是不可变对象但为什么能相加呢?为什么字符串相加不提倡用 String,而用 StringBuilder ?如果你还不清楚这个问题怎么回答,建议你看看这篇《深入理解 String, StringBuffer 与 StringBuilder 的区别》http://my.oschina.net/leejun2005/blog/102377

关于第 2 个坑:http://stackoverflow.com/questions/6111248/iterate-twice-on-values
The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren't really backed by a Collection, so it's nontrivial to allow multiple iterations.

最后想说明的是:hadoop 框架的作者们真的是考虑很周全,在 hadoop 框架中,不仅有对象重用,还有 JVM 重用等,节约一切可以节约的资源,提高一切可以提高的性能。因为在这种海量数据处理的场景下,性能优化是非常重要的,你可能处理100条数据体现不出性能差别,但是你面对的是千亿、万亿级别的数据呢?本回答被提问者和网友采纳

map

一、map、filter、reduce

  map(fuction , iterable)   映射  对可迭代对象中的每一项,使用函数去改变

  filter(function, iterable)  过滤  可迭代对象中的每一项,放到函数中去计算,如何为真,则留下,构造成一个迭代器,为假则去除

  reduce(fuction,iterable)  减少  把元素中的左边的合并到右边去。

1.1 map  映射

  官方定义:Return an iterator that applies function to every item of iterable, yielding the results.

        返回一个迭代器,它对每个迭代项应用函数,得到结果。

  why:不使用for循环,就能将序列中的数据m--映射到给定的处理函数中。快速的对一个序列进行各种操作

numbers = [1, 3, 5, 7, 9]

# 改写成2,4,6,8,10

#普通做法
new_num = []
for x in numbers:
    new_num.append(x+1)

print(new_num)

# map版本
def addone(x):
    return x+1

print(list(map(addone, numbers)))

# 其他应用 改写字符串
str_list = [lilei, hmm, de8ug]
def change(s: str):
    return s.upper()

print(list(map(change, str_list)))

 

1.2 filter 过滤、筛选

  官方: Construct an iterator from those elements of iterable for which function returns true. 

      iterable may be either a sequence, a container which supports iteration, or an iterator.

      If function is None, the identity function is assumed, that is, all elements of iterable that are false are removed.

      从可迭代的元素中构造一个迭代器,函数返回true。iterable可以是一个序列,一个支持迭代的容器,

      或者一个迭代器。如果函数为None,则假定标识函数为false,即为 false的所有元素都被删除。

  why:不用for循环, 就能将序列中的数据一一映射到给定 的处理函数, 函数中添加了真假判断,

     True则返回 相应数据,最终得到筛选后的序列。

‘‘‘
    查找大于30 小于50的数字列表
‘‘‘
my_list = [24,23,75,12,43,9,42,28,37]
# 普通版本
new_list = []

for i in my_list:
    if 30 < i < 50:
        new_list.append(i)
print(new_list)

# filter版本
def choose(x):
    # if 30 < i < 50:
    #     return True
    return 30 < x < 50  # < 操作的结果,已经是bool 类型了

print(list(filter(choose, my_list)))
        
# 字符串操作
import re
str_list = [lilei, hmm, de8ug, debug1, de8ug2]

def lh(s: str):
    return re.search(de8ug, s)

print(list(filter(lh, str_list)))

 

1.3 reduce 减少,合并

  官方:Apply function of two arguments cumulatively to the items of sequence, from left to right,

     so as to reduce the sequence to a single value.

     将两个参数的函数累积到序列的项上,从左到右,以便将序列减少到单个值。

   why: 为了快速的进行累加,连乘的计算 使代码更简洁

  需要注意:py3中吧reduce放到了fuctiontools这个模块下了

from functools import reduce

numbers = [1, 3, 5, 7, 9]

# 普通版本 累加
count = 0
for i in numbers:
    count += i
print(count)

# reduce版 累加
def add(x, y):
    return x+y

print(reduce(add, numbers))

# 普通版本 连乘
count = 1
for i in numbers:
    count *= i
print(count)

# reduce版本 连乘
def mul(x, y):
    return x*y
print(reduce(mul, numbers))

 

 

  

 

以上是关于如何遍历两次Reduce中的Iterable<Text>values的主要内容,如果未能解决你的问题,请参考以下文章

Java里的实现了Iterable的类用iterator遍历完后怎么再次遍历?

map

python的map和reduce函数

python中reduce的用法

python中reduce的用法

Java中的Iterable与Iterator详解