golang,go,博客,开源,编程
IBM/sarama
是一个用于 Go 编程语言的 Kafka 客户端库。它是一个非常流行的库,用于与 Apache Kafka 进行交互,支持 Kafka 的生产者、消费者以及其他各种与 Kafka 相关的操作。Kafka 是一个分布式流平台,广泛用于构建实时数据流管道和流应用程序。
IBM/sarama
提供了 Kafka 的各类功能支持,包括但不限于:
sarama
支持与 Kafka 集群的连接,允许你查询集群状态、管理分区等。gzip
、snappy
和 lz4
。Kafka 生产者用于将消息发送到 Kafka 主题。sarama
提供了简单的 API 来创建生产者并发送消息。
package main
import (
"log"
"github.com/Shopify/sarama"
)
func main() {
// 创建生产者配置
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// 创建生产者
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal("Failed to start Sarama producer:", err)
}
defer producer.Close()
// 创建消息
message := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
// 发送消息
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatal("Failed to send message:", err)
}
log.Printf("Message is stored in partition %d with offset %d\n", partition, offset)
}
Kafka 消费者订阅一个或多个 Kafka 主题,从中读取消息。消费者可以是一个简单的消息消费者,也可以处理分区和消费者组。
package main
import (
"log"
"github.com/Shopify/sarama"
)
func main() {
// 创建消费者配置
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// 创建消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal("Failed to start Sarama consumer:", err)
}
defer consumer.Close()
// 订阅主题
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
}
defer partitionConsumer.Close()
// 消费消息
for message := range partitionConsumer.Messages() {
log.Printf("Consumed message: %s", string(message.Value))
}
}
sarama
也支持 Kafka 消费者组,允许多个消费者并行地从多个分区中消费消息。每个消费者组会读取每个分区的消息,保证每个分区的消息只被组中的一个消费者处理。
package main
import (
"log"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
)
func main() {
// 创建消费者组
config := sarama.NewConfig()
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
if err != nil {
log.Fatal("Error creating consumer group: ", err)
}
defer group.Close()
// 创建消息处理函数
handler := &consumerGroupHandler{}
// 启动消费者组
for {
if err := group.Consume(context.Background(), []string{"test-topic"}, handler); err != nil {
log.Fatal("Error consuming messages: ", err)
}
}
}
// 消费者组处理器
type consumerGroupHandler struct{}
func (h *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Consumed message: %s", string(message.Value))
session.MarkMessage(message, "")
}
return nil
}
sarama
也提供了异步生产者 API,可以在不等待 Kafka 响应的情况下继续执行其他任务。
package main
import (
"log"
"github.com/Shopify/sarama"
)
func main() {
// 创建生产者配置
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// 创建异步生产者
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal("Failed to start Sarama producer:", err)
}
defer producer.Close()
// 创建消息并发送
message := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
producer.Input() <- message
// 获取发送成功的消息
select {
case msg := <-producer.Successes():
log.Printf("Message sent to topic %s, partition %d, offset %d", msg.Topic, msg.Partition, msg.Offset)
case err := <-producer.Errors():
log.Printf("Failed to send message: %s", err)
}
}
sarama
被设计为高效并且低延迟,适用于大规模的 Kafka 消息传递,能够处理高吞吐量。sarama
可以帮助你快速构建生产者和消费者,处理实时流数据。sarama
可以用于构建日志收集系统,实时将日志信息发送到 Kafka 主题,再由下游消费者进行分析。sarama
提供了易用的 API 来实现这一功能。sarama
也适用于将数据从一个系统同步到另一个系统,尤其是在需要保证高可用性和数据一致性的情况下。IBM/sarama
是 Go 语言中与 Kafka 交互的标准库之一,功能全面,支持 Kafka 的各种核心特性,包括生产者、消费者、消费者组、事务、批量消息、压缩等。它适用于构建高吞吐量、分布式的实时数据流应用。通过 sarama
,你可以轻松地与 Kafka 集群交互,实现消息传递、流处理和数据同步等功能。