33using Core . Extensions ;
44using Core . Grains ;
55using Core . Models ;
6+ using Orleans ;
67using Schwab . Enums ;
78using Schwab . Messages ;
89using Schwab . Models ;
10+ using System ;
911using System . Linq ;
1012using System . Threading ;
1113using System . Threading . Tasks ;
@@ -14,12 +16,6 @@ namespace Schwab.Grains
1416{
1517 public interface ISchwabConnectionGrain : IConnectionGrain
1618 {
17- /// <summary>
18- /// Stamp
19- /// </summary>
20- /// <param name="accessToken"></param>
21- Task < StatusResponse > Stamp ( string accessToken ) ;
22-
2319 /// <summary>
2420 /// Connect
2521 /// </summary>
@@ -41,26 +37,17 @@ public class SchwabConnectionGrain : ConnectionGrain, ISchwabConnectionGrain
4137 /// <summary>
4238 /// Connector
4339 /// </summary>
44- protected SchwabBroker connector = new ( ) ;
40+ protected SchwabBroker connector ;
4541
4642 /// <summary>
47- /// Observer
43+ /// Timer
4844 /// </summary>
49- protected ITradeObserver observer ;
45+ protected IDisposable counter ;
5046
5147 /// <summary>
52- /// Stamp
48+ /// Observer
5349 /// </summary>
54- /// <param name="accessToken"></param>
55- public virtual async Task < StatusResponse > Stamp ( string accessToken )
56- {
57- connector . AccessToken = accessToken ;
58-
59- return new ( )
60- {
61- Data = StatusEnum . Active
62- } ;
63- }
50+ protected ITradeObserver observer ;
6451
6552 /// <summary>
6653 /// Connect
@@ -73,12 +60,55 @@ public virtual async Task<StatusResponse> Setup(Connection connection, ITradeObs
7360
7461 state = connection ;
7562 observer = grainObserver ;
76- connector . ClientId = connection . Id ;
77- connector . ClientSecret = connection . Secret ;
78- connector . AccessToken = connection . AccessToken ;
79- connector . RefreshToken = connection . RefreshToken ;
63+ connector = new SchwabBroker ( )
64+ {
65+ ClientId = connection . Id ,
66+ ClientSecret = connection . Secret ,
67+ AccessToken = connection . AccessToken ,
68+ RefreshToken = connection . RefreshToken
69+ } ;
70+
71+ var descriptor = this . GetDescriptor ( ) ;
72+ var scope = await connector . Authenticate ( ) ;
73+
74+ connector . AccessToken = scope ? . AccessToken ;
75+
76+ var account = await connector . GetAccountCode ( CancellationToken . None ) ;
77+
78+ connection = connection with
79+ {
80+ AccessToken = scope ? . AccessToken ,
81+ Account = connection . Account with { Descriptor = account ? . FirstOrDefault ( ) ? . HashValue }
82+ } ;
83+
84+ await connector . Stream ( CancellationToken . None ) ;
85+
86+ await GrainFactory . GetGrain < ISchwabOrdersGrain > ( descriptor ) . Setup ( connection ) ;
87+ await GrainFactory . GetGrain < ISchwabPositionsGrain > ( descriptor ) . Setup ( connection ) ;
88+ await GrainFactory . GetGrain < ISchwabOrderSenderGrain > ( descriptor ) . Setup ( connection ) ;
89+ await GrainFactory . GetGrain < ISchwabTransactionsGrain > ( descriptor ) . Setup ( connection , observer ) ;
90+
91+ foreach ( var o in connection . Account . Instruments . Values )
92+ {
93+ await GrainFactory . GetGrain < ISchwabOptionsGrain > ( this . GetDescriptor ( o . Name ) ) . Setup ( connection ) ;
94+ }
95+
96+ counter = this . RegisterGrainTimer ( async data =>
97+ {
98+ connection = connection with { AccessToken = scope ? . AccessToken } ;
99+
100+ await GrainFactory . GetGrain < ISchwabOrdersGrain > ( descriptor ) . Setup ( connection ) ;
101+ await GrainFactory . GetGrain < ISchwabPositionsGrain > ( descriptor ) . Setup ( connection ) ;
102+ await GrainFactory . GetGrain < ISchwabOrderSenderGrain > ( descriptor ) . Setup ( connection ) ;
103+ await GrainFactory . GetGrain < ISchwabTransactionsGrain > ( descriptor ) . Setup ( connection , observer ) ;
104+
105+ foreach ( var o in state . Account . Instruments . Values )
106+ {
107+ await GrainFactory . GetGrain < ISchwabOptionsGrain > ( this . GetDescriptor ( o . Name ) ) . Setup ( connection ) ;
108+ }
109+
110+ } , 0 , TimeSpan . Zero , TimeSpan . FromMinutes ( 1 ) ) ;
80111
81- await connector . ConnectStream ( CancellationToken . None ) ;
82112 await Task . WhenAll ( connection . Account . Instruments . Values . Select ( Subscribe ) ) ;
83113
84114 return new ( )
@@ -95,6 +125,7 @@ public override Task<StatusResponse> Disconnect()
95125 connections ? . ForEach ( o => o . Dispose ( ) ) ;
96126 connections ? . Clear ( ) ;
97127 connector ? . Dispose ( ) ;
128+ counter ? . Dispose ( ) ;
98129
99130 return Task . FromResult ( new StatusResponse
100131 {
0 commit comments