博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
TF-IDF mapreduce实现
阅读量:347 次
发布时间:2019-03-04

本文共 14429 字,大约阅读时间需要 48 分钟。

前言:边写的时候我有修改一些代码,可能前后会有点对不上

文章目录

TF

map

输入:key:当前行偏移位置, value:当前行内容

输出:key: word:workname,val: 1
hadoop的context.write,必须保证有值写入,不能是null,如果是null会让map输出和reduce的输入不匹配而造成程序运行错误。
setup:
在开始前执行且仅执行一次
获取每个文件所属的作品名,避免在map中重复计算

public static class TFMapper extends Mapper
{
private final Text one = new Text("1"); private Text word = new Text(); private String workName = ""; @Override protected void setup(Mapper
.Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit(); String fileFullName = ((FileSplit)inputSplit).getPath().toString(); String[] nameSegments = fileFullName.split("/"); workName = nameSegments[nameSegments.length - 1]; int idx = workName.indexOf("第"); if (idx != -1){
workName = workName.substring(0, idx); } } @Override protected void map(Object key, Text value, Mapper
.Context context) throws IOException, InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) {
word.set(String.join(":", tokenizer.nextToken(), workName)); context.write(word, one); } } }

combiner

对map的结果做一次合并,但是hadoop文档也有说明,即使设置了combiner它也不是一定会被执行的

public static class TFCombiner extends Reducer
{
@Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException {
if (values == null) {
return; } int sumCount = 0; for (Text val : values) {
sumCount += Integer.parseInt(val.toString()); } context.write(key, new Text(String.valueOf(sumCount))); } }

Partitioner

分区数应该和你分配的reducer一样数目(小于也可以),但不能大于,不然就会出现像取模得到5,但没有5号reducer的情况;

分区数默认为1,你可以自己设置

public static class TFPartitioner extends Partitioner
{
@Override public int getPartition(Text key, Text value, int numPartitions) {
String fileName = key.toString().split(":")[1]; numPartitions = taskNum; return Math.abs((fileName.hashCode() * 127) % numPartitions); } }

reducer

key:word:workname

value:tf
reducer和combiner的逻辑基本一致,combiner的输出作为reducer的输入,在某些情况下,你要注意combiner不应该影响你想要呈现的真实结果。

public static class TFReducer extends Reducer
{
@Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException {
if (values == null) {
return; } int sumCount = 0; for (Text val : values) {
sumCount += Integer.parseInt(val.toString()); } context.write(key, new Text(String.valueOf(sumCount))); } }

MAIN

public static void main(String[] args) {
Configuration conf = new Configuration(); try {
Job job = Job.getInstance(conf); job.setJobName("TF-job"); job.setJarByClass(TFMR.class); job.setMapperClass(TFMR.TFMapper.class); job.setCombinerClass(TFMR.TFCombiner.class); job.setPartitionerClass(TFMR.TFPartitioner.class); job.setNumReduceTasks(taskNum); job.setReducerClass(TFMR.TFReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String inputPath = args[0]; String outputPath = args[1]; FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.waitForCompletion(true); } catch (Exception e) {
e.printStackTrace(); } }

TF的测试

实验要求是按第字分解获取作品名,我自己写了个很小的样例集来检查

  1. 在hadoop目录下创建test-in目录作测试用:
    hadoop fs -mkdir /test-in
    查看目录是否创建 hadoop fs -ls /
    直接ls查看不到test-in目录不用慌,因为HDFS是hadoop原生态的文件系统,这条命令是在hdfs文件系统下创建目录,所以要用hadoop的命令方式才能查看到你的文件
  2. 在本地写一个测试文件
    内容如下
    在这里插入图片描述我的文件是在workspace文件夹下,把它复制到test-in目录下
    hdfs dfs -copyFromLocal /home/xjm/class3_spring/big-data/workspace/little /test-in
    hadoop fs -ls /test-in看一下是否复制成功了
    在这里插入图片描述
    查看文本内容hdfs dfs -cat /test-in/小说第一
  3. 运行测试
    在pom文件指定程序入口com.xjm.TF
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有test-out目录,
    可以通过hadoop fs -rm -r -skipTrash /test-out删除
  4. 查看结果
    查看你的输出目录hadoop fs -ls /test-out
    设置了几个reducer就有几个输出文件
    hadoop fs -cat /test-out/part-r-00000
    可以看到已经正确输出了
    在这里插入图片描述

IDF

实验要求TF是按作品数,一个作品被分成了多个文件,像这样,

在这里插入图片描述

而这里的IDF是按文件数算,所以上一步的输出没法利用。

这里我是直接在setup累加的总文件数,所以是默认一个文件对应一个map;
我查到说由于文件大小和设置分区大小,分片数可能!=文件数,这样的话总文件数有问题,我想用set记录,然后算set的大小解决;但是随之想到,同时也可能出现一个文件中分成多片后,word被记录了多次,所以这里还要另外处理,嗯…
不过,默认配置的话,文件还没大到那个程度,庆幸
RE:改了文档数统计,之前用static,结果一直是0,叹气

mapper

输入:key:当前行偏移位置, value:当前行内容

输出:key:word, value:1(填充值,无意义)

public static class IDFMapper extends Mapper
{
private final Text one = new Text("1"); private Text word = new Text(); @Override protected void map(Object key, Text value, Mapper
.Context context) throws IOException, InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken()); context.write(word, one); } } }

