本文共 14429 字,大约阅读时间需要 48 分钟。
前言:边写的时候我有修改一些代码,可能前后会有点对不上
输入:key:当前行偏移位置, value:当前行内容
输出:key: word:workname,val: 1 hadoop的context.write,必须保证有值写入,不能是null,如果是null会让map输出和reduce的输入不匹配而造成程序运行错误。 setup: 在开始前执行且仅执行一次 获取每个文件所属的作品名,避免在map中重复计算public static class TFMapper extends Mapper
对map的结果做一次合并,但是hadoop文档也有说明,即使设置了combiner它也不是一定会被执行的
public static class TFCombiner extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { if (values == null) { return; } int sumCount = 0; for (Text val : values) { sumCount += Integer.parseInt(val.toString()); } context.write(key, new Text(String.valueOf(sumCount))); } }
分区数应该和你分配的reducer一样数目(小于也可以),但不能大于,不然就会出现像取模得到5,但没有5号reducer的情况;
分区数默认为1,你可以自己设置public static class TFPartitioner extends Partitioner{ @Override public int getPartition(Text key, Text value, int numPartitions) { String fileName = key.toString().split(":")[1]; numPartitions = taskNum; return Math.abs((fileName.hashCode() * 127) % numPartitions); } }
key:word:workname
value:tf reducer和combiner的逻辑基本一致,combiner的输出作为reducer的输入,在某些情况下,你要注意combiner不应该影响你想要呈现的真实结果。public static class TFReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { if (values == null) { return; } int sumCount = 0; for (Text val : values) { sumCount += Integer.parseInt(val.toString()); } context.write(key, new Text(String.valueOf(sumCount))); } }
public static void main(String[] args) { Configuration conf = new Configuration(); try { Job job = Job.getInstance(conf); job.setJobName("TF-job"); job.setJarByClass(TFMR.class); job.setMapperClass(TFMR.TFMapper.class); job.setCombinerClass(TFMR.TFCombiner.class); job.setPartitionerClass(TFMR.TFPartitioner.class); job.setNumReduceTasks(taskNum); job.setReducerClass(TFMR.TFReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String inputPath = args[0]; String outputPath = args[1]; FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } }
实验要求是按第字分解获取作品名,我自己写了个很小的样例集来检查
hadoop fs -mkdir /test-in
查看目录是否创建 hadoop fs -ls /
直接ls查看不到test-in目录不用慌,因为HDFS是hadoop原生态的文件系统,这条命令是在hdfs文件系统下创建目录,所以要用hadoop的命令方式才能查看到你的文件hdfs dfs -copyFromLocal /home/xjm/class3_spring/big-data/workspace/little /test-in
hadoop fs -ls /test-in
看一下是否复制成功了 hdfs dfs -cat /test-in/小说第一
hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
(调整为自己的jar包路径) 输出目录不能事先存在,只能由程序创建,如果已经有test-out目录, 可以通过hadoop fs -rm -r -skipTrash /test-out
删除hadoop fs -ls /test-out
设置了几个reducer就有几个输出文件 hadoop fs -cat /test-out/part-r-00000
可以看到已经正确输出了 实验要求TF是按作品数,一个作品被分成了多个文件,像这样,
而这里的IDF是按文件数算,所以上一步的输出没法利用。
这里我是直接在setup累加的总文件数,所以是默认一个文件对应一个map; 我查到说由于文件大小和设置分区大小,分片数可能!=文件数,这样的话总文件数有问题,我想用set记录,然后算set的大小解决;但是随之想到,同时也可能出现一个文件中分成多片后,word被记录了多次,所以这里还要另外处理,嗯… 不过,默认配置的话,文件还没大到那个程度,庆幸 RE:改了文档数统计,之前用static,结果一直是0,叹气输入:key:当前行偏移位置, value:当前行内容
输出:key:word, value:1(填充值,无意义)public static class IDFMapper extends Mapper{ private final Text one = new Text("1"); private Text word = new Text(); @Override protected void map(Object key, Text value, Mapper .Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } }
将相同key值记录合并,一个word在文档中出现应该只有一条记录
public static class TFCombiner extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { if (values == null) { return; } context.write(key, new Text("1")); } }
按照word进行分区
public static class IDFPartitioner extends Partitioner{ @Override public int getPartition(Text key, Text value, int numPartitions) { String word = key.toString(); numPartitions = taskNum; return Math.abs((word.hashCode() * 127) % numPartitions); } }
输入:key:word, value:{1,1…}
输出:key:word ,value:idfpublic static class IDFReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { if (values == null) { return; } int fileCount = 0; for (Text value : values) { fileCount += Integer.parseInt(value.toString()); } double idfValue = Math.log10(1.0 * totalFiles / (fileCount + 1)); context.write(key, new Text(String.valueOf(idfValue))); } }
在输入的输出路径名加上IDF后缀以区分,通过API计算文件总数
这个计算数目有点问题,是非递归的public static void main(String[] args) { Configuration conf = new Configuration(); try { Job job = Job.getInstance(conf); job.setJobName("IDF-job"); job.setJarByClass(IDFMR.class); job.setMapperClass(IDFMR.IDFMapper.class); job.setCombinerClass(IDFMR.IDFCombiner.class); job.setPartitionerClass(IDFMR.IDFPartitioner.class); job.setNumReduceTasks(taskNum); job.setReducerClass(IDFMR.IDFReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inputPath = new Path(args[0]); // 计算文档总数并加入设置 job.setProfileParams(String.valueOf(getFilesNum(conf, inputPath))); String outputPath = args[1] + "IDF"; Path fileOutPath = new Path(outputPath); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(fileOutPath)) { fileSystem.delete(fileOutPath, true); } FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, fileOutPath); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } }
private static int getFilesNum(Configuration conf, Path inputPath) throws FileNotFoundException, IOException { FileSystem fs = FileSystem.get(conf); FileStatus status[] = fs.listStatus(inputPath); return status.length; }
hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
(调整为自己的jar包路径) 输出目录不能事先存在,只能由程序创建,如果已经有输出目录, 可以通过hadoop fs -rm -r -skipTrash /test-outIDF
删除hadoop fs -ls /test-outIDF
设置了几个reducer就有几个输出文件 hadoop fs -cat /test-outIDF/part-r-00000
老是卡住跑不出来,好不容易跑出来了,结果是infnity,叹气,看了下是总文件数计算的问题 调了调,跑出来了 为了验证,我打了三行,分别是包含的文件数,总文件数,idf,验算了下没问题 分别计算完tf 和idf之后,将两个结果合并
TF的输出格式是 word:workname tf IDF的输出格式是 word idf 把这两个作为输入,得到的输出是 workname word tf-idf 文件输入进来mapper的输入key 是行开头字符偏移量,value是行内容把输入格式处理成
word workname#tf word idfpublic static class TFIDFMapper extends Mapper{ @Override protected void map(Object key, Text value, Mapper .Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); String str1 = tokenizer.nextToken(); String str2 = tokenizer.nextToken(); if (str1.indexOf(":") != -1){ String str3 = str1.split(":")[0]; String str4 = str1.split(":")[1]; context.write(new Text(str3), new Text(String.join("#", str4, str2))); } else context.write(new Text(str1), new Text(str2)); } }
找出不含#的value作为idf,其他的依次分割出作品名和tf,并计算
按要求格式输出public static class TFIDFReducer extends Reducer{ private Text workname = new Text(); private Text wordval = new Text(); @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { if (values == null) { return; } double idf = 1.0; ArrayList list = new ArrayList (); for (Text val : values) { String str = val.toString(); if (str.indexOf("#") != -1) list.add(str); else idf = Double.parseDouble(str); } for(String str:list){ workname.set(str.split("#")[0]); int tf = Integer.parseInt(str.split("#")[1]); String tfidf = String.valueOf(idf*tf); wordval.set(String.join(" ",key.toString(),tfidf)); context.write(workname, wordval); } } }
在输入路径加入相应后缀
public static void main(String[] args) { Configuration conf = new Configuration(); try { Job job = Job.getInstance(conf); job.setJobName("TFIDF-job"); job.setJarByClass(TFIDFMR.class); job.setMapperClass(TFIDFMR.TFIDFMapper.class); job.setPartitionerClass(TFIDFMR.TFIDFPartitioner.class); job.setNumReduceTasks(taskNum); job.setReducerClass(TFIDFMR.TFIDFReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // String inputPath1 = args[0] + "TF"; String inputPath2 = args[0] + "IDF"; String outputPath = args[1] + "TFIDF"; Path fileOutPath = new Path(outputPath); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(fileOutPath)) { fileSystem.delete(fileOutPath, true); } FileInputFormat.addInputPaths(job, String.join(",", inputPath1, inputPath2)); FileOutputFormat.setOutputPath(job, fileOutPath); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } }
hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-out /test-out
(调整为自己的jar包路径) 输出目录不能事先存在,只能由程序创建,如果已经有输出目录, 可以通过hadoop fs -rm -r -skipTrash /test-outIDF
删除 [RE:代码加了检测,已经存在的话会先自动删除]hadoop fs -ls /test-outIDF
三个单元分别测试成功后,写一个工作台将三个连接起来工作
package com.xjm;public class Client { public static void main(String[] args) { TFMR.main(args); IDFMR.main(args); String[] nargs = { args[1],args[1]}; TFIDFMR.main(nargs); }}
hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
(调整为自己的jar包路径) 输出目录不能事先存在,只能由程序创建,如果已经有输出目录, 可以通过hadoop fs -rm -r -skipTrash /test-out
删除 [RE:代码加了检测,已经存在的话会先自动删除]hadoop fs -ls /test-out
为了比对输出内容,我写的是test-out1 maven-assembly-plugin false jar-with-dependencies com.xjm.TFMR make-assembly package assembly
我是用maven管理的项目,在pom文件里加入相关依赖就好了。
如果下载不成功,有很多原因;你可以在classpath加入你下的hadoop jar包;或者换源下载,我用阿里云的昨天一直下不了,1是老仓库的地址更新了。2 深更半夜好像连接不好,终端一直显示unknown hosts,今早重试又可以了…看8088界面状态信息,以及确保你的各节点成功开启了
我又遇到unhealthy nodes了… 删磁盘,重启几次虚拟机又能跑了…被挂起了,原因是资源不够
转载地址:http://kumq.baihongyu.com/