27
27
module ScheduledMerges (
28
28
-- * Main API
29
29
LSM ,
30
+ TableId (.. ),
30
31
LSMConfig (.. ),
31
32
Key (K ), Value (V ), resolveValue , Blob (B ),
32
33
new ,
@@ -100,25 +101,35 @@ module ScheduledMerges (
100
101
import Prelude hiding (lookup )
101
102
102
103
import Data.Foldable (for_ , toList , traverse_ )
104
+ import Data.Functor.Contravariant
103
105
import Data.Map.Strict (Map )
104
106
import qualified Data.Map.Strict as Map
105
107
import Data.Maybe (catMaybes )
108
+ import Data.Primitive.Types
106
109
import Data.STRef
107
110
108
111
import qualified Control.Exception as Exc (assert )
109
112
import Control.Monad (foldM , forM , when )
110
113
import Control.Monad.ST
111
114
import qualified Control.Monad.Trans.Except as E
112
- import Control.Tracer ( Tracer , contramap , traceWith )
115
+ import Control.Tracer
113
116
import GHC.Stack (HasCallStack , callStack )
114
117
115
118
import Text.Printf (printf )
116
119
117
120
import qualified Test.QuickCheck as QC
118
121
119
- data LSM s = LSMHandle ! (STRef s Counter )
120
- ! LSMConfig
121
- ! (STRef s (LSMContent s ))
122
-- | Handle to a mutable LSM table living in the 'ST' state thread @s@.
+ data LSM s = LSMHandle {
123
    -- Identifier of this table; it is included in the 'Event's traced for it.
+ tableId :: ! TableId
124
    -- Operation counter: incremented by 'update' and 'supplyMergeCredits', and
    -- its value is handed to 'increment', which records it as the step of
    -- each 'EventAt'.
+ , _tableCounter :: ! (STRef s Counter )
125
    -- Configuration the table was created with.
+ , _tableConfig :: ! LSMConfig
126
    -- Mutable table content ('LSMContent': write buffer, levels, union level).
+ , _tableContents :: ! (STRef s (LSMContent s ))
127
+ }
128
+
129
+ -- | Identifiers for 'LSM' tables
--
-- The identifier is supplied by the caller ('new', 'newWith', 'duplicate' and
-- 'unions' each take one) and is stored in the handle so that traced 'Event's
-- can name the table they refer to.
--
-- NOTE(review): nothing in the visible code checks that identifiers are
-- unique; presumably that is the caller's responsibility -- confirm.
130
+ newtype TableId = TableId Int
131
+ deriving stock (Show , Eq , Ord )
132
+ deriving newtype (Enum , Prim )
122
133
123
134
-- | Configuration options for individual LSM tables.
124
135
data LSMConfig = LSMConfig {
@@ -960,8 +971,8 @@ suppliedCreditMergingRun (MergingRun _ d ref) =
960
971
-- LSM handle
961
972
--
962
973
963
- new :: ST s (LSM s )
964
- new = newWith conf
974
+ new :: Tracer ( ST s ) Event -> TableId -> ST s (LSM s )
975
+ new tr tid = newWith tr tid conf
965
976
where
966
977
-- 4 was the default for both the max write buffer size and size ratio
967
978
-- before they were made configurable
@@ -970,16 +981,17 @@ new = newWith conf
970
981
, configSizeRatio = 4
971
982
}
972
983
973
-- | Create an empty table with an explicit configuration.
--
-- Calls 'error' if @configMaxWriteBufferSize conf <= 0@ or if
-- @configSizeRatio conf <= 1@. Otherwise it traces a 'NewTableEvent' carrying
-- the given 'TableId' and configuration, and returns a handle whose counter
-- starts at 0 and whose content has an empty write buffer, no levels and
-- 'NoUnion'.
- newWith :: LSMConfig -> ST s (LSM s )
974
- newWith conf
984
+ newWith :: Tracer ( ST s ) Event -> TableId -> LSMConfig -> ST s (LSM s )
985
+ newWith tr tid conf
975
986
| configMaxWriteBufferSize conf <= 0 =
976
987
error " newWith: configMaxWriteBufferSize should be positive"
977
988
| configSizeRatio conf <= 1 =
978
989
error " newWith: configSizeRatio should be larger than 1"
979
990
| otherwise = do
991
    -- The event is only traced once the configuration has passed validation.
+ traceWith tr $ NewTableEvent tid conf
980
992
c <- newSTRef 0
981
993
lsm <- newSTRef (LSMContent Map. empty [] NoUnion )
982
- pure (LSMHandle c conf lsm)
994
+ pure (LSMHandle tid c conf lsm)
983
995
984
996
-- | Insert a batch of entries. Each @(k, v, b)@ triple is turned into an
-- @'Insert' v b@ entry for key @k@ and the resulting list is handed to
-- 'updates', which applies the entries one by one in list order.
inserts :: Tracer (ST s ) Event -> LSM s -> [(Key , Value , Maybe Blob )] -> ST s ()
985
997
inserts tr lsm kvbs = updates tr lsm [ (k, Insert v b) | (k, v, b) <- kvbs ]
@@ -1009,7 +1021,8 @@ updates :: Tracer (ST s) Event -> LSM s -> [(Key, Entry)] -> ST s ()
1009
1021
updates tr lsm = mapM_ (uncurry (update tr lsm))
1010
1022
1011
1023
update :: Tracer (ST s ) Event -> LSM s -> Key -> Entry -> ST s ()
1012
- update tr (LSMHandle scr conf lsmr) k entry = do
1024
+ update tr (LSMHandle tid scr conf lsmr) k entry = do
1025
+ traceWith tr $ UpdateEvent tid k entry
1013
1026
sc <- readSTRef scr
1014
1027
content@ (LSMContent wb ls unionLevel) <- readSTRef lsmr
1015
1028
modifySTRef' scr (+ 1 )
@@ -1018,15 +1031,15 @@ update tr (LSMHandle scr conf lsmr) k entry = do
1018
1031
let wb' = Map. insertWith combine k entry wb
1019
1032
if bufferSize wb' >= maxWriteBufferSize conf
1020
1033
then do
1021
- ls' <- increment tr sc conf (bufferToRun wb') ls unionLevel
1034
+ ls' <- increment ( LevelEvent tid >$< tr) sc conf (bufferToRun wb') ls unionLevel
1022
1035
let content' = LSMContent Map. empty ls' unionLevel
1023
1036
invariant conf content'
1024
1037
writeSTRef lsmr content'
1025
1038
else
1026
1039
writeSTRef lsmr (LSMContent wb' ls unionLevel)
1027
1040
1028
1041
supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
1029
- supplyMergeCredits (LSMHandle scr conf lsmr) credits = do
1042
+ supplyMergeCredits (LSMHandle _ scr conf lsmr) credits = do
1030
1043
content@ (LSMContent _ ls _) <- readSTRef lsmr
1031
1044
modifySTRef' scr (+ 1 )
1032
1045
supplyCreditsLevels credits ls
@@ -1038,22 +1051,24 @@ data LookupResult v b =
1038
1051
deriving stock (Eq , Show )
1039
1052
1040
1053
-- | Look up several keys at once. The table content is read and the levels are
-- flattened a single time; 'doLookup' is then run for each key against that
-- same write buffer, run list and union level.
--
-- Unlike 'lookup', this function takes no tracer and therefore emits no
-- 'LookupEvent'.
lookups :: LSM s -> [Key ] -> ST s [LookupResult Value Blob ]
1041
- lookups (LSMHandle _ _conf lsmr) ks = do
1054
+ lookups (LSMHandle _ _ _conf lsmr) ks = do
1042
1055
LSMContent wb ls ul <- readSTRef lsmr
1043
1056
runs <- concat <$> flattenLevels ls
1044
1057
traverse (doLookup wb runs ul) ks
1045
1058
1046
-- | Look up a single key. A 'LookupEvent' with the table's identifier and the
-- key is traced first; then the table content is read, the levels are
-- flattened into one list of runs, and the result is computed by 'doLookup'
-- from the write buffer, those runs and the union level.
- lookup :: LSM s -> Key -> ST s (LookupResult Value Blob )
1047
- lookup (LSMHandle _ _conf lsmr) k = do
1059
+ lookup :: Tracer (ST s ) Event -> LSM s -> Key -> ST s (LookupResult Value Blob )
1060
+ lookup tr (LSMHandle tid _ _conf lsmr) k = do
1061
+ traceWith tr $ LookupEvent tid k
1048
1062
LSMContent wb ls ul <- readSTRef lsmr
1049
1063
runs <- concat <$> flattenLevels ls
1050
1064
doLookup wb runs ul k
1051
1065
1052
-- | Create a second handle, identified by @childTid@, that starts out with the
-- parent's current content and the parent's configuration.
--
-- A 'DuplicateEvent' is traced with the new table's identifier first and the
-- parent's identifier second. The new handle gets its own counter (starting
-- at 0) and its own top-level content reference, initialised with the value
-- currently stored in the parent's reference.
- duplicate :: LSM s -> ST s (LSM s )
1053
- duplicate (LSMHandle _scr conf lsmr) = do
1066
+ duplicate :: Tracer (ST s ) Event -> TableId -> LSM s -> ST s (LSM s )
1067
+ duplicate tr childTid (LSMHandle parentTid _scr conf lsmr) = do
1068
+ traceWith tr $ DuplicateEvent childTid parentTid
1054
1069
    -- The parent's counter is deliberately ignored; the child counts from 0.
