当前位置:首页 > 应用开发

Flume架构与源码分析-核心组件分析-2

4、架件分整体流程

从以上部分我们可以看出,构源不管是码分Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。析核心组析

Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,架件分此处我们已Application分析为主。构源

首先进入org.apache.flume.node.Application的码分main方法启动:

Java代码

//1、设置默认值启动参数、析核心组析参数是架件分否必须的    Options options = new Options();    Option option = new Option("n", "name", true, "the name of this agent");    option.setRequired(true);    options.addOption(option);    option = new Option("f", "conf-file", true,    "specify a config file (required if -z missing)");    option.setRequired(false);    options.addOption(option);    //2、接着解析命令行参数    CommandLineParser parser = new GnuParser();    CommandLine commandLine = parser.parse(options,构源 args);    String agentName = commandLine.getOptionValue(n);    boolean reload = !commandLine.hasOption("no-reload-conf");    if (commandLine.hasOption(z) || commandLine.hasOption("zkConnString")) {       isZkConfigured = true;    }    if (isZkConfigured) {         //3、如果是码分通过ZooKeeper配置,则使用ZooKeeper参数启动,析核心组析此处忽略,架件分我们以配置文件讲解    } else {       //4、构源打开配置文件,码分如果不存在则快速失败      File configurationFile = new File(commandLine.getOptionValue(f));      if (!configurationFile.exists()) {              throw new ParseException(            "The specified configuration file does not exist: " + path);      }      List<LifecycleAware> components = Lists.newArrayList();      if (reload) {  //5、如果需要定期reload配置文件,则走如下方式        //5.1、此处使用Guava提供的事件总线        EventBus eventBus = new EventBus(agentName + "-event-bus");        //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次        PollingPropertiesFileConfigurationProvider configurationProvider =            new PollingPropertiesFileConfigurationProvider(              agentName, configurationFile, eventBus, 30);        components.add(configurationProvider);        application = new Application(components); //5.3、向Application注册组件        //5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法        eventBus.register(application);      } else {  //5、配置文件不支持定期reload        PropertiesFileConfigurationProvider configurationProvider =            new PropertiesFileConfigurationProvider(              agentName, configurationFile);        application = new Application();        //6.2、云南idc服务商直接使用配置文件初始化Flume组件        application.handleConfigurationEvent(configurationProvider          .getConfiguration());      }    }    //7、启动Flume应用    application.start();    //8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止    final Application appReference = application;    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {       @Override      public void run() {         appReference.stop();      }    });    

以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:

1、读取命令行参数;

2、读取配置文件;

3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;

4、启动Application,并注册虚拟机关闭钩子。

handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件:

Java代码

@Subscribe    public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {       stopAllComponents();      startAllComponents(conf);    }     

MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、亿华云Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。

对于startAllComponents实现大体如下:

Java代码

//1、首先启动Channel    supervisor.supervise(Channels,          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    //2、确保所有Channel是否都已启动    for(Channel ch: materializedConfiguration.getChannels().values()){       while(ch.getLifecycleState() != LifecycleState.START          && !supervisor.isComponentInErrorState(ch)){         try {           Thread.sleep(500);        } catch (InterruptedException e) {             Throwables.propagate(e);        }      }    }    //3、启动SinkRunner    supervisor.supervise(SinkRunners,      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    //4、启动SourceRunner    supervisor.supervise(SourceRunner,    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    //5、初始化监控服务    this.loadMonitoring();    

从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。

对于stopAllComponents实现大体如下:

Java代码

//1、首先停止SourceRunner    supervisor.unsupervise(SourceRunners);    //2、接着停止SinkRunner    supervisor.unsupervise(SinkRunners);    //3、然后停止Channel    supervisor.unsupervise(Channels);    //4、站群服务器***停止MonitorService    monitorServer.stop();     

此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,***停止管道。

Application中的start方法代码实现如下:

Java代码

public synchronized void start() {       for(LifecycleAware component : components) {         supervisor.supervise(component,            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);      }    }     

其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。

而Application关闭执行了如下动作:

Java代码

public synchronized void stop() {       supervisor.stop();      if(monitorServer != null) {         monitorServer.stop();      }    }     

即关闭守护哨兵和监控服务。

到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。

整体流程可以总结为:

1、首先初始化命令行配置;

2、接着读取配置文件;

3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;

4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;

5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;

6、***注册虚拟机关闭钩子,停止守护哨兵和监控服务。

轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。

首先创建LifecycleSupervisor:

Java代码

//1、用于存放被守护的组件    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();    //2、用于存放正在被监控的组件    monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();    //3、创建监控服务线程池    monitorService = new ScheduledThreadPoolExecutor(10,        new ThreadFactoryBuilder().setNameFormat(            "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")            .build());    monitorService.setMaximumPoolSize(20);    monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);    //4、定期清理被取消的组件    purger = new Purger();    //4.1、默认不进行清理    needToPurge = false;     

LifecycleSupervisor启动时会进行如下操作:

Java代码

public synchronized void start() {       monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);      lifecycleState = LifecycleState.START;    }     

首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:

Java代码

//1、首先停止守护监控服务    if (monitorService != null) {       monitorService.shutdown();      try {         monitorService.awaitTermination(10, TimeUnit.SECONDS);      } catch (InterruptedException e) {         logger.error("Interrupted while waiting for monitor service to stop");      }    }    //2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {       if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {         entry.getValue().status.desiredState = LifecycleState.STOP;        entry.getKey().stop();      }    }    //3、更新本组件状态    if (lifecycleState.equals(LifecycleState.START)) {       lifecycleState = LifecycleState.STOP;    }     //4、***的清理    supervisedProcesses.clear();     monitorFutures.clear();     

接下来就是调用supervise进行组件守护了:

Java代码

 if(this.monitorService.isShutdown() || this.monitorService.isTerminated()      || this.monitorService.isTerminating()){         //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护      }      //2、初始化守护组件      Supervisoree process = new Supervisoree();      process.status = new Status();      //2.1、默认策略是失败重启      process.policy = policy;      //2.2、初始化组件默认状态,大多数组件默认为START      process.status.desiredState = desiredState;      process.status.error = false;      //3、组件监控器,用于定时获取组件的***状态,或者重新启动组件      MonitorRunnable monitorRunnable = new MonitorRunnable();      monitorRunnable.lifecycleAware = lifecycleAware;      monitorRunnable.supervisoree = process;      monitorRunnable.monitorService = monitorService;      supervisedProcesses.put(lifecycleAware, process);      //4、定期的去执行组件监控器,获取组件***状态,或者重新启动组件      ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(          monitorRunnable, 0, 3, TimeUnit.SECONDS);      monitorFutures.put(lifecycleAware, future);    }    

如果不需要守护了,则需要调用unsupervise:

Java代码

public synchronized void unsupervise(LifecycleAware lifecycleAware) {       synchronized (lifecycleAware) {         Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);        //1.1、设置守护组件的状态为被丢弃        supervisoree.status.discard = true;        //1.2、设置组件盼望的***生命周期状态为STOP        this.setDesiredState(lifecycleAware, LifecycleState.STOP);        //1.3、停止组件        lifecycleAware.stop();      }      //2、从守护组件中移除      supervisedProcesses.remove(lifecycleAware);      //3、取消定时监控组件服务      monitorFutures.get(lifecycleAware).cancel(false);      //3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件      needToPurge = true;      monitorFutures.remove(lifecycleAware);    }    

接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:

Java代码

public synchronized void unsupervise(LifecycleAware lifecycleAware) {       synchronized (lifecycleAware) {         Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);        //1.1、设置守护组件的状态为被丢弃        supervisoree.status.discard = true;        //1.2、设置组件盼望的***生命周期状态为STOP        this.setDesiredState(lifecycleAware, LifecycleState.STOP);        //1.3、停止组件        lifecycleAware.stop();      }      //2、从守护组件中移除      supervisedProcesses.remove(lifecycleAware);      //3、取消定时监控组件服务      monitorFutures.get(lifecycleAware).cancel(false);      //3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件      needToPurge = true;      monitorFutures.remove(lifecycleAware);    }    接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:  Java代码    public void run() {       long now = System.currentTimeMillis();      try {         if (supervisoree.status.firstSeen == null) {             supervisoree.status.firstSeen = now; //1、记录***次状态查看时间        }        supervisoree.status.lastSeen = now; //2、记录***一次状态查看时间        synchronized (lifecycleAware) {             //3、如果守护组件被丢弃或出错了,则直接返回            if (supervisoree.status.discard || supervisoree.status.error) {               return;            }            //4、更新***一次查看到的状态            supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();            //5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化            if (!lifecycleAware.getLifecycleState().equals(                supervisoree.status.desiredState)) {               switch (supervisoree.status.desiredState) {                  case START: //6、如果是启动状态,则启动组件                 try {                     lifecycleAware.start();                  } catch (Throwable e) {                     if (e instanceof Error) {                       supervisoree.status.desiredState = LifecycleState.STOP;                      try {                         lifecycleAware.stop();                      } catch (Throwable e1) {                         supervisoree.status.error = true;                        if (e1 instanceof Error) {                           throw (Error) e1;                        }                      }                    }                    supervisoree.status.failures++;                  }                  break;                case STOP: //7、如果是停止状态,则停止组件                  try {                     lifecycleAware.stop();                  } catch (Throwable e) {                     if (e instanceof Error) {                       throw (Error) e;                    }                    supervisoree.status.failures++;                  }                  break;                default:              }        } catch(Throwable t) {         }      }    }    

如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。

【本文是专栏作者张开涛的原创文章,作者微信公众号:开涛的博客,id:kaitao-1234567】

分享到:

滇ICP备2023006006号-16