=== Applying patches on top of PostgreSQL commit ID d8d7c5dc8f74506d35c7e8242be997fd5cf388eb === /etc/rc.d/jail: WARNING: Per-jail configuration via jail_* variables is obsolete. Please consider migrating to /etc/jail.conf. Fri Feb 6 22:17:28 UTC 2026 On branch cf/6205 nothing to commit, working tree clean === using 'git am' to apply patch ./notify-through-wal-v9.patch === Applying: Implement WAL-based async notifications for improved throughput .git/rebase-apply/patch:1562: trailing whitespace. * metadata (dbid, xid, notify_lsn) that points to the full notification .git/rebase-apply/patch:1803: trailing whitespace. /* .git/rebase-apply/patch:2342: new blank line at EOF. + warning: 3 lines add whitespace errors. Using index info to reconstruct a base tree... M src/backend/access/rmgrdesc/meson.build M src/backend/access/rmgrdesc/xactdesc.c M src/backend/access/transam/rmgr.c M src/backend/access/transam/xact.c M src/backend/access/transam/xlog.c M src/backend/commands/async.c M src/backend/tcop/utility.c M src/bin/pg_rewind/parsexlog.c M src/bin/pg_waldump/rmgrdesc.c M src/bin/pg_waldump/t/001_basic.pl M src/include/access/rmgrlist.h M src/include/access/xact.h M src/include/commands/async.h M src/include/storage/proc.h M src/test/isolation/expected/async-notify.out M src/test/isolation/specs/async-notify.spec Falling back to patching base and 3-way merge... Auto-merging src/test/isolation/specs/async-notify.spec CONFLICT (content): Merge conflict in src/test/isolation/specs/async-notify.spec Auto-merging src/test/isolation/expected/async-notify.out Auto-merging src/include/storage/proc.h Auto-merging src/include/commands/async.h Auto-merging src/include/access/xact.h Auto-merging src/include/access/rmgrlist.h Auto-merging src/bin/pg_waldump/t/001_basic.pl Auto-merging src/bin/pg_waldump/rmgrdesc.c Auto-merging src/bin/pg_rewind/parsexlog.c Auto-merging src/backend/tcop/utility.c Auto-merging src/backend/commands/async.c CONFLICT (content): Merge conflict in src/backend/commands/async.c Auto-merging src/backend/access/transam/xlog.c Auto-merging src/backend/access/transam/xact.c Auto-merging src/backend/access/transam/rmgr.c Auto-merging src/backend/access/rmgrdesc/xactdesc.c Auto-merging src/backend/access/rmgrdesc/meson.build error: Failed to merge in the changes. hint: Use 'git am --show-current-patch=diff' to see the failed patch Patch failed at 0001 Implement WAL-based async notifications for improved throughput When you have resolved this problem, run "git am --continue". If you prefer to skip this patch, run "git am --skip" instead. To restore the original branch and stop patching, run "git am --abort". Unstaged changes after reset: M src/backend/access/rmgrdesc/Makefile M src/backend/access/rmgrdesc/meson.build M src/backend/access/rmgrdesc/xactdesc.c M src/backend/access/transam/rmgr.c M src/backend/access/transam/xact.c M src/backend/access/transam/xlog.c M src/backend/commands/async.c M src/backend/tcop/utility.c M src/bin/pg_rewind/parsexlog.c M src/bin/pg_waldump/rmgrdesc.c M src/bin/pg_waldump/t/001_basic.pl M src/include/access/rmgrlist.h M src/include/access/xact.h M src/include/commands/async.h M src/include/storage/proc.h M src/test/isolation/expected/async-notify.out M src/test/isolation/expected/stats.out M src/test/isolation/expected/stats_1.out M src/test/isolation/specs/async-notify.spec Removing src/backend/access/rmgrdesc/asyncdesc.c Removing src/include/access/async_xlog.h Removing src/test/modules/test_listen_notify/ === using patch(1) to apply patch ./notify-through-wal-v9.patch === patching file src/backend/access/rmgrdesc/Makefile patching file src/backend/access/rmgrdesc/asyncdesc.c patching file src/backend/access/rmgrdesc/meson.build patching file src/backend/access/rmgrdesc/xactdesc.c patching file src/backend/access/transam/rmgr.c patching file src/backend/access/transam/xact.c Hunk #2 succeeded at 5862 (offset 4 lines). Hunk #3 succeeded at 5949 with fuzz 1 (offset 4 lines). Hunk #4 succeeded at 6020 (offset 4 lines). Hunk #5 succeeded at 6280 (offset 4 lines). patching file src/backend/access/transam/xlog.c Hunk #2 succeeded at 3901 (offset 18 lines). Hunk #3 succeeded at 3923 (offset 18 lines). patching file src/backend/commands/async.c Hunk #1 FAILED at 13. Hunk #2 FAILED at 21. Hunk #3 succeeded at 31 (offset -3 lines). Hunk #4 FAILED at 66. Hunk #5 succeeded at 100 (offset -8 lines). Hunk #6 FAILED at 127. Hunk #7 succeeded at 176 (offset 33 lines). Hunk #8 succeeded at 196 with fuzz 1 (offset 35 lines). Hunk #9 succeeded at 206 with fuzz 2 (offset 36 lines). Hunk #10 succeeded at 363 (offset 48 lines). Hunk #11 succeeded at 592 (offset 141 lines). Hunk #12 succeeded at 623 with fuzz 1 (offset 152 lines). Hunk #13 succeeded at 637 (offset 152 lines). Hunk #14 succeeded at 871 (offset 291 lines). Hunk #15 succeeded at 901 with fuzz 1 (offset 291 lines). Hunk #16 succeeded at 933 (offset 295 lines). Hunk #17 FAILED at 1000. Hunk #18 succeeded at 1497 (offset 381 lines). Hunk #19 succeeded at 1528 (offset 381 lines). Hunk #20 succeeded at 2062 (offset 679 lines). Hunk #21 succeeded at 2070 (offset 679 lines). Hunk #22 succeeded at 2123 (offset 679 lines). Hunk #23 succeeded at 2280 with fuzz 1 (offset 738 lines). Hunk #24 succeeded at 2358 with fuzz 1 (offset 738 lines). Hunk #25 succeeded at 2538 (offset 738 lines). Hunk #26 succeeded at 2686 (offset 746 lines). Hunk #27 succeeded at 2705 (offset 746 lines). Hunk #28 FAILED at 2003. Hunk #29 succeeded at 2875 (offset 748 lines). Hunk #30 succeeded at 2983 (offset 748 lines). Hunk #31 succeeded at 3252 (offset 775 lines). 6 out of 31 hunks FAILED -- saving rejects to file src/backend/commands/async.c.rej patching file src/backend/tcop/utility.c patching file src/bin/pg_rewind/parsexlog.c patching file src/bin/pg_waldump/rmgrdesc.c Hunk #2 succeeded at 24 with fuzz 2. patching file src/bin/pg_waldump/t/001_basic.pl patching file src/include/access/async_xlog.h patching file src/include/access/rmgrlist.h patching file src/include/access/xact.h patching file src/include/commands/async.h patching file src/include/storage/proc.h patching file src/test/isolation/expected/async-notify.out Hunk #1 succeeded at 229 (offset 122 lines). Hunk #2 succeeded at 241 (offset 122 lines). patching file src/test/isolation/expected/stats.out patching file src/test/isolation/expected/stats_1.out patching file src/test/isolation/specs/async-notify.spec Hunk #1 succeeded at 45 with fuzz 1 (offset 14 lines). patching file src/test/modules/test_listen_notify/Makefile patching file src/test/modules/test_listen_notify/meson.build patching file src/test/modules/test_listen_notify/t/002_queue_full.pl patching file src/test/modules/test_listen_notify/t/003_wal_pin_test.pl Unstaged changes after reset: M src/backend/access/rmgrdesc/Makefile M src/backend/access/rmgrdesc/meson.build M src/backend/access/rmgrdesc/xactdesc.c M src/backend/access/transam/rmgr.c M src/backend/access/transam/xact.c M src/backend/access/transam/xlog.c M src/backend/commands/async.c M src/backend/tcop/utility.c M src/bin/pg_rewind/parsexlog.c M src/bin/pg_waldump/rmgrdesc.c M src/bin/pg_waldump/t/001_basic.pl M src/include/access/rmgrlist.h M src/include/access/xact.h M src/include/commands/async.h M src/include/storage/proc.h M src/test/isolation/expected/async-notify.out M src/test/isolation/expected/stats.out M src/test/isolation/expected/stats_1.out M src/test/isolation/specs/async-notify.spec Removing src/backend/access/rmgrdesc/asyncdesc.c Removing src/backend/commands/async.c.rej Removing src/include/access/async_xlog.h Removing src/test/modules/test_listen_notify/ === using 'git apply' to apply patch ./notify-through-wal-v9.patch === /work/patches/./notify-through-wal-v9.patch:1581: trailing whitespace. * metadata (dbid, xid, notify_lsn) that points to the full notification /work/patches/./notify-through-wal-v9.patch:1822: trailing whitespace. /* Applied patch to 'src/backend/access/rmgrdesc/Makefile' cleanly. Falling back to direct application... Applied patch to 'src/backend/access/rmgrdesc/meson.build' cleanly. Applied patch to 'src/backend/access/rmgrdesc/xactdesc.c' cleanly. Applied patch to 'src/backend/access/transam/rmgr.c' cleanly. Applied patch to 'src/backend/access/transam/xact.c' cleanly. Applied patch to 'src/backend/access/transam/xlog.c' cleanly. Applied patch to 'src/backend/commands/async.c' with conflicts. Applied patch to 'src/backend/tcop/utility.c' cleanly. Applied patch to 'src/bin/pg_rewind/parsexlog.c' cleanly. Applied patch to 'src/bin/pg_waldump/rmgrdesc.c' cleanly. Applied patch to 'src/bin/pg_waldump/t/001_basic.pl' cleanly. Falling back to direct application... Applied patch to 'src/include/access/rmgrlist.h' cleanly. Applied patch to 'src/include/access/xact.h' cleanly. Applied patch to 'src/include/commands/async.h' cleanly. Applied patch to 'src/include/storage/proc.h' cleanly. Applied patch to 'src/test/isolation/expected/async-notify.out' cleanly. Applied patch to 'src/test/isolation/expected/stats.out' cleanly. Applied patch to 'src/test/isolation/expected/stats_1.out' cleanly. Applied patch to 'src/test/isolation/specs/async-notify.spec' with conflicts. Falling back to direct application... Falling back to direct application... /work/patches/./notify-through-wal-v9.patch:2361: new blank line at EOF. + Falling back to direct application... Falling back to direct application... U src/backend/commands/async.c U src/test/isolation/specs/async-notify.spec warning: 3 lines add whitespace errors. diff --cc src/backend/commands/async.c index 657c591618d,7efa51320b3..00000000000 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@@ -13,16 -13,22 +13,31 @@@ */ /*------------------------------------------------------------------------- ++<<<<<<< ours + * Async Notification Model as of v19: ++======= + * Async Notification Model as of 19.0 (WAL-backed payloads): ++>>>>>>> theirs * - * 1. Multiple backends on same machine. Multiple backends listening on - * several channels. (Channels are also called "conditions" in other - * parts of the code.) + * 1. Multiple backends on same machine. Multiple backends may be listening + * on each of several channels. * * 2. There is one central queue in disk-based storage (directory pg_notify/), * with actively-used pages mapped into shared memory by the slru.c module. ++<<<<<<< ours + * All notification messages are placed in the queue and later read out + * by listening backends. The single queue allows us to guarantee that + * notifications are received in commit order. ++======= + * Unlike earlier versions, the queue now stores only compact, fixed-size + * metadata entries; the full notification payloads are written to WAL. + * Each queue entry contains: the sender's database OID, the sender's + * transaction ID, and an LSN pointing to a WAL record that contains the + * serialized list of notifications for that transaction. + * + * There is no central knowledge of which backend listens on which channel; + * every backend has its own list of interesting channels. ++>>>>>>> theirs * * Although there is only one queue, notifications are treated as being * database-local; this is done by including the sender's database OID @@@ -53,13 -69,13 +78,23 @@@ * that has been sent, it can easily add some unique string into the extra * payload parameter. * ++<<<<<<< ours + * When the transaction is ready to commit, PreCommit_Notify() adds the + * pending notifications to the head of the queue. The head pointer of the + * queue always points to the next free position and a position is just a + * page number and the offset in that page. This is done before marking the + * transaction as committed in clog. If we run into problems writing the + * notifications, we can still call elog(ERROR, ...) and the transaction + * will roll back safely. ++======= + * When the transaction is ready to commit, PreCommit_Notify() first writes + * a WAL record containing the transaction's notification payloads and + * reserves space for a compact queue entry. Just before emitting the + * commit record, while holding NotifyQueueLock exclusively, a fixed-size + * compact entry is appended to the pg_notify queue that points to the WAL + * record (by LSN). If any step fails before commit, the transaction can + * still be aborted safely. ++>>>>>>> theirs * * Once we have put all of the notifications into the queue, we return to * CommitTransaction() which will then do the actual transaction commit. @@@ -150,9 -130,11 +185,17 @@@ * other backends will never be missed by ignoring self-notifies. * * The amount of shared memory used for notify management (notify_buffers) ++<<<<<<< ours + * can be varied without affecting anything but performance. The maximum + * amount of notification data that can be queued at one time is determined + * by the max_notify_queue_pages GUC. ++======= + * controls the SLRU cache for the compact queue pages. The total number of + * queue pages is limited by max_notify_queue_pages. Payload size and volume + * do not directly impact queue space, since payloads live in WAL; however, + * WAL retention must consider the oldest notification-data LSN needed by the + * queue or by any listener. ++>>>>>>> theirs *------------------------------------------------------------------------- */ @@@ -178,9 -166,10 +229,11 @@@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/procsignal.h" + #include "storage/procarray.h" + #include "storage/proc.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/dsa.h" #include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@@ -238,33 -248,17 +312,22 @@@ StaticAssertDecl(BLCKSZ % sizeof(AsyncQ #define SET_QUEUE_POS(x,y,z) \ do { \ - (x).page = (y); \ - (x).offset = (z); \ + (x) = ((int64) (y)) * ASYNC_ENTRIES_PER_PAGE + ((z) / ASYNC_ENTRY_SIZE); \ } while (0) - #define QUEUE_POS_EQUAL(x,y) \ - ((x).page == (y).page && (x).offset == (y).offset) + #define QUEUE_POS_EQUAL(x,y) ((x) == (y)) - #define QUEUE_POS_IS_ZERO(x) \ - ((x).page == 0 && (x).offset == 0) + #define QUEUE_POS_IS_ZERO(x) ((x) == 0) - /* choose logically smaller QueuePosition */ - #define QUEUE_POS_MIN(x,y) \ - (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \ - (x).page != (y).page ? (y) : \ - (x).offset < (y).offset ? (x) : (y)) - - /* choose logically larger QueuePosition */ - #define QUEUE_POS_MAX(x,y) \ - (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \ - (x).page != (y).page ? (x) : \ - (x).offset > (y).offset ? (x) : (y)) + /* choose logically smaller/larger positions */ + #define QUEUE_POS_MIN(x,y) ((x) <= (y) ? (x) : (y)) + #define QUEUE_POS_MAX(x,y) ((x) >= (y) ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is @@@ -542,25 -454,10 +606,27 @@@ static bool unlistenExitRegistered = fa /* True if we're currently registered as a listener in asyncQueueControl */ static bool amRegisteredListener = false; +/* + * Queue head positions for direct advancement. + * These are captured during PreCommit_Notify while holding the heavyweight + * lock on database 0, ensuring no other backend can insert notifications + * between them. SignalBackends uses these to advance idle backends. + */ +static QueuePosition queueHeadBeforeWrite; +static QueuePosition queueHeadAfterWrite; + +/* + * Workspace arrays for SignalBackends. These are preallocated in + * PreCommit_Notify to avoid needing memory allocation after committing to + * clog. + */ +static int32 *signalPids = NULL; +static ProcNumber *signalProcnos = NULL; + /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ static bool tryAdvanceTail = false; + /* true if this backend reserved one compact entry pre-commit */ + static bool notifyEntryReserved = false; /* GUC parameters */ bool Trace_notify = false; @@@ -571,31 -468,16 +637,27 @@@ int max_notify_queue_pages = 1048576 /* local function prototypes */ static inline int64 asyncQueuePageDiff(int64 p, int64 q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q); +static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, + const char *channel); +static dshash_hash globalChannelTableHash(const void *key, size_t size, + void *arg); +static void initGlobalChannelTable(void); +static void initLocalChannelTable(void); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_ListenPreCommit(void); -static void Exec_ListenCommit(const char *channel); -static void Exec_UnlistenCommit(const char *channel); -static void Exec_UnlistenAllCommit(void); +static void BecomeRegisteredListener(void); +static void PrepareTableEntriesForListen(const char *channel); +static void PrepareTableEntriesForUnlisten(const char *channel); +static void PrepareTableEntriesForUnlistenAll(void); +static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, + ListenerEntry *listeners, + int idx); +static void ApplyPendingListenActions(bool isCommit); +static void CleanupListenersOnExit(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); - static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); - static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); - static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); - static void asyncQueueFillWarning(void); static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(QueuePosition *current, @@@ -809,10 -615,9 +934,11 @@@ AsyncShmemInit(void SET_QUEUE_POS(QUEUE_HEAD, 0, 0); SET_QUEUE_POS(QUEUE_TAIL, 0, 0); QUEUE_STOP_PAGE = 0; + asyncQueueControl->reservedEntries = 0; QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; asyncQueueControl->lastQueueFillWarn = 0; + asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID; + asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID; for (int i = 0; i < MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; @@@ -1214,142 -1005,115 +1358,222 @@@ PreCommit_Notify(void } } - /* Queue any pending notifies (must happen after the above) */ + /* Write notification data to WAL if we have any */ if (pendingNotifies) { ++<<<<<<< ours + ListCell *nextNotify; + bool firstIteration = true; + + /* + * Build list of unique channel names being notified for use by + * SignalBackends(). + * + * If uniqueChannelHash is available, use it to efficiently get the + * unique channels. Otherwise, fall back to the O(N^2) approach. + */ + pendingNotifies->uniqueChannelNames = NIL; + if (pendingNotifies->uniqueChannelHash != NULL) + { + HASH_SEQ_STATUS status; + ChannelName *channelEntry; + + hash_seq_init(&status, pendingNotifies->uniqueChannelHash); + while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL) + pendingNotifies->uniqueChannelNames = + lappend(pendingNotifies->uniqueChannelNames, + channelEntry->channel); + } + else + { + /* O(N^2) approach is better for small number of notifications */ + foreach_ptr(Notification, n, pendingNotifies->events) + { + char *channel = n->data; + bool found = false; + + /* Name present in list? */ + foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames) + { + if (strcmp(oldchan, channel) == 0) + { + found = true; + break; + } + } + /* Add if not already in list */ + if (!found) + pendingNotifies->uniqueChannelNames = + lappend(pendingNotifies->uniqueChannelNames, + channel); + } + } + + /* Preallocate workspace that will be needed by SignalBackends() */ + if (signalPids == NULL) + signalPids = MemoryContextAlloc(TopMemoryContext, + MaxBackends * sizeof(int32)); + + if (signalProcnos == NULL) + signalProcnos = MemoryContextAlloc(TopMemoryContext, + MaxBackends * sizeof(ProcNumber)); ++======= + TransactionId currentXid; + ListCell *l; + size_t total_size = 0; + uint32 nnotifications = 0; + char *notifications_data; + char *ptr; + XLogRecPtr notify_lsn; ++>>>>>>> theirs /* * Make sure that we have an XID assigned to the current transaction. * GetCurrentTransactionId is cheap if we already have an XID, but not - * so cheap if we don't, and we'd prefer not to do that work while - * holding NotifyQueueLock. + * so cheap if we don't. */ - (void) GetCurrentTransactionId(); + currentXid = GetCurrentTransactionId(); /* - * Serialize writers by acquiring a special lock that we hold till - * after commit. This ensures that queue entries appear in commit - * order, and in particular that there are never uncommitted queue - * entries ahead of committed ones, so an uncommitted transaction - * can't block delivery of deliverable notifications. - * - * We use a heavyweight lock so that it'll automatically be released - * after either commit or abort. This also allows deadlocks to be - * detected, though really a deadlock shouldn't be possible here. - * - * The lock is on "database 0", which is pretty ugly but it doesn't - * seem worth inventing a special locktag category just for this. - * (Historical note: before PG 9.0, a similar lock on "database 0" was - * used by the flatfiles mechanism.) + * Step 1: Reserve space in the in-memory queue for the compact entry. */ - LockSharedObject(DatabaseRelationId, InvalidOid, 0, - AccessExclusiveLock); + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + { + QueuePosition reserved_head = QUEUE_HEAD + asyncQueueControl->reservedEntries; + int64 headPage = QUEUE_POS_PAGE(reserved_head); + int headSlot = (int) (reserved_head % ASYNC_ENTRIES_PER_PAGE); + int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); + LWLock *nextbank; + + /* If at last slot, ensure advancing to next page is allowed */ + if (headSlot == ASYNC_ENTRIES_PER_PAGE - 1) + { + if (asyncQueuePageDiff(headPage + 1, tailPage) >= max_notify_queue_pages) + { + LWLockRelease(NotifyQueueLock); + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not queue notification before commit"), + errdetail("asynchronous notification queue is full"))); + } + + /* Pre-initialize the next page so commit path doesn't fault it in */ + nextbank = SimpleLruGetBankLock(NotifyCtl, headPage + 1); + LWLockAcquire(nextbank, LW_EXCLUSIVE); + (void) SimpleLruZeroPage(NotifyCtl, headPage + 1); + LWLockRelease(nextbank); + } + + /* Reserve one entry */ + asyncQueueControl->reservedEntries++; + notifyEntryReserved = true; + } + LWLockRelease(NotifyQueueLock); /* ++<<<<<<< ours + * For the direct advancement optimization in SignalBackends(), we + * need to ensure that no other backend can insert queue entries + * between queueHeadBeforeWrite and queueHeadAfterWrite. The + * heavyweight lock above provides this guarantee, since it serializes + * all writers. + * + * Note: if the heavyweight lock were ever removed for scalability + * reasons, we could achieve the same guarantee by holding + * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather + * than releasing and reacquiring it for each page as we do below. + */ + + /* Initialize values to a safe default in case list is empty */ + SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0); + SET_QUEUE_POS(queueHeadAfterWrite, 0, 0); + + /* Now push the notifications into the queue */ + nextNotify = list_head(pendingNotifies->events); + while (nextNotify != NULL) + { + /* + * Add the pending notifications to the queue. We acquire and + * release NotifyQueueLock once per page, which might be overkill + * but it does allow readers to get in while we're doing this. + * + * A full queue is very uncommon and should really not happen, + * given that we have so much space available in the SLRU pages. + * Nevertheless we need to deal with this possibility. Note that + * when we get here we are in the process of committing our + * transaction, but we have not yet committed to clog, so at this + * point in time we can still roll the transaction back. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + if (firstIteration) + { + queueHeadBeforeWrite = QUEUE_HEAD; + firstIteration = false; + } + asyncQueueFillWarning(); + if (asyncQueueIsFull()) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("too many notifications in the NOTIFY queue"))); + nextNotify = asyncQueueAddEntries(nextNotify); + queueHeadAfterWrite = QUEUE_HEAD; + LWLockRelease(NotifyQueueLock); ++======= + * Step 2: Write notification data to WAL. + */ + /* First pass: calculate total size needed for serialization */ + foreach(l, pendingNotifies->events) + { + Notification *n = (Notification *) lfirst(l); + + /* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */ + total_size += 4 + n->channel_len + 1 + n->payload_len + 1; + nnotifications++; ++>>>>>>> theirs } - /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ + /* Allocate buffer for notification data */ + notifications_data = palloc(total_size); + ptr = notifications_data; + + /* Second pass: serialize all notifications */ + foreach(l, pendingNotifies->events) + { + Notification *n = (Notification *) lfirst(l); + char *channel = n->data; + char *payload = n->data + n->channel_len + 1; + + /* Write channel length, payload length, channel, and payload */ + memcpy(ptr, &n->channel_len, 2); + ptr += 2; + memcpy(ptr, &n->payload_len, 2); + ptr += 2; + memcpy(ptr, channel, n->channel_len + 1); + ptr += n->channel_len + 1; + memcpy(ptr, payload, n->payload_len + 1); + ptr += n->payload_len + 1; + } + + /* + * Conservatively pre-pin before WAL insert. There is a small window between + * the notify wal data being written, and the actual notify commit lsn being assigned + * to MyProc, where the recycler could remove the notify wal record, since it wouldn't + * be considered while trying to calculate the min. Assign the current LSN, which + * we know is <= notify_lsn. + */ + MyProc->notifyDataLsn = GetXLogInsertRecPtr(); + + /* Write notification data to WAL */ + notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid, + nnotifications, total_size, + notifications_data); + + pfree(notifications_data); + + /* Publish the uncommitted notify lsn for the proc */ + MyProc->notifyDataLsn = notify_lsn; + + /* Notification payloads are now read directly from WAL at delivery time. */ } } @@@ -2392,11 -1589,73 +2388,70 @@@ SignalBackends(void * NotifyQueueLock; which is unlikely but certainly possible. So we * just log a low-level debug message if it happens. */ - if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0) elog(DEBUG3, "could not signal backend with PID %d: %m", pid); } - - pfree(pids); - pfree(procnos); } + /* + * SignalBackendsForDatabase + * + * Wake listeners that are registered for the specified database OID. + * Intended for use by the startup/redo process when replaying a commit + * that enqueued NOTIFY entries for that database. + */ + void + SignalBackendsForDatabase(Oid dboid) + { + int32 *pids; + ProcNumber *procnos; + int count; + + /* Build a list of target PIDs for listeners in dboid */ + pids = (int32 *) palloc(MaxBackends * sizeof(int32)); + procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber)); + count = 0; + + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + { + int32 pid = QUEUE_BACKEND_PID(i); + QueuePosition pos; + + Assert(pid != InvalidPid); + if (QUEUE_BACKEND_DBOID(i) != dboid) + continue; /* only same DB */ + + pos = QUEUE_BACKEND_POS(i); + /* Skip if already caught up */ + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; + + pids[count] = pid; + procnos[count] = i; + count++; + } + LWLockRelease(NotifyQueueLock); + + /* Now send signals */ + for (int i = 0; i < count; i++) + { + int32 pid = pids[i]; + + if (pid == MyProcPid) + { + notifyInterruptPending = true; + continue; + } + + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", pid); + } + + pfree(pids); + pfree(procnos); + } + /* * AtAbort_Notify * @@@ -2409,13 -1667,28 +2464,27 @@@ void AtAbort_Notify(void) { - /* - * If we LISTEN but then roll back the transaction after PreCommit_Notify, - * we have registered as a listener but have not made any entry in - * listenChannels. In that case, deregister again. - */ - if (amRegisteredListener && listenChannels == NIL) + /* Revert staged listen/unlisten changes */ + ApplyPendingListenActions(false); + + /* If we're no longer listening on anything, unregister */ + if (amRegisteredListener && LocalChannelTableIsEmpty()) asyncQueueUnregister(); + /* Release any reserved queue entry */ + if (notifyEntryReserved) + { + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + if (asyncQueueControl->reservedEntries > 0) + asyncQueueControl->reservedEntries--; + LWLockRelease(NotifyQueueLock); + notifyEntryReserved = false; + } + + /* Clear per-backend NOTIFY pin (no lock needed) */ + if (!XLogRecPtrIsInvalid(MyProc->notifyDataLsn)) + MyProc->notifyDataLsn = InvalidXLogRecPtr; + /* And clean up */ ClearPendingActionsAndNotifies(); } @@@ -2789,66 -2058,368 +2862,382 @@@ asyncQueueProcessPageEntries(QueuePosit } /* ++<<<<<<< ours + * Quick check for the case that we're not listening on any + * channels, before calling TransactionIdDidCommit(). This makes + * that case a little faster, but more importantly, it ensures + * that if there's a bad entry in the queue for which + * TransactionIdDidCommit() fails for some reason, we can skip + * over it on the first LISTEN in a session, and not get stuck on + * it indefinitely. (This is a little trickier than it looks: it + * works because BecomeRegisteredListener runs this code before we + * have made the first entry in localChannelTable.) + */ + if (LocalChannelTableIsEmpty()) ++======= + * Since queue entries are written atomically with commit records + * while holding NotifyQueueLock exclusively, all entries in the queue + * are guaranteed to be from committed transactions. + * + * Step 5: Read notification data using stored LSN from WAL. + * The compact entry only contains metadata. + */ + processNotificationFromWAL(qe->notify_lsn); + } + + /* Loop back if we're not at end of page */ + } while (!reachedEndOfPage); + + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + + /* + * No direct delivery from queue buffer: payloads have already been + * processed via processNotificationFromWAL() above. + */ + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; + + return reachedStop; + } + + /* + * processNotificationFromWAL + * + * Fetch notification data from WAL using the stored LSN and process + * the individual notifications for delivery to listening frontend. + * This implements Step 5 of the new WAL-based notification system. + */ + static void + processNotificationFromWAL(XLogRecPtr notify_lsn) + { + XLogReaderState *xlogreader; + DecodedXLogRecord *record; + xl_async_notify_data *xlrec; + char *data; + char *ptr; + uint32_t remaining; + int srcPid; + char *errormsg; + Oid dboid; + uint32 nnotifications; + + /* + * Create XLog reader to fetch the notification data record. + * We use a temporary reader since this is called during normal + * notification processing, not during recovery. + */ + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); + if (!xlogreader) + elog(ERROR, "failed to allocate XLog reader for notification data"); + + /* Start reading exactly at the NOTIFY_DATA record begin LSN */ + XLogBeginRead(xlogreader, notify_lsn); + + /* Read the NOTIFY_DATA record */ + record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + { + XLogReaderFree(xlogreader); + elog(ERROR, "failed to read notification data from WAL at %X/%X: %s", + LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message"); + } + + /* Verify this is the expected record type */ + if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA) + elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u", + LSN_FORMAT_ARGS(notify_lsn), + XLogRecGetRmid(xlogreader), + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK)); + + /* Extract the notification data from the WAL record */ + xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader); + srcPid = xlrec->srcPid; + dboid = xlrec->dbid; + data = (char *) xlrec + SizeOfAsyncNotifyData; + ptr = data; + remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData; + nnotifications = xlrec->nnotifications; + + /* + * Process each notification in the serialized data. + * The format is: 2-byte channel_len, 2-byte payload_len, + * null-terminated channel, null-terminated payload. + */ + for (uint32_t i = 0; i < nnotifications && remaining >= 4; i++) + { + uint16 channel_len; + uint16 payload_len; + char *channel; + char *payload; + + /* Read lengths */ + memcpy(&channel_len, ptr, 2); + ptr += 2; + memcpy(&payload_len, ptr, 2); + ptr += 2; + remaining -= 4; + + /* Verify we have enough data */ + if (remaining < channel_len + 1 + payload_len + 1) + break; + + /* Extract channel and payload strings */ + channel = ptr; + ptr += channel_len + 1; + payload = ptr; + ptr += payload_len + 1; + remaining -= (channel_len + 1 + payload_len + 1); + + /* Deliver notification if we're listening on this channel */ + if (dboid == MyDatabaseId && IsListeningOn(channel)) + NotifyMyFrontEnd(channel, payload, srcPid); + } + + /* Clean up */ + XLogReaderFree(xlogreader); + } + + + /* + * AsyncNotifyOldestRequiredLSN + * + * Compute the oldest WAL LSN required to satisfy NOTIFY delivery for any + * still-present queue entry. Returns true and sets *oldest_lsn when the + * queue is non-empty (QUEUE_TAIL != QUEUE_HEAD). Otherwise returns false. + * + * We look at the queue entry at QUEUE_TAIL; since that is the oldest entry + * still needed by some listener, its notify_lsn is the minimum WAL position + * that must be retained. + */ + bool + AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn) + { + XLogRecPtr committed_min = InvalidXLogRecPtr; + XLogRecPtr pin_min = InvalidXLogRecPtr; + bool have_any = false; + + /* First, scan per-backend pins under ProcArrayLock */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + { + int i; + for (i = 0; i < MaxBackends; i++) + { + PGPROC *proc = GetPGProcByNumber(i); + XLogRecPtr lsn; + + /* Skip unused PGPROC slots */ + if (proc->pid == 0) ++>>>>>>> theirs continue; - if (TransactionIdDidCommit(qe->xid)) + lsn = proc->notifyDataLsn; + if (!XLogRecPtrIsInvalid(lsn)) { - memcpy(local_buf_end, qe, qe->length); - local_buf_end += qe->length; + if (XLogRecPtrIsInvalid(pin_min) || lsn < pin_min) + pin_min = lsn; } - else + } + } + LWLockRelease(ProcArrayLock); + + /* Then, scan per-page committed mins under shared lock */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + if (NotifyPageMins != NULL) + { + int i; + for (i = 0; i < max_notify_queue_pages; i++) + { + if (NotifyPageMins[i].page_no >= 0 && !XLogRecPtrIsInvalid(NotifyPageMins[i].min_lsn)) { - /* - * The source transaction aborted or crashed, so we just - * ignore its notifications. - */ + if (XLogRecPtrIsInvalid(committed_min) || (NotifyPageMins[i].min_lsn < committed_min)) + committed_min = NotifyPageMins[i].min_lsn; } } + } + LWLockRelease(NotifyQueueLock); - /* Loop back if we're not at end of page */ - } while (!reachedEndOfPage); + if (!XLogRecPtrIsInvalid(pin_min)) + { + *oldest_lsn = pin_min; + have_any = true; + } + if (!XLogRecPtrIsInvalid(committed_min)) + { + if (!have_any || (committed_min < *oldest_lsn)) + *oldest_lsn = committed_min; + have_any = true; + } - /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + return have_any; + } + + + /* + * asyncQueueAddCompactEntry + * + * Add a compact entry to the notification SLRU queue containing only + * metadata (dbid, xid, notify_lsn) that points to the full notification + * data in WAL. This is much more efficient than the old approach of + * storing complete notification content in the SLRU queue. + */ + void + asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn) + { + AsyncQueueEntry entry; + QueuePosition queue_head; + int64 pageno; + int64 entry_pageno = -1; /* page where the entry is written */ + int offset; + int slotno; + LWLock *banklock; /* - * Now that we have let go of the SLRU bank lock, send the notifications - * to our backend + * Fill in the compact entry with just the metadata. + * No payload data is stored here - it's all in WAL. */ - Assert(local_buf_end - local_buf <= BLCKSZ); - for (char *p = local_buf; p < local_buf_end;) + entry.dboid = dbid; + entry.xid = xid; + entry.notify_lsn = notify_lsn; + + /* Caller should already hold NotifyQueueLock in exclusive mode */ + queue_head = QUEUE_HEAD; + + /* Capacity was reserved in PreCommit_Notify. Just write the entry. */ + + /* + * Get the current page. If this is the first write since postmaster + * started, initialize the first page. + */ + pageno = QUEUE_POS_PAGE(queue_head); + banklock = SimpleLruGetBankLock(NotifyCtl, pageno); + + LWLockAcquire(banklock, LW_EXCLUSIVE); + + if (QUEUE_POS_IS_ZERO(queue_head)) + slotno = SimpleLruZeroPage(NotifyCtl, pageno); + else + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + + /* Mark the page dirty before writing */ + NotifyCtl->shared->page_dirty[slotno] = true; + + offset = QUEUE_POS_OFFSET(queue_head); + + /* Check if the compact entry fits on the current page */ + if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE) { - AsyncQueueEntry *qe = (AsyncQueueEntry *) p; + /* Copy the compact entry to the shared buffer */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &entry, + sizeof(AsyncQueueEntry)); + + entry_pageno = pageno; - /* qe->data is the null-terminated channel name */ - char *channel = qe->data; + /* Advance queue head by the size of our compact entry */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + /* + * Page became full. Initialize the next page to ensure SLRU + * consistency (similar to what asyncQueueAddEntries does). + */ + LWLock *nextlock; + + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); + } + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) + { + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); + } + + /* Set cleanup flag if appropriate */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; + } - if (IsListeningOn(channel)) + /* Update the global queue head and consume reservation (not in recovery) */ + QUEUE_HEAD = queue_head; + if (!RecoveryInProgress()) { - /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; + Assert(asyncQueueControl->reservedEntries > 0); + asyncQueueControl->reservedEntries--; + } + } + else + { + /* + * No room on current page. Move to the next page and write entry at + * offset 0; padding is unnecessary with fixed-size entries and bounded + * scans that stop at QUEUE_HEAD. + */ + LWLockRelease(banklock); - NotifyMyFrontEnd(channel, payload, qe->srcPid); + /* Move head to the start of the next page */ + SET_QUEUE_POS(queue_head, QUEUE_POS_PAGE(queue_head) + 1, 0); + + /* Ensure next page is present */ + banklock = SimpleLruGetBankLock(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + LWLockAcquire(banklock, LW_EXCLUSIVE); + slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + NotifyCtl->shared->page_dirty[slotno] = true; + + /* Write entry at beginning of the new page */ + memcpy(NotifyCtl->shared->page_buffer[slotno], &entry, sizeof(AsyncQueueEntry)); + + entry_pageno = QUEUE_POS_PAGE(queue_head); + + /* Advance queue head and initialize subsequent page if needed */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + LWLock *nextlock; + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); + } + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) + { + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); + } + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; } - p += qe->length; + /* Update the global queue head and consume reservation (not in recovery) */ + QUEUE_HEAD = queue_head; + if (!RecoveryInProgress()) + { + Assert(asyncQueueControl->reservedEntries > 0); + asyncQueueControl->reservedEntries--; + } } - if (QUEUE_POS_EQUAL(*current, stop)) - reachedStop = true; + /* Update per-page minimum under locks. */ + if (entry_pageno >= 0) + { + /* Caller holds NotifyQueueLock EXCLUSIVE (see xact.c commit path). */ + NotifyPageMinUpdateForPage(entry_pageno, notify_lsn); + } - return reachedStop; + LWLockRelease(banklock); } /* diff --cc src/test/isolation/specs/async-notify.spec index d09c2297f09,2e2e3e186be..00000000000 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@@ -31,21 -31,7 +31,25 @@@ step notifys1 ROLLBACK TO SAVEPOINT s2; COMMIT; } ++<<<<<<< ours +step notifys_simple { + BEGIN; + SAVEPOINT s1; + NOTIFY c1, 'simple1'; + NOTIFY c2, 'simple2'; + RELEASE SAVEPOINT s1; + COMMIT; +} +step notify_many_with_dup { + BEGIN; + SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s; + SELECT pg_notify('c1', 'msg1'); + COMMIT; +} +step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; } ++======= + step usage { SELECT pg_notification_queue_usage() = 0 AS nonzero; } ++>>>>>>> theirs step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } teardown { UNLISTEN *; }