文章目录

Catlet 是 MyCAT 2.0上计划的新功能,预期可以完成多个数据中不同表之间的表连接(JOIN),目前还处于开发阶段。

Catlet 的入口类是在 HintCatletHandler,该类实现了 HintHandler 接口,可以在 RouteService.route() 中有 Hint 的时候使用,可见,Catlet 的调用需要使用注释。HintCatletHandler.route() 方法是该类的主要方法,其重要的代码有以下几行,

1
2
3
4
5
Catlet catlet = (Catlet) MycatServer.getInstance()
.getCatletClassLoader().getInstanceofClass(cateletClass);
catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc,
cachePool);
catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));

先初始化一个 Catlet 类,再依次调用其 route()processSQL() 方法,这两个方法也是 Catlet 的入口方法。实际上,Catlet 是一个接口,有多个实现,route()processSQL() 是其中两个重要的抽象方法。对于跨数据的多表连接,目前只有一个开发中的类 ShareJoin,该类文件中定义了3个类,ShareJoinShareDBJoinHandlerShareRowOutPutDataHandler

Catlet.route() 的代码并不是特别复杂,重要代码是以下几行,

1
2
3
MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)st.getSelect().getQuery();
joinParser=new JoinParser(mysqlSelectQuery,realSQL);
joinParser.parser();

即调用 JoinParser.parser() 方法来解析 SQL。重要的代码基本都在 Catlet.processSQL() 方法中,这个方法相对复杂一些,调用的层次也比较多,会调用到 ShareDBJoinHandlerShareRowOutPutDataHandler,重要代码有以下几行,

1
2
3
4
5
6
ShareDBJoinHandler joinHandler = new ShareDBJoinHandler(this,joinParser.getJoinLkey());
ctx.executeNativeSQLSequnceJob(dataNodes, ssql, joinHandler);
EngineCtx.LOGGER.info("Catlet exec:"+getDataNode(getDataNodes())+" sql:" +ssql);
ctx.setAllJobFinishedListener(new AllJobFinishedListener() {
......

以上代码中,先使用之前 route() 方法中 JoinParser 的结果,获取所有用于表连接列(getJoinLkey()),并以此为参数初始化 ShareDBJoinHandler 类;然后把 ShareDBJoinHandler 作为执行 SQL 的回调类,执行 SQL;最后,设置所有工作都完成之后的监听类 AllJobFinishedListener。因此,ShareDBJoinHandler 是处理 JOIN 的关键类,并且,处理的算法与表连接的列有关。

ShareDBJoinHandler 类中,onRowData 方法用于处理收到查询结果,是主要的回调方法,这个方法主要调用了同一个类中的 putDBRow 方法。putDBRow 方法也比较短,主要代码有以下几行,

1
2
3
4
5
int batchSize = 999;
//1000条,发送一个查询请求
if (ids.size() > batchSize) {
createQryJob(batchSize);
}

先设置了一个批处理的大小,999(实际应该是1000,因为使用的是 > 而不是 >=),然后调用 createQryJobcreateQryJob 相对复杂一点,先是使用 StringBuilder sb 做了比较长的一段字符串拼接,然后有几行比较重要的代码,如下,

1
2
3
String sql = String.format(joinParser.getChildSQL(), sb);
getRoute(sql);
ctx.executeNativeSQLParallJob(getDataNodes(),sql, new ShareRowOutPutDataHandler(this,fields,joinindex,joinParser.getJoinRkey(), batchRows));

先根据之前字符串拼接的结果,生成了一个 SQL,然后就是正常的调用路由并执行,执行的回调类是 ShareRowOutPutDataHandler。如果不求甚解的话,感觉是 ShareJoin 把原有的 SQL 按每 1000 个表连接列的值为大小,生成一些子 SQL,并执行这些子 SQL,但并不清楚是如何拆分的。ShareRowOutPutDataHandler 类的 onRowData 方法,用于处理子 SQL 的返回。这个方法里面的代码没有什么特别,基本就是将处理的数据写回。

因此,拆分子 SQL 的方法实际上与字符串拼接有关,字符串拼接部分的代码如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for(Map.Entry<String,String> e: ids.entrySet() ){
theId=e.getKey();
batchRows.put(theId, rows.remove(theId));
if (!svalue.equals(e.getValue())){
if(joinKeyType == Fields.FIELD_TYPE_VAR_STRING
|| joinKeyType == Fields.FIELD_TYPE_STRING){ // joinkey 为varchar
sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
}else{ // 默认joinkey为int/long
sb.append(e.getValue()).append(','); // (1,2,3)
}
}
svalue=e.getValue();
if (count++ > batchSize) {
break;
}
}
......
sb.deleteCharAt(sb.length() - 1).append(')');

实际上也没有特别复杂,就是根据连接列的类型,如果是 String 的话,就拼接为 ('a','b','c') 这样;如果是 int/long 类型,就拼接为 (1,2,3) 这样。然后在生成 SQL 的时候,调用 String sql = String.format(joinParser.getChildSQL(), sb);joinParser.getChildSQL() 的代码是,

1
2
3
//String sql="select "+joinRkey+","+sql+" from "+mtable+" where "+joinRkey+" in ";
String sql=tableFilter.getTableJoin().getSQL();
return sql;

代码并未深究,从注释来看,是在连接的列上加了 in,也就是说,最后拼成的 SQL 会是类似这样,

1
select XXXXXXX from XXXXXXXTABLES where joinRkey in YYYYYYYY

其中的 YYYYYYYY 是之前字符拼接的结果。因此,从此可以得知 Catlet 在处理多表连接算法的大致步骤,假设原先的 SQL 是 select a.name, b.dept from tableA a, tableB b where a.id = b.id,流程是,

  1. 先将 SQL 改写为 select a.name, a.id from tableA a,然后在相应节点上执行;
  2. 将1的返回按 a.id 拆分,每 1000 个值生成一个子 SQL,生成的 SQL 大致是 select b.dept, b.id from tableB b where b.id in (YYYYYYYY),其中,YYYYYYYY 是 a.id 的值每 1000 个拼出来的逗号分割的字符串;
  3. 等待2的返回,并将所有返回的结果拼接,写回;

以上是 MyCAT Catlet 的大致处理流程。

文章目录

欢迎来到Valleylord的博客!

本博的文章尽量原创。