SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决
Posted jia-tong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决相关的知识,希望对你有一定的参考价值。
0、前提知识储备
Conflurent公司的SchemaRegestry组件的基本了解和使用
一、背景:
0.组件版本
flink:1.14
1.链路调整情况
原先链路:oracle-->OGG-->kafka-->flink-->数据库\\湖\\仓
实现链路:oracle-->OGG-->kafka(搭配conflurent公司的SchemaRegestry组件使用)-->flink-->数据库\\湖\\仓
2.链路调整缘由:
原链路中的kafka存储的数据格式是avro,每次源端oracle表做schema变更时,下游的相关程序都需要做停程序处理,费时费力,对运维不友好。
预期链路加入SchemaRegestry组件,它天然对avro格式数据支持,并且可以实现动态调整源端schema但不会要求程序手动停止。可以有效解耦链路的上下游,更加灵活,减少运维操作。
3.约束条件
目前项目组负责OGG和kafka的日常维护,flink程序及后续链路由其他项目组(包括我们自己的项目组)独立开发。因为SchemaRegestry组件是由我们负责引入的,在kafka之后链路上的项目组都需要做相应变更,所以需要我们项目组出一个样例代码(此代码逻辑已经实现并全链路跑通)。为使对下游代码改造的影响最小,要求个项目组在flink消费者程序中引入我们编写的反序列化代码,而不是实现自定义的SourceFunction。
二、目前困境
1.通过上面的背景可知,下游在改造时仍需要引入我们自定义的反序列化类,虽然这个类也是对flink原生类的一种具体实现。但是这种方式仍然不太友好,但是当前原生API不支持,找了官网和社区也没有发现有效的解决办法,大家都是在原生基础上按需进行封装。
2. 1.14版本的flink官网中存在AvroDeserializationSchema.forGeneric(...)这种方式,但是需要传入静态schema。目前schema是不确定的,需要根据消费信息中存储的id值去拿到对应的schema版本,这样也存在一个问题:初始化未进行消费时是无法拿到schema的,并且你也无法去解析消费信息对象去动态拿到id值进而拿到对应的schema。这样就成了一个死结。
三、目前的解决办法
1.对原生的反序列化类KafkaDeserializationSchema<GenericRecord>进行封装,实现只传入schemaregestryURL不需要传入schema就可以进行反序列化的操作,将schema变更和消费程序进行解耦。
四、未来优化方向
1.在引入SchemaRegestry组件后,优化flink消费kafka中的avro格式数据的方法,解耦schema的变更和程序运行之间的联系,确保flink消费程序可以实现:在初始化链路中无对应topic的数据时不会报错;当链路中存在积压数据的情况下,依然可以进行schema的变更,程序按照kafka的offset值顺序消费(先消费旧数据再消费新数据),并且在新旧数据的连接处可以自动实现schema的转换。
2.按照上述第一条的描述,可能需要程序在消费每条数据时拿到消息自带的id号并和缓存的schema进行比对,比对成功则反序列化数据;对比不成功则按照新id值重新获取schema文件并进行缓存,再重复上面的步骤。这样可能消费速率会比较慢,影响整个链路运行性能,具体影响多少需要进行仔细测试才能知道。
无法在反应中呈现另一个类组件内的类组件
【中文标题】无法在反应中呈现另一个类组件内的类组件【英文标题】:Cannot render a class component inside another class component in react 【发布时间】:2017-08-20 22:12:19 【问题描述】:我正在尝试在我的登录页面中呈现此登录表单,但我收到“登录表单未定义”。我不能将一个类组件渲染到另一个类组件中,还是某些状态造成了冲突,或者我没有看到错字?
此外,这样做的正确方法是什么?
文件夹树:SignIn(index.js, actions.js, LoginForm.js)
LoginForm.js:
import React from 'react';
import styled from 'styled-components';
import login from './actions';
import connect from 'react-redux';
import validateInput from '../../../server/middlewares/routes/shared/validation/loginvalidation';
const Form = styled.form`
position: absolute;
left: 40%;
top: 20%;
`;
const Input = styled.input`
border-bottom: 2px solid red;
background-color: transparent;
`;
const Button = styled.button`
background-color: red;
width: 100px;
height: 50px;
`;
class LoginForm extends React.Component
constructor(props)
super(props);
this.state =
identifier: '',
password: '',
errors: ,
isLoading: false
;
this.onSubmit = this.onSubmit.bind(this);
this.onChange = this.onChange.bind(this);
isValid()
const errors, isValid = validateInput(this.state);
if (!isValid)
this.setState( errors );
return isValid;
onSumit(e)
e.preventDefault();
if (this.isValid())
this.setState( errors: , isLoading: true );
this.props.login(this.state).then(
(res) => this.context.router.push('/'),
(err) => this.setState( errors: err.data.errors, isLoading: false )
);
onChange(e)
this.setState( [e.target.name]: e.target.value );
render()
const errors, identifier, password, isLoading = this.state;
return (
<Form onSubmit=this.onSubmit>
<h1> Login </h1>
<Input
field="identifier"
label="Username / Email"
value=identifier
error=errors.indentifier
onChange=this.onChange
/>
<Input
field="password"
label="Password"
value=password
error=errors.password
onChange=this.onChange
type="password"
/>
<Button disabled=isLoading> LOGIN </Button>
</Form>
)
// LoginForm.propTypes =
// login: React.PropTypes.func.isRequired
//
// LoginForm.contextTypes =
// router: React.PropTypes.object.isRequired
//
// export default connect(null, login )(LoginForm);
export default LoginForm;
index.js:
import React from 'react';
import styled from 'styled-components';
import connect from 'react-redux';
import LoginForm from './LoginForm';
const FormWrapper = styled.div`
position: absolute;
left: 40%;
top: 30%;
height: 400px;
width: 400px;
background-color: red;
`;
class LoginPage extends React.Component
render()
return (
<div>
<h1>weEWRWER</h1>
<FormWrapper>
<LoginForm />
</FormWrapper>
</div>
)
export default LoginPage;
【问题讨论】:
【参考方案1】:这只是一个错字 onSumit
-> onSubmit
在第 49 行的 LoginForm
中
【讨论】:
以上是关于SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决的主要内容,如果未能解决你的问题,请参考以下文章