combiner

将相同key值记录合并,一个word在文档中出现应该只有一条记录

public static class TFCombiner extends Reducer
{
@Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException {
if (values == null) {
return; } context.write(key, new Text("1")); } }

partitioner

按照word进行分区

public static class IDFPartitioner extends Partitioner
{
@Override public int getPartition(Text key, Text value, int numPartitions) {
String word = key.toString(); numPartitions = taskNum; return Math.abs((word.hashCode() * 127) % numPartitions); } }

reducer

输入:key:word, value:{1,1…}

输出:key:word ,value:idf

public static class IDFReducer extends Reducer
{
@Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException {
if (values == null) {
return; } int fileCount = 0; for (Text value : values) {
fileCount += Integer.parseInt(value.toString()); } double idfValue = Math.log10(1.0 * totalFiles / (fileCount + 1)); context.write(key, new Text(String.valueOf(idfValue))); } }

MAIN

在输入的输出路径名加上IDF后缀以区分,通过API计算文件总数

这个计算数目有点问题,是非递归的

public static void main(String[] args) {
Configuration conf = new Configuration(); try {
Job job = Job.getInstance(conf); job.setJobName("IDF-job"); job.setJarByClass(IDFMR.class); job.setMapperClass(IDFMR.IDFMapper.class); job.setCombinerClass(IDFMR.IDFCombiner.class); job.setPartitionerClass(IDFMR.IDFPartitioner.class); job.setNumReduceTasks(taskNum); job.setReducerClass(IDFMR.IDFReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inputPath = new Path(args[0]); // 计算文档总数并加入设置 job.setProfileParams(String.valueOf(getFilesNum(conf, inputPath))); String outputPath = args[1] + "IDF"; Path fileOutPath = new Path(outputPath); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(fileOutPath)) {
fileSystem.delete(fileOutPath, true); } FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, fileOutPath); job.waitForCompletion(true); } catch (Exception e) {
e.printStackTrace(); } }
private static int getFilesNum(Configuration conf, Path inputPath) throws FileNotFoundException, IOException {
FileSystem fs = FileSystem.get(conf); FileStatus status[] = fs.listStatus(inputPath); return status.length; }

测试

  1. 运行测试
    在pom文件指定程序入口com.xjm.IDF
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有输出目录,
    可以通过hadoop fs -rm -r -skipTrash /test-outIDF删除
  2. 查看结果
    查看你的输出目录hadoop fs -ls /test-outIDF
    设置了几个reducer就有几个输出文件
    hadoop fs -cat /test-outIDF/part-r-00000
    老是卡住跑不出来,好不容易跑出来了,结果是infnity,叹气,看了下是总文件数计算的问题
    调了调,跑出来了
    为了验证,我打了三行,分别是包含的文件数,总文件数,idf,验算了下没问题
    在这里插入图片描述
    验算完之后就可以把多余的两行删掉了
    在这里插入图片描述

TF-IDFMR

分别计算完tf 和idf之后,将两个结果合并

TF的输出格式是 word:workname tf
IDF的输出格式是 word idf
把这两个作为输入,得到的输出是 workname word tf-idf
文件输入进来mapper的输入key 是行开头字符偏移量,value是行内容

mapper

把输入格式处理成

word workname#tf
word idf

public static class TFIDFMapper extends Mapper
{
@Override protected void map(Object key, Text value, Mapper
.Context context) throws IOException, InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString()); String str1 = tokenizer.nextToken(); String str2 = tokenizer.nextToken(); if (str1.indexOf(":") != -1){
String str3 = str1.split(":")[0]; String str4 = str1.split(":")[1]; context.write(new Text(str3), new Text(String.join("#", str4, str2))); } else context.write(new Text(str1), new Text(str2)); } }

reducer

找出不含#的value作为idf,其他的依次分割出作品名和tf,并计算

按要求格式输出

public static class TFIDFReducer extends Reducer
{
private Text workname = new Text(); private Text wordval = new Text(); @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException {
if (values == null) {
return; } double idf = 1.0; ArrayList
list = new ArrayList
(); for (Text val : values) {
String str = val.toString(); if (str.indexOf("#") != -1) list.add(str); else idf = Double.parseDouble(str); } for(String str:list){
workname.set(str.split("#")[0]); int tf = Integer.parseInt(str.split("#")[1]); String tfidf = String.valueOf(idf*tf); wordval.set(String.join(" ",key.toString(),tfidf)); context.write(workname, wordval); } } }

main

在输入路径加入相应后缀

public static void main(String[] args) {
Configuration conf = new Configuration(); try {
Job job = Job.getInstance(conf); job.setJobName("TFIDF-job"); job.setJarByClass(TFIDFMR.class); job.setMapperClass(TFIDFMR.TFIDFMapper.class); job.setPartitionerClass(TFIDFMR.TFIDFPartitioner.class); job.setNumReduceTasks(taskNum); job.setReducerClass(TFIDFMR.TFIDFReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // String inputPath1 = args[0] + "TF"; String inputPath2 = args[0] + "IDF"; String outputPath = args[1] + "TFIDF"; Path fileOutPath = new Path(outputPath); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(fileOutPath)) {
fileSystem.delete(fileOutPath, true); } FileInputFormat.addInputPaths(job, String.join(",", inputPath1, inputPath2)); FileOutputFormat.setOutputPath(job, fileOutPath); job.waitForCompletion(true); } catch (Exception e) {
e.printStackTrace(); } }

测试

  1. 运行测试
    在pom文件指定程序入口com.xjm.TFIDFMR
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-out /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有输出目录,
    可以通过hadoop fs -rm -r -skipTrash /test-outIDF删除
    [RE:代码加了检测,已经存在的话会先自动删除]
  2. 查看结果
    查看你的输出目录hadoop fs -ls /test-outIDF
    在这里插入图片描述

client

三个单元分别测试成功后,写一个工作台将三个连接起来工作

package com.xjm;public class Client {
public static void main(String[] args) {
TFMR.main(args); IDFMR.main(args); String[] nargs = {
args[1],args[1]}; TFIDFMR.main(nargs); }}
  1. 运行测试
    在pom文件指定程序入口为com.xjm.Client
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有输出目录,
    可以通过hadoop fs -rm -r -skipTrash /test-out删除
    [RE:代码加了检测,已经存在的话会先自动删除]
  2. 查看结果
    查看你的输出目录hadoop fs -ls /test-out
    为了比对输出内容,我写的是test-out1
    在这里插入图片描述
    贴个全的,和之前对比一致
    在这里插入图片描述

POM

程序入口

maven-assembly-plugin
false
jar-with-dependencies
com.xjm.TFMR
make-assembly
package
assembly

一些遇到的问题

The import org.apache cannot be resolved

我是用maven管理的项目,在pom文件里加入相关依赖就好了。

如果下载不成功,有很多原因;你可以在classpath加入你下的hadoop jar包;或者换源下载,我用阿里云的昨天一直下不了,1是老仓库的地址更新了。2 深更半夜好像连接不好,终端一直显示unknown hosts,今早重试又可以了…

跑了很久都没有结果

看8088界面状态信息,以及确保你的各节点成功开启了

我又遇到unhealthy nodes了…
删磁盘,重启几次虚拟机又能跑了…

waiting for AM container to be allocated, launched and register with RM.

被挂起了,原因是资源不够

转载地址:http://kumq.baihongyu.com/

你可能感兴趣的文章