版权声明:本文为博主原创文章,转载请注明出处:https://twocups.cn/index.php/2021/03/15/33/

Metricbeat 源码地址:https://github.com/elastic/beats/tree/master/metricbeat

写在前面

上一篇我们分析了 Filebeat 的源码,这一篇我们来分析 Metricbeat 的源码。在我读 Metricbeat 的源码之前,以为 Metricbeat 和 Filebeat 的源码在实现层面是相近的,结果读完后发现其实它们除了 libbeat 库中用到的东西和最后发布到后端的流程差不多之外,其他方面关系还真的不是很大。而且不知道是不是我的错觉,Metricbeat 的源码风格和 Filebeat 不太一样,其中面向对象的思想特别浓烈,并且在管道的特性和使用上相对于 Filebeat 来说没有那么多。这次的源码分析和上次一样,分为工作原理和运行流程。并且,运行流程也同样是一个主流程,很多暗线也没有提到。

Metricbeat 的工作原理

工作原理的官方文档:https://www.elastic.co/guide/en/beats/metricbeat/current/how-metricbeat-works.html

Metricbeat 中最重要的两个组件是 Module Metricset。Module 定义了 Metricbeat 采集数据的对象,即去哪里收集数据,例如 MySQL、Redis、System 等;而 Metricset 定义了 Metricbeat 采集对象中需要采集的指标,即收集哪些数据,例如 System 中的 cpu、内存、硬盘、网络等。

很明显,Metricset 与 Module 是从属关系,即每一个 Module 都包含许多 Metricset。Module 是属于服务层面的,它指定与服务有关的详细信息,例如 Metricbeat 和服务如何连接、Metricbeat 采集服务指标的频率、Metricbeat 采集哪些指标等。Metricset 是 Module 中负责获取和构建数据的组件,它发送一次请求就可以从服务中取回一系列的指标。就像我们在实际部署那一章举过的例子,Redis 的 Module 组件中包含了一个 INFO Metricset,而这个 Metricset 就是服务收集 Redis 中的相关指标的,然后再把收集到的指标发送到 Elasticsearch。

Metricbeat 启动流程

我画了一个 Metricbeat 运行的流程图,里面描述了 Metricbeat 的主要组件采集日志数据的过程。大家可以先看看图,有个大概的印象,我们再详细往下说。

接下来,我们详细看看 Metricbeat 是如何运行的。

首先,Metricbeat 文件夹根目录下的 main.go 文件主方法运行。其中,重要的是引入了 /cmd/root.go 文件中的 RootCmd 变量 ,该变量的变量类型是 BeatsRootCmd,结构体在下面给出了。BeatsRootCmd 的功能是处理所有应用程序的命令行页面,同时解析用户标志并运行子命令。

func main() {
	if err := cmd.RootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}
// BeatsRootCmd结构体
type BeatsRootCmd struct {
	cobra.Command
	RunCmd        *cobra.Command
	SetupCmd      *cobra.Command
	VersionCmd    *cobra.Command
	CompletionCmd *cobra.Command
	ExportCmd     *cobra.Command
	TestCmd       *cobra.Command
	KeystoreCmd   *cobra.Command
}

既然引入了 /cmd/root.go 文件,那文件中的初始化方法也会运行。变量 RootCmd 的声明在该文件的开头给出了,下面的初始化方法只是该变量的赋值。变量 RootCmd 的值将由初始化方法 Initialize 得到,而该方法的参数是 Metricbeat 的默认参数,由方法 MetricbeatSettings 得到。

func init() {
	RootCmd = Initialize(MetricbeatSettings())
}
// 设置Metricbeat 的默认参数
func MetricbeatSettings() instance.Settings {
	var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
	runFlags.AddGoFlag(flag.CommandLine.Lookup("system.hostfs"))
	return instance.Settings{
		RunFlags:      runFlags,
		Name:          Name,
		HasDashboards: true,
		Processing:    processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
	}
}

方法 Initialize 负责初始化 Metricbeat 的入口命令。

func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
	rootCmd := cmd.GenRootCmdWithSettings(beater.DefaultCreator(), settings)
	rootCmd.AddCommand(cmd.GenModulesCmd(Name, "", BuildModulesManager))
	rootCmd.TestCmd.AddCommand(test.GenTestModulesCmd(Name, "", beater.DefaultTestModulesCreator()))
	return rootCmd
}

