=== Applying patches on top of PostgreSQL commit ID e76defbcf09e22941d8cea462e2deef36d43fa04 === /etc/rc.d/jail: WARNING: Per-jail configuration via jail_* variables is obsolete. Please consider migrating to /etc/jail.conf. Mon Nov 17 03:53:23 UTC 2025 On branch cf/6205 nothing to commit, working tree clean === using 'git am' to apply patch ./notify-through-wal-v8.patch === Applying: Implement WAL-based async notifications for improved throughput .git/rebase-apply/patch:1546: trailing whitespace. * metadata (dbid, xid, notify_lsn) that points to the full notification .git/rebase-apply/patch:1771: trailing whitespace. /* .git/rebase-apply/patch:2310: new blank line at EOF. + warning: 3 lines add whitespace errors. Using index info to reconstruct a base tree... M src/backend/access/transam/xact.c M src/backend/access/transam/xlog.c M src/backend/commands/async.c M src/backend/tcop/utility.c M src/include/commands/async.h Falling back to patching base and 3-way merge... Auto-merging src/include/commands/async.h CONFLICT (content): Merge conflict in src/include/commands/async.h Auto-merging src/backend/tcop/utility.c Auto-merging src/backend/commands/async.c CONFLICT (content): Merge conflict in src/backend/commands/async.c Auto-merging src/backend/access/transam/xlog.c Auto-merging src/backend/access/transam/xact.c error: Failed to merge in the changes. hint: Use 'git am --show-current-patch=diff' to see the failed patch Patch failed at 0001 Implement WAL-based async notifications for improved throughput When you have resolved this problem, run "git am --continue". If you prefer to skip this patch, run "git am --skip" instead. To restore the original branch and stop patching, run "git am --abort". Unstaged changes after reset: M src/backend/access/rmgrdesc/Makefile M src/backend/access/rmgrdesc/meson.build M src/backend/access/rmgrdesc/xactdesc.c M src/backend/access/transam/rmgr.c M src/backend/access/transam/xact.c M src/backend/access/transam/xlog.c M src/backend/commands/async.c M src/backend/tcop/utility.c M src/bin/pg_rewind/parsexlog.c M src/bin/pg_waldump/rmgrdesc.c M src/bin/pg_waldump/t/001_basic.pl M src/include/access/rmgrlist.h M src/include/access/xact.h M src/include/commands/async.h M src/include/storage/proc.h M src/test/isolation/expected/async-notify.out M src/test/isolation/expected/stats.out M src/test/isolation/expected/stats_1.out M src/test/isolation/specs/async-notify.spec Removing src/backend/access/rmgrdesc/asyncdesc.c Removing src/include/access/async_xlog.h Removing src/test/modules/test_listen_notify/ === using patch(1) to apply patch ./notify-through-wal-v8.patch === patching file src/backend/access/rmgrdesc/Makefile patching file src/backend/access/rmgrdesc/asyncdesc.c patching file src/backend/access/rmgrdesc/meson.build patching file src/backend/access/rmgrdesc/xactdesc.c patching file src/backend/access/transam/rmgr.c patching file src/backend/access/transam/xact.c Hunk #1 succeeded at 1449 (offset 1 line). Hunk #2 succeeded at 5858 (offset 6 lines). Hunk #3 succeeded at 5945 (offset 6 lines). Hunk #4 succeeded at 6016 (offset 6 lines). Hunk #5 succeeded at 6276 (offset 6 lines). patching file src/backend/access/transam/xlog.c Hunk #2 succeeded at 3883 (offset 1 line). Hunk #3 succeeded at 3905 (offset 1 line). patching file src/backend/commands/async.c Hunk #7 FAILED at 461. Hunk #8 succeeded at 479 (offset -1 lines). Hunk #9 succeeded at 574 (offset -1 lines). Hunk #10 succeeded at 604 (offset -1 lines). Hunk #11 succeeded at 632 (offset -1 lines). Hunk #12 succeeded at 994 (offset -1 lines). Hunk #13 succeeded at 1160 (offset -1 lines). Hunk #14 succeeded at 1191 (offset -1 lines). Hunk #15 succeeded at 1427 (offset -1 lines). Hunk #16 FAILED at 1436. Hunk #20 FAILED at 2021. Hunk #21 succeeded at 2045 (offset -7 lines). Hunk #22 FAILED at 2116. Hunk #23 succeeded at 2097 with fuzz 2 (offset -32 lines). Hunk #24 FAILED at 2139. Hunk #25 FAILED at 2167. Hunk #26 succeeded at 2306 (offset 10 lines). Hunk #27 succeeded at 2656 (offset 124 lines). 6 out of 27 hunks FAILED -- saving rejects to file src/backend/commands/async.c.rej patching file src/backend/tcop/utility.c Hunk #1 succeeded at 325 (offset 2 lines). patching file src/bin/pg_rewind/parsexlog.c patching file src/bin/pg_waldump/rmgrdesc.c patching file src/bin/pg_waldump/t/001_basic.pl patching file src/include/access/async_xlog.h patching file src/include/access/rmgrlist.h patching file src/include/access/xact.h patching file src/include/commands/async.h Hunk #2 succeeded at 63 with fuzz 2 (offset 3 lines). patching file src/include/storage/proc.h patching file src/test/isolation/expected/async-notify.out patching file src/test/isolation/expected/stats.out patching file src/test/isolation/expected/stats_1.out patching file src/test/isolation/specs/async-notify.spec patching file src/test/modules/test_listen_notify/Makefile patching file src/test/modules/test_listen_notify/meson.build patching file src/test/modules/test_listen_notify/t/002_queue_full.pl patching file src/test/modules/test_listen_notify/t/003_wal_pin_test.pl Unstaged changes after reset: M src/backend/access/rmgrdesc/Makefile M src/backend/access/rmgrdesc/meson.build M src/backend/access/rmgrdesc/xactdesc.c M src/backend/access/transam/rmgr.c M src/backend/access/transam/xact.c M src/backend/access/transam/xlog.c M src/backend/commands/async.c M src/backend/tcop/utility.c M src/bin/pg_rewind/parsexlog.c M src/bin/pg_waldump/rmgrdesc.c M src/bin/pg_waldump/t/001_basic.pl M src/include/access/rmgrlist.h M src/include/access/xact.h M src/include/commands/async.h M src/include/storage/proc.h M src/test/isolation/expected/async-notify.out M src/test/isolation/expected/stats.out M src/test/isolation/expected/stats_1.out M src/test/isolation/specs/async-notify.spec Removing src/backend/access/rmgrdesc/asyncdesc.c Removing src/backend/commands/async.c.rej Removing src/include/access/async_xlog.h Removing src/test/modules/test_listen_notify/ === using 'git apply' to apply patch ./notify-through-wal-v8.patch === /work/patches/./notify-through-wal-v8.patch:1562: trailing whitespace. * metadata (dbid, xid, notify_lsn) that points to the full notification /work/patches/./notify-through-wal-v8.patch:1787: trailing whitespace. /* Applied patch to 'src/backend/access/rmgrdesc/Makefile' cleanly. Falling back to direct application... Applied patch to 'src/backend/access/rmgrdesc/meson.build' cleanly. Applied patch to 'src/backend/access/rmgrdesc/xactdesc.c' cleanly. Applied patch to 'src/backend/access/transam/rmgr.c' cleanly. Applied patch to 'src/backend/access/transam/xact.c' cleanly. Applied patch to 'src/backend/access/transam/xlog.c' cleanly. Applied patch to 'src/backend/commands/async.c' with conflicts. Applied patch to 'src/backend/tcop/utility.c' cleanly. Applied patch to 'src/bin/pg_rewind/parsexlog.c' cleanly. Applied patch to 'src/bin/pg_waldump/rmgrdesc.c' cleanly. Applied patch to 'src/bin/pg_waldump/t/001_basic.pl' cleanly. Falling back to direct application... Applied patch to 'src/include/access/rmgrlist.h' cleanly. Applied patch to 'src/include/access/xact.h' cleanly. Applied patch to 'src/include/commands/async.h' with conflicts. Applied patch to 'src/include/storage/proc.h' cleanly. Applied patch to 'src/test/isolation/expected/async-notify.out' cleanly. Applied patch to 'src/test/isolation/expected/stats.out' cleanly. Applied patch to 'src/test/isolation/expected/stats_1.out' cleanly. Applied patch to 'src/test/isolation/specs/async-notify.spec' cleanly. Falling back to direct application... Falling back to direct application... /work/patches/./notify-through-wal-v8.patch:2326: new blank line at EOF. + Falling back to direct application... Falling back to direct application... U src/backend/commands/async.c U src/include/commands/async.h warning: 3 lines add whitespace errors. diff --cc src/backend/commands/async.c index e1cf659485a,375984c95d4..00000000000 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@@ -438,17 -461,13 +461,17 @@@ static void Exec_UnlistenCommit(const c static void Exec_UnlistenAllCommit(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); - static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); - static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); - static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); - static void asyncQueueFillWarning(void); static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); -static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, +static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, ++<<<<<<< ours + Snapshot snapshot); ++======= + char *page_buffer); ++>>>>>>> theirs static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); @@@ -1313,167 -1443,6 +1447,170 @@@ asyncQueueAdvance(volatile QueuePositio } /* ++<<<<<<< ours + * Fill the AsyncQueueEntry at *qe with an outbound notification message. + */ +static void +asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) +{ + size_t channellen = n->channel_len; + size_t payloadlen = n->payload_len; + int entryLength; + + Assert(channellen < NAMEDATALEN); + Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); + + /* The terminators are already included in AsyncQueueEntryEmptySize */ + entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; + entryLength = QUEUEALIGN(entryLength); + qe->length = entryLength; + qe->dboid = MyDatabaseId; + qe->xid = GetCurrentTransactionId(); + qe->srcPid = MyProcPid; + memcpy(qe->data, n->data, channellen + payloadlen + 2); +} + +/* + * Add pending notifications to the queue. + * + * We go page by page here, i.e. we stop once we have to go to a new page but + * we will be called again and then fill that next page. If an entry does not + * fit into the current page, we write a dummy entry with an InvalidOid as the + * database OID in order to fill the page. So every page is always used up to + * the last byte which simplifies reading the page later. + * + * We are passed the list cell (in pendingNotifies->events) containing the next + * notification to write and return the first still-unwritten cell back. + * Eventually we will return NULL indicating all is done. + * + * We are holding NotifyQueueLock already from the caller and grab + * page specific SLRU bank lock locally in this function. + */ +static ListCell * +asyncQueueAddEntries(ListCell *nextNotify) +{ + AsyncQueueEntry qe; + QueuePosition queue_head; + int64 pageno; + int offset; + int slotno; + LWLock *prevlock; + + /* + * We work with a local copy of QUEUE_HEAD, which we write back to shared + * memory upon exiting. The reason for this is that if we have to advance + * to a new page, SimpleLruZeroPage might fail (out of disk space, for + * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, + * subsequent insertions would try to put entries into a page that slru.c + * thinks doesn't exist yet.) So, use a local position variable. Note + * that if we do fail, any already-inserted queue entries are forgotten; + * this is okay, since they'd be useless anyway after our transaction + * rolls back. + */ + queue_head = QUEUE_HEAD; + + /* + * If this is the first write since the postmaster started, we need to + * initialize the first page of the async SLRU. Otherwise, the current + * page should be initialized already, so just fetch it. + */ + pageno = QUEUE_POS_PAGE(queue_head); + prevlock = SimpleLruGetBankLock(NotifyCtl, pageno); + + /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ + LWLockAcquire(prevlock, LW_EXCLUSIVE); + + if (QUEUE_POS_IS_ZERO(queue_head)) + slotno = SimpleLruZeroPage(NotifyCtl, pageno); + else + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + + /* Note we mark the page dirty before writing in it */ + NotifyCtl->shared->page_dirty[slotno] = true; + + while (nextNotify != NULL) + { + Notification *n = (Notification *) lfirst(nextNotify); + + /* Construct a valid queue entry in local variable qe */ + asyncQueueNotificationToEntry(n, &qe); + + offset = QUEUE_POS_OFFSET(queue_head); + + /* Check whether the entry really fits on the current page */ + if (offset + qe.length <= QUEUE_PAGESIZE) + { + /* OK, so advance nextNotify past this item */ + nextNotify = lnext(pendingNotifies->events, nextNotify); + } + else + { + /* + * Write a dummy entry to fill up the page. Actually readers will + * only check dboid and since it won't match any reader's database + * OID, they will ignore this entry and move on. + */ + qe.length = QUEUE_PAGESIZE - offset; + qe.dboid = InvalidOid; + qe.xid = InvalidTransactionId; + qe.data[0] = '\0'; /* empty channel */ + qe.data[1] = '\0'; /* empty payload */ + } + + /* Now copy qe into the shared buffer page */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &qe, + qe.length); + + /* Advance queue_head appropriately, and detect if page is full */ + if (asyncQueueAdvance(&(queue_head), qe.length)) + { + LWLock *lock; + + pageno = QUEUE_POS_PAGE(queue_head); + lock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (lock != prevlock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + + /* + * Page is full, so we're done here, but first fill the next page + * with zeroes. The reason to do this is to ensure that slru.c's + * idea of the head page is always the same as ours, which avoids + * boundary problems in SimpleLruTruncate. The test in + * asyncQueueIsFull() ensured that there is room to create this + * page without overrunning the queue. + */ + slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + + /* + * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, + * set flag to remember that we should try to advance the tail + * pointer (we don't want to actually do that right here). + */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; + + /* And exit the loop */ + break; + } + } + + /* Success, so update the global QUEUE_HEAD */ + QUEUE_HEAD = queue_head; + + LWLockRelease(prevlock); + + return nextNotify; +} + +/* ++======= ++>>>>>>> theirs * SQL function to return the fraction of the notification queue currently * occupied. */ @@@ -1850,10 -1845,16 +2013,9 @@@ ProcessNotifyInterrupt(bool flush static void asyncQueueReadAllNotifications(void) { - volatile QueuePosition pos; + QueuePosition pos; QueuePosition head; - Snapshot snapshot; - /* page_buffer must be adequately aligned, so use a union */ - union - { - char buf[QUEUE_PAGESIZE]; - AsyncQueueEntry align; - } page_buffer; - /* Fetch current state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ @@@ -1949,56 -1932,45 +2071,79 @@@ * rewrite pages under us. Especially we don't want to hold a lock * while sending the notifications to the frontend. */ ++<<<<<<< ours + reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot); ++======= + reachedStop = asyncQueueProcessPageEntries(&pos, head, + page_buffer.buf); ++>>>>>>> theirs } while (!reachedStop); - } - PG_FINALLY(); - { + /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyProcNumber) = pos; LWLockRelease(NotifyQueueLock); + + ExitOnAnyError = save_ExitOnAnyError; } - PG_END_TRY(); - /* Done with snapshot */ - UnregisterSnapshot(snapshot); } /* * Fetch notifications from the shared queue, beginning at position current, * and deliver relevant ones to my frontend. * ++<<<<<<< ours + * The function returns true once we have reached the stop position or an + * uncommitted notification, and false if we have finished with the page. ++======= + * The current page must have been fetched into page_buffer from shared + * memory. (We could access the page right in shared memory, but that + * would imply holding the SLRU bank lock throughout this routine.) + * + * We stop if we reach the "stop" position or reach the end of the page. + * + * The function returns true once we have reached the stop position, and false + * if we have finished with the page. ++>>>>>>> theirs * In other words: once it returns true there is no need to look further. * The QueuePosition *current is advanced past all processed messages. */ static bool -asyncQueueProcessPageEntries(volatile QueuePosition *current, +asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, ++<<<<<<< ours + Snapshot snapshot) ++======= + char *page_buffer) ++>>>>>>> theirs { + int64 curpage = QUEUE_POS_PAGE(*current); + int slotno; + char *page_buffer; bool reachedStop = false; bool reachedEndOfPage; ++<<<<<<< ours + + /* + * We copy the entries into a local buffer to avoid holding the SLRU lock + * while we transmit them to our frontend. The local buffer must be + * adequately aligned, so use a union. + */ + union + { + char buf[QUEUE_PAGESIZE]; + AsyncQueueEntry align; + } local_buf; + char *local_buf_end = local_buf.buf; + + slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, + InvalidTransactionId); + page_buffer = NotifyCtl->shared->page_buffer[slotno]; ++======= + AsyncQueueEntry *qe; + Snapshot snap = ActiveSnapshotSet() ? GetActiveSnapshot() : NULL; ++>>>>>>> theirs do { @@@ -2020,91 -1999,361 +2173,412 @@@ /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { - if (XidInMVCCSnapshot(qe->xid, snapshot)) + /* + * Since queue entries are written atomically with commit records + * while holding NotifyQueueLock exclusively, all entries in the queue + * are guaranteed to be from committed transactions. + * + * Step 5: Read notification data using stored LSN from WAL. + * The compact entry only contains metadata. + */ + processNotificationFromWAL(qe->notify_lsn); + } + + /* Loop back if we're not at end of page */ + } while (!reachedEndOfPage); + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; + + return reachedStop; + } + + /* + * processNotificationFromWAL + * + * Fetch notification data from WAL using the stored LSN and process + * the individual notifications for delivery to listening frontend. + * This implements Step 5 of the new WAL-based notification system. + */ + static void + processNotificationFromWAL(XLogRecPtr notify_lsn) + { + XLogReaderState *xlogreader; + DecodedXLogRecord *record; + xl_async_notify_data *xlrec; + char *data; + char *ptr; + uint32_t remaining; + int srcPid; + char *errormsg; + Oid dboid; + uint32 nnotifications; + + /* + * Create XLog reader to fetch the notification data record. + * We use a temporary reader since this is called during normal + * notification processing, not during recovery. + */ + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); + if (!xlogreader) + elog(ERROR, "failed to allocate XLog reader for notification data"); + + /* Start reading exactly at the NOTIFY_DATA record begin LSN */ + XLogBeginRead(xlogreader, notify_lsn); + + /* Read the NOTIFY_DATA record */ + record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + { + XLogReaderFree(xlogreader); + elog(ERROR, "failed to read notification data from WAL at %X/%X: %s", + LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message"); + } + + /* Verify this is the expected record type */ + if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA) + elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u", + LSN_FORMAT_ARGS(notify_lsn), + XLogRecGetRmid(xlogreader), + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK)); + + /* Extract the notification data from the WAL record */ + xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader); + srcPid = xlrec->srcPid; + dboid = xlrec->dbid; + data = (char *) xlrec + SizeOfAsyncNotifyData; + ptr = data; + remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData; + nnotifications = xlrec->nnotifications; + + /* + * Process each notification in the serialized data. + * The format is: 2-byte channel_len, 2-byte payload_len, + * null-terminated channel, null-terminated payload. + */ + for (uint32_t i = 0; i < nnotifications && remaining >= 4; i++) + { + uint16 channel_len; + uint16 payload_len; + char *channel; + char *payload; + + /* Read lengths */ + memcpy(&channel_len, ptr, 2); + ptr += 2; + memcpy(&payload_len, ptr, 2); + ptr += 2; + remaining -= 4; + + /* Verify we have enough data */ + if (remaining < channel_len + 1 + payload_len + 1) + break; + + /* Extract channel and payload strings */ + channel = ptr; + ptr += channel_len + 1; + payload = ptr; + ptr += payload_len + 1; + remaining -= (channel_len + 1 + payload_len + 1); + + /* Deliver notification if we're listening on this channel */ + if (dboid == MyDatabaseId && IsListeningOn(channel)) + NotifyMyFrontEnd(channel, payload, srcPid); + } + + /* Clean up */ + XLogReaderFree(xlogreader); + } + + + /* + * AsyncNotifyOldestRequiredLSN + * + * Compute the oldest WAL LSN required to satisfy NOTIFY delivery for any + * still-present queue entry. Returns true and sets *oldest_lsn when the + * queue is non-empty (QUEUE_TAIL != QUEUE_HEAD). Otherwise returns false. + * + * We look at the queue entry at QUEUE_TAIL; since that is the oldest entry + * still needed by some listener, its notify_lsn is the minimum WAL position + * that must be retained. + */ + bool + AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn) + { + XLogRecPtr committed_min = InvalidXLogRecPtr; + XLogRecPtr pin_min = InvalidXLogRecPtr; + bool have_any = false; + + /* First, scan per-backend pins under ProcArrayLock */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + { + int i; + for (i = 0; i < MaxBackends; i++) + { + PGPROC *proc = GetPGProcByNumber(i); + XLogRecPtr lsn; + + /* Skip unused PGPROC slots */ + if (proc->pid == 0) + continue; + + lsn = proc->notifyDataLsn; + if (!XLogRecPtrIsInvalid(lsn)) { - /* - * The source transaction is still in progress, so we can't - * process this message yet. Break out of the loop, but first - * back up *current so we will reprocess the message next - * time. (Note: it is unlikely but not impossible for - * TransactionIdDidCommit to fail, so we can't really avoid - * this advance-then-back-up behavior when dealing with an - * uncommitted message.) - * - * Note that we must test XidInMVCCSnapshot before we test - * TransactionIdDidCommit, else we might return a message from - * a transaction that is not yet visible to snapshots; compare - * the comments at the head of heapam_visibility.c. - * - * Also, while our own xact won't be listed in the snapshot, - * we need not check for TransactionIdIsCurrentTransactionId - * because our transaction cannot (yet) have queued any - * messages. - */ - *current = thisentry; - reachedStop = true; - break; + if (XLogRecPtrIsInvalid(pin_min) || lsn < pin_min) + pin_min = lsn; } ++<<<<<<< ours + + /* + * Quick check for the case that we're not listening on any + * channels, before calling TransactionIdDidCommit(). This makes + * that case a little faster, but more importantly, it ensures + * that if there's a bad entry in the queue for which + * TransactionIdDidCommit() fails for some reason, we can skip + * over it on the first LISTEN in a session, and not get stuck on + * it indefinitely. + */ + if (listenChannels == NIL) + continue; + + if (TransactionIdDidCommit(qe->xid)) + { + memcpy(local_buf_end, qe, qe->length); + local_buf_end += qe->length; ++======= + } + } + LWLockRelease(ProcArrayLock); + + /* Then, scan per-page committed mins under shared lock */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + if (NotifyPageMins != NULL) + { + int i; + for (i = 0; i < max_notify_queue_pages; i++) + { + if (NotifyPageMins[i].page_no >= 0 && !XLogRecPtrIsInvalid(NotifyPageMins[i].min_lsn)) + { + if (XLogRecPtrIsInvalid(committed_min) || (NotifyPageMins[i].min_lsn < committed_min)) + committed_min = NotifyPageMins[i].min_lsn; } - else + } + } + LWLockRelease(NotifyQueueLock); + + if (!XLogRecPtrIsInvalid(pin_min)) + { + *oldest_lsn = pin_min; + have_any = true; + } + if (!XLogRecPtrIsInvalid(committed_min)) + { + if (!have_any || (committed_min < *oldest_lsn)) + *oldest_lsn = committed_min; + have_any = true; + } + + return have_any; + } + + + /* + * asyncQueueAddCompactEntry + * + * Add a compact entry to the notification SLRU queue containing only + * metadata (dbid, xid, notify_lsn) that points to the full notification + * data in WAL. This is much more efficient than the old approach of + * storing complete notification content in the SLRU queue. + */ + void + asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn) + { + AsyncQueueEntry entry; + QueuePosition queue_head; + int64 pageno; + int64 entry_pageno = -1; /* page where the entry is written */ + int offset; + int slotno; + LWLock *banklock; + + /* + * Fill in the compact entry with just the metadata. + * No payload data is stored here - it's all in WAL. + */ + entry.dboid = dbid; + entry.xid = xid; + entry.notify_lsn = notify_lsn; + + /* Caller should already hold NotifyQueueLock in exclusive mode */ + queue_head = QUEUE_HEAD; + + /* Capacity was reserved in PreCommit_Notify. Just write the entry. */ + + /* + * Get the current page. If this is the first write since postmaster + * started, initialize the first page. + */ + pageno = QUEUE_POS_PAGE(queue_head); + banklock = SimpleLruGetBankLock(NotifyCtl, pageno); + + LWLockAcquire(banklock, LW_EXCLUSIVE); + + if (QUEUE_POS_IS_ZERO(queue_head)) + slotno = SimpleLruZeroPage(NotifyCtl, pageno); + else + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + + /* Mark the page dirty before writing */ + NotifyCtl->shared->page_dirty[slotno] = true; + + offset = QUEUE_POS_OFFSET(queue_head); + + /* Check if the compact entry fits on the current page */ + if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE) + { + /* Copy the compact entry to the shared buffer */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &entry, + sizeof(AsyncQueueEntry)); + + entry_pageno = pageno; + + /* Advance queue head by the size of our compact entry */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + /* + * Page became full. Initialize the next page to ensure SLRU + * consistency (similar to what asyncQueueAddEntries does). + */ + LWLock *nextlock; + + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); ++>>>>>>> theirs + } + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) { - /* - * The source transaction aborted or crashed, so we just - * ignore its notifications. - */ + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); } + + /* Set cleanup flag if appropriate */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; } - /* Loop back if we're not at end of page */ - } while (!reachedEndOfPage); + /* Update the global queue head and consume reservation (not in recovery) */ + QUEUE_HEAD = queue_head; + if (!RecoveryInProgress()) + { + Assert(asyncQueueControl->reservedEntries > 0); + asyncQueueControl->reservedEntries--; + } + } + else + { + /* + * No room on current page. Move to the next page and write entry at + * offset 0; padding is unnecessary with fixed-size entries and bounded + * scans that stop at QUEUE_HEAD. + */ + LWLockRelease(banklock); ++<<<<<<< ours + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + + /* + * Now that we have let go of the SLRU bank lock, send the notifications + * to our backend + */ + Assert(local_buf_end - local_buf.buf <= BLCKSZ); + for (char *p = local_buf.buf; p < local_buf_end;) + { + AsyncQueueEntry *qe = (AsyncQueueEntry *) p; + + /* qe->data is the null-terminated channel name */ + char *channel = qe->data; + + if (IsListeningOn(channel)) + { + /* payload follows channel name */ + char *payload = qe->data + strlen(channel) + 1; + + NotifyMyFrontEnd(channel, payload, qe->srcPid); + } + + p += qe->length; + } + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; ++======= + /* Move head to the start of the next page */ + SET_QUEUE_POS(queue_head, QUEUE_POS_PAGE(queue_head) + 1, 0); ++>>>>>>> theirs - return reachedStop; + /* Ensure next page is present */ + banklock = SimpleLruGetBankLock(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + LWLockAcquire(banklock, LW_EXCLUSIVE); + slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + NotifyCtl->shared->page_dirty[slotno] = true; + + /* Write entry at beginning of the new page */ + memcpy(NotifyCtl->shared->page_buffer[slotno], &entry, sizeof(AsyncQueueEntry)); + + entry_pageno = QUEUE_POS_PAGE(queue_head); + + /* Advance queue head and initialize subsequent page if needed */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + LWLock *nextlock; + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); + } + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) + { + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); + } + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; + } + + /* Update the global queue head and consume reservation (not in recovery) */ + QUEUE_HEAD = queue_head; + if (!RecoveryInProgress()) + { + Assert(asyncQueueControl->reservedEntries > 0); + asyncQueueControl->reservedEntries--; + } + } + + /* Update per-page minimum under locks. */ + if (entry_pageno >= 0) + { + /* Caller holds NotifyQueueLock EXCLUSIVE (see xact.c commit path). */ + NotifyPageMinUpdateForPage(entry_pageno, notify_lsn); + } + + LWLockRelease(banklock); } /* diff --cc src/include/commands/async.h index aaec7314c10,90870e6ae82..00000000000 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@@ -46,7 -60,20 +60,25 @@@ extern void HandleNotifyInterrupt(void) /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); ++<<<<<<< ours +/* freeze old transaction IDs in notify queue (called by VACUUM) */ +extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid); ++======= + /* WAL-based notification functions */ + extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid, + uint32 nnotifications, Size data_len, char *data); + extern void async_redo(XLogReaderState *record); + extern void async_desc(StringInfo buf, XLogReaderState *record); + extern const char *async_identify(uint8 info); + + /* notification queue functions */ + extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn); + + /* wake listeners for a specific database (used during standby replay) */ + extern void SignalBackendsForDatabase(Oid dboid); + + /* Spill helper to be called before WAL recycle */ + extern bool AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn); ++>>>>>>> theirs #endif /* ASYNC_H */