Skip to content

Commit 3969ff3

Browse files
authored
Make channels and channelGroups in Subscribe EE immutable. (#157)
* Make channels and channelGroups in Subscribe EE immutable.
1 parent 6c75584 commit 3969ff3

16 files changed

+467
-103
lines changed

src/integrationTest/kotlin/com/pubnub/api/integration/BaseIntegrationTest.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ abstract class BaseIntegrationTest {
4444
if (pnConfiguration == null) {
4545
pnConfiguration = getBasicPnConfiguration()
4646
}
47+
pnConfiguration.enableEventEngine = true
4748
val pubNub = PubNub(pnConfiguration)
4849
registerGuestClient(pubNub)
4950
return pubNub
@@ -101,9 +102,9 @@ abstract class BaseIntegrationTest {
101102

102103
private fun needsServer() = provideAuthKey() != null
103104

104-
protected open fun onBefore() {}
105-
protected open fun onAfter() {}
106-
protected open fun onPrePubnub() {}
105+
protected open fun onBefore() = noAction()
106+
protected open fun onAfter() = noAction()
107+
protected open fun onPrePubnub() = noAction()
107108

108109
protected open fun provideAuthKey(): String? = null
109110

@@ -112,4 +113,8 @@ abstract class BaseIntegrationTest {
112113
fun wait(seconds: Int = 3) {
113114
Thread.sleep((seconds * 1_000).toLong())
114115
}
116+
117+
private fun noAction() {
118+
// No action
119+
}
115120
}

src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import com.pubnub.api.CommonUtils.randomValue
55
import com.pubnub.api.PubNub
66
import com.pubnub.api.callbacks.SubscribeCallback
77
import com.pubnub.api.enums.PNOperationType
8-
import com.pubnub.api.enums.PNStatusCategory
98
import com.pubnub.api.listen
109
import com.pubnub.api.models.consumer.PNStatus
1110
import com.pubnub.api.models.consumer.pubsub.PNMessageResult
@@ -96,9 +95,8 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {
9695

9796
pubnub.addListener(object : SubscribeCallback() {
9897
override fun status(pubnub: PubNub, pnStatus: PNStatus) {
99-
if (pnStatus.operation == PNOperationType.PNUnsubscribeOperation &&
100-
pnStatus.category == PNStatusCategory.PNAcknowledgmentCategory
101-
) {
98+
// because we have one channel subscribed unsubscribing from it will cause UnsubscribeAll
99+
if (pnStatus.affectedChannels.contains(expectedChannel) && pnStatus.operation == PNOperationType.PNUnsubscribeOperation) {
102100
success.set(pubnub.getSubscribedChannels().none { it == expectedChannel })
103101
}
104102
}
@@ -118,8 +116,7 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {
118116

119117
pubnub.addListener(object : SubscribeCallback() {
120118
override fun status(pubnub: PubNub, pnStatus: PNStatus) {
121-
if (pnStatus.category == PNStatusCategory.PNAcknowledgmentCategory &&
122-
pnStatus.affectedChannels.contains(randomChannel) &&
119+
if (pnStatus.affectedChannels.contains(randomChannel) &&
123120
pnStatus.operation == PNOperationType.PNUnsubscribeOperation
124121
) {
125122
success.set(pubnub.getSubscribedChannels().isEmpty())

src/main/kotlin/com/pubnub/api/eventengine/EventEngine.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ internal class EventEngine<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev,
3030
}
3131

3232
internal fun performTransitionAndEmitEffects(event: Ev) {
33-
log.trace("Curren state is: ${currentState::class.simpleName} ; event to be handled is: $event ")
33+
log.trace("Curren state is: ${currentState::class.simpleName} ; ${event::class.java.name.substringAfterLast('.').substringBefore('$')} to be handled is: $event ")
3434
val (newState, invocations) = transition(currentState, event)
3535
currentState = newState
3636
invocations.forEach { invocation -> effectSink.add(invocation) }

src/main/kotlin/com/pubnub/api/subscribe/Subscribe.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ internal class Subscribe(
9494
throwExceptionIfChannelAndChannelGroupIsMissing(channels, channelGroups)
9595
addChannelsToSubscriptionData(channels, withPresence)
9696
addChannelGroupsToSubscriptionData(channelGroups, withPresence)
97-
val channelsInLocalStorage = subscriptionData.channels
98-
val channelGroupsInLocalStorage = subscriptionData.channelGroups
97+
val channelsInLocalStorage = subscriptionData.channels.toSet()
98+
val channelGroupsInLocalStorage = subscriptionData.channelGroups.toSet()
9999
if (withTimetoken != 0L) {
100100
val subscriptionRestoredEvent = SubscriptionRestored(
101101
channelsInLocalStorage,
@@ -126,8 +126,8 @@ internal class Subscribe(
126126
removeChannelGroupsFromSubscriptionData(channelGroups)
127127

128128
if (subscriptionData.channels.size > 0 || subscriptionData.channelGroups.size > 0) {
129-
val channelsInLocalStorage = subscriptionData.channels
130-
val channelGroupsInLocalStorage = subscriptionData.channelGroups
129+
val channelsInLocalStorage = subscriptionData.channels.toSet()
130+
val channelGroupsInLocalStorage = subscriptionData.channelGroups.toSet()
131131
subscribeEventEngineManager.addEventToQueue(
132132
SubscriptionChanged(
133133
channelsInLocalStorage,
@@ -208,7 +208,7 @@ internal class Subscribe(
208208
channelGroups.forEach {
209209
subscriptionData.channelGroups.remove(it)
210210
val presenceChannelGroup = "$it$PRESENCE_CHANNEL_SUFFIX"
211-
subscriptionData.channels.remove(presenceChannelGroup)
211+
subscriptionData.channelGroups.remove(presenceChannelGroup)
212212
}
213213
}
214214

src/main/kotlin/com/pubnub/api/subscribe/eventengine/effect/EmitStatusEffect.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ internal class EmitStatusEffect(
1111
private val log = LoggerFactory.getLogger(EmitStatusEffect::class.java)
1212

1313
override fun runEffect() {
14-
log.trace("Running EmitStatusEffect")
14+
log.trace("Running EmitStatusEffect: $status")
1515
statusConsumer.announce(status)
1616
}
1717
}

src/main/kotlin/com/pubnub/api/subscribe/eventengine/state/SubscribeState.kt

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,16 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
3030
}
3131
}
3232

33-
data class Handshaking(
34-
val channels: Set<String>,
35-
val channelGroups: Set<String>,
33+
class Handshaking(
34+
channels: Set<String>,
35+
channelGroups: Set<String>,
3636
val subscriptionCursor: SubscriptionCursor? = null,
3737
) : SubscribeState() {
38+
// toSet() is a must because we want to make sure that channels is immutable, and Handshaking constructor
39+
// doesn't prevent from providing "channels" that is mutable set.
40+
val channels: Set<String> = channels.toSet()
41+
val channelGroups: Set<String> = channelGroups.toSet()
42+
3843
override fun onEntry() = setOf(SubscribeEffectInvocation.Handshake(channels, channelGroups))
3944
override fun onExit() = setOf(SubscribeEffectInvocation.CancelHandshake)
4045

@@ -87,13 +92,16 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
8792
}
8893
}
8994

90-
data class HandshakeReconnecting(
91-
val channels: Set<String>,
92-
val channelGroups: Set<String>,
95+
class HandshakeReconnecting(
96+
channels: Set<String>,
97+
channelGroups: Set<String>,
9398
val attempts: Int,
9499
val reason: PubNubException?,
95100
val subscriptionCursor: SubscriptionCursor? = null
96101
) : SubscribeState() {
102+
val channels: Set<String> = channels.toSet()
103+
val channelGroups: Set<String> = channelGroups.toSet()
104+
97105
override fun onEntry() =
98106
setOf(SubscribeEffectInvocation.HandshakeReconnect(channels, channelGroups, attempts, reason))
99107

@@ -135,7 +143,12 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
135143

136144
is SubscribeEvent.HandshakeReconnectSuccess -> {
137145
transitionTo(
138-
state = Receiving(channels, channelGroups, subscriptionCursor?.copy(region = event.subscriptionCursor.region) ?: event.subscriptionCursor),
146+
state = Receiving(
147+
channels,
148+
channelGroups,
149+
subscriptionCursor?.copy(region = event.subscriptionCursor.region)
150+
?: event.subscriptionCursor
151+
),
139152
SubscribeEffectInvocation.EmitStatus(
140153
PNStatus(
141154
category = PNStatusCategory.PNConnectedCategory,
@@ -163,11 +176,13 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
163176
}
164177
}
165178

166-
data class HandshakeStopped(
167-
val channels: Set<String>,
168-
val channelGroups: Set<String>,
179+
class HandshakeStopped(
180+
channels: Set<String>,
181+
channelGroups: Set<String>,
169182
val reason: PubNubException?
170183
) : SubscribeState() {
184+
val channels: Set<String> = channels.toSet()
185+
val channelGroups: Set<String> = channelGroups.toSet()
171186

172187
override fun transition(event: SubscribeEvent): Pair<SubscribeState, Set<SubscribeEffectInvocation>> {
173188
return when (event) {
@@ -206,12 +221,14 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
206221
}
207222
}
208223

209-
data class HandshakeFailed(
210-
val channels: Set<String>,
211-
val channelGroups: Set<String>,
224+
class HandshakeFailed(
225+
channels: Set<String>,
226+
channelGroups: Set<String>,
212227
val reason: PubNubException,
213228
val subscriptionCursor: SubscriptionCursor? = null
214229
) : SubscribeState() {
230+
val channels: Set<String> = channels.toSet()
231+
val channelGroups: Set<String> = channelGroups.toSet()
215232

216233
override fun transition(event: SubscribeEvent): Pair<SubscribeState, Set<SubscribeEffectInvocation>> {
217234
return when (event) {
@@ -242,11 +259,15 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
242259
}
243260
}
244261

245-
data class Receiving(
246-
private val channels: Set<String>,
247-
private val channelGroups: Set<String>,
248-
private val subscriptionCursor: SubscriptionCursor
262+
class Receiving(
263+
channels: Set<String>,
264+
channelGroups: Set<String>,
265+
val subscriptionCursor: SubscriptionCursor
249266
) : SubscribeState() {
267+
268+
val channels: Set<String> = channels.toSet()
269+
val channelGroups: Set<String> = channelGroups.toSet()
270+
250271
override fun onEntry() = setOf(
251272
SubscribeEffectInvocation.ReceiveMessages(
252273
channels, channelGroups, subscriptionCursor
@@ -286,7 +307,13 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
286307
}
287308

288309
is SubscribeEvent.SubscriptionRestored -> {
289-
transitionTo(Receiving(event.channels, event.channelGroups, SubscriptionCursor(event.subscriptionCursor.timetoken, subscriptionCursor.region)))
310+
transitionTo(
311+
Receiving(
312+
event.channels,
313+
event.channelGroups,
314+
SubscriptionCursor(event.subscriptionCursor.timetoken, subscriptionCursor.region)
315+
)
316+
)
290317
}
291318

292319
is SubscribeEvent.ReceiveSuccess -> {
@@ -318,13 +345,16 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
318345
}
319346
}
320347

321-
data class ReceiveReconnecting(
322-
val channels: Set<String>,
323-
val channelGroups: Set<String>,
348+
class ReceiveReconnecting(
349+
channels: Set<String>,
350+
channelGroups: Set<String>,
324351
val subscriptionCursor: SubscriptionCursor,
325352
val attempts: Int,
326353
val reason: PubNubException?
327354
) : SubscribeState() {
355+
val channels: Set<String> = channels.toSet()
356+
val channelGroups: Set<String> = channelGroups.toSet()
357+
328358
override fun onEntry() =
329359
setOf(
330360
SubscribeEffectInvocation.ReceiveReconnect(
@@ -392,7 +422,13 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
392422
}
393423

394424
is SubscribeEvent.SubscriptionRestored -> {
395-
transitionTo(Receiving(event.channels, event.channelGroups, SubscriptionCursor(event.subscriptionCursor.timetoken, subscriptionCursor.region)))
425+
transitionTo(
426+
Receiving(
427+
event.channels,
428+
event.channelGroups,
429+
SubscriptionCursor(event.subscriptionCursor.timetoken, subscriptionCursor.region)
430+
)
431+
)
396432
}
397433

398434
is SubscribeEvent.UnsubscribeAll -> {
@@ -417,11 +453,14 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
417453
}
418454
}
419455

420-
data class ReceiveStopped(
421-
private val channels: Set<String>,
422-
val channelGroups: Set<String>,
456+
class ReceiveStopped(
457+
channels: Set<String>,
458+
channelGroups: Set<String>,
423459
val subscriptionCursor: SubscriptionCursor
424460
) : SubscribeState() {
461+
val channels: Set<String> = channels.toSet()
462+
val channelGroups: Set<String> = channelGroups.toSet()
463+
425464
override fun transition(event: SubscribeEvent): Pair<SubscribeState, Set<SubscribeEffectInvocation>> {
426465
return when (event) {
427466
is SubscribeEvent.Reconnect -> {
@@ -459,12 +498,15 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
459498
}
460499
}
461500

462-
data class ReceiveFailed(
463-
private val channels: Set<String>,
464-
val channelGroups: Set<String>,
501+
class ReceiveFailed(
502+
channels: Set<String>,
503+
channelGroups: Set<String>,
465504
val subscriptionCursor: SubscriptionCursor,
466505
val reason: PubNubException
467506
) : SubscribeState() {
507+
val channels: Set<String> = channels.toSet()
508+
val channelGroups: Set<String> = channelGroups.toSet()
509+
468510
override fun transition(event: SubscribeEvent): Pair<SubscribeState, Set<SubscribeEffectInvocation>> {
469511
return when (event) {
470512
is SubscribeEvent.Reconnect -> {

src/test/kotlin/com/pubnub/api/subscribe/SubscribeTest.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ private const val CHANNEL_GROUPS_01 = "channelGroups01"
2222
private const val CHANNEL_GROUPS_02 = "channelGroups02"
2323
private const val CHANNEL_GROUPS_03 = "channelGroups03"
2424

25+
private const val PNPRES = "-pnpres"
26+
2527
internal class SubscribeTest {
2628
private val channelsInSubscriptionData = mutableSetOf(CHANNEL_01, CHANNEL_02)
2729
private val channelGroupsInSubscriptionData = mutableSetOf(CHANNEL_GROUPS_01, CHANNEL_GROUPS_02)
@@ -81,18 +83,24 @@ internal class SubscribeTest {
8183

8284
@Test
8385
fun `should remove channels and channelGroups from local storage and pass SubscriptionChange event for handling when at least one channel or channelGroup left in storage`() {
86+
objectUnderTest.subscribe(
87+
setOf(CHANNEL_01, CHANNEL_02),
88+
setOf(CHANNEL_GROUPS_01, CHANNEL_GROUPS_02),
89+
withPresence = true,
90+
withTimetoken
91+
)
8492
val channelsToUnsubscribe: Set<String> = setOf(CHANNEL_01)
8593
val channelGroupsToUnsubscribe: Set<String> = setOf(CHANNEL_GROUPS_01)
8694

8795
objectUnderTest.unsubscribe(channelsToUnsubscribe, channelGroupsToUnsubscribe)
8896

8997
verify { subscribeEventEngineManager.addEventToQueue(any()) }
9098
assertEquals(
91-
SubscribeEvent.SubscriptionChanged(setOf(CHANNEL_02), setOf(CHANNEL_GROUPS_02)),
99+
SubscribeEvent.SubscriptionChanged(setOf(CHANNEL_02, "$CHANNEL_02$PNPRES"), setOf(CHANNEL_GROUPS_02, "$CHANNEL_GROUPS_02$PNPRES")),
92100
subscribeEvent.captured
93101
)
94-
assertEquals(setOf(CHANNEL_02), subscriptionData.channels)
95-
assertEquals(setOf(CHANNEL_GROUPS_02), subscriptionData.channelGroups)
102+
assertEquals(setOf(CHANNEL_02, "$CHANNEL_02$PNPRES"), subscriptionData.channels)
103+
assertEquals(setOf(CHANNEL_GROUPS_02, "$CHANNEL_GROUPS_02$PNPRES"), subscriptionData.channelGroups)
96104
}
97105

98106
@Test

0 commit comments

Comments
 (0)