Skip to content

Commit df96dff

Browse files
Move subscribe/unsubscribe functions to PubnubClient. Move EE related functions to internal utilities.
1 parent 2beb8ba commit df96dff

File tree

9 files changed

+808
-577
lines changed

9 files changed

+808
-577
lines changed

Source/PubnubLibrary/Private/Entities/PubnubSubscription.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "PubnubSubsystem.h"
55
#include "Entities/PubnubBaseEntity.h"
66
#include "FunctionLibraries/PubnubUtilities.h"
7+
#include "FunctionLibraries/PubnubInternalUtilities.h"
78

89

910
void UPubnubSubscriptionBase::BeginDestroy()
@@ -119,7 +120,7 @@ void UPubnubSubscription::InitSubscription(UPubnubSubsystem* InPubnubSubsystem,
119120
return;
120121
}
121122
PubnubSubsystem = InPubnubSubsystem;
122-
CCoreSubscription = UPubnubUtilities::EEGetSubscriptionForEntity(InPubnubSubsystem->ctx_ee, Entity->EntityID, Entity->EntityType, InSubscribeSettings);
123+
CCoreSubscription = UPubnubInternalUtilities::EEGetSubscriptionForEntity(InPubnubSubsystem->ctx_ee, Entity->EntityID, Entity->EntityType, InSubscribeSettings);
123124

124125
InternalInit();
125126
}
@@ -225,10 +226,10 @@ void UPubnubSubscription::InternalInit()
225226
};
226227

227228
// Register created callback in subscription
228-
UPubnubUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackMessages, EPubnubListenerType::PLT_Message, this);
229-
UPubnubUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackSignals, EPubnubListenerType::PLT_Signal, this);
230-
UPubnubUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackObjects, EPubnubListenerType::PLT_Objects, this);
231-
UPubnubUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackMessageActions, EPubnubListenerType::PLT_MessageAction, this);
229+
UPubnubInternalUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackMessages, EPubnubListenerType::PLT_Message, this);
230+
UPubnubInternalUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackSignals, EPubnubListenerType::PLT_Signal, this);
231+
UPubnubInternalUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackObjects, EPubnubListenerType::PLT_Objects, this);
232+
UPubnubInternalUtilities::EEAddSubscriptionListenerOfType(CCoreSubscription, CallbackMessageActions, EPubnubListenerType::PLT_MessageAction, this);
232233

233234
//Bind to OnPubnubSubsystemDeinitialized so subscription is properly Cleaned up, when it's not needed
234235
PubnubSubsystem->OnPubnubSubsystemDeinitialized.AddDynamic(this, &UPubnubSubscription::CleanUpSubscription);
@@ -443,7 +444,7 @@ void UPubnubSubscriptionSet::InitSubscriptionSet(UPubnubSubsystem* InPubnubSubsy
443444
return;
444445
}
445446
PubnubSubsystem = InPubnubSubsystem;
446-
CCoreSubscriptionSet = UPubnubUtilities::EEGetSubscriptionSetForEntities(InPubnubSubsystem->ctx_ee, Channels, ChannelGroups, InSubscribeSettings);
447+
CCoreSubscriptionSet = UPubnubInternalUtilities::EEGetSubscriptionSetForEntities(InPubnubSubsystem->ctx_ee, Channels, ChannelGroups, InSubscribeSettings);
447448

448449
InternalInit();
449450
}
@@ -561,10 +562,10 @@ void UPubnubSubscriptionSet::InternalInit()
561562
};
562563

563564
// Register created callback in subscription
564-
UPubnubUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackMessages, EPubnubListenerType::PLT_Message, this);
565-
UPubnubUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackSignals, EPubnubListenerType::PLT_Signal, this);
566-
UPubnubUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackObjects, EPubnubListenerType::PLT_Objects, this);
567-
UPubnubUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackMessageActions, EPubnubListenerType::PLT_MessageAction, this);
565+
UPubnubInternalUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackMessages, EPubnubListenerType::PLT_Message, this);
566+
UPubnubInternalUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackSignals, EPubnubListenerType::PLT_Signal, this);
567+
UPubnubInternalUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackObjects, EPubnubListenerType::PLT_Objects, this);
568+
UPubnubInternalUtilities::EEAddSubscriptionSetListenerOfType(CCoreSubscriptionSet, CallbackMessageActions, EPubnubListenerType::PLT_MessageAction, this);
568569

