AI智能
改变未来

go中简单使用kafka

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

原文链接:https://www.cnblogs.com/angelyan/p/10800739.html

windows上kafka的安装

1.安装jdk

下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html

下载需要注册oracle

添加环境变量JAVA_HOME=C:\\Program Files\\Java\\jre1.8.0_211

2.安装Zookeeper

下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

①进入zookeeper的相关设置所在的文件目录,例如本文的:D:\\zookeeper-3.4.14\\conf

②将\”zoo_sample.cfg\”重命名为\”zoo.cfg\”

③打开zoo.cfg 找到并编辑:

dataDir=/tmp/zookeeper to D:/zookeeper-3.4.14/data或 D:/zookeeper-3.4.14/data(路径仅为示例,具体可根据需要配置)

④与配置jre类似,在系统环境变量中添加:

a.系统变量中添加ZOOKEEPER_HOME=D:\\zookeeper-3.4.14

b.编辑系统变量中的path变量,增加%ZOOKEEPER_HOME%\\bin

⑤在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)

⑥打开cmd窗口,输入zkserver,运行Zookeeper

3.安装kafka

下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.11-2.2.0.tgz

①进入kafka配置文件所在目录,D:\\kafka_2.11-2.2.0\\config

②编辑文件\”server.properties\”,找到并编辑:

log.dirs=/tmp/kafka-logs to log.dirs=D:/kafka_2.11-2.2.0/kafka-logs

③在server.properties文件中,zookeeper.connect=localhost:2181代表kafka所连接的zookeeper所在的服务器IP以及端口,可根据需要更改

修改advertised.host.name=服务器ip

④进入kafka安装目录D:\\kafka_2.11-2.2.0,打开cmd启动

.\\bin\\windows\\kafka-server-start.bat .\\config\\server.properties

补充知识点

topic

topic是存储消息的逻辑概念,不同的topic下的数据是分开存储的。不同的 topic 的消息是分开存储的, 每个 topic 可以有多个生产者向它发送消息,也可以有多 个消费者去消费其中的消息。

partition

一个 topic 可以划分多个分区partition(每个 Topic至少有一个分区partition),同一topic下的不同分区包含的消息是不同的。第i个分区分配在第i mod n个broker上。

每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset 保证消息在分区内的顺序,offset的顺序不跨分区,即kafka 只保证在同一个分区内的消息是有序的。

offset

每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序。offset 的顺序不跨分区,即 kafka 只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。

docker安装

1、下载镜像
这里使用了wurstmeister/kafka和wurstmeister/zookeeper这两个版本的镜像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2、启动

启动zookeeper容器docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
启动kafkadocker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.168:2181,192.168.0.169:2181,192.168.0.170:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.170:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka这里面主要设置了4个参数KAFKA_BROKER_ID=0KAFKA_ZOOKEEPER_CONNECT=192.168.0.168:2181,192.168.0.169:2181,192.168.0.170:2181KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.170:9092KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT  配置的是zookeeper的地址,可以单节点配置,也可以配置zookeeper集群多节点,用逗号隔开中间两个参数的192.168.0.170改为宿主机器的IP地址,如果不这么设置,可能会导致在别的机器上访问不到kafka。

3、进入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash进入kafka默认目录 /opt/kafka_2.11-0.10.1.0

go运行代码

producer

package mainimport (\"fmt\"\"time\"\"github.com/Shopify/sarama\")func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll1000config.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = trueclient, err := sarama.NewSyncProducer([]string{\"192.168.3.118:9092\"}, config)if err != nil {fmt.Println(\"producer close, err:\", err)return}defer client.Close()for {msg := &sarama.ProducerMessage{}msg.Topic = \"nginx_log\"msg.Value = sarama.StringEncoder(\"this is a good test, my message is good\")pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println(\"send message failed,\", err)return}fmt.Printf(\"pid:%v offset:%v\\n\", pid, offset)time.Sleep(10 * time.Second)}}

consumer

package mainimport (\"fmt\"\"strings\"\"sync\"\"github.com/Shopify/sarama\"\"time\")var (wg sync.WaitGroup)func main() {consumer, err := sarama.NewConsumer(strings.Split(\"127.0.0.1:9092\", \",\"), nil)if err != nil {fmt.Println(\"Failed to start consumer: %s\", err)return}partitionList, err := consumer.Partitions(\"nginx_log\")if err != nil {fmt.Println(\"Failed to get the list of partitions: \", err)return}fmt.Println(partitionList)for partition := range partitionList {pc, err := consumer.ConsumePartition(\"nginx_log\", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf(\"Failed to start consumer for partition %d: %s\\n\", partition, err)return}defer pc.AsyncClose()go func(pc sarama.PartitionConsumer) {wg.Add(1)for msg := range pc.Messages() {fmt.Printf(\"Partition:%d, Offset:%d, Key:%s, Value:%s\", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))fmt.Println()}wg.Done()}(pc)}time.Sleep(10*time.Second)wg.Wait()consumer.Close()}

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » go中简单使用kafka