Hadoop学习之路(二十五)MapReduce的API使用(一)

作者:扎心了,老铁

出处:https://www.cnblogs.com/qingyunzong/category/1169344.html


学生成绩—增强版

数据信息


computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

数据解释

数据字段个数不固定:

第一个是课程名称,总共四个课程:computer、math、english、algorithm;

第二个是学生姓名,后面是每次考试的分数

统计需求

1、统计每门课程的参考人数和课程平均分

2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数

3、求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分

第一题

MRAvgScore1.java


1 /** 2 * 需求:统计每门课程的参考人数和课程平均分 3 * */ 4 public class MRAvgScore1 { 5 6 public static void main(String[] args) throws Exception { 7 8 Configuration conf1 = new Configuration(); 9 Configuration conf2 = new Configuration(); 10 11 Job job1 = Job.getInstance(conf1); 12 Job job2 = Job.getInstance(conf2); 13 14 job1.setJarByClass(MRAvgScore1.class); 15 job1.setMapperClass(AvgScoreMapper1.class); 16 //job.setReducerClass(MFReducer.class); 17 18 job1.setOutputKeyClass(Text.class); 19 job1.setOutputValueClass(DoubleWritable.class); 20 21 Path inputPath1 = new Path("D:\\MR\\hw\\work3\\input"); 22 Path outputPath1 = new Path("D:\\MR\\hw\\work3\\output_hw1_1"); 23 24 FileInputFormat.setInputPaths(job1, inputPath1); 25 FileOutputFormat.setOutputPath(job1, outputPath1); 26 27 28 job2.setMapperClass(AvgScoreMapper2.class); 29 job2.setReducerClass(AvgScoreReducer2.class); 30 31 job2.setOutputKeyClass(Text.class); 32 job2.setOutputValueClass(DoubleWritable.class); 33 34 Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw1_1"); 35 Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw1_end"); 36 37 FileInputFormat.setInputPaths(job2, inputPath2); 38 FileOutputFormat.setOutputPath(job2, outputPath2); 39 40 JobControl control = new JobControl("AvgScore"); 41 42 ControlledJob aJob = new ControlledJob(job1.getConfiguration()); 43 ControlledJob bJob = new ControlledJob(job2.getConfiguration()); 44 45 bJob.addDependingJob(aJob); 46 47 control.addJob(aJob); 48 control.addJob(bJob); 49 50 Thread thread = new Thread(control); 51 thread.start(); 52 53 while(!control.allFinished()) { 54 thread.sleep(1000); 55 } 56 System.exit(0); 57 58 } 59 60 61 62 /** 63 * 数据类型:computer,huangxiaoming,85,86,41,75,93,42,85 64 * 65 * 需求:统计每门课程的参考人数和课程平均分 66 * 67 * 分析:以课程名称+姓名作为key,以平均分数作为value 68 * */ 69 public static class AvgScoreMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable>{ 70 71 @Override 72 protected void map(LongWritable key, Text value,Context context) 73 throws IOException, 
InterruptedException { 74 75 String[] splits = value.toString().split(","); 76 //拼接成要输出的key 77 String outKey = splits[0]+"\t"+splits[1]; 78 int length = splits.length; 79 int sum = 0; 80 //求出成绩的总和 81 for(int i=2;i<length;i++) { 82 sum += Integer.parseInt(splits[i]); 83 } 84 //求出平均分 85 double outValue = sum / (length - 2); 86 87 context.write(new Text(outKey), new DoubleWritable(outValue)); 88 89 } 90 91 } 92 93 /** 94 * 对第一次MapReduce输出的结果进一步计算,第一步输出结果样式为 95 * math huangjiaju 82.0 96 * math huanglei 74.0 97 * math huangxiaoming 83.0 98 * math liujialing 72.0 99 * math liutao 56.0 100 * math wangbaoqiang 72.0 101 * math xuzheng 69.0 102 * 103 * 需求:统计每门课程的参考人数和课程平均分 104 * 分析:以课程名称作为key,以分数作为value进行 输出 105 * 106 * */ 107 public static class AvgScoreMapper2 extends Mapper<LongWritable, Text, Text, DoubleWritable>{ 108 109 @Override 110 protected void map(LongWritable key, Text value,Context context) 111 throws IOException, InterruptedException { 112 113 String[] splits = value.toString().split("\t"); 114 String outKey = splits[0]; 115 String outValue = splits[2]; 116 117 context.write(new Text(outKey), new DoubleWritable(Double.parseDouble(outValue))); 118 } 119 120 } 121 122 /** 123 * 针对同一门课程,对values进行遍历计数,看看有多少人参加了考试,并计算出平均成绩 124 * */ 125 public static class AvgScoreReducer2 extends Reducer<Text, DoubleWritable, Text, Text>{ 126 127 @Override 128 protected void reduce(Text key, Iterable<DoubleWritable> values, 129 Context context) throws IOException, InterruptedException { 130 131 int count = 0; 132 double sum = 0; 133 for(DoubleWritable value : values) { 134 count++; 135 sum += value.get(); 136 } 137 138 double avg = sum / count; 139 String outValue = count + "\t" + avg; 140 context.write(key, new Text(outValue)); 141 } 142 143 } 144 145 146 }

第二题

MRAvgScore2.java


1 public class MRAvgScore2 { 2 3 public static void main(String[] args) throws Exception { 4 5 Configuration conf = new Configuration(); 6 7 Job job = Job.getInstance(conf); 8 9 job.setJarByClass(MRAvgScore2.class); 10 job.setMapperClass(ScoreMapper3.class); 11 job.setReducerClass(ScoreReducer3.class); 12 13 job.setOutputKeyClass(StudentBean.class); 14 job.setOutputValueClass(NullWritable.class); 15 16 job.setPartitionerClass(CoursePartitioner.class); 17 job.setNumReduceTasks(4); 18 19 Path inputPath = new Path("D:\\MR\\hw\\work3\\output_hw1_1"); 20 Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw2_1"); 21 22 FileInputFormat.setInputPaths(job, inputPath); 23 FileOutputFormat.setOutputPath(job, outputPath); 24 boolean isDone = job.waitForCompletion(true); 25 System.exit(isDone ? 0 : 1); 26 } 27 28 29 public static class ScoreMapper3 extends Mapper<LongWritable, Text, StudentBean, NullWritable>{ 30 31 @Override 32 protected void map(LongWritable key, Text value,Context context) 33 throws IOException, InterruptedException { 34 35 String[] splits = value.toString().split("\t"); 36 37 double score = Double.parseDouble(splits[2]); 38 DecimalFormat df = new DecimalFormat("#.0"); 39 df.format(score); 40 41 StudentBean student = new StudentBean(splits[0],splits[1],score); 42 43 context.write(student, NullWritable.get()); 44 45 } 46 47 } 48 49 public static class ScoreReducer3 extends Reducer<StudentBean, NullWritable, StudentBean, NullWritable>{ 50 51 @Override 52 protected void reduce(StudentBean key, Iterable<NullWritable> values,Context context) 53 throws IOException, InterruptedException { 54 55 for(NullWritable nvl : values){ 56 context.write(key, nvl); 57 } 58 59 } 60 } 61 }

StudentBean.java


1 public class StudentBean implements WritableComparable<StudentBean>{ 2 private String course; 3 private String name; 4 private double avgScore; 5 6 public String getCourse() { 7 return course; 8 } 9 public void setCourse(String course) { 10 this.course = course; 11 } 12 public String getName() { 13 return name; 14 } 15 public void setName(String name) { 16 this.name = name; 17 } 18 public double getavgScore() { 19 return avgScore; 20 } 21 public void setavgScore(double avgScore) { 22 this.avgScore = avgScore; 23 } 24 public StudentBean(String course, String name, double avgScore) { 25 super(); 26 this.course = course; 27 this.name = name; 28 this.avgScore = avgScore; 29 } 30 public StudentBean() { 31 super(); 32 } 33 34 @Override 35 public String toString() { 36 return course + "\t" + name + "\t" + avgScore; 37 } 38 @Override 39 public void readFields(DataInput in) throws IOException { 40 course = in.readUTF(); 41 name = in.readUTF(); 42 avgScore = in.readDouble(); 43 } 44 @Override 45 public void write(DataOutput out) throws IOException { 46 out.writeUTF(course); 47 out.writeUTF(name); 48 out.writeDouble(avgScore); 49 } 50 @Override 51 public int compareTo(StudentBean stu) { 52 double diffent = this.avgScore - stu.avgScore; 53 if(diffent == 0) { 54 return 0; 55 }else { 56 return diffent > 0 ? -1 : 1; 57 } 58 } 59 60 61 }

第三题

MRScore3.java


1 public class MRScore3 { 2 3 public static void main(String[] args) throws Exception { 4 5 Configuration conf1 = new Configuration(); 6 Configuration conf2 = new Configuration(); 7 8 Job job1 = Job.getInstance(conf1); 9 Job job2 = Job.getInstance(conf2); 10 11 job1.setJarByClass(MRScore3.class); 12 job1.setMapperClass(MRMapper3_1.class); 13 //job.setReducerClass(ScoreReducer3.class); 14 15 16 job1.setMapOutputKeyClass(IntWritable.class); 17 job1.setMapOutputValueClass(StudentBean.class); 18 job1.setOutputKeyClass(IntWritable.class); 19 job1.setOutputValueClass(StudentBean.class); 20 21 job1.setPartitionerClass(CoursePartitioner2.class); 22 23 job1.setNumReduceTasks(4); 24 25 Path inputPath = new Path("D:\\MR\\hw\\work3\\input"); 26 Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw3_1"); 27 28 FileInputFormat.setInputPaths(job1, inputPath); 29 FileOutputFormat.setOutputPath(job1, outputPath); 30 31 job2.setMapperClass(MRMapper3_2.class); 32 job2.setReducerClass(MRReducer3_2.class); 33 34 job2.setMapOutputKeyClass(IntWritable.class); 35 job2.setMapOutputValueClass(StudentBean.class); 36 job2.setOutputKeyClass(StudentBean.class); 37 job2.setOutputValueClass(NullWritable.class); 38 39 Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_1"); 40 Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_end"); 41 42 FileInputFormat.setInputPaths(job2, inputPath2); 43 FileOutputFormat.setOutputPath(job2, outputPath2); 44 45 JobControl control = new JobControl("Score3"); 46 47 ControlledJob aJob = new ControlledJob(job1.getConfiguration()); 48 ControlledJob bJob = new ControlledJob(job2.getConfiguration()); 49 50 bJob.addDependingJob(aJob); 51 52 control.addJob(aJob); 53 control.addJob(bJob); 54 55 Thread thread = new Thread(control); 56 thread.start(); 57 58 while(!control.allFinished()) { 59 thread.sleep(1000); 60 } 61 System.exit(0); 62 63 64 } 65 66 67 68 69 public static class MRMapper3_1 extends Mapper<LongWritable, Text, IntWritable, StudentBean>{ 70 
71 StudentBean outKey = new StudentBean(); 72 IntWritable outValue = new IntWritable(); 73 List<String> scoreList = new ArrayList<>(); 74 75 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { 76 77 scoreList.clear(); 78 String[] splits = value.toString().split(","); 79 long sum = 0; 80 81 for(int i=2;i<splits.length;i++) { 82 scoreList.add(splits[i]); 83 sum += Long.parseLong(splits[i]); 84 } 85 86 Collections.sort(scoreList); 87 outValue.set(Integer.parseInt(scoreList.get(scoreList.size()-1))); 88 89 double avg = sum * 1.0/(splits.length-2); 90 outKey.setCourse(splits[0]); 91 outKey.setName(splits[1]); 92 outKey.setavgScore(avg); 93 94 context.write(outValue, outKey); 95 96 }; 97 } 98 99 100 101 public static class MRMapper3_2 extends Mapper<LongWritable, Text,IntWritable, StudentBean >{ 102 103 StudentBean outValue = new StudentBean(); 104 IntWritable outKey = new IntWritable(); 105 106 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { 107 108 String[] splits = value.toString().split("\t"); 109 outKey.set(Integer.parseInt(splits[0])); 110 111 outValue.setCourse(splits[1]); 112 outValue.setName(splits[2]); 113 outValue.setavgScore(Double.parseDouble(splits[3])); 114 115 context.write(outKey, outValue); 116 117 118 }; 119 } 120 121 122 public static class MRReducer3_2 extends Reducer<IntWritable, StudentBean, StudentBean, NullWritable>{ 123 124 StudentBean outKey = new StudentBean(); 125 126 @Override 127 protected void reduce(IntWritable key, Iterable<StudentBean> values,Context context) 128 throws IOException, InterruptedException { 129 130 int length = values.toString().length(); 131 132 for(StudentBean value : values) { 133 outKey = value; 134 } 135 136 context.write(outKey, NullWritable.get()); 137 138 } 139 } 140 141 142 }
赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » Hadoop学习之路(二十五)MapReduce的API使用(一)
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