重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

PostgreSQL源码解读(154)-后台进程#6(walsender#2)

本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的exec_replication_command和StartReplication函数.
调用栈如下:


(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe ) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、数据结构

StringInfo
StringInfoData结构体保存关于扩展字符串的相关信息.


/*-------------------------
 * StringInfoData holds information about an extensible string.
 * StringInfoData结构体保存关于扩展字符串的相关信息.
 *      data    is the current buffer for the string (allocated with palloc).
 *      data    通过palloc分配的字符串缓存
 *      len     is the current string length.  There is guaranteed to be
 *              a terminating '\0' at data[len], although this is not very
 *              useful when the string holds binary data rather than text.
 *      len     是当前字符串的长度.保证以ASCII 0(\0)结束(data[len] = '\0').
 *              虽然如果存储的是二进制数据而不是文本时不太好使.
 *      maxlen  is the allocated size in bytes of 'data', i.e. the maximum
 *              string size (including the terminating '\0' char) that we can
 *              currently store in 'data' without having to reallocate
 *              more space.  We must always have maxlen > len.
 *      maxlen  以字节为单位已分配的'data'的大小,限定了最大的字符串大小(包括结尾的ASCII 0)
 *              小于此尺寸的数据可以直接存储而无需重新分配.
 *      cursor  is initialized to zero by makeStringInfo or initStringInfo,
 *              but is not otherwise touched by the stringinfo.c routines.
 *              Some routines use it to scan through a StringInfo.
 *      cursor  通过makeStringInfo或initStringInfo初始化为0,但不受stringinfo.c例程的影响.
 *              某些例程使用该字段扫描StringInfo
 *-------------------------
 */
typedef struct StringInfoData
{
    char       *data;
    int         len;
    int         maxlen;
    int         cursor;
} StringInfoData;
typedef StringInfoData *StringInfo;

二、源码解读

exec_replication_command
exec_replication_command执行复制命令,如cmd_string被识别为WalSender命令,返回T,否则返回F.
其主要逻辑如下:
1.执行相关初始化和校验
2.切换内存上下文
3.初始化复制扫描器
4.执行事务相关的判断或校验
5.初始化输入输出消息
6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication


/*
 * Execute an incoming replication command.
 * 执行复制命令.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.
 * 如cmd_string被识别为WalSender命令,返回T,否则返回F
 */
bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;
    /*
     * If WAL sender has been told that shutdown is getting close, switch its
     * status accordingly to handle the next replication commands correctly.
     * 如果WAL sender已被通知关闭,切换状态以应对接下来的复制命令.
     */
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);
    /*
     * Throw error if in stopping mode.  We need prevent commands that could
     * generate WAL while the shutdown checkpoint is being written.  To be
     * safe, we just prohibit all new commands.
     * 如在stopping模式,则抛出错误.
     * 我们需要在shutdown checkpoint写入期间禁止命令的产生.
     * 安全期间,禁止所有新的命令.
     */
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
     * command arrives. Clean up the old stuff if there's anything.
     * CREATE_REPLICATION_SLOT ... LOGICAL 导出快照直至下个命令到达.
     * 如存在,则清理旧的stuff.
     * 
     */
    SnapBuildClearExportedSnapshot();
    //检查中断
    CHECK_FOR_INTERRUPTS();
    //命令上下文
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);
    //初始化复制扫描器
    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));
    cmd_node = replication_parse_result;
    /*
     * Log replication command if log_replication_commands is enabled. Even
     * when it's disabled, log the command with DEBUG1 level for backward
     * compatibility. Note that SQL commands are not logged here, and will be
     * logged later if log_statement is enabled.
     * 如log_replication_commands启用,则记录复制命令在日志中.
     * 就算该选项被禁止,通过DEBUG1级别记录日志.
     * 注意SQL命令不在这里记录,在log_statement启用的情况下在后续进行记录.
     * 
     */
    if (cmd_node->type != T_SQLCmd)
        ereport(log_replication_commands ? LOG : DEBUG1,
                (errmsg("received replication command: %s", cmd_string)));
    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
     * called outside of transaction the snapshot should be cleared here.
     * CREATE_REPLICATION_SLOT ... LOGICAL导出快照.
     * 该命令如果在事务的外层被调用,那么快照应在这里清除.
     */
    if (!IsTransactionBlock())
        SnapBuildClearExportedSnapshot();
    /*
     * For aborted transactions, don't allow anything except pure SQL, the
     * exec_simple_query() will handle it correctly.
     * 对于废弃的事务,除了纯SQL外不允许其他命令,exec_simple_query()函数可以正确处理这种情况.
     */
    if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));
    CHECK_FOR_INTERRUPTS();
    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once per command to reduce palloc overhead.
     * 为消息I/O分配缓存.
     * 每个命令执行一次以减少palloc的负载.
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);
    /* Report to pgstat that this process is running */
    //向pgstat报告该进程正在运行.
    pgstat_report_activity(STATE_RUNNING, NULL);
    //根据命令类型执行相应的命令
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            //识别系统
            IdentifySystem();
            break;
        case T_BaseBackupCmd:
            //BASE_BACKUP
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;
        case T_CreateReplicationSlotCmd:
            //创建复制slot
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;
        case T_DropReplicationSlotCmd:
            //删除复制slot
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;
        case T_StartReplicationCmd:
            //START_REPLICATION
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
                PreventInTransactionBlock(true, "START_REPLICATION");
                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }
        case T_TimeLineHistoryCmd:
            //构造时间线历史 TIMELINE_HISTORY
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;
        case T_VariableShowStmt:
            //
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;
                GetPGVariable(n->name, dest);
            }
            break;
        case T_SQLCmd:
            //SQL命令
            if (MyDatabaseId == InvalidOid)
                ereport(ERROR,
                        (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
            /* Report to pgstat that this process is now idle */
            pgstat_report_activity(STATE_IDLE, NULL);
            /* Tell the caller that this wasn't a WalSender command. */
            return false;
        default:
            //其他命令
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }
    /* done */
    //执行完毕,回到原来的内存上下文中
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);
    /* Send CommandComplete message */
    //命令结束
    EndCommand("SELECT", DestRemote);
    /* Report to pgstat that this process is now idle */
    //报告状态
    pgstat_report_activity(STATE_IDLE, NULL);
    return true;
}

StartReplication
StartReplication处理START_REPLICATION命令.
其主要逻辑如下:
1.执行相关初始化和校验
2.选择时间线
3.进入COPY模式
3.1设置状态
3.2发送CopyBothResponse消息,启动streaming
3.3初始化相关变量,如共享内存状态等
3.4进入主循环(WalSndLoop)


/*
 * Handle START_REPLICATION command.
 * 处理START_REPLICATION命令
 *
 * At the moment, this never returns, but an ereport(ERROR) will take us back
 * to the main loop.
 * 该函数不会返回,但ereport(ERROR)调用可以回到主循环
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
    StringInfoData buf;
    XLogRecPtr  FlushPtr;
    if (ThisTimeLineID == 0)
        //时间线校验
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
    /*
     * We assume here that we're logging enough information in the WAL for
     * log-shipping, since this is checked in PostmasterMain().
     * 在这里,由于在PostmasterMain()假定已为log-shipping记录了足够多的信息
     *
     * NOTE: wal_level can only change at shutdown, so in most cases it is
     * difficult for there to be WAL data that we can still see that was
     * written at wal_level='minimal'.
     * 注意:wal_level只能在shutdown的情况下进行修改,
     *   因此在大多数情况下,很难看到在wal_level='minimal'的情况下的WAL数据.
     */
    if (cmd->slotname)
    {
        ReplicationSlotAcquire(cmd->slotname, true);
        //#define SlotIsLogical ( slot ) (slot->data.database != InvalidOid)
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     (errmsg("cannot use a logical replication slot for physical replication"))));
    }
    /*
     * Select the timeline. If it was given explicitly by the client, use
     * that. Otherwise use the timeline of the last replayed record, which is
     * kept in ThisTimeLineID.
     * 选择时间线.
     * 如果通过客户端明确给出,则使用该值.
     * 否则的话,使用最后重放记录的时间线,在ThisTimeLineID中保存.
     */
    if (am_cascading_walsender)
    {
        /* this also updates ThisTimeLineID */
        //这也会更新ThisTimeLineID变量
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();
    if (cmd->timeline != 0)
    {
        XLogRecPtr  switchpoint;
        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;
            sendTimeLineIsHistoric = true;
            /*
             * Check that the timeline the client requested exists, and the
             * requested start location is on that timeline.
             * 检查客户端请求的时间线是否存在,请求的开始位置是否在该时间线上.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);
            /*
             * Found the requested timeline in the history. Check that
             * requested startpoint is on that timeline in our history.
             * 通过历史文件找到请求的时间线.
             * 在历史中检查请求的开始点是否在时间线上.
             *
             * This is quite loose on purpose. We only check that we didn't
             * fork off the requested timeline before the switchpoint. We
             * don't check that we switched *to* it before the requested
             * starting point. This is because the client can legitimately
             * request to start replication from the beginning of the WAL
             * segment that contains switchpoint, but on the new timeline, so
             * that it doesn't end up with a partial segment. If you ask for
             * too old a starting point, you'll get an error later when we
             * fail to find the requested WAL segment in pg_wal.
             * 这是有意为之.我们只检查在切换点之前没有fork off的请求的时间线.
             * 我们不会检查在请求的开始点之前的时间线.
             * 这是因为客户端可以合法地请求从包含交换点的WAL端的开始处进行复制,
             *   在新的时间线上如此执行,以避免出现由于部分segment的问题导致出错.
             * 如果客户端请求一个较旧的开始点,在pg_wal中无法找到请求的WAL段时会报错.
             *
             * XXX: we could be more strict here and only allow a startpoint
             * that's older than the switchpoint, if it's still in the same
             * WAL segment.
             * XXX: 我们可以更严格,如果仍然在同一个WAL segment中,那么可以只允许比切换点旧的开始点
             */
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X.",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }
    streamingDoneSending = streamingDoneReceiving = false;
    /* If there is nothing to stream, don't even enter COPY mode */
    //如果没有任何东西需要stream,不需要启动COPY命令
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the
         * primary. For some applications, for example synchronous
         * replication, it is important to have a clear state for this initial
         * catchup mode, so we can trigger actions when we change streaming
         * state later. We may stay in this state for a long time, which is
         * exactly why we want to be able to monitor whether or not we are
         * still here.
         * 在首次启动复制时,standby节点会落后于master节点.
         * 对于某些应用,比如同步复制,对于这种初始的catchup模式有一个干净的状态是十分重要的,
         *   因此在改变streaming状态时我们可以触发相关的动作.
         * 我们可以处于这种状态很长时间,这正是我们希望有能力监控我们是否仍在这里的原因.
         */
        //设置状态
        WalSndSetState(WALSNDSTATE_CATCHUP);
        /* Send a CopyBothResponse message, and start streaming */
        //发送CopyBothResponse消息,启动streaming
        pq_beginmessage(&buf, 'W');//W->COPY命令?
        pq_sendbyte(&buf, 0);
        pq_sendint16(&buf, 0);
        pq_endmessage(&buf);
        pq_flush();
        /*
         * Don't allow a request to stream from a future point in WAL that
         * hasn't been flushed to disk in this server yet.
         * 不允许请求该服务器上一个尚未刷入到磁盘上的WAL未来位置.
         */
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }
        /* Start streaming from the requested point */
        //从请求点开始streaming
        sentPtr = cmd->startpoint;
        /* Initialize shared memory status, too */
        //初始化共享内存状态
        SpinLockAcquire(&MyWalSnd->mutex);
        MyWalSnd->sentPtr = sentPtr;
        SpinLockRelease(&MyWalSnd->mutex);
        SyncRepInitConfig();
        /* Main loop of walsender */
        //walsender主循环,开始复制,激活复制
        replication_active = true;
        //主循环
        WalSndLoop(XLogSendPhysical);
        //完结后设置为非活动状态
        replication_active = false;
        if (got_STOPPING)
            proc_exit(0);//退出
        //设置状态
        WalSndSetState(WALSNDSTATE_STARTUP);
        Assert(streamingDoneSending && streamingDoneReceiving);
    }
    if (cmd->slotname)
        ReplicationSlotRelease();
    /*
     * Copy is finished now. Send a single-row result set indicating the next
     * timeline.
     * Copy命令已完结.发送单行结果集以提升下一个timeline
     */
    if (sendTimeLineIsHistoric)
    {
        char        startpos_str[8 + 1 + 8 + 1];
        DestReceiver *dest;
        TupOutputState *tstate;
        TupleDesc   tupdesc;
        Datum       values[2];
        bool        nulls[2];
        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);
        dest = CreateDestReceiver(DestRemoteSimple);
        MemSet(nulls, false, sizeof(nulls));
        /*
         * Need a tuple descriptor representing two columns. int8 may seem
         * like a surprising data type for this, but in theory int4 would not
         * be wide enough for this, as TimeLineID is unsigned.
         */
        tupdesc = CreateTemplateTupleDesc(2);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                  INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                  TEXTOID, -1, 0);
        /* prepare for projection of tuple */
        tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
        values[1] = CStringGetTextDatum(startpos_str);
        /* send it to dest */
        do_tup_output(tstate, values, nulls);
        end_tup_output(tstate);
    }
    /* Send CommandComplete message */
    pq_puttextmessage('C', "START_STREAMING");
}

