Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,21 @@ jobs:
MINIO_ROOT_PASSWORD: miniosecret
AWS_EC2_METADATA_DISABLED: true

- name: Generate ClickHouse TLS certificates
run: |
mkdir -p ch-certs
# CA
openssl genrsa -out ch-certs/ca.key 2048
openssl req -new -x509 -key ch-certs/ca.key -out ch-certs/ca.crt -days 3650 -subj "/CN=ClickHouse-CA"
# Server cert (CN=localhost, SAN for TLS 1.3)
openssl genrsa -out ch-certs/server.key 2048
openssl req -new -key ch-certs/server.key -out ch-certs/server.csr -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
openssl x509 -req -days 3650 -in ch-certs/server.csr -CA ch-certs/ca.crt -CAkey ch-certs/ca.key -CAcreateserial -out ch-certs/server.crt -copy_extensions copyall
# Client cert for mTLS (CN=peerdb-client)
openssl genrsa -out ch-certs/client.key 2048
openssl req -new -key ch-certs/client.key -out ch-certs/client.csr -subj "/CN=peerdb-client"
openssl x509 -req -days 3650 -in ch-certs/client.csr -CA ch-certs/ca.crt -CAkey ch-certs/ca.key -CAcreateserial -out ch-certs/client.crt

- name: create postgres extensions, increase logical replication limits, and setup catalog database
run: >
docker exec "${{ job.services.catalog.id }}" apk add --no-cache build-base git &&
Expand Down Expand Up @@ -292,16 +307,39 @@ jobs:
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
</default>
<peerdb_tls>
<ssl_certificates>
<common_name>peerdb-client</common_name>
</ssl_certificates>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</peerdb_tls>
</users>
<logger><level>none</level></logger>
<path>var/lib/clickhouse</path>
<tmp_path>var/lib/clickhouse/tmp</tmp_path>
<user_files_path>var/lib/clickhouse/user_files</user_files_path>
<format_schema_path>var/lib/clickhouse/format_schemas</format_schema_path>
<tcp_port>9000</tcp_port>
<tcp_port_secure>9440</tcp_port_secure>
<http_port remove="1"/>
<postgresql_port remove="1"/>
<mysql_port remove="1"/>
<openSSL>
<server>
<certificateFile>../ch-certs/server.crt</certificateFile>
<privateKeyFile>../ch-certs/server.key</privateKeyFile>
<caConfig>../ch-certs/ca.crt</caConfig>
<verificationMode>relaxed</verificationMode>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<macros>
<shard>1</shard>
<replica>1</replica>
Expand Down Expand Up @@ -347,16 +385,39 @@ jobs:
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
</default>
<peerdb_tls>
<ssl_certificates>
<common_name>peerdb-client</common_name>
</ssl_certificates>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</peerdb_tls>
</users>
<logger><level>none</level></logger>
<path>var/lib/clickhouse</path>
<tmp_path>var/lib/clickhouse/tmp</tmp_path>
<user_files_path>var/lib/clickhouse/user_files</user_files_path>
<format_schema_path>var/lib/clickhouse/format_schemas</format_schema_path>
<tcp_port>9001</tcp_port>
<tcp_port_secure>9441</tcp_port_secure>
<http_port remove="1"/>
<postgresql_port remove="1"/>
<mysql_port remove="1"/>
<openSSL>
<server>
<certificateFile>../ch-certs/server.crt</certificateFile>
<privateKeyFile>../ch-certs/server.key</privateKeyFile>
<caConfig>../ch-certs/ca.crt</caConfig>
<verificationMode>relaxed</verificationMode>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<macros>
<shard>2</shard>
<replica>1</replica>
Expand Down Expand Up @@ -511,6 +572,12 @@ jobs:
FLOW_TESTS_AWS_ACCESS_KEY_ID: ${{ steps.setup-aws.outputs.aws-access-key-id }}
FLOW_TESTS_AWS_SECRET_ACCESS_KEY: ${{ steps.setup-aws.outputs.aws-secret-access-key }}
FLOW_TESTS_AWS_SESSION_TOKEN: ${{ steps.setup-aws.outputs.aws-session-token }}
# ClickHouse TLS/mTLS test certificates
PEERDB_CLICKHOUSE_TLS_PORT: "9440"
PEERDB_CLICKHOUSE_TLS_CLUSTER_PORT: "9441"
PEERDB_CLICKHOUSE_TLS_CA_CERT_PATH: ${{ github.workspace }}/ch-certs/ca.crt
PEERDB_CLICKHOUSE_TLS_CLIENT_CERT_PATH: ${{ github.workspace }}/ch-certs/client.crt
PEERDB_CLICKHOUSE_TLS_CLIENT_KEY_PATH: ${{ github.workspace }}/ch-certs/client.key

