当前位置:首页 > 应用开发

Golang 语言中 kafka 客户端库 Sarama

01、客户介绍

Apache Kafka 是端库一款开源的消息引擎系统。它在项目中的客户作用主要是削峰填谷和解耦。本文我们只介绍 Apache Kafka 的端库 Golang 客户端库 Sarama。Sarama 是客户 MIT 许可的 Apache Kafka 0.8 及更高版本的 Golang 客户端库。

如果读者朋友对 Apache Kafka 服务端还不了解,端库建议先阅读官方文档中的客户入门部分,本文使用的端库版本是 Apache Kafka 2.8。

02、客户生产者

我们可以使用 Sarama 库的端库 AsyncProducer 或 SyncProducer 生产消息。在大多数情况下首选使用 AsyncProducer 生产消息。客户它通过一个 channel 接收消息,端库并在后台尽可能高效的客户异步生产消息。

SyncProducer 发送 Kafka 消息后阻塞,端库直到接收到 ACK 确认。客户SyncProducer 有两个警告:它通常效率较低,并且实际的耐用性保证取决于 Producer.RequiredAcks 的配置值。在某些配置中,有时仍会丢失由 SyncProducer 确认的云服务器提供商消息,但是使用比较简单。

为了读者朋友们容易理解,本文我们介绍 SyncProducer 作为生产者的使用方式。如果读者朋友想了解 AsyncProducer 作为生产者的使用方式,请参考官方文档。

使用 SyncProducer 作为生产者的示例代码:

func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {   producer, err := sarama.NewSyncProducer(brokerAddr, config)  if err != nil {    fmt.Println(err)   return  }  defer func() {    if err = producer.Close(); err != nil {     fmt.Println(err)    return   }  }()  msg := &sarama.ProducerMessage{    Topic: topic,   Value: value,  }  partition, offset, err := producer.SendMessage(msg)  if err != nil {    fmt.Println(err)   return  }  fmt.Printf("partition:%d offset:%d\n", partition, offset) } 

阅读上面这段代码,我们调用 NewSyncProducer() 创建一个新的 SyncProducer,给定 broker 地址和配置信息。调用 SendMessage() 生产给定的消息,并且仅在生产成功或失败时返回。它将返回分区(Partition)和生产的消息的偏移量(Offset),如果消息生产失败,则返回错误。

需要注意的是,为了避免泄露,必须在生产者上调用 Close(),因为当它超出范围时,可能不会自动垃圾回收。

03、消费者

我们可以使用 Sarama 库的消费者 Consumer 或消费者组 ConsumerGroup API 消费消息。为了读者朋友们容易理解,服务器托管本文我们介绍使用 Consumer 消费消息。

Consumer 管理 PartitionConsumers,该 PartitionConsumers 处理来自 brokers 的 Kafka 消息。

Consumer 消费消息的示例代码:

func consumer (brokenAddr []string, topic string, partition int32, offset int64) {   consumer, err := sarama.NewConsumer(brokenAddr, nil)  if err != nil {    fmt.Println(err)   return  }  defer func() {    if err = consumer.Close(); err != nil {     fmt.Println(err)    return   }  }()  partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)  if err != nil {    fmt.Println(err)   return  }  defer func() {    if err = partitionConsumer.Close(); err != nil {     fmt.Println(err)    return   }  }()  for msg := range partitionConsumer.Messages() {    fmt.Printf("partition:%d offset:%d key:%s val:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)  } } 

阅读上面这段代码,我们调用 NewConsumer() 创建一个新的 consumer,给定 broker 地址和配置信息。调用 ConsumePartition() 创建 PartitionConsumer,给定 topic、partition 和 offset。PartitionConsumer 处理来自给定 topic 和 partition 的 Kafka 消息。

需要注意的是,为了防止泄露,必须调用 consumer 和 partitionConsumer 的 Close(),因为当它超出范围时,可能不会自动垃圾回收。

04、总结

本文主要介绍如何使用 Apache Kafka 的 Golang 语言客户端库 Sarama 生产和消费 Kafka 消息。关于生产者和消费者,分别列举了一个简单示例。除此之外,Sarama 库还提供了很多其它 Api,感兴趣的读者朋友可以阅读官方文档了解更多。

分享到:

滇ICP备2023006006号-16