方法 GenRootCmdWithSettings 进一步处理命令,并且返回我们使用的 Metricbeat 的根命令。

func GenRootCmdWithSettings(beatCreator beat.Creator, settings instance.Settings) *BeatsRootCmd {
	if settings.IndexPrefix == "" {
		settings.IndexPrefix = settings.Name
	}
	rootCmd := &BeatsRootCmd{}
	rootCmd.Use = settings.Name
	// 由beat的名称来获取默认配置,这里的beat名称指的就是metricbeat
	err := cfgfile.ChangeDefaultCfgfileFlag(settings.Name)
	if err != nil {
		panic(fmt.Errorf("failed to set default config file path: %v", err))
	}
	// 根据配置处理命令行的命令
	rootCmd.RunCmd = genRunCmd(settings, beatCreator)
	rootCmd.ExportCmd = genExportCmd(settings)
	rootCmd.TestCmd = genTestCmd(settings, beatCreator)
	rootCmd.SetupCmd = genSetupCmd(settings, beatCreator)
	rootCmd.KeystoreCmd = genKeystoreCmd(settings)
	rootCmd.VersionCmd = GenVersionCmd(settings)
	rootCmd.CompletionCmd = genCompletionCmd(settings, rootCmd)
	rootCmd.Run = rootCmd.RunCmd.Run
	// Persistent flags是持久表示,在所有子命令中都通用
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("E"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("c"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("d"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("v"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("e"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("environment"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.config"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home"))
	rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("strict.perms"))
	if f := flag.CommandLine.Lookup("plugin"); f != nil {
		rootCmd.PersistentFlags().AddGoFlag(f)
	}
	// 继承根flags
	rootCmd.Flags().AddFlagSet(rootCmd.RunCmd.Flags())
	// 注册所有beat共有的子命令
	rootCmd.AddCommand(rootCmd.RunCmd)
	rootCmd.AddCommand(rootCmd.SetupCmd)
	rootCmd.AddCommand(rootCmd.VersionCmd)
	rootCmd.AddCommand(rootCmd.CompletionCmd)
	rootCmd.AddCommand(rootCmd.ExportCmd)
	rootCmd.AddCommand(rootCmd.TestCmd)
	rootCmd.AddCommand(rootCmd.KeystoreCmd)
	return rootCmd
}

方法 genRunCmd 是 libbeat 库中 /libbeat/cmd/run.go 中的唯一方法,它的任务就是运行根命令。

func genRunCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command {
	name := settings.Name
	runCmd := cobra.Command{
		Use:   "run",
		Short: "Run " + name,
		Run: func(cmd *cobra.Command, args []string) {
			err := instance.Run(settings, beatCreator)
			if err != nil {
				os.Exit(1)
			}
		},
	}
	// 运行子命令的flags
	runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("N"))
	runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("httpprof"))
	runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("cpuprofile"))
	runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("memprofile"))
	if settings.RunFlags != nil {
		runCmd.Flags().AddFlagSet(settings.RunFlags)
	}
	return &runCmd
}

方法 Run 的功能是初始化并且实例化一个beat。

func Run(settings Settings, bt beat.Creator) error {
	err := setUmaskWithSettings(settings)
	if err != nil && err != errNotImplemented {
		return errw.Wrap(err, "could not set umask")
	}
    // 准备好将要实例化的beat的参数
    // 这里的name的值就是metricbeat
	name := settings.Name
	idxPrefix := settings.IndexPrefix
	version := settings.Version
	elasticLicensed := settings.ElasticLicensed
	return handleError(func() error {
		defer func() {
			if r := recover(); r != nil {
				logp.NewLogger(name).Fatalw("Failed due to panic.",
					"panic", r, zap.Stack("stack"))
			}
		}()
        // 实例化一个beat
		b, err := NewBeat(name, idxPrefix, version, elasticLicensed)
		if err != nil {
			return err
		}
		// 将实例化的beat的信息添加进注册表和监视器
		registry := monitoring.GetNamespace("info").GetRegistry()
		monitoring.NewString(registry, "version").Set(b.Info.Version)
		monitoring.NewString(registry, "beat").Set(b.Info.Beat)
		monitoring.NewString(registry, "name").Set(b.Info.Name)
		monitoring.NewString(registry, "hostname").Set(b.Info.Hostname)
		// 添加额外信息到管理state的注册表,之后发送到监视器
		stateRegistry := monitoring.GetNamespace("state").GetRegistry()
		serviceRegistry := stateRegistry.NewRegistry("service")
		monitoring.NewString(serviceRegistry, "version").Set(b.Info.Version)
		monitoring.NewString(serviceRegistry, "name").Set(b.Info.Beat)
		beatRegistry := stateRegistry.NewRegistry("beat")
		monitoring.NewString(beatRegistry, "name").Set(b.Info.Name)
		monitoring.NewFunc(stateRegistry, "host", host.ReportInfo, monitoring.Report)
        // 启动这个beat
		return b.launch(settings, bt)
	}())
}

