当前位置: 首页 > news >正文

企业网站建设论坛洛阳青峰网络公司做网站

企业网站建设论坛,洛阳青峰网络公司做网站,宁波高等级公路建设指挥部网站,网络营销10大平台文章目录 场景现象问题处理 场景现象 kafka作为消息队列,作为前端设备数据到后端消费的渠道,也被多个不同微服务消费一个服务与前端边缘计算设备建立socket消息,接收实时交通事件推送,再将事件发送到kafka里面。此处使用的是Spri…

文章目录

    • 场景现象
    • 问题处理

场景现象

  • kafka作为消息队列,作为前端设备数据到后端消费的渠道,也被多个不同微服务消费
  • 一个服务与前端边缘计算设备建立socket消息,接收实时交通事件推送,再将事件发送到kafka里面。此处使用的是Spring Kafka,普通的将事件列表数据转化为字符串后发送
  • 事件信息,需要入库和实时推送,也需要被第三方服务调用,第三方也采取消费kafka的方式
  • 这个服务,作为kafka的消费者,读取事件数据,发送短信给指定用户,但此处使用的是Spring.cloud.stream,函数式编程,绑定kafka的topic
  • 此服务消费时,加了一些打印。但是事件推送到kafka后,看不到任何日志打印,也没有报错。但是使用kafka命令查看,却已经被该消费者消费掉了
  • 消费的源代码如下:
    @Beanpublic Consumer<String> boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {log.info("Consumer Received (boxTopic): " + hwTrafficEventManager);return value -> {log.info("Consumer Received (boxTopic): " + value);// todo : do somethings};}

问题处理

  • 消费某个多分区topic时,发现只有其中一个分区数据有打印,其他都没有,但是查看却发现都消费掉了
2023-09-21 13:45:13.706  INFO 2184 --- [container-0-C-1] c.newatc.data.kafka.KafkaConfiguration   : Consumer Received (boxTopic): {"equipmentNo":0,"laneNum":4,"statisticsCycle":60,"time":1695275100110,"vehicleLanes":[{"averageParkingNum":0.5,"averageSpeed":6.1,"lane":1,"laneFlow":2,"maxQueueLength":12,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":2,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":15.3,"vehicleAverageTravelTime":0.2},{"averageParkingNum":0.0,"averageSpeed":35.7,"lane":2,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":1,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":0.0,"lane":3,"laneFlow":3,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":0,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":25.4,"lane":4,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":5,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":1.6}]}
  • 最终测试发现,列表转化为jsonarray字符串,无法打印。但是单条数据json字符串却可以
  • 将代码改为String[]后发现报错,代码如下
    @Beanpublic Consumer<String[]> boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {log.info("Consumer Received (boxTopic): " + hwTrafficEventManager);return value -> {log.info("Consumer Received (boxTopic): " + value.length);};}
  • 报错如下:
2023-09-21 11:47:59.684 ERROR 9112 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for boxTopic-3@29941org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]), failedMessage=GenericMessage [payload=byte[96], headers={kafka_offset=29941, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3c31888e, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=3, kafka_receivedMessageKey=[B@3fcf7bd1, kafka_receivedTopic=boxTopic, kafka_receivedTimestamp=1695267871945, contentType=application/json, kafka_groupId=data-center}]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2683)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237)at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:113)at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176)at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:48)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1282)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1057)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:696)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:551)at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:754)at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:586)at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)... 12 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741)at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515)at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1420)at com.fasterxml.jackson.databind.DeserializationContext.extractScalarFromObject(DeserializationContext.java:932)at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseString(StdDeserializer.java:1292)at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:166)at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:25)at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723)at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:223)... 45 common frames omitted
  • 通过日志可以看出,是在接收数据的解析时发生了问题,jsonarray的字符串无法接受解析
  • 在消息发生时,在正常消息后面,加一个“-”,可以正常接收了,就是解析时需要先删掉尾字符再转json
  • 后面又发现,如果是作为字符接收,再转为字符串,是可以的
http://www.yayakq.cn/news/662117/

相关文章:

  • 网站不用域名解绑工程合同模板
  • discuz做资讯网站合适吗顺企网南昌网站建设
  • 手机 网站制作怀化网站建设设计
  • 电商网站商品页的优化目标是什么?百度网站收录
  • 网站建设增长率wordpress建2个网站
  • 网站网址模板东莞黄江网站建设
  • 温州网站建设平台申请注册商标需要多少钱
  • 站长工具ip地址查询域名敦煌网网站评价
  • 更换网站logo太原建设网站制作
  • dw做的网站 图片的路径什么是营销型手机网站建设
  • 安徽建设相关网站哪里做网站做的好
  • 哪里能注册免费的网站国外开发网站
  • 网站后台seo优化如何做图们网络推广
  • 网站建设相关资料建英语网站好
  • 罗湖做网站58建设网站思维导图
  • html 公司网站 代码下载一亩地开发多少钱
  • 网站三网合一案例xcache wordpress
  • 查询网 域名查询南宁seo结算
  • 无锡网站推广优化费用精仿源码社区网站源码
  • 做外贸的如何上国外网站wordpress二维码分享
  • 内网门户网站wordpress屏蔽评论
  • 什么是企业型网站wordpress的.htaccess
  • 那些网站权重高珠海移动互联网开发
  • 怎么做网站切图前端开发有哪些
  • 举重运动员 做网站海南人才网
  • 用源码做自己的网站无线网站应建设在什么地方
  • 网页设计工作岗位及薪资直通车优化推广
  • 网站接做网站单一对一视频软件开发
  • 鄂州网站建设推广报价400免费服务电话申请
  • 网站怎么做站长统计南通专业家纺网站建设