三、跟踪分析

在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点

在永泰等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都网站设计、成都网站制作 网站设计制作按需网站设计,公司网站建设,企业网站建设,品牌网站制作,成都全网营销推广,成都外贸网站建设,永泰网站建设费用合理。


[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

第一个命令是IDENTIFY_SYSTEM,第二个命令才是需要跟踪的对象START_REPLICATION


(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

1.执行相关初始化和校验


(gdb) n
1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb) 
1454        SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, 
  writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456        CHECK_FOR_INTERRUPTS();
(gdb)

2.切换内存上下文


(gdb) 
1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb) 
1461        old_context = MemoryContextSwitchTo(cmd_context);
(gdb)

3.初始化复制扫描器


(gdb) 
1463        replication_scanner_init(cmd_string);
(gdb) n
1464        parse_rc = replication_yyparse();
(gdb) 
1465        if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb) 
(gdb) n
1471        cmd_node = replication_parse_result;
(gdb)
(gdb) 
1479        if (cmd_node->type != T_SQLCmd)
(gdb) n
1480            ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)

4.执行事务相关的判断或校验


(gdb) n
1487        if (!IsTransactionBlock())
(gdb) 
1488            SnapBuildClearExportedSnapshot();
(gdb) 
1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb) 
1500        CHECK_FOR_INTERRUPTS();
(gdb)

