Back when I was learning Hadoop I always wanted to be able to step through the Hadoop source code, but I never found an effective way to do it. Today, while debugging a matrix multiplication job, I finally figured out how, so I am writing it down here.
1) It all started with wanting to set the number of map tasks for a Job (even though the final number of maps is determined by the input splits). Before Hadoop 1.2.1, this could be done with:
job.setNumMapTasks()
However, Hadoop 1.2.1 no longer has this method; only the one for setting the number of reduce tasks remains. Searching further, I found that others suggest a different approach: setting it through the Configuration object, like this:
conf.set("mapred.map.tasks", "5"); // request 5 map tasks
After setting it this way there was still no effect. The code that actually controls the number of splits (in the old API) is the following:
goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
// totalSize is the total size of the input files and numSplits is the map count requested by the user,
// so goalSize is the split size implied by the user's wishes.
minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize);
// In Hadoop 1.2.1, mapred-default.xml sets mapred.min.split.size = 0, so job.getLong("mapred.min.split.size", 1)
// returns 0, while minSplitSize is a field of the old-API FileInputFormat whose value is 1, so minSize = 1;
// the purpose is simply to take the larger of the configured minimums.
splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
// The real split size is the smaller of goalSize (derived from the requested map count) and the block size
// blockSize (so that a split never spans more than one block, which favors data-local computation),
// as long as it is not smaller than minSize.
This, however, is how splits were generated before Hadoop 1.2.1 (the old API); in the new API, setting the map count this way has no real effect.
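To see what the old formula did with concrete numbers, here is a small self-contained sketch; the 64 MB input size, 16 MB block size and requested map count of 5 are all made-up values for illustration, and they show that the old code did honor the requested map count:

public class OldSplitFormulaDemo {
    public static void main(String[] args) {
        long totalSize = 64L * 1024 * 1024; // 64 MB of input (hypothetical)
        long blockSize = 16L * 1024 * 1024; // 16 MB HDFS block size (hypothetical)
        int numSplits = 5;                  // the map count the user asked for
        long minSize = 1L;                  // max(mapred.min.split.size = 0, minSplitSize = 1)

        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // ~12.8 MB per split
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));

        // goalSize < blockSize here, so the requested map count wins:
        // splitSize = 13421772 bytes, giving roughly 5 splits.
        System.out.println("splitSize = " + splitSize);
    }
}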
2) In the new API (Hadoop 1.2.1), the splits are computed by the code below:
 1 public List<InputSplit> getSplits(JobContext job) throws IOException {
 2     long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
 3     long maxSize = getMaxSplitSize(job);
 4     ArrayList splits = new ArrayList();
 5     List files = this.listStatus(job);
 6     Iterator i$ = files.iterator();
 7
 8     while(true) {
 9         while(i$.hasNext()) {
10             FileStatus file = (FileStatus)i$.next();
11             Path path = file.getPath();
12             FileSystem fs = path.getFileSystem(job.getConfiguration());
13             long length = file.getLen();
14             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0L, length);
15             if(length != 0L && this.isSplitable(job, path)) {
16                 long blockSize = file.getBlockSize();
17                 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
18
19                 long bytesRemaining;
20                 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
21                     int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
22                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
23                 }
24
25                 if(bytesRemaining != 0L) {
26                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkLocations.length - 1].getHosts()));
27                 }
28             } else if(length != 0L) {
29                 splits.add(new FileSplit(path, 0L, length, blkLocations[0].getHosts()));
30             } else {
31                 splits.add(new FileSplit(path, 0L, length, new String[0]));
32             }
33         }
34
35         job.getConfiguration().setLong("mapreduce.input.num.files", (long)files.size());
36         LOG.debug("Total # of splits: " + splits.size());
37         return splits;
38     }
39 }
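One detail worth noting before dissecting the size computation: the loop on lines 20-23 only cuts another split while more than 1.1 x splitSize bytes remain, so the final split (added on lines 25-27) can be up to 10% larger than splitSize. A rough illustrative sketch of how that loop turns into a split count for a single file (not Hadoop API code):

public class SplitCountSketch {
    // Illustrative only: estimates how many splits getSplits() produces for one file,
    // mirroring the loop on lines 20-23 and the tail handling on lines 25-27.
    static int estimateSplitCount(long fileLength, long splitSize) {
        int count = 0;
        long bytesRemaining = fileLength;
        while ((double) bytesRemaining / (double) splitSize > 1.1D) {
            count++;                 // one full split of splitSize bytes
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0L) {
            count++;                 // the final split, which may be up to 1.1 x splitSize
        }
        return count;
    }
}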
Back to the split size: line 17 computes it with computeSplitSize(blockSize, minSize, maxSize).
a. minSize is computed as:
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job))
where getFormatMinSplitSize() is:
protected long getFormatMinSplitSize() { return 1L; }
and getMinSplitSize(job) is:
public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapred.min.split.size", 1L); }
In mapred-default.xml, "mapred.min.split.size" defaults to 0, so getMinSplitSize(job) returns 0 rather than the fallback value 1 (the fallback would only apply if the property were missing altogether). Taking the maximum with getFormatMinSplitSize() = 1 then gives minSize = 1.
b. Next, look at how maxSize is computed:
long maxSize = getMaxSplitSize(job);
where getMaxSplitSize() is:
public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapred.max.split.size", 9223372036854775807L); }
没有设置"mapred.max.split.size"的话,就使用方法的默认值 9223372036854775807,而"mapred.max.split.size"并没有默认值,所以maxSize= 9223372036854775807;
c.我们已经能够计算出minSize=1,maxSize= 9223372036854775807,接下来计算分片大小:
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
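Plugging in the values derived above, plus the 16 MB block size used later in this post, makes the behaviour obvious (an illustrative snippet only):

public class ComputeSplitSizeDemo {
    public static void main(String[] args) {
        long minSize = 1L;                   // from step a
        long maxSize = 9223372036854775807L; // Long.MAX_VALUE, from step b
        long blockSize = 16L * 1024 * 1024;  // 16 MB block size, as used later in this post

        // With the defaults, Math.min(maxSize, blockSize) == blockSize,
        // so every split is exactly one block: 16777216 bytes.
        long defaultSplit = Math.max(minSize, Math.min(maxSize, blockSize));

        // Lowering "mapred.max.split.size" to 2000000 bytes makes maxSize the limiting factor,
        // so each 16 MB block is cut into roughly 8-9 smaller splits.
        long cappedSplit = Math.max(minSize, Math.min(2000000L, blockSize));

        System.out.println(defaultSplit + " vs " + cappedSplit);
    }
}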
Clearly, the split size ends up being the smaller of maxSize and blockSize (since minSize = 1), so we can control the number of map tasks simply by setting "mapred.max.split.size" to a value smaller than the physical block size. With a Configuration object:
conf.set("mapred.max.split.size", "2000000"); // in bytes; the physical block here is 16 MB
3) The matrix multiplication code, with the number of maps made configurable, is shown below:
/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-14
 * Time: 3:13 PM
 * To change this template use File | Settings | File Templates.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.util.ReflectionUtils;

public class MutiDoubleInputMatrixProduct {

    public static void initDoubleArrayWritable(int length, DoubleWritable[] doubleArrayWritable){
        for (int i=0;i{
        public DoubleArrayWritable map_value=new DoubleArrayWritable();
        public double[][] leftMatrix=null;/******************************************/
        //public Object obValue=null;
        public DoubleWritable[] arraySum=null;
        public DoubleWritable[] tempColumnArrayDoubleWritable=null;
        public DoubleWritable[] tempRowArrayDoubleWritable=null;
        public double sum=0;
        public double uValue;
        public int leftMatrixRowNum;
        public int leftMatrixColumnNum;
        public void setup(Context context) throws IOException {
            Configuration conf=context.getConfiguration();
            leftMatrixRowNum=conf.getInt("leftMatrixRowNum",10);
            leftMatrixColumnNum=conf.getInt("leftMatrixColumnNum",10);
            leftMatrix=new double[leftMatrixRowNum][leftMatrixColumnNum];
            uValue=(double)(context.getConfiguration().getFloat("u",1.0f));
            tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
            initDoubleArrayWritable(leftMatrixColumnNum,tempRowArrayDoubleWritable);
            tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
            System.out.println("map setup() start!");
            //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
            Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
            String localCacheFile="file://"+cacheFiles[0].toString();
            //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
            //DistributedCache.
            System.out.println("local path is:"+cacheFiles[0].toString());
            // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
            FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
            SequenceFile.Reader reader=null;
            reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
            IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
            DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
            //int valueLength=0;
            int rowIndex=0;
            int index;
            while (reader.next(key,value)){
                index=-1;
                for (Writable val:value.get()){ // ArrayWritable.get() returns a Writable[] array
                    tempRowArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
                }
                //obValue=value.toArray();
                rowIndex=key.get();
                leftMatrix[rowIndex]=new double[leftMatrixColumnNum];
                //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
                for (int i=0;i {
        public DoubleWritable[] sum=null;
        // public Object obValue=null;
        public DoubleArrayWritable valueArrayWritable=new DoubleArrayWritable();
        public DoubleWritable[] tempColumnArrayDoubleWritable=null;
        private int leftMatrixRowNum;

        public void setup(Context context){
            //leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
            leftMatrixRowNum=context.getConfiguration().getInt("leftMatrixRowNum",100);
            sum=new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum,sum);
            //tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
            tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
        }
        // If the matrix computation is already finished in the map phase, it seems the reduce phase could be skipped;
        // if no reducer class is created, the MR framework still runs a default reducer that does nothing.
        // Without a reducer, however, each map writes its own output file, so there are as many result files as maps.
        // The reducer is used here so that the result matrix ends up in a single file.
        public void reduce(IntWritable key,Iterable<DoubleArrayWritable> value,Context context) throws IOException, InterruptedException {
            //int valueLength=0;
            for(DoubleArrayWritable doubleValue:value){
                int index=-1;
                for (Writable val:doubleValue.get()){
                    tempColumnArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
                }
                //valueLength=Array.getLength(obValue);
                /*
                for (int i=0;i
4) Finally, here is how to set breakpoints in the Hadoop source code, again using the split-computation code above as the example.
a. First find the FileInputFormat class. It lives in hadoop-core-1.2.1.jar, so we add that jar to the project, as shown below:
Although these are compiled class files (bytecode), you can still set breakpoints in them and debug as if they were Java source. Here we put two breakpoints, one in getSplits() and one in computeSplitSize(), and then run our MapReduce program locally from IDEA in Debug mode. The result looks like this:
The breakpoints are hit, and the relevant variable values can be inspected.
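For this to work the job has to run inside the IDE's JVM, i.e. in Hadoop's local mode. A minimal sketch of forcing that from the driver, assuming the usual local-mode property values (adjust paths and settings to your own environment):

import org.apache.hadoop.conf.Configuration;

public class LocalDebugConfig {
    // Returns a Configuration that keeps the whole job in the current JVM,
    // so breakpoints set in classes such as FileInputFormat are hit directly
    // when the job is started from the IDE in Debug mode.
    public static Configuration localConf() {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "local"); // run the MapReduce engine in-process
        conf.set("fs.default.name", "file:///"); // read input from the local filesystem
        return conf;
    }
}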