scr' <- newSTRef 0
1055
1070
lsmr' <- newSTRef =<< readSTRef lsmr
1056
- pure (LSMHandle scr' conf lsmr')
1071
+ pure (LSMHandle childTid scr' conf lsmr')
1057
1072
-- it's that simple here, because we share all the pure values and all the
1058
1073
-- STRefs and there's no ref counting to be done
1059
1074
@@ -1064,9 +1079,12 @@ duplicate (LSMHandle _scr conf lsmr) = do
1064
1079
-- merge that can be performed incrementally (somewhat similar to a thunk).
1065
1080
--
1066
1081
-- The more merge work remains, the more expensive are lookups on the table.
1067
- unions :: [LSM s ] -> ST s (LSM s )
1068
- unions lsms = do
1069
- (confs, trees) <- fmap unzip $ forM lsms $ \ (LSMHandle _ conf lsmr) ->
1082
+ unions :: Tracer (ST s ) Event -> TableId -> [LSM s ] -> ST s (LSM s )
1083
+ unions tr childTid lsms = do
1084
+ traceWith tr $
1085
+ let parentTids = fmap tableId lsms
1086
+ in UnionsEvent childTid parentTids
1087
+ (confs, trees) <- fmap unzip $ forM lsms $ \ (LSMHandle _ _ conf lsmr) ->
1070
1088
(conf,) <$> (contentToMergingTree =<< readSTRef lsmr)
1071
1089
-- Check that the configurations are equal
1072
1090
conf <- case confs of
@@ -1081,7 +1099,7 @@ unions lsms = do
1081
1099
Union tree <$> newSTRef debt
1082
1100
lsmr <- newSTRef (LSMContent Map. empty [] unionLevel)
1083
1101
c <- newSTRef 0
1084
- pure (LSMHandle c conf lsmr)
1102
+ pure (LSMHandle childTid c conf lsmr)
1085
1103
1086
1104
-- | The /current/ upper bound on the number of 'UnionCredits' that have to be
1087
1105
-- supplied before a 'union' is completed.
@@ -1097,7 +1115,7 @@ newtype UnionDebt = UnionDebt Debt
1097
1115
-- | Return the current union debt. This debt can be reduced until it is paid
1098
1116
-- off using 'supplyUnionCredits'.
1099
1117
remainingUnionDebt :: LSM s -> ST s UnionDebt
1100
- remainingUnionDebt (LSMHandle _ _conf lsmr) = do
1118
+ remainingUnionDebt (LSMHandle _ _ _conf lsmr) = do
1101
1119
LSMContent _ _ ul <- readSTRef lsmr
1102
1120
UnionDebt <$> case ul of
1103
1121
NoUnion -> pure 0
@@ -1123,7 +1141,7 @@ newtype UnionCredits = UnionCredits Credit
1123
1141
-- a union has finished. In particular, if the returned number of credits is
1124
1142
-- non-negative, then the union is finished.
1125
1143
supplyUnionCredits :: LSM s -> UnionCredits -> ST s UnionCredits
1126
- supplyUnionCredits (LSMHandle scr conf lsmr) (UnionCredits credits)
1144
+ supplyUnionCredits (LSMHandle _ scr conf lsmr) (UnionCredits credits)
1127
1145
| credits <= 0 = pure (UnionCredits 0 )
1128
1146
| otherwise = do
1129
1147
content@ (LSMContent _ _ ul) <- readSTRef lsmr
@@ -1399,7 +1417,7 @@ depositNominalCredit (NominalDebt nominalDebt)
1399
1417
-- Updates
1400
1418
--
1401
1419
1402
- increment :: forall s . Tracer (ST s ) Event
1420
+ increment :: forall s . Tracer (ST s ) ( EventAt EventDetail )
1403
1421
-> Counter
1404
1422
-> LSMConfig
1405
1423
-> Run -> Levels s -> UnionLevel s -> ST s (Levels s )
@@ -1411,19 +1429,21 @@ increment tr sc conf run0 ls0 ul = do
1411
1429
1412
1430
go :: Int -> [Run ] -> Levels s -> ST s (Levels s )
1413
1431
go ! ln incoming [] = do
1414
- let mergePolicy = mergePolicyForLevel ln [] ul
1415
1432
traceWith tr' AddLevelEvent
1433
+ let mergePolicy = mergePolicyForLevel ln [] ul
1416
1434
ir <- newLevelMerge tr' conf ln mergePolicy (mergeTypeFor [] ) incoming
1417
1435
pure (Level ir [] : [] )
1418
1436
where
1419
1437
tr' = contramap (EventAt sc ln) tr
1420
1438
1421
1439
go ! ln incoming (Level ir rs : ls) = do
1422
1440
r <- case ir of
1423
- Single r -> pure r
1441
+ Single r -> do
1442
+ traceWith tr' $ SingleRunCompletedEvent r
1443
+ pure r
1424
1444
Merging mergePolicy _ _ mr -> do
1425
1445
r <- expectCompletedMergingRun mr
1426
- traceWith tr' MergeCompletedEvent {
1446
+ traceWith tr' LevelMergeCompletedEvent {
1427
1447
mergePolicy,
1428
1448
mergeType = let MergingRun mt _ _ = mr in mt,
1429
1449
mergeSize = runSize r
@@ -1436,6 +1456,8 @@ increment tr sc conf run0 ls0 ul = do
1436
1456
-- If r is still too small for this level then keep it and merge again
1437
1457
-- with the incoming runs.
1438
1458
LevelTiering | runTooSmallForLevel LevelTiering conf ln r -> do
1459
+ traceWith tr' $ RunTooSmallForLevelEvent LevelTiering ln r
1460
+
1439
1461
ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) (incoming ++ [r])
1440
1462
pure (Level ir' rs : ls)
1441
1463
@@ -1444,29 +1466,37 @@ increment tr sc conf run0 ls0 ul = do
1444
1466
-- as a bundle and move them down to the level below. We start a merge
1445
1467
-- for the new incoming runs. This level is otherwise empty.
1446
1468
LevelTiering | levelIsFullTiering conf ln incoming resident -> do
1469
+ traceWith tr' $ LevelIsFullEvent LevelTiering
1470
+
1447
1471
ir' <- newLevelMerge tr' conf ln LevelTiering MergeMidLevel incoming
1448
1472
ls' <- go (ln+ 1 ) resident ls
1449
1473
pure (Level ir' [] : ls')
1450
1474
1451
1475
-- This tiering level is not yet full. We move the completed merged run
1452
1476
-- into the level proper, and start the new merge for the incoming runs.
1453
1477
LevelTiering -> do
1478
+ traceWith tr' $ LevelIsNotFullEvent LevelTiering
1479
+
1454
1480
ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) incoming
1455
- traceWith tr' (AddRunEvent ( length resident) )
1481
+ traceWith tr' (AddRunEvent resident)
1456
1482
pure (Level ir' resident : ls)
1457
1483
1458
1484
-- The final level is using levelling. If the existing completed merge
1459
1485
-- run is too large for this level, we promote the run to the next
1460
1486
-- level and start merging the incoming runs into this (otherwise
1461
1487
-- empty) level.
1462
1488
LevelLevelling | levelIsFullLevelling conf ln incoming r -> do
1489
+ traceWith tr' $ LevelIsFullEvent LevelLevelling
1490
+
1463
1491
assert (null rs && null ls) $ pure ()
1464
1492
ir' <- newLevelMerge tr' conf ln LevelTiering MergeMidLevel incoming
1465
1493
ls' <- go (ln+ 1 ) [r] []
1466
1494
pure (Level ir' [] : ls')
1467
1495
1468
1496
-- Otherwise we start merging the incoming runs into the run.
1469
1497
LevelLevelling -> do
1498
+ traceWith tr' $ LevelIsNotFullEvent LevelLevelling
1499
+
1470
1500
assert (null rs && null ls) $ pure ()
1471
1501
ir' <- newLevelMerge tr' conf ln LevelLevelling (mergeTypeFor ls)
1472
1502
(incoming ++ [r])
@@ -1479,17 +1509,19 @@ newLevelMerge :: Tracer (ST s) EventDetail
1479
1509
-> LSMConfig
1480
1510
-> Int -> MergePolicyForLevel -> LevelMergeType
1481
1511
-> [Run ] -> ST s (IncomingRun s )
1482
- newLevelMerge _ _ _ _ _ [r] = pure (Single r)
1512
+ newLevelMerge tr _ _ _ _ [r] = do
1513
+ traceWith tr $ NewSingleRunEvent r
1514
+ pure (Single r)
1483
1515
newLevelMerge tr conf@ LSMConfig {.. } level mergePolicy mergeType rs = do
1484
- assertST (length rs `elem` [configSizeRatio, configSizeRatio + 1 ])
1485
1516
mergingRun@ (MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
1486
- assertWithMsgM $ leq (totalDebt physicalDebt) maxPhysicalDebt
1487
- traceWith tr MergeStartedEvent {
1517
+ traceWith tr NewLevelMergeEvent {
1488
1518
mergePolicy,
1489
1519
mergeType,
1490
- mergeDebt = totalDebt physicalDebt,
1491
- mergeRunsSize = map runSize rs
1520
+ mergeDebt = totalDebt physicalDebt,
1521
+ mergeRuns = rs
1492
1522
}
1523
+ assertST (length rs `elem` [configSizeRatio, configSizeRatio + 1 ])
1524
+ assertWithMsgM $ leq (totalDebt physicalDebt) maxPhysicalDebt
1493
1525
nominalCreditVar <- newSTRef (NominalCredit 0 )
1494
1526
pure (Merging mergePolicy nominalDebt nominalCreditVar mergingRun)
1495
1527
where
@@ -1766,7 +1798,7 @@ data MTree r = MLeaf r
1766
1798
deriving stock (Eq , Foldable , Functor , Show )
1767
1799
1768
1800
allLevels :: LSM s -> ST s (Buffer , [[Run ]], Maybe (MTree Run ))
1769
- allLevels (LSMHandle _ _conf lsmr) = do
1801
+ allLevels (LSMHandle _ _ _conf lsmr) = do
1770
1802
LSMContent wb ls ul <- readSTRef lsmr
1771
1803
rs <- flattenLevels ls
1772
1804
tree <- case ul of
@@ -1836,7 +1868,7 @@ type LevelRepresentation =
1836
1868
[Run ])
1837
1869
1838
1870
dumpRepresentation :: LSM s -> ST s Representation
1839
- dumpRepresentation (LSMHandle _ _conf lsmr) = do
1871
+ dumpRepresentation (LSMHandle _ _ _conf lsmr) = do
1840
1872
LSMContent wb ls ul <- readSTRef lsmr
1841
1873
levels <- mapM dumpLevel ls
1842
1874
tree <- case ul of
@@ -1877,7 +1909,15 @@ representationShape (wb, levels, tree) =
1877
1909
1878
1910
-- TODO: these events are incomplete, in particular we should also trace what
1879
1911
-- happens in the union level.
1880
- type Event = EventAt EventDetail
1912
-- | Trace events emitted by the table-level operations. Every constructor
-- starts with the 'TableId' of the table the event is about.
+ data Event =
1913
    -- | Traced by 'newWith' after the configuration has been validated.
+ NewTableEvent TableId LSMConfig
1914
    -- | Traced by 'update' for every key\/entry pair, before it is applied.
+ | UpdateEvent TableId Key Entry
1915
    -- | Traced by 'lookup' (but not by 'lookups', which takes no tracer).
+ | LookupEvent TableId Key
1916
    -- | Traced by 'duplicate': the new (child) table first, then its parent.
+ | DuplicateEvent TableId TableId
1917
    -- | Traced by 'unions': the new (child) table first, then the identifiers
    -- of all input tables, in the order they were given.
+ | UnionsEvent TableId [TableId ]
1918
    -- | A level-related event produced while 'update' flushes the write
    -- buffer via 'increment', tagged with the table it happened in.
+ | LevelEvent TableId (EventAt EventDetail )
1919
+ deriving stock Show
1920
+
1881
1921
data EventAt e = EventAt {
1882
1922
eventAtStep :: Counter ,
1883
1923
eventAtLevel :: Int ,
@@ -1886,21 +1926,27 @@ data EventAt e = EventAt {
1886
1926
deriving stock Show
1887
1927
1888
1928
-- | Details of an event that happens at a particular level. These are traced
-- by 'increment' and 'newLevelMerge' and wrapped in 'EventAt'.
data EventDetail =
1889
- AddLevelEvent
1890
- | AddRunEvent {
1891
- runsAtLevel :: Int
1892
- }
1893
- | MergeStartedEvent {
1894
- mergePolicy :: MergePolicyForLevel ,
1895
- mergeType :: LevelMergeType ,
1896
- mergeDebt :: Debt ,
1897
- mergeRunsSize :: [Int ]
1898
- }
1899
- | MergeCompletedEvent {
1900
- mergePolicy :: MergePolicyForLevel ,
1901
- mergeType :: LevelMergeType ,
1902
- mergeSize :: Int
1903
- }
1929
    -- Traced by 'increment' when runs arrive below the last existing level,
    -- i.e. a new level is about to be created.
+ AddLevelEvent
1930
    -- Traced by 'increment' for a tiering level that is not yet full; carries
    -- the runs that are kept as that level's resident runs.
+ | AddRunEvent {
1931
+ runsAtLevel :: [Run ]
1932
+ }
1933
    -- Traced by 'newLevelMerge' when a new merging run has been created:
    -- 'mergeDebt' is the total physical debt of that merge and 'mergeRuns'
    -- are its input runs.
+ | NewLevelMergeEvent {
1934
+ mergePolicy :: MergePolicyForLevel ,
1935
+ mergeType :: LevelMergeType ,
1936
+ mergeDebt :: Debt ,
1937
+ mergeRuns :: [Run ]
1938
+ }
1939
    -- Traced by 'newLevelMerge' when it is given exactly one run, which is
    -- then kept as a single incoming run instead of starting a merge.
+ | NewSingleRunEvent Run
1940
    -- Traced by 'increment' when a level's incoming run was a merge that is
    -- expected to be complete; 'mergeSize' is the size of the resulting run.
+ | LevelMergeCompletedEvent {
1941
+ mergePolicy :: MergePolicyForLevel ,
1942
+ mergeType :: LevelMergeType ,
1943
+ mergeSize :: Int
1944
+ }
1945
    -- Traced by 'increment' when a level's incoming run was a single run.
+ | SingleRunCompletedEvent Run
1946
+
1947
    -- Traced by 'increment' when the completed run is still too small for its
    -- level and is merged again with the incoming runs. The 'Int' is the level
    -- number (presumably the same value as 'eventAtLevel' -- TODO confirm).
+ | RunTooSmallForLevelEvent MergePolicyForLevel Int Run
1948
    -- Traced by 'increment' when the level is full under the given policy.
+ | LevelIsFullEvent MergePolicyForLevel
1949
    -- Traced by 'increment' when the level is not yet full under the given
    -- policy.
+ | LevelIsNotFullEvent MergePolicyForLevel
1904
1950
deriving stock Show
1905
1951
1906
1952
-------------------------------------------------------------------------------
0 commit comments