apache-atlas-hbase-bridge-源码分析

Posted 风的心愿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了apache-atlas-hbase-bridge-源码分析相关的知识,希望对你有一定的参考价值。

元数据类型

Hbase元数据类型, 包括命令空间、表、列族、列

public enum HBaseDataTypes 
    // Classes
    HBASE_NAMESPACE,
    HBASE_TABLE,
    HBASE_COLUMN_FAMILY,
    HBASE_COLUMN;

    public String getName() 
        return name().toLowerCase();
    

Hbase元数据采集实现
1)批量采集HBaseBridge
2)实时变更 HBaseAtlasCoprocessor
虽然定义了HBASE_COLUMN,但是实际上是没有实现的,毕竟HBASE_COLUMN是动态添加的。

执行流程

HBaseBridge 执行流程如下图所示

源码分析

HBaseBridge #main

public class HBaseBridge 
   
//...
    private final String        metadataNamespace;
    private final AtlasClientV2 atlasClientV2;
    private final Admin         hbaseAdmin;

    public static void main(String[] args) 
        int exitCode = EXIT_CODE_FAILED;
        AtlasClientV2 atlasClientV2  =null;

        try 
            Options options = new Options();
            options.addOption("n","namespace", true, "namespace");
            options.addOption("t", "table", true, "tablename");
            options.addOption("f", "filename", true, "filename");

            CommandLineParser parser            = new BasicParser();
            CommandLine       cmd               = parser.parse(options, args);
            String            namespaceToImport = cmd.getOptionValue("n");
            String            tableToImport     = cmd.getOptionValue("t");
            String            fileToImport      = cmd.getOptionValue("f");
            Configuration     atlasConf         = ApplicationProperties.get();
            String[]          urls              = atlasConf.getStringArray(ATLAS_ENDPOINT);

           //...

            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) 
                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
                atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword);
             else 
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
            
//...
            HBaseBridge importer = new HBaseBridge(atlasConf, atlasClientV2);
            if (StringUtils.isNotEmpty(fileToImport)) 
                File f = new File(fileToImport);
                if (f.exists() && f.canRead()) 
                    BufferedReader br   = new BufferedReader(new FileReader(f));
                    String         line = null;

                    while((line = br.readLine()) != null) 
                        String val[] = line.split(":");
                        if (ArrayUtils.isNotEmpty(val)) 
                            //...
                            importer.importHBaseEntities(namespaceToImport, tableToImport);
                        
                    
                    exitCode = EXIT_CODE_SUCCESS;
                 else 
                    LOG.error("Failed to read the file");
                
             else 
                importer.importHBaseEntities(namespaceToImport, tableToImport);
                exitCode = EXIT_CODE_SUCCESS;
            
         catch(ParseException e) 
           //...
         catch(Exception e) 
            //...
        finally 
            //...
        
    

HBaseBridge#importHBaseEntities

importHBaseEntities 只要负责处理namespaceToImport和tableToImport参数,然后执行相应的流程

private boolean importHBaseEntities(String namespaceToImport, String tableToImport) throws Exception 
    boolean ret = false;

    if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) 
        // when both NameSpace and Table options are not present
        importNameSpaceAndTable();
        ret = true;
     else if (StringUtils.isNotEmpty(namespaceToImport)) 
        // When Namespace option is present or both namespace and table options are present
        importNameSpaceWithTable(namespaceToImport, tableToImport);
        ret = true;
     else  if (StringUtils.isNotEmpty(tableToImport)) 
        importTable(tableToImport);
        ret = true;
    

    return ret;

导入所有的命名空间和表

namespaceToImport和tableToImport均为空,导入所有的namespace和table

private void importNameSpaceAndTable() throws Exception 

    NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors();

    if (ArrayUtils.isNotEmpty(namespaceDescriptors)) 
        for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) 
            String namespace = namespaceDescriptor.getName();
            importNameSpace(namespace);
        
    

    TableDescriptor[] htds = hbaseAdmin.listTables();
    if (ArrayUtils.isNotEmpty(htds)) 
        for (TableDescriptor htd : htds) 
            String tableName = htd.getTableName().getNameAsString();
            importTable(tableName);
        
    

导入指定的命名空间

namespaceToImport不为空,导入指定的namespace和namespace下的table

private void importNameSpaceWithTable(String namespaceToImport, String tableToImport) throws Exception 
    importNameSpace(namespaceToImport);

    List<TableDescriptor> hTableDescriptors = new ArrayList<>();

    if (StringUtils.isEmpty(tableToImport)) 
// 导入指定namespace
        List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(namespaceToImport);
        if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) 
            hTableDescriptors = getTableDescriptors(matchingNameSpaceDescriptors);
        
     else 
        tableToImport = namespaceToImport +":" + tableToImport;
        TableDescriptor[] htds = hbaseAdmin.listTables(Pattern.compile(tableToImport));
        hTableDescriptors.addAll(Arrays.asList(htds));
    

    if (CollectionUtils.isNotEmpty(hTableDescriptors)) 
        for (TableDescriptor htd : hTableDescriptors) 
            String tblName = htd.getTableName().getNameAsString();
            importTable(tblName);
        
    

导入指定的表

tableToImport不为空,导入指定的table和table的命名空间。importTable会处理表、列族的实体,没有处理列

public void importTable(final String tableName) throws Exception 
    String            tableNameStr = null;
    TableDescriptor[] htds         = hbaseAdmin.listTables(Pattern.compile(tableName));

    if (ArrayUtils.isNotEmpty(htds)) 
        for (TableDescriptor htd : htds) 
            String tblNameWithNameSpace    = htd.getTableName().getNameWithNamespaceInclAsString();
            String tblNameWithOutNameSpace = htd.getTableName().getNameAsString();

            if (tableName.equals(tblNameWithNameSpace)) 
                tableNameStr = tblNameWithNameSpace;
             else if (tableName.equals(tblNameWithOutNameSpace)) 
                tableNameStr = tblNameWithOutNameSpace;
             else 
                // when wild cards are used in table name
                if (tblNameWithNameSpace != null) 
                    tableNameStr = tblNameWithNameSpace;
                 else if (tblNameWithOutNameSpace != null) 
                    tableNameStr = tblNameWithOutNameSpace;
                
            

            byte[]                 nsByte       = htd.getTableName().getNamespace();
            String                 nsName       = new String(nsByte);
            NamespaceDescriptor    nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName);
            AtlasEntityWithExtInfo entity       = createOrUpdateNameSpace(nsDescriptor);
            ColumnFamilyDescriptor[]    hcdts        = htd.getColumnFamilies();
// 处理表、列族,没有处理列
            createOrUpdateTable(nsName, tableNameStr, entity.getEntity(), htd, hcdts);
        
     else 
        throw new AtlasHookException("No Table found for the given criteria. Table = " + tableName);
    

createOrUpdateTable处理表、列族列实体,比较简单这里就不详细描述

导入命名空间

public void importNameSpace(final String nameSpace) throws Exception 
    List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(nameSpace);

    if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) 
        for (NamespaceDescriptor namespaceDescriptor : matchingNameSpaceDescriptors) 
            createOrUpdateNameSpace(namespaceDescriptor);
        
     else 
        throw new AtlasHookException("No NameSpace found for the given criteria. NameSpace = " + nameSpace);
    

createOrUpdateNameSpace处理命名空间实体,比较简单这里就不详细描述

以上是关于apache-atlas-hbase-bridge-源码分析的主要内容,如果未能解决你的问题,请参考以下文章