以下是刚才创建的 Beat 实例的启动流程。

func (b *Beat) launch(settings Settings, bt beat.Creator) error {
	defer logp.Sync()
	defer logp.Info("%s stopped.", b.Info.Beat)
    // beat初始化
	err := b.InitWithSettings(settings)
	if err != nil {
		return err
	}
	defer func() {
		if err := b.processing.Close(); err != nil {
			logp.Warn("Failed to close global processing: %v", err)
		}
	}()
	// 将服务标记为停止
	defer svc.NotifyTermination()
	// 上排他锁
	bl := newLocker(b)
	err = bl.lock()
	if err != nil {
		return err
	}
	defer bl.unlock()
	// 从注册表中变量中设置beat的id
	infoRegistry := monitoring.GetNamespace("info").GetRegistry()
	monitoring.NewString(infoRegistry, "uuid").Set(b.Info.ID.String())
	serviceRegistry := monitoring.GetNamespace("state").GetRegistry().GetRegistry("service")
	monitoring.NewString(serviceRegistry, "id").Set(b.Info.ID.String())
	svc.BeforeRun()
	defer svc.Cleanup()
	// 在Seccomp锁定之前启动API服务器,这样是为了创建Unix套接字集,以对Unix域文件设置适当的权限
	if b.Config.HTTP.Enabled() {
		s, err := api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace)
		if err != nil {
			return errw.Wrap(err, "could not start the HTTP server for the API")
		}
		s.Start()
		defer s.Stop()
	}
	if err = seccomp.LoadFilter(b.Config.Seccomp); err != nil {
		return err
	}
	beater, err := b.createBeater(bt)
	if err != nil {
		return err
	}
	r, err := b.setupMonitoring(settings)
	if err != nil {
		return err
	}
	if r != nil {
		defer r.Stop()
	}
	if b.Config.MetricLogging == nil || b.Config.MetricLogging.Enabled() {
		reporter, err := log.MakeReporter(b.Info, b.Config.MetricLogging)
		if err != nil {
			return err
		}
		defer reporter.Stop()
	}
	ctx, cancel := context.WithCancel(context.Background())
	var stopBeat = func() {
		b.Instrumentation.Tracer().Close()
		beater.Stop()
	}
	svc.HandleSignals(stopBeat, cancel)
	err = b.loadDashboards(ctx, false)
	if err != nil {
		return err
	}
	logp.Info("%s start running.", b.Info.Beat)
	// 启动该beat的配置管理器
	b.Manager.Start(beater.Stop)
	defer b.Manager.Stop()
    // 运行该beat
	return beater.Run(&b.Beat)
}

从这里开始离开 libbeat 库了,正式开始 Metricbeat 本身的逻辑了。方法 Run 启动 Metricbeat 的工作程序,并且在最后等待队列进行堵塞,直到调用了方法 Stop并完成工作为止。

