重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

MDLog的示例分析

这篇文章主要介绍MDLog的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

创新互联公司一直在为企业提供服务,多年的磨炼,使我们在创意设计,成都全网营销到技术研发拥有了开发经验。我们擅长倾听企业需求,挖掘用户对产品需求服务价值,为企业制作有用的创意设计体验。核心团队拥有超过十多年以上行业经验,涵盖创意,策化,开发等专业领域,公司涉及领域有基础互联网服务成都温江机房App定制开发、手机移动建站、网页设计、网络整合营销。

ReplayThread类:负责log的replay事件处理。

RecoveryThread类:负责log的recovery事件处理。

SubmitThread类:负责log的submit事件处理。

map segments;          记录log的序列

set expiring_segments;             记录expiring的log集合

set expired_segments;               记录expired的log集合

uint64_t event_seq;                                              记录log event的当前序列值

int expiring_events;                                               记录expiring的log个数

int expired_events;                                                记录expired的log个数

MDLog::write_head()

|__Journaler::write_head()                              直接调用Journaler类对应的函数进行处理

MDLog::get_read_pos()

|__Journaler::get_read_pos()

MDLog::get_write_pos()

|__Journaler::get_write_pos()

MDLog::get_safe_pos()

|__Journaler::get_write_safe_pos()

MDLog::create()

|__创建C_GatherBuilder类对象

|__设置C_GatherBuilder类的finisher函数

|__确定inode的默认journal号,即:ino = MDS_INO_LOG_OFFSET+mds->get_nodeid()

|__创建Journaler类对象,该类对象写入到metadata pool里

|__设置Journaler类对象的写出错处理函数

|__设置Journaler类对象可写

|__Journaler::create()

|__Journaler::write_head()

|__创建JournalPointer类对象,该类对象写入到metadata pool里

|__设置JournalPointer的front值为inode号

|__设置JournalPointer的back为0

|__JournalPointer::save()

|__C_GatherBuilder::activate()

|__SubmitThread::create()                          创建SubmitThread线程

MDLog::open()

|__RecoveryThread::set_completion()          设置RecoveryThread的completion回调函数

|__RecoveryThread::create()                         创建RecoveryThread线程

|__SubmitThread::create()                              创建SubmitThread线程 

MDLog::reopen()

|__删除Journaler类对象

|__RecoveryThread::join()                              等待RecoveryThread结束

|__RecoveryThread::set_completion()               设置RecoveryThrad结束时的回调函数MDLog::append()

|__RecoveryThread::create()                         创建RecoveryThread线程

MDLog::append()

|__Journaler::set_read_pos(Journaler::get_write_pos())          设置Journaler类对象的read/write position指向同一个地方

|__Journaler::set_expire_pos(Journaler::get_write_pos())          设置Journaler类对象的expire position为write position

|__Journaler::set_writable()                         设置Journaler为可写

MDLog::_start_entry()

|__设置cur_event=e                    设置cur_event为当前待处理的LogEvent

|__event_seq++                           增加Event的序列号

|__从LogEvent类对象中得到EMetaBlob对象,即:LogEvent::get_metablob()

|__设置EMetaBlob的event_seq为当前的event_seq,last_subtree_map为当前最后一个segment

MDLog::cancel_entry()

|__设置cur_event=NULL               清空cur_event

|__删除LogEvent参数类对象

MDLog::_submit_entry()

|__设置cur_event = NULL

|__从segments集合中得到最后一个LogSegment类对象

|__增加LogSement类对象的event个数,即:num_events++

|__设置LogEvent::_segment=LogSement类对象

|__LogEvent::update_segment()                    更新LogEvent的segment字段

|__以LogEvent作为参数,更新pending_events集合

|__num_events++                    增加events的个数

|__unflushed++                         增加unflushed的个数

|__若LogEvent的type是EVENT_SUBTREEMAP或EVENT_IMPORTFINISH且mds处于resolve状态

     |__直接退出

|__若LogSegment::end/Journaler::get_layout_period()!=LogSegment::offset/Journaler::get_layout_period()

     |___start_new_segment()

|__若LogEvent的类型是EVENT_SUBTREEMAP_TEST

     |__创建LogEvent类对象

     |__设置LogEvent类对象的类型是EVENT_SUBTREEMAP_TEST

     |___submit_entry()

MDLog::_submit_thread()

|__判断当前MDS进程是否stopping,若是则直接退出

|__判断mds_log_pause是否为真,若为真则调用submit_cond.Wait()

|__判断pending_events集合是否为空,若为空则调用submit_cond.Wait()

|__从pending_event中得到PendingEvent类对象

|__若PendingEvent中包含有效的LogEvent

     |__得到LogEvent和LogSegment

     |__encode LogEvent到bufferlist中

     |__设置LogEvent::set_start_off(write_pos)     设置LogEvent的start offset值为write position值

     |__Journaler::append_entry(bufferlist)               将encoded LogEvent写入到Journaler中

     |__更新LogSegment::end为最新的write position

     |__Journaler::wait_for_flush()                         等待Journaler flush LogEvent到磁盘上,完成flush后调用回调函数设置MDLog::safe_pos=最新的write position

     |__Journaler::flush()

     |__删除LogEvent类对象

