Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions contrib/interconnect/ic_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -541,14 +541,15 @@ GetMotionSentRecordTypmod(ChunkTransportState * transportStates,
int16 motNodeID,
int16 targetRoute)
{
MotionConn *conn;
MotionConn *conn = NULL;
ChunkTransportStateEntry *pEntry = NULL;
getChunkTransportState(transportStates, motNodeID, &pEntry);

if (targetRoute == BROADCAST_SEGIDX)
conn = &pEntry->conns[0];
else
conn = &pEntry->conns[targetRoute];
if (targetRoute == BROADCAST_SEGIDX) {
targetRoute = 0;
}

getMotionConn(pEntry, targetRoute, &conn);

return &conn->sent_record_typmod;
}
Expand Down
52 changes: 52 additions & 0 deletions contrib/interconnect/ic_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
*/
#ifndef INTER_CONNECT_INTERNAL_H
#define INTER_CONNECT_INTERNAL_H
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/queue.h>
#include <pthread.h>

#include "tcp/ic_tcp.h"
#include "udp/ic_udpifc.h"
Expand All @@ -33,6 +38,27 @@ typedef enum MotionConnState
mcsEosSent
} MotionConnState;

struct udp_send_vars
{
/* send sequence variables */
uint32_t snd_una; /* send unacknoledged */
uint32_t snd_wnd; /* send window (unscaled) */

/* retransmission timeout variables */
uint8_t nrtx; /* number of retransmission */
uint8_t max_nrtx; /* max number of retransmission */
uint32_t rto; /* retransmission timeout */
uint32_t ts_rto; /* timestamp for retransmission timeout */

/* congestion control variables */
uint32_t cwnd; /* congestion window */
uint32_t ssthresh; /* slow start threshold */

TAILQ_ENTRY(MotionConnUDP) send_link;
TAILQ_ENTRY(MotionConnUDP) timer_link; /* timer link (rto list) */

};

/*
* Structure used for keeping track of a pt-to-pt connection between two
* Cdb Entities (either QE or QD).
Expand Down Expand Up @@ -153,6 +179,32 @@ typedef struct MotionConnUDP
uint64 stat_count_resent;
uint64 stat_max_resent;
uint64 stat_count_dropped;

struct {
uint32_t ts_rto;
uint32_t rto;
uint32_t srtt;
uint32_t rttvar;
uint32_t snd_una;
uint16_t nrtx;
uint16_t max_nrtx;
uint32_t mss;
uint32_t cwnd;
uint32_t ssthresh;
uint32_t fss;
uint8_t loss_count;
uint32_t mdev;
uint32_t mdev_max;
uint32_t rtt_seq; /* sequence number to update rttvar */
uint32_t ts_all_rto;
bool karn_mode;
} rttvar;

uint8_t on_timewait_list;
int16_t on_rto_idx;

uint32_t snd_nxt; /* send next */
struct udp_send_vars sndvar;
} MotionConnUDP;

typedef struct MotionConnTCP
Expand Down
2 changes: 2 additions & 0 deletions contrib/interconnect/test/ic_test_env.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ client_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_p

Gp_interconnect_queue_depth = 800;
Gp_interconnect_snd_queue_depth = 600;
Gp_interconnect_mem_size = 20;
Gp_interconnect_timer_period = 1;
Gp_interconnect_timer_checking_period = 2;
InitializeLatchSupport();
Expand Down Expand Up @@ -374,6 +375,7 @@ server_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_p

Gp_interconnect_queue_depth = 800;
Gp_interconnect_snd_queue_depth = 600;
Gp_interconnect_mem_size = 20;
Gp_interconnect_timer_period = 1;
Gp_interconnect_timer_checking_period = 2;
InitializeLatchSupport();
Expand Down
Loading