A Simple Way to Process Tens of Millions of Lines of JSON Data Quickly with Hadoop MapReduce

Requirements:

Each line of input looks like: {"movie":"2599","rate":"5","timeStamp":"957716949","uid":"6040"}

From this data, compute each user's ten highest-rated movies, emitting the movie and rate values.

The output format is uid ... movie ... rate ...

Approach:

On the map side, each line of JSON is parsed into a POJO, so we first create a bean to receive the fields of each JSON record. The mapper then writes the user's uid as the key and the bean as the value to the context.

On the reduce side, the beans that share a key are collected into a list and sorted by rate with Collections.sort(). The first ten entries of the sorted list are then written to the context. (One subtlety: Hadoop reuses the same bean instance while iterating over the values, so each value must be copied before it is stored in the list.)

Code:

The bean:

package com.season.mapper;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.io.Writable;
 
// Writable bean representing one rating record.
public class MovieBean implements Writable {
 
	private Long movie;
	private Long rate;
	private Long timeStamp;
	private Long uid;
	
	public MovieBean() {
	}
 
	public MovieBean(Long movie, Long rate, Long timeStamp, Long uid) {
		super();
		this.movie = movie;
		this.rate = rate;
		this.timeStamp = timeStamp;
		this.uid = uid;
	}
 
	public Long getMovie() {
		return movie;
	}
 
	public void setMovie(Long movie) {
		this.movie = movie;
	}
 
	public Long getRate() {
		return rate;
	}
 
	public void setRate(Long rate) {
		this.rate = rate;
	}
 
	public Long getTimeStamp() {
		return timeStamp;
	}
 
	public void setTimeStamp(Long timeStamp) {
		this.timeStamp = timeStamp;
	}
 
	public Long getUid() {
		return uid;
	}
 
	public void setUid(Long uid) {
		this.uid = uid;
	}
 
	@Override
	public void readFields(DataInput in) throws IOException {
		this.movie = in.readLong();
		this.rate = in.readLong();
		this.timeStamp = in.readLong();
		this.uid = in.readLong();
	}
 
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(movie);
		out.writeLong(rate);
		out.writeLong(timeStamp);
		out.writeLong(uid);
	}
 
}
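Two details of the Writable contract matter here: Hadoop instantiates the bean by reflection, so the no-argument constructor is required, and readFields() must consume the fields in exactly the same order that write() emits them.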

The mapper:

package com.season.mapper;
 
import java.io.IOException;
 
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MovieMapper extends Mapper<LongWritable, Text, LongWritable, MovieBean> {
	
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Parse one JSON line into a bean; skip lines that fail to parse.
		MovieBean movie = JsonUtils.jsonToPojo(value.toString(), MovieBean.class);
		if (movie == null) {
			return;
		}
		// Key by uid so that all of a user's ratings reach the same reducer.
		context.write(new LongWritable(movie.getUid()), movie);
	}
}
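The mapper relies on a JsonUtils helper that the original listing does not show. Below is a minimal sketch of what it might look like, assuming Jackson (com.fasterxml.jackson) is on the classpath; the class and its behavior on bad input are assumptions, not the article's code:

package com.season.mapper;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
// Hypothetical helper; not part of the original listing.
public class JsonUtils {
 
	// A single ObjectMapper is thread-safe for readValue() and cheap to reuse.
	private static final ObjectMapper MAPPER = new ObjectMapper();
 
	// Parses one JSON line into the given bean class.
	// Returns null on malformed input so the mapper can skip the record.
	public static <T> T jsonToPojo(String json, Class<T> clazz) {
		try {
			return MAPPER.readValue(json, clazz);
		} catch (Exception e) {
			return null;
		}
	}
}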

The reducer:

package com.season.mapper;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
 
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class MovieReducer extends Reducer<LongWritable, MovieBean, LongWritable, Text> {
 
	@Override
	protected void reduce(LongWritable key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
 
		List<MovieBean> mList = new ArrayList<MovieBean>();
 
		// Hadoop reuses the same MovieBean instance across iterations of
		// values, so copy each record into a fresh bean before storing it.
		for (MovieBean movie : values) {
			MovieBean bean = new MovieBean();
			bean.setMovie(movie.getMovie());
			bean.setRate(movie.getRate());
			bean.setTimeStamp(movie.getTimeStamp());
			bean.setUid(movie.getUid());
			mList.add(bean);
		}
		
		// Sort descending by rate; Long.compare(o2, o1) reverses the natural order.
		Collections.sort(mList, new Comparator<MovieBean>() {
 
			@Override
			public int compare(MovieBean o1, MovieBean o2) {
				return Long.compare(o2.getRate(), o1.getRate());
			}
			
		});
 
		// Emit up to the top ten rated movies for this user; Math.min guards
		// against users with fewer than ten ratings.
		int limit = Math.min(10, mList.size());
		for (int i = 0; i < limit; i++) {
			context.write(key, new Text("rate: " + mList.get(i).getRate() + " movie: " + mList.get(i).getMovie()));
		}
	}
 
}
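Sorting the full list is simple, but it holds every rating a user has made in memory before sorting. If that ever becomes a concern at this data volume, a bounded min-heap (java.util.PriorityQueue) keeps only ten beans per user. The following is a sketch of the idea, not part of the original article; it assumes PriorityQueue is imported alongside the other java.util classes:

		// Min-heap ordered by rate: the head is the lowest-rated bean kept so far.
		PriorityQueue<MovieBean> top10 = new PriorityQueue<MovieBean>(10,
				new Comparator<MovieBean>() {
					@Override
					public int compare(MovieBean a, MovieBean b) {
						return Long.compare(a.getRate(), b.getRate());
					}
				});
		for (MovieBean movie : values) {
			// Copy the reused instance, as in the list-based version above.
			top10.offer(new MovieBean(movie.getMovie(), movie.getRate(),
					movie.getTimeStamp(), movie.getUid()));
			if (top10.size() > 10) {
				top10.poll(); // evict the current lowest rate
			}
		}
		// Draining the heap yields ascending rates; reverse for highest-first output.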

MovieJobSubmitter:

package com.season.mapper;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class MovieJobSubmitter {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJar("/root/movie.jar");
		//job.setJarByClass(MovieJobSubmitter.class);
		
		job.setMapperClass(MovieMapper.class);
		job.setReducerClass(MovieReducer.class);
		
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(MovieBean.class);
		
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		boolean res = job.waitForCompletion(true);
		
		// Exit 0 on success, 1 on failure (the conventional exit codes).
		System.exit(res ? 0 : 1);
		
	}
}
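A note on the two jar-setting calls above: job.setJar() points at a jar file on the client's local filesystem, while the commented-out job.setJarByClass() lets Hadoop locate whichever jar contains the given class; the latter is the more portable choice when the job is launched with hadoop jar.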

Finally, put the data file into the HDFS cluster, package the program as a jar, and copy it to a machine with a Hadoop client.
Run it from the command line: hadoop jar XX.jar <fully-qualified-main-class> <input-dir> <output-dir>
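For example, assuming the jar was built as movie.jar (the HDFS paths below are illustrative):

hadoop jar movie.jar com.season.mapper.MovieJobSubmitter /movie/input /movie/output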

The job produces output like the following:

6036    rate: 5 movie: 593
6036    rate: 5 movie: 594
6036    rate: 5 movie: 3022
6036    rate: 5 movie: 3028
6036    rate: 5 movie: 3030
6036    rate: 5 movie: 3034
6036    rate: 5 movie: 903
6037    rate: 5 movie: 593
6037    rate: 5 movie: 903
6037    rate: 5 movie: 904
6037    rate: 5 movie: 910
6037    rate: 5 movie: 924
6037    rate: 5 movie: 926
6037    rate: 5 movie: 928
6037    rate: 5 movie: 3095
6037    rate: 5 movie: 3471
6037    rate: 5 movie: 260
6038    rate: 5 movie: 3088
6038    rate: 5 movie: 1148
6038    rate: 5 movie: 1183
6038    rate: 5 movie: 1223
6038    rate: 5 movie: 1296
6038    rate: 5 movie: 1079
6038    rate: 4 movie: 1419
6038    rate: 4 movie: 232
6038    rate: 4 movie: 1136
6038    rate: 4 movie: 2146
6039    rate: 5 movie: 3037
6039    rate: 5 movie: 903