- name: Upload peer-flow logs
if: always()
Expand Down
4 changes: 3 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,9 @@ func (h *FlowRequestHandler) CreatePeer(
req *protos.CreatePeerRequest,
) (*protos.CreatePeerResponse, APIError) {
if !req.DisableValidation {
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer})
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{
Peer: req.Peer,
})
if validateErr != nil {
return nil, validateErr
}
Expand Down
125 changes: 106 additions & 19 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"fmt"
"log/slog"
"net/url"
"os"
"path/filepath"
"slices"
"strings"
"time"
Expand Down Expand Up @@ -231,30 +233,115 @@
return nil
}

func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
if config.Certificate != nil || config.PrivateKey != nil {
if config.Certificate == nil || config.PrivateKey == nil {
return nil, errors.New("both certificate and private key must be provided if using certificate-based authentication")
}
cert, err := tls.X509KeyPair([]byte(*config.Certificate), []byte(*config.PrivateKey))
if err != nil {
return nil, fmt.Errorf("failed to parse provided certificate: %w", err)
}
tlsSetting.Certificates = []tls.Certificate{cert}
// configureDirectoryTLS configures the tls.Config by loading certificate files
// from a directory. It expects tls.crt and tls.key files, and optionally ca.crt.
// This is typically used when a Kubernetes Secret is mounted as a volume.
//
// Client certificates are loaded via GetClientCertificate so that every TLS
// handshake re-reads the files from disk, automatically picking up rotated
// certificates (e.g. renewed by cert-manager) without requiring a reconnect.
func configureDirectoryTLS(tlsConfig *tls.Config, dir string) error {
certPath := filepath.Join(dir, "tls.crt")
keyPath := filepath.Join(dir, "tls.key")
caPath := filepath.Join(dir, "ca.crt")

// Verify that the required files are readable at configuration time
if _, err := os.ReadFile(certPath); err != nil {
return fmt.Errorf("failed to read TLS certificate from %q: %w", certPath, err)
}
if _, err := os.ReadFile(keyPath); err != nil {
return fmt.Errorf("failed to read TLS private key from %q: %w", keyPath, err)
}

// Use GetClientCertificate so the cert/key are re-read on every TLS
// handshake, ensuring rotated certificates are picked up immediately.
tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
certPEM, err := os.ReadFile(certPath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS certificate from %q: %w", certPath, err)
}
keyPEM, err := os.ReadFile(keyPath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS private key from %q: %w", keyPath, err)
}
if config.RootCa != nil {
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return nil, fmt.Errorf("failed to parse TLS certificate from directory %q: %w", dir, err)
}

caCertPEM, err := os.ReadFile(caPath)
if err == nil && len(caCertPEM) > 0 {
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM([]byte(*config.RootCa)) {
return nil, errors.New("failed to parse provided root CA")
if !caPool.AppendCertsFromPEM(caCertPEM) {
return nil, fmt.Errorf("failed to parse CA certificate from %q", caPath)
}
tlsSetting.RootCAs = caPool
tlsConfig.RootCAs = caPool
}
if config.TlsHost != "" {
tlsSetting.ServerName = config.TlsHost

return &cert, nil
}

return nil
}

// configureInlineTLS configures the tls.Config using inline certificate/key
// PEM values from the ClickHouse peer config.
func configureInlineTLS(tlsConfig *tls.Config, config *protos.ClickhouseConfig) error {
if config.Certificate != nil || config.PrivateKey != nil {
if config.Certificate == nil || config.PrivateKey == nil {
return errors.New("both certificate and private key must be provided if using certificate-based authentication")
}
cert, err := tls.X509KeyPair([]byte(*config.Certificate), []byte(*config.PrivateKey))
if err != nil {
return fmt.Errorf("failed to parse provided certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if config.RootCa != nil {
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM([]byte(*config.RootCa)) {
return errors.New("failed to parse provided root CA")
}
tlsConfig.RootCAs = caPool
}
return nil
}

// buildTLSConfig builds the TLS configuration for a ClickHouse connection.
// When a TLS certificate directory is configured, it loads certificates from the directory.
// Otherwise, inline certificates are used.
func buildTLSConfig(config *protos.ClickhouseConfig) (*tls.Config, error) {
if config.DisableTls {
return nil, nil
}

tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS13,
ServerName: config.TlsHost,
}
certDir := config.GetTlsCertificateDirectory()

if certDir != "" {
if err := configureDirectoryTLS(tlsConfig, certDir); err != nil {
return nil, err
}
} else {
if err := configureInlineTLS(tlsConfig, config); err != nil {
return nil, err
}
}

if len(tlsConfig.Certificates) == 0 {
return nil, fmt.Errorf("TLS is enabled but it was not possible to configure it")

Check failure on line 335 in flow/connectors/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)
}

return tlsConfig, nil
}

func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
tlsSetting, err := buildTLSConfig(config)
if err != nil {
return nil, err
}

settings := clickhouse.Settings{
Expand Down
Loading
Loading