22
33import com .exaroton .api .server .Server ;
44import com .exaroton .api .server .ServerStatus ;
5- import com .exaroton .api .ws .data .*;
65import com .exaroton .api .ws .stream .*;
76import com .exaroton .api .ws .subscriber .*;
87import com .google .gson .Gson ;
9- import com .google .gson .JsonObject ;
108import com .google .gson .JsonParser ;
119import org .jetbrains .annotations .ApiStatus ;
1210import org .jetbrains .annotations .NotNull ;
@@ -57,7 +55,7 @@ public final class WebSocketConnection implements WebSocket.Listener, Closeable
5755 /**
5856 * active streams
5957 */
60- private final Map <StreamName , Stream > streams = new HashMap <>();
58+ private final Map <Class <? extends Stream <?>> , Stream <?> > streams = new HashMap <>();
6159
6260 @ NotNull
6361 private final String apiToken ;
@@ -88,7 +86,7 @@ public WebSocketConnection(
8886 this .uri = uri ;
8987 this .apiToken = apiToken ;
9088 this .server = Objects .requireNonNull (server );
91- this .streams .put (StreamName . STATUS , new Stream (this , this .gson , StreamName . STATUS ));
89+ this .streams .put (ServerStatusStream . class , new ServerStatusStream (this , this .gson ). setServer ( server ));
9290
9391 connect ();
9492 }
@@ -98,21 +96,15 @@ public WebSocketConnection(
9896 *
9997 * @param name stream name
10098 */
101- public void subscribe (@ NotNull StreamName name ) {
99+ public void subscribe (@ NotNull StreamType name ) {
102100 Objects .requireNonNull (name );
103101
104- if (streams .containsKey (name )) {
102+ if (streams .containsKey (name . getStreamClass () )) {
105103 return ;
106104 }
107105
108- Stream stream ;
109- if (name == StreamName .CONSOLE ) {
110- stream = new ConsoleStream (this , this .gson );
111- } else {
112- stream = new Stream (this , this .gson , name );
113- }
114-
115- this .streams .put (name , stream );
106+ Stream <?> stream = name .construct (this , gson );
107+ this .streams .put (name .getStreamClass (), stream );
116108 stream .start ();
117109 }
118110
@@ -121,13 +113,13 @@ public void subscribe(@NotNull StreamName name) {
121113 *
122114 * @param name stream name
123115 */
124- public void unsubscribe (@ NotNull StreamName name ) {
116+ public void unsubscribe (@ NotNull StreamType name ) {
125117 Objects .requireNonNull (name );
126118
127- Stream stream = this .streams .get (name );
119+ Stream <?> stream = this .streams .get (name . getStreamClass () );
128120 if (stream != null ) {
129121 stream .stop ();
130- this .streams .remove (name );
122+ this .streams .remove (name . getStreamClass () );
131123 }
132124 }
133125
@@ -137,7 +129,7 @@ public void unsubscribe(@NotNull StreamName name) {
137129 * @param subscriber instance of class handling server status changes
138130 */
139131 public void addServerStatusSubscriber (ServerStatusSubscriber subscriber ) {
140- this .streams . get ( StreamName . STATUS ). subscribers . add ( subscriber );
132+ this .addStreamSubscriber ( ServerStatusStream . class , subscriber );
141133 }
142134
143135 /**
@@ -146,7 +138,7 @@ public void addServerStatusSubscriber(ServerStatusSubscriber subscriber) {
146138 * @param subscriber instance of class handling new console lines
147139 */
148140 public void addConsoleSubscriber (ConsoleSubscriber subscriber ) {
149- this .addStreamSubscriber (StreamName . CONSOLE , subscriber );
141+ this .addStreamSubscriber (ConsoleStream . class , subscriber );
150142 }
151143
152144 /**
@@ -155,7 +147,7 @@ public void addConsoleSubscriber(ConsoleSubscriber subscriber) {
155147 * @param subscriber instance of class handling heap data
156148 */
157149 public void addHeapSubscriber (HeapSubscriber subscriber ) {
158- this .addStreamSubscriber (StreamName . HEAP , subscriber );
150+ this .addStreamSubscriber (HeapStream . class , subscriber );
159151 }
160152
161153 /**
@@ -164,7 +156,7 @@ public void addHeapSubscriber(HeapSubscriber subscriber) {
164156 * @param subscriber instance of class handling stats
165157 */
166158 public void addStatsSubscriber (StatsSubscriber subscriber ) {
167- this .addStreamSubscriber (StreamName . STATS , subscriber );
159+ this .addStreamSubscriber (StatsStream . class , subscriber );
168160 }
169161
170162 /**
@@ -173,7 +165,7 @@ public void addStatsSubscriber(StatsSubscriber subscriber) {
173165 * @param subscriber instance of class handling stats
174166 */
175167 public void addTickSubscriber (TickSubscriber subscriber ) {
176- this .addStreamSubscriber (StreamName . TICK , subscriber );
168+ this .addStreamSubscriber (TickStream . class , subscriber );
177169 }
178170
179171 /**
@@ -183,9 +175,9 @@ public void addTickSubscriber(TickSubscriber subscriber) {
183175 * @return was the command executed
184176 */
185177 public boolean executeCommand (String command ) {
186- Stream s = this .streams . get ( StreamName . CONSOLE );
187- if (s instanceof ConsoleStream ) {
188- (( ConsoleStream ) s ) .executeCommand (command );
178+ ConsoleStream stream = this .getStream ( ConsoleStream . class );
179+ if (stream != null ) {
180+ stream .executeCommand (command );
189181 return true ;
190182 }
191183
@@ -236,9 +228,17 @@ public CompletableFuture<Boolean> serverHasStatus(Set<ServerStatus> status) {
236228 return future .thenApply (s -> s .hasStatus (status ));
237229 }
238230
239- private void addStreamSubscriber (StreamName name , Subscriber subscriber ) {
240- if (!this .streams .containsKey (name )) throw new RuntimeException ("There is no active stream for: " + name );
241- this .streams .get (name ).subscribers .add (subscriber );
231+ private <T extends Subscriber > void addStreamSubscriber (Class <? extends Stream <T >> c , T subscriber ) {
232+ if (!this .streams .containsKey (c )) {
233+ throw new IllegalStateException ("There is no active stream for: " + c );
234+ }
235+
236+ getStream (c ).addSubscriber (subscriber );
237+ }
238+
239+ private <T extends Stream <?>> T getStream (Class <T > c ) {
240+ @ SuppressWarnings ("unchecked" ) T stream = (T ) this .streams .get (c );
241+ return stream ;
242242 }
243243
244244 private void connect () {
@@ -284,78 +284,21 @@ public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean
284284 this .messages .clear ();
285285
286286 default :
287- handleData (type , message );
287+ final StreamType name = StreamType .get (message .get ("stream" ).getAsString ());
288+ final Stream <?> stream = streams .get (name .getStreamClass ());
289+ if (stream != null ) {
290+ stream .onMessage (type , message );
291+ }
288292 }
289293
290294 return null ;
291295 }
292296
293- /**
294- * handle websocket data
295- *
296- * @param type message type
297- * @param message raw message
298- */
299- private void handleData (String type , JsonObject message ) {
300- final StreamName name = StreamName .get (type );
301- final Stream stream = streams .get (name );
302-
303- if (stream == null ) {
304- return ;
305- }
306-
307- switch (type ) {
308- case "status" :
309- Server oldServer = new Server (server .getClient (), gson , server .getId ()).setFromObject (server );
310- this .server .setFromObject (gson .fromJson (message , ServerStatusStreamData .class ).getData ());
311-
312- //start/stop streams based on status
313- for (Stream s : streams .values ()) {
314- s .onStatusChange ();
315- }
316-
317- for (Subscriber subscriber : stream .subscribers ) {
318- if (subscriber instanceof ServerStatusSubscriber ) {
319- ((ServerStatusSubscriber ) subscriber ).statusUpdate (oldServer , this .server );
320- }
321- }
322- break ;
323-
324- case "line" :
325- String line = gson .fromJson (message , ConsoleStreamData .class ).getData ();
326- for (Subscriber subscriber : stream .subscribers ) {
327- if (subscriber instanceof ConsoleSubscriber ) {
328- ((ConsoleSubscriber ) subscriber ).line (line );
329- }
330- }
331- break ;
332-
333- case "heap" :
334- HeapUsage usage = gson .fromJson (message , HeapStreamData .class ).getData ();
335- for (Subscriber subscriber : stream .subscribers ) {
336- if (subscriber instanceof HeapSubscriber ) {
337- ((HeapSubscriber ) subscriber ).heap (usage );
338- }
339- }
340- break ;
341-
342- case "stats" :
343- StatsData stats = gson .fromJson (message , StatsStreamData .class ).getData ();
344- for (Subscriber subscriber : stream .subscribers ) {
345- if (subscriber instanceof StatsSubscriber ) {
346- ((StatsSubscriber ) subscriber ).stats (stats );
347- }
348- }
349- break ;
350-
351- case "tick" :
352- TickData tick = gson .fromJson (message , TickStreamData .class ).getData ();
353- for (Subscriber subscriber : stream .subscribers ) {
354- if (subscriber instanceof TickSubscriber ) {
355- ((TickSubscriber ) subscriber ).tick (tick );
356- }
357- }
358- break ;
297+ @ ApiStatus .Internal
298+ public void onStatusChange () {
299+ // start/stop streams based on status
300+ for (Stream <?> s : streams .values ()) {
301+ s .onStatusChange ();
359302 }
360303 }
361304
@@ -364,6 +307,10 @@ private void handleData(String type, JsonObject message) {
364307 public CompletionStage <?> onClose (WebSocket webSocket , int statusCode , String reason ) {
365308 logger .info ("Websocket connection to {} closed: {} {}" , uri , statusCode , reason );
366309
310+ for (Stream <?> stream : streams .values ()) {
311+ stream .onDisconnected ();
312+ }
313+
367314 if (this .shouldAutoReconnect ()) {
368315 reconnectTimer = new Timer ();
369316 logger .debug ("Reconnecting in 5s" );
@@ -405,4 +352,8 @@ public void close() {
405352 this .client .sendClose (0 , "unsubscribe" );
406353 }
407354 }
355+
356+ public boolean isReady () {
357+ return ready ;
358+ }
408359}
0 commit comments