Glow使用小结 - 统计Key出现的次数

Glow是一个基于golang的分布式大数据计算框架。 由Chris Lu开发维护,这个哥们应该是属于"大神"一级的,至少他写的东西star数基本都在K一级。做开发做到这个份儿,也算是值了。

和Glow属于姊妹框架的是Gleam,据作者说这个框架更高效,更简洁。 我还没有接触Gleam,等哪天接触到了,再写写Gleam使用小结。

背景

下面说一下使用Glow准备要做什么事情。 作为一个bigdata的新人,入手课程基本都是world count。 Glow也有一个world count的example,这个example是用来统计一共出现了多少个单词作为熟悉工具,这些代码基本够用了。但此时手边正好有一个需求,需要统计Nginx log文件中每个URL一共出现了多少次。正好使用Glow来练练手。

分析

需要完成这个功能,我们先来分析一下需求-统计相同URL出现的次数。 将需求分解开,就是是下面的两个子需求:

  • 对忽略参数的URL进行归并处理, 对存在参数的URL(即包含"?"的这些URL)做分割操作。
  • 对统一处理后的URL进行聚合操作,得出每个URL出现的次数

如果不使用MapReduce,那么处理思路应该是这样的:

  1. 遍历log文件, 按照解析规则分割每行数据
  2. 将解析后的URL作为Key存入Map中,value每次递增加一

使用MapReduce之后,处理思路则变成了下面的逻辑:

  1. 将log文件作为Input,并依次读取每行文本。
  2. 对传入的行数据做映射处理,输出K,V。 Key是分割后的URL,Value则是出现的次数(1)
  3. 根据Key进行GroupByKey操作,并输出K,V。Key仍然是URL,而Value则变成了所有Value的集合。
  4. 依次输出所有的记录。

通过两种逻辑对比可发现,使用MapReduce之后,所有的处理都在围绕Map和Reduce进行设计。 大数据从某种意义上来说,就是在设计合适的Map和Reduce函数。

实施

确定处理逻辑之后,下面开始编码实现。

  1. log文件作为Input,并读取文本数据。
    flow.New().TextFile("print.access.log", 2).Map(func(line string, ch chan string) {
        items, err := zs.DouExstact(line, "\"")
        if err != nil {
            fmt.Println(err)
            return
        }

        if len(items) != 16 {
            return
        }

        key := items[2]
        url := key
        if strings.Contains(key, "?") {
            _key := strings.Split(key, "?")
            url = _key[0]
        }
        ch <- url
    })

解释代码之前,我们看一下log格式:

"10.0.3.48" - "-" [10/Aug/2018:16:28:16 +0800] "GET /m/print/websocket/ HTTP/1.1" "101" "0" "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36" "116.23.231.50, 116.23.231.50" "304ff106732b41adb190391bc06f74be" "p1.eqxiu.com" "10.0.7.71:8080" "101" "2735.329" "2735.329" "-" "-"

正常情况下,一个完整log会包含16个属性。每个属性都通过"来标示。因此,第一步就是解析属性,我使用了(zs "github.com/andy-zhangtao/gogather/strings")的DouExstact函数来解析数据。第三个属性就是URL,做一些简单处理之后,将URL放入ch之中。 Glow会从ch读取数据,并往下传递。

  1. Map处理,并输出K,V

Map处理时,将接受分割出的URL,并构造一个。 所以这一步很简单:

    Map(func(key string) (string, int) {
        return key, 1
    })

虽然代码很简单,但我作为一个新手,因为不了解MapReduce思想在这里吃了大亏。Map函数的Output将会作为后续操作的Input,所以生成OutPut时一定要慎之又慎。 很多教程里面都是直接定义Output,也没有说为什么要这么定义。

从我的填坑过程来说,每个Output的设计都需要考虑到后续步骤。 例如,我的需求是按照Key进行数据统计,因此需要将key放到Output之中,并往下传递。 如果只需要统计出现的单词数量,那么就和Key没关系了,因此Output只包含一个Int就行。

  1. 对所有Key进行GroupByKey操作。

这一步我们写的代码最少,但框架做了不少工作(可以通过最后的程序调用图看出)。 因为第二步生成的Output包含了,因此我们就可以针对Key进行GroupBy了。

GroupByKey()  

此案例中的key是string,所以不需要做特别处理,就可以直接GroupBy。 如果是用户自定义的KeyType,就需要使用LocalReduceByKey来处理。(LocalReduceByKey还没有使用过,以后有机会会补上这一课)。

  1. 统计数据

在第三步中,我们对所有数据根据Key进行了GroupBy操作。按照SQL的使用经验,GroupBy之后应该就是总数。 但很遗憾,并不是。 Glow只是返回了所有value的集合。 想想也合理,这个案例中,value是int,我们可以进行Count操作。 如果是自定义数据类型呢? 所以将所有value按照Key进行分组,然后生成这样的Output是一种合理的操作。

所以,在第四步中,我们对[]value进行count操作。

Map(func(key string, count []int) {  
        fmt.Printf("key:[%s], count:[%v]\n", key, len(count))
    })

至此,我们就完成了统计URL出现次数的工作。 完整代码如下:

package main

import (  
    "github.com/chrislusf/glow/flow"
    _ "github.com/chrislusf/glow/driver"
    zs "github.com/andy-zhangtao/gogather/strings"
    "fmt"
    "flag"
    "strings"
)

func main() {  
    flag.Parse()

    flow.New().TextFile("../print.access.log", 2).Map(func(line string, ch chan string) {
        items, err := zs.DouExstact(line, "\"")
        if err != nil {
            fmt.Println(err)
            return
        }

        if len(items) != 16 {
            return
        }

        key := items[2]
        url := key
        if strings.Contains(key, "?") {
            _key := strings.Split(key, "?")
            url = _key[0]
        }
        ch <- url
    }).Map(func(key string) (string, int) {
        return key, 1
    }).GroupByKey().Map(func(key string, count []int) {
        fmt.Printf("key:[%s], count:[%v]\n", key, len(count))
    }).Run()

    flow.Ready()
}

最后我们看一下,Glow生成的调用图.


调用过程