NiFi 一键自动升级Nar包

Posted 青冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NiFi 一键自动升级Nar包相关的知识,希望对你有一定的参考价值。

NiFi进行统一nar包组件升级

auth : Hadi
since : 2022-3-9 10:54:29

NiFi界面上,每次更新了nar包后,总是会涉及到手动changeVersion的操作:

但,如果这个nar包涉及到的组件实例过多,每次都一个一个的去点击,那实在是太麻烦了。所以身为程序员不能做这种机器化的工作,所以写了简单的模拟请求给直接供大家参考。由于代码在公司开发,这里仅仅发送第一个测试版本,但放心可靠!

模拟请求

这个版本仅使用http请求,且无认证的方式进行更改,如果需要包装,那么就包装一下就可以了。

通过手动发送请求,可以在浏览器F12界面看到对应的请求体,我们模拟请求体就可以了。(或者直接nifi doc api中也有相关介绍)

模拟一下json请求:

一共需要6个参数,clientId/cmdVersion/identifier/group/artifact/version。后面三项是我们nar包的签名,前面三项是可以通过修改前访问组件获取。(直接访问http://NIFI_IP/nifi-api/processId)



搜寻对应Processors

通过nifi doc api查询可以得到访问/resources 可以获取到页面上的所有组件的信息,筛选出processors部分,然后判断nar包签名是否匹配。

完成后,保存在HashMap中,等待执行。

功能菜单

由于可能有多个nar需要更改,或者多个NiFi集群需要更改,那么我们就弄个交互页面来独立成为单独的NiFi Util工具包吧!

总结

如果需要HTTPS 和 认证版本只需要进行很简单的包装即可。本工具需要在nifi集群健康,且组件STOP或DISABLE状态,如果想添加功能,只需要模拟STOP请求即可(也很简单)。框架已经搭好,各位随意使用,不过转载或使用请标注作者和出处,谢谢。

注意

  • 无法更改的组件不会被更改(RUNNING状态)。

  • 如果组件的relationship和配置项不一致,还是需要手动处理。

  • 如果对应版本没加载到NiFi中,不会进行版本更改。

  • 代码简单,各位想要的东西自寻更改。

  • 如果该代码涉及到部分隐私泄露,请立即私信联系,谢谢各位。

源码

简单的构建就弄完了,具体免费源代码请移步[这里]https://download.csdn.net/download/qq_36610426/84050357)。

主要实现类:

package com.huangyichun.nifi.util;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;

/**
 * @author huangyichun107
 * @since 2021-11-23 10:40:03
 * <p>
 * 用于NiFi的nar包统一升级改造
 */
public class ChangeNiFiNarVersion 
    static 
        Properties prop = System.getProperties();
        prop.setProperty("http.proxyHost", "127.0.0.1"); //如果不需要代理请注释静态方法。
        prop.setProperty("http.proxyPort", "3128");
    

    /**
     * testIp 127.0.0.1
     */
    static String NIFI_IP = "";
    static String NIFI_PORT = "8848";
    static String NIFI_URL = "http://" + NIFI_IP + ":" + NIFI_PORT + "/nifi-api";
    final static Map<String, Processor> PROCESSOR_MAP = new HashMap<>();
    final static String STRING_LIST = "\\tq | exit | out :\\n" +
            "\\t    out the program.\\n" +
            "\\t    \\n" +
            "\\tip $nifiIp:\\n" +
            "\\t    set nifi ip.\\n" +
            "\\n" +
            "\\tversion $version:\\n" +
            "\\t    set version.\\n" +
            "\\n" +
            "\\tartifact $artifact:\\n" +
            "\\t    set artifact.\\n" +
            "\\n" +
            "\\tgroup $group:\\n" +
            "\\t    set group.\\n" +
            "\\n" +
            "\\tsearch:\\n" +
            "\\t    need ip/version/artifact/group to seach older processors.\\n" +
            "\\n" +
            "\\tchange:\\n" +
            "\\t    need ip/version/artifact/group to change older processors.\\n" +
            "\\n" +
            "\\tcheck:\\n" +
            "\\t    to list parameters now.";
    final static Scanner scanner = new Scanner(System.in);

    public static void main(String[] args) 
        System.out.println(STRING_LIST);
        System.out.println("尽量输入慢一点,java嘛,不寒碜");
        String version = "";
        String artifact = "";
        String group = "";
        String cmd;
        while (true) 
            System.out.println("cmd plz.");
            cmd = scanner.nextLine();
            String in = cmd.split(" ")[0];
            switch (in) 
                case "q":
                case "exit":
                case "out":
                    System.out.println("log out");
                    return;

                case "ip":
                    final String[] s = cmd.split(" ");
                    if (s.length != 2) 
                        System.out.println("ip $nifiIp:\\n  set nifi ip.\\n  ip cmd need 1 parameter");
                        break;
                    
                    if (!s[1].matches("\\\\d1,3\\\\.\\\\d1,3\\\\.\\\\d1,3\\\\.\\\\d1,3")) 
                        System.out.println("wrong nifi ip");
                        break;
                    
                    NIFI_IP = s[1];
                    NIFI_URL = "http://" + NIFI_IP + ":10111/nifi-api";
                    PROCESSOR_MAP.clear();
                    break;
                case "version":
                    final String[] versions = cmd.split(" ");
                    if (versions.length != 2) 
                        System.out.println("version $version:\\n  set nifi version.\\n  version cmd need 1 parameter");
                        break;
                    
                    version = versions[1];
                    break;
                case "artifact":
                    final String[] artifacts = cmd.split(" ");
                    if (artifacts.length != 2) 
                        System.out.println("artifact $artifact:\\n  set nifi artifact.\\n  artifact cmd need 1 parameter");
                        break;
                    
                    artifact = artifacts[1];
                    break;
                case "group":
                    final String[] groups = cmd.split(" ");
                    if (groups.length != 2) 
                        System.out.println("group $group:\\n  set nifi group.\\n  group cmd need 1 parameter");
                        break;
                    
                    group = groups[1];
                    break;
                case "search":
                    if (NIFI_IP.isEmpty() || version.isEmpty() || artifact.isEmpty() || group.isEmpty()) 
                        System.out.println("search need ip/version/artifact/group to change older processors.");
                    
                    System.out.println("+----------------------------- search start -------------------------------+");
                    search(artifact, group);
                    if (PROCESSOR_MAP.isEmpty()) 
                        System.out.println("find no processors");
                    
                    System.out.println("+----------------------------- search done --------------------------------+");
                    break;
                case "change":
                    if (NIFI_IP.isEmpty() || version.isEmpty() || artifact.isEmpty() || group.isEmpty()) 
                        System.out.println("change need ip/version/artifact/group to change older processors.");
                    
                    if (PROCESSOR_MAP.isEmpty()) 
                        System.out.println("find no processors");
                        break;
                    
                    System.out.println("+----------------------------- change start -------------------------------+");
                    change(artifact, group, version);
                    System.out.println("+----------------------------- change done --------------------------------+");
                    break;
                case "check":
                    System.out.println("-----------------------------------------------------------------------------------------------------------");
                    System.out.println("\\tip:" + NIFI_IP + "\\n\\tversion:" + version + "\\n\\tartifact:" + artifact + "\\n\\tgroup:" + group);
                    System.out.println("-----------------------------------------------------------------------------------------------------------");
                    break;
                default:
                    System.out.println("wrong cmd.");
                    System.out.println(STRING_LIST);
            
        
    

    public static void change(String artifact, String group, String version) 
        if (PROCESSOR_MAP.isEmpty()) 
            System.out.println("find 0 processor to change, plz search again.");
        
        for (Map.Entry<String, Processor> stringProcessorEntry : PROCESSOR_MAP.entrySet()) 
            final Processor value = stringProcessorEntry.getValue();
            System.out.println(value.identifier + ":" + doChangeVersionJsonObject(value.identifier, artifact, group, version));
        
    

    public static void search(String myArtifact, String myGroup) 
        PROCESSOR_MAP.clear();
        final String resourcesJson = doGetRequest(NIFI_URL + "/resources");
        final JSONObject jsonObject = JSONObject.parseObject(resourcesJson);
        final JSONArray resourcesArray = jsonObject.getJSONArray("resources");
        for (Object o : resourcesArray) 
            JSONObject processorId = o instanceof JSONObject ? ((JSONObject) o) : null;
            if (processorId == null) 
                continue;
            
            final String identifier = processorId.getString("identifier");

            // 判断是否是 processors
            if (!identifier.startsWith("/processors")) 
                continue;
            

            // 获取组件信息 json
            final String processorsJson = doGetRequest(NIFI_URL + identifier);
            final JSONObject processorsJsonObject = JSONObject.parseObject(processorsJson);

            // 获取组件 component
            final JSONObject component = processorsJsonObject.getJSONObject("component");
            final String type = component.getString("type");

            // 获取组件 bundle
            final JSONObject bundle = component.getJSONObject("bundle");
            final String artifact = bundle.getString("artifact");
            final String version = bundle.getString("version");
            final String group = bundle.getString("group");
            if (artifact.equals(myArtifact) && group.equals(myGroup)) 
                // 获取配置 config
                final JSONObject config = component.getJSONObject("config");
                final JSONObject properties = config.getJSONObject("properties");
                System.out.println(String.join("\\t", "find", artifact, version, group, identifier, type));
                PROCESSOR_MAP.put(identifier, new Processor(identifier, type, config, properties, artifact));
            

        
    


    static boolean doChangeVersionJsonObject(String identifier, String artifact, String group, String version) 
        // 在更新的时候,必须获取到最新的 Processors version
        final String path = NIFI_URL + identifier;
        final JSONObject result = JSONObject.parseObject(doGetRequest(path));
        final JSONObject revision = result.getJSONObject("revision");
        final String clientId = revision.getString("clientId");
        int cmdVersion = revision.getInteger("version");
        identifier = identifier.replaceFirst("/processors/", "");
        String json = JSONObject.parseObject("" +
                "\\n" +
                "    \\"revision\\": \\n" +
                "        \\"clientId\\": \\"" + clientId + "\\",\\n" +
                "        \\"version\\": " + cmdVersion + "\\n" +
                "    ,\\n" +
                "    \\"disconnectedNodeAcknowledged\\": false,\\n" +
                "    \\"component\\": \\n" +
                "        \\"id\\": \\"" + identifier + "\\",\\n" +
                "        \\"bundle\\": \\n" +
                "            \\"group\\": \\"" + group + "\\",\\n" +
                "            \\"artifact\\": \\"" + artifact + "\\",\\n" +
                "            \\"version\\": \\"" + version + "\\"\\n" +
                "        \\n" +
                "    \\n" +
                "").toString();
        doPutRequest(path, json);
        return true;
    



    /**
     * 发起一个url GET 请求并反回String
     *
     * @param path http://www.baidu.com
     * @return the html or data
     */
    public static String doGetRequest(String path) 
        StringBuilder stringBuilder = new StringBuilder();
        final String http = "http";
        if (!path.startsWith(http)) 
            System.out.println("the url NOT begin with http. plz check: " + path);
            return stringBuilder.toString();
        
        URL url;
        try 
            url = new URL(path);
         catch (MalformedURLException e) 
            System.out.println("the url create failed. plz check: " + path);
            return stringBuilder.toString();
        
        try 
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setDoOutput(true);
            conn.setDoInput(true);
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(50000);
            conn.connect();
            try (InputStream is = conn.getInputStream()) 
                // 返回的就是一个json 不会很长直接推string吧, 后续也不想用JSON来解析了,直接String解析吧
                BufferedReader br = new BufferedReader(new InputStreamReader(is));
                String str;
                while ((str = br.readLine()) != null) 
                    str = new String(str.getBytes(), StandardCharsets.UTF_8);
                    stringBuilder.append(str);
                
             catch (IOException e) 
                e.printStackTrace();
            
            conn.disconnect();
         catch (MalformedURLException e) 
            System.out.println("IOException plz check:" + e);
            e.printStackTrace();
         catch (IOException e) 
            System.out.println启动期间的 Apache NiFi 自定义 NAR NoClassDefFoundError

NiFi - java.lang.NoSuchMethodError

NiFi 升级后迁移 DateFlow 的最佳方法是啥?

python 一键升级所有包

C# clickonce用处

Nifi01概念