即使我在生产者配置中指定了压缩类型,kafka 代理也没有压缩我更大的消息
Posted
技术标签:
【中文标题】即使我在生产者配置中指定了压缩类型,kafka 代理也没有压缩我更大的消息【英文标题】:kafka broker is not gzipping my bigger size message , even though i specified compression type in the producer configuration 【发布时间】:2018-05-21 15:20:27 【问题描述】:下面是我的生产者配置,如果你看到他们的压缩类型为 gzip,即使我提到了压缩类型,为什么消息没有发布并且它失败了
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
**props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");**
并且错误出现在下面
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= ''
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
并且想要我们需要使用的消费者配置,我想要消费者端的 kafka 消息的字符串表示
【问题讨论】:
【参考方案1】:不幸的是,您在使用 Kafka 中的新 Producer 实现时遇到了一个相当奇怪的问题。
尽管 Kafka 在代理级别应用的消息大小限制适用于单个压缩记录集(可能是多条消息),但新生产者目前对之前的记录应用 max.request.size
限制任何压缩。
这已在 https://issues.apache.org/jira/browse/KAFKA-4169 中捕获(创建于 2016 年 9 月 14 日,在撰写本文时尚未解决)。
如果您确定消息的压缩大小(加上记录集的任何开销)将小于代理配置的max.message.bytes
,您可能 无需更改代理上的任何配置,就可以通过增加生产者上max.request.size
属性的值而侥幸逃脱。这将允许生产者代码接受预压缩负载的大小,然后将其压缩并发送到代理。
但是需要注意的是,如果 Producer 尝试发送对于代理配置而言太大的请求,代理将拒绝该消息,这将由您的应用程序来正确处理。
【讨论】:
确认在kafka python中创建Producer为KafkaProducer(compression_type='gzip', max_request_size=2 ** 20 * 10) # 10 MBs
得到消息【参考方案2】:
只需阅读错误消息 :)
The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
消息大于 1 MByte,这是 Apache Kafka 允许的默认值。要允许大消息检查How can I send large messages with Kafka (over 15MB)?中的答案
【讨论】:
我只是生产者,我无法更改代理端配置,我已经检查了那些代理端配置,但我无权访问代理,我仍然只是生产者。 抱歉 - 但您需要访问代理配置 :(以上是关于即使我在生产者配置中指定了压缩类型,kafka 代理也没有压缩我更大的消息的主要内容,如果未能解决你的问题,请参考以下文章
Scala 编译器中的错误:java.lang.AssertionError:断言失败(即使在 Eclipse 中指定了项目依赖项)
没有这样的文件或目录:'tesseract': 'tesseract' 即使在 pytesseract.py 中指定了在哪里可以找到 tesseract