func (bt *Metricbeat) Run(b *beat.Beat) error {
	var wg sync.WaitGroup
	for _, r := range bt.runners {
        // 启动metricbeat实例的runner
		r.Start()
        // 等待队列加一
		wg.Add(1)
		thatRunner := r
        // 对于与Metribeat关联的每个主机都有其自己的goroutine来获取数据
		go func() {
			defer wg.Done()
			<-bt.done
			thatRunner.Stop()
		}()
	}
	// 管理modules
	factory := module.NewFactory(b.Info, bt.moduleOptions...)
	modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher)
	reload.Register.MustRegisterList(b.Info.Beat+".modules", modules)
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-bt.done
		modules.Stop()
	}()
	if bt.config.ConfigModules.Enabled() {
		moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules)
		if err := moduleReloader.Check(factory); err != nil {
			return err
		}
		go moduleReloader.Run(factory)
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-bt.done
			moduleReloader.Stop()
		}()
	}
	if bt.autodiscover != nil {
		bt.autodiscover.Start()
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-bt.done
			bt.autodiscover.Stop()
		}()
	}
	wg.Wait()
	return nil
}

之后启动 Metricbeat 实例的 runner。这里是 Metricbeat 很重要的一个分水岭,该方法首先从服务中获得了一系列指标数据,之后再将这些指标参数发送到指定的后端,例如 Elasticsearch。那么我们接下来也依次去阅读这两个部分的代码。

func (mr *runner) Start() {
    // startOnce是runner的同步锁,下面给出runner的结构体
	mr.startOnce.Do(func() {
        // 采集数据
		output := mr.mod.Start(mr.done)
        // runner的等待队列加一
		mr.wg.Add(1)
		moduleList.Add(mr.mod.Name())
		go func() {
			defer mr.wg.Done()
            // 发送数据
			PublishChannels(mr.client, output)
		}()
	})
}
// runner结构体
type runner struct {
	done      chan struct{}
	wg        sync.WaitGroup
	startOnce sync.Once
	stopOnce  sync.Once
	mod       *Wrapper
	client    beat.Client
}

Metricbeat 数据采集流程

Wrapper 是Metricbeat 中很重要的一种数据结构,看它的名字就能猜到,它是个包装器,包装了 Metricbeat 中最重要的两个组件:Module 和 Metricset。Wrapper 的 Start 方法实质上是启动了 Module 中的 Metricset 工作器,而这个工作器的作用就是获取(Fetch)指标数据。

func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event {
	debugf("Starting %s", mw)
    // 建立共享管道,用来存储结果数据
	out := make(chan beat.Event, 1)
	var wg sync.WaitGroup
	wg.Add(len(mw.metricSets))
	for _, msw := range mw.metricSets {
        // 每个metricset都和一个主机搭配,启动一个线程
		go func(msw *metricSetWrapper) {
			metricsPath := msw.ID()
			registry := monitoring.GetNamespace("dataset").GetRegistry()
			defer registry.Remove(metricsPath)
			defer releaseStats(msw.stats)
			defer wg.Done()
			defer msw.close()
			registry.Add(metricsPath, msw.Metrics(), monitoring.Full)
			monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String())
            // 运行metricSetWrapper,将结果数据放入共享管道
			msw.run(done, out)
		}(msw)
	}
	// 单开一条线程,负责在所有写入结束时关闭共享管道
	go func() {
		wg.Wait()
		close(out)
		debugf("Stopped %s", mw)
	}()
	return out
}
// Wrapper结构体
type Wrapper struct {
    // module
	mb.Module
    // metricset列表
	metricSets []*metricSetWrapper
	// 相关设置
	maxStartDelay  time.Duration
	eventModifiers []mb.EventModifier
}

MetricSetWrapper 的 run 方法用于从指定的主机中获取指标数据。

func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
	defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+
		"'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host()))
	// 在限制时间maxStartDelay内随机启动每一个metricset
	if msw.module.maxStartDelay > 0 {
		delay := time.Duration(rand.Int63n(int64(msw.module.maxStartDelay)))
		debugf("%v/%v will start after %v", msw.module.Name(), msw.Name(), delay)
		select {
		case <-done:
			return
		case <-time.After(delay):
		}
	}
	debugf("Starting %s", msw)
	defer debugf("Stopped %s", msw)
	// reporter用于报告event和错误
	reporter := &eventReporter{
		msw:  msw,
		out:  out,
		done: done,
	}
	switch ms := msw.MetricSet.(type) {
	case mb.PushMetricSet:
		ms.Run(reporter.V1())
	case mb.PushMetricSetV2:
		ms.Run(reporter.V2())
	case mb.PushMetricSetV2WithContext:
		ms.Run(&channelContext{done}, reporter.V2())
	case mb.EventFetcher, mb.EventsFetcher,
		mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
        // 开始定期取回指标数据
		msw.startPeriodicFetching(&channelContext{done}, reporter)
	default:
		logp.Err("MetricSet '%s/%s' does not implement an event producing interface",
			msw.Module().Name(), msw.Name())
	}
}

方法 startPeriodicFetching 负责定期取回指标数据。

func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) {
	// 表示它作为定期取回数据的程序启动
	msw.periodic = true
	// 首先立即取回一次数据
	msw.fetch(ctx, reporter)
	// 启动定时器
	t := time.NewTicker(msw.Module().Config().Period)
	defer t.Stop()
    // 每隔一段时间取回一次数据
	for {
		select {
		case <-reporter.V2().Done():
			return
		case <-t.C:
			msw.fetch(ctx, reporter)
		}
	}
}

方法 fetch 的功能就是根据不同的情况调用更为具体的取回(Fetch)方法。

func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
	switch fetcher := msw.MetricSet.(type) {
	case mb.EventFetcher:
        // 单event取回
		msw.singleEventFetch(fetcher, reporter)
	case mb.EventsFetcher:
        // 多event取回
		msw.multiEventFetch(fetcher, reporter)
	case mb.ReportingMetricSet:
		reporter.StartFetchTimer()
		fetcher.Fetch(reporter.V1())
	case mb.ReportingMetricSetV2:
		reporter.StartFetchTimer()
		fetcher.Fetch(reporter.V2())
	case mb.ReportingMetricSetV2Error:
		reporter.StartFetchTimer()
		err := fetcher.Fetch(reporter.V2())
		if err != nil {
			reporter.V2().Error(err)
			logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
		}
	case mb.ReportingMetricSetV2WithContext:
		reporter.StartFetchTimer()
		err := fetcher.Fetch(ctx, reporter.V2())
		if err != nil {
			reporter.V2().Error(err)
			logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
		}
	default:
		panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
	}
}
// reporter结构体
type reporter interface {
	StartFetchTimer()
	V1() mb.PushReporter
	V2() mb.PushReporterV2
}

方法 multiEventFetch 能够取回多个event,原理和方法 singleEventFetch 是一样的。

func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) {
	reporter.StartFetchTimer()
	events, err := fetcher.Fetch()
	if len(events) == 0 {
		reporter.V1().ErrorWith(err, nil)
	} else {
		for _, event := range events {
			reporter.V1().ErrorWith(err, event)
		}
	}
}

方法 ErrorWith 指的是如果有错误,会由这个方法进行处理。这个方法只是个情况判断,具体逻辑在方法 Event 中。

func (r reporterV1) ErrorWith(err error, meta common.MapStr) bool {
	if err == nil && meta == nil {
		return true
	}
    // 将common.MapStr转换为event
	return r.v2.Event(mb.TransformMapStrToEvent(r.module, meta, err))
}
func (r reporterV2) Event(event mb.Event) bool {
    //先进行一系列异常情况判断
	if event.Took == 0 && !r.start.IsZero() {
		event.Took = time.Since(r.start)
	}
    // 检测是否为定期启动程序,之前我们说过的
	if r.msw.periodic {
		event.Period = r.msw.Module().Config().Period
	}
	if event.Timestamp.IsZero() {
		if !r.start.IsZero() {
			event.Timestamp = r.start
		} else {
			event.Timestamp = time.Now().UTC()
		}
	}
	if event.Host == "" {
		event.Host = r.msw.HostData().SanitizedURI
	}
	if event.Error == nil {
		r.msw.stats.success.Add(1)
	} else {
		r.msw.stats.failures.Add(1)
	}
	if event.Namespace == "" {
		event.Namespace = r.msw.Registration().Namespace
	}
    // 把event包装一下,改改数据,生成beatEvent
	beatEvent := event.BeatEvent(r.msw.module.Name(), r.msw.MetricSet.Name(), r.msw.module.eventModifiers...)
    // 把生成的beatEvent发送到之前创建的共享管道里
	if !writeEvent(r.done, r.out, beatEvent) {
		return false
	}
	r.msw.stats.events.Add(1)
	return true
}
// 一个工具类,作用是将上面得到beatEvent发送到之前我们创建的共享管道里
func writeEvent(done <-chan struct{}, out chan<- beat.Event, event beat.Event) bool {
	select {
	case <-done:
		return false
	case out <- event:
		return true
	}
}

