重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

flume如何自定义source、sink

这篇文章主要为大家展示了“flume如何自定义source、sink”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“flume如何自定义source、sink”这篇文章吧。

创新互联是一家专业提供大田企业网站建设,专注与网站制作、做网站H5开发、小程序制作等业务。10年已为大田众多企业、政府机构等服务。创新互联专业网站设计公司优惠进行中。

自定义source开发:

source是收集日志存入channel。

Source提供了两种机制:PollableSource(轮训拉取)和EventDrivenSource(事件驱动),

如果使用EventDrivenSource,你可以在start方法中启动额外的线程,不断的往channel中发数据。如果使用PollableSource,你可以在process()实现不断重发。

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }}

或者

package  org.apache.flume;
 
import  org.apache.flume.conf.Configurable;
import  org.apache.flume.source.AbstractSource;
 
public  class  TailSource  extends  AbstractSource  implements  EventDrivenSource,
         Configurable {
     @Override
     public  void  configure(Context context) {
 
     }
 
     @Override
     public  synchronized  void  start() {
 
     }
 
     @Override
     public  synchronized  void  stop() {
 
     }
}

自定义sink:

sink是从channel中拉取日志处理。

process会不断调用,你只需在process中去取channel的数据即可。

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }}

以上是“flume如何自定义source、sink”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!


文章名称:flume如何自定义source、sink
URL标题:http://cqcxhl.cn/article/gdjiee.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP