`

Hadoop-1.2.1 单词统计例子

阅读更多
package com.bjsxt.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;



/**
* 单词统计
* @author tingyu
* @date  2016-02-29 00:44
*/

/**
* KEYIN:一句话或单词的下标
* VALUEIN:输入的VALUE为文本
* KEYOUT:   输出的KEY为文本
* VALUEOUT: 输出为数字
*/
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/*
* 每次调用map方法时会传入split(分片)中一行数据,key为该行数据在分片中的下标位置
*
*/
protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
String line=value.toString();
StringTokenizer st=new StringTokenizer(line);  //默认按空格进行切分
while(st.hasMoreTokens()){
String world=st.nextToken();
context.write(new Text(world), new IntWritable(1));   //map输出
}

};
}








package com.bjsxt.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


/**
* 单词统计
* @author tingyu
* @date  2016-02-29 00:44
*/
/*
* KEYIN: 即map的输出key
* VALUEIN: 即map输出的value
* KEYOUT: 文本
* VALUEOUT: 数值
*/
public class WcReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
protected void reduce(Text key, java.lang.Iterable<IntWritable> iterable, Context context)
throws java.io.IOException ,InterruptedException {
int sum=0;
for(IntWritable val:iterable){
sum+=val.get();
}
context.write(key, new IntWritable(sum));

};
}





package com.bjsxt.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
* 单词统计
* @author tingyu
* @date  2016-02-29 00:44
*/
public class JobRun {
public static void main(String[] args) {
Configuration config=new Configuration();
config.set("mapred.job.tracker", "192.168.0.200:9001");   //即hadoop-1.2/conf/mapred-site.xml中的配置
config.set("fs.default.name", "hdfs://192.168.0.200:9000");
//如果本地Eclipse不行,就需要设置jar文件的位置
//config.set("mapred.jar", "C:\\Users\\tingyu\\Desktop\\hadoop\\wordCount.jar");
try {
Job job=new Job(config,"world count");
job.setJarByClass(JobRun.class);
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置reduceTask的任务数
//job.setNumReduceTasks(2); 

FileInputFormat.addInputPath(job, new Path("/opt/input/wc"));
FileOutputFormat.setOutputPath(job, new Path("/opt/output/wc"));

System.exit(job.waitForCompletion(true)?0:1);

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics