重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
模拟编写了一个Flume 1.7中TAILDIR的功能实现,通过手动控制文件的读取位置来达到对文件的读写,防止flume挂了之后重复消费的情况。
以下是代码实现,仅做参考,生产上直接用TAILDIR读取文件内容即可,若要读取一个目录下的子目录,可使用github上以实现的这个项目包:https://github.com/qwurey/flume-source-taildir-recursive
创新互联长期为1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为满城企业提供专业的成都做网站、网站建设,满城网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。
package com.fwmagic.flume.source;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Description:自定义Source 1、读取指定目录下的文件,如nginx的access.log
* 2、读取文件前先判断offset文件是否存在,不存在则创建它
* 3、每次读取完都写一个offset文件记录读取到文件的什么位置,防止重启flume时发生重复消费的情况
* 4、如何自定义?参考ExecSource
*
* (1):获取自定义配置文件属性
* (2):创建线程池,用channelProcessor发送数据给channel
* (3):线程池提交(启动任务)
* 任务内容:
* (1):读取偏移量文件,没有则创建,有则获取偏移量,将读取的指针重置到指定偏移量
* (2):读取指定的日志文件,将读取的一行内容打包成Event,用Channel发送Event
* (3):获取读取内容后的偏移量,重置偏移量
* (4):stop方法调用,关闭线程池,调用super.stop方法。
* @Date:Create in 2018/8/19
*/
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {
/*监听的文件*/
private String filePath;
/*记录读取偏移量的文件*/
private String posiFile;
/*若读取文件暂无内容,则等待数秒*/
private Long interval;
/*读写文件的字符集*/
private String charset;
/*读取文件内容的线程*/
private FileRunner fileRunner;
/*线程池*/
private ExecutorService executor;
private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);
/**
* 初始化配置文件内容
*
* @param context
*/
@Override
public void configure(Context context) {
filePath = context.getString("filePath");
posiFile = context.getString("posiFile");
interval = context.getLong("interval", 2000L);
charset = context.getString("charset", "UTF-8");
}
@Override
public synchronized void start() {
//启动一个线程,用于监听对应的日志文件
//创建一个线程池
executor = Executors.newSingleThreadExecutor();
//用channelProcessor发送数据给channel
ChannelProcessor channelProcessor = super.getChannelProcessor();
fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor);
executor.submit(fileRunner);
super.start();
}
@Override
public synchronized void stop() {
fileRunner.setFlag(Boolean.FALSE);
while (!executor.isTerminated()) {
logger.debug("waiting for exec executor service to stop");
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
logger.debug("Interrupted while waiting for executor service to stop,Just exiting.");
Thread.currentThread().interrupt();
}
}
super.stop();
}
public static class FileRunner implements Runnable {
private Long interval;
private String charset;
private Long offset = 0L;
private File pFile;
private RandomAccessFile raf;
private ChannelProcessor channelProcessor;
private Boolean flag = Boolean.TRUE;
public void setFlag(Boolean flag) {
this.flag = flag;
}
public FileRunner(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) {
this.interval = interval;
this.charset = charset;
this.channelProcessor = channelProcessor;
//1、判断是否有偏移量文件,有则读取偏移量,没有则创建
pFile = new File(posiFile);
if (!pFile.exists()) {
try {
pFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
logger.error("create position file error!", e);
}
}
//2、判断偏移量中的文件内容是否大于0
try {
String offsetStr = FileUtils.readFileToString(pFile, this.charset);
// 3、如果偏移量文件中有记录,则将内容转换为Long
if (StringUtils.isNotBlank(offsetStr)) {
offset = Long.parseLong(offsetStr);
}
// 4、如果有偏移量,则直接跳到文件的偏移量位置
raf = new RandomAccessFile(filePath, "r");
// 跳到指定的位置
raf.seek(offset);
} catch (IOException e) {
e.printStackTrace();
logger.error("read position file error!", e);
}
}
@Override
public void run() {
//监听文件
while (flag) {
// 读取文件中的内容
String line = null;
try {
line = raf.readLine();
if (StringUtils.isNotBlank(line)) {
// 把数据打包成Event,发送到Channel
line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
Event event = EventBuilder.withBody(line.getBytes());
channelProcessor.processEvent(event);
//更新偏移量文件,把偏移量写入文件
offset = raf.getFilePointer();
FileUtils.writeStringToFile(pFile, offset.toString());
} else {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("thread sleep error", e);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}