[go]heka插件开发

[go]heka插件开发

官方文档传送门

Heka是一个用于进行DataProcessing的一个系统, 具有高度的扩展性, 官方自称为 Swiss Army Knife 型的工具

heka通过一系列插件进行工作heka支持的插件种类有

  • Input
  • Splitters
  • Decoders
  • Filters
  • Encoders
  • Outputs

六种插件 , 对于不同插件的作用, 可以在官方文档内很容易的找到, 这里就不做说明了

开发一个Input插件

heka语言的插件大部分使用 golang 开发, 下面是一个使用heka实现一个input插件, 从文件读入数据的例子

使插件能够被heka识别(注册插件到heka中)

首先, go的开发要保证GOPATH配置正确, heka安装好之后, 为heka开发插件的默认GOPATH是 /path/to/your/heka-src/build/heka/ 如果不将gopath设置为这个, 就会找不到要导入的heka有关的包

为了保证我们对插件开发的可视化, 每一步开发都能看到结果是最好的, 因而我们先尝试开发一个最最简单的插件, 只让这个插件注册到heka中, 能够确定这个插件被加载即可

参考文档:

  • [1]http://hekad.readthedocs.io/en/v0.10.0/developing/plugin.html#registering-your-plugin
  • [2]http://hekad.readthedocs.io/en/v0.10.0/installing.html#building-hekad-with-external-plugins

首先我们在新的GOPATH下的src/github.com/下建立自己的工作区和包目录 (我这里以./VOID001/inputplug 为例)

然后我们定义一个type 这个type就是我们的Plugin的type, 这里起名为SampleInput, 其他的名字也都可以 , 这个地方对命名没有什么约束, 下面有对名字有约束的地方, 会进行说明.  然后 将这个type实现为Plugin interface(即将 Plugin 这个interface对应的全部方法实现在type上)

type SampleInputPlug struct {
	filePath string
}

func (t *SampleInputPlug) Init(config interface{}) error {
	return nil
}

这样, 就已经实现了一个最基本的Plugin ,SampleInputPlug 指针实现了Plugin这个接口, 然后通过查看 [1] 我们加入这样一句话到Init中, 将我们的Plugin注册到heka中(这里是有两个坑的, 马上就会发现了 = = )

func (t *SampleInputPlug) Init(config interface{}) error {
	pipeline.RegisterPlugin("SampleInputPlug", func() interface{} { return new(SampleInputPlug) })	
	return nil
}

好, 我们go build, 恩 没有什么问题, 下一步 就是把这个插件给编译到heka中(吐槽: go只支持静态编译, 所以每当新开发一个插件都需要对heka进行一次重新编译=A=很不爽)

根据[2]我们修改 /path/to/your/heka/cmake 下面的 plugin_loader.cmake 添加如下这一行(注:我目前还没有找到添加本地的git repo然后直接clone本地的gitrepo进行编译的方法 , 不过我认为肯定是有的, 目前先不管这个) , 在添加的时候要保证你的这个plugin所在的目录(即inputplug)被添加到了一个 git repo并push到了github上

