Hadoop Learning Path (26): Using the MapReduce API (Part 2)

Author: 扎心了,老铁

Source: https://www.cnblogs.com/qingyunzong/category/1169344.html


Data and Requirements

Data format

movies.dat (3,884 records)


1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller

users.dat (6,041 records)


1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370

ratings.dat (1,000,210 records)


1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368

Field descriptions

1. users.dat record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Meaning: user ID, gender, age, occupation, zip code

2. movies.dat record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Meaning: movie ID, movie title, movie genres

3. ratings.dat record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamp String
Meaning: user ID, movie ID, rating, rating timestamp

After joining the three files, each record contains: user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

Required statistics

(1) Find the 10 most-rated movies and give each one's rating count (movie title, rating count)
(2) For men and for women separately, find the 10 highest-rated movies (gender, movie title, rating)
(3) For the movie with movieid = 2116, compute the average rating for each age group (the data uses only 7 age codes, so group by those) (age group, rating)
(4) For the female user who watches the most movies (has the most ratings), take the 10 movies she rated highest and compute each one's overall average rating (user, movie title, rating)
(5) Find the year with the most good movies (rating >= 4.0) and list that year's 10 best movies
(6) Among movies released in 1997, find the 10 highest-rated Comedy movies
(7) For each genre in the rating database, find the 5 highest-rated movies (genre, movie title, average rating)
(8) The highest-rated movie genre in each year (year, genre, rating)
(9) The highest-rated movie in each region, with results written to HDFS (region, movie title, rating)

Implementation

1. Find the 10 most-rated movies and give each one's rating count (movie title, rating count)

Analysis: this problem involves two files, ratings.dat and movies.dat, whose sizes are heavily skewed. A map-side join fits here: pre-load the smaller file (movies.dat) into memory on each map task via the distributed cache, then stream ratings.dat through the mapper.

MovieMR1_1.java


public class MovieMR1_1 {

    public static void main(String[] args) throws Exception {

        if(args.length < 4) {
            args = new String[4];
            args[0] = "/movie/input/";
            args[1] = "/movie/output/";
            args[2] = "/movie/cache/movies.dat";
            args[3] = "/movie/output_last/";
        }

        Configuration conf1 = new Configuration();
        conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs1 = FileSystem.get(conf1);

        Job job1 = Job.getInstance(conf1);

        job1.setJarByClass(MovieMR1_1.class);

        job1.setMapperClass(MoviesMapJoinRatingsMapper1.class);
        job1.setReducerClass(MovieMR1Reducer1.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        // Cache the small file to the working directory of every task node
        URI uri = new URI("hdfs://hadoop1:9000" + args[2]);
        System.out.println(uri);
        job1.addCacheFile(uri);

        Path inputPath1 = new Path(args[0]);
        Path outputPath1 = new Path(args[1]);
        if(fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }
        FileInputFormat.setInputPaths(job1, inputPath1);
        FileOutputFormat.setOutputPath(job1, outputPath1);

        boolean isDone = job1.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);

    }

    public static class MoviesMapJoinRatingsMapper1 extends Mapper<LongWritable, Text, Text, IntWritable>{

        // Holds the movies.dat data loaded into memory
        private static Map<String,String> movieMap = new HashMap<>();
        // key: movie title
        Text outKey = new Text();
        // value: rating
        IntWritable outValue = new IntWritable();

        /**
         * movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
         *
         * Pre-loads the small table (movies.dat) into memory.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            Path[] localCacheFiles = context.getLocalCacheFiles();

            String strPath = localCacheFiles[0].toUri().toString();

            BufferedReader br = new BufferedReader(new FileReader(strPath));
            String readLine;
            while((readLine = br.readLine()) != null) {

                String[] split = readLine.split("::");
                String movieId = split[0];
                String movieName = split[1];
                String movieType = split[2];

                movieMap.put(movieId, movieName + "\t" + movieType);
            }

            br.close();
        }

        /**
         * movies.dat:  1 :: Toy Story (1995) :: Animation|Children's|Comedy
         *              movie ID   movie title    movie genres
         *
         * ratings.dat: 1 :: 1193 :: 5 :: 978300760
         *              user ID  movie ID  rating  rating timestamp
         *
         * value: one line read from ratings.dat
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("::");

            String userId = split[0];
            String movieId = split[1];
            String movieRate = split[2];

            // Look up the movie title and genres in memory by movieId
            String movieNameAndType = movieMap.get(movieId);
            String movieName = movieNameAndType.split("\t")[0];
            String movieType = movieNameAndType.split("\t")[1];

            outKey.set(movieName);
            outValue.set(Integer.parseInt(movieRate));

            context.write(outKey, outValue);

        }

    }

    public static class MovieMR1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{
        // Number of ratings for the current movie
        int count;
        // Output value: the rating count
        IntWritable outValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            count = 0;

            for(IntWritable value : values) {
                count++;
            }

            outValue.set(count);

            context.write(key, outValue);
        }

    }

}
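A small optional variant, not part of the original job: since only the number of ratings per movie matters, the mapper could emit a constant 1 instead of the parsed rating, and the reducer could sum the values rather than count them. Because summing partial results is still correct, such a reducer could also be registered as a combiner to cut shuffle volume. A sketch, using a hypothetical MovieCountReducer name:

// Hypothetical alternative: the mapper writes context.write(outKey, new IntWritable(1));
// this reducer then sums the 1s, so the same class also works as a combiner.
public static class MovieCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();   // each value is a 1 from the mapper or a partial sum from a combiner
        }
        outValue.set(sum);
        context.write(key, outValue);
    }
}
// Driver changes (also hypothetical):
//   job1.setReducerClass(MovieCountReducer.class);
//   job1.setCombinerClass(MovieCountReducer.class);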

MovieMR1_2.java


public class MovieMR1_2 {

    public static void main(String[] args) throws Exception {
        if(args.length < 2) {
            args = new String[2];
            args[0] = "/movie/output/";
            args[1] = "/movie/output_last/";
        }

        Configuration conf1 = new Configuration();
        conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs1 = FileSystem.get(conf1);

        Job job = Job.getInstance(conf1);

        job.setJarByClass(MovieMR1_2.class);

        job.setMapperClass(MoviesMapJoinRatingsMapper2.class);
        job.setReducerClass(MovieMR1Reducer2.class);

        job.setMapOutputKeyClass(MovieRating.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(MovieRating.class);
        job.setOutputValueClass(NullWritable.class);

        Path inputPath1 = new Path(args[0]);
        Path outputPath1 = new Path(args[1]);
        if(fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }
        // Sort the output of the first job in descending order
        FileInputFormat.setInputPaths(job, inputPath1);
        FileOutputFormat.setOutputPath(job, outputPath1);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);

    }

    // Note: the map output key is the custom MovieRating object, which sorts by count in descending order
    public static class MoviesMapJoinRatingsMapper2 extends Mapper<LongWritable, Text, MovieRating, NullWritable>{

        MovieRating outKey = new MovieRating();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Sample input line: 'Night Mother (1986)    70
            String[] split = value.toString().split("\t");

            outKey.setCount(Integer.parseInt(split[1]));
            outKey.setMovieName(split[0]);

            context.write(outKey, NullWritable.get());

        }

    }

    // After sorting, emit records in order and keep only the first 10 movies
    public static class MovieMR1Reducer2 extends Reducer<MovieRating, NullWritable, MovieRating, NullWritable>{

        Text outKey = new Text();
        int count = 0;

        @Override
        protected void reduce(MovieRating key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for(NullWritable value : values) {
                count++;
                if(count > 10) {
                    return;
                }
                context.write(key, value);

            }

        }

    }
}
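One detail worth calling out in MovieMR1Reducer2: the count field lives on the reducer instance, so the "stop after 10" cut-off is only global when the job runs with a single reduce task; with several reducers, each would emit its own ten. Making that assumption explicit in the driver is a one-line safeguard:

// Optional, in MovieMR1_2's main(): force a single reducer so the top-10 cut-off is global.
job.setNumReduceTasks(1);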

MovieRating.java


public class MovieRating implements WritableComparable<MovieRating>{
    private String movieName;
    private int count;

    public String getMovieName() {
        return movieName;
    }
    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }

    public MovieRating() {}

    public MovieRating(String movieName, int count) {
        super();
        this.movieName = movieName;
        this.count = count;
    }

    @Override
    public String toString() {
        return movieName + "\t" + count;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        movieName = in.readUTF();
        count = in.readInt();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieName);
        out.writeInt(count);
    }
    @Override
    public int compareTo(MovieRating o) {
        return o.count - this.count;
    }

}
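The descending order comes from compareTo returning o.count - this.count. For the rating counts in this dataset that is fine; if you want to rule out integer-overflow corner cases, Integer.compare is a drop-in replacement (an optional refinement, not something the original code needs):

@Override
public int compareTo(MovieRating o) {
    // Descending by count, without relying on subtraction
    return Integer.compare(o.getCount(), this.getCount());
}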

2. For men and for women separately, find the 10 highest-rated movies (gender, movie title, rating)

Analysis: this problem joins all three tables. Pre-load the two small tables (movies.dat and users.dat) into memory first, then run the query against ratings.dat.

Joining the three tables

MoviesThreeTableJoin.java


/**
 * Three-way join of ratings, movies, and users.
 */
public class MoviesThreeTableJoin {

    public static void main(String[] args) throws Exception {

        if(args.length < 4) {
            args = new String[4];
            args[0] = "/movie/input/";
            args[1] = "/movie/output2/";
            args[2] = "/movie/cache/movies.dat";
            args[3] = "/movie/cache/users.dat";
        }

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop1:9000/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(MoviesThreeTableJoin.class);
        job.setMapperClass(ThreeTableMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        URI uriUsers = new URI("hdfs://hadoop1:9000" + args[3]);
        URI uriMovies = new URI("hdfs://hadoop1:9000" + args[2]);
        job.addCacheFile(uriUsers);
        job.addCacheFile(uriMovies);

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        if(fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);

    }

    public static class ThreeTableMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

        // In-memory caches for the movies and users tables
        private Map<String,String> moviesMap = new HashMap<>();
        private Map<String,String> usersMap = new HashMap<>();
        // Holds the fields of one line read from ratings.dat
        String[] ratings;

        Text outKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            BufferedReader br = null;

            Path[] paths = context.getLocalCacheFiles();
            String usersLine = null;
            String moviesLine = null;

            for(Path path : paths) {
                String name = path.toUri().getPath();
                if(name.contains("movies.dat")) {
                    // Read movies.dat line by line
                    br = new BufferedReader(new FileReader(name));
                    while((moviesLine = br.readLine()) != null) {
                        /* Split each line on "::"
                         * 2::Jumanji (1995)::Adventure|Children's|Fantasy
                         * movie ID, movie title, movie genres
                         *
                         * The movie ID is the key, the rest is the value.
                         */
                        String[] split = moviesLine.split("::");
                        moviesMap.put(split[0], split[1] + "::" + split[2]);
                    }
                }else if(name.contains("users.dat")) {
                    // Read users.dat line by line
                    br = new BufferedReader(new FileReader(name));
                    while((usersLine = br.readLine()) != null) {
                        /* Split each line on "::"
                         * 2::M::56::16::70072
                         * user ID, gender, age, occupation, zip code
                         *
                         * The user ID is the key, the rest is the value.
                         */
                        String[] split = usersLine.split("::");
                        System.out.println(split[0] + "----" + split[1]);
                        usersMap.put(split[0], split[1] + "::" + split[2] + "::" + split[3] + "::" + split[4]);
                    }
                }

            }

        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            ratings = value.toString().split("::");
            // Look up the movie and user records by movie ID and user ID
            String movies = moviesMap.get(ratings[1]);
            String users = usersMap.get(ratings[0]);

            // Concatenate the three tables into one record
            String threeTables = value.toString() + "::" + movies + "::" + users;
            outKey.set(threeTables);

            context.write(outKey, NullWritable.get());
        }
    }

}
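If a rating line ever refers to a movie or user that is missing from the cached tables, the two lookups in map() return null and the joined record would contain the literal text "null". A small defensive guard (optional, not in the original code) simply drops such records:

// Inside ThreeTableMapper.map(), right after the two lookups (optional guard):
if (movies == null || users == null) {
    return;   // skip ratings that cannot be joined
}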

The data after the three-way join looks like this:

1000::1023::5::975041651::Winnie the Pooh and the Blustery Day (1968)::Animation|Children's::F::25::6::90027
1000::1029::3::975041859::Dumbo (1941)::Animation|Children's|Musical::F::25::6::90027
1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
1000::1104::5::975042421::Streetcar Named Desire, A (1951)::Drama::F::25::6::90027
1000::110::5::975040841::Braveheart (1995)::Action|Drama|War::F::25::6::90027
1000::1196::3::975040841::Star Wars: Episode V - The Empire Strikes Back (1980)::Action|Adventure|Drama|Sci-Fi|War::F::25::6::90027
1000::1198::5::975040841::Raiders of the Lost Ark (1981)::Action|Adventure::F::25::6::90027
1000::1200::4::975041125::Aliens (1986)::Action|Sci-Fi|Thriller|War::F::25::6::90027
1000::1201::5::975041025::Good, The Bad and The Ugly, The (1966)::Action|Western::F::25::6::90027
1000::1210::5::975040629::Star Wars: Episode VI - Return of the Jedi (1983)::Action|Adventure|Romance|Sci-Fi|War::F::25::6::90027

Field layout


1000 :: 1036 :: 4 :: 975040964 :: Die Hard (1988) :: Action|Thriller :: F :: 25 :: 6 :: 90027
userID  movieID  rating  timestamp   movieTitle        genres           gender  age  occupation  zipcode
0       1        2       3           4                 5                6       7    8           9

To find, for men and for women separately, the 10 highest-rated movies (gender, movie title, rating):

1. Group by movie title and gender: use movie title + gender as the key and the rating as the value, and compute the average rating per group.

2. Wrap movie title + gender + average rating in a custom key object, group by gender, and output the top 10 per gender in descending order of rating.

Job logic: MoviesDemo2.java

public class MoviesDemo2 {

    public static void main(String[] args) throws Exception {

        Configuration conf1 = new Configuration();
        Configuration conf2 = new Configuration();
        FileSystem fs1 = FileSystem.get(conf1);
        FileSystem fs2 = FileSystem.get(conf2);
        Job job1 = Job.getInstance(conf1);
        Job job2 = Job.getInstance(conf2);

        job1.setJarByClass(MoviesDemo2.class);
        job1.setMapperClass(MoviesDemo2Mapper1.class);
        job2.setMapperClass(MoviesDemo2Mapper2.class);
        job1.setReducerClass(MoviesDemo2Reducer1.class);
        job2.setReducerClass(MoviesDemo2Reducer2.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(DoubleWritable.class);

        job2.setOutputKeyClass(MoviesSexBean.class);
        job2.setOutputValueClass(NullWritable.class);

        job2.setGroupingComparatorClass(MoviesSexGC.class);

        Path inputPath1 = new Path("D:\\MR\\hw\\movie\\output3he1");
        Path outputPath1 = new Path("D:\\MR\\hw\\movie\\output2_1");
        Path inputPath2 = new Path("D:\\MR\\hw\\movie\\output2_1");
        Path outputPath2 = new Path("D:\\MR\\hw\\movie\\output2_end");

        if(fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }
        if(fs2.exists(outputPath2)) {
            fs2.delete(outputPath2, true);
        }

        FileInputFormat.setInputPaths(job1, inputPath1);
        FileOutputFormat.setOutputPath(job1, outputPath1);

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Chain the two jobs: job2 depends on job1's output
        JobControl control = new JobControl("MoviesDemo2");

        ControlledJob aJob = new ControlledJob(job1.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());

        bJob.addDependingJob(aJob);

        control.addJob(aJob);
        control.addJob(bJob);

        Thread thread = new Thread(control);
        thread.start();

        while(!control.allFinished()) {
            Thread.sleep(1000);
        }
        System.exit(0);

    }

    /**
     * Input: the output of the three-way join.
     * Emits movie title + gender as the key and the rating as the value.
     *
     * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
     *
     * userID::movieID::rating::timestamp::movieTitle::genres::gender::age::occupation::zipcode
     */
    public static class MoviesDemo2Mapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable>{

        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("::");
            String strKey = split[4] + "\t" + split[6];
            String strValue = split[2];

            outKey.set(strKey);
            outValue.set(Double.parseDouble(strValue));

            context.write(outKey, outValue);
        }

    }

    /**
     * Computes the average rating for each movie title + gender key.
     */
    public static class MoviesDemo2Reducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>{

        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            double sum = 0;
            for(DoubleWritable value : values) {
                count++;
                sum += Double.parseDouble(value.toString());
            }
            double avg = sum / count;

            outValue.set(avg);
            context.write(key, outValue);
        }
    }

    /**
     * Wraps movie title + gender + average rating in a MoviesSexBean,
     * which groups by gender and sorts by rating in descending order.
     */
    public static class MoviesDemo2Mapper2 extends Mapper<LongWritable, Text, MoviesSexBean, NullWritable>{

        MoviesSexBean outKey = new MoviesSexBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");
            outKey.setMovieName(split[0]);
            outKey.setSex(split[1]);
            outKey.setScore(Double.parseDouble(split[2]));

            context.write(outKey, NullWritable.get());

        }
    }

    /**
     * Takes the 10 highest-rated movies for each gender.
     */
    public static class MoviesDemo2Reducer2 extends Reducer<MoviesSexBean, NullWritable, MoviesSexBean, NullWritable>{

        @Override
        protected void reduce(MoviesSexBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            for(NullWritable nvl : values) {
                count++;
                context.write(key, NullWritable.get());
                if(count == 10) {
                    return;
                }
            }

        }
    }
}
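The driver above exits with status 0 as soon as both controlled jobs have finished, whether or not they actually succeeded. A slightly more careful ending (a sketch reusing the same control variable) stops the JobControl thread and reports failures:

// After the while-loop in main() (optional):
control.stop();                                        // stop the JobControl polling thread
System.exit(control.getFailedJobList().isEmpty() ? 0 : 1);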

Custom key: MoviesSexBean.java

public class MoviesSexBean implements WritableComparable<MoviesSexBean>{

    private String movieName;
    private String sex;
    private double score;

    public MoviesSexBean() {
        super();
    }
    public MoviesSexBean(String movieName, String sex, double score) {
        super();
        this.movieName = movieName;
        this.sex = sex;
        this.score = score;
    }
    public String getMovieName() {
        return movieName;
    }
    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }
    public String getSex() {
        return sex;
    }
    public void setSex(String sex) {
        this.sex = sex;
    }
    public double getScore() {
        return score;
    }
    public void setScore(double score) {
        this.score = score;
    }
    @Override
    public String toString() {
        return movieName + "\t" + sex + "\t" + score;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        movieName = in.readUTF();
        sex = in.readUTF();
        score = in.readDouble();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieName);
        out.writeUTF(sex);
        out.writeDouble(score);
    }
    @Override
    public int compareTo(MoviesSexBean o) {

        int result = this.getSex().compareTo(o.getSex());
        if(result == 0) {
            double diff = this.getScore() - o.getScore();

            if(diff == 0) {
                return 0;
            }else {
                return diff > 0 ? -1 : 1;
            }

        }else {
            return result > 0 ? -1 : 1;
        }

    }

}

Grouping comparator: MoviesSexGC.java

public class MoviesSexGC extends WritableComparator{

    public MoviesSexGC() {
        super(MoviesSexBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        MoviesSexBean msb1 = (MoviesSexBean)a;
        MoviesSexBean msb2 = (MoviesSexBean)b;

        return msb1.getSex().compareTo(msb2.getSex());
    }

}
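Note that the grouping comparator only decides which keys are handed to the same reduce() call within one reducer; it does not decide which reducer a key is routed to. With the single default reduce task that is not a problem, but if job2 were ever run with several reducers, a partitioner that hashes on gender alone would be needed to keep each gender on one reducer. A hypothetical sketch (not part of the original code):

// Hypothetical partitioner: route all records of the same gender to the same reducer.
public class MoviesSexPartitioner extends Partitioner<MoviesSexBean, NullWritable> {
    @Override
    public int getPartition(MoviesSexBean key, NullWritable value, int numPartitions) {
        return (key.getSex().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
// Registered in the driver: job2.setPartitionerClass(MoviesSexPartitioner.class);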

3. For the movie with movieid = 2116, compute the average rating for each age group (the data uses only 7 age codes, so group by those) (age group, rating)

This step works on the three-way-joined file produced in step 2.


public class MovieDemo3 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(MovieDemo3.class);
        job.setMapperClass(MovieDemo3Mapper.class);
        job.setReducerClass(MovieDemo3Reducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path inputPath = new Path("D:\\MR\\hw\\movie\\3he1");
        Path outputPath = new Path("D:\\MR\\hw\\movie\\outpu3");

        if(fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);

    }

    /**
     * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
     *
     * userID::movieID::rating::timestamp::movieTitle::genres::gender::age::occupation::zipcode
     *   0        1       2        3           4          5       6     7      8          9
     *
     * key:   movie ID + movie title + age group
     * value: rating
     * Only records with movieid = 2116 are kept.
     */
    public static class MovieDemo3Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{

        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("::");
            int movieID = Integer.parseInt(split[1]);

            if(movieID == 2116) {
                String strKey = split[1] + "\t" + split[4] + "\t" + split[7];
                String strValue = split[2];

                outKey.set(strKey);
                outValue.set(Double.parseDouble(strValue));

                context.write(outKey, outValue);
            }

        }
    }

    /**
     * Averages the ratings emitted by the mapper for each key.
     */
    public static class MovieDemo3Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>{

        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            double sum = 0;

            for(DoubleWritable value : values) {
                count++;
                sum += Double.parseDouble(value.toString());
            }

            double avg = sum / count;

            outValue.set(avg);

            context.write(key, outValue);

        }

    }

}
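The movie ID 2116 is hard-coded in the mapper. A small generalization (an assumption, not part of the original code) is to pass it in through the Configuration so the same job can be reused for any movie:

// In the driver, before submitting the job:
conf.set("movie.id", "2116");

// In MovieDemo3Mapper: read the target ID once in setup(), defaulting to 2116.
private int targetId;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    targetId = context.getConfiguration().getInt("movie.id", 2116);
}
// ...and in map(): if(movieID == targetId) { ... }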

4. For the female user who watches the most movies (has the most ratings), take the 10 movies she rated highest and compute each one's overall average rating (user, movie title, rating)


1000 :: 1036 :: 4 :: 975040964 :: Die Hard (1988) :: Action|Thriller :: F :: 25 :: 6 :: 90027
userID  movieID  rating  timestamp   movieTitle        genres           gender  age  occupation  zipcode
0       1        2       3           4                 5                6       7    8           9

(1) Find the ID of the female user with the most ratings

  MoviesDemo4_1.java

public class MoviesDemo4 {

    public static void main(String[] args) throws Exception {

        Configuration conf1 = new Configuration();
        FileSystem fs1 = FileSystem.get(conf1);
        Job job1 = Job.getInstance(conf1);

        job1.setJarByClass(MoviesDemo4.class);
        job1.setMapperClass(MoviesDemo4Mapper1.class);
        job1.setReducerClass(MoviesDemo4Reducer1.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        Configuration conf2 = new Configuration();
        FileSystem fs2 = FileSystem.get(conf2);
        Job job2 = Job.getInstance(conf2);

        job2.setJarByClass(MoviesDemo4.class);
        job2.setMapperClass(MoviesDemo4Mapper2.class);
        job2.setReducerClass(MoviesDemo4Reducer2.class);

        job2.setMapOutputKeyClass(Moviegoers.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(Moviegoers.class);
        job2.setOutputValueClass(NullWritable.class);

        Path inputPath1 = new Path("D:\\MR\\hw\\movie\\3he1");
        Path outputPath1 = new Path("D:\\MR\\hw\\movie\\outpu4_1");

        if(fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }

        FileInputFormat.setInputPaths(job1, inputPath1);
        FileOutputFormat.setOutputPath(job1, outputPath1);

        Path inputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_1");
        Path outputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_2");

        if(fs2.exists(outputPath2)) {
            fs2.delete(outputPath2, true);
        }

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Chain the two jobs: job2 depends on job1's output
        JobControl control = new JobControl("MoviesDemo4");

        ControlledJob ajob = new ControlledJob(job1.getConfiguration());
        ControlledJob bjob = new ControlledJob(job2.getConfiguration());

        bjob.addDependingJob(ajob);

        control.addJob(ajob);
        control.addJob(bjob);

        Thread thread = new Thread(control);
        thread.start();

        while(!control.allFinished()) {
            Thread.sleep(1000);
        }
        System.exit(0);
    }

    /**
     * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
     *
     * userID::movieID::rating::timestamp::movieTitle::genres::gender::age::occupation::zipcode
     *   0        1       2        3           4          5       6     7      8          9
     *
     * 1. key:   user ID
     * 2. value: movie title + rating
     * Only records from female users (gender "F") are emitted.
     */
    public static class MoviesDemo4Mapper1 extends Mapper<LongWritable, Text, Text, Text>{

        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("::");

            String strKey = split[0];
            String strValue = split[4] + "\t" + split[2];

            if(split[6].equals("F")) {
                outKey.set(strKey);
                outValue.set(strValue);
                context.write(outKey, outValue);
            }

        }

    }

    // Counts the total number of ratings for each female user
    public static class MoviesDemo4Reducer1 extends Reducer<Text, Text, Text, IntWritable>{

        IntWritable outValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            for(Text value : values) {
                count++;
            }
            outValue.set(count);
            context.write(key, outValue);
        }

    }

    // Sorts the output of the first job in descending order of rating count
    public static class MoviesDemo4Mapper2 extends Mapper<LongWritable, Text, Moviegoers, NullWritable>{

        Moviegoers outKey = new Moviegoers();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");

            outKey.setName(split[0]);
            outKey.setCount(Integer.parseInt(split[1]));
            context.write(outKey, NullWritable.get());
        }

    }

    // After sorting, keeps only the first record (the female user with the most ratings and her rating count)
    public static class MoviesDemo4Reducer2 extends Reducer<Moviegoers, NullWritable, Moviegoers, NullWritable>{

        int count = 0;

        @Override
        protected void reduce(Moviegoers key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for(NullWritable nvl : values) {
                count++;
                if(count > 1) {
                    return;
                }
                context.write(key, nvl);
            }

        }

    }

}