如何在 Kafka 中设置消息的大小?

Posted

技术标签:

【中文标题】如何在 Kafka 中设置消息的大小?【英文标题】:How do I set the size of messages in Kafka? 【发布时间】:2016-06-12 00:39:15 【问题描述】:

我目前使用的是 Kafka 0.9.0.1。根据我发现的一些来源,设置消息大小的方法是修改server.properties中的以下键值。

message.max.bytes replica.fetch.max.bytes fetch.message.max.bytes

我的server.properties 文件实际上有这些设置。

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

其他可能相关的设置如下。

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

但是,当我尝试发送有效负载为 4 到 6 MB 的消息时,消费者永远不会收到任何消息。生产者似乎在没有抛出任何异常的情况下发送消息。如果我确实发送了较小的有效负载(例如

知道我在配置设置方面做错了什么吗?

这是发送消息的示例代码。

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) 
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); //a simple pojo to store payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
  producer.send(new ProducerRecord<>("test", key, val));

producer.close();

这是接收消息的示例代码。

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) 
  ConsumerRecord<String, byte[]> records = consumer.poll(100);
  for(ConsumerRecord<String, byte[]> record : records) 
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
    System.out.println(
      System.format("offset=%d, key=%s", offset, key));
  

以下是为生产者和消费者填充属性文件的方法。

public static Properties getProducerProps() 
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); //need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;


public static Properties getConsumerProps() 
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); //need this too
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;

【问题讨论】:

【参考方案1】:

简, 不要使用fetch.message.max.bytes 首先因为这是一个来自消费者的属性并且不会进入 server.properties 文件,其次是因为它是用于旧版本的消费者,而不是 在创建 Consumer 时使用 max.partition.fetch.bytes 作为用于实例化它的属性的一部分。

【讨论】:

我刚试过,但我得到了同样的效果。未收到“大”文件。我想知道它们是否甚至被发送,因为当消费者开始从主题中读取时,偏移量是连续的(例如 1、2、3 等......)。对我来说,制作人似乎甚至不会发送大文件? 事实证明,我需要为生产者设置max.request.size,为消费者设置max.partition.fetch.bytes。我会稍微修改一下代码,看看是否真的需要 max.partition.fetch.bytes 是的,事实证明我确实需要这两种设置。如果我没有设置max.partition.fetch.bytes,那么我会得到一个RecordTooLargeException 是的,你也需要 max.request.size,但既然你告诉我发送不是问题,我就没有太注意那个参数。你能接受答案吗?【参考方案2】:

您需要增加服务器端(如前所述)和客户端。

使用 kafka-python Producer 的 Python 示例:

producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=1048576)

将 max_request_size 增加到所需的值,默认为 1048576。

【讨论】:

【参考方案3】:

max.fetch.bytes 选项也是可能的。

【讨论】:

以上是关于如何在 Kafka 中设置消息的大小?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 kafka sink 连接器中设置特定表?

如何从属性文件中设置Kafka参数?

如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?

如何在spring集成消息中设置JMS Header

如何在 Android 中设置 AlertDialog 的大小? [复制]

如何在 jekyll 中设置大小/旋转图像?