Using BulkLoad to batch-write data into HBase
1. Data format
Before writing, the data needs to be arranged in the following format and saved to HDFS. The format used in this example is shown below (fields separated by a tab):
row1	N
row2	M
row3	B
row4	V
row5	N
row6	M
row7	B
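The file can then be uploaded to HDFS. A minimal sketch, assuming the data above is saved locally as data.txt (a placeholder name) and the HDFS input directory is Test, the input path used in the command in section 3:

# data.txt is a placeholder name for the tab-separated file shown above
hdfs dfs -mkdir -p Test
hdfs dfs -put data.txt Test/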
2. Code
Suppose we want to write data in the format above into HBase, with column family cf and column qualifier colb. The following code can be used as a reference:
package com.testdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestBulkLoad {

    // Mapper: parses each tab-separated line into a (rowkey, Put) pair.
    public static class LoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            if (values.length == 2) {
                byte[] rowkey = Bytes.toBytes(values[0]);
                byte[] col_value = Bytes.toBytes(values[1]);
                byte[] family = Bytes.toBytes("cf");
                byte[] column = Bytes.toBytes("colb");
                ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey);
                Put testput = new Put(rowkey);
                testput.add(family, column, col_value);
                context.write(rowkeyWritable, testput);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.println("Usage: TestBulkLoad <input path> <output path> <split size in MB> <table name>");
            System.exit(1);
        }
        String in = args[0];
        String out = args[1];
        int unitmb = Integer.valueOf(args[2]);
        String tbname = args[3];

        // Cap the input split size so each mapper processes roughly unitmb MB.
        Configuration conf = new Configuration();
        conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
        conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
        conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
        conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));

        Job job = new Job(conf);
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        job.setMapperClass(LoadMapper.class);
        job.setReducerClass(PutSortReducer.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setJarByClass(TestBulkLoad.class);

        // configureIncrementalLoad sets up total-order partitioning based on the
        // target table's region boundaries so the generated HFiles match the regions.
        Configuration hbaseconf = HBaseConfiguration.create();
        HTable table = new HTable(hbaseconf, tbname);
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        // Run the job to generate HFiles under the output path, then move them
        // into the table with an incremental (bulk) load.
        job.waitForCompletion(true);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
        loader.doBulkLoad(new Path(out), table);
    }
}
This code uses a MapReduce job to process the data (turning each input line into a Put and writing the output as HFiles), and then calls the bulk-load API to move those files into HBase. PutSortReducer is a reducer class that ships with HBase, so no custom reducer needs to be written. Note that the example uses the older HBase client API (the HTable constructor and Put.add), which was removed in HBase 2.x.
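Also note that the target table must already exist before the job is launched: the HTable constructor and configureIncrementalLoad read the table's region layout, and the final bulk load writes into it. A minimal hbase shell sketch, using the table name (testdata) and column family (cf) from this example:

# create the target table with a single column family 'cf'
create 'testdata', 'cf'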
3. Execution
The data is saved in a plain text file, the jar exported from the code above is named bulkload.jar, and the HBase table is named testdata. Note: set HADOOP_CLASSPATH first, as shown below, so the HBase classes are on the classpath and the job does not fail with class-not-found errors.
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$HADOOP_CLASSPATH
hadoop jar ./Downloads/bulkload.jar com.testdata.TestBulkLoad Test hbasedata 64 testdata
4. Results
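If the load succeeded, scanning the table from the hbase shell should list each of the seven input rows with a single cf:colb cell holding the value from the data file:

# quick check of the bulk-loaded rows
scan 'testdata'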
