=== Applying patches on top of PostgreSQL commit ID 9cbb1d21d67ec3cb2d5342073d220a0c1e0ad82c === /etc/rc.d/jail: WARNING: Per-jail configuration via jail_* variables is obsolete. Please consider migrating to /etc/jail.conf. Fri Jan 16 04:51:25 UTC 2026 On branch cf/6248 nothing to commit, working tree clean === using 'git am' to apply patch ./0001-async-avoid-pallocs-in-critical-section-v3.patch === Applying: Preallocate signal arrays to avoid pallocs AtCommit Using index info to reconstruct a base tree... M src/backend/commands/async.c Falling back to patching base and 3-way merge... Auto-merging src/backend/commands/async.c CONFLICT (content): Merge conflict in src/backend/commands/async.c error: Failed to merge in the changes. hint: Use 'git am --show-current-patch=diff' to see the failed patch Patch failed at 0001 Preallocate signal arrays to avoid pallocs AtCommit When you have resolved this problem, run "git am --continue". If you prefer to skip this patch, run "git am --skip" instead. To restore the original branch and stop patching, run "git am --abort". Unstaged changes after reset: M src/backend/commands/async.c === using patch(1) to apply patch ./0001-async-avoid-pallocs-in-critical-section-v3.patch === patching file src/backend/commands/async.c Hunk #1 succeeded at 559 with fuzz 2 (offset 141 lines). Hunk #2 succeeded at 774 with fuzz 1 (offset 291 lines). Hunk #3 succeeded at 1308 (offset 379 lines). Hunk #4 FAILED at 1614. Hunk #5 FAILED at 1658. Hunk #6 FAILED at 1667. Hunk #7 FAILED at 1685. 4 out of 7 hunks FAILED -- saving rejects to file src/backend/commands/async.c.rej Unstaged changes after reset: M src/backend/commands/async.c Removing src/backend/commands/async.c.rej === using 'git apply' to apply patch ./0001-async-avoid-pallocs-in-critical-section-v3.patch === Applied patch to 'src/backend/commands/async.c' with conflicts. U src/backend/commands/async.c diff --cc src/backend/commands/async.c index 657c591618d,37526c7b726..00000000000 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@@ -543,21 -419,10 +543,28 @@@ static bool unlistenExitRegistered = fa static bool amRegisteredListener = false; /* ++<<<<<<< ours + * Queue head positions for direct advancement. + * These are captured during PreCommit_Notify while holding the heavyweight + * lock on database 0, ensuring no other backend can insert notifications + * between them. SignalBackends uses these to advance idle backends. + */ +static QueuePosition queueHeadBeforeWrite; +static QueuePosition queueHeadAfterWrite; + +/* + * Workspace arrays for SignalBackends. These are preallocated in + * PreCommit_Notify to avoid needing memory allocation after committing to + * clog. + */ +static int32 *signalPids = NULL; +static ProcNumber *signalProcnos = NULL; ++======= + * Arrays for SignalBackends. + */ + static int32 *notifySignalPids = NULL; + static ProcNumber *notifySignalProcs = NULL; ++>>>>>>> theirs /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ static bool tryAdvanceTail = false; @@@ -630,142 -484,24 +637,163 @@@ asyncQueuePagePrecedes(int64 p, int64 q } /* ++<<<<<<< ours + * GlobalChannelKeyInit + * Prepare a global channel table key for hashing. + */ +static inline void +GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel) +{ + memset(key, 0, sizeof(GlobalChannelKey)); + key->dboid = dboid; + strlcpy(key->channel, channel, NAMEDATALEN); +} + +/* + * globalChannelTableHash + * Hash function for global channel table keys. + */ +static dshash_hash +globalChannelTableHash(const void *key, size_t size, void *arg) +{ + const GlobalChannelKey *k = (const GlobalChannelKey *) key; + dshash_hash h; + + h = DatumGetUInt32(hash_uint32(k->dboid)); + h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel, + strnlen(k->channel, NAMEDATALEN))); + + return h; +} + +/* parameters for the global channel table */ +static const dshash_parameters globalChannelTableDSHParams = { + sizeof(GlobalChannelKey), + sizeof(GlobalChannelEntry), + dshash_memcmp, + globalChannelTableHash, + dshash_memcpy, + LWTRANCHE_NOTIFY_CHANNEL_HASH +}; + +/* + * initGlobalChannelTable + * Lazy initialization of the global channel table. + */ +static void +initGlobalChannelTable(void) +{ + MemoryContext oldcontext; + + /* Quick exit if we already did this */ + if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID && + globalChannelTable != NULL) + return; + + /* Otherwise, use a lock to ensure only one process creates the table */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Be sure any local memory allocated by DSA routines is persistent */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for global channels */ + globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH); + dsa_pin(globalChannelDSA); + dsa_pin_mapping(globalChannelDSA); + globalChannelTable = dshash_create(globalChannelDSA, + &globalChannelTableDSHParams, + NULL); + + /* Store handles in shared memory for other backends to use */ + asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA); + asyncQueueControl->globalChannelTableDSH = + dshash_get_hash_table_handle(globalChannelTable); + } + else if (!globalChannelTable) + { + /* Attach to existing dynamic shared hash table */ + globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA); + dsa_pin_mapping(globalChannelDSA); + globalChannelTable = dshash_attach(globalChannelDSA, + &globalChannelTableDSHParams, + asyncQueueControl->globalChannelTableDSH, + NULL); + } + + MemoryContextSwitchTo(oldcontext); + LWLockRelease(NotifyQueueLock); +} + +/* + * initLocalChannelTable + * Lazy initialization of the local channel table. + * Once created, this table lasts for the life of the session. + */ +static void +initLocalChannelTable(void) +{ + HASHCTL hash_ctl; + + /* Quick exit if we already did this */ + if (localChannelTable != NULL) + return; + + /* Initialize local hash table for this backend's listened channels */ + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(ChannelName); + + localChannelTable = + hash_create("Local Listen Channels", + 64, + &hash_ctl, + HASH_ELEM | HASH_STRINGS); +} + +/* + * initPendingListenActions + * Lazy initialization of the pending listen actions hash table. + * This is allocated in CurTransactionContext during PreCommit_Notify, + * and destroyed at transaction end. + */ +static void +initPendingListenActions(void) +{ + HASHCTL hash_ctl; + + if (pendingListenActions != NULL) + return; + + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(PendingListenEntry); + hash_ctl.hcxt = CurTransactionContext; + + pendingListenActions = + hash_create("Pending Listen Actions", + list_length(pendingActions->actions), + &hash_ctl, + HASH_ELEM | HASH_STRINGS | HASH_CONTEXT); ++======= + * initSignalArrays + * Lazy initialization of the signal arrays. + */ + static void + initSignalArrays(void) + { + MemoryContext oldcontext; + + if (notifySignalProcs != NULL) + return; + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + if (notifySignalPids == NULL) + notifySignalPids = (int32 *) palloc(MaxBackends * sizeof(int32)); + notifySignalProcs = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber)); + + MemoryContextSwitchTo(oldcontext); ++>>>>>>> theirs } /* @@@ -2259,122 -1616,51 +2294,138 @@@ SignalBackends(void { int count; + /* Can't get here without PreCommit_Notify having made the global table */ + Assert(globalChannelTable != NULL); + + /* It should have set up these arrays, too */ + Assert(signalPids != NULL && signalProcnos != NULL); + /* * Identify backends that we need to signal. We don't want to send ++<<<<<<< ours + * signals while holding the NotifyQueueLock, so this part just builds a + * list of target PIDs in signalPids[] and signalProcnos[]. ++======= + * signals while holding the NotifyQueueLock, so this loop just builds a + * list of target PIDs. ++>>>>>>> theirs */ count = 0; LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Scan each channel name that we notified in this transaction */ + foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames) + { + GlobalChannelKey key; + GlobalChannelEntry *entry; + ListenerEntry *listeners; + + GlobalChannelKeyInit(&key, MyDatabaseId, channel); + entry = dshash_find(globalChannelTable, &key, false); + if (entry == NULL) + continue; /* nobody is listening */ + + listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, + entry->listenersArray); + + /* Identify listeners that now need waking, add them to arrays */ + for (int j = 0; j < entry->numListeners; j++) + { + ProcNumber i; + int32 pid; + QueuePosition pos; + + if (!listeners[j].listening) + continue; /* ignore not-yet-committed listeners */ + + i = listeners[j].procNo; + + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; /* already signaled, no need to repeat */ + + pid = QUEUE_BACKEND_PID(i); + pos = QUEUE_BACKEND_POS(i); + + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; /* it's fully caught up already */ + + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + signalPids[count] = pid; + signalProcnos[count] = i; + count++; + } + + dshash_release_lock(globalChannelTable, entry); + } + + /* + * Scan all listeners. Any that are not already pending wakeup must not + * be interested in our notifications (else we'd have set their wakeup + * flags above). Check to see if we can directly advance their queue + * pointers to save a wakeup. Otherwise, if they are far behind, wake + * them anyway so they will catch up. + */ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { - int32 pid = QUEUE_BACKEND_PID(i); + int32 pid; QueuePosition pos; - Assert(pid != InvalidPid); + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; + + /* If it's currently advancing, we should not touch it */ + if (QUEUE_BACKEND_IS_ADVANCING(i)) + continue; + + pid = QUEUE_BACKEND_PID(i); pos = QUEUE_BACKEND_POS(i); - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + + /* + * We can directly advance the other backend's queue pointer if it's + * not currently advancing (else there are race conditions), and its + * current pointer is not behind queueHeadBeforeWrite (else we'd make + * it miss some older messages), and we'd not be moving the pointer + * backward. + */ + if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) && + QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite)) { - /* - * Always signal listeners in our own database, unless they're - * already caught up (unlikely, but possible). - */ - if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) - continue; + /* We can directly advance its pointer past what we wrote */ + QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; } - else + else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), + QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY) { - /* - * Listeners in other databases should be signaled only if they - * are far behind. - */ - if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), - QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) - continue; + /* It's idle and far behind, so wake it up */ + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + signalPids[count] = pid; + signalProcnos[count] = i; + count++; } ++<<<<<<< ours ++======= + /* OK, need to signal this one */ + notifySignalPids[count] = pid; + notifySignalProcs[count] = i; + count++; ++>>>>>>> theirs } + LWLockRelease(NotifyQueueLock); /* Now send signals */ for (int i = 0; i < count; i++) { ++<<<<<<< ours + int32 pid = signalPids[i]; ++======= + int32 pid = notifySignalPids[i]; ++>>>>>>> theirs /* * If we are signaling our own process, no need to involve the kernel; @@@ -2392,9 -1678,10 +2443,16 @@@ * NotifyQueueLock; which is unlikely but certainly possible. So we * just log a low-level debug message if it happens. */ ++<<<<<<< ours + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", pid); + } ++======= + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, notifySignalProcs[i]) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", pid); + } + ++>>>>>>> theirs } /*