调度任务依赖解析

0 / 38

0x01 前言

一个运行比较久完善的数据仓库,加上丰富多样的 bi 应用,会有成千上万个定时任务。如何保证任务之间依赖都设置对,匹配总体各层级任务之间关系,数仓建设过程中有这么一套流程规则:

  1. 禁止逆向调用
  2. 避免同层调用
  3. 避免跨层调用
  4. 优先使用公共层

image.png

有了这个标准指引,我们就要通过工具产品去解放调度任务依赖维护。那么如何去做呢?

首先我们在做数据任务开发的时候,已经遵循一套标准的命名方法。

image.png

命名规则是 {仓库分层}{业务域}{数据域}{业务描述}{时间周期+存储策略}

任务名跟这个任务所产出的任务表名保持一致。

所以核心思路就是从任务内容中解析出血缘关系了。

0x02 实现方式

一种比较简单的方式是采用正则

mod = r'((?<=s)(stage|ods|rods|dim|dwd|dws|app|rpt|dwb).S+(?=s))' # 反向匹配 (?<=s) 匹配任何空白字符 # 正向匹配 (?=s) 匹配空白字符 # (stage|ods|rods|dim|dwd|dws|app|rpt|dwb).S+ 非空子表达式一次或多次

第二种方案 druid parser

在 Druid 的 SQL 解析器中,有三个重要的组成部分,它们分别是:

  • Parser

  • 词法分析

  • 语法分析

  • AST(Abstract Syntax Tree,抽象语法树)

  • Visitor

这三者的关系如下图所示:

image.png

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLUseStatement;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
import com.alibaba.druid.stat.TableStat;
import com.alibaba.druid.util.JdbcConstants;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class SqlBloodParse {

    public static Map<String, TreeSet<String>> getFromTo (String sql) throws ParserException {
        Map<String, TreeSet<String>> result = new HashMap<String, TreeSet<String>>();
        List<SQLStatement> stmts = SQLUtils.parseStatements(sql, JdbcConstants.HIVE);
        TreeSet<String> selectSet = new TreeSet<String>();
        TreeSet<String> updateSet = new TreeSet<String>();
        TreeSet<String> insertSet = new TreeSet<String>();
        TreeSet<String> deleteSet = new TreeSet<String>();

        if (stmts == null) {
            return null;
        }

        String database = "DEFAULT";
        for (SQLStatement stmt : stmts) {
            SchemaStatVisitor statVisitor = SQLUtils.createSchemaStatVisitor(stmts,JdbcConstants.HIVE);
            if (stmt instanceof SQLUseStatement) {
                database = ((SQLUseStatement) stmt).getDatabase().getSimpleName();
            }
            stmt.accept(statVisitor);
            Map<TableStat.Name, TableStat> tables = statVisitor.getTables();

            if (tables != null) {
                final String db = database;
                for (Map.Entry<TableStat.Name, TableStat> table : tables.entrySet()) {
                    TableStat.Name tableName = table.getKey();
                    TableStat stat = table.getValue();

                    if (stat.getCreateCount() > 0 || stat.getInsertCount() > 0) { //create
                        String insert = tableName.getName();
                        if (!insert.contains("."))
                            insert = db + "." + insert;
                        insertSet.add(insert);
                    } else if (stat.getSelectCount() > 0) { //select
                        String select = tableName.getName();
                        if (!select.contains("."))
                            select = db + "." + select;
                        selectSet.add(select);
                    }else if (stat.getUpdateCount() > 0 ) { //update
                        String update = tableName.getName();
                        if (!update.contains("."))
                            update = db + "." + update;
                        updateSet.add(update);
                    }else if (stat.getDeleteCount() > 0) { //delete
                        String delete = tableName.getName();
                        if (!delete.contains("."))
                            delete = db + "." + delete;
                        deleteSet.add(delete);
                    }
                }
            }
        }

        result.put("select",selectSet);
        result.put("insert",insertSet);
        result.put("update",updateSet);
        result.put("delete",deleteSet);

        return result;
    }

    public static void main(String[] args) {
        String sql = "select * from " +
                "(select * from supindb.student d where dt='20190202')a " +
                "left join " +
                "(select * from supindb.college c where dt='20190202')b " +
                " on a.uid=b.uid " +
                "where a.uid > 0";

        Map<String, TreeSet<String>> getfrom = getFromTo(sql);

        for (Map.Entry<String, TreeSet<String>> entry : getfrom.entrySet()){
            System.out.println("================");
            System.out.println("key=" + entry.getKey());
            for (String table : entry.getValue()){
                System.out.println(table);
            }
        }
    }
}

参考

  1. https://www.jianshu.com/p/7d741f1888c1
  2. https://www.jianshu.com/p/437aa22ea3ca
  3. https://github.com/alibaba/druid/wiki/SQL-Parser