|__若PendingEvent中没有包含有效的LogEvent

     |__Journaler::wait_for_flush()                         等待Journaler flush LogEvent到磁盘上,完成flush后调用回调函数设置MDLog::safe_pos=Journaler::get_write_pos()

     |__Journaler::flush()

MDLog::wait_for_safe()                                        若目前仍然还有pending的event,则不做wait_for_flush

|__判断pending_events是否不为空

     |__向pending_events集合的末尾添加一个NULL的PendingEvent类对象

     |__设置no_pending=false

     |__submit_cond.Signal()

|__判断no_pending==true

     |__Journaler::wait_for_flush()

MDLog::flush()                                             若目前仍然还有pending的event,则不做flush

|__判断pending_events集合不为空

     |__向pending_event集合末尾插入一个NULL的PendingEvent类对象

     |__设置do_flush=false

|__判断do_flush==true

     |__Journaler::flush()

MDLog::shutdown()

|__判断SubmitThread是否正在运行

     |__SubmitThread::join()                              等待SubmitThread进程停止

|__Journaler::shutdown()                                 调用Journaler的shutdown()函数

|__判断ReplayThread是否正在运行

     |__ReplayThread::join()                              等待ReplayThread进程停止 

|__判断RecoveryThread是否正在运行

     |__RecoveryThread::join()                              等待RecoveryThread进程停止

MDLog::_prepare_new_segment()

|__得到seq的值为event_seq + 1

|__在segments[seq]处新创建一个LogSegment类对象

|__MDCache::advance_stray()

MDLog::_journal_segment_subtree_map()

|__MDCache::create_subtree_map()                    从MDCache中得到一个ESubtreeMap

|__设置改ESubtreeMap的event_seq值为segment的最后一个元素

|___submit_entry()

MDLog::_start_new_segment()

|___prepare_new_segment()

|___journal_segment_subtree_map()

MDLog::trim()

|__根据配置文件得到max_segments和max_events

|__遍历segments集合

     |__若该LogSegment在pending_events集合中,则直接退出,不能进行trim操作

     |__若该LogSegment在expiring_segments或expired_segments集合中,则遍历下一个

     |__将该LogSegment插入到expiring_segments集合中

     |__try_expire() 

|___trim_expired_segments()

MDLog::trim_all()               处理过程与MDLog::trim()类似,只不过是遍历所有的segments并没有trim数量上的限制

MDLog::try_expire()

|__从expiring_segments集合中删除指定的LogSegment

|___expired()

     |__将该LogSegment插入到expired_segments集合中

MDLog::_trim_expired_segments()

|__遍历segments集合

     |__判断LogSegment是否在expired_segments集合中,若不在则停止遍历

     |__将LogSegment从expired_segments集合中删除

     |__将LogSegment从segments集合中删除

     |__删除LogSegment

     |__设置trim=true

|__若trim==true

     |__Journaler::write_head(0)

MDLog::_maybe_expired()

|__try_expire()

MDLog::replay()

|__判断Journaler的read pos是否和write pos一致

     |__不需要replay,直接返回

|__waitfor_replay.push_back(c)               添加replay waiter

|__ReplayThread::create()                         创建一个新的ReplayThread

MDLog::_replay_thread()                              执行replay操作的独立线程

|__判断Journaler是否还有未读数据

     |__Journaler::wait_for_readable()               等待Journaler可读并且没有未读的数据

|__判断Journaler是否有出错信息

     |__处理Journaler的出错信息

|__若Journaler不可读并且Journaler的read pos==write pos

     |__直接退出

|__得到Journaler的read position

     |__Journaler::try_read_entry()                     从Journaler类对象中读取数据

     |__得到LogEvent类对象

     |__设置LogEvent的start offset为read position

|__LogEvent的类型是EVENT_SUBTREEMAP或EVENT_RESETJOURNAL

     |__在segments集合中创建一个新的LogSegment类对象

|__若segments集合不为空

     |__设置LogEvent::_segment=get_current_segment()

     |__设置LogEvent::_segment->end = Journaler::get_read_pos()

     |__LogEvent::replay()

|__设置safe_pos = Journaler::get_write_safe_pos()

MDLog::_recovery_thread()

|__创建JournalPointer类对象

|__JournalPointer::load(mds->objecter)          从JournalPointer处load出数据

|__若JournalPointer.back不为空,则说明有写Journal未完成的情况

     |__根据JournalPointer.back创建Journaler类对象

     |__Journaler::recover()                              调用Journaler类对象的recover()函数进行recover操作

|__从JournalPointer.front创建Journaler类对象

|__Journaler::recover()                                   调用Journaler类对象的recover()函数进行recover操作

|__若MDS处于standby replay模式或者stream_format() >= mds_journal_format

     |__Journaler::set_write_error_handler()               设置Journaler的write error handler

|___reformat_journal()

MDLog::_reformat_journal()

|__根据参数JournalPointer的front值来确定JournalPointer的back值

|__根据参数JournalPointer的back,创建一个新的Journaler

|__Journaler::set_writeable()

|__Journaler::create()

|__Journaler::write_head()

|__从参数的old_journaler处读取数据到bufferlist

|__Journaler::append_entry(bufferlist)               将old_journaler读取到的数据写入到新的Journaler中

|__Journaler::flush()

补充一下针对MDLog的理解:

MDLog::create()核心处理流程如下:

1、得到Journaler对应的inode号,即:MDS_INO_LOG_OFFSET+mds->get_nodeid()

2、创建Journaler类对象(Journaler写入到metadata pool中)

3、创建JournalPointer类对象(以mds->get_nodeid()和metadata pool为参数)

4、设置JournalPointer类对象的front = inode,back = 0

5、保存JournalPointer类对象,即:jp->save(mds->objecter, gather.new_sub())

6、创建SubmitThread线程类,即:submit_thread.create(),之后执行SubmitThread类的entry()函数,即:_submit_thread()

MDLog::_submit_thread()主要处理流程如下:

1、遍历pending_event数组,若数组为空或数组中的PendingEvent为空则重新遍历

2、从pending_event数组中得到PendingEvent类对象

3、从PendingEvent类对象中得到LogEvent,之后从LogEvent类对象中得到LogSegment

4、对于LogSegment,序列化其header信息。序列化的header信息如下:

     EVENT_NEW_ENCODING

     1

     _type

     mdsmap->get_up_features()

5、得到当前Journaler类对象写入日志的位置,即:journaler->get_write_pos()

6、将序列化的header信息写入到journaler,即:journaler->append_entry(bl)

7、flush journaler到磁盘,即:journaler->wait_for_flush()

MDLog::_recovery_thread()

1、根据mds->get_nodeid()以及metadata pool得到JournalPointer类对象

2、读取JournalPointer的object,即:jp.load(mds->objecter)

3、若JournalPointer没有object,则创建一个JournalPointer的front inode号且保存JournalPointer(jp.save(mds->objecter))

4、若jp.back不为空,则说明之前日志回写的时候有错误出现,因此需要删除jp.back对应的日志

     |__根据jp.back创建Journaler类对象

     |__执行Journaler::recover()

     |__执行Journaler::erase()

     |__更新JournalPointer

          |__设置jp.back=0

          |__调用jp.save(mds->objecter)

5、根据jp.front创建Journaler类对象

6、执行Journaler::recover()

7、执行_reformat_journal()     

MDLog::_reformat_journal()               将日志信息写入到JournalPointer的back处,若写入成功则设置JournalPointer的front=back以及back=0

1、得到JournalPointer的back值

2、保存JournalPointer类对象,即:jp.save()

3、根据jp.back创建Journaler类对象

4、从老的日志中读取日志信息,即:old_journal->try_read_entry(bl)

5、将老的日志信息写入到新的日志中,即:new_journal->append_entry(bl)

6、刷新日志到磁盘,即:new_journal->flush()

7、调换JournalPointer的front和back值且保存JournalPointer,即:jp.save()

8、删除老的日志,即:old_journal->erase()

9、更新JournalPointer,即:jp.back=0且jp.save()

MDLog::_start_entry()

1、设置cur_event为当前待处理的entry

2、递增event_seq的值

3、更新LogEvent的event_seq值和last_subtree_map值

MDLog::_submit_entry()

1、清空cur_event值

2、从segments数组的末尾得到LogSegment

3、设置LogEvent的_segment为segments数组中的LogSegment

4、更新pending_events数组,即:pending_events[ls->seq].push_back(PendingEvent(le, c))

5、更新num_events值和unflushed值

MDLog::_prepare_new_segment()

1、根据event_seq值,得到seq值,即:seq = event_seq + 1

2、更新segments数组,即:segments[seq] = new LogSegment(seq)

MDLog::trim()

MDLog::trim_all()     这两个函数用于从segments数组中trim掉满足条件的LogSegment。这里使用了两个数组expiring_segments和expired_segments保存trim过程中的LogSegment。

MDLog::replay()     当journaler的read_pos和write_pos不一致时,需要进行replay操作

1、更新waitfor_replay数组

2、设置already_replayed=true

3、创建ReplayThread线程类,之后执行ReplayThread.entry()函数,即:执行_replay_thread()

MDLog::_replay_thread()

1、从Journaler中读取日志内容,即:journaler->try_read_entry(bl)

2、从日志中解析出LogEvent,即:LogEvent::decode(bl)

3、更新segments数组,即:segments[event_seq] = new LogSegment(event_seq, pos)

4、更新LogEvent的_segment

5、执行LogEvent的replay()操作

以上是“MDLog的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联行业资讯频道!


分享题目:MDLog的示例分析
当前链接:http://cqcxhl.cn/article/gchsoo.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP