hbase读流程代码分析

流程总览

  1. 从zookeeper中获取meta信息,并通过meta信息找到需要查找的table的startkey所在的region信息

  2. 和该region所在的regionserver进行rpc交互获取result

  3. region server查询memstore(memstore是是一个按key排序的树形结构的缓冲区),如果有该rowkey,则直接返回,若没有进入步骤4

  4. 查询blockcache,如果有该rowkey,则直接返回,若没有进入步骤5

  5. 查询storefile,不管有没有都直接返回

client代码分析

hbase读数据除了直接操作hfile之外有3个入口,get(),batch()和scan(),get()相对而言就比较简单,找到对应的regionserver然后发rpc即可,batch()采用单rpc多action的策略流程和get()类似,下面主要对scan涉及的核心接口进行分析。核心接口有以下几个

Connection:负责和zk建立连接

Table:负责维护相关对象

ResultScanner:负责给使用者遍历纾解

Caller:负责调用Callable

Callable:客户端和hbase交互的主要接口

Connection

默认的连接器是HConnectionImplementation,可以通过配置hbase.client.connection.impl修改。核心思路是基于zk的watcher,保持长连接,然后获取hbase元数据

Table

table通过Connection.getTable()实例化,默认的实现是HTable。这个类比较简单,只是维护了针对hbase一张表所用到的对象。主要关注遍历的方法,通过HTable.getScanner()实例化一个新的ResultScanner,使用者通过ResultScanner迭代器遍历获取result数据。

Scanner

client提供了4种scanner,参考HTable.getScanner(),1. ClientScanner,读取result的流程需要3次rpc,openScanner,next和closeScanner;2. 针对小量数据优化的ClientSmallScanner,和ClientScanner的区别在于,将openScanner,next和closeScanner合并到一个rpc执行,官方建议拉取的数据在64KB之内可以考虑用SmallScanner的方式;另外两个是基于reversed配置,也就是倒序遍历region,需要交换startkey和endkey的位置。ClientScanner是我们最常用的Scanner,也是默认的Scanner,下面对其进行分析

  1. 在初始化的时候通过nextScanner()方法,实例化一个新的Callable对象,并调用其call()方法
  2. next()方法,当使用者不断的调用next()时,ClientScanner()会先从缓存中找,是否还有result,如果还有那么直接返回,如果缓存中没有result,那么调用loadCache()方法
  3. loadCache()方法,调用Callable.call(),获取result数组。这里的异常处理需要特别关注,如果是UnkonwnScannerException,那么重试rpc直到本次scan超时,如果是OutOfOrderScannerNextException异常,scanner会重试rpc请求重复步骤3,如果已经重试过,那么直接抛出异常。重试的超时时间的配置hbase.client.scanner.timeout.period,默认是60s
  4. 拉取到result后,ClientScanner会进行合并,这是由于拉取到的result是部分的,不是完整的,说到底hbase是以Cell为最小单位进行存储或者传输的,要封装成result的话就需要进行合并。合并完之后将result缓存在内存中,缓存策略基于caching和maxResultSize,caching表示hbase client最多可以缓存在内存多少条数据,也就是多少个result;maxResultSize表示hbase client最多可以缓存多少内存大小的result,也就是控制result占用堆的大小
  5. 判断是否还需要再拉取result,这里有两种拉取判断,一种是之前的region拉取失败,转而拉取其replica,另一种是调用rpc拉取下一组result
  6. result达到内存限制或者数量(maxResultSize,caching)则返回

服务器代码

ScannerCallable

ClientScanne对应的Callable是ScannerCallable,也是最典型的Callable,下面对其核心方法进行分析

prepare()方法

核心功能是通过RPCRetryingCallerWithReadReplicas.getRegionLocations获取待遍历的table startkey的region,从而定位到region server。

核心call()方法

  1. 首次调用call(),client会发送一次开始rpc,高速region server本次scan开始了,此次rpc不获取result,只生成scannerId,之后的rpc不需要再传递scan配置,这形成了一个会话的概念。
  2. 通过rpc controller获取CellScanner,再转换成Result数组,这里参考ResponseConverter.getResults。注意,这里由于获取的result是连续的,也就是说region server是有状态的服务,client每次rpc都会带上当前请求的序号,也就是nextCallSeq,这有的类似传统数据库中的分页作用。当出现序号不匹配,region server会抛出异常。
  3. 如果需要关闭,那么向region server发送close的rpc

总结

hbase-client的scan操作总体上可以看成是两层迭代器,面向使用者的Scanner以及面向region server的Callable。Callable负责从regionserver中获取result,主要解决,Scanner负责整合result提供给使用者。这样做的思路很明显,数据大小是肯定会大于内存的,通过迭代器接口,可以让使用者处理完之前的result再拉取其他result,从而起到分页的效果,这操作对使用者是透明的。如果需要详细的scan日志,可以通过配置hbase.client.log.scanner.activity来打开开关,默认是false。

对于scan操作而言,拿ClientScanner来说,一次“完整rpc”过程包含3次rpc,open,result和close。如果失败了,region不可用或者在split,那么client会重试新的一次“完整rpc”,那么就是6次rpc。其他操作会少一点,例如SmallClientScanner一次“完整rpc”只需要1次rpc,它把open,close集成到了一起。hbase在client还是花了不少心思的。

hbase客户端重要参数

由于作为在线服务,需要能够保证在快速失败、失败容错重试等特性。

  • 快速失败能保证系统的低延时,能防止因为等待某个资源,造成服务资源暂用,最后导致服务不可用。
  • 失败容错能够提供服务的稳定性,进行服务失败是重试。

Hbase客户端提供的重试机制,并通过配置合理的参数使得客户端在保证一定容错性的同时还能够保证系统的低延迟特性。

hbase.client.pause

失败重试时等待时间,随着重试次数越多,重试等待时间越长,计算方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 }; 
long normalPause = pause * HConstants.RETRY_BACKOFF[ntries];
long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f);
```
所以如果重试10次,hbase.client.pause=50ms,则每次重试等待时间为{5010015025050010002000500050005000}。
属性默认值为100ms。

hbase.client.pause属性控制的是让客户端在两次重试之间休眠多久。其默认值是100毫秒(0.1秒),建议设置为20
## hbase.client.retries.number
失败时重试次数,默认为31次。可以根据自己应用的需求将该值调整的比较小。

hbase.client.retries.number属性用来指定最大重试次数。其默认值是350,建议设置为11
每两次重试之间的休眠时间可按下面这个公式计算得出
其中,RETRY_BACKOFF是一个重试系数表,其定义如下。
public static int RETRY_BACKOFF[] =(1, 1, 1, 2, 2, 4, 4, 8, 16, 32)
在重试10次以后,HBase就会一直使用最后一个系数(32)来计算休眠时间。
如果将暂停时间设为20毫秒,最大重试次数设为11,每两次连接集群重试之间的暂停时间将依次为:(20, 20, 20, 40, 40, 80, 80, 160, 320, 640, 640)
这就意味着客户端将在2060毫秒内重试11次,然后放弃连接到集群。

## hbase.rpc.timeout
该参数表示一次RPC请求的超时时间。如果某次RPC时间超过该值,客户端就会主动关闭socket。
默认该值为1min,应用为在线服务时,可以根据应用的超时时间,设置该值.如果应用总共超时为3s,**则该值也应该为3s或者更小**.
## hbase.client.operation.timeout
该参数表示HBase客户端发起一次数据操作直至得到响应之间总的超时时间,数据操作类型包括get、append、increment、delete、put等。该值与hbase.rpc.timeout的区别为,hbase.rpc.timeout为一次rpc调用的超时时间。而hbase.client.operation.timeout为一次操作总的时间(从开始调用到重试n次之后失败的总时间)。
举个例子说明,比如一次Put请求,客户端首先会将请求封装为一个caller对象,该对象发送RPC请求到服务器,假如此时因为服务器端正好发生了严重的Full GC,导致这次RPC时间超时引起SocketTimeoutException,对应的就是hbase.rpc.timeout。那假如caller对象发送RPC请求之后刚好发生网络抖动,进而抛出网络异常,HBase客户端就会进行重试,重试多次之后如果总操作时间超时引起SocketTimeoutException,对应的就是hbase.client.operation.timeout。
## hbase.client.scanner.timeout.period
该参数是表示HBase客户端发起一次scan操作的rpc调用至得到响应之间总的超时时间。一次scan操作是指发起一次regionserver rpc调用的操作,hbase会根据scan查询条件的cacheing、batch设置将scan操作会分成多次rpc操作。比如满足scan条件的rowkey数量为10000个,scan查询的cacheing=200,则查询所有的结果需要执行的rpc调用次数为50个。而该值是指50个rpc调用的单个相应时间的最大值。
## 可以考虑使用HtableInterface带诶Htable

## hbase.ipc.client.tcpnodelay
此设置将禁止使用Nagle算法来进行客户端和服务器之间的套接字传输。

Nagle算法是一种提高网络效率的手段,它会将若干较小的传出消息存在缓冲区中,然后再将它们一次全都发送出去。Nagle算法默认是启用的。低延迟系统应该将hbase.ipc.client.tcpnodelay设置为true,从而禁用Nagle算法。默认已经是true,不需要修改

```java
/**
* hbase client 单例连接
* @author chun
*
*/
public class HbaseClient {
private static final Logger LOGGER = LoggerFactory.getLogger(HbaseClient.class);
private Configuration conf = null;
private Connection conn = null;
private static HbaseClient instance = null;

private HbaseClient(){
init();
}
public void init(){
try{
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "127.0.0.1");
conf.set("zookeeper.znode.parent", "/hbase");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.client.pause", "50");
conf.set("hbase.client.retries.number", "3");
conf.set("hbase.rpc.timeout", "2000");
conf.set("hbase.client.operation.timeout", "3000");
conf.set("hbase.client.scanner.timeout.period", "10000");

conn = ConnectionFactory.createConnection(conf);
}catch(Exception e){
LOGGER.error("初始化hbase连接失败"+e.getMessage(),e);
}
}

public static HbaseClient getInstance(){
if(instance == null){
synchronized (HbaseClient.class) {
if(instance == null){
instance = new HbaseClient();
}
}
}
return instance;
}

/**
* 获取htable操作类
* @param tableName
* @return
* @throws IOException
*/
public Table getHtable(String tableName) throws IOException{
return conn.getTable(TableName.valueOf(tableName));
}

/**
*
* @param hTableInterface
*/
public void relaseHtable(Table table){
if(table == null){
return;
}
try {
table.close();
} catch (IOException e) {
LOGGER.error(e.getMessage(),e);
}
}

/**
* 关闭hbase连接
*/
public void destory(){
try {
conn.close();
instance = null;
} catch (IOException e) {
LOGGER.error(e.getMessage(),e);
}
}

}

参考:
https://www.cnblogs.com/ulysses-you/p/10072883.html#_labelTop