方法 BeatEvent 对 event 进行一个包装,并且能够更改 event 中的基础数据。

func (e *Event) BeatEvent(module, metricSet string, modifiers ...EventModifier) beat.Event {
	if e.RootFields == nil {
		e.RootFields = common.MapStr{}
	}
	for _, modify := range modifiers {
		modify(module, metricSet, e)
	}
	b := beat.Event{
		Timestamp:  e.Timestamp,
		Fields:     e.RootFields,
		TimeSeries: !e.DisableTimeSeries,
	}
	if len(e.ModuleFields) > 0 {
		b.Fields.Put(module, e.ModuleFields)
		e.ModuleFields = nil
	}
	if e.Service == "" {
		e.Service = module
	}
	e.RootFields.Put("service.type", e.Service)
	if len(e.MetricSetFields) > 0 {
		switch e.Namespace {
		case ".":
			// "."的意思就是上一级目录,这个方法的意思就是将fields添加到当前根目录
			b.Fields.DeepUpdate(e.MetricSetFields)
		case "":
			b.Fields.Put(module+"."+metricSet, e.MetricSetFields)
		default:
			b.Fields.Put(e.Namespace, e.MetricSetFields)
		}
		e.MetricSetFields = nil
	}
	// 设置索引前缀来覆盖默认值
	if e.Index != "" {
		b.Meta = common.MapStr{"index": e.Index}
	}
	if e.ID != "" {
		b.SetID(e.ID)
	}
	if e.Error != nil {
		b.Fields["error"] = common.MapStr{
			"message": e.Error.Error(),
		}
	}
	return b
}

Metricbeat 数据发送流程

现在我们回到 runner 实例的 Start 方法。刚才我们走的是采集数据的那条路,接下来我们继续走发送数据的那条路。这就相对简单很多了,和之前说的 Filebeat 差不多。

func (mr *runner) Start() {
    // startOnce是runner的同步锁,下面给出runner的结构体
	mr.startOnce.Do(func() {
        // 采集数据
		output := mr.mod.Start(mr.done)
        // runner的等待队列加一
		mr.wg.Add(1)
		moduleList.Add(mr.mod.Name())
		go func() {
			defer mr.wg.Done()
            // 发送数据
			PublishChannels(mr.client, output)
		}()
	})
}

Metricbeat 也是会将收集到的指标数据通过管道进行传输。方法 PublishChannels 会将每个管道读取到的 event 发送到指定的客户端。关于客户端 Client 及其发布 Publish 的问题,也和 Filebeat 中一样,是由各个后端来实现接口的。

func PublishChannels(client beat.Client, cs ...<-chan beat.Event) {
	var wg sync.WaitGroup
	// output从beat.Event管道发布了event直到管道关闭
	output := func(c <-chan beat.Event) {
		defer wg.Done()
		for event := range c {
			client.Publish(event)
		}
	}
	// 为管道中的每一个输入通道开启一个output的线程
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	wg.Wait()
}

Client 接口如下。

type Client interface {
	Publish(Event)
	PublishAll([]Event)
	Close() error
}

写在最后

开头也说了,这次 Metricbeat 的源码展示的是最主要的那条流程,其他暗线也有提到,但是不多。后面的话,我会针对比较具体的场景来进行源码分析,比如说 Beats 中添加更加具体的过滤和梳理数据的模块,而不是直接用 Logstash,毕竟它是跑在 JVM 里面的。那么针对添加新模块,我会展示如何定位到与新功能相关的代码,然后对其分析并进行修改和添加,之后再考虑能不能继续改进或优化。

林皓伟

《【Elastic Stack系列】第四章:源码分析(二) Metricbeat篇》有 2 条评论
    1. 当然可以啊。
      填写昵称是方便大家称呼你,填写邮箱是为了让我能够把你的评论下的回复抄送一份发给你,方便大家及时沟通。
      如果都不用的话,可以随便填,只要发表的评论不违法,都会保留的。

发表回复

您的电子邮箱地址不会被公开。