HBase (11): Summary of a Small HBaseAndMapReduce Example
Views: 6,827
Published: 2019-06-26

This article is about 9,039 characters long; reading it takes roughly 30 minutes.


I. Summary of a small HBaseAndMapReduce example:

        A directory on HDFS contains several files. We build an inverted index over the contents of these files with MapReduce and then write the result into an HBase table. The code is as follows:
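        To make the three stages easier to follow, here is an illustrative trace (not from the original post) of how the word Hello from the sample files listed at the end of this article moves through the pipeline; each step is shown as a (key, value) pair:

Mapper output, one pair per token:
    ("Hello:hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt", "1")
Combiner output, frequency summed per file and the URI moved into the value:
    ("Hello", "hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1")
Reducer output, all "URI:frequency" entries for the word joined with ";":
    ("Hello", "hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;")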

1.InvertedIndexMapper

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyInfo = new Text();   // holds the "word:URI" combination
    private Text valueInfo = new Text(); // holds the term frequency
    private FileSplit split;             // holds the split this record belongs to

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("key-->: " + key + "\n value --> : " + value);

        // Get the FileSplit that this <key, value> pair belongs to.
        split = (FileSplit) context.getInputSplit();
        System.out.println("split---> " + split.toString());

        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // The key is made up of the word and the URI of the file it came from.
            keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
            // The initial term frequency is 1.
            valueInfo.set("1");
            context.write(keyInfo, valueInfo);
        }
    }
}

2.InvertedIndexCombiner

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    private Text info = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum the term frequency for this "word:URI" key.
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }

        int splitIndex = key.toString().indexOf(":");
        // The new value is made up of the URI and the term frequency.
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        // The new key is just the word.
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}

3.InvertedIndexReducer

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Build the document list: "URI:frequency;URI:frequency;..."
        StringBuilder fileList = new StringBuilder();
        for (Text value : values) {
            fileList.append(value.toString()).append(";");
        }
        result.set(fileList.toString());
        context.write(key, result);
    }
}
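Job 1 writes this reducer's result with the default TextOutputFormat, i.e. one line per key with a tab between key and value. An illustrative line of the resulting part-r-00000 for the sample data (paths abridged here) would look like:

Hello	hdfs://.../invertedindex1.txt:1;hdfs://.../invertedindex2.txt:1;

That tab-separated layout is exactly what KeyValueTextInputFormat expects by default, which is why the second job below can read the file back as <Text, Text> pairs without a custom mapper.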

4.HBaseAndInvertedIndex

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseAndInvertedIndex {

    private static Path outPath;

    public static void main(String[] args) throws Exception {
        run();
        System.out.println("\n\n************************");
        runHBase();
    }

    /** Job 1: build the inverted index on HDFS. */
    public static void run() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Hadoop-InvertedIndex");
        job.setJarByClass(HBaseAndInvertedIndex.class);

        // The map function turns each input <key, value> pair into intermediate results.
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/txt/invertedindex/"));

        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssS");
        String filename = df.format(new Date());
        outPath = new Path("hdfs://192.168.226.129:9000/rootdir/invertedindexhbase/result/" + filename + "/");
        FileOutputFormat.setOutputPath(job, outPath);

        int result = job.waitForCompletion(true) ? 0 : 1;
    }

    /** Job 2: read the inverted index back from HDFS and write it into HBase. */
    public static void runHBase() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "HBase-InvertedIndex");
        job.setJarByClass(HBaseAndInvertedIndex.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Read the output of job 1 and write the data into the HBase database.
        FileInputFormat.addInputPath(job, new Path(outPath.toString() + "/part-r-00000"));
        System.out.println("path---> " + outPath.toString());
        TableMapReduceUtil.initTableReducerJob("invertedindex", InvertedIndexHBaseReducer.class, job);

        // First make sure the target table exists.
        checkTable(conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    private static void checkTable(Configuration conf) throws Exception {
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();
        TableName tn = TableName.valueOf("invertedindex");
        if (!admin.tableExists(tn)) {
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("indexKey");
            htd.addFamily(hcd);
            admin.createTable(htd);
            System.out.println("Table did not exist; created it successfully....");
        }
    }

    /**
     * 1. The map side still reads from HDFS, so it needs no changes; the reduce side has to
     *    write its results to HBase, so it extends TableReducer<KEYIN, VALUEIN, KEYOUT>. There is
     *    no VALUEOUT type parameter because TableReducer fixes the output value to a Put
     *    (or Delete) instance.
     * 2. ImmutableBytesWritable is a byte sequence that can be used as a key or value type.
     */
    public static class InvertedIndexHBaseReducer
            extends TableReducer<Text, Text, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("key---> " + key.toString());
            // Note how the row key argument is constructed from the word itself.
            Put put = new Put(key.toString().getBytes());
            put.addColumn(Bytes.toBytes("indexKey"), Bytes.toBytes("indexUrlWeight"),
                    values.iterator().next().getBytes());
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }
}

 

/// The original data files in the input directory:

invertedindex1.txt:

Hello I will Learning Hadoop
HDFS MapReduce
Other I will Learning HBase

invertedindex2.txt:

Hello HBase
MapReduce HDFS

 

Check the result with scan:

hbase(main):002:0> scan 'invertedindex'
ROW         COLUMN+CELL
 HBase      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;
 HDFS       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Hadoop     column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Hello      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 I          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Learning   column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 MapReduce  column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Other      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 will       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
9 row(s) in 0.2240 seconds

 

Reposted from: https://my.oschina.net/gently/blog/678892
