当前位置: 首页 > news >正文

百度网站联系方式jsp网站维护

百度网站联系方式,jsp网站维护,网上企业名称预先核准系统,重庆市工程建设信息网官网新域名一、现象描述 使用Mac电脑本地启动spring-kakfa消费不到Kafka的消息,监控消费组的消息偏移量发现存在Lag的消息,但是本地客户端就是拉取不到,通过部署到公司k8s容器上消息却能正常消费! 本地启动的服务消费组监控 公司k8s容器服…

请添加图片描述

一、现象描述

使用Mac电脑本地启动spring-kakfa消费不到Kafka的消息,监控消费组的消息偏移量发现存在Lag的消息,但是本地客户端就是拉取不到,通过部署到公司k8s容器上消息却能正常消费!

本地启动的服务消费组监控
kafka消费组监控图
公司k8s容器服务消费组监控
kafka消费组监控图

二、环境信息

Spring Kafka版本: 2.1.13.RELEASE
Kafka Client版本: 1.0.2
Local JDK版本: Zulu 8.60.0.21-CA-macos-aarch64
K8s JDK版本: Oracle 1.8.0_202-b08

三、排查过程

  • 猜测是JDK版本或者JDK 对 Apple Silicon芯片兼容问题

  • Debug跟踪了KafkaConsumer poll过程,并没有发现任何异常,轮询拉取的线程正常循环执行,只是每次都拉取到 records 为0条。

  • 决定调整kafka 日志级别看下心跳是否正常,居然发现了有异常抛出,看到是snappy相关类NotClassFound

SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.common.protocol.types.Struct]
Reported exception:
java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappyat org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)at java.io.DataInputStream.readByte(DataInputStream.java:265)at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)at org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)at org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264)at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:563)at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:532)at org.apache.kafka.common.record.MemoryRecords.toString(MemoryRecords.java:292)at java.lang.String.valueOf(String.java:2994)at java.lang.StringBuilder.append(StringBuilder.java:136)at org.apache.kafka.common.protocol.types.Struct.toString(Struct.java:390)at java.lang.String.valueOf(String.java:2994)at java.lang.StringBuilder.append(StringBuilder.java:136)at org.apache.kafka.common.protocol.types.Struct.toString(Struct.java:384)at java.lang.String.valueOf(String.java:2994)at java.lang.StringBuilder.append(StringBuilder.java:136)at org.apache.kafka.common.protocol.types.Struct.toString(Struct.java:384)at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:299)at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)at ch.qos.logback.classic.Logger.filterAndLog_0_Or3Plus(Logger.java:383)at ch.qos.logback.classic.Logger.trace(Logger.java:437)at org.apache.kafka.common.utils.LogContext$KafkaLogger.trace(LogContext.java:135)at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:689)at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:469)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:297)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
[2023-09-15 14:02:27.248]^^A[TID: N/A]^^A[kafka-coordinator-heartbeat-thread | ingest-consume-group-follow-test-4]^^ATRACE^^Aorg.apache.kafka.clients.NetworkClient^^A[Consumer clientId=consumer-1, groupId=ingest-consume-group-follow-test-4] Completed receive from node 1 for FETCH with correlation id 15, received [FAILED toString()]
  • 如果了解 snappy-java这个依赖包的话,到这里就对拉取不到消息原因猜测的八九不离十了,因为 Kafka 服务端使用 snappy对息做了压缩并序列化为二进制进行传输,如果客户端在对消息的解压与反序列化过程中抛出异常,那么自然就拉取不到消息

  • 接着,解决一下snappy-java包的兼容问题,通过验证升级版本可以解决此问题。
    排除kafka-client包中 snappy-java v1.1.4版本依赖

<!-- spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><!-- 排除 snappy-java 1.1.4 版本 --><exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion></exclusions></dependency>
  • 再引入高版本v1.1.8.4的依赖包
<dependency><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId><version>1.1.8.4</version><scope>compile</scope>
</dependency>
  • 重新编译启动spring kafka客户端程序,消费问题解决~
    在这里插入图片描述

四、疑问解答

  1. 为什么Kafka Consumer poll消息过程没有异常抛出且可以正常运行?
    答:待补充
  2. 为什么调整日志级别为Trace才看到异常日志抛出?
    答:待补充
http://www.yayakq.cn/news/316522/

相关文章:

  • 怎么把现有网站开发php徐州焊接球网架公司
  • 电子商务网站建设岗位要求wordpress调用导航菜单
  • 宜兴建设局 审图中心 网站搭建商城到底哪家好
  • 网站搭建合同如何让百度口碑收录自己的网站
  • 外贸seo培训做网站排名优化有用吗
  • 松江建设投资有限公司网站线上营销推广方式都有哪些
  • 大学网站建设方案wordpress qq快捷登陆
  • 四川省城乡住房建设厅网站编写网站策划书
  • 学多久可以做网站 知乎注册一个公司一年需要多少钱
  • wordpress 3.3.1wordpress seo不好
  • 水果网站建设方案模板建站哪里有
  • 凡科建站平台昆明seo建站
  • 大同格泰网站建设网站建设与维护的论述题
  • 网站做跳转的意义wordpress被百度收录
  • 首页调用网站栏目id英文案例网站
  • 江门市建设工程投标网站云建站自动建站系统源码
  • asp.net 网站写好后如何运行网站模块
  • 高效网站建设咨询网站长图怎么做
  • 计算机网站建设和维护sem竞价教程
  • 长沙建网站一般要多少钱国外专业做汽配的网站
  • 有网站源码怎么做网站重庆网站建设的价格低
  • 广东建设工程注册中心网站网站怎么换域名
  • 常用的网站推广方法有哪些用自己的电脑做服务器建网站
  • 做网站怎么删除图片网站推广网站关键词排名怎么做
  • 做网站 使用权 所有权株洲专业建设网站
  • 能制作视频的软件厦门做网站优化公司
  • 网上做视频赚钱的网站有哪些wordpress 源码解读
  • 网站托管服务方案wordpress地图插件
  • 用vps建网站备案为什么要用模板建站?
  • 中山企业门户网站建设做网站一定要psd吗