diff --git a/bringyourctl/main.go b/bringyourctl/main.go index 4dc1bb7a..a0857765 100644 --- a/bringyourctl/main.go +++ b/bringyourctl/main.go @@ -22,6 +22,8 @@ import ( "github.com/urnetwork/server/search" "github.com/urnetwork/server/session" "github.com/urnetwork/server/task" + + "github.com/urnetwork/proxy" ) func main() { @@ -73,6 +75,8 @@ Usage: bringyourctl query location bringyourctl upgrade-plan --network_id= bringyourctl proxy parse-id + bringyourctl proxy keygen + bringyourctl proxy reset-client-ipv4 Options: -h --help Show this screen. @@ -232,6 +236,10 @@ Options: } else if proxy, _ := opts.Bool("proxy"); proxy { if parseId, _ := opts.Bool("parse-id"); parseId { proxyParseId(opts) + } else if keygen, _ := opts.Bool("keygen"); keygen { + proxyKeygen(opts) + } else if r, _ := opts.Bool("reset-client-ipv4"); r { + proxyResetClientIpv4(opts) } } else { fmt.Println(usage) @@ -1099,3 +1107,16 @@ func proxyParseId(opts docopt.Opts) { fmt.Printf("%s\n", proxyId) } + +func proxyKeygen(opts docopt.Opts) { + privateKey, publicKey, err := proxy.WgGenKeyPair() + if err != nil { + panic(err) + } + fmt.Printf("%s %s\n", privateKey, publicKey) +} + +func proxyResetClientIpv4(opts docopt.Opts) { + ctx := context.Background() + model.ResetProxyClientIpv4(ctx) +} diff --git a/connect/resident_proxy.go b/connect/resident_proxy.go index b6cee3d9..16f85a5c 100644 --- a/connect/resident_proxy.go +++ b/connect/resident_proxy.go @@ -46,7 +46,7 @@ type ResidentProxyDevice struct { proxyDeviceConfig *model.ProxyDeviceConfig deviceLocal *sdk.DeviceLocal - tnet *proxy.Net + tnet *proxy.Tun settings *ResidentProxyDeviceSettings } @@ -124,9 +124,11 @@ func NewResidentProxyDevice( deviceLocal.SetConnectLocation(initialDeviceState.Location) } - tnet, err := proxy.CreateNetTun( + tunSettings := proxy.DefaultTunSettings() + tunSettings.Mtu = settings.Mtu + tnet, err := proxy.CreateTun( cancelCtx, - settings.Mtu, + tunSettings, ) if err != nil { return nil, err diff --git a/db_migrations.go b/db_migrations.go index 6582f893..ed30ff2b 100644 --- a/db_migrations.go +++ b/db_migrations.go @@ -2688,4 +2688,54 @@ var migrations = []any{ CREATE INDEX transfer_balance_code_network_id_end_time ON transfer_balance_code(network_id, end_time); `), + + newSqlMigration(` + CREATE TABLE proxy_client ( + proxy_id uuid NOT NULL, + client_id uuid NOT NULL, + instance_id uuid NOT NULL, + proxy_host varchar(128) NOT NULL, + block varchar(128) NOT NULL, + client_ipv4 bigint NULL, + client_public_key varchar(128) NULL, + proxy_client_json text NOT NULL, + + PRIMARY KEY (proxy_id) + ) + `), + + newSqlMigration(` + CREATE UNIQUE INDEX proxy_client_proxy_host_block_client_ipv4 ON proxy_client (proxy_host, block, client_ipv4) + `), + + newSqlMigration(` + CREATE UNIQUE INDEX proxy_client_proxy_host_block_client_public_key ON proxy_client (proxy_host, block, client_public_key) + `), + + newSqlMigration(` + CREATE INDEX proxy_client_client_id_instance_id_proxy_id ON proxy_client (client_id, instance_id, proxy_id) + `), + + newSqlMigration(` + CREATE TABLE proxy_client_change ( + proxy_host varchar(128) NOT NULL, + block varchar(128) NOT NULL, + change_id bigint GENERATED ALWAYS AS IDENTITY, + proxy_id uuid NOT NULL, + + PRIMARY KEY (proxy_host, block, change_id) + ) + `), + + newSqlMigration(` + CREATE TABLE proxy_client_ipv4 ( + sequence_id bigint NOT NULL, + client_ipv4 bigint NOT NULL, + + PRIMARY KEY (sequence_id, client_ipv4) + ) + `), + + // newCodeMigration(migration_20260214_ResetProxyClientIpv4), + } diff --git a/go.mod b/go.mod index d67304a0..03d9e84e 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/urnetwork/glog v0.0.0 github.com/urnetwork/proxy v0.0.0 github.com/urnetwork/sdk v0.0.0 + github.com/urnetwork/userwireguard v0.0.0 golang.org/x/crypto v0.47.0 golang.org/x/exp v0.0.0-20260112195511-716be5621a96 google.golang.org/protobuf v1.36.11 @@ -76,6 +77,7 @@ require ( golang.org/x/net v0.49.0 // indirect golang.org/x/term v0.39.0 // indirect golang.org/x/time v0.14.0 // indirect + golang.zx2c4.com/wireguard/wgctrl v0.0.0-20241231184526-a9ab2273dd10 // indirect gvisor.dev/gvisor v0.0.0-20260202191832-0bd9aedd142c // indirect ) @@ -106,3 +108,5 @@ replace github.com/urnetwork/proxy => ../proxy replace github.com/urnetwork/sdk => ../sdk replace github.com/urnetwork/glog => ../glog + +replace github.com/urnetwork/userwireguard => ../userwireguard diff --git a/go.sum b/go.sum index 2ef290bc..939d1e87 100644 --- a/go.sum +++ b/go.sum @@ -351,6 +351,8 @@ golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.zx2c4.com/wireguard/wgctrl v0.0.0-20241231184526-a9ab2273dd10 h1:3GDAcqdIg1ozBNLgPy4SLT84nfcBjr6rhGtXYtrkWLU= +golang.zx2c4.com/wireguard/wgctrl v0.0.0-20241231184526-a9ab2273dd10/go.mod h1:T97yPqesLiNrOYxkwmhMI0ZIlJDm+p0PMR8eRVeR5tQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/model/network_client_model.go b/model/network_client_model.go index 45eb87d1..38420f03 100644 --- a/model/network_client_model.go +++ b/model/network_client_model.go @@ -9,7 +9,7 @@ import ( "net/netip" // "regexp" "strconv" - "strings" + // "strings" // "sync" // "bytes" @@ -99,6 +99,7 @@ type ProxyConfig struct { LockIpList []string `json:"lock_ip_list"` HttpsRequireAuth bool `json:"https_require_auth"` + EnableWg bool `json:"enable_wg"` InitialDeviceState *ExtendedProxyDeviceState `json:"initial_device_state,omitempty"` } @@ -122,18 +123,8 @@ type AuthNetworkClientError struct { } type ProxyConfigResult struct { - KeepaliveSeconds int `json:"keepalive_seconds"` - SocksProxyUrl string `json:"socks_proxy_url"` - HttpProxyUrl string `json:"http_proxy_url"` - HttpsProxyUrl string `json:"https_proxy_url"` - ApiBaseUrl string `json:"api_base_url"` - AuthToken string `json:"auth_token"` - InstanceId server.Id `json:"instance_id"` - ProxyHost string `json:"proxy_host"` - HttpProxyPort int `json:"http_proxy_port"` - HttpsProxyPort int `json:"https_proxy_port"` - SocksProxyPort int `json:"socks_proxy_port"` - ApiPort int `json:"api_port"` + KeepaliveSeconds int `json:"keepalive_seconds"` + ProxyClient } type ProxyAuthResult struct { @@ -282,59 +273,27 @@ func AuthNetworkClient( } err := CreateProxyDeviceConfig(session.Ctx, proxyDeviceConfig) if err == nil { - signedProxyId := SignProxyId(proxyDeviceConfig.ProxyId) - // FIXME pull the current avaiable far edges and use least recently used with a threshold - socksProxyPort := 8080 - httpProxyPort := 8081 - httpsProxyPort := 8082 - apiPort := 8083 - - proxyHost := fmt.Sprintf("%s.%s", "cosmic", server.RequireDomain()) - - socksProxyUrl := fmt.Sprintf("socks5h://%s:%d", proxyHost, socksProxyPort) - - httpProxyUrl := fmt.Sprintf( - "http://%s:%d", - proxyHost, - httpProxyPort, - ) - - var httpsProxyUrl string - if authClient.ProxyConfig.HttpsRequireAuth { - // use the encoded proxy id for the url, since the signed proxy id will be passed in auth - httpsProxyUrl = fmt.Sprintf( - "https://%s:%d", - proxyHost, - httpsProxyPort, - ) - } else { - httpsProxyUrl = fmt.Sprintf( - "https://%s.%s:%d", - strings.ToLower(signedProxyId), - proxyHost, - httpsProxyPort, - ) + opts := CreateProxyClientOptions{ + HttpsRequireAuth: authClient.ProxyConfig.HttpsRequireAuth, + EnableWg: authClient.ProxyConfig.EnableWg, } - - apiBaseUrl := fmt.Sprintf( - "https://api.%s:%d", - proxyHost, - apiPort, + proxyClient, err := CreateProxyClient( + session.Ctx, + proxyDeviceConfig.ProxyId, + proxyDeviceConfig.ClientId, + proxyDeviceConfig.InstanceId, + opts, ) - authClientResult.ProxyConfigResult = &ProxyConfigResult{ - SocksProxyUrl: socksProxyUrl, - HttpProxyUrl: httpProxyUrl, - HttpsProxyUrl: httpsProxyUrl, - ApiBaseUrl: apiBaseUrl, - AuthToken: strings.ToLower(signedProxyId), - InstanceId: proxyDeviceConfig.InstanceId, - ProxyHost: proxyHost, - SocksProxyPort: socksProxyPort, - HttpProxyPort: httpProxyPort, - HttpsProxyPort: httpsProxyPort, - ApiPort: apiPort, + if err == nil { + authClientResult.ProxyConfigResult = &ProxyConfigResult{ + ProxyClient: *proxyClient, + } + } else { + authClientResult.Error = &AuthNetworkClientError{ + Message: "Could not create proxy client", + } } } else { authClientResult.Error = &AuthNetworkClientError{ diff --git a/model/network_client_proxy_model.go b/model/network_client_proxy_model.go index ce1896c5..af6417e3 100644 --- a/model/network_client_proxy_model.go +++ b/model/network_client_proxy_model.go @@ -5,16 +5,22 @@ import ( "crypto/hmac" "crypto/sha1" "encoding/base32" + "encoding/binary" "encoding/json" + "errors" "fmt" + mathrand "math/rand" "net/netip" "slices" "strings" "sync" - // "github.com/urnetwork/glog" + "gopkg.in/yaml.v3" + + "github.com/urnetwork/glog" "github.com/urnetwork/connect" + "github.com/urnetwork/proxy" "github.com/urnetwork/sdk" "github.com/urnetwork/server" ) @@ -38,6 +44,30 @@ var proxySigningSecret = sync.OnceValue(func() []byte { return proxySigningSecrets()[0] }) +type ServerProxyConfig struct { + Hosts []string `yaml:"hosts"` + Blocks []string `yaml:"blocks"` + // block -> service -> port + Ports map[string]map[string]int `yaml:"ports"` + Secrets []string `yaml:"secrets"` + Wg ServerProxyConfigWg `yaml:"wg"` +} + +type ServerProxyConfigWg struct { + PublicKey string `yaml:"public_key"` + PrivateKey string `yaml:"private_key"` +} + +var LoadServerProxyConfig = sync.OnceValue(func() ServerProxyConfig { + proxyConfigBytes := server.Vault.RequireBytes("proxy.yml") + var proxyConfig ServerProxyConfig + err := yaml.Unmarshal(proxyConfigBytes, &proxyConfig) + if err != nil { + panic(err) + } + return proxyConfig +}) + // the signed proxy id is intended to use in the proxy hostname, // to make it hard to guess a proxy id (160 bits of entropy), // and to allow the server to fast reject a request @@ -405,3 +435,493 @@ func GetConnectLocationForCountryCode(ctx context.Context, countryCode string) * CountryLocationId: server.ToSdkId(c.LocationId), } } + +type ProxyClient struct { + ChangeId int64 `json:"change_id,omitempty"` + ProxyId server.Id `json:"proxy_id"` + ClientId server.Id `json:"client_id"` + InstanceId server.Id `json:"instance_id"` + SocksProxyUrl string `json:"socks_proxy_url"` + HttpProxyUrl string `json:"http_proxy_url"` + HttpsProxyUrl string `json:"https_proxy_url"` + ApiBaseUrl string `json:"api_base_url"` + AuthToken string `json:"auth_token"` + ProxyHost string `json:"proxy_host"` + Block string `json:"block"` + HttpProxyPort int `json:"http_proxy_port"` + HttpsProxyPort int `json:"https_proxy_port"` + SocksProxyPort int `json:"socks_proxy_port"` + ApiPort int `json:"api_port"` + + WgConfig *WgConfig `json:"wg_config"` +} + +type WgConfig struct { + WgProxyPort int `json:"wg_proxy_port"` + ClientPrivateKey string `json:"client_private_key"` + ClientPublicKey string `json:"client_public_key"` + ProxyPublicKey string `json:"proxy_public_key"` + ClientIpv4 netip.Addr `json:"client_ipv4"` + Config string `json:"config"` +} + +type CreateProxyClientOptions struct { + HttpsRequireAuth bool + EnableWg bool +} + +func CreateProxyClient( + ctx context.Context, + proxyId server.Id, + clientId server.Id, + instanceId server.Id, + opts CreateProxyClientOptions, +) ( + proxyClient *ProxyClient, + returnErr error, +) { + proxyConfig := LoadServerProxyConfig() + signedProxyId := SignProxyId(proxyId) + + server.Tx(ctx, func(tx server.PgTx) { + + // FIXME + // randomly choose host + // randomly choose block + // FIXME pull the current avaiable far edges and use least recently used with a threshold + socksProxyPort := 8080 + httpProxyPort := 8081 + httpsProxyPort := 8082 + apiPort := 8083 + wgPort := 8084 + + proxyHost := fmt.Sprintf("%s.%s", "cosmic", server.RequireDomain()) + block := "" + + socksProxyUrl := fmt.Sprintf("socks5h://%s:%d", proxyHost, socksProxyPort) + + httpProxyUrl := fmt.Sprintf( + "http://%s:%d", + proxyHost, + httpProxyPort, + ) + + var httpsProxyUrl string + if opts.HttpsRequireAuth { + // use the encoded proxy id for the url, since the signed proxy id will be passed in auth + httpsProxyUrl = fmt.Sprintf( + "https://%s:%d", + proxyHost, + httpsProxyPort, + ) + } else { + httpsProxyUrl = fmt.Sprintf( + "https://%s.%s:%d", + strings.ToLower(signedProxyId), + proxyHost, + httpsProxyPort, + ) + } + + apiBaseUrl := fmt.Sprintf( + "https://api.%s:%d", + proxyHost, + apiPort, + ) + + proxyClient = &ProxyClient{ + ProxyId: proxyId, + ClientId: clientId, + InstanceId: instanceId, + SocksProxyUrl: socksProxyUrl, + HttpProxyUrl: httpProxyUrl, + HttpsProxyUrl: httpsProxyUrl, + ApiBaseUrl: apiBaseUrl, + AuthToken: signedProxyId, + ProxyHost: proxyHost, + Block: block, + HttpProxyPort: httpProxyPort, + HttpsProxyPort: httpsProxyPort, + SocksProxyPort: socksProxyPort, + ApiPort: apiPort, + } + + if opts.EnableWg { + + var clientIpv4 int64 + + result, err := tx.Query( + ctx, + ` + SELECT + proxy_client_ipv4.client_ipv4 + FROM proxy_client_ipv4 + LEFT JOIN proxy_client ON + proxy_client.proxy_host = $1 AND + proxy_client.block = $2 AND + proxy_client.client_ipv4 = proxy_client_ipv4.client_ipv4 + WHERE + $3 <= proxy_client_ipv4.sequence_id AND + proxy_client.client_ipv4 IS NULL + ORDER BY proxy_client_ipv4.sequence_id + LIMIT 1 + `, + proxyHost, + block, + mathrand.Intn((31*ProxyClientIpv4Count)/32), + ) + server.WithPgResult(result, err, func() { + if result.Next() { + server.Raise(result.Scan(&clientIpv4)) + } else { + panic(&server.PgRetry{}) + } + }) + + clientPrivateKey, clientPublicKey, err := proxy.WgGenKeyPairStrings() + if err != nil { + returnErr = err + return + } + + proxyPublicKey := proxyConfig.Wg.PublicKey + + clientAddr := IntToIpv4(clientIpv4) + + config := fmt.Sprintf(` + [Interface] + PrivateKey = %s + Address = %s/32 + DNS = 1.1.1.1 + + [Peer] + PublicKey = %s + Endpoint = %s + AllowedIPs = 0.0.0.0/0 + PresharedKey = %s + `, + clientPrivateKey, + clientAddr, + proxyPublicKey, + fmt.Sprintf("%s:%d", proxyHost, wgPort), + signedProxyId, + ) + + proxyClient.WgConfig = &WgConfig{ + WgProxyPort: wgPort, + ClientPrivateKey: clientPrivateKey, + ClientPublicKey: clientPublicKey, + ProxyPublicKey: proxyPublicKey, + ClientIpv4: clientAddr, + Config: config, + } + } + + var clientIpv4 *int64 + var clientPublicKey *string + if proxyClient.WgConfig != nil { + b := Ipv4ToInt(proxyClient.WgConfig.ClientIpv4) + clientIpv4 = &b + clientPublicKey = &proxyClient.WgConfig.ClientPublicKey + } + + proxyClientJson, err := json.Marshal(proxyClient) + if err != nil { + returnErr = err + return + } + + server.RaisePgResult(tx.Exec( + ctx, + ` + INSERT INTO proxy_client ( + proxy_id, + client_id, + instance_id, + proxy_host, + block, + client_ipv4, + client_public_key, + proxy_client_json + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, + proxyId, + clientId, + instanceId, + proxyClient.ProxyHost, + proxyClient.Block, + clientIpv4, + clientPublicKey, + proxyClientJson, + )) + + result, err := tx.Query( + ctx, + ` + INSERT INTO proxy_client_change ( + proxy_host, + block, + proxy_id + ) + VALUES ($1, $2, $3) + RETURNING change_id + `, + proxyClient.ProxyHost, + proxyClient.Block, + proxyId, + ) + server.WithPgResult(result, err, func() { + if result.Next() { + server.Raise(result.Scan(&proxyClient.ChangeId)) + } + }) + + }) + + return +} + +func GetProxyClientsSince( + ctx context.Context, + proxyHost string, + block string, + changeId int64, +) ( + proxyClients map[server.Id]*ProxyClient, + maxChangeId int64, + returnErr error, +) { + proxyClients = map[server.Id]*ProxyClient{} + maxChangeId = changeId + server.Db(ctx, func(conn server.PgConn) { + result, err := conn.Query( + ctx, + ` + SELECT + proxy_client_change.change_id, + proxy_client_change.proxy_id, + proxy_client.proxy_client_json + FROM proxy_client_change + INNER JOIN proxy_client ON proxy_client.proxy_id = proxy_client_change.proxy_id + WHERE + proxy_client_change.proxy_host = $1 AND + proxy_client_change.block = $2 AND + $3 <= proxy_client_change.change_id + `, + proxyHost, + block, + changeId, + ) + server.WithPgResult(result, err, func() { + for result.Next() { + var changeId int64 + var proxyId server.Id + var proxyClientJson string + server.Raise(result.Scan(&changeId, &proxyId, &proxyClientJson)) + maxChangeId = max(maxChangeId, changeId) + + var proxyClient ProxyClient + err := json.Unmarshal([]byte(proxyClientJson), &proxyClient) + if err == nil { + proxyClients[proxyId] = &proxyClient + } else { + returnErr = errors.Join(returnErr, err) + } + } + }) + }) + return +} + +func GetProxyIdsSince(ctx context.Context, proxyHost string, block string, changeId int64) (proxyIds []server.Id, maxChangeId int64) { + maxChangeId = changeId + server.Db(ctx, func(conn server.PgConn) { + result, err := conn.Query( + ctx, + ` + SELECT + change_id, + proxy_id + FROM proxy_client_change + WHERE + proxy_host = $1 AND + block = $2 AND + $3 <= change_id + `, + proxyHost, + block, + changeId, + ) + server.WithPgResult(result, err, func() { + for result.Next() { + var changeId int64 + var proxyId server.Id + server.Raise(result.Scan(&changeId, &proxyId)) + proxyIds = append(proxyIds, proxyId) + maxChangeId = max(maxChangeId, changeId) + } + }) + }) + return +} + +func GetProxyClients(ctx context.Context, proxyIds ...server.Id) (proxyClients map[server.Id]*ProxyClient, returnErr error) { + proxyClients = map[server.Id]*ProxyClient{} + + server.Tx(ctx, func(tx server.PgTx) { + server.CreateTempTableInTx(ctx, tx, "temp_proxy_id(proxy_id uuid)", proxyIds...) + + result, err := tx.Query( + ctx, + ` + SELECT + proxy_client.proxy_id, + proxy_client.proxy_client_json + FROM proxy_client + INNER JOIN temp_proxy_id ON temp_proxy_id.proxy_id = proxy_client.proxy_id + `, + ) + server.WithPgResult(result, err, func() { + for result.Next() { + var proxyId server.Id + var proxyClientJson string + server.Raise(result.Scan(&proxyId, &proxyClientJson)) + + var proxyClient ProxyClient + err := json.Unmarshal([]byte(proxyClientJson), &proxyClient) + if err == nil { + proxyClients[proxyId] = &proxyClient + } else { + returnErr = errors.Join(returnErr, err) + } + } + }) + }) + return +} + +func Ipv4ToInt(addr netip.Addr) int64 { + return int64(binary.BigEndian.Uint32(addr.AsSlice())) +} + +func IntToIpv4(ipv4 int64) netip.Addr { + var b [4]byte + binary.BigEndian.PutUint32(b[:], uint32(ipv4)) + return netip.AddrFrom4(b) +} + +// 10m per (host, block) +const ProxyClientIpv4Count = 10_000_000 + +// reset the entire table proxy_client_ipv4 with a randomized list of client ips that avoid popular subnets +// this generates `ProxyClientIpv4Count` ipv4s in a random order +func ResetProxyClientIpv4(ctx context.Context) { + subnets := func(subnetStrs ...string) []netip.Prefix { + var prefixes []netip.Prefix + for _, subnetStr := range subnetStrs { + prefix := netip.MustParsePrefix(subnetStr) + prefixes = append(prefixes, prefix) + } + return prefixes + } + + containsAny := func(prefixes []netip.Prefix, addr netip.Addr) bool { + for _, prefix := range prefixes { + if prefix.Contains(addr) { + return true + } + } + return false + } + + ipv4Subnets := subnets( + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + ) + + commonIpv4Subnets := subnets( + "192.168.1.0/24", + "192.168.0.0/24", + "10.0.0.0/24", + "192.168.2.0/24", + "192.168.100.0/24", + "192.168.86.0/24", + "192.168.4.0/22", + "192.168.50.0/24", + "192.168.68.0/24", + "192.168.85.0/24", + "192.168.178.0/24", + "192.168.179.0/24", + "192.168.88.0/24", + "192.168.8.0/24", + "192.168.31.0/24", + "10.8.0.0/24", + "10.6.0.0/24", + "100.64.0.0/10", + "172.17.0.0/16", + "10.252.0.0/24", + "172.16.0.0/16", + "192.168.10.0/24", + "192.168.11.0/24", + "192.168.15.0/24", + "192.168.123.0/24", + "192.168.254.0/24", + "10.1.1.0/24", + "10.1.10.0/24", + "10.90.90.0/24", + "192.168.168.0/24", + "192.168.99.0/24", + "192.168.115.0/24", + "10.74.0.0/24", + "172.28.0.0/24", + "192.168.201.0/24", + ) + + var addrs []netip.Addr + for _, prefix := range ipv4Subnets { + for addr := range connect.AddrsInPrefix(prefix) { + if !containsAny(commonIpv4Subnets, addr) { + addrs = append(addrs, addr) + } + } + } + + mathrand.Shuffle(len(addrs), func(i int, j int) { + addrs[i], addrs[j] = addrs[j], addrs[i] + }) + + if len(addrs) < ProxyClientIpv4Count { + panic(fmt.Errorf("must have at least %d ipv4 addresses (found %d)", ProxyClientIpv4Count, len(addrs))) + } + + addrs = addrs[:ProxyClientIpv4Count] + + server.Tx(ctx, func(tx server.PgTx) { + server.BatchInTx(ctx, tx, func(batch server.PgBatch) { + batch.Queue( + "DELETE FROM proxy_client_ipv4", + ) + + for i, addr := range addrs { + if (i+1)%10000 == 0 { + glog.Infof("[reset][%d/%d]%.2f%% queued\n", i+1, len(addrs), (100.0*float64(i+1))/float64(len(addrs))) + } + batch.Queue( + ` + INSERT INTO proxy_client_ipv4 ( + sequence_id, + client_ipv4 + ) + VALUES ($1, $2) + `, + i, + Ipv4ToInt(addr), + ) + } + }) + }) + + glog.Infof("[reset][%d/%d]done\n", len(addrs), len(addrs)) +} diff --git a/model/network_client_proxy_model_test.go b/model/network_client_proxy_model_test.go index c5e7f620..56920dbf 100644 --- a/model/network_client_proxy_model_test.go +++ b/model/network_client_proxy_model_test.go @@ -1,12 +1,14 @@ package model import ( + "context" mathrand "math/rand" "strings" "testing" "github.com/go-playground/assert/v2" + "github.com/urnetwork/glog" "github.com/urnetwork/server" ) @@ -53,21 +55,33 @@ func TestSignProxyIdHosts(t *testing.T) { } -// FIXME -/* -func TestProxyDeviceConfig() { - - // create config - - // load config for proxy id - // load config for client - - // load connection for proxy id - // load conenction for client - - CreateProxyDeviceConfig(&ProxyDeviceConfig{ - ClientId: clientId, +func TestCreateProxyClient(t *testing.T) { + server.DefaultTestEnv().Run(func() { + ctx := context.Background() + + ResetProxyClientIpv4(ctx) + + // create n proxy clients + n := 1024 + for i := range n { + proxyDeviceConfig := &ProxyDeviceConfig{} + proxyDeviceConfig.ClientId = server.NewId() + err := CreateProxyDeviceConfig(ctx, proxyDeviceConfig) + assert.Equal(t, err, nil) + + proxyClient, err := CreateProxyClient( + ctx, + proxyDeviceConfig.ProxyId, + proxyDeviceConfig.ClientId, + proxyDeviceConfig.InstanceId, + CreateProxyClientOptions{ + EnableWg: true, + }, + ) + assert.Equal(t, err, nil) + assert.NotEqual(t, proxyClient, nil) + + glog.Infof("[ncpm][%d/%d]ip=%s\n", i+1, n, proxyClient.WgConfig.ClientIpv4) + } }) - } -*/ diff --git a/proxy/main.go b/proxy/main.go index 110fbbe3..71e9abca 100644 --- a/proxy/main.go +++ b/proxy/main.go @@ -2,17 +2,18 @@ package main import ( "context" - "crypto/tls" + // "crypto/tls" "encoding/base64" - "encoding/json" + // "encoding/json" "fmt" - "io" + // "io" "net" "net/http" "net/netip" "os" - "strconv" + // "strconv" "strings" + // "sync" "syscall" "time" @@ -24,7 +25,7 @@ import ( "github.com/urnetwork/proxy" "github.com/urnetwork/server" "github.com/urnetwork/server/model" - "github.com/urnetwork/server/router" + // "github.com/urnetwork/server/router" ) // FIXME this is meant to be deployed with no lb and no containers @@ -44,6 +45,7 @@ const ListenSocksPort = 8080 const ListenHttpPort = 8081 const ListenHttpsPort = 8082 const ListenApiPort = 8083 +const ListenWgPort = 8084 func DefaultProxySettings() *ProxySettings { return &ProxySettings{ @@ -59,6 +61,7 @@ type ProxySettings struct { ProxyWriteTimeout time.Duration ProxyIdleTimeout time.Duration ProxyTlsHandshakeTimeout time.Duration + NotificationTimeout time.Duration } func main() { @@ -121,8 +124,26 @@ func main() { settings, ) + wg := newWgServer( + ctx, + cancel, + proxyDeviceManager, + settings, + ) + newWatchdog(ctx, 5*time.Second) + notif := newProxyClientNotification(ctx, settings) + sub := notif.AddProxyClientsCallback(func(proxyClients []*model.ProxyClient) { + for _, proxyClient := range proxyClients { + // warm up the device + proxyDeviceManager.OpenProxyDevice(proxyClient.ProxyId) + } + + wg.AddProxyClients(proxyClients) + }) + defer sub() + select { case <-ctx.Done(): } @@ -325,27 +346,32 @@ func (self *httpServer) run() { } } -type apiServer struct { +type wgServer struct { ctx context.Context cancel context.CancelFunc proxyDeviceManager *ProxyDeviceManager - transportTls *server.TransportTls settings *ProxySettings + wgProxy *proxy.WgProxy } -func newApiServer( +func newWgServer( ctx context.Context, cancel context.CancelFunc, proxyDeviceManager *ProxyDeviceManager, - transportTls *server.TransportTls, settings *ProxySettings, -) *apiServer { - s := &apiServer{ +) *wgServer { + serverConfig := model.LoadServerProxyConfig() + + wgProxySettings := proxy.DefaultWgProxySettings() + wgProxySettings.PrivateKey = serverConfig.Wg.PrivateKey + wgProxy := proxy.NewWgProxy(ctx, wgProxySettings) + + s := &wgServer{ ctx: ctx, cancel: cancel, proxyDeviceManager: proxyDeviceManager, - transportTls: transportTls, settings: settings, + wgProxy: wgProxy, } go server.HandleError(s.run, cancel) @@ -353,87 +379,38 @@ func newApiServer( return s } -func (self *apiServer) run() { +func (self *wgServer) run() { defer self.cancel() - routes := []*router.Route{ - router.NewRoute("POST", "/warmup", self.HandleWarmup), - } - - reusePort := false - - httpServerOptions := server.HttpServerOptions{ - ReadTimeout: 15 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 5 * time.Minute, - } - - tlsConfig := &tls.Config{ - GetConfigForClient: self.transportTls.GetTlsConfigForClient, - } - - err := server.HttpListenAndServeTlsWithReusePort( - self.ctx, - net.JoinHostPort("", strconv.Itoa(ListenApiPort)), - router.NewRouter(self.ctx, routes), - reusePort, - httpServerOptions, - tlsConfig, - ) + err := self.wgProxy.ListenAndServe("udp", fmt.Sprintf(":%d", ListenWgPort)) if err != nil { panic(err) } } -type WarmupRequest struct { - TimeoutSeconds int `json:"timeout_seconds,omitempty"` -} - -type WarmupResponse struct { - Ready bool `json:"ready"` -} - -func (self *apiServer) HandleWarmup(w http.ResponseWriter, r *http.Request) { - authHeader := r.Header.Get("Authorization") - proxyId, err := authHeaderProxyId(authHeader) - if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - - var warmupRequest WarmupRequest - - defer r.Body.Close() - bodyBytes, err := io.ReadAll(r.Body) - - if 0 < len(bodyBytes) { - err = json.Unmarshal(bodyBytes, &warmupRequest) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return +func (self *wgServer) AddProxyClients(proxyClients []*model.ProxyClient) { + serverConfig := model.LoadServerProxyConfig() + + var clients map[netip.Addr]*proxy.WgClient + for _, proxyClient := range proxyClients { + if proxyClient.WgConfig != nil && proxyClient.WgConfig.ProxyPublicKey == serverConfig.Wg.PublicKey { + // verify that the access token is still valid + proxyId, err := model.ParseSignedProxyId(proxyClient.AuthToken) + if err == nil && proxyId == proxyClient.ProxyId { + proxyDevice, err := self.proxyDeviceManager.OpenProxyDevice(proxyClient.ProxyId) + if err == nil { + client := &proxy.WgClient{ + PublicKey: proxyClient.WgConfig.ClientPublicKey, + PresharedKey: proxyClient.AuthToken, + ClientIpv4: proxyClient.WgConfig.ClientIpv4, + Tun: proxyDevice.Tun(), + } + clients[proxyClient.WgConfig.ClientIpv4] = client + } + } } } - // else use the default object - - proxyDevice, err := self.proxyDeviceManager.OpenProxyDevice(proxyId) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - timeout := time.Duration(warmupRequest.TimeoutSeconds) * time.Second - ready := proxyDevice.WaitForReady(r.Context(), timeout) - - warmupResponse := &WarmupResponse{ - Ready: ready, - } - - out, err := json.Marshal(warmupResponse) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Write(out) + self.wgProxy.AddClients(clients) } func authHeaderProxyId(authHeader string) (server.Id, error) { diff --git a/proxy/proxy_api.go b/proxy/proxy_api.go new file mode 100644 index 00000000..da969c5e --- /dev/null +++ b/proxy/proxy_api.go @@ -0,0 +1,128 @@ +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + // "fmt" + "io" + "net" + "net/http" + "strconv" + "time" + + "github.com/urnetwork/server" + // "github.com/urnetwork/server/model" + "github.com/urnetwork/server/router" +) + +type apiServer struct { + ctx context.Context + cancel context.CancelFunc + proxyDeviceManager *ProxyDeviceManager + transportTls *server.TransportTls + settings *ProxySettings +} + +func newApiServer( + ctx context.Context, + cancel context.CancelFunc, + proxyDeviceManager *ProxyDeviceManager, + transportTls *server.TransportTls, + settings *ProxySettings, +) *apiServer { + s := &apiServer{ + ctx: ctx, + cancel: cancel, + proxyDeviceManager: proxyDeviceManager, + transportTls: transportTls, + settings: settings, + } + + go server.HandleError(s.run, cancel) + + return s +} + +func (self *apiServer) run() { + defer self.cancel() + + routes := []*router.Route{ + router.NewRoute("POST", "/warmup", self.HandleWarmup), + } + + reusePort := false + + httpServerOptions := server.HttpServerOptions{ + ReadTimeout: 15 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 5 * time.Minute, + } + + tlsConfig := &tls.Config{ + GetConfigForClient: self.transportTls.GetTlsConfigForClient, + } + + err := server.HttpListenAndServeTlsWithReusePort( + self.ctx, + net.JoinHostPort("", strconv.Itoa(ListenApiPort)), + router.NewRouter(self.ctx, routes), + reusePort, + httpServerOptions, + tlsConfig, + ) + if err != nil { + panic(err) + } +} + +type WarmupRequest struct { + TimeoutSeconds int `json:"timeout_seconds,omitempty"` +} + +type WarmupResponse struct { + Ready bool `json:"ready"` +} + +func (self *apiServer) HandleWarmup(w http.ResponseWriter, r *http.Request) { + authHeader := r.Header.Get("Authorization") + proxyId, err := authHeaderProxyId(authHeader) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + var warmupRequest WarmupRequest + + defer r.Body.Close() + bodyBytes, err := io.ReadAll(r.Body) + + if 0 < len(bodyBytes) { + err = json.Unmarshal(bodyBytes, &warmupRequest) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + // else use the default object + + proxyDevice, err := self.proxyDeviceManager.OpenProxyDevice(proxyId) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + timeout := time.Duration(warmupRequest.TimeoutSeconds) * time.Second + ready := proxyDevice.WaitForReady(r.Context(), timeout) + + warmupResponse := &WarmupResponse{ + Ready: ready, + } + + out, err := json.Marshal(warmupResponse) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Write(out) +} diff --git a/proxy/proxy_client_notification.go b/proxy/proxy_client_notification.go new file mode 100644 index 00000000..36464c53 --- /dev/null +++ b/proxy/proxy_client_notification.go @@ -0,0 +1,72 @@ +package main + +import ( + "context" + "time" + + "golang.org/x/exp/maps" + + "github.com/urnetwork/connect" + "github.com/urnetwork/glog" + // "github.com/urnetwork/proxy" + "github.com/urnetwork/server" + "github.com/urnetwork/server/model" + // "github.com/urnetwork/server/router" +) + +type ProxyClientsFunction = func(proxyClients []*model.ProxyClient) + +type proxyClientNotification struct { + ctx context.Context + + settings *ProxySettings + + proxyClientsCallbacks *connect.CallbackList[ProxyClientsFunction] +} + +func newProxyClientNotification(ctx context.Context, settings *ProxySettings) *proxyClientNotification { + return &proxyClientNotification{ + ctx: ctx, + settings: settings, + proxyClientsCallbacks: connect.NewCallbackList[ProxyClientsFunction](), + } +} + +func (self *proxyClientNotification) run() { + nextChangeId := int64(0) + for { + var proxyClients map[server.Id]*model.ProxyClient + var err error + proxyClients, nextChangeId, err = model.GetProxyClientsSince( + self.ctx, + server.RequireHost(), + server.RequireBlock(), + nextChangeId, + ) + if err != nil { + glog.Infof("[pcn]err=%s\n", err) + } else if 0 < len(proxyClients) { + self.proxyClients(maps.Values(proxyClients)) + } + select { + case <-self.ctx.Done(): + return + case <-time.After(self.settings.NotificationTimeout): + } + } +} + +func (self *proxyClientNotification) proxyClients(proxyClients []*model.ProxyClient) { + for _, proxyClientsCallback := range self.proxyClientsCallbacks.Get() { + server.HandleError(func() { + proxyClientsCallback(proxyClients) + }) + } +} + +func (self *proxyClientNotification) AddProxyClientsCallback(proxyClientsCallback ProxyClientsFunction) func() { + callbackId := self.proxyClientsCallbacks.Add(proxyClientsCallback) + return func() { + self.proxyClientsCallbacks.Remove(callbackId) + } +} diff --git a/proxy/proxy_device.go b/proxy/proxy_device.go index 7d296a13..64e1b61c 100644 --- a/proxy/proxy_device.go +++ b/proxy/proxy_device.go @@ -162,7 +162,7 @@ type ProxyDevice struct { proxyDeviceConfig *model.ProxyDeviceConfig deviceLocal *sdk.DeviceLocal - tnet *proxy.Net + tun *proxy.Tun settings *ProxyDeviceSettings stateLock sync.Mutex @@ -218,9 +218,12 @@ func NewProxyDevice( dnsResolverSettings = initialDeviceState.DnsResolverSettings } - tnet, err := proxy.CreateNetTunWithResolver( + tunSettings := proxy.DefaultTunSettings() + tunSettings.Mtu = settings.Mtu + + tun, err := proxy.CreateTunWithResolver( cancelCtx, - settings.Mtu, + tunSettings, dnsResolverSettings, ) if err != nil { @@ -232,7 +235,7 @@ func NewProxyDevice( cancel: cancel, proxyDeviceConfig: proxyDeviceConfig, deviceLocal: deviceLocal, - tnet: tnet, + tun: tun, settings: settings, lastActivityTime: time.Now(), } @@ -242,7 +245,7 @@ func NewProxyDevice( return proxyDevice, nil } -// directly copy between tnet and device +// directly copy between tun and device func (self *ProxyDevice) Run() { defer self.cancel() @@ -252,7 +255,7 @@ func (self *ProxyDevice) Run() { if !self.UpdateActivity() { return } - self.tnet.Write(packet) + self.tun.Write(packet) } sub := self.deviceLocal.AddReceivePacketCallback(receiveCallback) defer sub() @@ -261,7 +264,7 @@ func (self *ProxyDevice) Run() { if !self.UpdateActivity() { return } - packet, err := self.tnet.Read() + packet, err := self.tun.Read() if err != nil { return } @@ -272,8 +275,8 @@ func (self *ProxyDevice) Run() { } } -func (self *ProxyDevice) Tun() *proxy.Net { - return self.tnet +func (self *ProxyDevice) Tun() *proxy.Tun { + return self.tun } func (self *ProxyDevice) WaitForReady(ctx context.Context, timeout time.Duration) bool { @@ -368,5 +371,5 @@ func (self *ProxyDevice) Close() { self.cancel() self.deviceLocal.Close() - self.tnet.Close() + self.tun.Close() }