Fixing the http-x plugin of ChunJun (纯钧)

Posted by 实习小生

Preface: this article was compiled by the editors of 小常识网 (cha138.com). It covers fixing the http-x plugin of ChunJun (纯钧) and will hopefully be of some use to you.

Introduction

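ChunJun is an open-source data synchronization tool built on Flink (see the official documentation). It ships many connector plugins that Flink itself does not provide; the Dameng (达梦) plugin in particular is very convenient in China's localized (国产化) IT environments!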

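This post is about ChunJun's HTTP plugin (http-x), which turns HTTP requests into a streaming source. The HTTP plugin currently provided upstream has some problems when it runs in SQL mode, so I spent some time tracking them down and fixing them, and I also added a new pagination feature. The process is described below.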

Problem

Running the http plugin as described in the official documentation fails with the following error:

    java.lang.RuntimeException: request data error,msg is prevResponse value is  exception java.lang.RuntimeException: key data.id on {msg=请求成功, total=0, code=0000, data=[{name=第0臭桑, id=0}, {name=第1臭桑, id=1}], timestamp=2023-02-12 16:39:12} is not a json

        at com.dtstack.chunjun.util.MapUtil.getValueByKey(MapUtil.java:161)
        at com.dtstack.chunjun.connector.http.client.ResponseParse.buildResponseByKey(ResponseParse.java:63)
        at com.dtstack.chunjun.connector.http.client.JsonResponseParse.next(JsonResponseParse.java:95)
        at com.dtstack.chunjun.connector.http.client.HttpClient.doExecute(HttpClient.java:272)
        at com.dtstack.chunjun.connector.http.client.HttpClient.execute(HttpClient.java:184)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at com.dtstack.chunjun.connector.http.inputformat.HttpInputFormat.nextRecordInternal(HttpInputFormat.java:118)
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:198)
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:68)
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
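The trace does not spell out why "data.id" fails, but the message appears to hint at it: "data" holds a list of records, so there is no single object in which to look up "id". The following is a minimal sketch of a dotted-key lookup, assuming MapUtil.getValueByKey walks nested maps one key segment at a time. DottedKeyLookup is a hypothetical class that only reproduces this failure mode; it is not ChunJun's source.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical, simplified model of a dotted-key lookup such as "data.id".
// NOT ChunJun's MapUtil; it only mirrors the "is not a json" failure in the log.
class DottedKeyLookup {

    static Object getValueByKey(Map<String, Object> map, String key, String delimiter) {
        Object current = map;
        for (String part : key.split(Pattern.quote(delimiter))) {
            if (!(current instanceof Map)) {
                // A value that is not a JSON object (e.g. a List) cannot be descended into.
                throw new RuntimeException("key " + key + " on " + map + " is not a json");
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> row0 = new LinkedHashMap<>();
        row0.put("name", "n0");
        row0.put("id", 0);
        List<Object> data = new ArrayList<>();
        data.add(row0);

        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("code", "0000");
        envelope.put("data", data);

        // Looking up "id" on a single record works...
        System.out.println(getValueByKey(row0, "id", "."));
        // ...but "data.id" on the whole envelope fails, because "data" is a List, not a Map.
        try {
            getValueByKey(envelope, "data.id", ".");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, the column key only resolves once each record has been taken out of the data list, which is what the changes below aim for.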

Solution (modifying the source code)

Modify HttpOptions and add two configuration options:
    // The first option is the data subject. Most HTTP APIs return a uniform envelope with a
    // status code, a status message and a data body; the records we need are in the data body.
    public static final ConfigOption<String> DATA_SUBJECT =
            ConfigOptions.key("dataSubject")
                    .stringType()
                    .defaultValue("${data}")
                    .withDescription("response data subject");

    // This option is the number of request cycles: with 2 the request is sent twice,
    // and with -1 it is repeated indefinitely.
    public static final ConfigOption<Long> CYCLES =
            ConfigOptions.key("cycles")
                    .longType()
                    .defaultValue(1L)
                    .withDescription("request cycle");
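To make the intent of the two options concrete, here is a small sketch of how they could be interpreted. Everything in it is an assumption: HttpOptionSemantics, unwrap and shouldRequestAgain are hypothetical names, the data subject is passed as a plain key such as "data" (how the plugin evaluates its own data-subject expression is not covered), and cycles only handles the two cases described in the comment above, a positive count or -1 for no limit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of what dataSubject and cycles are meant to control.
// Names and logic are assumptions for this sketch, not ChunJun's implementation.
class HttpOptionSemantics {

    /** Returns the records stored under subjectKey (e.g. "data") as individual rows. */
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> unwrap(Map<String, Object> envelope, String subjectKey) {
        Object subject = envelope.get(subjectKey);
        if (subject instanceof List) {
            // A JSON array: every element becomes one row.
            List<Map<String, Object>> rows = new ArrayList<>();
            for (Object item : (List<Object>) subject) {
                rows.add((Map<String, Object>) item);
            }
            return rows;
        }
        if (subject instanceof Map) {
            // A single JSON object: one row.
            return Collections.singletonList((Map<String, Object>) subject);
        }
        return Collections.emptyList();
    }

    /** cycles > 0: stop after that many requests; cycles == -1: never stop. */
    static boolean shouldRequestAgain(long cycles, long requestsSent) {
        return cycles == -1 || requestsSent < cycles;
    }

    public static void main(String[] args) {
        Map<String, Object> r0 = new LinkedHashMap<>();
        r0.put("name", "n0");
        r0.put("id", 0);
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("code", "0000");
        envelope.put("data", Collections.singletonList(r0));

        System.out.println(unwrap(envelope, "data"));

        long sent = 0;
        while (shouldRequestAgain(2, sent)) {
            sent++; // a real source would send the HTTP request here
        }
        System.out.println(sent);
    }
}
```

Unwrapping first is what lets each record reach the row converter as a Map whose keys are plain column names.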
Modify HttpDynamicTableFactory:
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HttpOptions.DECODE);
        options.add(HttpOptions.METHOD);
        options.add(HttpOptions.HEADER);
        options.add(HttpOptions.BODY);
        options.add(HttpOptions.PARAMS);
        options.add(HttpOptions.INTERVALTIME);
        options.add(HttpOptions.COLUMN);
        options.add(HttpOptions.DELAY);
        // These two correspond to the new WITH parameters
        options.add(HttpOptions.DATA_SUBJECT);
        options.add(HttpOptions.CYCLES);
        return options;
    }

    private HttpRestConfig getRestapiConf(ReadableConfig config) {
        Gson gson = GsonUtil.setTypeAdapter(new Gson());
        HttpRestConfig httpRestConfig = new HttpRestConfig();
        httpRestConfig.setIntervalTime(config.get(HttpOptions.INTERVALTIME));
        httpRestConfig.setUrl(config.get(HttpOptions.URL));
        httpRestConfig.setDecode(config.get(HttpOptions.DECODE));
        httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
        // Copy the two options configured above into the HTTP request config
        httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
        httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));
        httpRestConfig.setParam(
                gson.fromJson(
                        config.get(HttpOptions.PARAMS),
                        new TypeToken<List<MetaParam>>() {}.getType()));
        httpRestConfig.setHeader(
                gson.fromJson(
                        config.get(HttpOptions.HEADER),
                        new TypeToken<List<MetaParam>>() {}.getType()));
        httpRestConfig.setBody(
                gson.fromJson(
                        config.get(HttpOptions.BODY),
                        new TypeToken<List<MetaParam>>() {}.getType()));
        httpRestConfig.setColumn(
                gson.fromJson(
                        config.get(HttpOptions.COLUMN),
                        new TypeToken<List<FieldConf>>() {}.getType()));
        return httpRestConfig;
    }
Modify HttpRowConverter:
// Change the class's generic type parameter: it used to be String, now it must be Map<String, Object>
public class HttpRowConverter
        extends AbstractRowConverter<Map<String, Object>, RowData, RowData, LogicalType> {

    // With the generic type changed above, the parameter of the overridden method below is a Map too.
    // Callers elsewhere already pass Map data into this method, but the original source declared the
    // parameter as String, so the call fails; when single-stepping you cannot even enter this method,
    // only the class. Since the incoming data is already a Map, the first two lines of the original
    // method, which converted the string into a Map, are no longer needed and can simply be deleted.
    @Override
    public RowData toInternal(Map<String, Object> result) throws Exception {
        GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
        List<String> columns = rowType.getFieldNames();
        for (int pos = 0; pos < columns.size(); pos++) {
            Object value =
                    MapUtil.getValueByKey(
                            result, columns.get(pos), httpRestConfig.getFieldDelimiter());
            if (value instanceof LinkedTreeMap) {
                value = value.toString();
            }
            genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(value));
        }
        return genericRowData;
    }

    // ... remaining members unchanged
}
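The point of this change is that toInternal now receives an already-parsed Map and only has to pick fields by column name. As a rough stdlib-only sketch, assuming a flat record with no nested keys, the hypothetical MapRowSketch.toRow below uses an Object[] in place of Flink's GenericRowData and a plain Map.get in place of MapUtil.getValueByKey; it skips the per-type deserializers entirely.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for the patched toInternal():
// Object[] plays the role of GenericRowData, Map#get replaces MapUtil.getValueByKey.
class MapRowSketch {

    static Object[] toRow(Map<String, Object> record, List<String> columns) {
        Object[] row = new Object[columns.size()];
        for (int pos = 0; pos < columns.size(); pos++) {
            // Field order follows the declared columns, not the key order of the map.
            row[pos] = record.get(columns.get(pos));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "n1");
        record.put("id", 1);
        System.out.println(Arrays.toString(toRow(record, Arrays.asList("id", "name"))));
    }
}
```

Columns missing from the record come back as null, and the positions follow the declared columns, just like the loop over rowType.getFieldNames() in the converter.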

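With the changes above, the data subject and the request cycles can be set as WITH parameters, and the job runs successfully when started directly from the localTest class. Below is the example SQL. Note that its 'paging' and 'pagingParam' options belong to the pagination feature mentioned under Follow-up, not to the changes above.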

CREATE TABLE source
(
    id   int,
    name varchar
) WITH (
      'connector' = 'http-x'
      ,'url' = 'http://127.0.0.1:8090/test/test'
      ,'intervalTime' = '3000'
      ,'method' = 'get'
      ,'cycles' = '5'
      ,'dataSubject' = '${data}'
      ,'decode' = 'json'
      ,'paging' = 'true'
      ,'pagingParam' = 'pageNumber'
      ,'params' = '[{"key": "pageNumber","value":1,"type":"int"},{"key": "pageSize","value":100,"type":"int"}]'
      ,'column' = '[
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "String"
              }
            ]'
      );

CREATE TABLE sink
(
    id   int,
    name varchar
) WITH (
      'connector' = 'stream-x'
      );

insert into sink
select *
from source u;
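The post does not show the test endpoint at http://127.0.0.1:8090/test/test. To reproduce the setup locally, a minimal stand-in can be built with the JDK's built-in com.sun.net.httpserver package. The response shape (msg, total, code, data, timestamp) is inferred from the error message earlier in this post; the values and the class name MockTestEndpoint are made up for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical local stand-in for the test endpoint used in the example SQL.
// Field names come from the error message above; the values are made up.
class MockTestEndpoint {

    static final String BODY =
            "{\"msg\":\"ok\",\"total\":2,\"code\":\"0000\","
                    + "\"data\":[{\"name\":\"n0\",\"id\":0},{\"name\":\"n1\",\"id\":1}],"
                    + "\"timestamp\":\"2023-02-12 16:39:12\"}";

    /** Starts the server; pass 0 to let the OS pick a free port. */
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/test/test", exchange -> {
            byte[] bytes = BODY.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json; charset=utf-8");
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        start(8090);
        System.out.println("Serving on http://127.0.0.1:8090/test/test");
    }
}
```

This mock ignores query parameters such as pageNumber and pageSize and always returns the same page, which is enough to check that the data subject is unwrapped into rows.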

Follow-up

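Building on the changes above, I have since added paginated queries as well. When I have time, I will update this post with the source changes for pagination.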
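The pagination changes are not published here, so the following is only a guess at the general idea, not the author's implementation. Assuming pagingParam names the query parameter to advance (as with 'pagingParam' = 'pageNumber' in the example SQL), each request would bump that parameter by one. PagingSketch, nextPage and toQuery are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical paging helper: NOT the author's unpublished implementation.
// Assumes pagination means "advance the parameter named by pagingParam by one per request".
class PagingSketch {

    /** Returns a copy of params with the paging parameter advanced by one page (starts at 1 if absent). */
    static Map<String, Integer> nextPage(Map<String, Integer> params, String pagingParam) {
        Map<String, Integer> next = new LinkedHashMap<>(params);
        next.merge(pagingParam, 1, Integer::sum);
        return next;
    }

    /** Builds a query string such as "pageNumber=2&pageSize=100". */
    static String toQuery(Map<String, Integer> params) {
        StringJoiner joiner = new StringJoiner("&");
        params.forEach((k, v) -> joiner.add(k + "=" + v));
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Integer> params = new LinkedHashMap<>();
        params.put("pageNumber", 1);
        params.put("pageSize", 100);
        for (int i = 0; i < 3; i++) {
            System.out.println("/test/test?" + toQuery(params));
            params = nextPage(params, "pageNumber");
        }
    }
}
```

A real implementation would also need a stop condition, for example when a page comes back empty or shorter than pageSize; that part is left out of this sketch.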

Finally

Please credit the source when reposting.
