NiFi 一键自动升级Nar包
Posted 青冬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NiFi 一键自动升级Nar包相关的知识,希望对你有一定的参考价值。
NiFi进行统一nar包组件升级
序
auth : Hadi
since : 2022-3-9 10:54:29
NiFi界面上,每次更新了nar包后,总是会涉及到手动changeVersion的操作:
但,如果这个nar包涉及到的组件实例过多,每次都一个一个的去点击,那实在是太麻烦了。所以身为程序员不能做这种机器化的工作,所以写了简单的模拟请求给直接供大家参考。由于代码在公司开发,这里仅仅发送第一个测试版本,但放心可靠!
模拟请求
这个版本仅使用http请求,且无认证的方式进行更改,如果需要包装,那么就包装一下就可以了。
通过手动发送请求,可以在浏览器F12界面看到对应的请求体,我们模拟请求体就可以了。(或者直接nifi doc api中也有相关介绍)
模拟一下json请求:
一共需要6个参数,clientId/cmdVersion/identifier/group/artifact/version。后面三项是我们nar包的签名,前面三项是可以通过修改前访问组件获取。(直接访问http://NIFI_IP/nifi-api/processId)
搜寻对应Processors
通过nifi doc api查询可以得到访问/resources 可以获取到页面上的所有组件的信息,筛选出processors部分,然后判断nar包签名是否匹配。
完成后,保存在HashMap中,等待执行。
功能菜单
由于可能有多个nar需要更改,或者多个NiFi集群需要更改,那么我们就弄个交互页面来独立成为单独的NiFi Util工具包吧!
总结
如果需要HTTPS 和 认证版本只需要进行很简单的包装即可。本工具需要在nifi集群健康,且组件STOP或DISABLE状态,如果想添加功能,只需要模拟STOP请求即可(也很简单)。框架已经搭好,各位随意使用,不过转载或使用请标注作者和出处,谢谢。
注意
-
无法更改的组件不会被更改(RUNNING状态)。
-
如果组件的relationship和配置项不一致,还是需要手动处理。
-
如果对应版本没加载到NiFi中,不会进行版本更改。
-
代码简单,各位想要的东西自寻更改。
-
如果该代码涉及到部分隐私泄露,请立即私信联系,谢谢各位。
源码
简单的构建就弄完了,具体免费源代码请移步[这里]https://download.csdn.net/download/qq_36610426/84050357)。
主要实现类:
package com.huangyichun.nifi.util;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
/**
* @author huangyichun107
* @since 2021-11-23 10:40:03
* <p>
* 用于NiFi的nar包统一升级改造
*/
public class ChangeNiFiNarVersion
static
Properties prop = System.getProperties();
prop.setProperty("http.proxyHost", "127.0.0.1"); //如果不需要代理请注释静态方法。
prop.setProperty("http.proxyPort", "3128");
/**
* testIp 127.0.0.1
*/
static String NIFI_IP = "";
static String NIFI_PORT = "8848";
static String NIFI_URL = "http://" + NIFI_IP + ":" + NIFI_PORT + "/nifi-api";
final static Map<String, Processor> PROCESSOR_MAP = new HashMap<>();
final static String STRING_LIST = "\\tq | exit | out :\\n" +
"\\t out the program.\\n" +
"\\t \\n" +
"\\tip $nifiIp:\\n" +
"\\t set nifi ip.\\n" +
"\\n" +
"\\tversion $version:\\n" +
"\\t set version.\\n" +
"\\n" +
"\\tartifact $artifact:\\n" +
"\\t set artifact.\\n" +
"\\n" +
"\\tgroup $group:\\n" +
"\\t set group.\\n" +
"\\n" +
"\\tsearch:\\n" +
"\\t need ip/version/artifact/group to seach older processors.\\n" +
"\\n" +
"\\tchange:\\n" +
"\\t need ip/version/artifact/group to change older processors.\\n" +
"\\n" +
"\\tcheck:\\n" +
"\\t to list parameters now.";
final static Scanner scanner = new Scanner(System.in);
public static void main(String[] args)
System.out.println(STRING_LIST);
System.out.println("尽量输入慢一点,java嘛,不寒碜");
String version = "";
String artifact = "";
String group = "";
String cmd;
while (true)
System.out.println("cmd plz.");
cmd = scanner.nextLine();
String in = cmd.split(" ")[0];
switch (in)
case "q":
case "exit":
case "out":
System.out.println("log out");
return;
case "ip":
final String[] s = cmd.split(" ");
if (s.length != 2)
System.out.println("ip $nifiIp:\\n set nifi ip.\\n ip cmd need 1 parameter");
break;
if (!s[1].matches("\\\\d1,3\\\\.\\\\d1,3\\\\.\\\\d1,3\\\\.\\\\d1,3"))
System.out.println("wrong nifi ip");
break;
NIFI_IP = s[1];
NIFI_URL = "http://" + NIFI_IP + ":10111/nifi-api";
PROCESSOR_MAP.clear();
break;
case "version":
final String[] versions = cmd.split(" ");
if (versions.length != 2)
System.out.println("version $version:\\n set nifi version.\\n version cmd need 1 parameter");
break;
version = versions[1];
break;
case "artifact":
final String[] artifacts = cmd.split(" ");
if (artifacts.length != 2)
System.out.println("artifact $artifact:\\n set nifi artifact.\\n artifact cmd need 1 parameter");
break;
artifact = artifacts[1];
break;
case "group":
final String[] groups = cmd.split(" ");
if (groups.length != 2)
System.out.println("group $group:\\n set nifi group.\\n group cmd need 1 parameter");
break;
group = groups[1];
break;
case "search":
if (NIFI_IP.isEmpty() || version.isEmpty() || artifact.isEmpty() || group.isEmpty())
System.out.println("search need ip/version/artifact/group to change older processors.");
System.out.println("+----------------------------- search start -------------------------------+");
search(artifact, group);
if (PROCESSOR_MAP.isEmpty())
System.out.println("find no processors");
System.out.println("+----------------------------- search done --------------------------------+");
break;
case "change":
if (NIFI_IP.isEmpty() || version.isEmpty() || artifact.isEmpty() || group.isEmpty())
System.out.println("change need ip/version/artifact/group to change older processors.");
if (PROCESSOR_MAP.isEmpty())
System.out.println("find no processors");
break;
System.out.println("+----------------------------- change start -------------------------------+");
change(artifact, group, version);
System.out.println("+----------------------------- change done --------------------------------+");
break;
case "check":
System.out.println("-----------------------------------------------------------------------------------------------------------");
System.out.println("\\tip:" + NIFI_IP + "\\n\\tversion:" + version + "\\n\\tartifact:" + artifact + "\\n\\tgroup:" + group);
System.out.println("-----------------------------------------------------------------------------------------------------------");
break;
default:
System.out.println("wrong cmd.");
System.out.println(STRING_LIST);
public static void change(String artifact, String group, String version)
if (PROCESSOR_MAP.isEmpty())
System.out.println("find 0 processor to change, plz search again.");
for (Map.Entry<String, Processor> stringProcessorEntry : PROCESSOR_MAP.entrySet())
final Processor value = stringProcessorEntry.getValue();
System.out.println(value.identifier + ":" + doChangeVersionJsonObject(value.identifier, artifact, group, version));
public static void search(String myArtifact, String myGroup)
PROCESSOR_MAP.clear();
final String resourcesJson = doGetRequest(NIFI_URL + "/resources");
final JSONObject jsonObject = JSONObject.parseObject(resourcesJson);
final JSONArray resourcesArray = jsonObject.getJSONArray("resources");
for (Object o : resourcesArray)
JSONObject processorId = o instanceof JSONObject ? ((JSONObject) o) : null;
if (processorId == null)
continue;
final String identifier = processorId.getString("identifier");
// 判断是否是 processors
if (!identifier.startsWith("/processors"))
continue;
// 获取组件信息 json
final String processorsJson = doGetRequest(NIFI_URL + identifier);
final JSONObject processorsJsonObject = JSONObject.parseObject(processorsJson);
// 获取组件 component
final JSONObject component = processorsJsonObject.getJSONObject("component");
final String type = component.getString("type");
// 获取组件 bundle
final JSONObject bundle = component.getJSONObject("bundle");
final String artifact = bundle.getString("artifact");
final String version = bundle.getString("version");
final String group = bundle.getString("group");
if (artifact.equals(myArtifact) && group.equals(myGroup))
// 获取配置 config
final JSONObject config = component.getJSONObject("config");
final JSONObject properties = config.getJSONObject("properties");
System.out.println(String.join("\\t", "find", artifact, version, group, identifier, type));
PROCESSOR_MAP.put(identifier, new Processor(identifier, type, config, properties, artifact));
static boolean doChangeVersionJsonObject(String identifier, String artifact, String group, String version)
// 在更新的时候,必须获取到最新的 Processors version
final String path = NIFI_URL + identifier;
final JSONObject result = JSONObject.parseObject(doGetRequest(path));
final JSONObject revision = result.getJSONObject("revision");
final String clientId = revision.getString("clientId");
int cmdVersion = revision.getInteger("version");
identifier = identifier.replaceFirst("/processors/", "");
String json = JSONObject.parseObject("" +
"\\n" +
" \\"revision\\": \\n" +
" \\"clientId\\": \\"" + clientId + "\\",\\n" +
" \\"version\\": " + cmdVersion + "\\n" +
" ,\\n" +
" \\"disconnectedNodeAcknowledged\\": false,\\n" +
" \\"component\\": \\n" +
" \\"id\\": \\"" + identifier + "\\",\\n" +
" \\"bundle\\": \\n" +
" \\"group\\": \\"" + group + "\\",\\n" +
" \\"artifact\\": \\"" + artifact + "\\",\\n" +
" \\"version\\": \\"" + version + "\\"\\n" +
" \\n" +
" \\n" +
"").toString();
doPutRequest(path, json);
return true;
/**
* 发起一个url GET 请求并反回String
*
* @param path http://www.baidu.com
* @return the html or data
*/
public static String doGetRequest(String path)
StringBuilder stringBuilder = new StringBuilder();
final String http = "http";
if (!path.startsWith(http))
System.out.println("the url NOT begin with http. plz check: " + path);
return stringBuilder.toString();
URL url;
try
url = new URL(path);
catch (MalformedURLException e)
System.out.println("the url create failed. plz check: " + path);
return stringBuilder.toString();
try
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty("accept", "*/*");
conn.setRequestProperty("connection", "Keep-Alive");
conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
conn.setDoOutput(true);
conn.setDoInput(true);
conn.setRequestMethod("GET");
conn.setConnectTimeout(50000);
conn.connect();
try (InputStream is = conn.getInputStream())
// 返回的就是一个json 不会很长直接推string吧, 后续也不想用JSON来解析了,直接String解析吧
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String str;
while ((str = br.readLine()) != null)
str = new String(str.getBytes(), StandardCharsets.UTF_8);
stringBuilder.append(str);
catch (IOException e)
e.printStackTrace();
conn.disconnect();
catch (MalformedURLException e)
System.out.println("IOException plz check:" + e);
e.printStackTrace();
catch (IOException e)
System.out.println启动期间的 Apache NiFi 自定义 NAR NoClassDefFoundError