add_external_plugin(git https://github.com/VOID001/inputplug master)

好了,然后在heka根目录执行 source ./build.sh 然后新建一个heka配置文件 sample.toml 里面写上

[SampleInputPlug]

好了, 然后我们运行 ./hekad -config sample.toml  …. 诶 (#°Д°) 为毛出错了!!! (好了即将进入填坑时间)

╭─[email protected]_PC ~/GitRepos/heka/build/heka/bin  ‹dev› 
╰─➤  ./hekad -config samp.toml
2016/05/25 10:55:11 Pre-loading: [SampleInputPlug]
2016/05/25 10:55:11 No registered plugin type: SampleInputPlug
2016/05/25 10:55:11 Pre-loading: [NullSplitter]
2016/05/25 10:55:11 Loading: [NullSplitter]
2016/05/25 10:55:11 Pre-loading: [ProtobufDecoder]
2016/05/25 10:55:11 Loading: [ProtobufDecoder]
2016/05/25 10:55:11 Pre-loading: [ProtobufEncoder]
2016/05/25 10:55:11 Loading: [ProtobufEncoder]
2016/05/25 10:55:11 Pre-loading: [TokenSplitter]
2016/05/25 10:55:11 Loading: [TokenSplitter]
2016/05/25 10:55:11 Pre-loading: [PatternGroupingSplitter]
2016/05/25 10:55:11 Loading: [PatternGroupingSplitter]
2016/05/25 10:55:11 Pre-loading: [HekaFramingSplitter]
2016/05/25 10:55:11 Loading: [HekaFramingSplitter]
2016/05/25 10:55:11 Error reading config:  Empty configuration, exiting.

这个错误提示我们这个插件没有被注册 = = 仔细阅读文档之后发现 我们的RegisterPlugin的第一个参数字串 是不符合要求的

The name value should be a unique identifier for your plugin, and it should end in one of “Input”, “Splitter”, “Decoder”, “Filter”, “Encoder”, or “Output”, depending on the plugin type.

好, 我们把名字改成 SampleInput 去掉后面的Plug ,然后修改我们的toml配置文件 & 重新编译,运行 还是不行(#°Д°)

这次是一个很无语的错误 = = 我们应该把RegisterPlugin这个写在 init()函数内 而不是 Init 函数内 , init()函数是每一个包在被加载的时候最先调用的函数, 我们应该在这里对Plugin进行注册 , 修改后的代码如下:

package sampinput

import (
	"fmt"
	"github.com/mozilla-services/heka/pipeline"
)

type SampleInputPlug struct {
	filePath string
	config   *SampleInputConfig
}

type SampleInputConfig struct {
}

func init() {
	fmt.Println("Init VOID001's Plugin Done")
	pipeline.RegisterPlugin("SampleInput", func() interface{} { return new(SampleInputPlug) })
}

func (t *SampleInputPlug) Init(config interface{}) error {
	return nil
}

再次编译, 运行(WTF)看结果, 恩 终于成功了~

╰─➤  ./hekad -config samp.toml
Init VOID001's Plugin Done
2016/05/25 11:14:45 Pre-loading: [SampleInput]
2016/05/25 11:14:45 Pre-loading: [TokenSplitter]
2016/05/25 11:14:45 Loading: [TokenSplitter]
2016/05/25 11:14:45 Pre-loading: [PatternGroupingSplitter]
2016/05/25 11:14:45 Loading: [PatternGroupingSplitter]
2016/05/25 11:14:45 Pre-loading: [HekaFramingSplitter]
2016/05/25 11:14:45 Loading: [HekaFramingSplitter]
2016/05/25 11:14:45 Pre-loading: [NullSplitter]
2016/05/25 11:14:45 Loading: [NullSplitter]
2016/05/25 11:14:45 Pre-loading: [ProtobufDecoder]
2016/05/25 11:14:45 Loading: [ProtobufDecoder]
2016/05/25 11:14:45 Pre-loading: [ProtobufEncoder]
2016/05/25 11:14:45 Loading: [ProtobufEncoder]
2016/05/25 11:14:45 Loading: [SampleInput]
写好Input插件的框架

上面已经将我们的插件注册到 heka中了, 下来就是开始开发一个Input插件 , 我们先实现从文件中读取内容这一个简单的功能

为了实现一个Input插件, 我们必须实现Input接口 , Input接口的定义如下:

type Input interface {
    Run(ir InputRunner, h PluginHelper) (err error)
    Stop()
}

我们为*SampleInputPlug 实现这个接口

func (t *SampleInputPlug) Stop() {
	return
}

func (t *SampleInputPlug) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err error) {
	return nil
}

对于一个Input Plugin 来说, Run这个函数在插件运行的时候就运行了,并且如果不出什么意外的话(比如突然死机, 或者代码写残了导致它崩溃了) 会一直运行到hekad退出  , 当然就算我们的插件跪了, 如果我们让插件具有了无限复活的buff的话~ (参见 Restarting Plugin) 那么还是可以救回来的 ~

Stop函数也就是释放资源, 清理现场, 比如回收打开的文件, 通知goroutine退出等等~

我们先来看Run函数, 这个函数通常要做这样三件事(也就是一个Input Plugin要做的事啦)

  • 获取data
  • 把获取到的data给一个SplitterRunner(这又是什么呢0.0 别急, 一会儿说)
  • (可选的)Provide a “pack decorator” function to the SplitterRunner to populate the message object with any input-specific information(暂时没看懂, 先放着)

那么我们就开始第一步的实现啦

首先获取数据, 我们通过打开一个文件来获取输入

if _, err := os.Open(t.filePath); err != nil {
	return fmt.Errorf("Open File %s error: %s", t.filePath, err.Error())
}
file, _ := os.Open(t.filePath)

这样 file 就是一个 io.Reader 接口 , 下一步就是将数据传递给Splitter Runner

SplitterRunner 就是一个用于读取接收到的数据并且传递到 heka pipeline的 接口, 通过 InputRunner 自带的函数 NewSplitterRunner 我们可以实现这个功能,  然后我们使用SplitStream这个方法, 读取并切割这个文件流, 下面是具体实现Run的代码

func (t *SampleInputPlug) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err error) {
	//Access Data From Outside world
	if _, err := os.Open(t.filePath); err != nil {
		return fmt.Errorf("Open File %s error: %s", t.filePath, err.Error())
	}
	file, _ := os.Open(t.filePath)
	sr := ir.NewSplitterRunner("")
	for {
		sr.SplitStream(file, nil)
	}
	return nil
}

注意:

sr := ir.NewSplitterRunner("")

这一句一定要放在循环外, 放在循环里的话, 会导致内存被分配的sr占满(电脑卡死两次的哭着告诉你)

好,然后我们简单修改一下配置文件 sample.toml 让我们能看到我们的修改的效果

[SampleInput]
filepath="/tmp/text"

[PayloadEncoder]
append_newlines = false

[LogOutput]
message_matcher = "TRUE"
encoder = "PayloadEncoder"

然后我们执行 ./hekad -config /tmp/text (text 已经存在) 然后 , 恩, 我们要的输出看到啦 ~~~

好的现在停止heka(Ctrl + C) 结果发现, 没有办法停止(#°Д°)

^C2016/05/25 19:28:16 Shutdown initiated.
2016/05/25 19:28:16 Stop message sent to input 'SampleInput'

一直卡在这里….. = = 好吧看来我们还要继续

为我们的Input插件实现退出功能

在插件退出的时候会call  Stop函数 ,我们Run实现的是一个forever loop , 一直读取数据直到hekad退出,  为了通知Run退出, 我们需要实现一个 channel , 这里在我们的SampleInputPlug内实现了一个 stopMsg 成员, 类型为 chan interface{}  注意: 在使用你的 channel前 一定要make make make!!!!!(又被坑一波的我 = =) 然后我们用 select , 非阻塞的查询信号的到来 , 具体的代码如下~~

import (
	"errors"
	"fmt"
	"github.com/mozilla-services/heka/pipeline"
	"os"
)

type SampleInputPlug struct {
	filePath string
	config   *SampleInputConfig
	stopMsg  chan interface{}
}

type SampleInputConfig struct{}

func init() {
	pipeline.RegisterPlugin("SampleInput", func() interface{} { return new(SampleInputPlug) })
}

func (t *SampleInputPlug) Init(config interface{}) error {
	var ok bool
	conf := config.(pipeline.PluginConfig)
	if t.filePath, ok = conf["filepath"].(string); !ok {
		return errors.New("Now 'filepath' supported")
	}
	fmt.Println("Init Complete")
	t.stopMsg = make(chan interface{})
	return nil
}

func (t *SampleInputPlug) Stop() {
	fmt.Println("Stop Plugin")
	t.stopMsg <- struct{}{}
	return
}

func (t *SampleInputPlug) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err error) {
	//Access Data From Outside world
	if _, err := os.Open(t.filePath); err != nil {
		return fmt.Errorf("Open File %s error: %s", t.filePath, err.Error())
	}
	file, _ := os.Open(t.filePath)
	sr := ir.NewSplitterRunner("")
	//Send it to heka pipeline
	for {
		sr.SplitStream(file, nil)
		//Check wheter stop signal is sent
		select {
		case <-t.stopMsg:
			fmt.Println("Runner get the Stop Message")
			return nil
		default:
		}
	}
	return fmt.Errorf("Fatal Error. Plugin Exit Unexpectedly")
}

好了, 这样我们的插件也能正常退出啦 , 这次就到这里 ~~

4 thoughts on “[go]heka插件开发

  1. 非常好,我急需这方面的资料,写的非常细致,思路严谨,排版风格看起来也是赏心悦目!

    1. 谬赞啦,谢谢,能对你有帮助我也很开心
      不过heka这个项目已经deprecated了
      我曾经研究过heka的源码,并改造为公司内使用的log分析平台,不过目前不从事相关工作了
      有问题的话欢迎交流w 邮件或者博客评论都可以的

Leave a Reply

Your email address will not be published. Required fields are marked *

19 + five =

This site uses Akismet to reduce spam. Learn how your comment data is processed.