当前位置: 首页 > news >正文

网站建设需要会什么软件如何申请小程序店铺

网站建设需要会什么软件,如何申请小程序店铺,软件开发专业单词,王野吉鹿关于kafka 作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。…

关于kafka

作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。

生产者负责写数据,一个topic可以有多个分区。如下图所示,生产者写数据的示意。从这个图中,我们可以得出一个很重要的信息:分区有序,即消息在某个分区上是有顺序的,而全局是没有顺序的。这个意味着如果我们需要保证顺序,我们在写消息时需要往同一个分区中写数据。比如,我们有一个场景,有一个订单。首先,创建支付订单,发送一个kafka消息,然后,实际支付,发送一个kafka消息,最后,又想退款,又发起了退款,又发送了一个kafka消息。如果,这三个消息在不同的分区上,我们就无法保证,我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点,我们可以把跟这个订单相关的所有操作的消息都写到一个分区上,比如,可以通过根据订单id进行hash。

group只跟消费者有关系,消费者通过group进行标识,一个消费者实例表示一个消费者,消费者实例可以在不同的线程中开启,也可以在不同的进程中开启。这可以提升消费的并发能力。那么,这是否意味着可以无限开启消费者实例,以提升消费者消费消息的速度呢?

这就涉及到消费的逻辑,我先给出结论。接下来,我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系,当消费者数量小于等于partition数量时,每个消费者都能消费到消息。

如果一个topic有4个分区,分别为p0,p1,p2,p3。现在有两个消费者组,分别是groupA和groupB。一个消费者就是一个消费实例,比如,C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区,groupA有4个消费者,则每个消费者被分配到一个对应的分区上消费,假如,这个分组又启了一个消费者consumer1,即消费者数量大于分区数量,则这个消费者不会读到消息。

而groupB只有两个消费者,则每一个消费者分别获取两个分区上的消息,如果,groupB只有一个消费者,那么,所有分区上的消息都只有这一个消费者获取。

开源项目kafka-go的使用

kafka-go是一个用go语言开发的kafka客户端。

生产者

我们看一个基于kafka-go实现的生产者的示例:

package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka写入器配置writerConfig := kafka.WriterConfig{Brokers:  []string{"10.10.37.100:30001"},Topic:    "my-topic",Balancer: &kafka.Hash{}, //消息分区的策略,这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer := kafka.NewWriter(writerConfig)// 发送消息messages := []string{"message 1", "message 2", "message 3", "message 4", "message 5", "message 6", "message 7", "message 8", "message 9", "message 10", "message 11", "message 12", "message 13", "message 14", "message 15", "message 16"}for _, msg := range messages {time.Sleep(1 * time.Second)if err := writer.WriteMessages(context.Background(), kafka.Message{Key:   []byte(fmt.Sprintf("key-%d", time.Now().UnixNano())),Value: []byte(msg),}); err != nil {fmt.Printf("Failed to write message: %v\n", err)} else {fmt.Printf("Message written: %s\n", msg)}}// 关闭写入器if err := writer.Close(); err != nil {fmt.Printf("Failed to close writer: %v\n", err)}
}

WriterConfig中有一个字段我们在开发过程中可能会需要关注到,Balancer:这个用于把消息分发到不同的分区上。这个策略可以自定义。当然,kafka-go中提供几种常用的方法,我以示例中的hash这个为例做简要说明,我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中,我们直接定位到消息分发到对应分区的逻辑。

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码,我们只关注和分区相关的逻辑balancer := w.balancer()for i, msg := range msgs {topic, err := w.chooseTopic(msg)if err != nil {return err}numPartitions, err := w.partitions(ctx, topic)if err != nil {return err}//根据msg计算把消息分发到哪个分区partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key := topicPartition{topic:     topic,partition: int32(partition),}assignments[key] = append(assignments[key], int32(i))}batches := w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr
}

如果设置了分区策略,则以设置的分区策略进行分发消息。 

func (w *Writer) balancer() Balancer {if w.Balancer != nil {return w.Balancer}return &w.roundRobin
}

接下来,我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑,就是根据msg中key进行hash,然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)

func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key == nil {return h.rr.Balance(msg, partitions...)}hasher := h.Hasherif hasher != nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher = fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err := hasher.Write(msg.Key); err != nil {panic(err)}// uses same algorithm that Sarama's hashPartitioner uses// note the type conversions here.  if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition := int32(hasher.Sum32()) % int32(len(partitions))if partition < 0 {partition = -partition}return int(partition)
}

消费者

消费者没有太多的注意事项,只是如果有多个分区,想要提升并发能力,可以启多个消费者。我们直接看示例。

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka读取器配置readerConfig := kafka.ReaderConfig{Brokers:  []string{"10.10.37.100:30001"},Topic:    "my-topic",GroupID:  "my-group",MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader := kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig := <-sigChan:fmt.Printf("Caught signal %v: terminating\n", sig)returndefault:msg, err := reader.ReadMessage(context.Background())if err != nil {fmt.Printf("Failed to read message: %v\n", err)continue}fmt.Printf("当前时间:%v Message on topic: %s value: %s partion:%d\n", time.Now(), msg.Topic, string(msg.Value), msg.Partition)}}
}

创建topic

直接上示例。创建topic是个幂等操作。

package mainimport ("net""strconv""github.com/segmentio/kafka-go"
)func main() {// to create topics when auto.create.topics.enable='false'topic := "my-topic"conn, err := kafka.Dial("tcp", "10.10.37.100:30001")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic:             topic,NumPartitions:     10,ReplicationFactor: 1,},}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}
}

http://www.yayakq.cn/news/897480/

相关文章:

  • 婚纱摄影网站河南省住建厅官网
  • 云南网站建设公司免费电商网站模板
  • 定制公司网站建设泉州最专业微信网站建设开发
  • 更改网站图标如何做公司宣传网站
  • 网站目前如何做外链建设项目查询网站
  • 中山小榄网站wordpress转到手机端
  • 自建站织梦html网站地图
  • 网站专题页面制作无锡产品排名优化
  • 去哪找做塑料的网站网络彩票网站建设多少钱
  • 建站模板免费下载wordpress 菜单 链接
  • 怎么做教育网站WordPress摘要文字字数
  • 如何做专业的模板下载网站一个虚拟主机可以做几个网站
  • qq钓鱼网站生成器手机版网站建设网址
  • 杭州知名的网站制作策略2021没封的网站有人分享吗
  • 网站排名公司哪家好wordpress 书签
  • 外贸网站建设费用情况做公众号微网站
  • 点击app图标进入网站怎么做wordpress网站维护
  • 自己设计的网站如何推广做很多网站
  • 网络营销网站 优帮云wordpress手机端菜单设置
  • 聊城做网站推广企业展厅建筑设计
  • 八旬老太做直播 什么网站电商网站建设存在哪些问题
  • 住房和城乡建设部网站注册进度建筑教育
  • 长春网站网络公司南宁网站建站公司
  • 阿里云服务器可以做网站吗天津网站搜索排名优化
  • 企业做网站怎么查公司信息
  • 蚌埠哪里做网站主题公园wordpress
  • 做收钱的网站要什么条件外包网站自己维护
  • 网站维护企业宁波建站公司哪家服务好
  • 小型网站设计及建设论文范本广东工厂搜索seo
  • 中国域名门户网站网站不能批量上传图片