hbase协处理器

Hbase自0.92之后开始支持Coprocessor(协处理器),旨在使用户可以将自己的代码放在regionserver上来运行,即将计算程序移动到数据所在的位置进行运算。这一点与MapReduce的思想一致。协处理器框架已经提供了一些类,用户可以通过继承这些类来扩展自己的功能,Hbase的Coprocess分为observer和endpoint两大类。简单说,observer相当于关系型数据库中的触发器,而endpoint则相当于关系型数据库中的存储过程

observer

这类协处理器与触发器类似:回调函数(也被称做钩子函数,hook)在一些特定事件发生时会被执行
主要有3种接口
RegionObserver:用户可以用这种的处理器处理数据修改事件,它们与表的region联系紧密
MasterObserver:可以被用作管理或DDL类型的操作,这些是集群级事件
WALObserver:提供控制WAL的钩子函数

编写回调函数,具体代码如附件RegionObserverEpl.java ,打包成jar

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package hbaseCoprocessor;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverEpl extends
BaseRegionObserver {

public static final byte[] FIXED_ROW =
Bytes.toBytes("@@@GETTIME@@@");
public static String tablename = "table";
public static String rowkey = "rowkey";
@Override
public void preGet(
final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<KeyValue> results) throws
IOException {
//if (Bytes.equals(get.getRow(), FIXED_ROW)) { //书中原来的功能是如果查询的row为FIXED_ROW时,在结果返回系统时间
KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW,
FIXED_ROW,
Bytes.toBytes(System.currentTimeMillis()));
results.add(kv);
//}
}
public static void selectRow(String tablename, String rowKey)
throws IOException {
Configuration config = HBaseConfiguration.create();
HTable table =new HTable(config, tablename);
Get g =new Get(rowKey.getBytes());
Result rs = table.get(g);
for (KeyValue kv : rs.raw()) {
System.out.print(new String(kv.getRow()) +" ");
System.out.print(new String(kv.getFamily()) +":");
System.out.print(new String(kv.getQualifier()) +" ");
System.out.println(new String(kv.getValue()));
}
table.close();
}
public static void main(String args[]){
try {
selectRow( tablename, rowkey);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("sucess!");
}
}

运行方式1

将jar放到hdfs中

1
2
3
[]$ ./bin/hdfs dfs -mkdir /jars

[]$ ./bin/hdfs dfs -put /home/q/opdir/test.jar /jars

拷贝jar包到hbase的lib目录下

登录hbase

1
2
3
4
5
6
7
8
9
10
11
hbase(main):004:0> disable 't'
0 row(s) in 1.3360 seconds

hbase(main):005:0> alter 't',METHOD => 'table_att','coprocessor'=>'hdfs://mycluster:8020/jars/test.jar|hbaseCoprocessor.RegionObserverEpl|1001|'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.3120 seconds

hbase(main):006:0> enable 't'
0 row(s) in 0.4810 seconds

运行

upload successful
将列值转化为uninx 时间

1
2
hbase(main):010:0> Time.at(Bytes.toLong("\x00\x00\x01JQ:\x9D\xD2".to_java_bytes)/ 1000)
=> Tue Dec 16 11:53:23 +0800 2014

运行方式2(这个对所有表都生效)

拷贝jar包到hbase的lib目录下

在hbase-site.xml增加如下配置

1
2
3
4
<property>
<name>hbase.coprocessor.region.classes</name>
<value>hbaseCoprocessor.RegionObserverEpl</value>
</property>

重启hbase