
@lss602726449
Contributor

Fix issue #1459

What does this PR do?

Type of Change

  • Bug fix (non-breaking change)
  • New feature (non-breaking change)
  • Breaking change (fix or feature with breaking changes)
  • Documentation update

Breaking Changes

Test Plan

  • Unit tests added/updated
  • Integration tests added/updated
  • Passed make installcheck
  • Passed make -C src/test installcheck-cbdb-parallel

Impact

Performance:

User-facing changes:

Dependencies:

Checklist

Additional Context

CI Skip Instructions


oracleloyall and others added 3 commits January 21, 2026 11:33
* The primary goal is to address issues currently encountered during concurrent execution,
such as excessive motion retries, congestion, retransmission storms, and network skew.
The code addresses inefficient network retransmission handling in
unreliable network environments. Specifically:

Fixed Timeout Thresholds: Traditional TCP-style Retransmission Timeout
(RTTVAR.RTO) calculations may be too rigid for networks with volatile
latency (e.g., satellite links, wireless networks). This leads to:
• Premature Retransmissions: Unnecessary data resends during temporary
latency spikes, wasting bandwidth.
• Delayed Recovery: Slow reaction to actual packet loss when RTO is
overly conservative.

Lack of Context Awareness: Static RTO ignores real-time network behavior
patterns, reducing throughput and responsiveness.

Solution: Dynamic Timeout Threshold Adjustment
Implements an adaptive timeout mechanism to optimize retransmission:
if (now < (curBuf->sentTime + conn->rttvar.rto)) {
    uint32_t diff = (curBuf->sentTime + conn->rttvar.rto) - now;
    // ... (statistical tracking and threshold adjustment)
}

Key Components:
• Statistical Tracking:
- min/max: Tracks observed minimum/maximum residual time (time left until RTO expiry).
- retrans_count/no_retrans_count: Counts retransmission vs. non-retransmission events.

• Weighted Threshold Calculation:
unack_queue_ring.time_difference = (uint32_t)(
    unack_queue_ring.max * weight_no_retrans +
    unack_queue_ring.min * weight_retrans
);
Weights derived from historical ratios of retransmissions
(weight_retrans) vs. successful deliveries (weight_no_retrans).

How It Solves the Problem:
• Temporary Latency Spike: Uses max (conservative) to avoid false
retransmits, reducing bandwidth waste (vs. traditional mistaken
retransmissions).
• Persistent Packet Loss: Prioritizes min (aggressive) via
weight_retrans, accelerating recovery (vs. slow fixed-RTO reaction).
• Stable Network: Balances weights for equilibrium throughput (vs.
static RTO limitations).

EstimateRTT - Dynamically estimates the Round-Trip Time (RTT) and adjusts Retransmission Timeout (RTO)

This function implements a variant of the Jacobson/Karels algorithm for RTT estimation, adapted for UDP-based
motion control connections. It updates smoothed RTT (srtt), mean deviation (mdev), and RTO values based on
newly measured RTT samples (mrtt). The RTO calculation ensures reliable data transmission over unreliable networks.

Key Components:

* srtt:   Smoothed Round-Trip Time (weighted average of historical RTT samples)
* mdev:   Mean Deviation (measure of RTT variability)
* rttvar: Adaptive RTT variation bound (used to clamp RTO updates)
* rto:    Retransmission Timeout (dynamically adjusted based on srtt + rttvar)

Algorithm Details:

1. For the first RTT sample:
   srtt    = mrtt << 3   (scaled by 8 for fixed-point arithmetic)
   mdev    = mrtt << 1   (scaled by 2)
   rttvar  = max(mdev, rto_min)
2. For subsequent samples:
   Delta   = mrtt - (srtt >> 3)  (difference between new sample and smoothed RTT)
   srtt   += Delta               (update srtt with 1/8 weight of new sample)
   Delta   = abs(Delta) - (mdev >> 2)
   mdev   += Delta               (update mdev with 1/4 weight)
3. rttvar bounds the maximum RTT variation:
   If mdev > mdev_max, update mdev_max and rttvar
   On new ACKs (snd_una > rtt_seq), decay rttvar toward mdev_max
4. Final RTO calculation:
   rto = (srtt >> 3) + rttvar   (clamped to RTO_MAX)
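
For concreteness, here is a minimal C sketch of the estimator described above. The struct layout and the RTO bounds are assumptions made for the sketch, and step 3 (tracking mdev_max and decaying rttvar on new ACKs) is reduced to a simple lower bound; only the fixed-point update rules follow the description.

#include <stdint.h>

#define SKETCH_RTO_MIN 400        /* assumed lower bound for rttvar */
#define SKETCH_RTO_MAX 20000000   /* assumed upper clamp for rto */

/* Assumed shape of the per-connection RTT state. */
typedef struct RttSketch
{
    uint32_t srtt;    /* smoothed RTT, scaled by 8 */
    uint32_t mdev;    /* mean deviation estimate */
    uint32_t rttvar;  /* adaptive bound on RTT variation */
    uint32_t rto;     /* retransmission timeout */
} RttSketch;

static void
EstimateRTTSketch(RttSketch *rtt, uint32_t mrtt)
{
    if (rtt->srtt == 0)
    {
        /* First sample: seed the estimator (fixed-point scaling). */
        rtt->srtt   = mrtt << 3;
        rtt->mdev   = mrtt << 1;
        rtt->rttvar = rtt->mdev > SKETCH_RTO_MIN ? rtt->mdev : SKETCH_RTO_MIN;
    }
    else
    {
        /* Subsequent samples: srtt absorbs 1/8 of the error, mdev 1/4. */
        int32_t delta = (int32_t) mrtt - (int32_t) (rtt->srtt >> 3);
        rtt->srtt = (uint32_t) ((int32_t) rtt->srtt + delta);
        if (delta < 0)
            delta = -delta;
        delta -= (int32_t) (rtt->mdev >> 2);
        rtt->mdev = (uint32_t) ((int32_t) rtt->mdev + delta);

        /* Step 3 is simplified here: keep rttvar at least as large as
         * the current deviation instead of the full mdev_max decay. */
        if (rtt->mdev > rtt->rttvar)
            rtt->rttvar = rtt->mdev;
    }

    /* Final RTO, clamped to the assumed maximum. */
    rtt->rto = (rtt->srtt >> 3) + rtt->rttvar;
    if (rtt->rto > SKETCH_RTO_MAX)
        rtt->rto = SKETCH_RTO_MAX;
}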

Network Latency Filtering and RTO Optimization

This logic mitigates RTO distortion caused by non-network delays in database
execution pipelines. Key challenges addressed:

* Operator processing delays (non-I/O wait) inflate observed ACK times
* Spurious latency amplification in lossy networks triggers excessive RTO_MAX waits
* Congestion collapse from synchronized retransmissions

Core Mechanisms:

1. Valid RTT Sampling Filter:
   Condition: 4 * (pkt->recv_time - pkt->send_time) > ackTime && pkt->retry_times != Gp_interconnect_min_retries_before_timeout
   Rationale:

   * Filters packets exceeding 2x expected round-trips (4x one-way)
   * Excludes artificial retries (retry_times=Gp_interconnect_min_retries_before_timeout) to avoid sampling bias
     Action: Update RTT estimation only with valid samples via EstimateRTT()

2. Randomized Backoff:
   Condition: buf->nRetry > 0
   Algorithm:
   rto += (rto >> (4 * buf->nRetry))
   Benefits:

   * Exponential decay: Shifts create geometrically decreasing increments
   * Connection-specific randomization: Prevents global synchronization
   * Dynamic scaling: Adapts to retry depth (nRetry)

3. Timer List Management (NEW_TIMER):
   Operations:
   RemoveFromRTOList(&mudp, bufConn) → Detaches from monitoring
   AddtoRTOList(&mudp, bufConn)      → Reinserts with updated rto
   Purpose: Maintains real-time ordering of expiration checks
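
As a rough illustration of mechanisms 1 and 2, the sketch below reuses RttSketch/EstimateRTTSketch from the earlier sketch; the packet fields mirror the conditions quoted above, while the struct itself and the entry point are simplified assumptions.

/* Sketch of the valid-RTT-sample filter (1) and the retry-scaled
 * backoff (2). Packet fields follow the conditions above; the struct
 * is a simplified stand-in. */
extern int Gp_interconnect_min_retries_before_timeout;  /* interconnect GUC quoted above */

typedef struct AckedPacketSketch
{
    uint64_t send_time;    /* when the packet was sent */
    uint64_t recv_time;    /* when its ACK arrived */
    int      retry_times;  /* how many times it has been retried */
    int      nRetry;       /* retry depth used for the backoff */
} AckedPacketSketch;

static void
processAckSketch(RttSketch *rtt, AckedPacketSketch *pkt, uint64_t ackTime)
{
    uint64_t sample = pkt->recv_time - pkt->send_time;

    /* Mechanism 1: the quoted condition gates which samples feed the
     * estimator, excluding operator-delay-inflated and artificial-retry
     * packets. */
    if (4 * sample > ackTime &&
        pkt->retry_times != Gp_interconnect_min_retries_before_timeout)
        EstimateRTTSketch(rtt, (uint32_t) sample);

    /* Mechanism 2: each additional retry adds a geometrically smaller
     * increment, desynchronizing retransmissions across connections. */
    if (pkt->nRetry > 0)
        rtt->rto += rtt->rto >> (4 * pkt->nRetry);

    /* Mechanism 3 (NEW_TIMER) would then remove and re-add the
     * connection in the RTO list so expiration order stays correct. */
}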

We ran multiple full-scale TPC-DS benchmarks on both a single physical machine with 48 nodes and four physical machines with 96 nodes, testing with MTU values of 1500 and 9000.
In the single-machine environment, which has no network bottleneck, there was no significant performance difference between MTU 1500 and 9000. In the 96-node environment, single-threaded execution also showed no significant difference.
However, under multi-threaded execution (4 threads), SQL statements with a high proportion of data movement showed performance variations of 5 to 10 times, especially with MTU 1500.

* Cleaning up the code

---------

Co-authored-by: zhaoxi <oracleloyal@gmail.com>
Co-authored-by: zhaoxi <zhaoxi@hashdata.cn>
…g in cloudberry may encounter specific anomalies related to Motion layer UDP communication. Below are four key scenarios and how the code modifications address them.

Four Anomaly Scenarios

1. Capacity Mismatch:
   The receiving end’s buffer becomes full, but the sender is unaware. As a result, the sender’s unacknowledged packet queue continues transmitting, leading to unnecessary retransmissions and packet drops.

2. False Deadlock Detection:
   The peer node processes heartbeat packets but fails to free up buffer capacity. This triggers a false deadlock judgment, incorrectly flagging network anomalies.

3. Unprocessed Packets Require Main Thread Wakeup:
   When the receive queue is full, incoming data packets are discarded. However, the main thread still needs to be awakened to process backlogged packets in the queue, preventing permanent stalling.

4. Execution Time Mismatch Across Nodes:
   Issues like data skew, computational performance gaps, or I/O bottlenecks cause significant differences in execution time between nodes. For example, in a hash join, if the inner table’s  is not ready, the node cannot process data from other nodes, leading to packet timeouts.
   *Example Plan*: Packets from  to  (via ) time out because the  in  remains unready, blocking packet processing.

Code Modifications and Their Impact

The code changes target the above scenarios by enhancing UDP communication feedback, adjusting deadlock checks, and ensuring proper thread wakeup. Key modifications:

1. Addressing Capacity Mismatch:
   - Added  (256) to flag when the receive buffer is full.
   - When the receive queue is full (), a response with  is sent to the sender (). This notifies the sender to pause or adjust transmission, preventing blind retransmissions.

2. Fixing False Deadlock Detection:
   - Modified  to accept  as a parameter, enabling ACK polling during deadlock checks.
   - Extended the initial timeout for deadlock suspicion from  to 600 seconds, reducing premature network error reports.
   - If no response is received after 600 seconds, the buffer capacity is incrementally increased () to alleviate false bottlenecks, with detailed logging before triggering an error.

3. Ensuring Main Thread Wakeup on Full Queue:
   - In , even when packets are dropped due to a full queue, the main thread is awakened () if the packet matches the waiting query/node/route. This ensures backlogged packets in the queue are processed.

4. Mitigating Node Execution Mismatches:
   - Added logging for retransmissions after  attempts, providing visibility into prolonged packet delays (e.g., due to unready ).
   - Reset  after successful ACK polling, preventing excessive retry counts from triggering false timeouts.
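
Because the commit text elides the actual identifiers, the following sketch uses invented placeholder names purely to illustrate the receive-side decision described in items 1 and 3; it is not the real patch.

#include <stdbool.h>

/* Hypothetical illustration of items 1 and 3. Every identifier below is
 * a placeholder invented for this sketch; the real flag and helper names
 * are not given in the commit text. */
#define SKETCH_FLAG_RECV_BUFFER_FULL 256   /* the "256" flag value mentioned in item 1 */

typedef struct RecvDecisionSketch
{
    bool queue_full;         /* receive queue has no free slot */
    bool main_thread_waits;  /* main thread waits on this query/node/route */
    int  ack_flags;          /* flags to return to the sender */
    bool wake_main_thread;   /* request a wakeup after handling */
    bool enqueue_packet;     /* packet accepted into the queue */
} RecvDecisionSketch;

static void
decideOnIncomingPacketSketch(RecvDecisionSketch *d)
{
    if (d->queue_full)
    {
        /* Item 1: tell the sender the receive buffer is full so it can
         * pause or adjust instead of blindly retransmitting. */
        d->ack_flags |= SKETCH_FLAG_RECV_BUFFER_FULL;

        /* Item 3: the packet itself is dropped, but the main thread must
         * still be awakened to drain the backlog already in the queue. */
        d->wake_main_thread = d->main_thread_waits;
        d->enqueue_packet = false;
        return;
    }
    d->enqueue_packet = true;
}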
…ibute motion

In the `GetMotionSentRecordTypmod` method, `MotionConn` is not passed through the CAST function;
the object is instead accessed directly by index, which causes an invalid read inside the array.
@my-ship-it
Contributor

It seems the PR is to fix the failure in the 2X_STABLE branch.
