Computing a Multi-Matrix Product with the Distributed Cache

Posted lz3018

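There are three things to watch out for when using the distributed cache. This is what I took away from spending a whole day wrestling with it today.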

1) The statement that adds the cache file through the DistributedCache class must come right after the Configuration instance is created

1         Configuration conf=new Configuration();
2         DistributedCache.addCacheFile(new URI(cachePath),conf);//add the file to the distributed cache
3         FileSystem fs=FileSystem.get(URI.create(cachePath),conf);
4         System.out.println(fs.getUri().toString());
5         fs.delete(new Path(outUri),true);
6         conf.set("rightMatrixNum","5");
7         conf.set("u","5");
8         Job job=new Job(conf,"MultiMatrix");
9         //DistributedCache.addCacheFile(new URI(cachePath),conf);//add the file to the distributed cache

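I originally had the call on line 9, and the job kept failing with a null-reference error (NullPointerException). After moving DistributedCache.addCacheFile(new URI(cachePath),conf); to line 2, right after conf is created, it worked (provided point 2 below is also satisfied). The reason is that new Job(conf,"MultiMatrix") on line 8 takes its own copy of conf. A cache file added to conf afterwards therefore never reaches the job, DistributedCache.getLocalCacheFiles() in the mapper returns null, and accessing cacheFiles[0] throws. Strictly speaking, the call only has to come before the Job is created.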
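The copy can be reproduced without a cluster. The sketch below is a plain-Java model, not Hadoop code: SimpleConf and SimpleJob are hypothetical stand-ins for Configuration and Job, and the key "cache.files" is made up for illustration. SimpleJob snapshots the configuration in its constructor, the way new Job(conf, ...) does, so only a value set before construction is visible through the job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration: a plain key/value map.
class SimpleConf {
    private final Map<String, String> props = new HashMap<>();

    SimpleConf() {}

    // Copy constructor: the new object gets its own snapshot of the entries.
    SimpleConf(SimpleConf other) {
        props.putAll(other.props);
    }

    void set(String key, String value) { props.put(key, value); }

    String get(String key) { return props.get(key); }
}

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Job.
class SimpleJob {
    private final SimpleConf conf;

    SimpleJob(SimpleConf conf) {
        this.conf = new SimpleConf(conf); // snapshot taken at construction time
    }

    SimpleConf getConfiguration() { return conf; }
}

class ConfCopyDemo {
    // Adds a (made-up) cache-file entry before or after creating the job
    // and returns what the job's own configuration contains.
    static String cacheFilesSeenByJob(boolean addBeforeJob) {
        SimpleConf conf = new SimpleConf();
        if (addBeforeJob) {
            conf.set("cache.files", "/testData/F100");
        }
        SimpleJob job = new SimpleJob(conf);
        if (!addBeforeJob) {
            conf.set("cache.files", "/testData/F100"); // only the original conf changes
        }
        return job.getConfiguration().get("cache.files");
    }

    public static void main(String[] args) {
        System.out.println(cacheFilesSeenByJob(true));  // /testData/F100
        System.out.println(cacheFilesSeenByJob(false)); // null
    }
}
```

With the real Hadoop 2 API, the same pitfall can be avoided by registering the file through the job itself, for example job.addCacheFile(uri), or by passing job.getConfiguration() instead of conf to DistributedCache.addCacheFile.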

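2) The second point: the custom mapper/reducer classes that use the distributed cache have to be defined as nested classes of the driver class and declared public static, as MyMapper and MyReducer are in the full listing.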
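Why static matters can be checked with plain Java reflection. Hadoop creates mapper and reducer instances through ReflectionUtils.newInstance, which looks up the class's no-argument constructor. A non-static inner class has no such constructor, because its constructor implicitly takes the enclosing instance. The sketch below roughly approximates that lookup with getDeclaredConstructor(). DriverDemo, StaticMapper, InnerMapper and canInstantiate are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;

class DriverDemo {
    // Static nested class: gets an implicit no-argument constructor.
    static class StaticMapper {}

    // Non-static inner class: its constructor implicitly takes a DriverDemo instance.
    class InnerMapper {}
}

class ReflectionDemo {
    // Approximates what the framework does when it instantiates a mapper:
    // find the no-argument constructor and invoke it.
    static boolean canInstantiate(Class<?> cls) {
        try {
            cls.getDeclaredConstructor().newInstance();
            return true;
        } catch (NoSuchMethodException e) {
            return false; // no no-argument constructor exists
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(DriverDemo.StaticMapper.class)); // true
        System.out.println(canInstantiate(DriverDemo.InnerMapper.class));  // false
    }
}
```

By the same reasoning, a top-level class with a no-argument constructor would also pass this check. The static keyword is what rules out the inner-class case.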

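3) Once both points above are satisfied, the program can use the distributed cache. Running it, however, hits another problem: "FileNotFoundException ...", followed by the path of the cache file. In other words, the program cannot find the cached file. When an MR program reads a file, the default FileSystem is HDFS, so it reads from the cluster. The cache files, however, are stored on the local file system of the data node, so naturally the file "cannot be found". The fix is simple: take the path returned by DistributedCache.getLocalCacheFiles() and prepend the string "file://" to it, as shown below (line 6) (http://hugh-wangp.iteye.com/blog/1468989)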
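The prefixing step can be isolated in a small helper. This is a sketch under stated assumptions. The name toLocalFileUri is hypothetical, and the helper works on the string form of the path rather than on Hadoop's Path. It also assumes an absolute Unix-style path with no characters that need URI escaping. It adds "file://" only when the path has no scheme yet, so an already-qualified path is not prefixed twice.

```java
import java.net.URI;

class LocalCachePath {
    // Hypothetical helper: turns the string form of a path returned by
    // DistributedCache.getLocalCacheFiles() into an explicit local-file URI.
    // Assumes an absolute path such as "/tmp/.../F100" without characters needing escaping.
    static URI toLocalFileUri(String path) {
        URI uri = URI.create(path);
        if (uri.getScheme() != null) {
            return uri; // already qualified, e.g. "file:/tmp/..." or "hdfs://..."
        }
        return URI.create("file://" + path); // "/tmp/x" becomes "file:///tmp/x"
    }

    public static void main(String[] args) {
        URI u = toLocalFileUri("/tmp/cache/F100");
        System.out.println(u);             // file:///tmp/cache/F100
        System.out.println(u.getScheme()); // file
        System.out.println(u.getPath());   // /tmp/cache/F100
    }
}
```

Inside the mapper, the resulting URI would be handed to FileSystem.get(uri, conf) as in the setup code. Hadoop also provides FileSystem.getLocal(conf), which returns the local file system regardless of the configured default.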

 1 public void setup(Context context) throws IOException {
 2             Configuration conf=context.getConfiguration();
 3             System.out.println("map setup() start!");
 4             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
 5             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
 6             String localCacheFile="file://"+cacheFiles[0].toString();
 7             System.out.println("local path is:"+cacheFiles[0].toString());
 8             FileSystem fs=FileSystem.get(URI.create(localCacheFile), conf);
 9             SequenceFile.Reader reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
10             IntWritable key=(IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
11             DoubleArrayWritable value=(DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
12             java.util.List<double[]> rows=new java.util.ArrayList<double[]>(); //row count is unknown up front
13             try {
14                 while (reader.next(key,value)){
15                     Writable[] cells=value.get(); //only valid after next() has filled value
16                     double[] row=new double[cells.length];
17                     for (int i=0;i<cells.length;++i){
18                         row[i]=((DoubleWritable)cells[i]).get();
19                     }
20                     while (rows.size()<=key.get()){
21                         rows.add(null); //rows are assumed to be keyed 0..n-1
22                     }
23                     rows.set(key.get(),row);
24                 }
25             } finally {
26                 reader.close();
27             }
28             leftMatrix=rows.toArray(new double[0][]);
29         }
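The number of rows in the cached file is not known until the file has been read, so leftMatrix cannot simply be indexed as leftMatrix[key.get()] unless it was allocated with the right size beforehand (in the listing it is declared as null). One way around this is to collect rows in a growable list keyed by row index and convert it to a double[][] at the end. The plain-Java sketch below shows that pattern. The Row class is a hypothetical stand-in for one (IntWritable, DoubleArrayWritable) record, and any row index that never appears is left as null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LeftMatrixLoader {
    // Hypothetical stand-in for one (IntWritable, DoubleArrayWritable) record
    // from the cached SequenceFile: a row index and that row's values.
    static final class Row {
        final int index;
        final double[] values;

        Row(int index, double[] values) {
            this.index = index;
            this.values = values;
        }
    }

    // Builds the matrix without knowing the row count in advance:
    // each row goes to the slot named by its key, growing the list as needed.
    static double[][] build(Iterable<Row> records) {
        List<double[]> rows = new ArrayList<>();
        for (Row r : records) {
            while (rows.size() <= r.index) {
                rows.add(null); // placeholder until that row is seen
            }
            rows.set(r.index, r.values.clone()); // copy in case the caller reuses the array
        }
        return rows.toArray(new double[0][]);
    }

    public static void main(String[] args) {
        List<Row> records = Arrays.asList(
                new Row(1, new double[] {3, 4}),
                new Row(0, new double[] {1, 2}));
        double[][] m = build(records);
        System.out.println(m.length);              // 2
        System.out.println(Arrays.toString(m[0])); // [1.0, 2.0]
    }
}
```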

Below is the complete code. At the time of writing it still had a few problems, but the distributed cache part works.

  1 /**
  2  * Created with IntelliJ IDEA.
  3  * User: hadoop
  4  * Date: 16-3-6
  5  * Time: 12:47 PM
  6  * To change this template use File | Settings | File Templates.
  7  */
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import java.io.IOException;
 11 import java.lang.reflect.Array;
 12 import java.net.URI;
 13 
 14 import org.apache.hadoop.fs.Path;
 15 import org.apache.hadoop.io.*;
 16 import org.apache.hadoop.mapreduce.InputSplit;
 17 import org.apache.hadoop.mapreduce.Job;
 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 20 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 21 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 23 import org.apache.hadoop.mapreduce.Reducer;
 24 import org.apache.hadoop.mapreduce.Mapper;
 25 import org.apache.hadoop.filecache.DistributedCache;
 26 import org.apache.hadoop.util.ReflectionUtils;
 27 
 28 public class MutiDoubleInputMatrixProduct {
 29     public static void main(String[]args) throws IOException, ClassNotFoundException, InterruptedException {
 30         String uri="/testData/input";
 31         String outUri="/sOutput";
 32         String cachePath="/testData/F100";
 33         Configuration conf=new Configuration();
 34         DistributedCache.addCacheFile(URI.create(cachePath),conf);//add the file to the distributed cache (before new Job)
 35         FileSystem fs=FileSystem.get(URI.create(uri),conf);
 36         fs.delete(new Path(outUri),true);
 37         conf.set("rightMatrixNum","5");
 38         conf.set("u","5");
 39         Job job=new Job(conf,"MultiMatrix");//the Job takes its own copy of conf here
 40         //DistributedCache.addCacheFile(URI.create(cachePath),conf);//too late here: the job never sees it
 41         //DistributedCache.addLocalFiles(conf,cachePath);
 42 
 43         job.setJarByClass(MutiDoubleInputMatrixProduct.class);
 44         job.setInputFormatClass(SequenceFileInputFormat.class);
 45         job.setOutputFormatClass(SequenceFileOutputFormat.class);
 46         job.setMapperClass(MyMapper.class);
 47         job.setReducerClass(MyReducer.class);
 48         job.setMapOutputKeyClass(IntWritable.class);
 49         job.setMapOutputValueClass(DoubleArrayWritable.class);
 50         job.setOutputKeyClass(IntWritable.class);
 51         job.setOutputValueClass(DoubleArrayWritable.class);
 52         FileInputFormat.setInputPaths(job, new Path(uri));
 53         FileOutputFormat.setOutputPath(job,new Path(outUri));
 54         System.exit(job.waitForCompletion(true)?0:1);
 55     }
 56     public static class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
 57         public DoubleArrayWritable map_value=new DoubleArrayWritable();
 58         public double[][] leftMatrix=null;     //filled in setup() from the cached file
 59         public DoubleWritable[] arraySum=null; //allocated in setup() once the row count is known
 60         public double u=1;                     //scale factor, parsed once from the "u" setting
 61 
 62         public void setup(Context context) throws IOException {
 63             Configuration conf=context.getConfiguration();
 64             u=Double.parseDouble(conf.get("u"));
 65             System.out.println("map setup() start!");
 66             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
 67             //the cached copy sits on the node's local disk, so read it through file://
 68             String localCacheFile="file://"+cacheFiles[0].toString();
 69             System.out.println("local path is:"+cacheFiles[0].toString());
 70             FileSystem fs=FileSystem.get(URI.create(localCacheFile), conf);
 71             SequenceFile.Reader reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
 72             IntWritable key=(IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
 73             DoubleArrayWritable value=(DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
 74             //the number of rows is unknown until the file is read, so collect them in a list
 75             java.util.List<double[]> rows=new java.util.ArrayList<double[]>();
 76             try {
 77                 while (reader.next(key,value)){
 78                     Writable[] cells=value.get(); //only valid after next() has filled value
 79                     double[] row=new double[cells.length];
 80                     for (int i=0;i<cells.length;++i){
 81                         row[i]=((DoubleWritable)cells[i]).get();
 82                     }
 83                     while (rows.size()<=key.get()){
 84                         rows.add(null); //rows are assumed to be keyed 0..n-1
 85                     }
 86                     rows.set(key.get(),row);
 87                 }
 88             } finally {
 89                 reader.close();
 90             }
 91             leftMatrix=rows.toArray(new double[0][]);
 92             arraySum=new DoubleWritable[leftMatrix.length];
 93         }
 94         public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
 95             InputSplit inputSplit=context.getInputSplit();
 96             String fileName=((FileSplit)inputSplit).getPath().getName();
 97             if (fileName.startsWith("FB")) {
 98                 context.write(key,value); //rows from FB* files are passed through unchanged
 99             }
100             else{
101                 Writable[] cells=value.get();
102                 for (int i=0;i<leftMatrix.length;++i){
103                     double sum=0;
104                     for (int j=0;j<leftMatrix[i].length;++j){
105                         sum+=leftMatrix[i][j]*((DoubleWritable)cells[j]).get()*u;
106                     }
107                     arraySum[i]=new DoubleWritable(sum);
108                 }
109                 map_value.set(arraySum);
110                 context.write(key,map_value);
111             }
112         }
113     }
114     public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
115         public DoubleWritable[] sum=null;
116         public DoubleArrayWritable valueArrayWritable=new DoubleArrayWritable(); //must exist before set() is called
117 
118         public void setup(Context context){
119             int rightMatrixNum=Integer.parseInt(context.getConfiguration().get("rightMatrixNum"));
120             sum=new DoubleWritable[rightMatrixNum];
121             for (int i=0;i<rightMatrixNum;++i){
122                 sum[i]=new DoubleWritable(0.0);
123             }
124         }
125 
126         public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
127             for(DoubleArrayWritable doubleValue:value){
128                 Writable[] cells=doubleValue.get();
129                 for (int i=0;i<cells.length;++i){
130                     sum[i].set(sum[i].get()+((DoubleWritable)cells[i]).get());
131                 }
132             }
133             valueArrayWritable.set(sum);
134             //write before resetting: valueArrayWritable holds the same DoubleWritable objects as sum,
135             //so zeroing them first would write out zeros
136             context.write(key,valueArrayWritable);
137             for (int i=0;i<sum.length;++i){
138                 sum[i].set(0.0);
139             }
140         }
141     }
142 }
143 class DoubleArrayWritable extends ArrayWritable {
144     public DoubleArrayWritable(){
145         super(DoubleWritable.class);
146     }
147     /*
148     public String toString(){
149         StringBuilder sb=new StringBuilder();
150         for (Writable val:get()){
151             DoubleWritable doubleWritable=(DoubleWritable)val;
152             sb.append(doubleWritable.get());
153             sb.append(",");
154         }
155         sb.deleteCharAt(sb.length()-1);
156         return sb.toString();
157     }
158     */
159 }
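Stripped of the Hadoop types, the arithmetic is small. For an input row v that does not come from an FB file, the mapper emits w[i] = u * (sum over j of leftMatrix[i][j] * v[j]). The reducer then adds up, element by element, all vectors that share a key. The plain-Java model below mirrors those two steps. mapRow and reduce are hypothetical names. Here u is applied once per row sum instead of once per term, which gives the same value up to floating-point rounding. reduce allocates a fresh array for each key, which sidesteps the problem of resetting a shared buffer before it has been written out.

```java
import java.util.Arrays;
import java.util.List;

class MatrixProductModel {
    // Map step for an ordinary input row v: w[i] = u * sum_j left[i][j] * v[j].
    static double[] mapRow(double[][] left, double[] v, double u) {
        double[] w = new double[left.length];
        for (int i = 0; i < left.length; ++i) {
            double sum = 0;
            for (int j = 0; j < left[i].length; ++j) {
                sum += left[i][j] * v[j];
            }
            w[i] = sum * u;
        }
        return w;
    }

    // Reduce step: element-wise sum of all vectors that share a key.
    // Assumes no vector is longer than width; a fresh array is used per key.
    static double[] reduce(List<double[]> values, int width) {
        double[] sum = new double[width];
        for (double[] v : values) {
            for (int i = 0; i < v.length; ++i) {
                sum[i] += v[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        double[][] left = {{1, 2}, {3, 4}};
        double[] w = mapRow(left, new double[] {1, 1}, 5);
        System.out.println(Arrays.toString(w)); // [15.0, 35.0]
        System.out.println(Arrays.toString(reduce(Arrays.asList(w, new double[] {1, 1}), 2))); // [16.0, 36.0]
    }
}
```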

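In addition, the distributed cache can also be debugged locally in IDEA. The paths just have to be local paths (lines 33~37), i.e. paths inside the project I created.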
