1212import java .util .concurrent .atomic .AtomicReference ;
1313import java .util .function .BiConsumer ;
1414import java .util .function .BiFunction ;
15+ import java .util .function .Consumer ;
1516import java .util .function .Function ;
1617
1718public class UpdateAgent <K , V > implements DataEntryAgent <K , V > {
1819 private final DataHolder <K , V > holder ;
19- private final Function <K , ValueWrapper <V >> updateFunction ;
20+ private final BiConsumer <K , Consumer < ValueWrapper <V >>> updateConsumer ;
2021
2122 private final Map <K , UpdateStatus > map = new ConcurrentHashMap <>();
2223
2324 private Function <K , FilterResult > filter = null ;
2425 private BiFunction <K , ValueWrapper <V >, ValueWrapper <V >> errorHandler = null ;
2526 private int maxSkips = 1 ;
2627
27- public UpdateAgent (DataHolder <K , V > holder , Function <K , ValueWrapper <V >> updateFunction ) {
28+ public UpdateAgent (DataHolder <K , V > holder , BiConsumer <K , Consumer < ValueWrapper <V >>> updateConsumer ) {
2829 this .holder = holder ;
29- this .updateFunction = updateFunction ;
30+ this .updateConsumer = updateConsumer ;
31+ }
32+
33+ public UpdateAgent (DataHolder <K , V > holder , Function <K , ValueWrapper <V >> updateFunction ) {
34+ this (holder , (key , consumer ) -> consumer .accept (updateFunction .apply (key )));
3035 }
3136
3237 public void setFilter (Function <K , FilterResult > filter ) {
@@ -48,6 +53,21 @@ public void setMaxSkips(int maxSkips) {
4853 this .maxSkips = maxSkips ;
4954 }
5055
56+ private void setValueEntry (ValueWrapper <V > valueWrapper , Map .Entry <K , UpdateStatus > entry ) {
57+ if (errorHandler != null && valueWrapper .state == ValueWrapper .State .ERROR ) {
58+ valueWrapper = errorHandler .apply (entry .getKey (), valueWrapper );
59+ }
60+ switch (valueWrapper .state ) {
61+ case ERROR :
62+ case NOT_HANDLED :
63+ entry .setValue (new UpdateStatus .Skip (maxSkips ));
64+ break ;
65+ default :
66+ entry .setValue (new UpdateStatus .Set (valueWrapper .value ));
67+ break ;
68+ }
69+ }
70+
5171 public Runnable getUpdateRunnable (int maxEntryPerCall ) {
5272 return new Runnable () {
5373 private final AtomicReference <Iterator <Map .Entry <K , UpdateStatus >>> iteratorRef = new AtomicReference <>();
@@ -66,6 +86,12 @@ public void run() {
6686 K key = entry .getKey ();
6787 UpdateStatus updateStatus = entry .getValue ();
6888
89+ if (updateStatus instanceof UpdateStatus .Value ) {
90+ ValueWrapper <V > valueWrapper = ((UpdateStatus .Value ) updateStatus ).get ();
91+ setValueEntry (valueWrapper , entry );
92+ continue ;
93+ }
94+
6995 if (updateStatus != UpdateStatus .DEFAULT && !(updateStatus instanceof UpdateStatus .Set )) {
7096 continue ;
7197 }
@@ -85,19 +111,15 @@ public void run() {
85111 }
86112 }
87113
88- ValueWrapper <V > valueWrapper = updateFunction .apply (key );
89- if (errorHandler != null && valueWrapper .state == ValueWrapper .State .ERROR ) {
90- valueWrapper = errorHandler .apply (key , valueWrapper );
91- }
92- switch (valueWrapper .state ) {
93- case ERROR :
94- case NOT_HANDLED :
95- entry .setValue (new UpdateStatus .Skip (maxSkips ));
96- break ;
97- default :
98- entry .setValue (new UpdateStatus .Set (valueWrapper .value ));
99- break ;
114+ entry .setValue (UpdateStatus .UPDATING );
115+ updateConsumer .accept (key , valueWrapper -> entry .setValue (new UpdateStatus .Value (valueWrapper )));
116+
117+ UpdateStatus afterUpdateStatus = entry .getValue ();
118+ if (afterUpdateStatus instanceof UpdateStatus .Value ) {
119+ ValueWrapper <V > valueWrapper = ((UpdateStatus .Value ) afterUpdateStatus ).get ();
120+ setValueEntry (valueWrapper , entry );
100121 }
122+
101123 count ++;
102124 }
103125 }
0 commit comments