百度网站排名优化室内设计效果图怎么画
大数据之kafka应用
- 2024启kafka
 - kafka常见命令
 - 生产上重放信息
 - jmx
 - jmx的配置和开启
 - jmx的使用
 - jmx例子一
 
- jmx例子二
 - jmx例子三
 - jmx例子四(special)
 
2024启kafka
kafka常见命令
-  
指定jmx端口启动
kafka lsof -i :9999JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties -  
新建topic:副本不能大于kafka_server数
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --partitions 3 --replication-factor 1 --create --topic knowScript -  
查看topic的详情
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --describe --topic knowScript -  
罗列所有的topic
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --list -  
给topic拓展分区:只增大不减小
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --alter --topic knowScript --partitions 4 -  
查看topic的磁盘大小,单位为Byte
/opt/kafka_2.12-3.1.0/bin/kafka-log-dirs.sh --bootstrap-server only:9092 --topic-list knowScript --describe | grep -oP '(?<=size":)\d+' | awk '{ sum += $1 } END { print sum }' -  
删除topic
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --topic knowScript --delete -  
查看消费者组列表
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --list -  
查看特定消费者组的消费情况
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId -  
新建一个消费者组:消费一个topic即可
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group makeGroup -  
删除一个消费者组
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --delete --group makeGroup -  
往特定topic生产message,不指定key
/opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript -  
往特定topic生产message,指定key
/opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript --property parse.key=true --property key.separator=, -  
消费指定topic的message:–from-beginning,默认为最近的消息消费
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --from-beginning -  
消费topic的message,限制条数
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0 --max-messages 3 -  
消费指定topic的message:指定分区的offset
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset latest --partition 0/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset 10 --partition 0 -  
查看kafka的log文件明文内容
/opt/kafka_2.12-3.1.0/bin/kafka-dump-log.sh --files /opt/kafka_2.12-3.1.0/data/kafka_logs/knowScript-0/00000000000000000000.log -deep-iteration --print-data-log -  
查看topic的message总数
-  
每个分区的最小offset
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript --time -2 -  
每个分区的最大offset
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript --time -1 
 -  
 -  
查看指定消费者组的消费情况offset和lag
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId -  
重设指定消费者组消费指定的offset(每个分区)
-  
打印结果,没有执行(生产使用这个验证)
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --dry-run -  
直接执行
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --execute -  
重设指定消费者组指定分区的指定offset(topic后使用
:$partitionNum来设定指定的partition)/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1 --execute 
 -  
 -  
指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning -  
生产message基准测试; num-records生产的message条数,through为-1表示不限制吞吐量,record-size表示每条record的大小为1024K, producer-props后面跟着kv的producer属性配置
/opt/kafka_2.12-3.1.0/bin/kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=only:9092 acks=-1 linger.ms=2000 compression.type=lz4 -  
消费message基准测试
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-perf-test.sh --broker-list only:9092 --messages 10000000 --topic test_producer_perf 
生产上重放信息
-  
查看指定消费者组的消费情况offset
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId -  
重设指定消费者组消费指定的offset(每个分区)
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --execute-  
打印结果,没有执行
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --dry-run -  
other:重设指定消费者组指定分区的指定offset(topic后使用
:partitionNum来设定指定的partition)/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1 --dry-run 
 -  
 -  
指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning 
-  
指定分区指定offset进行开始消费(查看某消费者组在特定的分区丢失了哪些消息)
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --offset 10 --partition 0 
jmx
jmx的配置和开启
-  
配置jmx
JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties- 配置文件配置更稳定
 
 
jmx的使用
jmx例子一
-  
参考url的MBEAN
https://docs.confluent.io/platform/current/kafka/monitoring.html#kafka-monitoring-metrics-broker
-  
找到category类别下面的MBEAN:
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={topicName} -  
获取到对应的MBEAN的指标值
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi-  
输出列有
"time","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:Count","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:EventType","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FiveMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:MeanRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:RateUnit" -  
寻找上面的name的值,如 FifteenMinuteRate
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --date-format "YYYY-MM-dd HH:mm:ss" --attributes FifteenMinuteRate --reporting-interval 5000- –date-format格式化时间 、 --reporting-interval指定更新时间间隔
 
 
 -  
 
 -  
 
jmx例子二
-  
找到category类别下面的MBEAN
kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs -  
获取到对应的MBEAN的指标值
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi-  
输出列有
"time","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:50thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:75thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:95thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:98thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:999thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:99thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Count","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Max","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Mean","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Min","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:StdDev"-  
寻找上面的name的值,如 75thPercentile、 95thPercentile
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 75thPercentile/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 95thPercentile 
 -  
 
 -  
 
jmx例子三
-  
找到category类别下面的MBEAN
kafka.network:type=RequestChannel,name=ResponseQueueSize -  
获取到对应的MBEAN的指标值
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi-  
输出列有
-  
"time","kafka.network:type=RequestChannel,name=ResponseQueueSize:Value" 
 -  
 -  
寻找上面的name的值,如 Value
-  
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes Value 
 -  
 
 -  
 
jmx例子四(special)
-  
并不是所有的MBEAN都有对应的jmx指标值;例如:kafka.log:type=Log,name=LogEndOffset
 -  
下面的命令会报错
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi -  
但是从
jconsole localhost:9999的弹窗MBEAN可以找到;上面更深层的信息/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset,topic=flink2kafka,partition=0 --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi 
