PostgreSQL 逻辑解码输出插件

2021-09-06 11:32 更新
48.6.1. 初始化函数
48.6.2. 能力
48.6.3. 输出模式
48.6.4. 输出插件回调
48.6.5. 用于产生输出的函数

可以在 PostgreSQL 源码树的 contrib/test_decoding 子目录中找到一个输出插件的例子。

48.6.1. 初始化函数

一个输出插件是通过动态载入一个以输出插件名称作为基础名称的共享库来载入的。 将使用普通的库搜索路径来定位该库。为了提供所要求的输出插件回调并且指示该 库确实是一个输出插件,需要提供一个名为 _PG_output_plugin_init的函数。这个函数会被传入一个 结构,其中被填充了各个动作的回调函数指针。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

回调函数begin_cbchange_cb 以及commit_cb是必需的,而 startup_cbfilter_by_origin_cbtruncate_cbshutdown_cb是可选的。如果没有设置truncate_cb但是要对一个TRUNCATE进行编码,则该动作将被忽略。

48.6.2. 能力

要解码、格式化并且输出更改,输出插件可以使用大部分后端的标准功能,包括调用 输出函数。只要访问的关系是initdbpg_catalog模式中创建的或者被使用

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

标记为用户提供的系统表,就允许对关系的只读访问。任何导致事务 ID 分配的动作 都被禁止。其中包括写表、执行 DDL 更改以及调用pg_current_xact_id()

48.6.3. 输出模式

输出插件回调可以以近乎任意格式向消费者传递数据。对于某些用例,例如通过 SQL 查看更改,以可能包含任何数据的数据类型(例如bytea)返回数据 可能会很麻烦。如果输出插件只输出服务器编码的文本数据,它可以在 启动回调中通过把 OutputPluginOptions.output_type设 置为OUTPUT_PLUGIN_TEXTUAL_OUTPUT替代 OUTPUT_PLUGIN_BINARY_OUTPUT来声明这一点。在这种情况下, 所有的数据必须是属于服务器的编码,这样一个text数据就能包含它。在 启用了断言的编译中会检查这一点。

48.6.4. 输出插件回调

一个输出插件需要提供一些回调,它通过它们得到有关更改发生的通知。

并发事务以提交顺序被解码,并且只有属于特定事务的更改会在 begincommit回调之间被解码。被显式 或隐式回滚的事务不会被解码。成功的检查点被折叠到包含它们的事务中,并且 保持它们在该事务中被执行的顺序。

注意

只有已经被安全地刷入磁盘的事务将会被解码。当 synchronous_commit被设置为off 时,这会导致一个COMMIT在随后的 pg_logical_slot_get_changes()中不会立即被解码。

48.6.4.1. 启动回调

只要一个复制槽被创建或者被要求流式传送更改,可选的 startup_cb回调就会被调用,不管有多少更改准备输出。

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

当复制槽被创建时,is_init参数将为真,否则为假。 options指向一个输出插件可以设置的选项 的结构:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type必须被设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 或者OUTPUT_PLUGIN_BINARY_OUTPUT。另见 本文中的第 48.6.3 节。如果receive_rewrites为真,还将为在某些DDL操作期间的堆重写造成的更改调用输出插件。这些是处理DDL复制的插件感兴趣的事情,但是它们要求特殊的处理。

启动回调应该验证出现在 ctx->output_plugin_options中的选项。如果输出插件 需要有一个状态,它可以使用 ctx->output_plugin_private来存储之。

48.6.4.2. 关闭回调

只要一个之前活跃的复制槽不再使用,就会调用可选的 shutdown_cb回调,它可以被用来释放输出插件 私有的资源。该槽并不一定需要被删除,只要其中的流被停止即可。

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

48.6.4.3. 事务开始回调

只要一个已提交事务的开始动作被解码,就会调用必须提供的 begin_cb回调。被中止的事务及其内容不会被解码。

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

txn参数包含有关该事务的元信息,例如该 事务被提交的时间戳以及该事务的 XID。

48.6.4.4. 事务结束回调

只要一个已提交事务的提交动作被解码,就会调用必须提供的 commit_cb回调。在此之前,如果有任何被修改 的行,将为所有被修改的行调用change_cb回调。

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

48.6.4.5. 更改回调

对于一个事务中的每一个行修改,都将调用必须提供的 change_cb回调,这种修改可能是一个 INSERTUPDATE或者 DELETE。即使原始命令一次修改了多行,该回调也会 为其中的每一行调用一次。

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

ctxtxn参数与 begin_cbcommit_cb 回调具有相同的内容,但是额外多出一个关系描述符 relation指向该行所属的关系以及一个结构 change 描述被传入的行修改。

注意

只有没有被标记为“不做日志”(见 UNLOGGED)并且非临时(见 TEMPORARY or TEMP)的用户定义表中的 更改才能用逻辑解码抽取。

48.6.4.6. 截断回调

truncate_cb回调会为一个TRUNCATE命令被调用。

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

参数类似于change_cb回调。不过,由于通过外键连接起来的表上的TRUNCATE动作需要一起被执行,这个回调会接收到一个关系的数组而不是单个关系。详情请见对TRUNCATE语句的介绍。

48.6.4.7. 源过滤器回调

可选的filter_by_origin_cb回调被用来 决定从origin_id重放的数据是否是 输出插件感兴趣的数据。

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

ctx参数具有和其他回调相同的内容。 对这个回调只有复制源的信息可用。要标志传进来的节点上发生的 更改是无关的,返回真,这会导致这些更改被过滤掉,否则返回假。 对于被过滤掉的事务和更改将不会调用其他回调。

在实现级联或者多向复制方案时,这个回调可以派上用场。用源头 过滤允许阻止在这样的设置下来回地复制同样的更改。虽然事务和 更改也携带了有关源头的信息,通过这个回调过滤明显更有效些。

48.6.4.8. 通用消息回调

只要一个逻辑解码消息被解码出来,可选的message_cb回调就会被调用。

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

txn参数包含关于该事务的元信息,如被提交的时间戳和 XID。不过要注意,当消息是非事务性的并且记录该消息的事务中还没有被分配 XID 时,这个参数可以为 NULL。lsn是该消息的 WAL 位置。transactional说明该消息是否为事务性的。prefix是一个任意的空终结的前缀,它当前插件被用来标识感兴趣的消息。最后的 message 参数保存着大小为message_size的消息。

应该格外小心确保输出插件用于标识感兴趣消息的前缀是唯一的。建议使用扩展或者输出插件本身的名称。

48.6.5. 用于产生输出的函数

begin_cbcommit_cb或者 change_cb回调中,为了实际产生输出, 输出插件可以把数据写入到ctx->out中的 StringInfo输出缓冲区中。在写出到输出缓冲区之前,必须先 调用OutputPluginPrepareWrite(ctx, last_write),在完 成写入到缓冲区后,必须调用 OutputPluginWrite(ctx, last_write)来执行写出。 last_write指出一次特定的写出是否为该回调的最后 一次写出。

下面的例子展示了如何把数据输出给一个输出插件的消费者:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);


以上内容是否对您有帮助:
在线笔记
App下载
App下载

扫描二维码

下载编程狮App

公众号
微信公众号

编程狮公众号