
hive-4: A Brief Look at the MapReduce Behind HQL Statements


1.1 How Join Is Implemented

select u.name, o.orderid from order o join user u on o.uid = u.uid;

In the map output value, each row is tagged to mark which table it came from; in the reduce phase, the tag tells the reducer each row's origin. The MapReduce flow works like this:
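For example, with hypothetical rows (uid=1, orderid=1001) in order and (uid=1, name=tom) in user, map emits <1, "O1001"> and <1, "Utom">; the shuffle delivers both to the same reduce call, which matches them by tag and outputs (tom, 1001).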
The corresponding map-reduce code looks like the following.
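A minimal map-side sketch, assuming (hypothetically) that the order file holds lines of the form uid,orderid, the user file holds lines of the form uid,name, and that the source table can be recognized from the input file name:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// decide which table this split belongs to from the input file name (assumed layout)
		String file = ((FileSplit) context.getInputSplit()).getPath().getName();
		String[] fields = value.toString().split(",");
		if (file.startsWith("order")) {
			// order row: join key uid, value tagged "O" + orderid
			context.write(new Text(fields[0]), new Text("O" + fields[1]));
		} else {
			// user row: join key uid, value tagged "U" + name
			context.write(new Text(fields[0]), new Text("U" + fields[1]));
		}
	}
}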
The reduce side:
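Every row sharing one uid reaches the same reduce call; the tag separates user rows from order rows, and their cross product is the join result. A matching reducer sketch, under the same assumptions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class JoinReducer extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text uid, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		List<String> names = new ArrayList<String>();
		List<String> orderids = new ArrayList<String>();
		for (Text v : values) {
			String s = v.toString();
			if (s.charAt(0) == 'U') {
				names.add(s.substring(1));    // user side
			} else {
				orderids.add(s.substring(1)); // order side
			}
		}
		// emit (name, orderid) for every user/order pair with this uid
		for (String name : names) {
			for (String orderid : orderids) {
				context.write(new Text(name), new Text(orderid));
			}
		}
	}
}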

1.2 How Group By Is Implemented


 

select rank, isonline, count(*) from city group by rank, isonline;

The GROUP BY columns are combined into the map output key; MapReduce's sorting then brings identical keys together, and in the reduce phase the LastKey is kept to tell where one key ends and the next begins.
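For example, with a hypothetical input file like the one below (chosen to match the result shown at the end), map is called once per line and emits <(rank, isonline), 1>:

A,1
A,1
A,1
B,0

The sort brings the three (A,1) keys together, so a single reduce call sums them to 3.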


The implementation code is as follows:

package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * MR code implementing the following Hive statement:
 * select rank, isonline, count(*) from city group by rank, isonline;
 * @author zm
 *
 */
public class GroupByApp {

	// 0 define the HDFS addresses this job operates on
	static final String FILE_ROOT = "hdfs://master:9000/";
	static final String INPUT_PATH = "hdfs://master:9000/files";
	static final String OUT_PATH = "hdfs://master:9000/out";
	
	public static void main(String[] args) throws Exception{
		
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
		Path outpath = new Path(OUT_PATH);
		if(fileSystem.exists(outpath)){
			fileSystem.delete(outpath, true);
		}
		
		// 0 create the job that does the work
		Job job = new Job(conf);
		job.setJarByClass(GroupByApp.class); // lets the cluster locate this class inside the job jar
		// 1.1 tell the job where the input lives: files are read from HDFS; each line
		// is parsed into one <k,v> pair, and map is called once per pair
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		// specify how input files are parsed into key/value pairs; an InputFormat
		// implementation must be chosen when launching a MapReduce job
		job.setInputFormatClass(TextInputFormat.class);
		
		// 1.2 set the custom mapper class and its output key/value types
		job.setMapperClass(GroupMapper.class);
		job.setMapOutputKeyClass(GroupBy.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		
		// 1.3 partitioning: one reduce task, so all keys land in a single partition
		job.setNumReduceTasks(1);
		
		// 1.4 TODO sorting and grouping: the defaults are used for now
		// 1.5 TODO combiner
		
		// 2.2 set the custom reducer class and the final output types
		job.setReducerClass(GroupReducer.class);
		job.setOutputKeyClass(GroupBy.class);
		job.setOutputValueClass(LongWritable.class);
		
		// 2.3 specify where the output is written
		FileOutputFormat.setOutputPath(job, outpath);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// run the job and wait for it to finish
		job.waitForCompletion(true);
		
	}
}

class GroupMapper extends Mapper<LongWritable, Text, GroupBy, LongWritable> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
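		// each input line is "rank,isonline" (e.g. "A,1"); emit <(rank, isonline), 1>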
		String v1 = value1.toString();
		String[] splits = v1.split(",");
		//GroupBy groupBy = new GroupBy(Long.parseLong(splits[0]),Long.parseLong(splits[1]));
		GroupBy groupBy = new GroupBy(splits[0],Long.parseLong(splits[1]));
		context.write(groupBy, new LongWritable(1));
	}
	
}

class GroupReducer extends Reducer<GroupBy, LongWritable, GroupBy, LongWritable>{

	@Override
	protected void reduce(GroupBy k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
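		// all <(rank, isonline), 1> pairs with the same composite key arrive in one call; sum the 1s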
		long count = 0;
		System.out.println("reduce----> k2: " + k2.toString());
		for(LongWritable v2 : v2s){
			System.out.println(v2.toString());
			count += v2.get();
		}
		context.write(k2, new LongWritable(count));
	}
	
}

class GroupBy implements WritableComparable<GroupBy> {

	private String rank;
	private long isonline;
	
	public GroupBy(){}
	public  GroupBy(String rank,long isonline){
		this.rank = rank;
		this.isonline = isonline;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		Text.writeString(out, this.rank); // use Text's helper to serialize the String field
		//out.writeLong(this.rank);
		out.writeLong(this.isonline);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.rank = Text.readString(in);
		//this.rank = in.readLong();
		this.isonline = in.readLong();
	}
	
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + (int) (isonline ^ (isonline >>> 32));
		result = prime * result + ((rank == null) ? 0 : rank.hashCode());
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		GroupBy other = (GroupBy) obj;
		if (isonline != other.isonline)
			return false;
		if (rank == null) {
			if (other.rank != null)
				return false;
		} else if (!rank.equals(other.rank))
			return false;
		return true;
	}
	@Override
	public String toString() {
		return this.rank + "\t" + this.isonline;
	}
	
	@Override
	public int compareTo(GroupBy other) {
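		// order by rank first, then isonline; the shuffle sorts map output keys with this
		// method, which is what brings identical (rank, isonline) combinations together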
		long result;
		result = this.rank.compareTo(other.rank);
		//result = this.rank - other.rank;
		if(result == 0){
			result = this.isonline - other.isonline;
		}
		return (int)result;
	}
	
}
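Note that GroupBy implements WritableComparable: the composite key is serialized across the shuffle, and compareTo drives the sort that groups identical (rank, isonline) combinations into one reduce call. Once the class is packaged into a jar (the jar name below is assumed for illustration), the job is launched in the usual way:

hadoop jar groupby.jar mapreduce.GroupByApp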

 

Result (columns: rank, isonline, count):

[root@master ~]# hadoop fs -text /out/part-r-00000
Warning: $HADOOP_HOME is deprecated.

A       1       3
B       0       1