5.初始化输入输出消息


(gdb) 
1506        initStringInfo(&output_message);
(gdb) 
1507        initStringInfo(&reply_message);
(gdb) 
1508        initStringInfo(&tmpbuf);
(gdb) 
1511        pgstat_report_activity(STATE_RUNNING, NULL);

6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication


(gdb) n
1513        switch (cmd_node->type)
(gdb) 
1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb) 
1536                    PreventInTransactionBlock(true, "START_REPLICATION");
(gdb) 
1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb) 
1539                        StartReplication(cmd);

进入StartReplication


1539                        StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532     if (ThisTimeLineID == 0)
(gdb)

1.执行相关初始化和校验


(gdb) n
546     if (cmd->slotname)
(gdb) 
560     if (am_cascading_walsender)
(gdb)

2.选择时间线


(gdb) n
568     if (cmd->timeline != 0)
(gdb) 
572         sendTimeLine = cmd->timeline;
(gdb) 
573         if (sendTimeLine == ThisTimeLineID)
(gdb) 
575             sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576             sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb) 
634     streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, 
  startpoint = 1560281088, options = 0x0}
(gdb)

3.进入COPY模式


(gdb) n
637     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)

3.1设置状态


648         WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)

3.2发送CopyBothResponse消息,启动streaming


(gdb) n
651         pq_beginmessage(&buf, 'W');
(gdb) 
652         pq_sendbyte(&buf, 0);
(gdb) 
653         pq_sendint16(&buf, 0);
(gdb) 
654         pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0:  0
(gdb) x/32hb buf->data
0x1df53b0:  0   0   0   127 127 127 127 127
0x1df53b8:  127 127 127 127 127 127 127 127
0x1df53c0:  127 127 127 127 127 127 127 127
0x1df53c8:  127 127 127 127 127 127 127 127
(gdb)

3.3初始化相关变量,如共享内存状态等


(gdb) n
655         pq_flush();
(gdb) 
661         if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672         sentPtr = cmd->startpoint;
(gdb) 
675         SpinLockAcquire(&MyWalSnd->mutex);
(gdb) 
676         MyWalSnd->sentPtr = sentPtr;
(gdb) 
677         SpinLockRelease(&MyWalSnd->mutex);
(gdb) 
679         SyncRepInitConfig();
(gdb) 
682         replication_active = true;

3.4进入主循环(WalSndLoop)


(gdb) 
684         WalSndLoop(XLogSendPhysical);
(gdb)

DONE!

四、参考资料

PG Source Code


网站栏目:PostgreSQL源码解读(154)-后台进程#6(walsender#2)
标题网址:http://cqcxhl.cn/article/psejge.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP