SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决

Posted jia-tong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决相关的知识,希望对你有一定的参考价值。

0、前提知识储备

Conflurent公司的SchemaRegestry组件的基本了解和使用

一、背景:

0.组件版本

flink:1.14

1.链路调整情况

原先链路:oracle-->OGG-->kafka-->flink-->数据库\\湖\\仓

实现链路:oracle-->OGG-->kafka(搭配conflurent公司的SchemaRegestry组件使用)-->flink-->数据库\\湖\\仓

2.链路调整缘由:

原链路中的kafka存储的数据格式是avro,每次源端oracle表做schema变更时,下游的相关程序都需要做停程序处理,费时费力,对运维不友好。

预期链路加入SchemaRegestry组件,它天然对avro格式数据支持,并且可以实现动态调整源端schema但不会要求程序手动停止。可以有效解耦链路的上下游,更加灵活,减少运维操作。

3.约束条件

目前项目组负责OGG和kafka的日常维护,flink程序及后续链路由其他项目组(包括我们自己的项目组)独立开发。因为SchemaRegestry组件是由我们负责引入的,在kafka之后链路上的项目组都需要做相应变更,所以需要我们项目组出一个样例代码(此代码逻辑已经实现并全链路跑通)。为使对下游代码改造的影响最小,要求个项目组在flink消费者程序中引入我们编写的反序列化代码,而不是实现自定义的SourceFunction。

二、目前困境

1.通过上面的背景可知,下游在改造时仍需要引入我们自定义的反序列化类,虽然这个类也是对flink原生类的一种具体实现。但是这种方式仍然不太友好,但是当前原生API不支持,找了官网和社区也没有发现有效的解决办法,大家都是在原生基础上按需进行封装。

2. 1.14版本的flink官网中存在AvroDeserializationSchema.forGeneric(...)这种方式,但是需要传入静态schema。目前schema是不确定的,需要根据消费信息中存储的id值去拿到对应的schema版本,这样也存在一个问题:初始化未进行消费时是无法拿到schema的,并且你也无法去解析消费信息对象去动态拿到id值进而拿到对应的schema。这样就成了一个死结。

三、目前的解决办法

1.对原生的反序列化类KafkaDeserializationSchema<GenericRecord>进行封装,实现只传入schemaregestryURL不需要传入schema就可以进行反序列化的操作,将schema变更和消费程序进行解耦。

四、未来优化方向

1.在引入SchemaRegestry组件后,优化flink消费kafka中的avro格式数据的方法,解耦schema的变更和程序运行之间的联系,确保flink消费程序可以实现:在初始化链路中无对应topic的数据时不会报错;当链路中存在积压数据的情况下,依然可以进行schema的变更,程序按照kafka的offset值顺序消费(先消费旧数据再消费新数据),并且在新旧数据的连接处可以自动实现schema的转换。

2.按照上述第一条的描述,可能需要程序在消费每条数据时拿到消息自带的id号并和缓存的schema进行比对,比对成功则反序列化数据;对比不成功则按照新id值重新获取schema文件并进行缓存,再重复上面的步骤。这样可能消费速率会比较慢,影响整个链路运行性能,具体影响多少需要进行仔细测试才能知道。

无法在反应中呈现另一个类组件内的类组件

【中文标题】无法在反应中呈现另一个类组件内的类组件【英文标题】:Cannot render a class component inside another class component in react 【发布时间】:2017-08-20 22:12:19 【问题描述】:

我正在尝试在我的登录页面中呈现此登录表单,但我收到“登录表单未定义”。我不能将一个类组件渲染到另一个类组件中,还是某些状态造成了冲突,或者我没有看到错字?

此外,这样做的正确方法是什么?

文件夹树:SignIn(index.js, actions.js, LoginForm.js)


LoginForm.js:

import React from 'react';
import styled from 'styled-components';
import  login  from './actions';
import  connect  from 'react-redux';

import validateInput from '../../../server/middlewares/routes/shared/validation/loginvalidation';

const Form = styled.form`
    position: absolute;
    left: 40%;
    top: 20%;
`;

const Input = styled.input`
    border-bottom: 2px solid red;
    background-color: transparent;
`;

const Button = styled.button`
    background-color: red;
    width: 100px;
    height: 50px;
`;

class LoginForm extends React.Component 
    constructor(props) 
        super(props);
        this.state = 
            identifier: '',
            password: '',
            errors: ,
            isLoading: false
        ;

        this.onSubmit = this.onSubmit.bind(this);
        this.onChange = this.onChange.bind(this);
    

    isValid() 
        const  errors, isValid  = validateInput(this.state);

        if (!isValid) 
            this.setState( errors );
        

        return isValid;
    

    onSumit(e) 
        e.preventDefault();
        if (this.isValid()) 
            this.setState( errors: , isLoading: true );
            this.props.login(this.state).then(
                (res) => this.context.router.push('/'),
                (err) => this.setState( errors: err.data.errors, isLoading: false )
            );
        
    

    onChange(e) 
        this.setState( [e.target.name]: e.target.value );
    

    render() 
        const  errors, identifier, password, isLoading  = this.state;

        return (
            <Form onSubmit=this.onSubmit>
                <h1> Login </h1>

                <Input
                  field="identifier"
                  label="Username / Email"
                  value=identifier
                  error=errors.indentifier
                  onChange=this.onChange
                />

                 <Input
                  field="password"
                  label="Password"
                  value=password
                  error=errors.password
                  onChange=this.onChange
                  type="password"
                />

                <Button disabled=isLoading> LOGIN </Button>

            </Form>
        )
    


// LoginForm.propTypes = 
//     login: React.PropTypes.func.isRequired
// 

// LoginForm.contextTypes = 
//     router: React.PropTypes.object.isRequired
// 

// export default connect(null,  login )(LoginForm);

export default LoginForm;

index.js:

import React from 'react';
import styled from 'styled-components';
import  connect  from 'react-redux';

import LoginForm from './LoginForm';

const FormWrapper = styled.div`
    position: absolute;
    left: 40%;
    top: 30%;
    height: 400px;
    width: 400px;
    background-color: red;
`;

class LoginPage extends React.Component 
  render() 
    return (
      <div>
        <h1>weEWRWER</h1>
      <FormWrapper>
         <LoginForm />
      </FormWrapper>
      </div>
    )
  


export default LoginPage;

【问题讨论】:

【参考方案1】:

这只是一个错字 onSumit -> onSubmit 在第 49 行的 LoginForm

【讨论】:

以上是关于SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决的主要内容,如果未能解决你的问题,请参考以下文章

如何将 vue 组件上的类和样式属性传递给 $attrs 等不同的元素?

Es6 程序中的类和构造函数

原生php登陆注册

如何避免 Java 中的类和包之间的名称冲突?

java类和接口的区别

Locust 的类和方法