0x01 前言
一个运行比较久完善的数据仓库,加上丰富多样的 bi 应用,会有成千上万个定时任务。如何保证任务之间依赖都设置对,匹配总体各层级任务之间关系,数仓建设过程中有这么一套流程规则:
- 禁止逆向调用
- 避免同层调用
- 避免跨层调用
- 优先使用公共层
有了这个标准指引,我们就要通过工具产品去解放调度任务依赖维护。那么如何去做呢?
首先我们在做数据任务开发的时候,已经遵循一套标准的命名方法。
命名规则是 {仓库分层}{业务域}{数据域}{业务描述}{时间周期+存储策略}
任务名跟这个任务所产出的任务表名保持一致。
所以核心思路就是从任务内容中解析出血缘关系了。
0x02 实现方式
一种比较简单的方式是采用正则
mod = r'((?<=s)(stage|ods|rods|dim|dwd|dws|app|rpt|dwb).S+(?=s))' # 反向匹配 (?<=s) 匹配任何空白字符 # 正向匹配 (?=s) 匹配空白字符 # (stage|ods|rods|dim|dwd|dws|app|rpt|dwb).S+ 非空子表达式一次或多次
第二种方案 druid parser
在 Druid 的 SQL 解析器中,有三个重要的组成部分,它们分别是:
Parser
词法分析
语法分析
AST(Abstract Syntax Tree,抽象语法树)
Visitor
这三者的关系如下图所示:
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLUseStatement;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
import com.alibaba.druid.stat.TableStat;
import com.alibaba.druid.util.JdbcConstants;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
public class SqlBloodParse {
public static Map<String, TreeSet<String>> getFromTo (String sql) throws ParserException {
Map<String, TreeSet<String>> result = new HashMap<String, TreeSet<String>>();
List<SQLStatement> stmts = SQLUtils.parseStatements(sql, JdbcConstants.HIVE);
TreeSet<String> selectSet = new TreeSet<String>();
TreeSet<String> updateSet = new TreeSet<String>();
TreeSet<String> insertSet = new TreeSet<String>();
TreeSet<String> deleteSet = new TreeSet<String>();
if (stmts == null) {
return null;
}
String database = "DEFAULT";
for (SQLStatement stmt : stmts) {
SchemaStatVisitor statVisitor = SQLUtils.createSchemaStatVisitor(stmts,JdbcConstants.HIVE);
if (stmt instanceof SQLUseStatement) {
database = ((SQLUseStatement) stmt).getDatabase().getSimpleName();
}
stmt.accept(statVisitor);
Map<TableStat.Name, TableStat> tables = statVisitor.getTables();
if (tables != null) {
final String db = database;
for (Map.Entry<TableStat.Name, TableStat> table : tables.entrySet()) {
TableStat.Name tableName = table.getKey();
TableStat stat = table.getValue();
if (stat.getCreateCount() > 0 || stat.getInsertCount() > 0) { //create
String insert = tableName.getName();
if (!insert.contains("."))
insert = db + "." + insert;
insertSet.add(insert);
} else if (stat.getSelectCount() > 0) { //select
String select = tableName.getName();
if (!select.contains("."))
select = db + "." + select;
selectSet.add(select);
}else if (stat.getUpdateCount() > 0 ) { //update
String update = tableName.getName();
if (!update.contains("."))
update = db + "." + update;
updateSet.add(update);
}else if (stat.getDeleteCount() > 0) { //delete
String delete = tableName.getName();
if (!delete.contains("."))
delete = db + "." + delete;
deleteSet.add(delete);
}
}
}
}
result.put("select",selectSet);
result.put("insert",insertSet);
result.put("update",updateSet);
result.put("delete",deleteSet);
return result;
}
public static void main(String[] args) {
String sql = "select * from " +
"(select * from supindb.student d where dt='20190202')a " +
"left join " +
"(select * from supindb.college c where dt='20190202')b " +
" on a.uid=b.uid " +
"where a.uid > 0";
Map<String, TreeSet<String>> getfrom = getFromTo(sql);
for (Map.Entry<String, TreeSet<String>> entry : getfrom.entrySet()){
System.out.println("================");
System.out.println("key=" + entry.getKey());
for (String table : entry.getValue()){
System.out.println(table);
}
}
}
}