Opengauss同步数据至kafka实例
Posted 麒思妙想
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Opengauss同步数据至kafka实例相关的知识,希望对你有一定的参考价值。
配置
postgresql.conf
必须修改 wal_levle = logical
酌情根据实际情况修改 wal_sender_timeout
,max_wal_sender
, max_repocation_slots
pg_hba.conf
允许用户replication的访问,以及针对ip的访问限制例如host repication jacky 0.0.0.0/0 md5
docker 模拟kafka集群
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
environment:
DC_NAME: qnib
ports:
- "2181:2181"
privileged: true
zkui:
image: maauso/zkui
# dns: 127.0.0.1
depends_on:
- zookeeper
ports:
- "9090:9090"
environment:
ZKLIST: zookeeper:2181
privileged: true
kafka:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "9092:9092"
- "9094:9094"
environment:
# KAFKA_ADVERTISED_HOST_NAME: 0.0.0.0
# KAFKA_LISTENERS: PLAINTEXT://kafka:9092
# KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
kafka-manager:
image: sheepkiller/kafka-manager
depends_on: [ zookeeper,kafka ]
ports:
- "9000:9000"
environment:
ZK_HOSTS: zookeeper:2181
代码实例
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.opengauss.PGProperty;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class App
public static String SOURCEURL = "jdbc:opengauss://127.0.0.1:5432/t1";
public static String USER = "tj";
public static String PASSWD = "Dafei1288@";
public static String TOPIC = "pg_test";//定义主题
public static final String BROKERS_ADDRESS = "127.0.0.1:9094";
// public static final int REQUEST_REQUIRED_ACKS = 1;
// public static final String CLIENT_ID = "producer_test_id";
public static void main(String[] args) throws Exception
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));
// props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
Properties properties = new Properties();
PGProperty.USER.set(properties, USER);
PGProperty.PASSWORD.set(properties, PASSWD);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
PGProperty.REPLICATION.set(properties, "database");
PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
Class.forName("org.opengauss.Driver");
PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCEURL, properties);
System.out.println("connection success!");
String slotName = "replication_slot";
String lsn = "22DBF70";
LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
PGReplicationStream stream = conn
.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(slotName)
.withSlotOption("include-xids", false)
.withSlotOption("skip-empty-xacts", true)
.withStartPosition(waitLSN)
// .withSlotOption("parallel-decode-num", 10) //解;解码线程并发度
// .withSlotOption("white-table-list", "public.logic_test") //白名单列表
// .withSlotOption("standby-connection", true) //强制备机解码
// .withSlotOption("decode-style", "t") //解码格式
// .withSlotOption("sending-bacth", 1) //批量发送解码结果
.start();
while (true)
ByteBuffer byteBuffer = stream.readPending();
if (byteBuffer == null)
TimeUnit.MILLISECONDS.sleep(10L);
continue;
int offset = byteBuffer.arrayOffset();
byte[] source = byteBuffer.array();
int length = source.length - offset;
String res = new String(source, offset, length);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, res);
kafkaProducer.send(record);
System.out.println("send ok ==> "+res);
//如果需要flush lsn,根据业务实际情况调用以下接口
LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
System.out.println(lastRecv);
// stream.setFlushedLSN(lastRecv);
// stream.forceUpdateStatus();
创作打卡挑战赛
赢取流量/现金/CSDN周边激励大奖
以上是关于Opengauss同步数据至kafka实例的主要内容,如果未能解决你的问题,请参考以下文章
TIDB - 使用 TICDC 将数据同步至下游 Kafka 中