Go连接kafka消费队列

Go连接kafka消费队列

       最近接到一个需求,就是需要开发一个脚本去消费kafka队列,然后发送消息给意向客户,综合考虑,最后决定使用Go来实现,最终很快完成脚本开发,特意跟大家分享,首先我们创建一个Go项目目录messagePush,初始化mod,输入以下命令行

go mod init messagePush

创建完之后,我们新建一个配置文件,用于存放配置信息,命名为config.json,配置如下

{
  "kafka_connection": "172.31.128.1:9092",
  "kafka_topic": "AICustIntentionPush"
}

然后创建运行文件,命名为main.go,代码如下

package main

//消费kafka脚本发送加微请求

//引入扩展
import (
   "sync"
   "encoding/json"
   "io/ioutil"
   "fmt"
   "github.com/Shopify/sarama"
   "time"
   "github.com/rifflock/lfshook"
   "github.com/sirupsen/logrus"
   "os"
   "path"
   "github.com/lestrrat/go-file-rotatelogs"
   "strconv"
)

//进程
var wg sync.WaitGroup

//定义配置文件解析后的结构
type Config struct {
   KafkaConnection string  `json:"kafka_connection"`
   KafkaTopic      string  `json:"kafka_topic"`
   KafkaTaskId     float64 `json:"kafka_task_id"`
   KafkaTaskIdB    float64 `json:"kafka_task_id_b"`
}

//空配置文件
var configData Config

/**
程序启动自定义
 */
func init() {
   //初始化读取配置文件
   jsonByte, err := ioutil.ReadFile("./config.json")
   if err != nil {
      fmt.Println("读取json文件失败", err)
      return
   }
   err = json.Unmarshal(jsonByte, &configData)
   if err != nil {
      fmt.Println("解析数据失败", err)
      return
   }
}

/**
主程序
 */
func main() {
   //获取配置
   kafkaTopic := configData.KafkaTopic           //分区
   KafkaConnection := configData.KafkaConnection //服务连接地址
   //创建消费者
   consumer, err := sarama.NewConsumer([]string{KafkaConnection}, nil)
   if err != nil {
      fmt.Printf("fail to start consumer, err:%v\n", err)
      return
   }
   //获取主题分区
   partitionList, err := consumer.Partitions(kafkaTopic) // 根据topic取到所有的分区
   if err != nil {
      fmt.Printf("fail to get list of partition:err%v\n", err)
      return
   }
   //打印启动
   fmt.Println("任务开始启动")
   //遍历分区
   for partition := range partitionList {
      //针对每个分区创建一个对应的分区消费者
      pc, err := consumer.ConsumePartition(kafkaTopic, int32(partition), sarama.OffsetNewest)
      //判断是否连接成功
      if err != nil {
         fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
         return
      }
      //关闭连接
      defer pc.AsyncClose()
      // 异步从每个分区消费信息
      wg.Add(1) //+1
      go func(sarama.PartitionConsumer) {
         defer wg.Done() //-1
         for msg := range pc.Messages() {
            //记录接收
            LogInfo("Info", "log", "kafka.log", string(msg.Value))
            //获取消息信息
            var messageInfoList map[string]interface{}
            errResult := json.Unmarshal(msg.Value, &messageInfoList)
            if errResult != nil {
               continue
            }
            //获取记录Id
            dataId, dataIdErr := strconv.Atoi(StrVal(messageInfoList["id"]))
            if dataIdErr != nil {
               dataId = 0
            }
            //获取消息结构体
            messageDataInfo, messageDataInfoErr := messageInfoList["msg"].(string)
            if !messageDataInfoErr {
               continue
            }
            //string转msp
            var messageInfo map[string]interface{}
            messageInfoErr := json.Unmarshal([]byte(messageDataInfo), &messageInfo)
            if messageInfoErr != nil {
               continue
            }
            //获取任务Id
            TaskId, TaskIdErr := messageInfo["TaskId"].(float64)
            if !TaskIdErr {
               TaskId = 0
            }
            //获取手机号码
            phoneNum, phoneNumErr := messageInfo["phoneNum"].(string)
            if !phoneNumErr {
               phoneNum = ""
            }
            //获取意向
            custIntention, custIntentionErr := messageInfo["custIntention"].(string)
            if !custIntentionErr {
               custIntention = ""
            }
            //判断当前客户是否具备完整信息
            if TaskId == 0 || phoneNum == "" || custIntention == "" || dataId == 0 {
               continue
            }
            //判断是否属于意向客户
            if custIntention != "A" && custIntention != "B" {
               continue
            }
            //输出
            fmt.Println(phoneNum + ":已接收")
         }
      }(pc)
   }
   wg.Wait()
}

/**
记录日志
 */
func LogInfo(level, fileLogPath, fileLogName, content string) {
   //初始化日志配置
   var (
      logFilePath = fileLogPath //文件存储路径
      logFileName = fileLogName //文件名
   )
   // 日志文件
   fileName := path.Join(logFilePath, logFileName)
   // 写入文件
   file, _ := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0666)
   //关闭连接
   defer file.Close()
   // 实例化
   logger := logrus.New()
   // 日志级别
   logger.SetLevel(logrus.DebugLevel)
   // 设置输出
   logger.Out = file
   // 设置 rotatelogs,实现文件分割
   logWriter, _ := rotatelogs.New(
      // 分割后的文件名称
      fileName+".%Y%m%d.log",
      // 生成软链,指向最新日志文件
      rotatelogs.WithLinkName(fileName),
      // 设置最大保存时间(7天)
      rotatelogs.WithMaxAge(7*24*time.Hour), //以hour为单位的整数
      // 设置日志切割时间间隔(1天)
      rotatelogs.WithRotationTime(1*time.Hour),
   )
   // hook机制的设置
   writerMap := lfshook.WriterMap{
      logrus.InfoLevel:  logWriter,
      logrus.FatalLevel: logWriter,
      logrus.DebugLevel: logWriter,
      logrus.WarnLevel:  logWriter,
      logrus.ErrorLevel: logWriter,
      logrus.PanicLevel: logWriter,
   }
   //给logrus添加hook
   logger.AddHook(lfshook.NewHook(writerMap, &logrus.JSONFormatter{
      TimestampFormat: "2006-01-02 15:04:05",
   }))
   // 打印日志
   switch {
   case level == "Info": //正常打印
      logger.WithFields(logrus.Fields{"content": content,}).Info()
   case level == "Warn": //警告
      logger.WithFields(logrus.Fields{"content": content,}).Warn()
   case level == "Error": //错误
      logger.WithFields(logrus.Fields{"content": content,}).Error()
   case level == "Debug": //调试
      logger.WithFields(logrus.Fields{"content": content,}).Debug()
   default:
   }
}

/**
类型转换
 */
func StrVal(value interface{}) string {
   // interface 转 string
   var key string
   if value == nil {
      return key
   }
   //类型判断
   switch value.(type) {
   case float64:
      ft := value.(float64)
      key = strconv.FormatFloat(ft, 'f', -1, 64)
   case float32:
      ft := value.(float32)
      key = strconv.FormatFloat(float64(ft), 'f', -1, 64)
   case int:
      it := value.(int)
      key = strconv.Itoa(it)
   case uint:
      it := value.(uint)
      key = strconv.Itoa(int(it))
   case int8:
      it := value.(int8)
      key = strconv.Itoa(int(it))
   case uint8:
      it := value.(uint8)
      key = strconv.Itoa(int(it))
   case int16:
      it := value.(int16)
      key = strconv.Itoa(int(it))
   case uint16:
      it := value.(uint16)
      key = strconv.Itoa(int(it))
   case int32:
      it := value.(int32)
      key = strconv.Itoa(int(it))
   case uint32:
      it := value.(uint32)
      key = strconv.Itoa(int(it))
   case int64:
      it := value.(int64)
      key = strconv.FormatInt(it, 10)
   case uint64:
      it := value.(uint64)
      key = strconv.FormatUint(it, 10)
   case string:
      key = value.(string)
   case []byte:
      key = string(value.([]byte))
   default:
      newValue, _ := json.Marshal(value)
      key = string(newValue)
   }
   //返回
   return key
}

然后我们在工作目录下创建一个目录名为log,用于存放日志。

       我们直接在项目目录下运行如下命令启动脚本

go run ./

然后我们在kafka生产数据,如下

1.png

程序接收到队列数据后,直接展示

2.png

这样我们就用Go实现队列消费。

0条评论

发表评论