diff --git a/contrib/interconnect/ic_common.c b/contrib/interconnect/ic_common.c index d629eb09698..7e266b69efb 100644 --- a/contrib/interconnect/ic_common.c +++ b/contrib/interconnect/ic_common.c @@ -541,14 +541,15 @@ GetMotionSentRecordTypmod(ChunkTransportState * transportStates, int16 motNodeID, int16 targetRoute) { - MotionConn *conn; + MotionConn *conn = NULL; ChunkTransportStateEntry *pEntry = NULL; getChunkTransportState(transportStates, motNodeID, &pEntry); - if (targetRoute == BROADCAST_SEGIDX) - conn = &pEntry->conns[0]; - else - conn = &pEntry->conns[targetRoute]; + if (targetRoute == BROADCAST_SEGIDX) { + targetRoute = 0; + } + + getMotionConn(pEntry, targetRoute, &conn); return &conn->sent_record_typmod; } diff --git a/contrib/interconnect/ic_internal.h b/contrib/interconnect/ic_internal.h index 77c08ee6e29..8557115cd33 100644 --- a/contrib/interconnect/ic_internal.h +++ b/contrib/interconnect/ic_internal.h @@ -10,6 +10,11 @@ */ #ifndef INTER_CONNECT_INTERNAL_H #define INTER_CONNECT_INTERNAL_H +#include +#include +#include +#include +#include #include "tcp/ic_tcp.h" #include "udp/ic_udpifc.h" @@ -33,6 +38,27 @@ typedef enum MotionConnState mcsEosSent } MotionConnState; +struct udp_send_vars +{ + /* send sequence variables */ + uint32_t snd_una; /* send unacknoledged */ + uint32_t snd_wnd; /* send window (unscaled) */ + + /* retransmission timeout variables */ + uint8_t nrtx; /* number of retransmission */ + uint8_t max_nrtx; /* max number of retransmission */ + uint32_t rto; /* retransmission timeout */ + uint32_t ts_rto; /* timestamp for retransmission timeout */ + + /* congestion control variables */ + uint32_t cwnd; /* congestion window */ + uint32_t ssthresh; /* slow start threshold */ + + TAILQ_ENTRY(MotionConnUDP) send_link; + TAILQ_ENTRY(MotionConnUDP) timer_link; /* timer link (rto list) */ + +}; + /* * Structure used for keeping track of a pt-to-pt connection between two * Cdb Entities (either QE or QD). 
@@ -153,6 +179,32 @@ typedef struct MotionConnUDP uint64 stat_count_resent; uint64 stat_max_resent; uint64 stat_count_dropped; + + struct { + uint32_t ts_rto; + uint32_t rto; + uint32_t srtt; + uint32_t rttvar; + uint32_t snd_una; + uint16_t nrtx; + uint16_t max_nrtx; + uint32_t mss; + uint32_t cwnd; + uint32_t ssthresh; + uint32_t fss; + uint8_t loss_count; + uint32_t mdev; + uint32_t mdev_max; + uint32_t rtt_seq; /* sequence number to update rttvar */ + uint32_t ts_all_rto; + bool karn_mode; + } rttvar; + + uint8_t on_timewait_list; + int16_t on_rto_idx; + + uint32_t snd_nxt; /* send next */ + struct udp_send_vars sndvar; } MotionConnUDP; typedef struct MotionConnTCP diff --git a/contrib/interconnect/test/ic_test_env.c b/contrib/interconnect/test/ic_test_env.c index 5333a143de5..1c9f2d0ce05 100644 --- a/contrib/interconnect/test/ic_test_env.c +++ b/contrib/interconnect/test/ic_test_env.c @@ -330,6 +330,7 @@ client_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_p Gp_interconnect_queue_depth = 800; Gp_interconnect_snd_queue_depth = 600; + Gp_interconnect_mem_size = 20; Gp_interconnect_timer_period = 1; Gp_interconnect_timer_checking_period = 2; InitializeLatchSupport(); @@ -374,6 +375,7 @@ server_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_p Gp_interconnect_queue_depth = 800; Gp_interconnect_snd_queue_depth = 600; + Gp_interconnect_mem_size = 20; Gp_interconnect_timer_period = 1; Gp_interconnect_timer_checking_period = 2; InitializeLatchSupport(); diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 63e8c9301dd..0a5d9bdfd08 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -26,13 +26,17 @@ #include "ic_udpifc.h" #include "ic_internal.h" #include "ic_common.h" - #include #include #include #include #include #include +#include +#include +#include +#include +#include #include "access/transam.h" #include "access/xact.h" @@ -116,6 
+120,57 @@ WSAPoll( #undef select #endif +#define TIMEOUT_Z +#define RTT_SHIFT_ALPHA (3) /* srtt (0.125) */ +#define LOSS_THRESH (3) /* Packet loss triggers Karn */ +#define RTO_MIN (5000) /* MIN RTO(ms) */ +#define RTO_MAX (100000) /* MAX RTO(ms) */ +#define UDP_INFINITE_SSTHRESH 0x7fffffff + +#define SEC_TO_USEC(t) ((t) * 1000000) +#define SEC_TO_MSEC(t) ((t) * 1000) +#define MSEC_TO_USEC(t) ((t) * 1000) +#define USEC_TO_SEC(t) ((t) / 1000000) +#define TIME_TICK (1000000/HZ)/* in us */ + +#define UDP_INITIAL_RTO (MSEC_TO_USEC(200)) +#define UDP_DEFAULT_MSS 1460 + +#define RTO_HASH (3000) + +#define UDP_SEQ_LT(a,b) ((int32_t)((a)-(b)) < 0) +#define UDP_SEQ_LEQ(a,b) ((int32_t)((a)-(b)) <= 0) +#define UDP_SEQ_GT(a,b) ((int32_t)((a)-(b)) > 0) +#define UDP_SEQ_GEQ(a,b) ((int32_t)((a)-(b)) >= 0) + +#ifndef MAX +#define MAX(a, b) ((a)>(b)?(a):(b)) +#endif +#ifndef MIN +#define MIN(a, b) ((a)<(b)?(a):(b)) +#endif + +#define UDP_RTO_MIN ((unsigned)(HZ/5)) + +struct rto_hashstore +{ + uint32_t rto_now_idx; /* pointing the hs_table_s index */ + uint32_t rto_now_ts; + + TAILQ_HEAD(rto_head, MotionConnUDP) rto_list[RTO_HASH + 1]; +}; + +struct mudp_manager +{ + struct rto_hashstore *rto_store; /* lists related to timeout */ + + int rto_list_cnt; + uint32_t cur_ts; +}; + +typedef struct mudp_manager* mudp_manager_t; +static struct mudp_manager mudp; + #define MAX_TRY (11) int timeoutArray[] = @@ -154,6 +209,7 @@ int #define UDPIC_FLAGS_DISORDER (32) #define UDPIC_FLAGS_DUPLICATE (64) #define UDPIC_FLAGS_CAPACITY (128) +#define UDPIC_FLAGS_FULL (256) /* * ConnHtabBin @@ -516,8 +572,10 @@ static ICGlobalControlInfo ic_control_info; */ #define UNACK_QUEUE_RING_SLOTS_NUM (2000) #define TIMER_SPAN (Gp_interconnect_timer_period * 1000ULL) /* default: 5ms */ -#define TIMER_CHECKING_PERIOD (Gp_interconnect_timer_checking_period) /* default: 20ms */ +#define TIMER_SPAN_LOSS (Gp_interconnect_timer_period * 500ULL) /* default: 5ms */ +#define TIMER_CHECKING_PERIOD 
Gp_interconnect_timer_checking_period /* default: 20ms */ #define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN) +#define UNACK_QUEUE_RING_LENGTH_LOSS (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN_LOSS) #define DEFAULT_RTT (Gp_interconnect_default_rtt * 1000) /* default: 20ms */ #define MIN_RTT (100) /* 0.1ms */ @@ -537,6 +595,7 @@ static ICGlobalControlInfo ic_control_info; #define MAX_SEQS_IN_DISORDER_ACK (4) +#define MAX_QUEUE_SIZE (64) /* * UnackQueueRing * @@ -573,12 +632,19 @@ struct UnackQueueRing /* time slots */ ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM]; +#ifdef TIMEOUT_Z + uint32_t retrans_count; + uint32_t no_retrans_count; + uint32_t time_difference; + uint32_t min; + uint32_t max; +#endif }; /* * All connections in a process share this unack queue ring instance. */ -static UnackQueueRing unack_queue_ring = {0, 0, 0}; +static UnackQueueRing unack_queue_ring = {0}; static int ICSenderSocket = -1; static int32 ICSenderPort = 0; @@ -746,8 +812,8 @@ static void checkQDConnectionAlive(void); static void *rxThreadFunc(void *arg); static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len); -static void handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now); -static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry); +static void handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now, struct icpkthdr *pkt); +static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, bool need_flush); static void handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, int16 motionId); static void handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt); static bool handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param, bool *wakeup_mainthread); @@ -766,9 +832,11 @@ static void initSndBufferPool(); static 
void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now); static void initUnackQueueRing(UnackQueueRing *uqr); +static void initUdpManager(mudp_manager_t mptr); +static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged); static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now); -static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *conn); +static void checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *conn); static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len); static void cleanupStartupCache(void); @@ -924,6 +992,349 @@ dumpTransProtoStats() #endif /* TRANSFER_PROTOCOL_STATS */ +static struct rto_hashstore* +initRTOHashstore() +{ + int i; + struct rto_hashstore* hs = palloc(sizeof(struct rto_hashstore)); + + for (i = 0; i < RTO_HASH; i++) + TAILQ_INIT(&hs->rto_list[i]); + + TAILQ_INIT(&hs->rto_list[RTO_HASH]); + + return hs; +} + +static void +initUdpManager(mudp_manager_t mudp) +{ + mudp->rto_store = initRTOHashstore(); + mudp->rto_list_cnt = 0; + mudp->cur_ts = 0; +} + +static inline void +addtoRTOList(mudp_manager_t mudp, MotionConnUDP *cur_stream) +{ + if (!mudp->rto_list_cnt) + { + mudp->rto_store->rto_now_idx = 0; + mudp->rto_store->rto_now_ts = cur_stream->sndvar.ts_rto; + } + + if (cur_stream->on_rto_idx < 0 ) + { + if (cur_stream->on_timewait_list) + return; + + int diff = (int32_t)(cur_stream->sndvar.ts_rto - mudp->rto_store->rto_now_ts); + if (diff < RTO_HASH) + { + int offset= (diff + mudp->rto_store->rto_now_idx) % RTO_HASH; + cur_stream->on_rto_idx = offset; + TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]), + cur_stream, sndvar.timer_link); + } + else + { + cur_stream->on_rto_idx = RTO_HASH; + TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[RTO_HASH]), + cur_stream, sndvar.timer_link); 
+ } + mudp->rto_list_cnt++; + } +} + +static inline void +removeFromRTOList(mudp_manager_t mudp, + MotionConnUDP *cur_stream) +{ + if (cur_stream->on_rto_idx < 0) + return; + + TAILQ_REMOVE(&mudp->rto_store->rto_list[cur_stream->on_rto_idx], + cur_stream, sndvar.timer_link); + cur_stream->on_rto_idx = -1; + + mudp->rto_list_cnt--; +} + +static inline void +updateRetransmissionTimer(mudp_manager_t mudp, + MotionConnUDP *cur_stream, + uint32_t cur_ts) +{ + cur_stream->sndvar.nrtx = 0; + + /* if in rto list, remove it */ + if (cur_stream->on_rto_idx >= 0) + removeFromRTOList(mudp, cur_stream); + + /* Reset retransmission timeout */ + if (UDP_SEQ_GT(cur_stream->snd_nxt, cur_stream->sndvar.snd_una)) + { + /* there are packets sent but not acked */ + /* update rto timestamp */ + cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto; + addtoRTOList(mudp, cur_stream); + } + + if (cur_stream->on_rto_idx == -1) + { + cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto; + addtoRTOList(mudp, cur_stream); + } +} + +static int +handleRTO(mudp_manager_t mudp, + uint32_t cur_ts, + MotionConnUDP *cur_stream, + ChunkTransportState *transportStates, + ChunkTransportStateEntry *pEntry, + MotionConn *triggerConn) +{ + /* check for expiration */ + int count = 0; + int retransmits = 0; + MotionConnUDP *currBuffConn = NULL; + uint32_t now = cur_ts; + + Assert(unack_queue_ring.currentTime != 0); + removeFromRTOList(mudp, cur_stream); + + while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) + { + /* expired, need to resend them */ + ICBuffer *curBuf = NULL; + + while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + { + curBuf->nRetry++; + putIntoUnackQueueRing( + &unack_queue_ring, + curBuf, + computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now); + +#ifdef TRANSFER_PROTOCOL_STATS + updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); +#endif + + sendOnce(transportStates, pEntry, 
curBuf, curBuf->conn); + + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + + retransmits++; + ic_statistics.retransmits++; + currBuffConn->stat_count_resent++; + currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent); + checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); + +#ifdef AMS_VERBOSE_LOGGING + write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", + curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route); + logPkt("RESEND PKT in checkExpiration", curBuf->pkt); +#endif + } + + unack_queue_ring.currentTime += TIMER_SPAN; + unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + } + return 0; +} + +static inline void +rearrangeRTOStore(mudp_manager_t mudp) +{ + MotionConnUDP *walk, *next; + struct rto_head* rto_list = &mudp->rto_store->rto_list[RTO_HASH]; + int cnt = 0; + + for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next) + { + next = TAILQ_NEXT(walk, sndvar.timer_link); + + int diff = (int32_t)(mudp->rto_store->rto_now_ts - walk->sndvar.ts_rto); + if (diff < RTO_HASH) + { + int offset = (diff + mudp->rto_store->rto_now_idx) % RTO_HASH; + TAILQ_REMOVE(&mudp->rto_store->rto_list[RTO_HASH], + walk, sndvar.timer_link); + walk->on_rto_idx = offset; + TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]), + walk, sndvar.timer_link); + } + cnt++; + } +} + +static inline void +checkRtmTimeout(mudp_manager_t mudp, + uint32_t cur_ts, + int thresh, + ChunkTransportState *transportStates, + ChunkTransportStateEntry *pEntry, + MotionConn *triggerConn) +{ + MotionConnUDP *walk, *next; + struct rto_head* rto_list; + int cnt; + + if (!mudp->rto_list_cnt) + return; + + cnt = 0; + + while (1) + { + rto_list = &mudp->rto_store->rto_list[mudp->rto_store->rto_now_idx]; + if ((int32_t)(cur_ts - mudp->rto_store->rto_now_ts) < 0) + break; + + for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next) + { + 
if (++cnt > thresh) + break; + next = TAILQ_NEXT(walk, sndvar.timer_link); + + if (walk->on_rto_idx >= 0) + { + TAILQ_REMOVE(rto_list, walk, sndvar.timer_link); + mudp->rto_list_cnt--; + walk->on_rto_idx = -1; + handleRTO(mudp, cur_ts, walk, transportStates, pEntry, triggerConn); + } + } + + if (cnt > thresh) + { + break; + } + else + { + mudp->rto_store->rto_now_idx = (mudp->rto_store->rto_now_idx + 1) % RTO_HASH; + mudp->rto_store->rto_now_ts++; + if (!(mudp->rto_store->rto_now_idx % 1000)) + rearrangeRTOStore(mudp); + } + + } +} + +/* + * estimateRTT - Dynamically estimates the Round-Trip Time (RTT) and adjusts Retransmission Timeout (RTO) + * + * This function implements a variant of the Jacobson/Karels algorithm for RTT estimation, adapted for UDP-based + * motion control connections. It updates smoothed RTT (srtt), mean deviation (mdev), and RTO values based on + * newly measured RTT samples (mrtt). The RTO calculation ensures reliable data transmission over unreliable networks. + * + * Key Components: + * - srtt: Smoothed Round-Trip Time (weighted average of historical RTT samples) + * - mdev: Mean Deviation (measure of RTT variability) + * - rttvar: Adaptive RTT variation bound (used to clamp RTO updates) + * - rto: Retransmission Timeout (dynamically adjusted based on srtt + rttvar) + * + * Algorithm Details: + * 1. For the first RTT sample: + * srtt = mrtt << 3 (scaled by 8 for fixed-point arithmetic) + * mdev = mrtt << 1 (scaled by 2) + * rttvar = max(mdev, rto_min) + * 2. For subsequent samples: + * Delta = mrtt - (srtt >> 3) (difference between new sample and smoothed RTT) + * srtt += Delta (update srtt with 1/8 weight of new sample) + * Delta = abs(Delta) - (mdev >> 2) + * mdev += Delta (update mdev with 1/4 weight) + * 3. rttvar bounds the maximum RTT variation: + * If mdev > mdev_max, update mdev_max and rttvar + * On new ACKs (snd_una > rtt_seq), decay rttvar toward mdev_max + * 4. 
Final RTO calculation: + * rto = (srtt >> 3) + rttvar (clamped to RTO_MAX) + * + * Parameters: + * @mConn: Parent motion connection context (container of MotionConnUDP) + * @mrtt: Measured Round-Trip Time (in microseconds) for the latest packet + * + * Notes: + * - Designed for non-retransmitted packets to avoid sampling bias. + * - Uses fixed-point arithmetic to avoid floating-point operations. + * - Minimum RTO (rto_min) is set to 20ms (HZ/5/10, assuming HZ=100). + * - Critical for adaptive timeout control in UDP protocols where reliability is implemented at the application layer. + * - Thread-unsafe: Must be called in a synchronized context (e.g., packet processing loop). + */ + +static inline void +estimateRTT(MotionConn *mConn , uint32_t mrtt) +{ + /* This function should be called for not retransmitted packets */ + /* TODO: determine rto_min */ + MotionConnUDP *conn = NULL; + + conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); + long m = mrtt; + uint32_t rto_min = UDP_RTO_MIN / 10; + + if (m == 0) + m = 1; + + /* + * Special RTO optimization for high-speed networks: + * When measured RTT (m) is below 100 microseconds and current RTO is under 10ms, + * forcibly set RTO to half of RTO_MIN. 
This targets two scenarios: + * - Loopback interfaces (localhost communication) + * - Ultra-low-latency networks (e.g., InfiniBand, RDMA) + */ + if(m < 100 && conn->rttvar.rto < 10000) + { + conn->rttvar.rto = RTO_MIN / 2; + } + + if (conn->rttvar.srtt != 0) + { + /* rtt = 7/8 rtt + 1/8 new */ + m -= (conn->rttvar.srtt >> LOSS_THRESH); + conn->rttvar.srtt += m; + if (m < 0) + { + m = -m; + m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA); + if (m > 0) + m >>= LOSS_THRESH; + } + else + { + m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA); + } + conn->rttvar.mdev += m; + if (conn->rttvar.mdev > conn->rttvar.mdev_max) + { + conn->rttvar.mdev_max = conn->rttvar.mdev; + if (conn->rttvar.mdev_max > conn->rttvar.rttvar) + { + conn->rttvar.rttvar = conn->rttvar.mdev_max; + } + } + if (UDP_SEQ_GT(conn->rttvar.snd_una, conn->rttvar.rtt_seq)) + { + if (conn->rttvar.mdev_max < conn->rttvar.rttvar) + { + conn->rttvar.rttvar -= (conn->rttvar.rttvar - conn->rttvar.mdev_max) >> RTT_SHIFT_ALPHA; + } + conn->rttvar.mdev_max = rto_min; + } + } + else + { + /* fresh measurement */ + conn->rttvar.srtt = m << LOSS_THRESH; + conn->rttvar.mdev = m << 1; + conn->rttvar.mdev_max = conn->rttvar.rttvar = MAX(conn->rttvar.mdev, rto_min); + } + + conn->rttvar.rto = ((conn->rttvar.srtt >> LOSS_THRESH) + conn->rttvar.rttvar) > RTO_MAX ? RTO_MAX : ((conn->rttvar.srtt >> LOSS_THRESH) + conn->rttvar.rttvar); +} + + /* * initCursorICHistoryTable * Initialize cursor ic history table. @@ -2522,6 +2933,14 @@ initUnackQueueRing(UnackQueueRing *uqr) { icBufferListInit(&uqr->slots[i], ICBufferListType_Secondary); } + +#ifdef TIMEOUT_Z + uqr->retrans_count = 0; + uqr->no_retrans_count = 0; + uqr->time_difference = 0; + uqr->min = 0; + uqr->max = 0; +#endif } /* @@ -2556,6 +2975,9 @@ computeExpirationPeriod(MotionConn *mConn, uint32 retry) else #endif { + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + return Min(retry > 3 ? 
conn->rttvar.rto * retry : conn->rttvar.rto, UNACK_QUEUE_RING_LENGTH_LOSS); + uint32 factor = (retry <= 12 ? retry : 12); return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, (conn->rtt + (conn->dev << 2)) << (factor))); @@ -2968,6 +3390,19 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS conn->mConn.msgSize = sizeof(conn->conn_info); conn->mConn.stillActive = true; conn->conn_info.seq = 1; + conn->rttvar.ts_rto = 0; + conn->rttvar.rto = UDP_INITIAL_RTO; + conn->rttvar.srtt = 0; + conn->rttvar.rttvar = 0; + conn->rttvar.snd_una = 0; + conn->rttvar.nrtx = 0; + conn->rttvar.max_nrtx = 0; + conn->rttvar.mss = UDP_DEFAULT_MSS; + conn->rttvar.cwnd = 2; + conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH; + conn->rttvar.loss_count = 0; + conn->rttvar.karn_mode = false; + conn->on_rto_idx = -1; Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6); } /* setupOutgoingUDPConnection */ @@ -3207,6 +3642,19 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) conn->conn_info.icId = sliceTable->ic_instance_id; conn->conn_info.flags = UDPIC_FLAGS_RECEIVER_TO_SENDER; + conn->rttvar.ts_rto = 0; + conn->rttvar.rto = UDP_INITIAL_RTO; + conn->rttvar.srtt = 0; + conn->rttvar.rttvar = 0; + conn->rttvar.snd_una = 0; + conn->rttvar.nrtx = 0; + conn->rttvar.max_nrtx = 0; + conn->rttvar.mss = UDP_DEFAULT_MSS; + conn->rttvar.cwnd = 2; + conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH; + conn->rttvar.loss_count = 0; + conn->rttvar.karn_mode = false; + conn->on_rto_idx = -1; connAddHash(&ic_control_info.connHtab, &conn->mConn); } } @@ -3221,6 +3669,8 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) { initSndBufferPool(&snd_buffer_pool); initUnackQueueRing(&unack_queue_ring); + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER) + initUdpManager(&mudp); ic_control_info.isSender = true; ic_control_info.lastExpirationCheckTime = getCurrentTime(); ic_control_info.lastPacketSendTime = 
ic_control_info.lastExpirationCheckTime; @@ -3284,6 +3734,9 @@ static inline void SetupUDPIFCInterconnect(EState *estate) { ChunkTransportState *icContext = NULL; + int32 sliceNum = 0; + int32 calcQueueDepth = 0; + int32 calcSndDepth = 0; PG_TRY(); { /* @@ -3291,6 +3744,39 @@ SetupUDPIFCInterconnect(EState *estate) * technically it is not part of current query, discard it directly. */ resetRxThreadError(); + if (estate != NULL && estate->es_sliceTable != NULL) + sliceNum = estate->es_sliceTable->numSlices; + else + sliceNum = 1; + + if (Gp_interconnect_mem_size > 0 && + Gp_interconnect_queue_depth == 4 && + Gp_interconnect_snd_queue_depth == 2) + { + int32 perQueue = Gp_interconnect_mem_size / + (Gp_max_packet_size * sliceNum); + + calcSndDepth = Max(Gp_interconnect_snd_queue_depth, perQueue / 2); + calcQueueDepth = Max(Gp_interconnect_queue_depth, perQueue - calcSndDepth); + + if (calcSndDepth > MAX_QUEUE_SIZE) + calcSndDepth = MAX_QUEUE_SIZE; + + if (calcQueueDepth > MAX_QUEUE_SIZE) + calcQueueDepth = MAX_QUEUE_SIZE; + + Gp_interconnect_snd_queue_depth = calcSndDepth; + Gp_interconnect_queue_depth = calcQueueDepth; + + elog(DEBUG1, "SetupUDPIFCInterconnect: queue depth, " + "queue_depth=%d, snd_queue_depth=%d, " + "mem_size=%d, slices=%d, packet_size=%d", + Gp_interconnect_queue_depth, + Gp_interconnect_snd_queue_depth, + Gp_interconnect_mem_size, + sliceNum, + Gp_max_packet_size); + } icContext = SetupUDPIFCInterconnect_Internal(estate->es_sliceTable); @@ -3815,7 +4301,6 @@ static TupleChunkListItem receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry, int16 motNodeID, int16 *srcRoute, MotionConn *mConn) { - bool directed = false; int nFds = 0; int *waitFds = NULL; int nevent = 0; @@ -3832,7 +4317,6 @@ receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEn if (mConn != NULL) { conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); - directed = true; *srcRoute = conn->route; 
setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, conn->route, pTransportStates->sliceTable->ic_instance_id); @@ -4472,7 +4956,7 @@ logPkt(char *prefix, icpkthdr *pkt) * packet is retransmitted. */ static void -handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now) +handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now, struct icpkthdr *pkt) { uint64 ackTime = 0; bool bufIsHead = false; @@ -4485,6 +4969,39 @@ handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now) buf = icBufferListDelete(&ackConn->unackQueue, buf); + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER) + { + bufConn = CONTAINER_OF(buf->conn, MotionConnUDP, mConn); + buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf); + unack_queue_ring.numOutStanding--; + if (icBufferListLength(&ackConn->unackQueue) >= 1) + unack_queue_ring.numSharedOutStanding--; + + ackTime = now - buf->sentTime; + + if (buf->nRetry == 0) + { + /* adjust the congestion control window. 
*/ + if (snd_control_info.cwnd < snd_control_info.ssthresh) + snd_control_info.cwnd += 2; + else + snd_control_info.cwnd += 1 / snd_control_info.cwnd; + snd_control_info.cwnd = Min(snd_control_info.cwnd, snd_buffer_pool.maxCount); + } + + if ((bufConn->rttvar.rto << 1) > ackTime && pkt->retry_times != Gp_interconnect_min_retries_before_timeout) + estimateRTT(buf->conn, (now - pkt->send_time)); + + if (buf->nRetry && pkt->retry_times > 0 && pkt->retry_times < Gp_interconnect_min_retries_before_timeout) + bufConn->rttvar.rto += (bufConn->rttvar.rto >> 4 * buf->nRetry); + + if (unlikely(Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER)) + { + bufConn->sndvar.ts_rto = bufConn->rttvar.rto; + addtoRTOList(&mudp, bufConn); + } + } + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) { buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf); @@ -4564,7 +5081,7 @@ handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now) * if we receive a stop message, return true (caller will clean up). 
*/ static bool -handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry) +handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, bool need_flush) { ChunkTransportStateEntryUDP * pEntry = NULL; bool ret = false; @@ -4577,7 +5094,6 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun struct icpkthdr *pkt = snd_control_info.ackBuffer; - bool shouldSendBuffers = false; SliceTable *sliceTbl = transportStates->sliceTable; @@ -4702,6 +5218,12 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, &pEntry->entry, &ackConn->mConn, pkt)); break; } + else if (pkt->flags & UDPIC_FLAGS_FULL) + { + if (DEBUG1 >= log_min_messages) + write_log("Recv buff is full [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq); + break; + } /* * don't get out of the loop if pkt->seq equals to @@ -4751,7 +5273,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun while (!icBufferListIsHead(&ackConn->unackQueue, link) && buf->pkt->seq <= pkt->seq) { next = link->next; - handleAckedPacket(&ackConn->mConn, buf, now); + handleAckedPacket(&ackConn->mConn, buf, now, pkt); shouldSendBuffers = true; link = next; buf = GET_ICBUFFER_FROM_PRIMARY(link); @@ -4767,7 +5289,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun * still send here, since in STOP/EOS race case, we may have been * in EOS sending logic and will not check stop message. 
*/ - if (shouldSendBuffers) + if (shouldSendBuffers && need_flush) sendBuffers(transportStates, &pEntry->entry, &ackConn->mConn); } else if (DEBUG1 >= log_min_messages) @@ -5011,7 +5533,7 @@ handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *p { if (pollAcks(transportStates, pEntry->txfd, 0)) { - if (handleAcks(transportStates, &pEntry->entry)) + if (handleAcks(transportStates, &pEntry->entry, true)) { /* more stops found, loop again. */ i = 0; @@ -5053,7 +5575,7 @@ sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEnt { ICBuffer *buf = NULL; - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS && + if ((Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) && (icBufferListLength(&conn->unackQueue) > 0 && unack_queue_ring.numSharedOutStanding >= (snd_control_info.cwnd - snd_control_info.minCwnd))) break; @@ -5074,7 +5596,7 @@ sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEnt icBufferListAppend(&conn->unackQueue, buf); - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { unack_queue_ring.numOutStanding++; if (icBufferListLength(&conn->unackQueue) > 1) @@ -5098,6 +5620,10 @@ sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEnt updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt); #endif + struct icpkthdr *pkt_ = buf->pkt; + pkt_->send_time = now; + pkt_->recv_time = 0; + pkt_->retry_times = buf->nRetry; sendOnce(transportStates, pEntry, buf, &conn->mConn); ic_statistics.sndPktNum++; @@ -5245,7 +5771,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, if (buf->pkt->seq == pkt->seq) { - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; break; } @@ -5255,7 
+5781,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, /* this is a lost packet, retransmit */ buf->nRetry++; - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf); putIntoUnackQueueRing(&unack_queue_ring, buf, @@ -5284,7 +5810,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, /* remove packet already received. */ next = link->next; - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; link = next; buf = GET_ICBUFFER_FROM_PRIMARY(link); @@ -5301,7 +5827,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, lostPktCnt--; } } - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, snd_control_info.minCwnd); snd_control_info.cwnd = snd_control_info.ssthresh; @@ -5354,7 +5880,7 @@ handleAckForDuplicatePkt(MotionConn *mConn, icpkthdr *pkt) while (!icBufferListIsHead(&conn->unackQueue, link) && (buf->pkt->seq <= pkt->extraSeq)) { next = link->next; - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; link = next; buf = GET_ICBUFFER_FROM_PRIMARY(link); @@ -5366,7 +5892,7 @@ handleAckForDuplicatePkt(MotionConn *mConn, icpkthdr *pkt) next = link->next; if (buf->pkt->seq == pkt->seq) { - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; break; } @@ -5448,55 +5974,230 @@ checkExpiration(ChunkTransportState *transportStates, uint64 now) { /* check for expiration */ - int count = 0; - int retransmits = 0; + int 
count = 0; + int retransmits = 0; MotionConnUDP *currBuffConn = NULL; Assert(unack_queue_ring.currentTime != 0); - while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) + + if (unlikely(Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER)) { - /* expired, need to resend them */ - ICBuffer *curBuf = NULL; + checkRtmTimeout(&mudp, now, 500, transportStates, pEntry, triggerConn); + return; + } - while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + { + uint64 timer_span_time = unack_queue_ring.currentTime + TIMER_SPAN_LOSS; + + while (now >= (timer_span_time + unack_queue_ring.time_difference) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) { - curBuf->nRetry++; - putIntoUnackQueueRing( - &unack_queue_ring, - curBuf, - computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now); + /* expired, need to resend them */ + ICBuffer *curBuf = NULL; + + while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + { + MotionConnUDP *conn = NULL; + conn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + curBuf->nRetry++; + + /* + * Fixed Timeout Thresholds: Traditional TCP-style Retransmission Timeout + * (RTTVAR.RTO) calculations may be too rigid for networks with volatile + * latency. This leads to: + * Premature Retransmissions: Unnecessary data resends during temporary + * latency spikes, wasting bandwidth. + * Delayed Recovery: Slow reaction to actual packet loss when RTO is + * overly conservative. + * + * Lack of Context Awareness: Static RTO ignores real-time network behavior + * patterns, reducing throughput and responsiveness. 
+ * + * Solution: Dynamic Timeout Threshold Adjustment + * Implements an adaptive timeout mechanism to optimize retransmission: + * if (now < (curBuf->sentTime + conn->rttvar.rto)) { + * uint32_t diff = (curBuf->sentTime + conn->rttvar.rto) - now; + * // ... (statistical tracking and threshold adjustment) + * } + * Temporary Latency Spike: Uses max (conservative) to avoid false + * retransmits, reducing bandwidth waste (vs. traditional mistaken + * retransmissions). + * Persistent Packet Loss: Prioritizes min (aggressive) via + * weight_retrans, accelerating recovery (vs. slow fixed-RTO reaction). + * Stable Network: Balances weights for equilibrium throughput (vs. + * static RTO limitations). + */ + if (now < (curBuf->sentTime + conn->rttvar.rto)) + { +#ifdef TIMEOUT_Z + uint32_t diff = (curBuf->sentTime + conn->rttvar.rto) - now; + if(unack_queue_ring.retrans_count == 0 && unack_queue_ring.no_retrans_count == 0) + { + unack_queue_ring.min = diff; + unack_queue_ring.max = diff; + } + + if (diff < unack_queue_ring.min) unack_queue_ring.min = diff; + if (diff > unack_queue_ring.max) unack_queue_ring.max = diff; + + if (unack_queue_ring.retrans_count == 0) + unack_queue_ring.time_difference = unack_queue_ring.max; + else if (unack_queue_ring.no_retrans_count == 0 && ic_statistics.retransmits < (Gp_interconnect_min_retries_before_timeout / 4)) + unack_queue_ring.time_difference = 0; + else + { + uint32_t total_count = unack_queue_ring.retrans_count + unack_queue_ring.no_retrans_count; + double weight_retrans = (double)unack_queue_ring.retrans_count / total_count; + double weight_no_retrans = (double)unack_queue_ring.no_retrans_count / total_count; + unack_queue_ring.time_difference = (uint32_t)(unack_queue_ring.max * weight_no_retrans + unack_queue_ring.min * weight_retrans); + } + + ++unack_queue_ring.no_retrans_count; + } + else + ++unack_queue_ring.retrans_count; +#endif #ifdef TRANSFER_PROTOCOL_STATS - updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); + 
updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); #endif + ChunkTransportStateEntryUDP *pEntryUdp; + pEntryUdp = CONTAINER_OF(pEntry, ChunkTransportStateEntryUDP, entry); + putIntoUnackQueueRing(&unack_queue_ring, + curBuf, + computeExpirationPeriod(curBuf->conn, curBuf->nRetry), getCurrentTime()); + struct icpkthdr *pkt_ = curBuf->pkt; - sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + pkt_->send_time = getCurrentTime(); + pkt_->recv_time = 0; + pkt_->retry_times = curBuf->nRetry; - currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); - retransmits++; - ic_statistics.retransmits++; - currBuffConn->stat_count_resent++; - currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, - currBuffConn->stat_count_resent); + /* + * Adaptive Retry Backoff with Polling for Network Asymmetry Mitigation + * + * This logic addresses two critical network pathologies: + * 1. RTO Distortion Amplification: + * - Packet loss in volatile networks causes RTO-based retransmission errors + * - Multiple spurious retries increase network load and congestion collapse risk + * 2. Data Skew-Induced Starvation: + * - Under unbalanced workloads, low-traffic nodes experience MON (Message Order Number) delays + * - Delayed ACKs trigger false retransmissions even when packets arrive eventually + * - Unacked queue inflation worsens congestion in high-traffic nodes + */ + int32_t loop_ack = curBuf->nRetry; + uint32_t rto_min = UDP_RTO_MIN / 10; + uint32_t rtoMs = conn->rttvar.rto / 1000; + int32_t wait_time = rto_min > rtoMs ? rto_min : rtoMs; + int32_t loop = 0; - checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); + /* + * To optimize performance, we need to process all the time-out file descriptors (fds) + * in each batch together. 
+ */ + if (loop_ack > 0) + { + while (loop++ < loop_ack) + { + if (pollAcks(transportStates, pEntryUdp->txfd, wait_time)) + { + handleAcks(transportStates, pEntry, false); + curBuf->nRetry = 0; + break; + } + + struct icpkthdr *pkt_ = curBuf->pkt; + pkt_->send_time = getCurrentTime(); + pkt_->recv_time = 0; + pkt_->retry_times = curBuf->nRetry; + + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + + if (loop_ack < (Gp_interconnect_min_retries_before_timeout / 10)) + wait_time += wait_time / 10; + else if (loop_ack > (Gp_interconnect_min_retries_before_timeout / 10) && loop_ack < (Gp_interconnect_min_retries_before_timeout / 5)) + wait_time += RTO_MAX / 10; + else if (loop_ack > (Gp_interconnect_min_retries_before_timeout / 5) && loop_ack < (Gp_interconnect_min_retries_before_timeout / 2)) + wait_time += RTO_MAX / 5; + else if (loop_ack < (Gp_interconnect_min_retries_before_timeout)) + wait_time += RTO_MAX; + }; + } + + if (loop_ack > Gp_interconnect_min_retries_before_timeout / 5) + write_log("Resending packet (seq %d) to %s (pid %d cid %d) with %d retries in %lu seconds", + curBuf->pkt->seq, curBuf->conn->remoteHostAndPort, + curBuf->pkt->dstPid, curBuf->pkt->dstContentId, curBuf->nRetry, + (now - curBuf->sentTime) / 1000 / 1000); + + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + + retransmits++; + ic_statistics.retransmits++; + currBuffConn->stat_count_resent++; + currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, + currBuffConn->stat_count_resent); + + checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); #ifdef AMS_VERBOSE_LOGGING - write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", - curBuf->pkt->seq, curBuf->nRetry, currBuffConn->rtt, currBuffConn->route); - logPkt("RESEND PKT in checkExpiration", curBuf->pkt); + write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", + curBuf->pkt->seq, curBuf->nRetry, currBuffConn->rtt, 
currBuffConn->route); + logPkt("RESEND PKT in checkExpiration", curBuf->pkt); #endif + } + + timer_span_time += TIMER_SPAN_LOSS; + unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + } + } + else + { + while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) + { + /* expired, need to resend them */ + ICBuffer *curBuf = NULL; - unack_queue_ring.currentTime += TIMER_SPAN; - unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + { + curBuf->nRetry++; + putIntoUnackQueueRing( + &unack_queue_ring, + curBuf, + computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now); + +#ifdef TRANSFER_PROTOCOL_STATS + updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); +#endif + + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + + retransmits++; + ic_statistics.retransmits++; + currBuffConn->stat_count_resent++; + currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent); + checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); + +#ifdef AMS_VERBOSE_LOGGING + write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", + curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route); + logPkt("RESEND PKT in checkExpiration", curBuf->pkt); +#endif + } + + unack_queue_ring.currentTime += TIMER_SPAN; + unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + } + + /* + * Deal with the case where this function has not been called for a long time. + */ + unack_queue_ring.currentTime = now - (now % (TIMER_SPAN)); + } - /* - * deal with case when there is a long time this function is not called. 
- */ - unack_queue_ring.currentTime = now - (now % TIMER_SPAN); if (retransmits > 0) { snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, snd_control_info.minCwnd); @@ -5524,7 +6225,7 @@ checkExpiration(ChunkTransportState *transportStates, * */ static void -checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn) +checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn) { uint64 deadlockCheckTime; ChunkTransportStateEntryUDP *pEntry = NULL; @@ -5561,17 +6262,31 @@ checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn) ic_control_info.lastDeadlockCheckTime = now; ic_statistics.statusQueryMsgNum++; + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE && pollAcks(transportStates, pEntry->txfd, 50)) + { + handleAcks(transportStates, pChunkEntry, false); + conn->deadlockCheckBeginTime = now; + } + /* check network error. */ - if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000)) + if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 100 * 1000)) { - ereport(ERROR, - (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect encountered a network error, please check your network"), - errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.", - conn->mConn.remoteHostAndPort, - conn->conn_info.dstPid, - conn->conn_info.dstContentId, - Gp_interconnect_transmit_timeout))); + write_log("Did not get any response from %s (pid %d cid %d) in 600 seconds.",conn->mConn.remoteHostAndPort, + conn->conn_info.dstPid, + conn->conn_info.dstContentId); + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER) + conn->capacity += 1; + + if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000)) + ereport(ERROR, + (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), + errmsg("interconnect encountered a network error, please 
check your network"), + errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.", + conn->mConn.remoteHostAndPort, + conn->conn_info.dstPid, + conn->conn_info.dstContentId, + Gp_interconnect_transmit_timeout))); } } } @@ -5690,7 +6405,7 @@ checkExceptions(ChunkTransportState *transportStates, checkExpirationCapacityFC(transportStates, pEntry, conn, timeout); } - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { uint64 now = getCurrentTime(); @@ -5703,7 +6418,7 @@ checkExceptions(ChunkTransportState *transportStates, if ((retry & 0x3) == 2) { - checkDeadlock(pEntry, conn); + checkDeadlock(transportStates, pEntry, conn); checkRxThreadError(); ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive); } @@ -5735,14 +6450,24 @@ static inline int computeTimeout(MotionConn *mConn, int retry) { MotionConnUDP *conn = NULL; + uint32_t rtoMs = 0; conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); + rtoMs = conn->rttvar.rto / 1000; if (icBufferListLength(&conn->unackQueue) == 0) return TIMER_CHECKING_PERIOD; ICBufferLink *bufLink = icBufferListFirst(&conn->unackQueue); ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink); + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + { + if (buf->nRetry == 0 && retry == 0 && unack_queue_ring.numSharedOutStanding < (snd_control_info.cwnd - snd_control_info.minCwnd)) + return 0; + + return rtoMs > TIMER_CHECKING_PERIOD ? 
rtoMs: TIMER_CHECKING_PERIOD; + } + if (buf->nRetry == 0 && retry == 0) return 0; @@ -5830,7 +6555,7 @@ SendChunkUDPIFC(ChunkTransportState *transportStates, if (pollAcks(transportStates, pEntry->txfd, timeout)) { - if (handleAcks(transportStates, &pEntry->entry)) + if (handleAcks(transportStates, &pEntry->entry, true)) { /* * We make sure that we deal with the stop messages only after @@ -5843,6 +6568,9 @@ SendChunkUDPIFC(ChunkTransportState *transportStates, } checkExceptions(transportStates, &pEntry->entry, &conn->mConn, retry++, timeout); doCheckExpiration = false; + + if (!doCheckExpiration && icBufferListLength(&conn->unackQueue) == 0 && conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0) + sendBuffers(transportStates, &pEntry->entry, &conn->mConn); } conn->mConn.pBuff = (uint8 *) conn->curBuff->pkt; @@ -5987,12 +6715,15 @@ SendEOSUDPIFC(ChunkTransportState *transportStates, timeout = computeTimeout(&conn->mConn, retry); if (pollAcks(transportStates, pEntry->txfd, timeout)) - handleAcks(transportStates, &pEntry->entry); - + handleAcks(transportStates, &pEntry->entry, true); checkExceptions(transportStates, &pEntry->entry, &conn->mConn, retry++, timeout); if (retry >= MAX_TRY) + { + if (icBufferListLength(&conn->unackQueue) == 0) + sendBuffers(transportStates, &pEntry->entry, &conn->mConn); break; + } } if ((!conn->mConn.cdbProc) || (icBufferListLength(&conn->unackQueue) == 0 && @@ -6217,24 +6948,60 @@ getCurrentTime(void) static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now) { + MotionConnUDP *buffConn = NULL; + buffConn = CONTAINER_OF(buf->conn, MotionConnUDP, mConn); uint64 diff = 0; int idx = 0; - - /* The first packet, currentTime is not initialized */ - if (uqr->currentTime == 0) - uqr->currentTime = now - (now % TIMER_SPAN); - - diff = now + expTime - uqr->currentTime; - if (diff >= UNACK_QUEUE_RING_LENGTH) + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { + /* The first 
packet, currentTime is not initialized */ +#ifndef TIMEOUT_Z + if (uqr->currentTime == 0) + uqr->currentTime = now - (now % TIMER_SPAN_LOSS); +#else + if (uqr->currentTime == 0 && buffConn->rttvar.rto == 0) + uqr->currentTime = now - (now % TIMER_SPAN_LOSS); + else + uqr->currentTime = now + buffConn->rttvar.rto; + +#endif + diff = expTime; + if (diff >= UNACK_QUEUE_RING_LENGTH_LOSS) + { #ifdef AMS_VERBOSE_LOGGING - write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime); + write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime); #endif - diff = UNACK_QUEUE_RING_LENGTH - 1; + diff = UNACK_QUEUE_RING_LENGTH_LOSS - 1; + } + else if (diff < TIMER_SPAN_LOSS) + { + diff = diff < TIMER_SPAN_LOSS ? TIMER_SPAN_LOSS : diff; + } } - else if (diff < TIMER_SPAN) + else { - diff = TIMER_SPAN; + if (uqr->currentTime == 0) + uqr->currentTime = now - (now % TIMER_SPAN_LOSS); + + diff = now + expTime - uqr->currentTime; + if (diff >= UNACK_QUEUE_RING_LENGTH) + { +#ifdef AMS_VERBOSE_LOGGING + write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime); +#endif + diff = UNACK_QUEUE_RING_LENGTH - 1; + } + else if (diff < TIMER_SPAN) + { + diff = TIMER_SPAN; + } + + idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM; + +#ifdef AMS_VERBOSE_LOGGING + write_log("PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT " (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d, nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot, uqr->idx, idx); +#endif } idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM; @@ -6397,6 +7164,30 @@ handleDataPacket(MotionConn *mConn, icpkthdr *pkt, 
struct sockaddr_storage *peer logPkt("Interconnect error: received a packet when the queue is full ", pkt); ic_statistics.disorderedPktNum++; conn->stat_count_dropped++; + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER && rx_control_info.mainWaitingState.waiting && + rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId && + rx_control_info.mainWaitingState.waitingQuery == pkt->icId) + { + if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE) + { + if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE) + rx_control_info.mainWaitingState.reachRoute = conn->route; + } + else if (rx_control_info.mainWaitingState.waitingRoute == conn->route) + { + if (DEBUG2 >= log_min_messages) + write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute); + rx_control_info.mainWaitingState.reachRoute = conn->route; + } + /* WAKE MAIN THREAD HERE */ + *wakeup_mainthread = true; + } + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + { + setAckSendParam(param, &conn->mConn, UDPIC_FLAGS_FULL, conn->conn_info.seq - 1, conn->conn_info.extraSeq); + } return false; } @@ -6681,9 +7472,25 @@ rxThreadFunc(void *arg) if (conn != NULL) { + uint64 now = getCurrentTime(); + uint64 send_time = pkt->send_time; + uint64 recv_time = now; + uint64 retry_times = pkt->retry_times; + MotionConnUDP *connUdp = NULL; + + connUdp = CONTAINER_OF(conn, MotionConnUDP, mConn); + bool drop_ack = pkt->seq < connUdp->conn_info.seq ? 
true : false; /* Handling a regular packet */ if (handleDataPacket(conn, pkt, &peer, &peerlen, ¶m, &wakeup_mainthread)) pkt = NULL; + if (!pkt) + { + param.msg.send_time = send_time; + param.msg.recv_time = recv_time; + param.msg.retry_times = retry_times; + } + if (drop_ack) + param.msg.retry_times = Gp_interconnect_min_retries_before_timeout; ic_statistics.recvPktNum++; } else diff --git a/contrib/interconnect/udp/ic_udpifc.h b/contrib/interconnect/udp/ic_udpifc.h index 76403abb3f3..af3ca72ba3b 100644 --- a/contrib/interconnect/udp/ic_udpifc.h +++ b/contrib/interconnect/udp/ic_udpifc.h @@ -90,6 +90,9 @@ typedef struct icpkthdr */ uint32 seq; uint32 extraSeq; + uint64_t send_time; + uint64_t recv_time; + uint8_t retry_times; } icpkthdr; typedef struct ICBuffer ICBuffer; diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c index 9db3389e0bf..1606adc0dfd 100644 --- a/src/backend/cdb/cdbvars.c +++ b/src/backend/cdb/cdbvars.c @@ -198,6 +198,7 @@ int Gp_interconnect_queue_depth = 4; /* max number of messages * waiting in rx-queue before * we drop. 
*/ int Gp_interconnect_snd_queue_depth = 2; +int Gp_interconnect_mem_size = 10; int Gp_interconnect_timer_period = 5; int Gp_interconnect_timer_checking_period = 20; int Gp_interconnect_default_rtt = 20; diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index e342d762705..3c60c588a3a 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -552,6 +552,8 @@ static const struct config_enum_entry gp_autostats_modes[] = { static const struct config_enum_entry gp_interconnect_fc_methods[] = { {"loss", INTERCONNECT_FC_METHOD_LOSS}, {"capacity", INTERCONNECT_FC_METHOD_CAPACITY}, + {"loss_advance", INTERCONNECT_FC_METHOD_LOSS_ADVANCE}, + {"loss_timer", INTERCONNECT_FC_METHOD_LOSS_TIMER}, {NULL, 0} }; @@ -3708,6 +3710,16 @@ struct config_int ConfigureNamesInt_gp[] = NULL, NULL, NULL }, + { + {"gp_interconnect_mem_size", PGC_USERSET, GP_ARRAY_TUNING, + gettext_noop("Sets the maximum size(in MB) of the send/recv queue memory for all connections in the UDP interconnect"), + NULL + }, + &Gp_interconnect_mem_size, + 10, 1, 1024, + NULL, NULL, NULL + }, + { {"gp_interconnect_timer_period", PGC_USERSET, GP_ARRAY_TUNING, gettext_noop("Sets the timer period (in ms) for UDP interconnect"), diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h index 90af5177ce0..745f9cd3013 100644 --- a/src/include/cdb/cdbvars.h +++ b/src/include/cdb/cdbvars.h @@ -334,6 +334,8 @@ typedef enum GpVars_Interconnect_Method { INTERCONNECT_FC_METHOD_CAPACITY = 0, INTERCONNECT_FC_METHOD_LOSS = 2, + INTERCONNECT_FC_METHOD_LOSS_ADVANCE = 3, + INTERCONNECT_FC_METHOD_LOSS_TIMER = 4, } GpVars_Interconnect_Method; extern int Gp_interconnect_fc_method; @@ -367,6 +369,7 @@ extern int Gp_interconnect_min_rto; extern int Gp_interconnect_transmit_timeout; extern int Gp_interconnect_min_retries_before_timeout; extern int Gp_interconnect_debug_retry_interval; +extern int Gp_interconnect_mem_size; /* UDP recv buf size in KB. 
For testing */ extern int Gp_udp_bufsize_k; diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index 286762e6d6f..532bf2b8638 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -82,6 +82,7 @@ "gp_interconnect_queue_depth", "gp_interconnect_setup_timeout", "gp_interconnect_snd_queue_depth", + "gp_interconnect_mem_size", "gp_interconnect_tcp_listener_backlog", "gp_interconnect_timer_checking_period", "gp_interconnect_timer_period", diff --git a/src/test/regress/expected/icudp/gp_interconnect_fc_method.out b/src/test/regress/expected/icudp/gp_interconnect_fc_method.out index b115c95a393..4dcda3b8124 100644 --- a/src/test/regress/expected/icudp/gp_interconnect_fc_method.out +++ b/src/test/regress/expected/icudp/gp_interconnect_fc_method.out @@ -96,3 +96,49 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len 29 | 100 | 2600 (30 rows) +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; + gp_interconnect_fc_method +--------------------------- + loss_advance +(1 row) + +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + diff --git 
a/src/test/regress/expected/icudp/queue_depth_combination_capacity.out b/src/test/regress/expected/icudp/queue_depth_combination_capacity.out index ec8ea9594d6..64c177e36d5 100644 --- a/src/test/regress/expected/icudp/queue_depth_combination_capacity.out +++ b/src/test/regress/expected/icudp/queue_depth_combination_capacity.out @@ -266,3 +266,260 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len 29 | 100 | 2600 (30 rows) +-- Skew with gather+redistribute +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; + gp_interconnect_fc_method +--------------------------- + loss_advance +(1 row) + +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 
| 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 
| 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 
| 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + diff --git a/src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out b/src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out new file mode 100644 index 00000000000..c509dd84ef7 --- /dev/null +++ b/src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out @@ -0,0 +1,260 @@ +-- +-- @description Interconncet flow control test case: combination guc value +-- @created 2025-09-12 +-- Set mode +SET gp_interconnect_fc_method = "loss_advance"; +-- Create a table +CREATE TEMP TABLE small_table(dkey INT, jkey INT, rval REAL, tval TEXT default 'abcdefghijklmnopqrstuvwxyz') DISTRIBUTED BY (dkey); +-- Generate some data +INSERT INTO small_table VALUES(generate_series(1, 5000), generate_series(5001, 10000), sqrt(generate_series(5001, 10000))); +-- Functional tests +-- Skew with gather+redistribute +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 
100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 
100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 
rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + diff --git a/src/test/regress/greenplum_schedule b/src/test/regress/greenplum_schedule index 09331d8d4a8..83be12085b5 100755 --- a/src/test/regress/greenplum_schedule +++ b/src/test/regress/greenplum_schedule @@ -73,7 +73,7 @@ test: gp_dump_query_oids analyze gp_owner_permission incremental_analyze truncat test: indexjoin as_alias regex_gp gpparams with_clause transient_types gp_rules dispatch_encoding motion_gp gp_pullup_expr # interconnect tests -test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity +test: 
icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity icudp/queue_depth_combination_loss_advance # event triggers cannot run concurrently with any test that runs DDL test: event_trigger_gp diff --git a/src/test/regress/icudp_schedule b/src/test/regress/icudp_schedule index 6a2e3d44a3f..f0332edb7d3 100644 --- a/src/test/regress/icudp_schedule +++ b/src/test/regress/icudp_schedule @@ -4,7 +4,7 @@ # Below cases are also in greenplum_schedule, but as they are fast enough # we duplicate them here to make this pipeline cover more on icudp. 
-test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity icudp/icudp_regression +test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity icudp/icudp_regression icudp/queue_depth_combination_loss_advance # Below case is very slow, do not add it in greenplum_schedule. 
test: icudp/icudp_full diff --git a/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql b/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql index 52af4a220a1..929cd9c5d42 100644 --- a/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql +++ b/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql @@ -27,3 +27,11 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len JOIN small_table USING(jkey) GROUP BY rval2 ORDER BY rval2; + +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; diff --git a/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql b/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql index 7e7348d09ca..43aed3fd59e 100644 --- a/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql +++ b/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql @@ -66,3 +66,58 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len JOIN small_table USING(jkey) GROUP BY rval2 ORDER BY rval2; + +-- Skew with gather+redistribute +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; + +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN 
small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; diff --git a/src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql b/src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql new file mode 100644 index 00000000000..a214e2a48ba --- /dev/null +++ b/src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql @@ -0,0 +1,65 @@ +-- +-- @description Interconncet flow control test case: combination guc value +-- @created 2025-09-12 + +-- Set mode +SET gp_interconnect_fc_method 
= "loss_advance"; + +-- Create a table +CREATE TEMP TABLE small_table(dkey INT, jkey INT, rval REAL, tval TEXT default 'abcdefghijklmnopqrstuvwxyz') DISTRIBUTED BY (dkey); + +-- Generate some data +INSERT INTO small_table VALUES(generate_series(1, 5000), generate_series(5001, 10000), sqrt(generate_series(5001, 10000))); + +-- Functional tests +-- Skew with gather+redistribute +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, 
rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2;