569570
//Bind to OnPubnubSubsystemDeinitialized so subscription is properly Cleaned up, when it's not needed
570571
PubnubSubsystem->OnPubnubSubsystemDeinitialized.AddDynamic(this, &UPubnubSubscriptionSet::CleanUpSubscription);

Source/PubnubLibrary/Private/FunctionLibraries/PubnubInternalUtilities.cpp

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include "FunctionLibraries/PubnubInternalUtilities.h"
44
#include "PubNub.h"
5+
#include "PubnubClient.h"
6+
#include "FunctionLibraries/PubnubUtilities.h"
57

68

79
void UPubnubInternalUtilities::PublishUESettingsToPubnubPublishOptions(const FPubnubPublishSettings &PublishSettings, pubnub_publish_options& PubnubPublishOptions)
@@ -43,3 +45,286 @@ void UPubnubInternalUtilities::FetchHistoryUESettingsToPbFetchHistoryOptions(con
4345
FetchHistorySettings.Start.IsEmpty() ? PubnubFetchHistoryOptions.start = NULL : nullptr;
4446
FetchHistorySettings.End.IsEmpty() ? PubnubFetchHistoryOptions.end = NULL : nullptr;
4547
}
48+
49+
pubnub_subscription_t* UPubnubInternalUtilities::EEGetSubscriptionForEntity(pubnub_t* Context, FString EntityID, EPubnubEntityType EntityType, FPubnubSubscribeSettings Options)
50+
{
51+
pubnub_subscription_options_t PnOptions = pubnub_subscription_options_defopts();
52+
PnOptions.receive_presence_events = Options.ReceivePresenceEvents;
53+
54+
FUTF8StringHolder EntityIDHolder(EntityID);
55+
pubnub_entity_t* PubnubEntity = nullptr;
56+
switch (EntityType)
57+
{
58+
case EPubnubEntityType::PEnT_Channel:
59+
PubnubEntity = reinterpret_cast<pubnub_entity_t*>(pubnub_channel_alloc(Context, EntityIDHolder.Get()));
60+
break;
61+
case EPubnubEntityType::PEnT_ChannelGroup:
62+
PubnubEntity = reinterpret_cast<pubnub_entity_t*>(pubnub_channel_group_alloc(Context, EntityIDHolder.Get()));
63+
break;
64+
case EPubnubEntityType::PEnT_ChannelMetadata:
65+
PubnubEntity = reinterpret_cast<pubnub_entity_t*>(pubnub_channel_metadata_alloc(Context, EntityIDHolder.Get()));
66+
break;
67+
case EPubnubEntityType::PEnT_UserMetadata:
68+
PubnubEntity = reinterpret_cast<pubnub_entity_t*>(pubnub_user_metadata_alloc(Context, EntityIDHolder.Get()));
69+
break;
70+
default:
71+
UE_LOG(PubnubLog, Error, TEXT("Unknown entity type: %d"), (int32)EntityType);
72+
return nullptr;
73+
}
74+
pubnub_subscription_t* Subscription = pubnub_subscription_alloc(PubnubEntity, &PnOptions);
75+
76+
pubnub_entity_free(reinterpret_cast<void**>(&PubnubEntity));
77+
78+
return Subscription;
79+
}
80+
81+
pubnub_subscription_set_t* UPubnubInternalUtilities::EEGetSubscriptionSetForEntities(pubnub_t* Context, TArray<FString> Channels, TArray<FString> ChannelGroups, FPubnubSubscribeSettings Options)
82+
{
83+
pubnub_subscription_options_t PnOptions = pubnub_subscription_options_defopts();
84+
PnOptions.receive_presence_events = Options.ReceivePresenceEvents;
85+
86+
TArray<pubnub_entity_t*> PubnubEntities;
87+
PubnubEntities.Reserve(Channels.Num() + ChannelGroups.Num());
88+
89+
for(FString Channel : Channels)
90+
{
91+
FUTF8StringHolder EntityIDHolder(Channel);
92+
PubnubEntities.Add(reinterpret_cast<pubnub_entity_t*>(pubnub_channel_alloc(Context, EntityIDHolder.Get())));
93+
}
94+
for(FString ChannelGroup : ChannelGroups)
95+
{
96+
FUTF8StringHolder EntityIDHolder(ChannelGroup);
97+
PubnubEntities.Add(reinterpret_cast<pubnub_entity_t*>(pubnub_channel_group_alloc(Context, EntityIDHolder.Get())));
98+
}
99+
100+
pubnub_subscription_set_t* SubscriptionSet = pubnub_subscription_set_alloc_with_entities(PubnubEntities.GetData(), PubnubEntities.Num(), &PnOptions);
101+
102+
for(pubnub_entity_t*& Entity : PubnubEntities)
103+
{
104+
pubnub_entity_free(reinterpret_cast<void**>(&Entity));
105+
}
106+
107+
return SubscriptionSet;
108+
}
109+
110+
bool UPubnubInternalUtilities::EEAddListenerAndSubscribe(pubnub_subscription_t* Subscription, pubnub_subscribe_message_callback_t Callback, UPubnubClient* PubnubClient)
111+
{
112+
if(!PubnubClient)
113+
{
114+
UE_LOG(PubnubLog, Error, TEXT("EEAddListenerAndSubscribe Failed, PubnubClient is invalid"));
115+
return false;
116+
}
117+
118+
EEAddSubscriptionListenersOfAllTypes(Subscription, Callback, PubnubClient);
119+
120+
return EESubscribeWithSubscription(Subscription, FPubnubSubscriptionCursor());
121+
}
122+
123+
bool UPubnubInternalUtilities::EERemoveListenerAndUnsubscribe(pubnub_subscription_t** SubscriptionPtr, pubnub_subscribe_message_callback_t Callback, UPubnubClient* PubnubClient)
124+
{
125+
if(!PubnubClient)
126+
{
127+
UE_LOG(PubnubLog, Error, TEXT("EERemoveListenerAndUnsubscribe Failed, PubnubClient is invalid"));
128+
return false;
129+
}
130+
131+
if(!SubscriptionPtr)
132+
{
133+
UE_LOG(PubnubLog, Error, TEXT("Failed to unsubscribe. Passed subscription pointer is invalid"));
134+
return false;
135+
}
136+
137+
EERemoveSubscriptionListenersOfAllTypes(SubscriptionPtr, Callback, PubnubClient);
138+
139+
return EEUnsubscribeWithSubscription(SubscriptionPtr);
140+
}
141+
142+
bool UPubnubInternalUtilities::EESubscribeWithSubscription(pubnub_subscription_t* Subscription, FPubnubSubscriptionCursor Cursor)
143+
{
144+
enum pubnub_res SubscribeResult;
145+
if(!Cursor.Timetoken.IsEmpty() || Cursor.Region != 0)
146+
{
147+
FUTF8StringHolder CursorTimetokenHolder(Cursor.Timetoken);
148+
pubnub_subscribe_cursor_t PubnubCursor = pubnub_subscribe_cursor(CursorTimetokenHolder.Get());
149+
PubnubCursor.region = Cursor.Region;
150+
SubscribeResult = pubnub_subscribe_with_subscription(Subscription, &PubnubCursor);
151+
}
152+
else
153+
{
154+
SubscribeResult = pubnub_subscribe_with_subscription(Subscription, nullptr);
155+
}
156+
157+
if(PNR_OK != SubscribeResult)
158+
{
159+
FString ResultString(pubnub_res_2_string(SubscribeResult));
160+
UE_LOG(PubnubLog, Error, TEXT("Failed to subscribe. Subscribe_with_subscription failed with error: %s"), *ResultString);
161+
return false;
162+
}
163+
164+
return true;
165+
}
166+
167+
bool UPubnubInternalUtilities::EEUnsubscribeWithSubscription(pubnub_subscription_t** SubscriptionPtr)
168+
{
169+
enum pubnub_res UnsubscribeResult = pubnub_unsubscribe_with_subscription(SubscriptionPtr);
170+
if(PNR_OK != UnsubscribeResult)
171+
{
172+
FString ResultString(pubnub_res_2_string(UnsubscribeResult));
173+
UE_LOG(PubnubLog, Error, TEXT("Failed to unsubscribe. Unsubscribe_with_subscription failed with error: "), *ResultString);
174+
return false;
175+
}
176+
177+
return true;
178+
}
179+
180+
bool UPubnubInternalUtilities::EESubscribeWithSubscriptionSet(pubnub_subscription_set_t* SubscriptionSet, FPubnubSubscriptionCursor Cursor)
181+
{
182+
enum pubnub_res SubscribeResult;
183+
if(!Cursor.Timetoken.IsEmpty() || Cursor.Region != 0)
184+
{
185+
FUTF8StringHolder CursorTimetokenHolder(Cursor.Timetoken);
186+
pubnub_subscribe_cursor_t PubnubCursor = pubnub_subscribe_cursor(CursorTimetokenHolder.Get());
187+
PubnubCursor.region = Cursor.Region;
188+
SubscribeResult = pubnub_subscribe_with_subscription_set(SubscriptionSet, &PubnubCursor);
189+
}
190+
else
191+
{
192+
SubscribeResult = pubnub_subscribe_with_subscription_set(SubscriptionSet, nullptr);
193+
}
194+
195+
if(PNR_OK != SubscribeResult)
196+
{
197+
FString ResultString(pubnub_res_2_string(SubscribeResult));
198+
UE_LOG(PubnubLog, Error, TEXT("Failed to subscribe. Subscribe_with_subscription_set failed with error: %s"), *ResultString);
199+
return false;
200+
}
201+
202+
return true;
203+
}
204+
205+
bool UPubnubInternalUtilities::EEUnsubscribeWithSubscriptionSet(pubnub_subscription_set_t** SubscriptionSetPtr)
206+
{
207+
enum pubnub_res UnsubscribeResult = pubnub_unsubscribe_with_subscription_set(SubscriptionSetPtr);
208+
if(PNR_OK != UnsubscribeResult)
209+
{
210+
FString ResultString(pubnub_res_2_string(UnsubscribeResult));
211+
UE_LOG(PubnubLog, Error, TEXT("Failed to unsubscribe. Unsubscribe_with_subscription failed with error: "), *ResultString);
212+
return false;
213+
}
214+
215+
return true;
216+
}
217+
218+
bool UPubnubInternalUtilities::EEAddSubscriptionListenerOfType(pubnub_subscription_t* Subscription, pubnub_subscribe_message_callback_t Callback, EPubnubListenerType ListenerType, UObject* Caller)
219+
{
220+
if(ListenerType == EPubnubListenerType::PLT_All)
221+
{
222+
EEAddSubscriptionListenersOfAllTypes(Subscription, Callback, Caller);
223+
return false;
224+
}
225+
226+
pubnub_subscribe_listener_type PubnubListenerType = static_cast<pubnub_subscribe_listener_type>(static_cast<uint8>(ListenerType));
227+
enum pubnub_res AddMessageListenerResult = pubnub_subscribe_add_subscription_listener(Subscription, PubnubListenerType, Callback, Caller);
228+
229+
if(PNR_OK != AddMessageListenerResult)
230+
{
231+
FString ResultString(pubnub_res_2_string(AddMessageListenerResult));
232+
UE_LOG(PubnubLog, Error, TEXT("Failed to add listener of type %s. Error: %s "), *StaticEnum<EPubnubListenerType>()->GetNameStringByValue(static_cast<int64>(ListenerType)), *ResultString);
233+
return false;
234+
}
235+
236+
return true;
237+
}
238+
239+
void UPubnubInternalUtilities::EEAddSubscriptionListenersOfAllTypes(pubnub_subscription_t* Subscription, pubnub_subscribe_message_callback_t Callback, UObject* Caller)
240+
{
241+
for(EPubnubListenerType Type : TEnumRange<EPubnubListenerType>())
242+
{
243+
EEAddSubscriptionListenerOfType(Subscription, Callback, Type, Caller);
244+
}
245+
}
246+
247+
bool UPubnubInternalUtilities::EERemoveSubscriptionListenerOfType(pubnub_subscription_t** SubscriptionPtr, pubnub_subscribe_message_callback_t Callback, EPubnubListenerType ListenerType, UObject* Caller)
248+
{
249+
if(ListenerType == EPubnubListenerType::PLT_All)
250+
{
251+
EERemoveSubscriptionListenersOfAllTypes(SubscriptionPtr, Callback, Caller);
252+
return false;
253+
}
254+
255+
pubnub_subscribe_listener_type PubnubListenerType = static_cast<pubnub_subscribe_listener_type>(static_cast<uint8>(ListenerType));
256+
enum pubnub_res RemoveMessageActionListenerResult = pubnub_subscribe_remove_subscription_listener(*SubscriptionPtr, PubnubListenerType, Callback, Caller);
257+
if(PNR_OK != RemoveMessageActionListenerResult)
258+
{
259+
FString ResultString(pubnub_res_2_string(RemoveMessageActionListenerResult));
260+
UE_LOG(PubnubLog, Error, TEXT("Failed to remove listener of type %s. Error: %s "), *StaticEnum<EPubnubListenerType>()->GetNameStringByValue(static_cast<int64>(ListenerType)), *ResultString);
261+
return false;
262+
}
263+
264+
return true;
265+
}
266+
267+
void UPubnubInternalUtilities::EERemoveSubscriptionListenersOfAllTypes(pubnub_subscription_t** SubscriptionPtr, pubnub_subscribe_message_callback_t Callback, UObject* Caller)
268+
{
269+
for(EPubnubListenerType Type : TEnumRange<EPubnubListenerType>())
270+
{
271+
EERemoveSubscriptionListenerOfType(SubscriptionPtr, Callback, Type, Caller);
272+
}
273+
}
274+
275+
bool UPubnubInternalUtilities::EEAddSubscriptionSetListenerOfType(pubnub_subscription_set_t* SubscriptionSet, pubnub_subscribe_message_callback_t Callback, EPubnubListenerType ListenerType, UObject* Caller)
276+
{
277+
if(ListenerType == EPubnubListenerType::PLT_All)
278+
{
279+
EEAddSubscriptionSetListenersOfAllTypes(SubscriptionSet, Callback, Caller);
280+
return false;
281+
}
282+
283+
pubnub_subscribe_listener_type PubnubListenerType = static_cast<pubnub_subscribe_listener_type>(static_cast<uint8>(ListenerType));
284+
enum pubnub_res AddMessageListenerResult = pubnub_subscribe_add_subscription_set_listener(SubscriptionSet, PubnubListenerType, Callback, Caller);
285+
286+
if(PNR_OK != AddMessageListenerResult)
287+
{
288+
FString ResultString(pubnub_res_2_string(AddMessageListenerResult));
289+
UE_LOG(PubnubLog, Error, TEXT("Failed to add listener of type %s. Error: %s "), *StaticEnum<EPubnubListenerType>()->GetNameStringByValue(static_cast<int64>(ListenerType)), *ResultString);
290+
return false;
291+
}
292+
293+
return true;
294+
}
295+
296+
void UPubnubInternalUtilities::EEAddSubscriptionSetListenersOfAllTypes(pubnub_subscription_set_t* SubscriptionSet, pubnub_subscribe_message_callback_t Callback, UObject* Caller)
297+
{
298+
for(EPubnubListenerType Type : TEnumRange<EPubnubListenerType>())
299+
{
300+
EEAddSubscriptionSetListenerOfType(SubscriptionSet, Callback, Type, Caller);
301+
}
302+
}
303+
304+
bool UPubnubInternalUtilities::EERemoveSubscriptionSetListenerOfType(pubnub_subscription_set_t** SubscriptionSetPtr, pubnub_subscribe_message_callback_t Callback, EPubnubListenerType ListenerType, UObject* Caller)
305+
{
306+
if(ListenerType == EPubnubListenerType::PLT_All)
307+
{
308+
EERemoveSubscriptionSetListenersOfAllTypes(SubscriptionSetPtr, Callback, Caller);
309+
return false;
310+
}
311+
312+
pubnub_subscribe_listener_type PubnubListenerType = static_cast<pubnub_subscribe_listener_type>(static_cast<uint8>(ListenerType));
313+
enum pubnub_res RemoveMessageActionListenerResult = pubnub_subscribe_remove_subscription_set_listener(*SubscriptionSetPtr, PubnubListenerType, Callback, Caller);
314+
if(PNR_OK != RemoveMessageActionListenerResult)
315+
{
316+
FString ResultString(pubnub_res_2_string(RemoveMessageActionListenerResult));
317+
UE_LOG(PubnubLog, Error, TEXT("Failed to remove listener of type %s. Error: %s "), *StaticEnum<EPubnubListenerType>()->GetNameStringByValue(static_cast<int64>(ListenerType)), *ResultString);
318+
return false;
319+
}
320+
321+
return true;
322+
}
323+
324+
void UPubnubInternalUtilities::EERemoveSubscriptionSetListenersOfAllTypes(pubnub_subscription_set_t** SubscriptionSetPtr, pubnub_subscribe_message_callback_t Callback, UObject* Caller)
325+
{
326+
for(EPubnubListenerType Type : TEnumRange<EPubnubListenerType>())
327+
{
328+
EERemoveSubscriptionSetListenerOfType(SubscriptionSetPtr, Callback, Type, Caller);
329+
}
330+
}

0 commit comments

Comments
 (0)