Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 53 additions & 53 deletions ProfileStore.luau
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ local function UpdateAsync(profile_store, profile_key, transform_params, is_user
--}

local loaded_data, key_info

local next_in_queue = WaitInUpdateQueue(SessionToken(profile_store.Name, profile_key, is_user_mock))

local success = true
Expand Down Expand Up @@ -745,7 +745,7 @@ local function SaveProfileAsync(profile, is_ending_session, is_overwriting, last
ExistingProfileHandle = nil,
MissingProfileHandle = nil,
EditProfile = function(latest_data)

-- Check if this session still owns the profile:

local session_owns_profile = false
Expand Down Expand Up @@ -819,25 +819,25 @@ local function SaveProfileAsync(profile, is_ending_session, is_overwriting, last
if is_overwriting == true then
break
end

repeat_save_flag = false

local active_session = loaded_data.MetaData.ActiveSession
local session_load_count = loaded_data.MetaData.SessionLoadCount
local session_owns_profile = false

if type(active_session) == "table" then
session_owns_profile = IsThisSession(active_session) and session_load_count == profile.load_index
end

local force_load_session = loaded_data.MetaData.ForceLoadSession
local force_load_pending = false
if type(force_load_session) == "table" then
force_load_pending = not IsThisSession(force_load_session)
end

local is_active = profile:IsActive()

-- If another server is trying to start a session for this profile - end the session:

if force_load_pending == true and session_owns_profile == true then
Expand All @@ -846,7 +846,7 @@ local function SaveProfileAsync(profile, is_ending_session, is_overwriting, last
end
break
end

-- Clearing processed update list / Detecting new updates:

local locked_updates = profile.locked_global_updates -- [index] = true, ...
Expand Down Expand Up @@ -894,7 +894,7 @@ local function SaveProfileAsync(profile, is_ending_session, is_overwriting, last
is_processed = true
locked_updates[index] = true
end

local send_update_data = DeepCopyTable(update_data)

task.spawn(handler, send_update_data, processed_callback)
Expand All @@ -909,7 +909,7 @@ local function SaveProfileAsync(profile, is_ending_session, is_overwriting, last

end
else

if profile.roblox_message_subscription ~= nil then
profile.roblox_message_subscription:Disconnect()
end
Expand All @@ -924,7 +924,7 @@ local function SaveProfileAsync(profile, is_ending_session, is_overwriting, last
profile.OnAfterSave:Fire(profile.LastSavedData)

elseif repeat_save_flag == true then

-- DataStore call likely resulted in an error; Repeat the DataStore call shortly
task.wait(exp_backoff)
exp_backoff = math.min(if last_save_reason == "Shutdown" then 8 else 20, exp_backoff * 2)
Expand Down Expand Up @@ -1158,7 +1158,7 @@ function Profile:MessageHandler(fn)
local processed_callback = function()
locked_updates[index] = true
end

local send_update_data = DeepCopyTable(update_data)

task.spawn(fn, send_update_data, processed_callback)
Expand All @@ -1170,21 +1170,21 @@ function Profile:MessageHandler(fn)
end

function Profile:Save()

if self.view_mode == true then
error(`[{script.Name}]: Can't save profile in view mode; Should you be calling :SetAsync() instead?`)
end

if self:IsActive() == false then
warn(`[{script.Name}]: Attempted saving an inactive profile (STORE:{self.ProfileStore.Name}; KEY:{self.Key});`
.. ` Traceback:\n` .. debug.traceback())
return
end

-- Move the profile right behind the auto save index to delay the next auto save for it:
RemoveProfileFromAutoSave(self)
AddProfileToAutoSave(self)

-- Perform save in new thread:
task.spawn(SaveProfileAsync, self)

Expand Down Expand Up @@ -1336,10 +1336,10 @@ end
local function RobloxMessageSubscription(profile, unique_session_id)

local last_roblox_message = 0

local roblox_message_subscription = MessagingService:SubscribeAsync("PS_" .. unique_session_id, function(message)
if type(message.Data) == "table" and message.Data.LoadCount == profile.SessionLoadCount then
-- High reaction rate, based on numPlayers × 10 DataStore budget as of writing
-- High reaction rate, based on numPlayers �? 10 DataStore budget as of writing
if os.clock() - last_roblox_message > 6 then
last_roblox_message = os.clock()
if profile:IsActive() == true then
Expand All @@ -1361,7 +1361,7 @@ local function RobloxMessageSubscription(profile, unique_session_id)

end

function ProfileStore:StartSessionAsync(profile_key, params)
function ProfileStore:StartSessionAsync(profile_key:string, params)

local is_mock = ReadMockFlag()

Expand All @@ -1372,17 +1372,17 @@ function ProfileStore:StartSessionAsync(profile_key, params)
elseif string.len(profile_key) > 50 then
error(`[{script.Name}]: profile_key is too long`)
end

if params ~= nil and type(params) ~= "table" then
error(`[{script.Name}]: Invalid params`)
end

params = params or {}

if ProfileStore.IsClosing == true then
return nil
end

WaitForStoreReady(self)

local session_token = SessionToken(self.Name, profile_key, is_mock)
Expand All @@ -1392,9 +1392,9 @@ function ProfileStore:StartSessionAsync(profile_key, params)
end

ActiveProfileLoadJobs = ActiveProfileLoadJobs + 1

local is_user_cancel = false

local function cancel_condition()
if is_user_cancel == false then
if params.Cancel ~= nil then
Expand All @@ -1404,7 +1404,7 @@ function ProfileStore:StartSessionAsync(profile_key, params)
end
return true
end

local user_steal = params.Steal == true

local force_load_steps = 0 -- Session conflict handling values
Expand Down Expand Up @@ -1491,7 +1491,7 @@ function ProfileStore:StartSessionAsync(profile_key, params)

end,
MissingProfileHandle = function(latest_data)

local is_cancel = ProfileStore.IsClosing == true or cancel_condition() == true

latest_data.Data = DeepCopyTable(self.template)
Expand Down Expand Up @@ -1541,12 +1541,12 @@ function ProfileStore:StartSessionAsync(profile_key, params)

local profile = Profile.New(loaded_data, key_info, self, profile_key, is_mock, session_token)
AddProfileToAutoSave(profile)

if is_mock ~= true and DataStoreState == "Access" then

-- Use MessagingService to quickly detect session conflicts and resolve them quickly:
task.spawn(RobloxMessageSubscription, profile, unique_session_id) -- Blocking prevention

end

if ProfileStore.IsClosing == true or cancel_condition() == true then
Expand All @@ -1559,7 +1559,7 @@ function ProfileStore:StartSessionAsync(profile_key, params)
return profile

else

if ProfileStore.IsClosing == true or cancel_condition() == true then
ActiveProfileLoadJobs = ActiveProfileLoadJobs - 1
return nil
Expand All @@ -1578,13 +1578,13 @@ function ProfileStore:StartSessionAsync(profile_key, params)
steal_session = true
end
end

-- Request the remote server to end its session:
if type(active_session[3]) == "string" then
local session_load_count = loaded_data.MetaData.SessionLoadCount or 0
task.spawn(MessagingService.PublishAsync, MessagingService, "PS_" .. active_session[3], {LoadCount = session_load_count, EndSession = true})
end

-- Attempt to load the profile again after a delay
local wait_until = os.clock() + if request_force_load == true then FIRST_LOAD_REPEAT else LOAD_REPEAT_PERIOD
repeat task.wait() until os.clock() >= wait_until or ProfileStore.IsClosing == true
Expand All @@ -1604,23 +1604,23 @@ function ProfileStore:StartSessionAsync(profile_key, params)
return nil -- In this scenario it is likely that this server started shutting down
end
else

-- A DataStore call has likely ended in an error:

local default_timeout = false

if params.Cancel == nil then
default_timeout = os.clock() - start >= START_SESSION_TIMEOUT
end

if default_timeout == true or ProfileStore.IsClosing == true or cancel_condition() == true then
ActiveProfileLoadJobs = ActiveProfileLoadJobs - 1
return nil
end

task.wait(exp_backoff) -- Repeat the call shortly
exp_backoff = math.min(20, exp_backoff * 2)

end

end
Expand All @@ -1630,7 +1630,7 @@ function ProfileStore:StartSessionAsync(profile_key, params)

end

function ProfileStore:MessageAsync(profile_key, message)
function ProfileStore:MessageAsync(profile_key:string, message)

local is_mock = ReadMockFlag()

Expand Down Expand Up @@ -1690,27 +1690,27 @@ function ProfileStore:MessageAsync(profile_key, message)
)

if loaded_data ~= nil then

local session_token = SessionToken(self.Name, profile_key, is_mock)

local profile = ActiveSessionCheck[session_token]

if profile ~= nil then

-- The message was sent to a profile that is active in this server:
profile:Save()

else

local meta_data = loaded_data.MetaData or {}
local active_session = meta_data.ActiveSession
local session_load_count = meta_data.SessionLoadCount or 0

if type(active_session) == "table" and type(active_session[3]) == "string" then
-- Request the remote server to auto-save sooner and receive the message:
task.spawn(MessagingService.PublishAsync, MessagingService, "PS_" .. active_session[3], {LoadCount = session_load_count})
end

end

return true
Expand All @@ -1728,7 +1728,7 @@ function ProfileStore:MessageAsync(profile_key, message)

end

function ProfileStore:GetAsync(profile_key, version)
function ProfileStore:GetAsync(profile_key:string, version)

local is_mock = ReadMockFlag()

Expand Down Expand Up @@ -1806,7 +1806,7 @@ function ProfileStore:GetAsync(profile_key, version)

end

function ProfileStore:RemoveAsync(profile_key)
function ProfileStore:RemoveAsync(profile_key:string)

local is_mock = ReadMockFlag()

Expand All @@ -1821,7 +1821,7 @@ function ProfileStore:RemoveAsync(profile_key)
WaitForStoreReady(self)

local wipe_status = false

local next_in_queue = WaitInUpdateQueue(SessionToken(self.Name, profile_key, is_mock))

if is_mock == true then -- Used when the profile is accessed through ProfileStore.Mock
Expand Down Expand Up @@ -1859,7 +1859,7 @@ function ProfileStore:RemoveAsync(profile_key)
end)

end

next_in_queue()

return wipe_status
Expand Down Expand Up @@ -2037,7 +2037,7 @@ function ProfileVersionQuery:NextAsync()

end

function ProfileStore:VersionQuery(profile_key, sort_direction, min_date, max_date)
function ProfileStore:VersionQuery(profile_key:string, sort_direction, min_date, max_date)

local is_mock = ReadMockFlag()

Expand Down Expand Up @@ -2104,10 +2104,10 @@ if IsStudio == true then
end)

else

DataStoreState = "Access"
ProfileStore.DataStoreState = "Access"

end

-- Update loop:
Expand Down Expand Up @@ -2240,4 +2240,4 @@ task.spawn(function()

end)

return ProfileStore
return ProfileStore