From e2a2ea7febb7e49a724448982c6b12e5397b0229 Mon Sep 17 00:00:00 2001 From: AllyW Date: Tue, 18 Nov 2025 17:50:58 +0800 Subject: [PATCH 1/7] base connected --- go.mod | 9 +- go.sum | 31 +- main/main.go | 2 + websocketTest/main.go | 809 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 821 insertions(+), 30 deletions(-) create mode 100644 websocketTest/main.go diff --git a/go.mod b/go.mod index 183173a74..5ae86f561 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.23.8 require ( github.com/alibabacloud-go/alibabacloud-gateway-sls v0.3.0 github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.12 - github.com/alibabacloud-go/tea v1.3.12 + github.com/alibabacloud-go/tea v1.3.13 github.com/alibabacloud-go/tea-utils/v2 v2.0.7 github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible @@ -38,6 +38,7 @@ require ( github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v1.0.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.8 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -56,3 +57,9 @@ require ( golang.org/x/time v0.11.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/alibabacloud-go/darabonba-openapi/v2 => ../darabonba-openapi/golang + +replace github.com/alibabacloud-go/tea => ../tea + +replace github.com/alibabacloud-go/tea-utils/v2 => ../tea-utils diff --git a/go.sum b/go.sum index 0447603bb..4299166bc 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,6 @@ github.com/alibabacloud-go/darabonba-encode-util v0.0.2 h1:1uJGrbsGEVqWcWxrS9MyC github.com/alibabacloud-go/darabonba-encode-util v0.0.2/go.mod h1:JiW9higWHYXm7F4PKuMgEUETNZasrDM6vqVr/Can7H8= github.com/alibabacloud-go/darabonba-map v0.0.2 h1:qvPnGB4+dJbJIxOOfawxzF3hzMnIpjmafa0qOTp6udc= github.com/alibabacloud-go/darabonba-map v0.0.2/go.mod h1:28AJaX8FOE/ym8OUFWga+MtEzBunJwQGceGQlvaPGPc= -github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.12 h1:e2yCrhtWd6Qcsy4he2OL+jIAU+93Lx9OcLlPRoFLT1w= -github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.12/go.mod h1:f2wDpbM7hK9SvLIH09zSKVU1TsyemUNOqErMscMMl7c= github.com/alibabacloud-go/darabonba-signature-util v0.0.7 h1:UzCnKvsjPFzApvODDNEYqBHMFt1w98wC7FOo0InLyxg= github.com/alibabacloud-go/darabonba-signature-util v0.0.7/go.mod h1:oUzCYV2fcCH797xKdL6BDH8ADIHlzrtKVjeRtunBNTQ= github.com/alibabacloud-go/darabonba-string v1.0.2 h1:E714wms5ibdzCqGeYJ9JCFywE5nDyvIXIIQbZVFkkqo= @@ -33,23 +31,8 @@ github.com/alibabacloud-go/endpoint-util v1.1.0 h1:r/4D3VSw888XGaeNpP994zDUaxdgT github.com/alibabacloud-go/endpoint-util v1.1.0/go.mod h1:O5FuCALmCKs2Ff7JFJMudHs0I5EBgecXXxZRyswlEjE= github.com/alibabacloud-go/openapi-util v0.1.0 h1:0z75cIULkDrdEhkLWgi9tnLe+KhAFE/r5Pb3312/eAY= github.com/alibabacloud-go/openapi-util v0.1.0/go.mod h1:sQuElr4ywwFRlCCberQwKRFhRzIyG4QTP/P4y1CJ6Ws= -github.com/alibabacloud-go/tea v1.1.0/go.mod h1:IkGyUSX4Ba1V+k4pCtJUc6jDpZLFph9QMy2VUPTwukg= -github.com/alibabacloud-go/tea v1.1.7/go.mod h1:/tmnEaQMyb4Ky1/5D+SE1BAsa5zj/KeGOFfwYm3N/p4= -github.com/alibabacloud-go/tea v1.1.8/go.mod h1:/tmnEaQMyb4Ky1/5D+SE1BAsa5zj/KeGOFfwYm3N/p4= -github.com/alibabacloud-go/tea v1.1.11/go.mod h1:/tmnEaQMyb4Ky1/5D+SE1BAsa5zj/KeGOFfwYm3N/p4= -github.com/alibabacloud-go/tea v1.1.17/go.mod h1:nXxjm6CIFkBhwW4FQkNrolwbfon8Svy6cujmKFUq98A= -github.com/alibabacloud-go/tea v1.1.20/go.mod h1:nXxjm6CIFkBhwW4FQkNrolwbfon8Svy6cujmKFUq98A= -github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk= -github.com/alibabacloud-go/tea v1.2.3-0.20240605082020-e6e537a31150/go.mod h1:SP/4ugxOFdctgZvPRC0Anqbq1Q1VmuPXoUVyncO5azs= -github.com/alibabacloud-go/tea v1.3.12 h1:ir2Io80UlBy1JHf7t+uCTxmaGQtiEta1WpV29NGJTkE= -github.com/alibabacloud-go/tea v1.3.12/go.mod h1:A560v/JTQ1n5zklt2BEpurJzZTI8TUT+Psg2drWlxRg= github.com/alibabacloud-go/tea-utils v1.3.1 h1:iWQeRzRheqCMuiF3+XkfybB3kTgUXkXX+JMrqfLeB2I= github.com/alibabacloud-go/tea-utils v1.3.1/go.mod h1:EI/o33aBfj3hETm4RLiAxF/ThQdSngxrpF8rKUDJjPE= -github.com/alibabacloud-go/tea-utils/v2 v2.0.1/go.mod h1:U5MTY10WwlquGPS34DOeomUGBB0gXbLueiq5Trwu0C4= -github.com/alibabacloud-go/tea-utils/v2 v2.0.5/go.mod h1:dL6vbUT35E4F4bFTHL845eUloqaerYBYPsdWR2/jhe4= -github.com/alibabacloud-go/tea-utils/v2 v2.0.6/go.mod h1:qxn986l+q33J5VkialKMqT/TTs3E+U9MJpd001iWQ9I= -github.com/alibabacloud-go/tea-utils/v2 v2.0.7 h1:WDx5qW3Xa5ZgJ1c8NfqJkF6w+AU5wB8835UdhPr6Ax0= -github.com/alibabacloud-go/tea-utils/v2 v2.0.7/go.mod h1:qxn986l+q33J5VkialKMqT/TTs3E+U9MJpd001iWQ9I= github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 h1:qagvUyrgOnBIlVRQWOyCZGVKUIYbMBdGdJ104vBpRFU= github.com/aliyun/alibaba-cloud-sdk-go v1.63.107/go.mod h1:SOSDHfe1kX91v3W5QiBsWSLqeLxImobbMX1mxrFHsVQ= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= @@ -113,12 +96,13 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -137,8 +121,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -202,8 +184,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= @@ -249,10 +229,7 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= @@ -289,8 +266,6 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -303,8 +278,6 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= diff --git a/main/main.go b/main/main.go index afc174703..84cc1878a 100644 --- a/main/main.go +++ b/main/main.go @@ -30,6 +30,7 @@ import ( "github.com/aliyun/aliyun-cli/v3/oss/lib" "github.com/aliyun/aliyun-cli/v3/ossutil" "github.com/aliyun/aliyun-cli/v3/otsutil" + websockettest "github.com/aliyun/aliyun-cli/v3/websocketTest" ) func Main(args []string) { @@ -82,6 +83,7 @@ func Main(args []string) { rootCmd.AddSubCommand(ossutil.NewOssutilCommand()) // tablestore command rootCmd.AddSubCommand(otsutil.NewOtsutilCommand()) + rootCmd.AddSubCommand(websockettest.NewWebsocketTestCommand()) if os.Getenv("GENERATE_METADATA") == "YES" { generateMetadata(rootCmd) } else { diff --git a/websocketTest/main.go b/websocketTest/main.go new file mode 100644 index 000000000..0bdf648c7 --- /dev/null +++ b/websocketTest/main.go @@ -0,0 +1,809 @@ +package websockettest + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/aliyun/aliyun-cli/v3/cli" + "github.com/aliyun/aliyun-cli/v3/config" + "github.com/aliyun/aliyun-cli/v3/i18n" + + openapiClient "github.com/alibabacloud-go/darabonba-openapi/v2/client" + openapiutil "github.com/alibabacloud-go/darabonba-openapi/v2/utils" + dara "github.com/alibabacloud-go/tea/dara" + "github.com/alibabacloud-go/tea/tea" +) + +func NewWebsocketTestCommand() *cli.Command { + return &cli.Command{ + Name: "websocketTest", + Short: i18n.T("Websocket Test", "WebSocket测试"), + Usage: "aliyun websocketTest", + Hidden: false, + Run: func(ctx *cli.Context, args []string) error { + return runWebsocketTest(ctx, args) + }, + } +} + +func runWebsocketTest(ctx *cli.Context, args []string) error { + profile, _ := config.LoadProfileWithContext(ctx) + credential, err := profile.GetCredential(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get credential: %w", err) + } + + conf := &openapiClient.Config{ + Credential: credential, + RegionId: tea.String(profile.RegionId), + Endpoint: tea.String("openapi-mcp.cn-hangzhou.aliyuncs.com"), + } + + client, err := openapiClient.NewClient(conf) + if err != nil { + return err + } + + params := &openapiClient.Params{ + Action: tea.String("ListApiMcpServers"), + Version: tea.String("2024-11-30"), + Protocol: tea.String("HTTPS"), + Method: tea.String("GET"), + AuthType: tea.String("AK"), + Style: tea.String("ROA"), + Pathname: tea.String("/apimcpservers"), + ReqBodyType: tea.String("json"), + BodyType: tea.String("json"), + } + queries := map[string]interface{}{} + queries["id"] = tea.String("Bt4Td5W1tI31YAsu") + + runtime := &dara.RuntimeOptions{} + request := &openapiClient.OpenApiRequest{ + Query: openapiutil.Query(queries), + } + response, err := client.CallApi(params, request, runtime) + if err != nil { + return err + } + // bodyBytes, _ := GetContentFromApiResponse(response) + fmt.Printf("response: %s\n", response["statusCode"]) + + // testAwapWebSocket(ctx, args) + testGeneralWebSocket(ctx, args) + // testSequentialMessageReception(ctx, args) + return nil +} + +func GetContentFromApiResponse(response map[string]any) ([]byte, error) { + responseBody := response["body"] + if responseBody == nil { + return nil, fmt.Errorf("response body is nil") + } + switch v := responseBody.(type) { + case string: + return []byte(v), nil + case map[string]any, []any: + jsonData, _ := json.Marshal(v) + return jsonData, nil + case []byte: + return v, nil + default: + return []byte(fmt.Sprintf("%v", v)), nil + } +} + +func printSessionInfo(session *dara.WebSocketSessionInfo) { + if session == nil { + fmt.Println(" [Session] nil") + return + } + fmt.Printf(" [Session] ID: %s\n", session.SessionID) + if session.RequestID != "" { + fmt.Printf(" [Session] RequestID: %s\n", session.RequestID) + } + fmt.Printf(" [Session] ConnectedAt: %s\n", session.ConnectedAt.Format(time.RFC3339)) + fmt.Printf(" [Session] RemoteAddr: %s\n", session.RemoteAddr) + fmt.Printf(" [Session] LocalAddr: %s\n", session.LocalAddr) + if len(session.Attributes) > 0 { + attrJSON, _ := json.Marshal(session.Attributes) + fmt.Printf(" [Session] Attributes: %s\n", string(attrJSON)) + } +} + +type SimpleHandler struct { + dara.AbstractAwapWebSocketHandler +} + +func (h *SimpleHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { + fmt.Println("✓ Connected to WebSocket server") + printSessionInfo(session) + return nil +} + +func (h *SimpleHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { + fmt.Println("📨 Received AWAP message:") + jsonData, _ := json.Marshal(message) + fmt.Printf(" Message: %s\n", string(jsonData)) + return nil +} + +func (h *SimpleHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { + fmt.Println("📬 Received AWAP incoming message:") + jsonData, _ := json.Marshal(message) + fmt.Printf(" Incoming Message: %s\n", string(jsonData)) + return nil +} + +func (h *SimpleHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { + // Parse the AWAP message ourselves and call HandleAwapMessage directly + // This avoids the issue where AbstractAwapWebSocketHandler.HandleRawMessage + // can't access the outer SimpleHandler type + awapMsg, err := dara.ParseAwapMessage(message) + if err != nil { + fmt.Printf("[Simple] Failed to parse AWAP message: %v\n", err) + return err + } + + // Call HandleAwapMessage directly on h (which is *SimpleHandler) + // This will call our overridden implementation + if err := h.HandleAwapMessage(session, awapMsg); err != nil { + return err + } + + // Also call HandleAwapIncomingMessage for event types + if awapMsg.Type == dara.AwapMessageTypeEvent || + awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || + awapMsg.Type == dara.AwapMessageTypeUpstreamBinaryEvent || + awapMsg.Type == dara.AwapMessageTypeAckRequiredTextEvent || + awapMsg.Type == dara.AwapMessageTypeMessageReceiveEvent || + awapMsg.Type == dara.AwapMessageTypeDownstreamTextEvent || + awapMsg.Type == dara.AwapMessageTypeDownstreamBinaryEvent { + incoming := &dara.AwapIncomingMessage{ + AwapMessage: *awapMsg, + RawPayload: message.Payload, + } + return h.HandleAwapIncomingMessage(session, incoming) + } + + return nil +} + +func (h *SimpleHandler) SupportsPartialMessages() bool { + return false +} + +func (h *SimpleHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { + fmt.Printf("❌ Error: %v\n", err) + printSessionInfo(session) + return nil +} + +func (h *SimpleHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { + fmt.Printf("✗ Connection closed (code: %d, reason: %s)\n", code, reason) + printSessionInfo(session) + return nil +} + +func testAwapWebSocket(ctx *cli.Context, args []string) error { + fmt.Println("=== WebSocket Example ===") + profile, _ := config.LoadProfileWithContext(ctx) + credential, err := profile.GetCredential(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get credential: %w", err) + } + config := &openapiClient.Config{ + Credential: credential, + Endpoint: dara.String("dalutest-pre.aliyuncs.com"), + Protocol: dara.String("https"), + } + + apiClient, err := openapiClient.NewClient(config) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + // Setup WebSocket + params := &openapiClient.Params{ + // Action: tea.String("ListApiMcpServers"), + // Version: tea.String("2024-11-30"), + // Protocol: tea.String("HTTPS"), + // Method: tea.String("GET"), + // AuthType: tea.String("AK"), + // Style: tea.String("ROA"), + // Pathname: tea.String("/apimcpservers"), + // ReqBodyType: tea.String("json"), + // BodyType: tea.String("json"), + // Product: dara.String("DaluTestInner"), + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("https"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + } + + request := &openapiClient.OpenApiRequest{ + Headers: map[string]*string{ + "Sec-Websocket-Protocol": tea.String("awap"), + }, + } + + runtime := &dara.ExtendedRuntimeOptions{ + RuntimeOptions: dara.RuntimeOptions{ + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds (increased for slow networks) + }, + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时(增加以应对网络延迟) + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时(增加以应对网络延迟) + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + } + + fmt.Println("Connecting...") + wsClient, err := apiClient.DoWebSocketRequest(params, request, runtime, &SimpleHandler{}) + if err != nil { + log.Fatalf("Connection failed: %v", err) + } + defer wsClient.Close() + + fmt.Println("\nSending AWAP message...") + + // 方法 1: 使用 SendAwapRequest 发送请求消息(推荐) + err = apiClient.SendAwapRequest(wsClient, "msg-001", 1, map[string]interface{}{ + "action": "test", + "data": "Hello WebSocket!", + }) + if err != nil { + log.Printf("Failed to send AWAP request: %v", err) + } + + // 方法 2: 使用 SendAwapEvent 发送事件消息 + err = apiClient.SendAwapEvent(wsClient, "msg-002", 2, map[string]interface{}{ + "eventType": "userAction", + "message": "Hello WebSocket!", + }) + if err != nil { + log.Printf("Failed to send AWAP event: %v", err) + } + + // 方法 3: 手动构建 AWAP 消息 + awapMsg := dara.BuildAwapRequest("msg-003", 3, map[string]interface{}{ + "message": "Hello WebSocket!", + }) + err = apiClient.SendAwapMessage(wsClient, awapMsg) + if err != nil { + log.Printf("Failed to send AWAP message: %v", err) + } + + // Wait for response + time.Sleep(3 * time.Second) + + fmt.Println("\n=== AWAP Example Complete ===") + return nil +} + +type GeneralHandler struct { + dara.AbstractGeneralWebSocketHandler +} + +func (h *GeneralHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { + fmt.Println("✓ CLI General Connected to General WebSocket server") + printSessionInfo(session) + return nil +} + +func (h *GeneralHandler) HandleGeneralTextMessage(session *dara.WebSocketSessionInfo, message *dara.GeneralMessage) error { + fmt.Println("📨 CLI General Received General text message:") + jsonData, _ := json.Marshal(message) + fmt.Printf(" Message: %s\n", string(jsonData)) + return nil +} + +func (h *GeneralHandler) HandleGeneralBinaryMessage(session *dara.WebSocketSessionInfo, data []byte) error { + fmt.Println("📦 CLI General Received General binary message:") + fmt.Printf(" Size: %d bytes\n", len(data)) + fmt.Printf(" Content: %s\n", string(data)) + return nil +} + +func (h *GeneralHandler) HandleGeneralIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.GeneralIncomingMessage) error { + if message.IsBinary { + fmt.Println("📦 CLI General incoming binary message:") + } else { + fmt.Println("📨 CLI General incoming message:") + } + if message.IsBinary { + fmt.Printf(" Size: %d bytes\n", len(message.RawPayload)) + } else { + jsonData, _ := json.Marshal(message.Body) + fmt.Printf(" Body: %s\n", string(jsonData)) + } + return nil +} + +func (h *GeneralHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { + // Parse and handle General messages directly + // This avoids the issue where AbstractGeneralWebSocketHandler.HandleRawMessage + // can't access the outer GeneralHandler type + if message.Type == dara.WebSocketMessageTypeText { + // Parse as General text message + generalMsg, err := dara.ParseGeneralMessage(message) + if err != nil { + fmt.Printf("[CLI General] Failed to parse General message: %v\n", err) + return err + } + + // Check message type from headers + msgType := generalMsg.Headers["type"] + fmt.Printf("[CLI General] Received text message, type from header: %s\n", msgType) + + // Create incoming message + incoming := &dara.GeneralIncomingMessage{ + Headers: generalMsg.Headers, + Body: generalMsg.Body, + RawPayload: message.Payload, + IsBinary: false, + } + + // Call both handlers directly on h (which is *GeneralHandler) + if err := h.HandleGeneralTextMessage(session, generalMsg); err != nil { + return err + } + return h.HandleGeneralIncomingMessage(session, incoming) + + } else if message.Type == dara.WebSocketMessageTypeBinary { + // Handle as binary message + fmt.Printf("[CLI General] Received binary message\n") + + incoming := &dara.GeneralIncomingMessage{ + Headers: make(map[string]string), + Body: nil, + RawPayload: message.Payload, + IsBinary: true, + } + + // Call both handlers directly on h (which is *GeneralHandler) + if err := h.HandleGeneralBinaryMessage(session, message.Payload); err != nil { + return err + } + return h.HandleGeneralIncomingMessage(session, incoming) + } + + return nil +} + +func (h *GeneralHandler) SupportsPartialMessages() bool { + return false +} + +func (h *GeneralHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { + fmt.Printf("❌ CLI General Error: %v\n", err) + printSessionInfo(session) + return nil +} + +func (h *GeneralHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { + fmt.Printf("✗ CLI General Connection closed (code: %d, reason: %s)\n", code, reason) + printSessionInfo(session) + return nil +} + +func testGeneralWebSocket(ctx *cli.Context, args []string) error { + fmt.Println("\n=== General WebSocket Example ===") + profile, _ := config.LoadProfileWithContext(ctx) + credential, err := profile.GetCredential(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get credential: %w", err) + } + config := &openapiClient.Config{ + Credential: credential, + Endpoint: dara.String("dalutest-pre.aliyuncs.com"), + Protocol: dara.String("https"), + } + + apiClient, err := openapiClient.NewClient(config) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + params := &openapiClient.Params{ + Action: tea.String("WebsocketGeneralDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("https"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/general-demo-api"), + AuthType: tea.String("AK"), + } + + request := &openapiClient.OpenApiRequest{ + Headers: map[string]*string{ + "Sec-Websocket-Protocol": tea.String("general"), + }, + } + + runtime := &dara.ExtendedRuntimeOptions{ + RuntimeOptions: dara.RuntimeOptions{ + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds + }, + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + } + + fmt.Println("Connecting to General WebSocket...") + wsClient, err := apiClient.DoWebSocketRequest(params, request, runtime, &GeneralHandler{}) + if err != nil { + log.Fatalf("Connection failed: %v", err) + } + defer wsClient.Close() + + fmt.Println("\nSending General messages...") + + // 方法 1: 发送文本消息 + fmt.Println("1. Sending General text message...") + err = apiClient.SendGeneralTextMessage(wsClient, "Hello General WebSocket!") + if err != nil { + log.Printf("Failed to send General text message: %v", err) + } + + time.Sleep(1 * time.Second) + + // 方法 2: 发送 JSON 消息 + // fmt.Println("2. Sending General JSON message...") + // eventData := map[string]interface{}{ + // "name": "general-test", + // "object": map[string]interface{}{ + // "field1": "general", + // "field2": 2, + // "field3": []string{"test"}, + // }, + // "list": []map[string]interface{}{ + // {"enabled": false, "value": "general"}, + // }, + // "map": map[string]string{ + // "test": "test", + // }, + // } + // err = apiClient.SendGeneralJSONMessage(wsClient, eventData) + // if err != nil { + // log.Printf("Failed to send General JSON message: %v", err) + // } + + // time.Sleep(1 * time.Second) + + // 方法 3: 发送带自定义头部的消息 + // fmt.Println("3. Sending General message with custom headers...") + // generalMsg := dara.BuildGeneralJSONMessage(map[string]interface{}{ + // "action": "test", + // "data": "Hello with headers!", + // }) + // generalMsg.WithHeader("X-Custom-Header", "custom-value") + // generalMsg.WithHeader("Content-Type", "application/json") + // err = apiClient.SendGeneralMessage(wsClient, generalMsg) + // if err != nil { + // log.Printf("Failed to send General message with headers: %v", err) + // } + + // time.Sleep(1 * time.Second) + + // 方法 4: 发送二进制消息 + // fmt.Println("4. Sending General binary message...") + // binaryData := []byte("Binary General Data") + // err = apiClient.SendGeneralBinaryMessage(wsClient, binaryData) + // if err != nil { + // log.Printf("Failed to send General binary message: %v", err) + // } + + // Wait for response + time.Sleep(3 * time.Second) + + fmt.Println("\n=== General Example Complete ===") + return nil +} + +type SequentialHandler struct { + dara.AbstractAwapWebSocketHandler + receivedSeq []int64 // Track received sequence numbers + mu sync.Mutex + expectedCount int + done chan struct{} +} + +func NewSequentialHandler(expectedCount int) *SequentialHandler { + return &SequentialHandler{ + receivedSeq: make([]int64, 0, expectedCount), + expectedCount: expectedCount, + done: make(chan struct{}), + } +} + +func (h *SequentialHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { + fmt.Println("✓ Sequential Test - Connected to WebSocket server") + printSessionInfo(session) + return nil +} + +func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { + h.mu.Lock() + defer h.mu.Unlock() + + fmt.Printf("📨 Sequential Test - HandleAwapMessage called: seq=%d, type=%s, id=%s\n", + message.Seq, message.Type, message.ID) + printSessionInfo(session) + + // Print message payload for debugging + if message.Payload != nil { + payloadJSON, _ := json.Marshal(message.Payload) + fmt.Printf(" Payload: %s\n", string(payloadJSON)) + } + + // Track sequence number + h.receivedSeq = append(h.receivedSeq, message.Seq) + fmt.Printf(" Progress: %d/%d messages received\n", len(h.receivedSeq), h.expectedCount) + + // Check if we've received all expected messages + if len(h.receivedSeq) >= h.expectedCount { + if h.done != nil { + close(h.done) + h.done = nil // Prevent double close + } + } + + return nil +} + +func (h *SequentialHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { + // This is called after HandleAwapMessage, so we don't need to count again + // Just log for debugging + fmt.Printf("📬 Sequential Test - HandleAwapIncomingMessage called: seq=%d, type=%s, id=%s\n", + message.Seq, message.Type, message.ID) + return nil +} + +func (h *SequentialHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { + // Parse the AWAP message ourselves and call HandleAwapMessage directly + // This avoids the issue where AbstractAwapWebSocketHandler.HandleRawMessage + // can't access the outer SequentialHandler type + awapMsg, err := dara.ParseAwapMessage(message) + if err != nil { + fmt.Printf("[Sequential] Failed to parse AWAP message: %v\n", err) + return err + } + + // Call HandleAwapMessage directly on h (which is *SequentialHandler) + // This will call our overridden implementation + fmt.Printf("[Sequential] Calling HandleAwapMessage directly: seq=%d, type=%s\n", awapMsg.Seq, awapMsg.Type) + if err := h.HandleAwapMessage(session, awapMsg); err != nil { + return err + } + + // Also call HandleAwapIncomingMessage for event types + if awapMsg.Type == dara.AwapMessageTypeEvent || + awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || + awapMsg.Type == dara.AwapMessageTypeUpstreamBinaryEvent || + awapMsg.Type == dara.AwapMessageTypeAckRequiredTextEvent || + awapMsg.Type == dara.AwapMessageTypeMessageReceiveEvent || + awapMsg.Type == dara.AwapMessageTypeDownstreamTextEvent || + awapMsg.Type == dara.AwapMessageTypeDownstreamBinaryEvent { + incoming := &dara.AwapIncomingMessage{ + AwapMessage: *awapMsg, + RawPayload: message.Payload, + } + return h.HandleAwapIncomingMessage(session, incoming) + } + + return nil +} + +func (h *SequentialHandler) SupportsPartialMessages() bool { + return false +} + +func (h *SequentialHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { + fmt.Printf("❌ Sequential Test Error: %v\n", err) + printSessionInfo(session) + return nil +} + +func (h *SequentialHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { + fmt.Printf("✗ Sequential Test - Connection closed (code: %d, reason: %s)\n", code, reason) + printSessionInfo(session) + return nil +} + +func (h *SequentialHandler) GetReceivedSeq() []int64 { + h.mu.Lock() + defer h.mu.Unlock() + result := make([]int64, len(h.receivedSeq)) + copy(result, h.receivedSeq) + return result +} + +func (h *SequentialHandler) WaitForCompletion(timeout time.Duration) bool { + h.mu.Lock() + done := h.done + h.mu.Unlock() + + if done == nil { + return true // Already completed + } + select { + case <-done: + return true + case <-time.After(timeout): + return false + } +} + +func testSequentialMessageReception(ctx *cli.Context, args []string) error { + fmt.Println("\n=== Sequential Message Reception Test ===") + fmt.Println("This test verifies that messages are received in the correct order") + + profile, _ := config.LoadProfileWithContext(ctx) + credential, err := profile.GetCredential(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get credential: %w", err) + } + + config := &openapiClient.Config{ + Credential: credential, + Endpoint: dara.String("dalutest-pre.aliyuncs.com"), + Protocol: dara.String("https"), + } + + apiClient, err := openapiClient.NewClient(config) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + params := &openapiClient.Params{ + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("https"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + } + + request := &openapiClient.OpenApiRequest{ + Query: map[string]*string{ + "delay": tea.String("3000"), // Delay between messages (ms) + "batchSendMsgCnt": tea.String("20"), // Number of messages to send + }, + Headers: map[string]*string{ + "Sec-Websocket-Protocol": tea.String("awap"), + }, + } + + runtime := &dara.ExtendedRuntimeOptions{ + RuntimeOptions: dara.RuntimeOptions{ + ReadTimeout: dara.Int(120000), // 120 seconds (enough for 20 messages with 3s delay) + ConnectTimeout: dara.Int(30000), // 30 seconds + }, + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + } + + expectedCount := 20 + handler := NewSequentialHandler(expectedCount) + + fmt.Println("Connecting to WebSocket server...") + fmt.Printf("Query parameters: delay=%s, batchSendMsgCnt=%s\n", + dara.StringValue(request.Query["delay"]), + dara.StringValue(request.Query["batchSendMsgCnt"])) + + wsClient, err := apiClient.DoWebSocketRequest(params, request, runtime, handler) + if err != nil { + log.Fatalf("Connection failed: %v", err) + } + defer wsClient.Close() + + fmt.Println("Connection established, waiting for server to start sending batch messages...") + time.Sleep(2 * time.Second) + + fmt.Println("\nSending request to trigger batch message sending...") + err = apiClient.SendAwapRequest(wsClient, "sequential-test-001", 1, map[string]interface{}{ + "action": "batchSend", + "test": "sequential", + }) + if err != nil { + log.Printf("Failed to send request: %v", err) + } else { + fmt.Println("Request sent successfully") + } + + fmt.Printf("\nWaiting for %d messages to be received (timeout: 100s)...\n", expectedCount) + fmt.Println("Note: Server will send messages with 3s delay between each, so total time ~60s") + + progressTicker := time.NewTicker(5 * time.Second) + defer progressTicker.Stop() + + doneChan := make(chan bool, 1) + go func() { + timeout := 100 * time.Second // 20 messages * 3s delay + buffer + doneChan <- handler.WaitForCompletion(timeout) + }() + + // Show progress while waiting + for { + select { + case success := <-doneChan: + if success { + fmt.Println("\n✓ All messages received!") + } else { + received := len(handler.GetReceivedSeq()) + fmt.Printf("\n⚠ Timeout waiting for all messages. Received %d/%d messages\n", + received, expectedCount) + if received == 0 { + fmt.Println("\n⚠ No messages received at all. Possible issues:") + fmt.Println(" 1. Server may not support batchSendMsgCnt parameter") + fmt.Println(" 2. Query parameters may not be passed correctly") + fmt.Println(" 3. Server may require a different trigger mechanism") + } + } + goto done + case <-progressTicker.C: + received := len(handler.GetReceivedSeq()) + fmt.Printf(" Progress: %d/%d messages received...\n", received, expectedCount) + } + } +done: + + receivedSeq := handler.GetReceivedSeq() + fmt.Printf("\n=== Sequence Verification ===\n") + fmt.Printf("Expected count: %d\n", expectedCount) + fmt.Printf("Received count: %d\n", len(receivedSeq)) + + if len(receivedSeq) > 0 { + fmt.Printf("Received sequence numbers: %v\n", receivedSeq) + + isOrdered := true + for i := 1; i < len(receivedSeq); i++ { + if receivedSeq[i] < receivedSeq[i-1] { + isOrdered = false + fmt.Printf("❌ Out of order detected: seq[%d]=%d < seq[%d]=%d\n", + i, receivedSeq[i], i-1, receivedSeq[i-1]) + break + } + } + + if isOrdered { + fmt.Println("✅ All messages received in correct order!") + } else { + fmt.Println("❌ Messages received out of order!") + } + + // Check for missing sequence numbers + missing := []int64{} + seqMap := make(map[int64]bool) + for _, seq := range receivedSeq { + seqMap[seq] = true + } + for i := int64(1); i <= int64(expectedCount); i++ { + if !seqMap[i] { + missing = append(missing, i) + } + } + if len(missing) > 0 { + fmt.Printf("⚠ Missing sequence numbers: %v\n", missing) + } else { + fmt.Println("✅ No missing sequence numbers") + } + } else { + fmt.Println("❌ No messages received!") + } + + fmt.Println("\n=== Sequential Message Reception Test Complete ===") + return nil +} From cef8af9b921089abafb6bf2f00eeaaf65b7982db Mon Sep 17 00:00:00 2001 From: AllyW Date: Tue, 18 Nov 2025 21:31:02 +0800 Subject: [PATCH 2/7] refactor --- websocketTest/main.go | 386 +++++++++++++++++++++++++++++++----------- 1 file changed, 286 insertions(+), 100 deletions(-) diff --git a/websocketTest/main.go b/websocketTest/main.go index 0bdf648c7..273977598 100644 --- a/websocketTest/main.go +++ b/websocketTest/main.go @@ -73,7 +73,8 @@ func runWebsocketTest(ctx *cli.Context, args []string) error { fmt.Printf("response: %s\n", response["statusCode"]) // testAwapWebSocket(ctx, args) - testGeneralWebSocket(ctx, args) + testAwapWebSocketWithoutHandleRawMessage(ctx, args) // 新增:不重写 HandleRawMessage 的用例 + // testGeneralWebSocket(ctx, args) // testSequentialMessageReception(ctx, args) return nil } @@ -119,20 +120,20 @@ type SimpleHandler struct { } func (h *SimpleHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { - fmt.Println("✓ Connected to WebSocket server") + fmt.Println("✓ CLI Connected to WebSocket server") printSessionInfo(session) return nil } func (h *SimpleHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { - fmt.Println("📨 Received AWAP message:") + fmt.Println("📨 CLI Received AWAP message:") jsonData, _ := json.Marshal(message) fmt.Printf(" Message: %s\n", string(jsonData)) return nil } func (h *SimpleHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { - fmt.Println("📬 Received AWAP incoming message:") + fmt.Println("📬 CLI Received AWAP incoming message:") jsonData, _ := json.Marshal(message) fmt.Printf(" Incoming Message: %s\n", string(jsonData)) return nil @@ -144,7 +145,7 @@ func (h *SimpleHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, mes // can't access the outer SimpleHandler type awapMsg, err := dara.ParseAwapMessage(message) if err != nil { - fmt.Printf("[Simple] Failed to parse AWAP message: %v\n", err) + fmt.Printf("[CLI Simple] Failed to parse AWAP message: %v\n", err) return err } @@ -155,8 +156,7 @@ func (h *SimpleHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, mes } // Also call HandleAwapIncomingMessage for event types - if awapMsg.Type == dara.AwapMessageTypeEvent || - awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || + if awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || awapMsg.Type == dara.AwapMessageTypeUpstreamBinaryEvent || awapMsg.Type == dara.AwapMessageTypeAckRequiredTextEvent || awapMsg.Type == dara.AwapMessageTypeMessageReceiveEvent || @@ -177,13 +177,13 @@ func (h *SimpleHandler) SupportsPartialMessages() bool { } func (h *SimpleHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { - fmt.Printf("❌ Error: %v\n", err) + fmt.Printf("❌ CLI Error: %v\n", err) printSessionInfo(session) return nil } func (h *SimpleHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { - fmt.Printf("✗ Connection closed (code: %d, reason: %s)\n", code, reason) + fmt.Printf("✗ CLI Connection closed (code: %d, reason: %s)\n", code, reason) printSessionInfo(session) return nil } @@ -220,7 +220,7 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { // Product: dara.String("DaluTestInner"), Action: tea.String("WebsocketAwapDemoApi"), Version: tea.String("2022-02-02"), - Protocol: tea.String("https"), + Protocol: tea.String("wss"), Method: tea.String("GET"), Pathname: tea.String("/ws/awap-demo-api"), AuthType: tea.String("AK"), @@ -232,23 +232,24 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { }, } - runtime := &dara.ExtendedRuntimeOptions{ - RuntimeOptions: dara.RuntimeOptions{ - ReadTimeout: dara.Int(60000), // 60 seconds - ConnectTimeout: dara.Int(30000), // 30 seconds (increased for slow networks) - }, - WebSocketPingInterval: dara.Int(30000), // 30秒心跳 - WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时(增加以应对网络延迟) - WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时(增加以应对网络延迟) - WebSocketEnableReconnect: dara.Bool(true), // 启用重连 - WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + runtime := &dara.RuntimeOptions{ + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds (increased for slow networks) + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时(增加以应对网络延迟) + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时(增加以应对网络延迟) + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + WebSocketHandler: &SimpleHandler{}, // 通过 runtime 配置 handler } fmt.Println("Connecting...") - wsClient, err := apiClient.DoWebSocketRequest(params, request, runtime, &SimpleHandler{}) + // Handler 从 runtime 中获取 + result, err := apiClient.DoWebSocketRequest(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } + wsClient := result["wsClient"].(dara.WebSocketClient) defer wsClient.Close() fmt.Println("\nSending AWAP message...") @@ -272,7 +273,7 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { } // 方法 3: 手动构建 AWAP 消息 - awapMsg := dara.BuildAwapRequest("msg-003", 3, map[string]interface{}{ + awapMsg := apiClient.BuildAwapRequest("msg-003", 3, map[string]interface{}{ "message": "Hello WebSocket!", }) err = apiClient.SendAwapMessage(wsClient, awapMsg) @@ -280,6 +281,22 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { log.Printf("Failed to send AWAP message: %v", err) } + time.Sleep(1 * time.Second) + + // 方法 4: 发送 AckRequiredTextEvent 类型的消息(需要确认的消息) + fmt.Println("\n4. Sending AckRequiredTextEvent message...") + ackRequiredMsg := apiClient.BuildAwapMessage(dara.AwapMessageTypeAckRequiredTextEvent, "msg-004", 4, map[string]interface{}{ + "action": "ackRequiredTest", + "data": "This message requires acknowledgment", + "timestamp": time.Now().Unix(), + }) + err = apiClient.SendAwapMessage(wsClient, ackRequiredMsg) + if err != nil { + log.Printf("Failed to send AckRequiredTextEvent message: %v", err) + } else { + fmt.Println("✓ AckRequiredTextEvent message sent successfully") + } + // Wait for response time.Sleep(3 * time.Second) @@ -393,6 +410,178 @@ func (h *GeneralHandler) AfterConnectionClosed(session *dara.WebSocketSessionInf return nil } +// NoHandleRawMessageHandler 是一个不重写 HandleRawMessage 的 handler +// 这个 handler 依赖 AbstractAwapWebSocketHandler.HandleRawMessage +// 来展示 AwapWebSocketHandler 接口的实际使用 +type NoHandleRawMessageHandler struct { + dara.AbstractAwapWebSocketHandler + messageCount int + mu sync.Mutex +} + +func (h *NoHandleRawMessageHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { + fmt.Println("✓ CLI NoHandleRawMessage - Connected to WebSocket server") + fmt.Println(" Note: This handler does NOT override HandleRawMessage") + fmt.Println(" It uses AbstractAwapWebSocketHandler.HandleRawMessage") + fmt.Println(" which uses AwapWebSocketHandler interface") + printSessionInfo(session) + return nil +} + +func (h *NoHandleRawMessageHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { + h.mu.Lock() + h.messageCount++ + count := h.messageCount + h.mu.Unlock() + + fmt.Printf("📨 CLI NoHandleRawMessage - HandleAwapMessage called (#%d):\n", count) + fmt.Printf(" Type: %s\n", message.Type) + fmt.Printf(" ID: %s\n", message.ID) + fmt.Printf(" Seq: %d\n", message.Seq) + if message.Payload != nil { + payloadJSON, _ := json.Marshal(message.Payload) + fmt.Printf(" Payload: %s\n", string(payloadJSON)) + } + printSessionInfo(session) + return nil +} + +func (h *NoHandleRawMessageHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { + fmt.Printf("📬 CLI NoHandleRawMessage - HandleAwapIncomingMessage called:\n") + fmt.Printf(" Type: %s\n", message.Type) + fmt.Printf(" ID: %s\n", message.ID) + fmt.Printf(" Seq: %d\n", message.Seq) + fmt.Printf(" RawPayload length: %d bytes\n", len(message.RawPayload)) + printSessionInfo(session) + return nil +} + +func (h *NoHandleRawMessageHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { + fmt.Printf("❌ CLI NoHandleRawMessage - Error: %v\n", err) + printSessionInfo(session) + return nil +} + +func (h *NoHandleRawMessageHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { + h.mu.Lock() + count := h.messageCount + h.mu.Unlock() + + fmt.Printf("✗ CLI NoHandleRawMessage - Connection closed (code: %d, reason: %s)\n", code, reason) + fmt.Printf(" Total messages received: %d\n", count) + printSessionInfo(session) + return nil +} + +func (h *NoHandleRawMessageHandler) SupportsPartialMessages() bool { + return false +} + +func testAwapWebSocketWithoutHandleRawMessage(ctx *cli.Context, args []string) error { + fmt.Println("\n=== AWAP WebSocket Test (Without HandleRawMessage Override) ===") + fmt.Println("This test demonstrates AwapWebSocketHandler interface usage") + fmt.Println("The handler does NOT override HandleRawMessage,") + fmt.Println("so it uses AbstractAwapWebSocketHandler.HandleRawMessage") + fmt.Println("which uses AwapWebSocketHandler interface for type assertion") + + profile, _ := config.LoadProfileWithContext(ctx) + credential, err := profile.GetCredential(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get credential: %w", err) + } + config := &openapiClient.Config{ + Credential: credential, + Endpoint: dara.String("dalutest-pre.aliyuncs.com"), + Protocol: dara.String("https"), + } + + apiClient, err := openapiClient.NewClient(config) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + params := &openapiClient.Params{ + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("wss"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + } + + request := &openapiClient.OpenApiRequest{ + Headers: map[string]*string{ + "Sec-Websocket-Protocol": tea.String("awap"), + }, + } + + runtime := &dara.RuntimeOptions{ + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + WebSocketHandler: &NoHandleRawMessageHandler{}, // 使用不重写 HandleRawMessage 的 handler + } + + fmt.Println("Connecting...") + result, err := apiClient.DoWebSocketRequest(params, request, runtime) + if err != nil { + log.Fatalf("Connection failed: %v", err) + } + wsClient := result["wsClient"].(dara.WebSocketClient) + defer wsClient.Close() + + fmt.Println("\nSending AWAP messages...") + + // 方法 1: 使用 SendAwapRequest 发送请求消息 + fmt.Println("1. Sending AWAP request message...") + err = apiClient.SendAwapRequest(wsClient, "msg-no-handleraw-001", 1, map[string]interface{}{ + "action": "test", + "data": "This handler does NOT override HandleRawMessage", + }) + if err != nil { + log.Printf("Failed to send AWAP request: %v", err) + } + + time.Sleep(1 * time.Second) + + // 方法 2: 使用 SendAwapEvent 发送事件消息 + fmt.Println("2. Sending AWAP event message...") + err = apiClient.SendAwapEvent(wsClient, "msg-no-handleraw-002", 2, map[string]interface{}{ + "eventType": "testEvent", + "message": "Testing AwapWebSocketHandler interface usage", + }) + if err != nil { + log.Printf("Failed to send AWAP event: %v", err) + } + + time.Sleep(1 * time.Second) + + // 方法 3: 发送 AckRequiredTextEvent 类型的消息 + fmt.Println("3. Sending AckRequiredTextEvent message...") + ackRequiredMsg := apiClient.BuildAwapMessage(dara.AwapMessageTypeAckRequiredTextEvent, "msg-no-handleraw-003", 3, map[string]interface{}{ + "action": "ackRequiredTest", + "data": "This message requires acknowledgment", + "timestamp": time.Now().Unix(), + }) + err = apiClient.SendAwapMessage(wsClient, ackRequiredMsg) + if err != nil { + log.Printf("Failed to send AckRequiredTextEvent message: %v", err) + } + + // Wait for response + fmt.Println("\nWaiting for server responses...") + time.Sleep(3 * time.Second) + + fmt.Println("\n=== NoHandleRawMessage Test Complete ===") + fmt.Println("This test demonstrates that AwapWebSocketHandler interface") + fmt.Println("is used when handler does NOT override HandleRawMessage") + return nil +} + func testGeneralWebSocket(ctx *cli.Context, args []string) error { fmt.Println("\n=== General WebSocket Example ===") profile, _ := config.LoadProfileWithContext(ctx) @@ -414,7 +603,7 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { params := &openapiClient.Params{ Action: tea.String("WebsocketGeneralDemoApi"), Version: tea.String("2022-02-02"), - Protocol: tea.String("https"), + Protocol: tea.String("wss"), Method: tea.String("GET"), Pathname: tea.String("/ws/general-demo-api"), AuthType: tea.String("AK"), @@ -426,23 +615,24 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { }, } - runtime := &dara.ExtendedRuntimeOptions{ - RuntimeOptions: dara.RuntimeOptions{ - ReadTimeout: dara.Int(60000), // 60 seconds - ConnectTimeout: dara.Int(30000), // 30 seconds - }, - WebSocketPingInterval: dara.Int(30000), // 30秒心跳 - WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 - WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 - WebSocketEnableReconnect: dara.Bool(true), // 启用重连 - WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + runtime := &dara.RuntimeOptions{ + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + WebSocketHandler: &GeneralHandler{}, // 通过 runtime 配置 handler } fmt.Println("Connecting to General WebSocket...") - wsClient, err := apiClient.DoWebSocketRequest(params, request, runtime, &GeneralHandler{}) + // Handler 从 runtime 中获取 + result, err := apiClient.DoWebSocketRequest(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } + wsClient := result["wsClient"].(dara.WebSocketClient) defer wsClient.Close() fmt.Println("\nSending General messages...") @@ -457,50 +647,50 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { time.Sleep(1 * time.Second) // 方法 2: 发送 JSON 消息 - // fmt.Println("2. Sending General JSON message...") - // eventData := map[string]interface{}{ - // "name": "general-test", - // "object": map[string]interface{}{ - // "field1": "general", - // "field2": 2, - // "field3": []string{"test"}, - // }, - // "list": []map[string]interface{}{ - // {"enabled": false, "value": "general"}, - // }, - // "map": map[string]string{ - // "test": "test", - // }, - // } - // err = apiClient.SendGeneralJSONMessage(wsClient, eventData) - // if err != nil { - // log.Printf("Failed to send General JSON message: %v", err) - // } - - // time.Sleep(1 * time.Second) + fmt.Println("2. Sending General JSON message...") + eventData := map[string]interface{}{ + "name": "general-test", + "object": map[string]interface{}{ + "field1": "general", + "field2": 2, + "field3": []string{"test"}, + }, + "list": []map[string]interface{}{ + {"enabled": false, "value": "general"}, + }, + "map": map[string]string{ + "test": "test", + }, + } + err = apiClient.SendGeneralJSONMessage(wsClient, eventData) + if err != nil { + log.Printf("Failed to send General JSON message: %v", err) + } + + time.Sleep(1 * time.Second) // 方法 3: 发送带自定义头部的消息 - // fmt.Println("3. Sending General message with custom headers...") - // generalMsg := dara.BuildGeneralJSONMessage(map[string]interface{}{ - // "action": "test", - // "data": "Hello with headers!", - // }) - // generalMsg.WithHeader("X-Custom-Header", "custom-value") - // generalMsg.WithHeader("Content-Type", "application/json") - // err = apiClient.SendGeneralMessage(wsClient, generalMsg) - // if err != nil { - // log.Printf("Failed to send General message with headers: %v", err) - // } - - // time.Sleep(1 * time.Second) + fmt.Println("3. Sending General message with custom headers...") + generalMsg := apiClient.BuildGeneralJSONMessage(map[string]interface{}{ + "action": "test", + "data": "Hello with headers!", + }) + generalMsg.WithHeader("X-Custom-Header", "custom-value") + generalMsg.WithHeader("Content-Type", "application/json") + err = apiClient.SendGeneralMessage(wsClient, generalMsg) + if err != nil { + log.Printf("Failed to send General message with headers: %v", err) + } + + time.Sleep(1 * time.Second) // 方法 4: 发送二进制消息 - // fmt.Println("4. Sending General binary message...") - // binaryData := []byte("Binary General Data") - // err = apiClient.SendGeneralBinaryMessage(wsClient, binaryData) - // if err != nil { - // log.Printf("Failed to send General binary message: %v", err) - // } + fmt.Println("4. Sending General binary message...") + binaryData := []byte("Binary General Data") + err = apiClient.SendGeneralBinaryMessage(wsClient, binaryData) + if err != nil { + log.Printf("Failed to send General binary message: %v", err) + } // Wait for response time.Sleep(3 * time.Second) @@ -526,7 +716,7 @@ func NewSequentialHandler(expectedCount int) *SequentialHandler { } func (h *SequentialHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { - fmt.Println("✓ Sequential Test - Connected to WebSocket server") + fmt.Println("✓ CLI Sequential Test - Connected to WebSocket server") printSessionInfo(session) return nil } @@ -535,7 +725,7 @@ func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo h.mu.Lock() defer h.mu.Unlock() - fmt.Printf("📨 Sequential Test - HandleAwapMessage called: seq=%d, type=%s, id=%s\n", + fmt.Printf("📨 CLI Sequential Test - HandleAwapMessage called: seq=%d, type=%s, id=%s\n", message.Seq, message.Type, message.ID) printSessionInfo(session) @@ -563,7 +753,7 @@ func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo func (h *SequentialHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { // This is called after HandleAwapMessage, so we don't need to count again // Just log for debugging - fmt.Printf("📬 Sequential Test - HandleAwapIncomingMessage called: seq=%d, type=%s, id=%s\n", + fmt.Printf("📬 CLI Sequential Test - HandleAwapIncomingMessage called: seq=%d, type=%s, id=%s\n", message.Seq, message.Type, message.ID) return nil } @@ -574,20 +764,16 @@ func (h *SequentialHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, // can't access the outer SequentialHandler type awapMsg, err := dara.ParseAwapMessage(message) if err != nil { - fmt.Printf("[Sequential] Failed to parse AWAP message: %v\n", err) + fmt.Printf("[CLI Sequential] Failed to parse AWAP message: %v\n", err) return err } - // Call HandleAwapMessage directly on h (which is *SequentialHandler) - // This will call our overridden implementation - fmt.Printf("[Sequential] Calling HandleAwapMessage directly: seq=%d, type=%s\n", awapMsg.Seq, awapMsg.Type) + fmt.Printf("[CLI Sequential] Calling HandleAwapMessage directly: seq=%d, type=%s\n", awapMsg.Seq, awapMsg.Type) if err := h.HandleAwapMessage(session, awapMsg); err != nil { return err } - // Also call HandleAwapIncomingMessage for event types - if awapMsg.Type == dara.AwapMessageTypeEvent || - awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || + if awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || awapMsg.Type == dara.AwapMessageTypeUpstreamBinaryEvent || awapMsg.Type == dara.AwapMessageTypeAckRequiredTextEvent || awapMsg.Type == dara.AwapMessageTypeMessageReceiveEvent || @@ -608,13 +794,13 @@ func (h *SequentialHandler) SupportsPartialMessages() bool { } func (h *SequentialHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { - fmt.Printf("❌ Sequential Test Error: %v\n", err) + fmt.Printf("❌ CLI Sequential Test Error: %v\n", err) printSessionInfo(session) return nil } func (h *SequentialHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { - fmt.Printf("✗ Sequential Test - Connection closed (code: %d, reason: %s)\n", code, reason) + fmt.Printf("✗ CLI Sequential Test - Connection closed (code: %d, reason: %s)\n", code, reason) printSessionInfo(session) return nil } @@ -667,7 +853,7 @@ func testSequentialMessageReception(ctx *cli.Context, args []string) error { params := &openapiClient.Params{ Action: tea.String("WebsocketAwapDemoApi"), Version: tea.String("2022-02-02"), - Protocol: tea.String("https"), + Protocol: tea.String("wss"), Method: tea.String("GET"), Pathname: tea.String("/ws/awap-demo-api"), AuthType: tea.String("AK"), @@ -683,30 +869,30 @@ func testSequentialMessageReception(ctx *cli.Context, args []string) error { }, } - runtime := &dara.ExtendedRuntimeOptions{ - RuntimeOptions: dara.RuntimeOptions{ - ReadTimeout: dara.Int(120000), // 120 seconds (enough for 20 messages with 3s delay) - ConnectTimeout: dara.Int(30000), // 30 seconds - }, - WebSocketPingInterval: dara.Int(30000), // 30秒心跳 - WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 - WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 - WebSocketEnableReconnect: dara.Bool(true), // 启用重连 - WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 - } - expectedCount := 20 handler := NewSequentialHandler(expectedCount) + runtime := &dara.RuntimeOptions{ + ReadTimeout: dara.Int(120000), // 120 seconds (enough for 20 messages with 3s delay) + ConnectTimeout: dara.Int(30000), // 30 seconds + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + WebSocketHandler: handler, // 通过 runtime 配置 handler + } + fmt.Println("Connecting to WebSocket server...") fmt.Printf("Query parameters: delay=%s, batchSendMsgCnt=%s\n", dara.StringValue(request.Query["delay"]), dara.StringValue(request.Query["batchSendMsgCnt"])) - wsClient, err := apiClient.DoWebSocketRequest(params, request, runtime, handler) + result, err := apiClient.DoWebSocketRequest(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } + wsClient := result["wsClient"].(dara.WebSocketClient) defer wsClient.Close() fmt.Println("Connection established, waiting for server to start sending batch messages...") From 89bf1ae87578e696e468a6e3d41c3b1d0d1a191e Mon Sep 17 00:00:00 2001 From: AllyW Date: Wed, 19 Nov 2025 16:51:16 +0800 Subject: [PATCH 3/7] add tests --- websocketTest/main.go | 97 ++++++++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 37 deletions(-) diff --git a/websocketTest/main.go b/websocketTest/main.go index 273977598..bc9fa0847 100644 --- a/websocketTest/main.go +++ b/websocketTest/main.go @@ -72,10 +72,10 @@ func runWebsocketTest(ctx *cli.Context, args []string) error { // bodyBytes, _ := GetContentFromApiResponse(response) fmt.Printf("response: %s\n", response["statusCode"]) - // testAwapWebSocket(ctx, args) + testAwapWebSocket(ctx, args) testAwapWebSocketWithoutHandleRawMessage(ctx, args) // 新增:不重写 HandleRawMessage 的用例 - // testGeneralWebSocket(ctx, args) - // testSequentialMessageReception(ctx, args) + testGeneralWebSocket(ctx, args) + testSequentialMessageReception(ctx, args) return nil } @@ -103,9 +103,6 @@ func printSessionInfo(session *dara.WebSocketSessionInfo) { return } fmt.Printf(" [Session] ID: %s\n", session.SessionID) - if session.RequestID != "" { - fmt.Printf(" [Session] RequestID: %s\n", session.RequestID) - } fmt.Printf(" [Session] ConnectedAt: %s\n", session.ConnectedAt.Format(time.RFC3339)) fmt.Printf(" [Session] RemoteAddr: %s\n", session.RemoteAddr) fmt.Printf(" [Session] LocalAddr: %s\n", session.LocalAddr) @@ -263,8 +260,8 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { log.Printf("Failed to send AWAP request: %v", err) } - // 方法 2: 使用 SendAwapEvent 发送事件消息 - err = apiClient.SendAwapEvent(wsClient, "msg-002", 2, map[string]interface{}{ + // 方法 2: 使用 SendAwapRequest 发送请求消息 + err = apiClient.SendAwapRequest(wsClient, "msg-002", 2, map[string]interface{}{ "eventType": "userAction", "message": "Hello WebSocket!", }) @@ -283,22 +280,37 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { time.Sleep(1 * time.Second) - // 方法 4: 发送 AckRequiredTextEvent 类型的消息(需要确认的消息) - fmt.Println("\n4. Sending AckRequiredTextEvent message...") - ackRequiredMsg := apiClient.BuildAwapMessage(dara.AwapMessageTypeAckRequiredTextEvent, "msg-004", 4, map[string]interface{}{ - "action": "ackRequiredTest", - "data": "This message requires acknowledgment", - "timestamp": time.Now().Unix(), - }) - err = apiClient.SendAwapMessage(wsClient, ackRequiredMsg) + // 方法 4: 发送 AckRequiredTextEvent 类型的消息(等待响应) + fmt.Println("\n4. Sending AckRequiredTextEvent message and waiting for ACK...") + ackResponse, err := apiClient.SendAwapRequestWithAck( + wsClient, + "msg-004", + 4, + map[string]interface{}{ + "action": "ackRequiredTest", + "data": "This message requires acknowledgment", + "timestamp": time.Now().Unix(), + }, + 30*time.Second, // 超时时间 + ) if err != nil { - log.Printf("Failed to send AckRequiredTextEvent message: %v", err) + log.Printf("❌ Failed to send AckRequiredTextEvent or timed out waiting for response: %v", err) } else { - fmt.Println("✓ AckRequiredTextEvent message sent successfully") + fmt.Printf("✅ Received acknowledgment:\n") + fmt.Printf(" Response Type: %s\n", ackResponse.Type) + if ackResponse.Headers != nil { + if ackID, ok := ackResponse.Headers["ack-id"]; ok { + fmt.Printf(" Ack-ID: %s\n", ackID) + } + } + if ackResponse.Payload != nil { + payloadJSON, _ := json.Marshal(ackResponse.Payload) + fmt.Printf(" Payload: %s\n", string(payloadJSON)) + } } - // Wait for response - time.Sleep(3 * time.Second) + // Wait for other responses + time.Sleep(2 * time.Second) fmt.Println("\n=== AWAP Example Complete ===") return nil @@ -345,8 +357,6 @@ func (h *GeneralHandler) HandleGeneralIncomingMessage(session *dara.WebSocketSes func (h *GeneralHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { // Parse and handle General messages directly - // This avoids the issue where AbstractGeneralWebSocketHandler.HandleRawMessage - // can't access the outer GeneralHandler type if message.Type == dara.WebSocketMessageTypeText { // Parse as General text message generalMsg, err := dara.ParseGeneralMessage(message) @@ -548,9 +558,9 @@ func testAwapWebSocketWithoutHandleRawMessage(ctx *cli.Context, args []string) e time.Sleep(1 * time.Second) - // 方法 2: 使用 SendAwapEvent 发送事件消息 - fmt.Println("2. Sending AWAP event message...") - err = apiClient.SendAwapEvent(wsClient, "msg-no-handleraw-002", 2, map[string]interface{}{ + // 方法 2: 使用 SendAwapRequest 发送请求消息 + fmt.Println("2. Sending AWAP request message...") + err = apiClient.SendAwapRequest(wsClient, "msg-no-handleraw-002", 2, map[string]interface{}{ "eventType": "testEvent", "message": "Testing AwapWebSocketHandler interface usage", }) @@ -561,19 +571,34 @@ func testAwapWebSocketWithoutHandleRawMessage(ctx *cli.Context, args []string) e time.Sleep(1 * time.Second) // 方法 3: 发送 AckRequiredTextEvent 类型的消息 - fmt.Println("3. Sending AckRequiredTextEvent message...") - ackRequiredMsg := apiClient.BuildAwapMessage(dara.AwapMessageTypeAckRequiredTextEvent, "msg-no-handleraw-003", 3, map[string]interface{}{ - "action": "ackRequiredTest", - "data": "This message requires acknowledgment", - "timestamp": time.Now().Unix(), - }) - err = apiClient.SendAwapMessage(wsClient, ackRequiredMsg) + fmt.Println("3. Sending AckRequiredTextEvent message (with ACK wait)...") + response, err := apiClient.SendAwapRequestWithAck( + wsClient, + "msg-no-handleraw-003", + 3, + map[string]interface{}{ + "action": "ackRequiredTest", + "data": "This message requires acknowledgment", + "timestamp": time.Now().Unix(), + }, + 30*time.Second, // 30 seconds timeout + ) if err != nil { - log.Printf("Failed to send AckRequiredTextEvent message: %v", err) + log.Printf("❌ Failed to send AckRequiredTextEvent or receive response: %v", err) + } else { + fmt.Printf("✅ Received ACK response:\n") + fmt.Printf(" Type: %s\n", response.Type) + if response.Headers != nil { + fmt.Printf(" Headers: %+v\n", response.Headers) + } + if response.Payload != nil { + payloadJSON, _ := json.Marshal(response.Payload) + fmt.Printf(" Payload: %s\n", string(payloadJSON)) + } } - // Wait for response - fmt.Println("\nWaiting for server responses...") + // Wait for other responses + fmt.Println("\nWaiting for other server responses...") time.Sleep(3 * time.Second) fmt.Println("\n=== NoHandleRawMessage Test Complete ===") @@ -727,8 +752,6 @@ func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo fmt.Printf("📨 CLI Sequential Test - HandleAwapMessage called: seq=%d, type=%s, id=%s\n", message.Seq, message.Type, message.ID) - printSessionInfo(session) - // Print message payload for debugging if message.Payload != nil { payloadJSON, _ := json.Marshal(message.Payload) From 42789be306a5f960767f67b7f0996ba64aedd36a Mon Sep 17 00:00:00 2001 From: AllyW Date: Fri, 21 Nov 2025 21:30:08 +0800 Subject: [PATCH 4/7] adjust api --- websocketTest/main.go | 556 +++++++++++++++--------------------------- 1 file changed, 192 insertions(+), 364 deletions(-) diff --git a/websocketTest/main.go b/websocketTest/main.go index bc9fa0847..ca05f8890 100644 --- a/websocketTest/main.go +++ b/websocketTest/main.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "strconv" "sync" "time" @@ -12,7 +13,7 @@ import ( "github.com/aliyun/aliyun-cli/v3/i18n" openapiClient "github.com/alibabacloud-go/darabonba-openapi/v2/client" - openapiutil "github.com/alibabacloud-go/darabonba-openapi/v2/utils" + websocketutils "github.com/alibabacloud-go/darabonba-openapi/v2/websocketutils" dara "github.com/alibabacloud-go/tea/dara" "github.com/alibabacloud-go/tea/tea" ) @@ -30,51 +31,51 @@ func NewWebsocketTestCommand() *cli.Command { } func runWebsocketTest(ctx *cli.Context, args []string) error { - profile, _ := config.LoadProfileWithContext(ctx) - credential, err := profile.GetCredential(ctx, nil) - if err != nil { - return fmt.Errorf("failed to get credential: %w", err) - } - - conf := &openapiClient.Config{ - Credential: credential, - RegionId: tea.String(profile.RegionId), - Endpoint: tea.String("openapi-mcp.cn-hangzhou.aliyuncs.com"), - } - - client, err := openapiClient.NewClient(conf) - if err != nil { - return err - } - - params := &openapiClient.Params{ - Action: tea.String("ListApiMcpServers"), - Version: tea.String("2024-11-30"), - Protocol: tea.String("HTTPS"), - Method: tea.String("GET"), - AuthType: tea.String("AK"), - Style: tea.String("ROA"), - Pathname: tea.String("/apimcpservers"), - ReqBodyType: tea.String("json"), - BodyType: tea.String("json"), - } - queries := map[string]interface{}{} - queries["id"] = tea.String("Bt4Td5W1tI31YAsu") - - runtime := &dara.RuntimeOptions{} - request := &openapiClient.OpenApiRequest{ - Query: openapiutil.Query(queries), - } - response, err := client.CallApi(params, request, runtime) - if err != nil { - return err - } - // bodyBytes, _ := GetContentFromApiResponse(response) - fmt.Printf("response: %s\n", response["statusCode"]) - - testAwapWebSocket(ctx, args) - testAwapWebSocketWithoutHandleRawMessage(ctx, args) // 新增:不重写 HandleRawMessage 的用例 - testGeneralWebSocket(ctx, args) + // profile, _ := config.LoadProfileWithContext(ctx) + // credential, err := profile.GetCredential(ctx, nil) + // if err != nil { + // return fmt.Errorf("failed to get credential: %w", err) + // } + + // conf := &openapiClient.Config{ + // Credential: credential, + // RegionId: tea.String(profile.RegionId), + // Endpoint: tea.String("openapi-mcp.cn-hangzhou.aliyuncs.com"), + // } + + // client, err := openapiClient.NewClient(conf) + // if err != nil { + // return err + // } + + // params := &openapiClient.Params{ + // Action: tea.String("ListApiMcpServers"), + // Version: tea.String("2024-11-30"), + // Protocol: tea.String("HTTPS"), + // Method: tea.String("GET"), + // AuthType: tea.String("AK"), + // Style: tea.String("ROA"), + // Pathname: tea.String("/apimcpservers"), + // ReqBodyType: tea.String("json"), + // BodyType: tea.String("json"), + // } + // queries := map[string]interface{}{} + // queries["id"] = tea.String("Bt4Td5W1tI31YAsu") + + // runtime := &dara.RuntimeOptions{} + // request := &openapiClient.OpenApiRequest{ + // Query: openapiutil.Query(queries), + // } + // response, err := client.CallApi(params, request, runtime) + // if err != nil { + // return err + // } + // // bodyBytes, _ := GetContentFromApiResponse(response) + // fmt.Printf("response: %s\n", response["statusCode"]) + + // testAwapWebSocket(ctx, args) + // testAwapWebSocketWithoutHandleAwapMessage(ctx, args) // 重写 HandleRawMessage 的用例 + // testGeneralWebSocket(ctx, args) testSequentialMessageReception(ctx, args) return nil } @@ -124,48 +125,14 @@ func (h *SimpleHandler) AfterConnectionEstablished(session *dara.WebSocketSessio func (h *SimpleHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { fmt.Println("📨 CLI Received AWAP message:") - jsonData, _ := json.Marshal(message) - fmt.Printf(" Message: %s\n", string(jsonData)) - return nil -} - -func (h *SimpleHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { - fmt.Println("📬 CLI Received AWAP incoming message:") - jsonData, _ := json.Marshal(message) - fmt.Printf(" Incoming Message: %s\n", string(jsonData)) + fmt.Printf(" Type: %s\n", message.Format) + fmt.Printf(" ID: %s\n", message.ID) + fmt.Printf(" Headers: %+v\n", message.Headers) return nil } func (h *SimpleHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { - // Parse the AWAP message ourselves and call HandleAwapMessage directly - // This avoids the issue where AbstractAwapWebSocketHandler.HandleRawMessage - // can't access the outer SimpleHandler type - awapMsg, err := dara.ParseAwapMessage(message) - if err != nil { - fmt.Printf("[CLI Simple] Failed to parse AWAP message: %v\n", err) - return err - } - - // Call HandleAwapMessage directly on h (which is *SimpleHandler) - // This will call our overridden implementation - if err := h.HandleAwapMessage(session, awapMsg); err != nil { - return err - } - - // Also call HandleAwapIncomingMessage for event types - if awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || - awapMsg.Type == dara.AwapMessageTypeUpstreamBinaryEvent || - awapMsg.Type == dara.AwapMessageTypeAckRequiredTextEvent || - awapMsg.Type == dara.AwapMessageTypeMessageReceiveEvent || - awapMsg.Type == dara.AwapMessageTypeDownstreamTextEvent || - awapMsg.Type == dara.AwapMessageTypeDownstreamBinaryEvent { - incoming := &dara.AwapIncomingMessage{ - AwapMessage: *awapMsg, - RawPayload: message.Payload, - } - return h.HandleAwapIncomingMessage(session, incoming) - } - + fmt.Println("📨 CLI Received AWAP message from HandleRawMessage!!!! shouldn't be") return nil } @@ -215,19 +182,16 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { // ReqBodyType: tea.String("json"), // BodyType: tea.String("json"), // Product: dara.String("DaluTestInner"), - Action: tea.String("WebsocketAwapDemoApi"), - Version: tea.String("2022-02-02"), - Protocol: tea.String("wss"), - Method: tea.String("GET"), - Pathname: tea.String("/ws/awap-demo-api"), - AuthType: tea.String("AK"), + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("wss"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + WebsocketSubProtocol: tea.String("awap"), } - request := &openapiClient.OpenApiRequest{ - Headers: map[string]*string{ - "Sec-Websocket-Protocol": tea.String("awap"), - }, - } + request := &openapiClient.OpenApiRequest{} runtime := &dara.RuntimeOptions{ ReadTimeout: dara.Int(60000), // 60 seconds @@ -242,56 +206,57 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { fmt.Println("Connecting...") // Handler 从 runtime 中获取 - result, err := apiClient.DoWebSocketRequest(params, request, runtime) + result, err := apiClient.CallApi(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } - wsClient := result["wsClient"].(dara.WebSocketClient) - defer wsClient.Close() + websocketObj := result["websocketClient"].(*websocketutils.WebSocketClient) + defer websocketObj.Close() - fmt.Println("\nSending AWAP message...") + fmt.Println("\n1. Sending AWAP message...") // 方法 1: 使用 SendAwapRequest 发送请求消息(推荐) - err = apiClient.SendAwapRequest(wsClient, "msg-001", 1, map[string]interface{}{ - "action": "test", - "data": "Hello WebSocket!", - }) + err = websocketObj.SendRawAwapTextMessageWithId(dara.AwapMessageType("UpstreamTextEvent"), + "msg-001", + map[string]interface{}{"action": "test", "data": "Hello WebSocket!"}) if err != nil { log.Printf("Failed to send AWAP request: %v", err) + } else { + fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamTextEvent")) } - // 方法 2: 使用 SendAwapRequest 发送请求消息 - err = apiClient.SendAwapRequest(wsClient, "msg-002", 2, map[string]interface{}{ - "eventType": "userAction", - "message": "Hello WebSocket!", - }) + // 方法 2: 手动构建 AWAP 消息 + awapMsg := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamTextEvent"), + "msg-002", + map[string]interface{}{"message": "Hello WebSocket!"}, + ) + err = websocketObj.SendAwapTextMessage(awapMsg) if err != nil { - log.Printf("Failed to send AWAP event: %v", err) + log.Printf("Failed to send AWAP message: %v", err) + } else { + fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamTextEvent")) } - // 方法 3: 手动构建 AWAP 消息 - awapMsg := apiClient.BuildAwapRequest("msg-003", 3, map[string]interface{}{ - "message": "Hello WebSocket!", - }) - err = apiClient.SendAwapMessage(wsClient, awapMsg) + // 方法 3: binary 信息 + awapMsgBinary := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamBinaryEvent"), + "msg-003", + []byte("Hello WebSocket!"), + ) + err = websocketObj.SendAwapBinaryMessage(awapMsgBinary) if err != nil { log.Printf("Failed to send AWAP message: %v", err) + } else { + fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamBinaryEvent")) } time.Sleep(1 * time.Second) // 方法 4: 发送 AckRequiredTextEvent 类型的消息(等待响应) fmt.Println("\n4. Sending AckRequiredTextEvent message and waiting for ACK...") - ackResponse, err := apiClient.SendAwapRequestWithAck( - wsClient, + ackResponse, err := websocketObj.SendRawAwapRequestWithAck( "msg-004", - 4, - map[string]interface{}{ - "action": "ackRequiredTest", - "data": "This message requires acknowledgment", - "timestamp": time.Now().Unix(), - }, - 30*time.Second, // 超时时间 + map[string]interface{}{"action": "ackRequiredTest", "data": "This message requires acknowledgment", "timestamp": time.Now().Unix()}, + 30*time.Second, ) if err != nil { log.Printf("❌ Failed to send AckRequiredTextEvent or timed out waiting for response: %v", err) @@ -326,35 +291,6 @@ func (h *GeneralHandler) AfterConnectionEstablished(session *dara.WebSocketSessi return nil } -func (h *GeneralHandler) HandleGeneralTextMessage(session *dara.WebSocketSessionInfo, message *dara.GeneralMessage) error { - fmt.Println("📨 CLI General Received General text message:") - jsonData, _ := json.Marshal(message) - fmt.Printf(" Message: %s\n", string(jsonData)) - return nil -} - -func (h *GeneralHandler) HandleGeneralBinaryMessage(session *dara.WebSocketSessionInfo, data []byte) error { - fmt.Println("📦 CLI General Received General binary message:") - fmt.Printf(" Size: %d bytes\n", len(data)) - fmt.Printf(" Content: %s\n", string(data)) - return nil -} - -func (h *GeneralHandler) HandleGeneralIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.GeneralIncomingMessage) error { - if message.IsBinary { - fmt.Println("📦 CLI General incoming binary message:") - } else { - fmt.Println("📨 CLI General incoming message:") - } - if message.IsBinary { - fmt.Printf(" Size: %d bytes\n", len(message.RawPayload)) - } else { - jsonData, _ := json.Marshal(message.Body) - fmt.Printf(" Body: %s\n", string(jsonData)) - } - return nil -} - func (h *GeneralHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { // Parse and handle General messages directly if message.Type == dara.WebSocketMessageTypeText { @@ -365,40 +301,13 @@ func (h *GeneralHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, me return err } - // Check message type from headers - msgType := generalMsg.Headers["type"] - fmt.Printf("[CLI General] Received text message, type from header: %s\n", msgType) - - // Create incoming message - incoming := &dara.GeneralIncomingMessage{ - Headers: generalMsg.Headers, - Body: generalMsg.Body, - RawPayload: message.Payload, - IsBinary: false, - } - - // Call both handlers directly on h (which is *GeneralHandler) - if err := h.HandleGeneralTextMessage(session, generalMsg); err != nil { - return err - } - return h.HandleGeneralIncomingMessage(session, incoming) + fmt.Printf("[CLI General] Received text message: %+v\n", message) + fmt.Printf("[CLI General] Received text message: %s\n", generalMsg.Body) } else if message.Type == dara.WebSocketMessageTypeBinary { // Handle as binary message fmt.Printf("[CLI General] Received binary message\n") - - incoming := &dara.GeneralIncomingMessage{ - Headers: make(map[string]string), - Body: nil, - RawPayload: message.Payload, - IsBinary: true, - } - - // Call both handlers directly on h (which is *GeneralHandler) - if err := h.HandleGeneralBinaryMessage(session, message.Payload); err != nil { - return err - } - return h.HandleGeneralIncomingMessage(session, incoming) + fmt.Printf("[CLI General] Received binary message: %s\n", message.Payload) } return nil @@ -420,79 +329,60 @@ func (h *GeneralHandler) AfterConnectionClosed(session *dara.WebSocketSessionInf return nil } -// NoHandleRawMessageHandler 是一个不重写 HandleRawMessage 的 handler -// 这个 handler 依赖 AbstractAwapWebSocketHandler.HandleRawMessage -// 来展示 AwapWebSocketHandler 接口的实际使用 -type NoHandleRawMessageHandler struct { +// NoHandleAwapMessageHandler 是一个重写 HandleRawMessage 的 handler +type NoHandleAwapMessageHandler struct { dara.AbstractAwapWebSocketHandler messageCount int mu sync.Mutex } -func (h *NoHandleRawMessageHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { - fmt.Println("✓ CLI NoHandleRawMessage - Connected to WebSocket server") - fmt.Println(" Note: This handler does NOT override HandleRawMessage") - fmt.Println(" It uses AbstractAwapWebSocketHandler.HandleRawMessage") - fmt.Println(" which uses AwapWebSocketHandler interface") +func (h *NoHandleAwapMessageHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { + fmt.Println("✓ CLI NoHandleAwapMessage - Connected to WebSocket server") + fmt.Println(" Note: This handler does NOT override HandleAwapMessage") + fmt.Println(" It overrides HandleRawMessage instead") printSessionInfo(session) return nil } -func (h *NoHandleRawMessageHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { +func (h *NoHandleAwapMessageHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { h.mu.Lock() h.messageCount++ count := h.messageCount h.mu.Unlock() - fmt.Printf("📨 CLI NoHandleRawMessage - HandleAwapMessage called (#%d):\n", count) - fmt.Printf(" Type: %s\n", message.Type) - fmt.Printf(" ID: %s\n", message.ID) - fmt.Printf(" Seq: %d\n", message.Seq) - if message.Payload != nil { - payloadJSON, _ := json.Marshal(message.Payload) - fmt.Printf(" Payload: %s\n", string(payloadJSON)) - } + fmt.Printf("📨 CLI NoHandleAwapMessage - HandleRawMessage called (#%d):\n", count) + fmt.Printf(" Type: %+v\n", message.Type) + fmt.Printf(" Headers: %+v\n", message.Headers) + fmt.Printf(" Payload: %s\n", string(message.Payload)) printSessionInfo(session) return nil } -func (h *NoHandleRawMessageHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { - fmt.Printf("📬 CLI NoHandleRawMessage - HandleAwapIncomingMessage called:\n") - fmt.Printf(" Type: %s\n", message.Type) - fmt.Printf(" ID: %s\n", message.ID) - fmt.Printf(" Seq: %d\n", message.Seq) - fmt.Printf(" RawPayload length: %d bytes\n", len(message.RawPayload)) +func (h *NoHandleAwapMessageHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { + fmt.Printf("❌ CLI NoHandleAwapMessage - Error: %v\n", err) printSessionInfo(session) return nil } -func (h *NoHandleRawMessageHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { - fmt.Printf("❌ CLI NoHandleRawMessage - Error: %v\n", err) - printSessionInfo(session) - return nil -} - -func (h *NoHandleRawMessageHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { +func (h *NoHandleAwapMessageHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo, code int, reason string) error { h.mu.Lock() count := h.messageCount h.mu.Unlock() - fmt.Printf("✗ CLI NoHandleRawMessage - Connection closed (code: %d, reason: %s)\n", code, reason) + fmt.Printf("✗ CLI NoHandleAwapMessage - Connection closed (code: %d, reason: %s)\n", code, reason) fmt.Printf(" Total messages received: %d\n", count) printSessionInfo(session) return nil } -func (h *NoHandleRawMessageHandler) SupportsPartialMessages() bool { +func (h *NoHandleAwapMessageHandler) SupportsPartialMessages() bool { return false } -func testAwapWebSocketWithoutHandleRawMessage(ctx *cli.Context, args []string) error { - fmt.Println("\n=== AWAP WebSocket Test (Without HandleRawMessage Override) ===") +func testAwapWebSocketWithoutHandleAwapMessage(ctx *cli.Context, args []string) error { + fmt.Println("\n=== AWAP WebSocket Test (HandleRawMessage Override) ===") fmt.Println("This test demonstrates AwapWebSocketHandler interface usage") - fmt.Println("The handler does NOT override HandleRawMessage,") - fmt.Println("so it uses AbstractAwapWebSocketHandler.HandleRawMessage") - fmt.Println("which uses AwapWebSocketHandler interface for type assertion") + fmt.Println("The handler does NOT override HandleAwapMessage, but override HandleRawMessage") profile, _ := config.LoadProfileWithContext(ctx) credential, err := profile.GetCredential(ctx, nil) @@ -511,99 +401,73 @@ func testAwapWebSocketWithoutHandleRawMessage(ctx *cli.Context, args []string) e } params := &openapiClient.Params{ - Action: tea.String("WebsocketAwapDemoApi"), - Version: tea.String("2022-02-02"), - Protocol: tea.String("wss"), - Method: tea.String("GET"), - Pathname: tea.String("/ws/awap-demo-api"), - AuthType: tea.String("AK"), + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("wss"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + WebsocketSubProtocol: tea.String("awap"), } - request := &openapiClient.OpenApiRequest{ - Headers: map[string]*string{ - "Sec-Websocket-Protocol": tea.String("awap"), - }, - } + request := &openapiClient.OpenApiRequest{} runtime := &dara.RuntimeOptions{ - ReadTimeout: dara.Int(60000), // 60 seconds - ConnectTimeout: dara.Int(30000), // 30 seconds - WebSocketPingInterval: dara.Int(30000), // 30秒心跳 - WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 - WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 - WebSocketEnableReconnect: dara.Bool(true), // 启用重连 - WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 - WebSocketHandler: &NoHandleRawMessageHandler{}, // 使用不重写 HandleRawMessage 的 handler + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时 + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时 + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + WebSocketHandler: &NoHandleAwapMessageHandler{}, // 使用不重写 HandleRawMessage 的 handler } fmt.Println("Connecting...") - result, err := apiClient.DoWebSocketRequest(params, request, runtime) + result, err := apiClient.CallApi(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } - wsClient := result["wsClient"].(dara.WebSocketClient) - defer wsClient.Close() + websocketObj := result["websocketClient"].(*websocketutils.WebSocketClient) + defer websocketObj.Close() fmt.Println("\nSending AWAP messages...") // 方法 1: 使用 SendAwapRequest 发送请求消息 fmt.Println("1. Sending AWAP request message...") - err = apiClient.SendAwapRequest(wsClient, "msg-no-handleraw-001", 1, map[string]interface{}{ - "action": "test", - "data": "This handler does NOT override HandleRawMessage", - }) + err = websocketObj.SendRawAwapTextMessageWithId(dara.AwapMessageType("UpstreamTextEvent"), + "msg-no-handleraw-001", + map[string]interface{}{"action": "test", "data": "This handler does NOT override HandleRawMessage"}, + ) if err != nil { log.Printf("Failed to send AWAP request: %v", err) + } else { + fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamTextEvent")) } time.Sleep(1 * time.Second) - // 方法 2: 使用 SendAwapRequest 发送请求消息 - fmt.Println("2. Sending AWAP request message...") - err = apiClient.SendAwapRequest(wsClient, "msg-no-handleraw-002", 2, map[string]interface{}{ - "eventType": "testEvent", - "message": "Testing AwapWebSocketHandler interface usage", - }) - if err != nil { - log.Printf("Failed to send AWAP event: %v", err) - } - - time.Sleep(1 * time.Second) - - // 方法 3: 发送 AckRequiredTextEvent 类型的消息 - fmt.Println("3. Sending AckRequiredTextEvent message (with ACK wait)...") - response, err := apiClient.SendAwapRequestWithAck( - wsClient, - "msg-no-handleraw-003", - 3, - map[string]interface{}{ - "action": "ackRequiredTest", - "data": "This message requires acknowledgment", - "timestamp": time.Now().Unix(), - }, - 30*time.Second, // 30 seconds timeout + // 方法 2: binary 信息 + awapMsgBinary := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamBinaryEvent"), + "msg-003", + []byte("Hello WebSocket!"), ) + err = websocketObj.SendAwapBinaryMessage(awapMsgBinary) if err != nil { - log.Printf("❌ Failed to send AckRequiredTextEvent or receive response: %v", err) + log.Printf("Failed to send AWAP message: %v", err) } else { - fmt.Printf("✅ Received ACK response:\n") - fmt.Printf(" Type: %s\n", response.Type) - if response.Headers != nil { - fmt.Printf(" Headers: %+v\n", response.Headers) - } - if response.Payload != nil { - payloadJSON, _ := json.Marshal(response.Payload) - fmt.Printf(" Payload: %s\n", string(payloadJSON)) - } + fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamBinaryEvent")) } + time.Sleep(1 * time.Second) + // Wait for other responses fmt.Println("\nWaiting for other server responses...") time.Sleep(3 * time.Second) - fmt.Println("\n=== NoHandleRawMessage Test Complete ===") + fmt.Println("\n=== NoHandleAwapMessageHandler Test Complete ===") fmt.Println("This test demonstrates that AwapWebSocketHandler interface") - fmt.Println("is used when handler does NOT override HandleRawMessage") + fmt.Println("is used when handler overrides HandleRawMessage") return nil } @@ -626,19 +490,16 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { } params := &openapiClient.Params{ - Action: tea.String("WebsocketGeneralDemoApi"), - Version: tea.String("2022-02-02"), - Protocol: tea.String("wss"), - Method: tea.String("GET"), - Pathname: tea.String("/ws/general-demo-api"), - AuthType: tea.String("AK"), + Action: tea.String("WebsocketGeneralDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("wss"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/general-demo-api"), + AuthType: tea.String("AK"), + WebsocketSubProtocol: tea.String("general"), } - request := &openapiClient.OpenApiRequest{ - Headers: map[string]*string{ - "Sec-Websocket-Protocol": tea.String("general"), - }, - } + request := &openapiClient.OpenApiRequest{} runtime := &dara.RuntimeOptions{ ReadTimeout: dara.Int(60000), // 60 seconds @@ -653,18 +514,18 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { fmt.Println("Connecting to General WebSocket...") // Handler 从 runtime 中获取 - result, err := apiClient.DoWebSocketRequest(params, request, runtime) + result, err := apiClient.CallApi(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } - wsClient := result["wsClient"].(dara.WebSocketClient) - defer wsClient.Close() + websocketObj := result["websocketClient"].(*websocketutils.WebSocketClient) + defer websocketObj.Close() fmt.Println("\nSending General messages...") // 方法 1: 发送文本消息 fmt.Println("1. Sending General text message...") - err = apiClient.SendGeneralTextMessage(wsClient, "Hello General WebSocket!") + err = websocketObj.SendGeneralTextMessage("Hello General WebSocket!") if err != nil { log.Printf("Failed to send General text message: %v", err) } @@ -673,7 +534,7 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { // 方法 2: 发送 JSON 消息 fmt.Println("2. Sending General JSON message...") - eventData := map[string]interface{}{ + jsonData, _ := json.Marshal(map[string]interface{}{ "name": "general-test", "object": map[string]interface{}{ "field1": "general", @@ -686,33 +547,16 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { "map": map[string]string{ "test": "test", }, - } - err = apiClient.SendGeneralJSONMessage(wsClient, eventData) - if err != nil { - log.Printf("Failed to send General JSON message: %v", err) - } - - time.Sleep(1 * time.Second) - - // 方法 3: 发送带自定义头部的消息 - fmt.Println("3. Sending General message with custom headers...") - generalMsg := apiClient.BuildGeneralJSONMessage(map[string]interface{}{ - "action": "test", - "data": "Hello with headers!", }) - generalMsg.WithHeader("X-Custom-Header", "custom-value") - generalMsg.WithHeader("Content-Type", "application/json") - err = apiClient.SendGeneralMessage(wsClient, generalMsg) + err = websocketObj.SendGeneralTextMessage(string(jsonData)) if err != nil { - log.Printf("Failed to send General message with headers: %v", err) + log.Printf("Failed to send General JSON message: %v", err) } - time.Sleep(1 * time.Second) - - // 方法 4: 发送二进制消息 - fmt.Println("4. Sending General binary message...") + // 方法 3: 发送二进制消息 + fmt.Println("3. Sending General binary message...") binaryData := []byte("Binary General Data") - err = apiClient.SendGeneralBinaryMessage(wsClient, binaryData) + err = websocketObj.SendGeneralBinaryMessage(binaryData) if err != nil { log.Printf("Failed to send General binary message: %v", err) } @@ -749,9 +593,12 @@ func (h *SequentialHandler) AfterConnectionEstablished(session *dara.WebSocketSe func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { h.mu.Lock() defer h.mu.Unlock() - + seq, err := strconv.ParseInt(message.Headers["seq"], 10, 64) + if err != nil { + return err + } fmt.Printf("📨 CLI Sequential Test - HandleAwapMessage called: seq=%d, type=%s, id=%s\n", - message.Seq, message.Type, message.ID) + seq, message.Type, message.ID) // Print message payload for debugging if message.Payload != nil { payloadJSON, _ := json.Marshal(message.Payload) @@ -759,11 +606,11 @@ func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo } // Track sequence number - h.receivedSeq = append(h.receivedSeq, message.Seq) + h.receivedSeq = append(h.receivedSeq, seq) fmt.Printf(" Progress: %d/%d messages received\n", len(h.receivedSeq), h.expectedCount) // Check if we've received all expected messages - if len(h.receivedSeq) >= h.expectedCount { + if len(h.receivedSeq) > h.expectedCount { if h.done != nil { close(h.done) h.done = nil // Prevent double close @@ -773,14 +620,6 @@ func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo return nil } -func (h *SequentialHandler) HandleAwapIncomingMessage(session *dara.WebSocketSessionInfo, message *dara.AwapIncomingMessage) error { - // This is called after HandleAwapMessage, so we don't need to count again - // Just log for debugging - fmt.Printf("📬 CLI Sequential Test - HandleAwapIncomingMessage called: seq=%d, type=%s, id=%s\n", - message.Seq, message.Type, message.ID) - return nil -} - func (h *SequentialHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { // Parse the AWAP message ourselves and call HandleAwapMessage directly // This avoids the issue where AbstractAwapWebSocketHandler.HandleRawMessage @@ -791,25 +630,17 @@ func (h *SequentialHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, return err } - fmt.Printf("[CLI Sequential] Calling HandleAwapMessage directly: seq=%d, type=%s\n", awapMsg.Seq, awapMsg.Type) - if err := h.HandleAwapMessage(session, awapMsg); err != nil { + seq, err := strconv.ParseInt(message.Headers["seq"], 10, 64) + if err != nil { return err } - if awapMsg.Type == dara.AwapMessageTypeUpstreamTextEvent || - awapMsg.Type == dara.AwapMessageTypeUpstreamBinaryEvent || - awapMsg.Type == dara.AwapMessageTypeAckRequiredTextEvent || - awapMsg.Type == dara.AwapMessageTypeMessageReceiveEvent || - awapMsg.Type == dara.AwapMessageTypeDownstreamTextEvent || - awapMsg.Type == dara.AwapMessageTypeDownstreamBinaryEvent { - incoming := &dara.AwapIncomingMessage{ - AwapMessage: *awapMsg, - RawPayload: message.Payload, - } - return h.HandleAwapIncomingMessage(session, incoming) + fmt.Printf("[CLI Sequential] Calling HandleAwapMessage directly: seq=%d, type=%s\n", seq, awapMsg.Type) + if err := h.HandleAwapMessage(session, awapMsg); err != nil { + return err } - return nil + } func (h *SequentialHandler) SupportsPartialMessages() bool { @@ -874,25 +705,23 @@ func testSequentialMessageReception(ctx *cli.Context, args []string) error { } params := &openapiClient.Params{ - Action: tea.String("WebsocketAwapDemoApi"), - Version: tea.String("2022-02-02"), - Protocol: tea.String("wss"), - Method: tea.String("GET"), - Pathname: tea.String("/ws/awap-demo-api"), - AuthType: tea.String("AK"), + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("wss"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + WebsocketSubProtocol: tea.String("awap"), } request := &openapiClient.OpenApiRequest{ Query: map[string]*string{ "delay": tea.String("3000"), // Delay between messages (ms) - "batchSendMsgCnt": tea.String("20"), // Number of messages to send - }, - Headers: map[string]*string{ - "Sec-Websocket-Protocol": tea.String("awap"), + "batchSendMsgCnt": tea.String("10"), // Number of messages to send }, } - expectedCount := 20 + expectedCount := 10 handler := NewSequentialHandler(expectedCount) runtime := &dara.RuntimeOptions{ @@ -911,21 +740,20 @@ func testSequentialMessageReception(ctx *cli.Context, args []string) error { dara.StringValue(request.Query["delay"]), dara.StringValue(request.Query["batchSendMsgCnt"])) - result, err := apiClient.DoWebSocketRequest(params, request, runtime) + result, err := apiClient.CallApi(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) } - wsClient := result["wsClient"].(dara.WebSocketClient) - defer wsClient.Close() + websocketObj := result["websocketClient"].(*websocketutils.WebSocketClient) + defer websocketObj.Close() fmt.Println("Connection established, waiting for server to start sending batch messages...") time.Sleep(2 * time.Second) fmt.Println("\nSending request to trigger batch message sending...") - err = apiClient.SendAwapRequest(wsClient, "sequential-test-001", 1, map[string]interface{}{ - "action": "batchSend", - "test": "sequential", - }) + err = websocketObj.SendRawAwapTextMessageWithId(dara.AwapMessageType("UpstreamTextEvent"), + "sequential-test-001", + map[string]interface{}{"action": "batchSend", "test": "sequential"}) if err != nil { log.Printf("Failed to send request: %v", err) } else { From 846c183641e2d44352e0f04702b994586d5550b9 Mon Sep 17 00:00:00 2001 From: AllyW Date: Sat, 29 Nov 2025 20:40:18 +0800 Subject: [PATCH 5/7] add binary trigger --- websocketTest/main.go | 169 ++++++++++++++++++++++++++++++++---------- 1 file changed, 128 insertions(+), 41 deletions(-) diff --git a/websocketTest/main.go b/websocketTest/main.go index ca05f8890..fe9fd6924 100644 --- a/websocketTest/main.go +++ b/websocketTest/main.go @@ -13,6 +13,7 @@ import ( "github.com/aliyun/aliyun-cli/v3/i18n" openapiClient "github.com/alibabacloud-go/darabonba-openapi/v2/client" + openapiutil "github.com/alibabacloud-go/darabonba-openapi/v2/utils" websocketutils "github.com/alibabacloud-go/darabonba-openapi/v2/websocketutils" dara "github.com/alibabacloud-go/tea/dara" "github.com/alibabacloud-go/tea/tea" @@ -73,10 +74,11 @@ func runWebsocketTest(ctx *cli.Context, args []string) error { // // bodyBytes, _ := GetContentFromApiResponse(response) // fmt.Printf("response: %s\n", response["statusCode"]) + testAwapWebSocketBinary(ctx, args) // testAwapWebSocket(ctx, args) // testAwapWebSocketWithoutHandleAwapMessage(ctx, args) // 重写 HandleRawMessage 的用例 // testGeneralWebSocket(ctx, args) - testSequentialMessageReception(ctx, args) + // testSequentialMessageReception(ctx, args) return nil } @@ -136,10 +138,6 @@ func (h *SimpleHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, mes return nil } -func (h *SimpleHandler) SupportsPartialMessages() bool { - return false -} - func (h *SimpleHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { fmt.Printf("❌ CLI Error: %v\n", err) printSessionInfo(session) @@ -152,6 +150,130 @@ func (h *SimpleHandler) AfterConnectionClosed(session *dara.WebSocketSessionInfo return nil } +func testAwapWebSocketBinary(ctx *cli.Context, args []string) error { + fmt.Println("=== WebSocket Binary Example ===") + profile, _ := config.LoadProfileWithContext(ctx) + credential, err := profile.GetCredential(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get credential: %w", err) + } + config := &openapiClient.Config{ + Credential: credential, + Endpoint: dara.String("dalutest-pre.aliyuncs.com"), + Protocol: dara.String("https"), + } + + apiClient, err := openapiClient.NewClient(config) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + // Setup WebSocket + params := &openapiClient.Params{ + Action: tea.String("WebsocketAwapDemoApi"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("wss"), + Method: tea.String("GET"), + Pathname: tea.String("/ws/awap-demo-api"), + AuthType: tea.String("AK"), + WebsocketSubProtocol: tea.String("awap"), + } + + request := &openapiClient.OpenApiRequest{} + + runtime := &dara.RuntimeOptions{ + ReadTimeout: dara.Int(60000), // 60 seconds + ConnectTimeout: dara.Int(30000), // 30 seconds (increased for slow networks) + WebSocketPingInterval: dara.Int(30000), // 30秒心跳 + WebSocketHandshakeTimeout: dara.Int(30000), // 30秒握手超时(增加以应对网络延迟) + WebSocketWriteTimeout: dara.Int(30000), // 30秒写入超时(增加以应对网络延迟) + WebSocketEnableReconnect: dara.Bool(true), // 启用重连 + WebSocketMaxReconnectTimes: dara.Int(5), // 最多重连5次 + WebSocketHandler: &SimpleHandler{}, // 通过 runtime 配置 handler + } + + fmt.Println("Connecting...") + // Handler 从 runtime 中获取 + result, err := apiClient.CallApi(params, request, runtime) + if err != nil { + log.Fatalf("Connection failed: %v", err) + } + websocketObj := result["websocketClient"].(*websocketutils.WebSocketClient) + defer websocketObj.Close() + + sessionInfo := websocketObj.GetSessionInfo() + sessionId := sessionInfo.SessionID + if sessionId == "" { + return fmt.Errorf("session ID is empty") + } + + fmt.Println("\n1. Sending AWAP message...") + + // 方法 1: 发送 binary 信息 + awapMsgBinary := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamBinaryEvent"), + "msg-001", + []byte("Hello WebSocket!"), + ) + awapMsgBinary.WithHeader("session-id", sessionId) + err = websocketObj.SendAwapBinaryMessage(awapMsgBinary) + if err != nil { + log.Printf("Failed to send AWAP message: %v", err) + } else { + fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamBinaryEvent")) + } + time.Sleep(5 * time.Second) + + apiClientTrigger, err := openapiClient.NewClient(config) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + // trigger binary from server + params = &openapiClient.Params{ + // Action: tea.String("ListApiMcpServers"), + // Version: tea.String("2024-11-30"), + // Protocol: tea.String("HTTPS"), + // Method: tea.String("GET"), + // AuthType: tea.String("AK"), + // Style: tea.String("ROA"), + // Pathname: tea.String("/apimcpservers"), + // ReqBodyType: tea.String("json"), + // BodyType: tea.String("json"), + // Product: dara.String("DaluTestInner"), + Action: tea.String("WebsocketServerExecute"), + Version: tea.String("2022-02-02"), + Protocol: tea.String("HTTPS"), + Method: tea.String("POST"), + Pathname: tea.String("/ws_server/execute"), + AuthType: tea.String("AK"), + Style: tea.String("RPC"), + ReqBodyType: tea.String("json"), + BodyType: tea.String("json"), + } + + request = &openapiClient.OpenApiRequest{ + Query: openapiutil.Query(map[string]interface{}{ + "sessionId": sessionId, + "action": "sendBinary", + }), + } + + runtimeTrigger := &dara.RuntimeOptions{} + + fmt.Println("Triggering binary from server...") + result, err = apiClientTrigger.CallApi(params, request, runtimeTrigger) + if err != nil { + log.Fatalf("Failed to trigger binary from server: %v", err) + } + fmt.Printf("Trigger binary from server result: %+v\n", result) + + time.Sleep(10 * time.Second) + fmt.Println("Waiting for 10 seconds...") + + fmt.Println("\n=== WebSocket Binary Example Complete ===") + return nil +} + func testAwapWebSocket(ctx *cli.Context, args []string) error { fmt.Println("=== WebSocket Example ===") profile, _ := config.LoadProfileWithContext(ctx) @@ -286,7 +408,7 @@ type GeneralHandler struct { } func (h *GeneralHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { - fmt.Println("✓ CLI General Connected to General WebSocket server") + fmt.Println("✓ [CLI General] Connected to General WebSocket server") printSessionInfo(session) return nil } @@ -313,10 +435,6 @@ func (h *GeneralHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, me return nil } -func (h *GeneralHandler) SupportsPartialMessages() bool { - return false -} - func (h *GeneralHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { fmt.Printf("❌ CLI General Error: %v\n", err) printSessionInfo(session) @@ -375,10 +493,6 @@ func (h *NoHandleAwapMessageHandler) AfterConnectionClosed(session *dara.WebSock return nil } -func (h *NoHandleAwapMessageHandler) SupportsPartialMessages() bool { - return false -} - func testAwapWebSocketWithoutHandleAwapMessage(ctx *cli.Context, args []string) error { fmt.Println("\n=== AWAP WebSocket Test (HandleRawMessage Override) ===") fmt.Println("This test demonstrates AwapWebSocketHandler interface usage") @@ -620,33 +734,6 @@ func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo return nil } -func (h *SequentialHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { - // Parse the AWAP message ourselves and call HandleAwapMessage directly - // This avoids the issue where AbstractAwapWebSocketHandler.HandleRawMessage - // can't access the outer SequentialHandler type - awapMsg, err := dara.ParseAwapMessage(message) - if err != nil { - fmt.Printf("[CLI Sequential] Failed to parse AWAP message: %v\n", err) - return err - } - - seq, err := strconv.ParseInt(message.Headers["seq"], 10, 64) - if err != nil { - return err - } - - fmt.Printf("[CLI Sequential] Calling HandleAwapMessage directly: seq=%d, type=%s\n", seq, awapMsg.Type) - if err := h.HandleAwapMessage(session, awapMsg); err != nil { - return err - } - return nil - -} - -func (h *SequentialHandler) SupportsPartialMessages() bool { - return false -} - func (h *SequentialHandler) HandleError(session *dara.WebSocketSessionInfo, err error) error { fmt.Printf("❌ CLI Sequential Test Error: %v\n", err) printSessionInfo(session) From e15e1be22ece37cacbe2b15512cd6fa963e02324 Mon Sep 17 00:00:00 2001 From: AllyW Date: Mon, 8 Dec 2025 14:55:09 +0800 Subject: [PATCH 6/7] adjust openapi --- websocketTest/main.go | 180 +++++++++++++++++++----------------------- 1 file changed, 83 insertions(+), 97 deletions(-) diff --git a/websocketTest/main.go b/websocketTest/main.go index fe9fd6924..5d98c4ff0 100644 --- a/websocketTest/main.go +++ b/websocketTest/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "strconv" + "strings" "sync" "time" @@ -116,7 +117,7 @@ func printSessionInfo(session *dara.WebSocketSessionInfo) { } type SimpleHandler struct { - dara.AbstractAwapWebSocketHandler + websocketutils.AbstractAwapWebSocketHandler } func (h *SimpleHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { @@ -125,7 +126,7 @@ func (h *SimpleHandler) AfterConnectionEstablished(session *dara.WebSocketSessio return nil } -func (h *SimpleHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { +func (h *SimpleHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *websocketutils.AwapMessage) error { fmt.Println("📨 CLI Received AWAP message:") fmt.Printf(" Type: %s\n", message.Format) fmt.Printf(" ID: %s\n", message.ID) @@ -210,16 +211,17 @@ func testAwapWebSocketBinary(ctx *cli.Context, args []string) error { fmt.Println("\n1. Sending AWAP message...") // 方法 1: 发送 binary 信息 - awapMsgBinary := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamBinaryEvent"), - "msg-001", + awapMsgBinary := websocketutils.NewAwapMessage( []byte("Hello WebSocket!"), + websocketutils.WithType(websocketutils.AwapMessageType("UpstreamBinaryEvent")), + websocketutils.WithID("msg-001"), ) awapMsgBinary.WithHeader("session-id", sessionId) err = websocketObj.SendAwapBinaryMessage(awapMsgBinary) if err != nil { log.Printf("Failed to send AWAP message: %v", err) } else { - fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamBinaryEvent")) + fmt.Printf("AWAP message sent successfully, type: %s\n", websocketutils.AwapMessageType("UpstreamBinaryEvent")) } time.Sleep(5 * time.Second) @@ -230,32 +232,38 @@ func testAwapWebSocketBinary(ctx *cli.Context, args []string) error { // trigger binary from server params = &openapiClient.Params{ - // Action: tea.String("ListApiMcpServers"), - // Version: tea.String("2024-11-30"), - // Protocol: tea.String("HTTPS"), - // Method: tea.String("GET"), - // AuthType: tea.String("AK"), - // Style: tea.String("ROA"), - // Pathname: tea.String("/apimcpservers"), - // ReqBodyType: tea.String("json"), - // BodyType: tea.String("json"), - // Product: dara.String("DaluTestInner"), Action: tea.String("WebsocketServerExecute"), Version: tea.String("2022-02-02"), Protocol: tea.String("HTTPS"), Method: tea.String("POST"), Pathname: tea.String("/ws_server/execute"), AuthType: tea.String("AK"), - Style: tea.String("RPC"), - ReqBodyType: tea.String("json"), - BodyType: tea.String("json"), - } + Style: dara.String("ROA"), + ReqBodyType: dara.String("json"), + BodyType: dara.String("string"), + } + + header := "type:DownstreamBinaryEvent\nseq:1\nid:msg1\ntimestamp:1719242591197\n\n" + jsonPart := `{ + "audioId":"640bc797bb684bd6960185651307", + "AudioType": "flac", + "processStatus": "start", + "additionalConf":{ + "timeout": 60, + "maxAudioLength": "10" + } +}` + + // 将JSON部分转为byte类型 + jsonBytes := []byte(jsonPart) + bytesJSON, _ := json.Marshal(jsonBytes) request = &openapiClient.OpenApiRequest{ Query: openapiutil.Query(map[string]interface{}{ "sessionId": sessionId, "action": "sendBinary", }), + Stream: strings.NewReader(header + string(bytesJSON)), } runtimeTrigger := &dara.RuntimeOptions{} @@ -292,18 +300,7 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { return fmt.Errorf("failed to create client: %w", err) } - // Setup WebSocket params := &openapiClient.Params{ - // Action: tea.String("ListApiMcpServers"), - // Version: tea.String("2024-11-30"), - // Protocol: tea.String("HTTPS"), - // Method: tea.String("GET"), - // AuthType: tea.String("AK"), - // Style: tea.String("ROA"), - // Pathname: tea.String("/apimcpservers"), - // ReqBodyType: tea.String("json"), - // BodyType: tea.String("json"), - // Product: dara.String("DaluTestInner"), Action: tea.String("WebsocketAwapDemoApi"), Version: tea.String("2022-02-02"), Protocol: tea.String("wss"), @@ -327,7 +324,6 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { } fmt.Println("Connecting...") - // Handler 从 runtime 中获取 result, err := apiClient.CallApi(params, request, runtime) if err != nil { log.Fatalf("Connection failed: %v", err) @@ -335,76 +331,64 @@ func testAwapWebSocket(ctx *cli.Context, args []string) error { websocketObj := result["websocketClient"].(*websocketutils.WebSocketClient) defer websocketObj.Close() - fmt.Println("\n1. Sending AWAP message...") - // 方法 1: 使用 SendAwapRequest 发送请求消息(推荐) - err = websocketObj.SendRawAwapTextMessageWithId(dara.AwapMessageType("UpstreamTextEvent"), - "msg-001", - map[string]interface{}{"action": "test", "data": "Hello WebSocket!"}) - if err != nil { - log.Printf("Failed to send AWAP request: %v", err) - } else { - fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamTextEvent")) - } - - // 方法 2: 手动构建 AWAP 消息 - awapMsg := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamTextEvent"), - "msg-002", - map[string]interface{}{"message": "Hello WebSocket!"}, + // 手动构建 AWAP 消息 + fmt.Println("\n1. Sending AWAP text message...") + awapMsg := websocketutils.NewAwapMessage( + map[string]interface{}{"message": "Hello WebSocket!", "try": "1"}, + websocketutils.WithType(websocketutils.AwapMessageType("UpstreamTextEvent")), + websocketutils.WithID("msg-001"), ) err = websocketObj.SendAwapTextMessage(awapMsg) if err != nil { log.Printf("Failed to send AWAP message: %v", err) } else { - fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamTextEvent")) + fmt.Printf("AWAP message sent successfully, type: %s\n", websocketutils.AwapMessageType("UpstreamTextEvent")) } - // 方法 3: binary 信息 - awapMsgBinary := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamBinaryEvent"), - "msg-003", - []byte("Hello WebSocket!"), + // 方法 2: binary 信息 + fmt.Println("\n2. Sending AWAP binary message...") + awapMsgBinary := websocketutils.NewAwapMessage( + []byte("Hello WebSocket! try 2"), + websocketutils.WithType(websocketutils.AwapMessageType("UpstreamBinaryEvent")), + websocketutils.WithID("msg-002"), ) err = websocketObj.SendAwapBinaryMessage(awapMsgBinary) if err != nil { log.Printf("Failed to send AWAP message: %v", err) } else { - fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamBinaryEvent")) + fmt.Printf("AWAP message sent successfully, type: %s\n", websocketutils.AwapMessageType("UpstreamBinaryEvent")) } time.Sleep(1 * time.Second) - // 方法 4: 发送 AckRequiredTextEvent 类型的消息(等待响应) - fmt.Println("\n4. Sending AckRequiredTextEvent message and waiting for ACK...") - ackResponse, err := websocketObj.SendRawAwapRequestWithAck( - "msg-004", - map[string]interface{}{"action": "ackRequiredTest", "data": "This message requires acknowledgment", "timestamp": time.Now().Unix()}, - 30*time.Second, + // 方法 3: 发送 AckRequiredTextEvent 类型的消息(等待响应) + fmt.Println("\n3. Sending AckRequiredTextEvent message and waiting for ACK...") + message := websocketutils.NewAwapMessage( + map[string]interface{}{"action": "ackRequiredTest", "data": "This message requires acknowledgment", "timestamp": time.Now().Unix(), "try": "3"}, + websocketutils.WithType(websocketutils.AwapMessageType("AckRequiredTextEvent")), + websocketutils.WithID("msg-003"), ) + ackResponse, err := websocketObj.SendAwapRequestWithAck(message, 30*time.Second) if err != nil { log.Printf("❌ Failed to send AckRequiredTextEvent or timed out waiting for response: %v", err) } else { fmt.Printf("✅ Received acknowledgment:\n") - fmt.Printf(" Response Type: %s\n", ackResponse.Type) - if ackResponse.Headers != nil { - if ackID, ok := ackResponse.Headers["ack-id"]; ok { - fmt.Printf(" Ack-ID: %s\n", ackID) - } - } - if ackResponse.Payload != nil { - payloadJSON, _ := json.Marshal(ackResponse.Payload) - fmt.Printf(" Payload: %s\n", string(payloadJSON)) - } + fmt.Printf(" Response: %+v\n", ackResponse) + fmt.Printf(" Response Type: %s\n", ackResponse.(*websocketutils.AwapMessage).Type) + fmt.Printf(" Response Headers: %+v\n", ackResponse.(*websocketutils.AwapMessage).Headers) + fmt.Printf(" Response Payload: %+v\n", ackResponse.(*websocketutils.AwapMessage).Payload) } // Wait for other responses - time.Sleep(2 * time.Second) + time.Sleep(10 * time.Second) fmt.Println("\n=== AWAP Example Complete ===") return nil } type GeneralHandler struct { - dara.AbstractGeneralWebSocketHandler + websocketutils.AbstractGeneralWebSocketHandler } func (h *GeneralHandler) AfterConnectionEstablished(session *dara.WebSocketSessionInfo) error { @@ -416,19 +400,15 @@ func (h *GeneralHandler) AfterConnectionEstablished(session *dara.WebSocketSessi func (h *GeneralHandler) HandleRawMessage(session *dara.WebSocketSessionInfo, message *dara.WebSocketMessage) error { // Parse and handle General messages directly if message.Type == dara.WebSocketMessageTypeText { - // Parse as General text message - generalMsg, err := dara.ParseGeneralMessage(message) + generalMsg, err := websocketutils.ParseGeneralMessage(message) if err != nil { fmt.Printf("[CLI General] Failed to parse General message: %v\n", err) return err } - fmt.Printf("[CLI General] Received text message: %+v\n", message) - fmt.Printf("[CLI General] Received text message: %s\n", generalMsg.Body) + fmt.Printf("[CLI General] Received text message: %+v\n", generalMsg) } else if message.Type == dara.WebSocketMessageTypeBinary { - // Handle as binary message - fmt.Printf("[CLI General] Received binary message\n") fmt.Printf("[CLI General] Received binary message: %s\n", message.Payload) } @@ -449,7 +429,7 @@ func (h *GeneralHandler) AfterConnectionClosed(session *dara.WebSocketSessionInf // NoHandleAwapMessageHandler 是一个重写 HandleRawMessage 的 handler type NoHandleAwapMessageHandler struct { - dara.AbstractAwapWebSocketHandler + websocketutils.AbstractAwapWebSocketHandler messageCount int mu sync.Mutex } @@ -471,8 +451,7 @@ func (h *NoHandleAwapMessageHandler) HandleRawMessage(session *dara.WebSocketSes fmt.Printf("📨 CLI NoHandleAwapMessage - HandleRawMessage called (#%d):\n", count) fmt.Printf(" Type: %+v\n", message.Type) fmt.Printf(" Headers: %+v\n", message.Headers) - fmt.Printf(" Payload: %s\n", string(message.Payload)) - printSessionInfo(session) + fmt.Printf(" Payload(should have empty line): %s\n", string(message.Payload)) return nil } @@ -549,35 +528,39 @@ func testAwapWebSocketWithoutHandleAwapMessage(ctx *cli.Context, args []string) // 方法 1: 使用 SendAwapRequest 发送请求消息 fmt.Println("1. Sending AWAP request message...") - err = websocketObj.SendRawAwapTextMessageWithId(dara.AwapMessageType("UpstreamTextEvent"), - "msg-no-handleraw-001", - map[string]interface{}{"action": "test", "data": "This handler does NOT override HandleRawMessage"}, + message := websocketutils.NewAwapMessage( + map[string]interface{}{"action": "test", "data": "This handler does NOT override HandleRawMessage", "try": "1"}, + websocketutils.WithType(websocketutils.AwapMessageType("UpstreamTextEvent")), + websocketutils.WithID("msg-no-handleraw-001"), ) + err = websocketObj.SendAwapTextMessage(message) if err != nil { log.Printf("Failed to send AWAP request: %v", err) } else { - fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamTextEvent")) + fmt.Printf("AWAP message sent successfully, type: %s\n", websocketutils.AwapMessageType("UpstreamTextEvent")) } time.Sleep(1 * time.Second) // 方法 2: binary 信息 - awapMsgBinary := websocketutils.NewAwapMessage(dara.AwapMessageType("UpstreamBinaryEvent"), - "msg-003", - []byte("Hello WebSocket!"), + fmt.Println("2. Sending AWAP binary message...") + awapMsgBinary := websocketutils.NewAwapMessage( + []byte("Hello WebSocket! try 2"), + websocketutils.WithType(websocketutils.AwapMessageType("UpstreamBinaryEvent")), + websocketutils.WithID("msg-002"), ) err = websocketObj.SendAwapBinaryMessage(awapMsgBinary) if err != nil { log.Printf("Failed to send AWAP message: %v", err) } else { - fmt.Printf("AWAP message sent successfully, type: %s\n", dara.AwapMessageType("UpstreamBinaryEvent")) + fmt.Printf("AWAP message sent successfully, type: %s\n", websocketutils.AwapMessageType("UpstreamBinaryEvent")) } time.Sleep(1 * time.Second) // Wait for other responses fmt.Println("\nWaiting for other server responses...") - time.Sleep(3 * time.Second) + time.Sleep(10 * time.Second) fmt.Println("\n=== NoHandleAwapMessageHandler Test Complete ===") fmt.Println("This test demonstrates that AwapWebSocketHandler interface") @@ -639,7 +622,7 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { // 方法 1: 发送文本消息 fmt.Println("1. Sending General text message...") - err = websocketObj.SendGeneralTextMessage("Hello General WebSocket!") + err = websocketObj.SendGeneralTextMessage("Hello General WebSocket! try 1") if err != nil { log.Printf("Failed to send General text message: %v", err) } @@ -650,6 +633,7 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { fmt.Println("2. Sending General JSON message...") jsonData, _ := json.Marshal(map[string]interface{}{ "name": "general-test", + "try": "2", "object": map[string]interface{}{ "field1": "general", "field2": 2, @@ -669,21 +653,20 @@ func testGeneralWebSocket(ctx *cli.Context, args []string) error { // 方法 3: 发送二进制消息 fmt.Println("3. Sending General binary message...") - binaryData := []byte("Binary General Data") + binaryData := []byte("Binary General Data try 3") err = websocketObj.SendGeneralBinaryMessage(binaryData) if err != nil { log.Printf("Failed to send General binary message: %v", err) } - // Wait for response - time.Sleep(3 * time.Second) + time.Sleep(10 * time.Second) fmt.Println("\n=== General Example Complete ===") return nil } type SequentialHandler struct { - dara.AbstractAwapWebSocketHandler + websocketutils.AbstractAwapWebSocketHandler receivedSeq []int64 // Track received sequence numbers mu sync.Mutex expectedCount int @@ -704,7 +687,7 @@ func (h *SequentialHandler) AfterConnectionEstablished(session *dara.WebSocketSe return nil } -func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *dara.AwapMessage) error { +func (h *SequentialHandler) HandleAwapMessage(session *dara.WebSocketSessionInfo, message *websocketutils.AwapMessage) error { h.mu.Lock() defer h.mu.Unlock() seq, err := strconv.ParseInt(message.Headers["seq"], 10, 64) @@ -838,9 +821,12 @@ func testSequentialMessageReception(ctx *cli.Context, args []string) error { time.Sleep(2 * time.Second) fmt.Println("\nSending request to trigger batch message sending...") - err = websocketObj.SendRawAwapTextMessageWithId(dara.AwapMessageType("UpstreamTextEvent"), - "sequential-test-001", - map[string]interface{}{"action": "batchSend", "test": "sequential"}) + message := websocketutils.NewAwapMessage( + map[string]interface{}{"action": "batchSend", "test": "sequential"}, + websocketutils.WithType(websocketutils.AwapMessageType("UpstreamTextEvent")), + websocketutils.WithID("sequential-test-001"), + ) + err = websocketObj.SendAwapTextMessage(message) if err != nil { log.Printf("Failed to send request: %v", err) } else { From 21b572086bfd2f978f1d4d7b8bf3f4fed6622c9a Mon Sep 17 00:00:00 2001 From: AllyW Date: Mon, 8 Dec 2025 19:59:57 +0800 Subject: [PATCH 7/7] fix tests --- go.mod | 4 +--- go.sum | 9 +++++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 5ae86f561..fe36a1458 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/alibabacloud-go/alibabacloud-gateway-sls v0.3.0 github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.12 github.com/alibabacloud-go/tea v1.3.13 - github.com/alibabacloud-go/tea-utils/v2 v2.0.7 + github.com/alibabacloud-go/tea-utils/v2 v2.0.9 github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aliyun/credentials-go v1.4.7 @@ -61,5 +61,3 @@ require ( replace github.com/alibabacloud-go/darabonba-openapi/v2 => ../darabonba-openapi/golang replace github.com/alibabacloud-go/tea => ../tea - -replace github.com/alibabacloud-go/tea-utils/v2 => ../tea-utils diff --git a/go.sum b/go.sum index 4299166bc..e3b0acd42 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,11 @@ github.com/alibabacloud-go/openapi-util v0.1.0 h1:0z75cIULkDrdEhkLWgi9tnLe+KhAFE github.com/alibabacloud-go/openapi-util v0.1.0/go.mod h1:sQuElr4ywwFRlCCberQwKRFhRzIyG4QTP/P4y1CJ6Ws= github.com/alibabacloud-go/tea-utils v1.3.1 h1:iWQeRzRheqCMuiF3+XkfybB3kTgUXkXX+JMrqfLeB2I= github.com/alibabacloud-go/tea-utils v1.3.1/go.mod h1:EI/o33aBfj3hETm4RLiAxF/ThQdSngxrpF8rKUDJjPE= +github.com/alibabacloud-go/tea-utils/v2 v2.0.1/go.mod h1:U5MTY10WwlquGPS34DOeomUGBB0gXbLueiq5Trwu0C4= +github.com/alibabacloud-go/tea-utils/v2 v2.0.5/go.mod h1:dL6vbUT35E4F4bFTHL845eUloqaerYBYPsdWR2/jhe4= +github.com/alibabacloud-go/tea-utils/v2 v2.0.6/go.mod h1:qxn986l+q33J5VkialKMqT/TTs3E+U9MJpd001iWQ9I= +github.com/alibabacloud-go/tea-utils/v2 v2.0.9 h1:y6pUIlhjxbZl9ObDAcmA1H3c21eaAxADHTDQmBnAIgA= +github.com/alibabacloud-go/tea-utils/v2 v2.0.9/go.mod h1:qxn986l+q33J5VkialKMqT/TTs3E+U9MJpd001iWQ9I= github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 h1:qagvUyrgOnBIlVRQWOyCZGVKUIYbMBdGdJ104vBpRFU= github.com/aliyun/alibaba-cloud-sdk-go v1.63.107/go.mod h1:SOSDHfe1kX91v3W5QiBsWSLqeLxImobbMX1mxrFHsVQ= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= @@ -184,6 +189,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= @@ -229,6 +235,7 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= @@ -266,6 +273,7 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -278,6 +286,7 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=