diff --git a/main.go b/main.go index 262dc05..2663959 100644 --- a/main.go +++ b/main.go @@ -3,12 +3,10 @@ package main import ( "fmt" "github.com/ardanlabs/conf" - "github.com/cockroachdb/pebble" "github.com/pkg/errors" "github.com/qubic/go-archiver/processor" "github.com/qubic/go-archiver/rpc" "github.com/qubic/go-archiver/store" - "github.com/qubic/go-archiver/validator/tick" qubic "github.com/qubic/go-node-connector" "github.com/qubic/go-node-connector/types" "log" @@ -16,7 +14,6 @@ import ( _ "net/http/pprof" "os" "os/signal" - "runtime" "syscall" "time" ) @@ -58,6 +55,7 @@ func run() error { } Store struct { ResetEmptyTickKeys bool `conf:"default:false"` + EpochCount int `conf:"default:2"` } } @@ -87,74 +85,11 @@ func run() error { } log.Printf("main: Config :\n%v\n", out) - l1Options := pebble.LevelOptions{ - BlockRestartInterval: 16, - BlockSize: 4096, - BlockSizeThreshold: 90, - Compression: pebble.NoCompression, - FilterPolicy: nil, - FilterType: pebble.TableFilter, - IndexBlockSize: 4096, - TargetFileSize: 268435456, // 256 MB - } - l2Options := pebble.LevelOptions{ - BlockRestartInterval: 16, - BlockSize: 4096, - BlockSizeThreshold: 90, - Compression: pebble.ZstdCompression, - FilterPolicy: nil, - FilterType: pebble.TableFilter, - IndexBlockSize: 4096, - TargetFileSize: l1Options.TargetFileSize * 10, // 2.5 GB - } - l3Options := pebble.LevelOptions{ - BlockRestartInterval: 16, - BlockSize: 4096, - BlockSizeThreshold: 90, - Compression: pebble.ZstdCompression, - FilterPolicy: nil, - FilterType: pebble.TableFilter, - IndexBlockSize: 4096, - TargetFileSize: l2Options.TargetFileSize * 10, // 25 GB - } - l4Options := pebble.LevelOptions{ - BlockRestartInterval: 16, - BlockSize: 4096, - BlockSizeThreshold: 90, - Compression: pebble.ZstdCompression, - FilterPolicy: nil, - FilterType: pebble.TableFilter, - IndexBlockSize: 4096, - TargetFileSize: l3Options.TargetFileSize * 10, // 250 GB - } - - pebbleOptions := pebble.Options{ - Levels: []pebble.LevelOptions{l1Options, l2Options, l3Options, l4Options}, - MaxConcurrentCompactions: func() int { return runtime.NumCPU() }, - MemTableSize: 268435456, // 256 MB - EventListener: store.NewPebbleEventListener(), - } - - db, err := pebble.Open(cfg.Qubic.StorageFolder, &pebbleOptions) - if err != nil { - return errors.Wrap(err, "opening db with zstd compression") - } - defer db.Close() - - ps := store.NewPebbleStore(db, nil) - - if cfg.Store.ResetEmptyTickKeys { - fmt.Printf("Resetting empty ticks for all epochs...\n") - err = tick.ResetEmptyTicksForAllEpochs(ps) - if err != nil { - return errors.Wrap(err, "resetting empty ticks keys") - } - } - - err = tick.CalculateEmptyTicksForAllEpochs(ps) + ps, err := store.NewPebbleStore(cfg.Qubic.StorageFolder, nil, cfg.Store.EpochCount) if err != nil { - return errors.Wrap(err, "calculating empty ticks for all epochs") + return errors.Wrap(err, "creating info store") } + defer ps.Close() p, err := qubic.NewPoolConnection(qubic.PoolConfig{ InitialCap: cfg.Pool.InitialCap, diff --git a/processor/processor.go b/processor/processor.go index 729e829..bd3e428 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -119,7 +119,7 @@ func (p *Processor) processStatus(ctx context.Context, lastTick *protobuff.Proce return errors.Wrap(err, "processing skipped ticks") } - err = p.ps.SetLastProcessedTick(ctx, nextTick) + err = p.ps.SetLastProcessedTick(nextTick) if err != nil { return errors.Wrapf(err, "setting last processed tick %d", nextTick.TickNumber) } @@ -130,7 +130,21 @@ func (p *Processor) processStatus(ctx context.Context, lastTick *protobuff.Proce func (p *Processor) getNextProcessingTick(_ context.Context, lastTick *protobuff.ProcessedTick, currentTickInfo types.TickInfo) (*protobuff.ProcessedTick, error) { //handles the case where the initial tick of epoch returned by the node is greater than the last processed tick // which means that we are in the next epoch and we should start from the initial tick of the current epoch + + //Simulate epoch transition + if lastTick.TickNumber > 22690020 { + currentTickInfo.InitialTick = 22690023 + currentTickInfo.Epoch = 156 + lastTick.Epoch = 156 + } + if currentTickInfo.InitialTick > lastTick.TickNumber { + + err := p.ps.HandleEpochTransition(uint32(currentTickInfo.Epoch)) + if err != nil { + return nil, errors.Wrap(err, "handling epoch transition") + } + return &protobuff.ProcessedTick{TickNumber: currentTickInfo.InitialTick, Epoch: uint32(currentTickInfo.Epoch)}, nil } @@ -139,7 +153,7 @@ func (p *Processor) getNextProcessingTick(_ context.Context, lastTick *protobuff } func (p *Processor) getLastProcessedTick(ctx context.Context, currentTickInfo types.TickInfo) (*protobuff.ProcessedTick, error) { - lastTick, err := p.ps.GetLastProcessedTick(ctx) + lastTick, err := p.ps.GetLastProcessedTick() if err != nil { //handles first run of the archiver where there is nothing in storage // in this case last tick is 0 and epoch is current tick info epoch @@ -163,18 +177,10 @@ func (p *Processor) processSkippedTicks(ctx context.Context, lastTick *protobuff return errors.Errorf("Next tick should not be equal to last tick %d", nextTick.TickNumber) } - err := p.ps.AppendProcessedTickInterval(ctx, nextTick.Epoch, &protobuff.ProcessedTickInterval{InitialProcessedTick: nextTick.TickNumber, LastProcessedTick: nextTick.TickNumber}) + err := p.ps.GetCurrentEpochStore().AppendProcessedTickInterval(&protobuff.ProcessedTickInterval{InitialProcessedTick: nextTick.TickNumber, LastProcessedTick: nextTick.TickNumber}) if err != nil { return errors.Wrap(err, "appending processed tick interval") } - err = p.ps.SetSkippedTicksInterval(ctx, &protobuff.SkippedTicksInterval{ - StartTick: lastTick.TickNumber + 1, - EndTick: nextTick.TickNumber - 1, - }) - if err != nil { - return errors.Wrap(err, "setting skipped ticks interval") - } - return nil } diff --git a/processor/processor_test.go b/processor/processor_test.go index 7819f52..f83c7fc 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2,7 +2,6 @@ package processor import ( "context" - "github.com/cockroachdb/pebble" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" pb "github.com/qubic/go-archiver/protobuff" @@ -11,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/protobuf/proto" + "log" "os" "path/filepath" "testing" @@ -24,12 +24,14 @@ func TestProcessor_GetLastProcessedTick(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dbDir) - db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{}) - require.NoError(t, err) - defer db.Close() + testPath := filepath.Join(dbDir, "testdb") logger, _ := zap.NewDevelopment() - s := store.NewPebbleStore(db, logger) + s, err := store.NewPebbleStore(testPath, logger, 10) + require.NoError(t, err) + + err = s.HandleEpochTransition(1) + require.NoError(t, err) p := Processor{ps: s} @@ -39,6 +41,7 @@ func TestProcessor_GetLastProcessedTick(t *testing.T) { got, err := p.getLastProcessedTick(ctx, currentTickInfo) require.NoError(t, err) + log.Printf("GOT: %v EXPECTED: %v", got, &expected) require.True(t, proto.Equal(got, &expected)) } @@ -50,12 +53,11 @@ func TestProcessor_GetNextProcessingTick(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dbDir) - db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{}) - require.NoError(t, err) - defer db.Close() + testPath := filepath.Join(dbDir, "testdb") logger, _ := zap.NewDevelopment() - s := store.NewPebbleStore(db, logger) + s, err := store.NewPebbleStore(testPath, logger, 10) + require.NoError(t, err) p := Processor{ps: s} @@ -96,12 +98,14 @@ func TestProcessor_ProcessStatus(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dbDir) - db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{}) - require.NoError(t, err) - defer db.Close() + testPath := filepath.Join(dbDir, "testdb") logger, _ := zap.NewDevelopment() - s := store.NewPebbleStore(db, logger) + s, err := store.NewPebbleStore(testPath, logger, 10) + require.NoError(t, err) + + err = s.HandleEpochTransition(1) + require.NoError(t, err) p := Processor{ps: s} @@ -123,7 +127,7 @@ func TestProcessor_ProcessStatus(t *testing.T) { }, }, } - got, err := s.GetProcessedTickIntervals(ctx) + got, err := s.GetProcessedTickIntervals() require.NoError(t, err) diff := cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) require.True(t, cmp.Equal(diff, "")) @@ -135,7 +139,7 @@ func TestProcessor_ProcessStatus(t *testing.T) { require.NoError(t, err) expected[0].Intervals[0].LastProcessedTick = nextTick.TickNumber - got, err = s.GetProcessedTickIntervals(ctx) + got, err = s.GetProcessedTickIntervals() require.NoError(t, err) diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) @@ -151,7 +155,7 @@ func TestProcessor_ProcessStatus(t *testing.T) { LastProcessedTick: nextTick.TickNumber, }) - got, err = s.GetProcessedTickIntervals(ctx) + got, err = s.GetProcessedTickIntervals() require.NoError(t, err) diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) @@ -163,13 +167,15 @@ func TestProcessor_ProcessStatus(t *testing.T) { require.NoError(t, err) expected[0].Intervals[1].LastProcessedTick = nextTick.TickNumber - got, err = s.GetProcessedTickIntervals(ctx) + got, err = s.GetProcessedTickIntervals() require.NoError(t, err) diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) require.True(t, cmp.Equal(diff, "")) // new epoch + err = s.HandleEpochTransition(2) + require.NoError(t, err) lastTick.TickNumber = nextTick.TickNumber nextTick = pb.ProcessedTick{TickNumber: 200, Epoch: 2} err = p.processStatus(ctx, &lastTick, &nextTick) @@ -184,10 +190,11 @@ func TestProcessor_ProcessStatus(t *testing.T) { }, }) - got, err = s.GetProcessedTickIntervals(ctx) + got, err = s.GetProcessedTickIntervals() require.NoError(t, err) diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) + log.Printf("DIFF: %s\n", diff) require.True(t, cmp.Equal(diff, "")) lastTick.TickNumber = nextTick.TickNumber @@ -197,10 +204,11 @@ func TestProcessor_ProcessStatus(t *testing.T) { require.NoError(t, err) expected[1].Intervals[0].LastProcessedTick = nextTick.TickNumber - got, err = s.GetProcessedTickIntervals(ctx) + got, err = s.GetProcessedTickIntervals() require.NoError(t, err) diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) + log.Printf("DIFF: %s\n", diff) require.True(t, cmp.Equal(diff, "")) } @@ -212,12 +220,14 @@ func TestProcessor_ProcessStatusOnthefly(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dbDir) - db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{}) - require.NoError(t, err) - defer db.Close() + testPath := filepath.Join(dbDir, "testdb") logger, _ := zap.NewDevelopment() - s := store.NewPebbleStore(db, logger) + s, err := store.NewPebbleStore(testPath, logger, 10) + require.NoError(t, err) + + err = s.HandleEpochTransition(1) + require.NoError(t, err) p := Processor{ps: s} @@ -239,7 +249,7 @@ func TestProcessor_ProcessStatusOnthefly(t *testing.T) { }, }, } - got, err := s.GetProcessedTickIntervals(ctx) + got, err := s.GetProcessedTickIntervals() require.NoError(t, err) diff := cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{})) require.True(t, cmp.Equal(diff, "")) diff --git a/rpc/rpc_server.go b/rpc/rpc_server.go index bfe052f..1dbf562 100644 --- a/rpc/rpc_server.go +++ b/rpc/rpc_server.go @@ -8,7 +8,6 @@ import ( "github.com/pkg/errors" "github.com/qubic/go-archiver/protobuff" "github.com/qubic/go-archiver/store" - "github.com/qubic/go-archiver/validator/quorum" "github.com/qubic/go-archiver/validator/tick" qubic "github.com/qubic/go-node-connector" "github.com/qubic/go-node-connector/types" @@ -55,33 +54,8 @@ func NewServer(listenAddrGRPC, listenAddrHTTP string, syncThreshold int, chainTi } } -func getTransactionInfo(ctx context.Context, pebbleStore *store.PebbleStore, transactionId string, tickNumber uint32) (*TransactionInfo, error) { - tickData, err := pebbleStore.GetTickData(ctx, tickNumber) - if err != nil { - return nil, errors.Wrap(err, "getting tick data") - } - - txStatus, err := pebbleStore.GetTransactionStatus(ctx, transactionId) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return &TransactionInfo{ - timestamp: tickData.Timestamp, - moneyFlew: false, - }, nil - } - - return nil, errors.Wrap(err, "getting transaction status") - } - - return &TransactionInfo{ - timestamp: tickData.Timestamp, - moneyFlew: txStatus.MoneyFlew, - }, nil - -} - func (s *Server) GetTickData(ctx context.Context, req *protobuff.GetTickDataRequest) (*protobuff.GetTickDataResponse, error) { - tickData, err := s.store.GetTickData(ctx, req.TickNumber) + tickData, err := s.store.GetTickData(req.TickNumber) if err != nil { if errors.Is(err, store.ErrNotFound) { return nil, status.Errorf(codes.NotFound, "tick data not found") @@ -99,9 +73,10 @@ func (s *Server) GetTickData(ctx context.Context, req *protobuff.GetTickDataRequ return &protobuff.GetTickDataResponse{TickData: tickData}, nil } + func (s *Server) GetTickTransactions(ctx context.Context, req *protobuff.GetTickTransactionsRequest) (*protobuff.GetTickTransactionsResponse, error) { - txs, err := s.store.GetTickTransactions(ctx, req.TickNumber) + txs, err := s.store.GetTickTransactions(req.TickNumber) if err != nil { if errors.Is(err, store.ErrNotFound) { return nil, status.Errorf(codes.NotFound, "tick transactions for specified tick not found") @@ -112,20 +87,22 @@ func (s *Server) GetTickTransactions(ctx context.Context, req *protobuff.GetTick return &protobuff.GetTickTransactionsResponse{Transactions: txs}, nil } +/* func (s *Server) GetTickTransferTransactions(ctx context.Context, req *protobuff.GetTickTransactionsRequest) (*protobuff.GetTickTransactionsResponse, error) { - txs, err := s.store.GetTickTransferTransactions(ctx, req.TickNumber) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "tick transfer transactions for specified tick not found") + txs, err := s.store.GetTickTransferTransactions(ctx, req.TickNumber) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return nil, status.Errorf(codes.NotFound, "tick transfer transactions for specified tick not found") + } + return nil, status.Errorf(codes.Internal, "getting tick transactions: %v", err) } - return nil, status.Errorf(codes.Internal, "getting tick transactions: %v", err) - } - return &protobuff.GetTickTransactionsResponse{Transactions: txs}, nil -} + return &protobuff.GetTickTransactionsResponse{Transactions: txs}, nil + } +*/ func (s *Server) GetTransaction(ctx context.Context, req *protobuff.GetTransactionRequest) (*protobuff.GetTransactionResponse, error) { - tx, err := s.store.GetTransaction(ctx, req.TxId) + tx, err := s.store.GetTransaction(req.TxId) if err != nil { if errors.Is(err, store.ErrNotFound) { return nil, status.Errorf(codes.NotFound, "transaction not found") @@ -135,118 +112,9 @@ func (s *Server) GetTransaction(ctx context.Context, req *protobuff.GetTransacti return &protobuff.GetTransactionResponse{Transaction: tx}, nil } -func (s *Server) GetQuorumTickData(ctx context.Context, req *protobuff.GetQuorumTickDataRequest) (*protobuff.GetQuorumTickDataResponse, error) { - lastProcessedTick, err := s.store.GetLastProcessedTick(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) - } - if req.TickNumber > lastProcessedTick.TickNumber { - st := status.Newf(codes.FailedPrecondition, "requested tick number %d is greater than last processed tick %d", req.TickNumber, lastProcessedTick.TickNumber) - st, err = st.WithDetails(&protobuff.LastProcessedTick{LastProcessedTick: lastProcessedTick.TickNumber}) - if err != nil { - return nil, status.Errorf(codes.Internal, "creating custom status") - } - - return nil, st.Err() - } - - processedTickIntervalsPerEpoch, err := s.store.GetProcessedTickIntervals(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting processed tick intervals per epoch") - } - - epoch, err := tick.GetTickEpoch(req.TickNumber, processedTickIntervalsPerEpoch) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting tick epoch :%v", err) - } - - lastTickFlag, index, err := tick.IsTickLastInAnyEpochInterval(req.TickNumber, epoch, processedTickIntervalsPerEpoch) - if err != nil { - return nil, status.Errorf(codes.Internal, "checking if tick is last tick in it's epoch: %v", err) - } - - if lastTickFlag { - lastQuorumDataPerEpochInterval, err := s.store.GetLastTickQuorumDataListPerEpochInterval(epoch) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting quorum data for last processed tick: %v", err) - } - - return &protobuff.GetQuorumTickDataResponse{ - QuorumTickData: lastQuorumDataPerEpochInterval.QuorumDataPerInterval[int32(index)], - }, nil - } - - wasSkipped, nextAvailableTick := tick.WasSkippedByArchive(req.TickNumber, processedTickIntervalsPerEpoch) - if wasSkipped == true { - st := status.Newf(codes.OutOfRange, "provided tick number %d was skipped by the system, next available tick is %d", req.TickNumber, nextAvailableTick) - st, err = st.WithDetails(&protobuff.NextAvailableTick{NextTickNumber: nextAvailableTick}) - if err != nil { - return nil, status.Errorf(codes.Internal, "creating custom status") - } - - return nil, st.Err() - } - - if req.TickNumber == lastProcessedTick.TickNumber { - tickData, err := s.store.GetQuorumTickData(ctx, req.TickNumber) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "quorum tick data not found") - } - return nil, status.Errorf(codes.Internal, "getting quorum tick data: %v", err) - } - - res := protobuff.GetQuorumTickDataResponse{ - QuorumTickData: &protobuff.QuorumTickData{ - QuorumTickStructure: tickData.QuorumTickStructure, - QuorumDiffPerComputor: make(map[uint32]*protobuff.QuorumDiff), - }, - } - - for id, diff := range tickData.QuorumDiffPerComputor { - res.QuorumTickData.QuorumDiffPerComputor[id] = &protobuff.QuorumDiff{ - ExpectedNextTickTxDigestHex: diff.ExpectedNextTickTxDigestHex, - SignatureHex: diff.SignatureHex, - } - } - - return &res, nil - } - - nextTick := req.TickNumber + 1 - nextTickQuorumData, err := s.store.GetQuorumTickData(ctx, nextTick) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.Internal, "quorum data for next tick was not found") - } - return nil, status.Errorf(codes.Internal, "getting tick data: %v", err) - } - - currentTickQuorumData, err := s.store.GetQuorumTickData(ctx, req.TickNumber) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.Internal, "quorum data for tick was not found") - } - return nil, status.Errorf(codes.Internal, "getting tick data: %v", err) - } - - computors, err := s.store.GetComputors(ctx, currentTickQuorumData.QuorumTickStructure.Epoch) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting computor list") - } - - reconstructedQuorumData, err := quorum.ReconstructQuorumData(currentTickQuorumData, nextTickQuorumData, computors) - if err != nil { - return nil, status.Errorf(codes.Internal, "reconstructing quorum data: %v", err) - } - - return &protobuff.GetQuorumTickDataResponse{ - QuorumTickData: reconstructedQuorumData, - }, nil -} func (s *Server) GetComputors(ctx context.Context, req *protobuff.GetComputorsRequest) (*protobuff.GetComputorsResponse, error) { - computors, err := s.store.GetComputors(ctx, req.Epoch) + computors, err := s.store.GetCurrentEpochStore().GetComputorList() if err != nil { if errors.Is(err, store.ErrNotFound) { return nil, status.Errorf(codes.NotFound, "computors not found") @@ -258,42 +126,27 @@ func (s *Server) GetComputors(ctx context.Context, req *protobuff.GetComputorsRe } func (s *Server) GetStatus(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetStatusResponse, error) { - lastProcessedTick, err := s.store.GetLastProcessedTick(ctx) + lastProcessedTick, err := s.store.GetLastProcessedTick() if err != nil { return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) } - lastProcessedTicksPerEpoch, err := s.store.GetLastProcessedTicksPerEpoch(ctx) + lastProcessedTicksPerEpoch, err := s.store.GetLastProcessedTicksPerEpoch() if err != nil { return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) } - skippedTicks, err := s.store.GetSkippedTicksInterval(ctx) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return &protobuff.GetStatusResponse{LastProcessedTick: lastProcessedTick, LastProcessedTicksPerEpoch: lastProcessedTicksPerEpoch}, nil - } - - return nil, status.Errorf(codes.Internal, "getting skipped ticks: %v", err) - } - - ptie, err := s.store.GetProcessedTickIntervals(ctx) + ptie, err := s.store.GetProcessedTickIntervals() if err != nil { return nil, status.Errorf(codes.Internal, "getting processed tick intervals") } - emptyTickForCurrentEpoch, err := s.store.GetEmptyTicksForEpoch(lastProcessedTick.Epoch) - - if err != nil { - return nil, status.Errorf(codes.Internal, "getting empty ticks for current epoch: %v", err) - } - return &protobuff.GetStatusResponse{ LastProcessedTick: lastProcessedTick, LastProcessedTicksPerEpoch: lastProcessedTicksPerEpoch, - SkippedTicks: skippedTicks.SkippedTicks, + SkippedTicks: nil, ProcessedTickIntervalsPerEpoch: ptie, - EmptyTicksPerEpoch: map[uint32]uint32{lastProcessedTick.Epoch: emptyTickForCurrentEpoch}, + EmptyTicksPerEpoch: nil, }, nil } @@ -336,7 +189,7 @@ func fetchChainTick(ctx context.Context, url string) (int, error) { func (s *Server) GetHealthCheck(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetHealthCheckResponse, error) { //Get last processed tick - lastProcessedTick, err := s.store.GetLastProcessedTick(ctx) + lastProcessedTick, err := s.store.GetLastProcessedTick() if err != nil { return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) } @@ -368,158 +221,15 @@ func (s *Server) GetLatestTick(ctx context.Context, _ *emptypb.Empty) (*protobuf return &protobuff.GetLatestTickResponse{LatestTick: uint32(chainTick)}, nil } -func (s *Server) GetTransferTransactionsPerTick(ctx context.Context, req *protobuff.GetTransferTransactionsPerTickRequest) (*protobuff.GetTransferTransactionsPerTickResponse, error) { - txs, err := s.store.GetTransactionsForEntity(ctx, req.Identity, uint64(req.GetStartTick()), uint64(req.GetEndTick())) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting transfer transactions: %v", err) - } - - return &protobuff.GetTransferTransactionsPerTickResponse{TransferTransactionsPerTick: txs}, nil -} - -func (s *Server) GetTickApprovedTransactions(ctx context.Context, req *protobuff.GetTickApprovedTransactionsRequest) (*protobuff.GetTickApprovedTransactionsResponse, error) { - - tts, err := s.store.GetTickTransactionsStatus(ctx, uint64(req.TickNumber)) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "tick transactions status data not found for tick %d", req.TickNumber) - } - return nil, status.Errorf(codes.Internal, "getting tick transactions status: %v", err) - } - - approvedTxs := make([]*protobuff.Transaction, 0, len(tts.Transactions)) - for _, txStatus := range tts.Transactions { - if txStatus.MoneyFlew == false { - continue - } - - tx, err := s.store.GetTransaction(ctx, txStatus.TxId) - if err != nil { - return nil, errors.Wrapf(err, "getting tx %s from archiver", txStatus.TxId) - } - - if tx.InputType == 1 && tx.InputSize == 1000 && tx.DestId == "EAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAVWRF" { - moneyFlew, err := recomputeSendManyMoneyFlew(tx) - if err != nil { - return nil, status.Errorf(codes.Internal, "recomputeSendManyMoneyFlew: %v", err) - } - - if moneyFlew == false { - continue - } - } - - approvedTxs = append(approvedTxs, &protobuff.Transaction{ - SourceId: tx.SourceId, - DestId: tx.DestId, - Amount: tx.Amount, - TickNumber: tx.TickNumber, - InputType: tx.InputType, - InputSize: tx.InputSize, - InputHex: tx.InputHex, - SignatureHex: tx.SignatureHex, - TxId: tx.TxId, - }) - } - - return &protobuff.GetTickApprovedTransactionsResponse{ApprovedTransactions: approvedTxs}, nil -} - -func (s *Server) GetTransactionStatus(ctx context.Context, req *protobuff.GetTransactionStatusRequest) (*protobuff.GetTransactionStatusResponse, error) { - id := types.Identity(req.TxId) - pubKey, err := id.ToPubKey(true) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "invalid tx id format: %v", err) - } - - var pubkeyFixed [32]byte - copy(pubkeyFixed[:], pubKey[:32]) - id, err = id.FromPubKey(pubkeyFixed, true) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "invalid tx id format: %v", err) - } - - if id.String() != req.TxId { - return nil, status.Errorf(codes.InvalidArgument, "invalid tx id format") - } - - tx, err := s.store.GetTransaction(ctx, req.TxId) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "tx status for specified tx id not found") - } - return nil, status.Errorf(codes.Internal, "getting tx status: %v", err) - } - - lastProcessedTick, err := s.store.GetLastProcessedTick(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) - } - - if tx.TickNumber > lastProcessedTick.TickNumber { - return nil, status.Errorf(codes.NotFound, "tx status for specified tx id not found") - } - - if tx.Amount <= 0 { - return nil, status.Errorf(codes.NotFound, "tx status for specified tx id not found") - } - - txStatus, err := s.store.GetTransactionStatus(ctx, req.TxId) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return &protobuff.GetTransactionStatusResponse{TransactionStatus: &protobuff.TransactionStatus{TxId: tx.TxId, MoneyFlew: false}}, nil - } - return nil, status.Errorf(codes.Internal, "getting tx status: %v", err) - } - - if txStatus.MoneyFlew == false { - return &protobuff.GetTransactionStatusResponse{TransactionStatus: &protobuff.TransactionStatus{TxId: tx.TxId, MoneyFlew: false}}, nil - } - - if tx.InputType == 1 && tx.InputSize == 1000 && tx.DestId == "EAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAVWRF" { - moneyFlew, err := recomputeSendManyMoneyFlew(tx) - if err != nil { - return nil, status.Errorf(codes.Internal, "recomputeSendManyMoneyFlew: %v", err) - } - - return &protobuff.GetTransactionStatusResponse{TransactionStatus: &protobuff.TransactionStatus{TxId: tx.TxId, MoneyFlew: moneyFlew}}, nil - } - - return &protobuff.GetTransactionStatusResponse{TransactionStatus: txStatus}, nil -} - -func (s *Server) GetChainHash(ctx context.Context, req *protobuff.GetChainHashRequest) (*protobuff.GetChainHashResponse, error) { - hash, err := s.store.GetChainDigest(ctx, req.TickNumber) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "chain hash for specified tick not found") - } - return nil, status.Errorf(codes.Internal, "getting chain hash: %v", err) - } - - return &protobuff.GetChainHashResponse{HexDigest: hex.EncodeToString(hash[:])}, nil -} - -func (s *Server) GetStoreHash(ctx context.Context, req *protobuff.GetChainHashRequest) (*protobuff.GetChainHashResponse, error) { - hash, err := s.store.GetStoreDigest(ctx, req.TickNumber) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "store hash for specified tick not found") - } - return nil, status.Errorf(codes.Internal, "getting store hash: %v", err) - } - - return &protobuff.GetChainHashResponse{HexDigest: hex.EncodeToString(hash[:])}, nil -} - func (s *Server) Start() error { - tickInBoundsInterception := NewTickWithinBoundsInterceptor(s.store) + //TODO + //tickInBoundsInterception := NewTickWithinBoundsInterceptor(s.store) srv := grpc.NewServer( grpc.MaxRecvMsgSize(600*1024*1024), grpc.MaxSendMsgSize(600*1024*1024), - grpc.UnaryInterceptor(tickInBoundsInterception.GetInterceptor), + //grpc.UnaryInterceptor(tickInBoundsInterception.GetInterceptor), ) protobuff.RegisterArchiveServiceServer(srv, s) reflection.Register(srv) diff --git a/rpc/v2_endpoints.go b/rpc/v2_endpoints.go index c9766b3..d0b61ca 100644 --- a/rpc/v2_endpoints.go +++ b/rpc/v2_endpoints.go @@ -1,24 +1,6 @@ package rpc -import ( - "cmp" - "context" - "encoding/hex" - "github.com/cockroachdb/pebble" - - "log" - - "slices" - - "github.com/pkg/errors" - "github.com/qubic/go-archiver/protobuff" - "github.com/qubic/go-archiver/store" - "github.com/qubic/go-node-connector/types" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -func (s *Server) GetTickQuorumDataV2(ctx context.Context, req *protobuff.GetTickRequestV2) (*protobuff.GetQuorumTickDataResponse, error) { +/*func (s *Server) GetTickQuorumDataV2(ctx context.Context, req *protobuff.GetTickRequestV2) (*protobuff.GetQuorumTickDataResponse, error) { return s.GetQuorumTickData(ctx, &protobuff.GetQuorumTickDataRequest{ TickNumber: req.TickNumber, }) @@ -505,4 +487,4 @@ func (s *Server) GetEpochTickListV2(ctx context.Context, req *protobuff.GetEpoch Ticks: selectedTicks, }, nil -} +}*/ diff --git a/rpc/v2_endpoints_test.go b/rpc/v2_endpoints_test.go index a741990..4aab7b2 100644 --- a/rpc/v2_endpoints_test.go +++ b/rpc/v2_endpoints_test.go @@ -6,7 +6,7 @@ import ( "testing" ) -func Test_V2Endpoints_createPaginationInfo(t *testing.T) { +/*func Test_V2Endpoints_createPaginationInfo(t *testing.T) { pagination, err := getPaginationInformation(0, 1, 100) assert.NoError(t, err) @@ -43,9 +43,9 @@ func Test_V2Endpoints_createPaginationInfo(t *testing.T) { verify(t, pagination, 12345, 999, 100, 124, 124, -1) -} +}*/ -func Test_V2Endpoints_createPaginationInfo_givenInvalidArguments_thenError(t *testing.T) { +/*func Test_V2Endpoints_createPaginationInfo_givenInvalidArguments_thenError(t *testing.T) { _, err := getPaginationInformation(12345, 0, 100) assert.Error(t, err) @@ -54,7 +54,7 @@ func Test_V2Endpoints_createPaginationInfo_givenInvalidArguments_thenError(t *te _, err = getPaginationInformation(12345, 1, 0) assert.Error(t, err) -} +}*/ func verify(t *testing.T, pagination *protobuff.Pagination, totalRecords, pageNumber, pageSize, totalPages, previousPage, nextPage int) { assert.NotNil(t, pagination) diff --git a/store/epoch_store.go b/store/epoch_store.go new file mode 100644 index 0000000..24d93ea --- /dev/null +++ b/store/epoch_store.go @@ -0,0 +1,309 @@ +package store + +import ( + "encoding/binary" + "github.com/cockroachdb/pebble" + "github.com/pkg/errors" + "github.com/qubic/go-archiver/protobuff" + "google.golang.org/protobuf/proto" + "path/filepath" + "runtime" + "strconv" +) + +//var ErrNotFound = errors.New("store resource not found") + +type EpochStore struct { + pebbleDb *pebble.DB + epoch uint32 +} + +func NewEpochStore(storageDirPath string, epoch uint32) (*EpochStore, error) { + + dbPath := storageDirPath + string(filepath.Separator) + strconv.Itoa(int(epoch)) + + dbOptions := getDBOptions() + db, err := pebble.Open(dbPath, &dbOptions) + if err != nil { + return nil, errors.Wrapf(err, "opening db for epoch %d", epoch) + } + + return &EpochStore{ + pebbleDb: db, + epoch: epoch, + }, nil +} + +func (es *EpochStore) SetComputorList(computorList *protobuff.Computors) error { + + key := []byte{computorListKey} + serialized, err := proto.Marshal(computorList) + if err != nil { + return errors.Wrap(err, "marshalling computor list object") + } + err = es.pebbleDb.Set(key, serialized, pebble.Sync) + if err != nil { + return errors.Wrap(err, "storing computor list") + } + return nil +} + +func (es *EpochStore) GetComputorList() (*protobuff.Computors, error) { + + key := []byte{computorListKey} + value, closer, err := es.pebbleDb.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "getting computor list") + } + defer closer.Close() + + var computorList protobuff.Computors + if err := proto.Unmarshal(value, &computorList); err != nil { + return nil, errors.Wrap(err, "unmarshalling computor list object") + } + return &computorList, err + +} + +func (es *EpochStore) SetTickData(tickNumber uint32, tickData *protobuff.TickData) error { + + key := assembleKey(tickDataKey, tickNumber) + serialized, err := proto.Marshal(tickData) + if err != nil { + return errors.Wrap(err, "marshalling tick data object") + } + err = es.pebbleDb.Set(key, serialized, pebble.Sync) + if err != nil { + return errors.Wrap(err, "storing tick data") + } + return nil +} + +func (es *EpochStore) GetTickData(tickNumber uint32) (*protobuff.TickData, error) { + + key := assembleKey(tickDataKey, tickNumber) + value, closer, err := es.pebbleDb.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "getting tick data") + } + defer closer.Close() + + var tickData protobuff.TickData + if err := proto.Unmarshal(value, &tickData); err != nil { + return nil, errors.Wrap(err, "unmarshalling tick data object") + } + return &tickData, err +} + +func (es *EpochStore) SetTransactions(transactions []*protobuff.Transaction) error { + + batch := es.pebbleDb.NewBatchWithSize(len(transactions)) + defer batch.Close() + + for _, tx := range transactions { + key := assembleKey(transactionKey, tx.TxId) + serialized, err := proto.Marshal(tx) + if err != nil { + return errors.Wrap(err, "marshalling transaction object") + } + err = batch.Set(key, serialized, nil) + if err != nil { + return errors.Wrap(err, "adding transaction to batch") + } + } + if err := batch.Commit(pebble.Sync); err != nil { + return errors.Wrap(err, "committing transaction batch") + } + return nil +} + +func (es *EpochStore) GetTransaction(txId string) (*protobuff.Transaction, error) { + + key := assembleKey(transactionKey, txId) + value, closer, err := es.pebbleDb.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "getting transaction") + } + defer closer.Close() + + var transaction protobuff.Transaction + if err := proto.Unmarshal(value, &transaction); err != nil { + return nil, errors.Wrap(err, "unmarshalling transaction") + } + + return &transaction, nil +} + +func (es *EpochStore) SetLastProcessedTick(tickNumber uint32) error { + + key := []byte{lastProcessedTickKey} + + value := make([]byte, 4) + binary.LittleEndian.PutUint32(value, tickNumber) + + err := es.pebbleDb.Set(key, value, pebble.Sync) + if err != nil { + return errors.Wrap(err, "storing last processed tick") + } + + processedTickIntervals, err := es.GetProcessedTickIntervals() + if err != nil { + return errors.Wrap(err, "getting processed tick intervals") + } + + if len(processedTickIntervals.Intervals) == 0 { + processedTickIntervals = &protobuff.ProcessedTickIntervalsPerEpoch{Epoch: es.epoch, Intervals: []*protobuff.ProcessedTickInterval{{InitialProcessedTick: tickNumber, LastProcessedTick: tickNumber}}} + } else { + processedTickIntervals.Intervals[len(processedTickIntervals.Intervals)-1].LastProcessedTick = tickNumber + } + + err = es.SetProcessedTickIntervals(processedTickIntervals) + if err != nil { + return errors.Wrap(err, "updating tick intervals") + } + + return nil +} + +func (es *EpochStore) GetLastProcessedTick() (uint32, error) { + key := []byte{lastProcessedTickKey} + + value, closer, err := es.pebbleDb.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return 0, ErrNotFound + } + + return 0, errors.Wrap(err, "getting last processed tick") + } + defer closer.Close() + + return binary.LittleEndian.Uint32(value), nil +} + +func (es *EpochStore) SetProcessedTickIntervals(processedTickIntervals *protobuff.ProcessedTickIntervalsPerEpoch) error { + + key := []byte{processedTickIntervalsKey} + serialized, err := proto.Marshal(processedTickIntervals) + if err != nil { + return errors.Wrap(err, "marshalling processed tick intervals object") + } + err = es.pebbleDb.Set(key, serialized, pebble.Sync) + if err != nil { + return errors.Wrap(err, "storing processed tick intervals") + } + return nil +} + +func (es *EpochStore) GetProcessedTickIntervals() (*protobuff.ProcessedTickIntervalsPerEpoch, error) { + key := []byte{processedTickIntervalsKey} + value, closer, err := es.pebbleDb.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return &protobuff.ProcessedTickIntervalsPerEpoch{Intervals: make([]*protobuff.ProcessedTickInterval, 0), Epoch: es.epoch}, nil + } + return nil, errors.Wrap(err, "getting processed tick intervals") + } + defer closer.Close() + + var processedTickIntervals protobuff.ProcessedTickIntervalsPerEpoch + if err := proto.Unmarshal(value, &processedTickIntervals); err != nil { + return nil, errors.Wrap(err, "unmarshalling processed tick intervals") + } + + return &processedTickIntervals, nil +} + +func (es *EpochStore) AppendProcessedTickInterval(tickInterval *protobuff.ProcessedTickInterval) error { + + existing, err := es.GetProcessedTickIntervals() + if err != nil { + return errors.Wrap(err, "getting existing processed tick intervals") + } + existing.Intervals = append(existing.Intervals, tickInterval) + err = es.SetProcessedTickIntervals(existing) + if err != nil { + return errors.Wrap(err, "storing updated processed tick intervals") + } + return nil +} + +const ( + computorListKey = 0xa0 + tickDataKey = 0xa1 + transactionKey + + lastProcessedTickKey = 0xb0 + processedTickIntervalsKey = 0xb1 +) + +type iDType interface { + uint32 | uint64 | string +} + +func assembleKey[T iDType](keyPrefix int, id T) []byte { + + prefix := byte(keyPrefix) + + key := []byte{prefix} + + switch any(id).(type) { + + case uint32: + asserted := any(id).(uint32) + key = binary.BigEndian.AppendUint64(key, uint64(asserted)) + break + + case uint64: + asserted := any(id).(uint64) + key = binary.BigEndian.AppendUint64(key, asserted) + break + + case string: + asserted := any(id).(string) + key = append(key, []byte(asserted)...) + } + return key +} + +func getDBOptions() pebble.Options { + l1Options := pebble.LevelOptions{ + BlockRestartInterval: 16, + BlockSize: 4096, + BlockSizeThreshold: 90, + Compression: pebble.NoCompression, + FilterPolicy: nil, + FilterType: pebble.TableFilter, + IndexBlockSize: 4096, + TargetFileSize: 268435456, // 256 MB + } + l2Options := pebble.LevelOptions{ + BlockRestartInterval: 16, + BlockSize: 4096, + BlockSizeThreshold: 90, + Compression: pebble.NoCompression, + FilterPolicy: nil, + FilterType: pebble.TableFilter, + IndexBlockSize: 4096, + TargetFileSize: l1Options.TargetFileSize * 10, // 2.5 GB + } + + return pebble.Options{ + Levels: []pebble.LevelOptions{l1Options, l2Options}, + MaxConcurrentCompactions: func() int { return runtime.NumCPU() }, + MemTableSize: 268435456, // 256 MB + EventListener: NewPebbleEventListener(), + } +} diff --git a/store/keys.go b/store/keys.go.old similarity index 100% rename from store/keys.go rename to store/keys.go.old diff --git a/store/store.go b/store/store.go index 8233a21..e062a23 100644 --- a/store/store.go +++ b/store/store.go @@ -1,14 +1,17 @@ package store import ( - "context" + "cmp" "encoding/binary" "github.com/cockroachdb/pebble" "github.com/pkg/errors" "github.com/qubic/go-archiver/protobuff" "go.uber.org/zap" "google.golang.org/protobuf/proto" - "strconv" + "log" + "path/filepath" + "slices" + "sync" ) const maxTickNumber = ^uint64(0) @@ -16,908 +19,361 @@ const maxTickNumber = ^uint64(0) var ErrNotFound = errors.New("store resource not found") type PebbleStore struct { - db *pebble.DB - logger *zap.Logger -} + maxEpochCount int -func NewPebbleStore(db *pebble.DB, logger *zap.Logger) *PebbleStore { - return &PebbleStore{db: db, logger: logger} -} + storeInfo *stores -func (s *PebbleStore) GetTickData(ctx context.Context, tickNumber uint32) (*protobuff.TickData, error) { - key := tickDataKey(tickNumber) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting tick data") - } - defer closer.Close() - - var td protobuff.TickData - if err := proto.Unmarshal(value, &td); err != nil { - return nil, errors.Wrap(err, "unmarshalling tick data to protobuff type") - } + storagePath string - return &td, err + infoDb *pebble.DB + logger *zap.Logger } -func (s *PebbleStore) SetTickData(ctx context.Context, tickNumber uint32, td *protobuff.TickData) error { - key := tickDataKey(tickNumber) - serialized, err := proto.Marshal(td) - if err != nil { - return errors.Wrap(err, "serializing td proto") - } +func NewPebbleStore(storagePath string, logger *zap.Logger, epochCount int) (*PebbleStore, error) { - err = s.db.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting tick data") - } + infoDbPath := storagePath + string(filepath.Separator) + "info" - return nil -} - -func (s *PebbleStore) SetQuorumTickData(ctx context.Context, tickNumber uint32, qtd *protobuff.QuorumTickDataStored) error { - key := quorumTickDataKey(tickNumber) - serialized, err := proto.Marshal(qtd) + infoDb, err := pebble.Open(infoDbPath, &pebble.Options{}) // Use default options if err != nil { - return errors.Wrap(err, "serializing qtdV2 proto") + return nil, errors.Wrap(err, "opening info db") } - err = s.db.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting quorum tick data") + ps := PebbleStore{ + maxEpochCount: epochCount, + storeInfo: &stores{}, + storagePath: storagePath, + infoDb: infoDb, + logger: logger, } - return nil -} - -func (s *PebbleStore) GetQuorumTickData(ctx context.Context, tickNumber uint32) (*protobuff.QuorumTickDataStored, error) { - key := quorumTickDataKey(tickNumber) - value, closer, err := s.db.Get(key) + lastProcessedTick, err := ps.GetLastProcessedTick() if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + if !errors.Is(err, ErrNotFound) { + return nil, errors.Wrap(err, "failed to get last processed tick") } - - return nil, errors.Wrap(err, "getting quorum tick data") } - defer closer.Close() - var qtd protobuff.QuorumTickDataStored - if err := proto.Unmarshal(value, &qtd); err != nil { - return nil, errors.Wrap(err, "unmarshalling qtdV2 to protobuf type") - } - - return &qtd, err -} - -func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint32) (*protobuff.Computors, error) { - key := computorsKey(epoch) - - value, closer, err := s.db.Get(key) + relevantEpochs, err := ps.GetRelevantEpochList() if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + if !errors.Is(err, ErrNotFound) { + return nil, errors.Wrap(err, "failed to get relevant epoch list") } - - return nil, errors.Wrap(err, "getting quorum tick data") - } - defer closer.Close() - - var computors protobuff.Computors - if err := proto.Unmarshal(value, &computors); err != nil { - return nil, errors.Wrap(err, "unmarshalling computors to protobuff type") - } - - return &computors, nil -} - -func (s *PebbleStore) SetComputors(ctx context.Context, epoch uint32, computors *protobuff.Computors) error { - key := computorsKey(epoch) - - serialized, err := proto.Marshal(computors) - if err != nil { - return errors.Wrap(err, "serializing computors proto") } - err = s.db.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting computors") - } - - return nil -} - -func (s *PebbleStore) SetTransactions(ctx context.Context, txs []*protobuff.Transaction) error { - batch := s.db.NewBatchWithSize(len(txs)) - defer batch.Close() + loadedEpochs := make(map[uint32]*EpochStore) - for _, tx := range txs { - key, err := tickTxKey(tx.TxId) - if err != nil { - return errors.Wrapf(err, "creating tx key for id: %s", tx.TxId) + for _, epoch := range relevantEpochs { + if epoch == 0 { + continue } - - serialized, err := proto.Marshal(tx) + epochStore, err := NewEpochStore(storagePath, epoch) if err != nil { - return errors.Wrap(err, "serializing tx proto") + return nil, errors.Wrapf(err, "creating store for epoch %d", epoch) } - err = batch.Set(key, serialized, nil) - if err != nil { - return errors.Wrap(err, "getting tick data") + if lastProcessedTick != nil && epoch == lastProcessedTick.Epoch { + ps.storeInfo.setCurrentEpochStore(epochStore) } + loadedEpochs[epoch] = epochStore } + ps.storeInfo.setLoadedEpochs(loadedEpochs) - if err := batch.Commit(pebble.Sync); err != nil { - return errors.Wrap(err, "committing batch") - } - - return nil + return &ps, err } -func (s *PebbleStore) GetTickTransactions(ctx context.Context, tickNumber uint32) ([]*protobuff.Transaction, error) { - td, err := s.GetTickData(ctx, tickNumber) - if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting tick data") - } - - txs := make([]*protobuff.Transaction, 0, len(td.TransactionIds)) - for _, txID := range td.TransactionIds { - tx, err := s.GetTransaction(ctx, txID) - if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrapf(err, "getting tx for id: %s", txID) - } - - txs = append(txs, tx) - } +func (s *PebbleStore) SetLastProcessedTick(processedTick *protobuff.ProcessedTick) error { - return txs, nil -} - -func (s *PebbleStore) GetTickTransferTransactions(ctx context.Context, tickNumber uint32) ([]*protobuff.Transaction, error) { - td, err := s.GetTickData(ctx, tickNumber) + key := []byte{lastProcessedTickKey} + value, err := proto.Marshal(processedTick) if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting tick data") - } - - txs := make([]*protobuff.Transaction, 0, len(td.TransactionIds)) - for _, txID := range td.TransactionIds { - tx, err := s.GetTransaction(ctx, txID) - if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrapf(err, "getting tx for id: %s", txID) - } - if tx.Amount <= 0 { - continue - } - - txs = append(txs, tx) + return errors.Wrap(err, "marshalling last processed tick") } - - return txs, nil -} - -func (s *PebbleStore) GetTransaction(ctx context.Context, txID string) (*protobuff.Transaction, error) { - key, err := tickTxKey(txID) + err = s.infoDb.Set(key, value, pebble.Sync) if err != nil { - return nil, errors.Wrap(err, "getting tx key") + return errors.Wrap(err, "storing last processed tick") } - value, closer, err := s.db.Get(key) + currentEpochStore := s.storeInfo.getCurrentEpochStore() + err = currentEpochStore.SetLastProcessedTick(processedTick.TickNumber) if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting tx") - } - defer closer.Close() - - var tx protobuff.Transaction - if err := proto.Unmarshal(value, &tx); err != nil { - return nil, errors.Wrap(err, "unmarshalling tx to protobuff type") + return errors.Wrap(err, "storing last processed tick to epoch storage") } - return &tx, nil + return nil } -func (s *PebbleStore) SetLastProcessedTick(ctx context.Context, lastProcessedTick *protobuff.ProcessedTick) error { - batch := s.db.NewBatch() - defer batch.Close() - - key := lastProcessedTickKeyPerEpoch(lastProcessedTick.Epoch) - value := make([]byte, 4) - binary.LittleEndian.PutUint32(value, lastProcessedTick.TickNumber) - - err := batch.Set(key, value, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting last processed tick") - } - - key = lastProcessedTickKey() - serialized, err := proto.Marshal(lastProcessedTick) - if err != nil { - return errors.Wrap(err, "serializing skipped tick proto") - } - - err = batch.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting last processed tick") - } - - err = batch.Commit(pebble.Sync) - if err != nil { - return errors.Wrap(err, "committing batch") - } - - ptie, err := s.getProcessedTickIntervalsPerEpoch(ctx, lastProcessedTick.Epoch) - if err != nil { - return errors.Wrap(err, "getting ptie") - } - - if len(ptie.Intervals) == 0 { - ptie = &protobuff.ProcessedTickIntervalsPerEpoch{Epoch: lastProcessedTick.Epoch, Intervals: []*protobuff.ProcessedTickInterval{{InitialProcessedTick: lastProcessedTick.TickNumber, LastProcessedTick: lastProcessedTick.TickNumber}}} - } else { - ptie.Intervals[len(ptie.Intervals)-1].LastProcessedTick = lastProcessedTick.TickNumber - } - - err = s.SetProcessedTickIntervalPerEpoch(ctx, lastProcessedTick.Epoch, ptie) - if err != nil { - return errors.Wrap(err, "setting ptie") - } +func (s *PebbleStore) GetLastProcessedTick() (*protobuff.ProcessedTick, error) { - return nil -} + key := []byte{lastProcessedTickKey} -func (s *PebbleStore) GetLastProcessedTick(ctx context.Context) (*protobuff.ProcessedTick, error) { - key := lastProcessedTickKey() - value, closer, err := s.db.Get(key) + value, closer, err := s.infoDb.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + return &protobuff.ProcessedTick{ + TickNumber: 0, + Epoch: 0, + }, ErrNotFound } return nil, errors.Wrap(err, "getting last processed tick") } defer closer.Close() - var lpt protobuff.ProcessedTick - if err := proto.Unmarshal(value, &lpt); err != nil { - return nil, errors.Wrap(err, "unmarshalling lpt to protobuff type") - } - - return &lpt, nil -} - -func (s *PebbleStore) GetLastProcessedTicksPerEpoch(ctx context.Context) (map[uint32]uint32, error) { - upperBound := append([]byte{LastProcessedTickPerEpoch}, []byte(strconv.FormatUint(maxTickNumber, 10))...) - iter, err := s.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{LastProcessedTickPerEpoch}, - UpperBound: upperBound, - }) + var lastProcessedTick protobuff.ProcessedTick + err = proto.Unmarshal(value, &lastProcessedTick) if err != nil { - return nil, errors.Wrap(err, "creating iter") + return nil, errors.Wrap(err, "unmarshalling last processed tick") } - defer iter.Close() - ticksPerEpoch := make(map[uint32]uint32) - for iter.First(); iter.Valid(); iter.Next() { - key := iter.Key() - - value, err := iter.ValueAndErr() - if err != nil { - return nil, errors.Wrap(err, "getting value from iter") - } - - epochNumber := binary.BigEndian.Uint32(key[1:]) - tickNumber := binary.LittleEndian.Uint32(value) - ticksPerEpoch[epochNumber] = tickNumber - } - - return ticksPerEpoch, nil + return &lastProcessedTick, nil } -func (s *PebbleStore) SetSkippedTicksInterval(ctx context.Context, skippedTick *protobuff.SkippedTicksInterval) error { - newList := protobuff.SkippedTicksIntervalList{} - current, err := s.GetSkippedTicksInterval(ctx) - if err != nil { - if !errors.Is(err, ErrNotFound) { - return errors.Wrap(err, "getting skipped tick interval") - } - } else { - newList.SkippedTicks = current.SkippedTicks - } +func (s *PebbleStore) SetRelevantEpochList(epochs []uint32) error { - newList.SkippedTicks = append(newList.SkippedTicks, skippedTick) + key := []byte{relevantEpochsKey} - key := skippedTicksIntervalKey() - serialized, err := proto.Marshal(&newList) - if err != nil { - return errors.Wrap(err, "serializing skipped tick proto") + value := make([]byte, len(epochs)*4) + for index, epoch := range epochs { + binary.LittleEndian.PutUint32(value[index*4:index*4+4], epoch) } - err = s.db.Set(key, serialized, pebble.Sync) + err := s.infoDb.Set(key, value, pebble.Sync) if err != nil { - return errors.Wrap(err, "setting skipped tick interval") + return errors.Wrap(err, "storing relevant epoch list") } return nil } -func (s *PebbleStore) GetSkippedTicksInterval(ctx context.Context) (*protobuff.SkippedTicksIntervalList, error) { - key := skippedTicksIntervalKey() - value, closer, err := s.db.Get(key) +func (s *PebbleStore) GetRelevantEpochList() ([]uint32, error) { + + key := []byte{relevantEpochsKey} + + value, closer, err := s.infoDb.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + return []uint32{}, nil } - return nil, errors.Wrap(err, "getting skipped tick interval") + return nil, errors.Wrap(err, "getting relevant epoch list") } defer closer.Close() - var stil protobuff.SkippedTicksIntervalList - if err := proto.Unmarshal(value, &stil); err != nil { - return nil, errors.Wrap(err, "unmarshalling skipped tick interval to protobuff type") - } - - return &stil, nil -} - -func (s *PebbleStore) PutTransferTransactionsPerTick(ctx context.Context, identity string, tickNumber uint32, txs *protobuff.TransferTransactionsPerTick) error { - key := identityTransferTransactionsPerTickKey(identity, tickNumber) + epochs := make([]uint32, len(value)/4) - serialized, err := proto.Marshal(txs) - if err != nil { - return errors.Wrap(err, "serializing tx proto") + for index := 0; index < len(value)/4; index++ { + epoch := binary.LittleEndian.Uint32(value[index*4 : index*4+4]) + epochs = append(epochs, epoch) } - err = s.db.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting transfer tx") - } - - return nil -} - -type Pageable struct { - Page, Size uint32 -} - -type Sortable struct { - Descending bool -} - -type Filterable struct { - ScOnly bool + return epochs, nil } -func (s *PebbleStore) GetTransactionsForEntity(ctx context.Context, identity string, startTick, endTick uint64) ([]*protobuff.TransferTransactionsPerTick, error) { - const limitForRequestWithoutPaging = 1000 // old implementation was unlimited. - transfers, _, err := s.GetTransactionsForEntityPaged(ctx, identity, startTick, endTick, - Pageable{Size: limitForRequestWithoutPaging}, - Sortable{}, - Filterable{}, - ) - return transfers, err +func (s *PebbleStore) GetCurrentEpochStore() *EpochStore { + return s.storeInfo.getCurrentEpochStore() } -func (s *PebbleStore) GetTransactionsForEntityPaged(_ context.Context, identity string, startTick, endTick uint64, page Pageable, sort Sortable, filter Filterable) ([]*protobuff.TransferTransactionsPerTick, int, error) { +func (s *PebbleStore) HandleEpochTransition(epoch uint32) error { - var index, start, end int - start = int(page.Page) * int(page.Size) - end = start + int(page.Size) - - var transferTxs []*protobuff.TransferTransactionsPerTick - transferTxs = make([]*protobuff.TransferTransactionsPerTick, 0, min(page.Size, 1000)) - - partialKey := identityTransferTransactions(identity) - iter, err := s.db.NewIter(&pebble.IterOptions{ - LowerBound: binary.BigEndian.AppendUint64(partialKey, startTick), - UpperBound: binary.BigEndian.AppendUint64(partialKey, endTick+1), - }) + epochStore, err := NewEpochStore(s.storagePath, epoch) if err != nil { - return nil, -1, errors.Wrap(err, "creating iterator") + return errors.Wrap(err, "creating new epoch store") } - defer iter.Close() - if sort.Descending { - for iter.Last(); iter.Valid(); iter.Prev() { - index, transferTxs, err = getTransfersPage(iter, index, transferTxs, start, end, filter) - } - } else { - for iter.First(); iter.Valid(); iter.Next() { // per tick - index, transferTxs, err = getTransfersPage(iter, index, transferTxs, start, end, filter) - } - } - if err != nil { - return nil, -1, errors.Wrap(err, "getting transfers page") - } + s.storeInfo.setCurrentEpochStore(epochStore) - return transferTxs, index, nil -} + loadedEpochs := s.storeInfo.getLoadedEpochs() + loadedEpochs[epoch] = epochStore -func getTransfersPage(iter *pebble.Iterator, index int, transferTxs []*protobuff.TransferTransactionsPerTick, pageStart int, pageEnd int, filter Filterable) (int, []*protobuff.TransferTransactionsPerTick, error) { - value, err := iter.ValueAndErr() - if err != nil { - return -1, nil, errors.Wrap(err, "getting value from iter") - } + epochs := getRelevantEpochListFromMap(loadedEpochs) - var perTick protobuff.TransferTransactionsPerTick - var toBeAdded *protobuff.TransferTransactionsPerTick + log.Printf("Epoch list pre transition: %v\n", epochs) - err = proto.Unmarshal(value, &perTick) - if err != nil { - return -1, nil, errors.Wrap(err, "unmarshalling transfer tx per tick to protobuff type") - } - - transactions := filterTransactions(filter, &perTick) + if len(epochs) > s.maxEpochCount { + slices.Sort(epochs) - count := len(transactions) - if count > 0 && index+count >= pageStart && index < pageEnd { + discardedEpochs := epochs[:len(epochs)-s.maxEpochCount] + for _, e := range discardedEpochs { - startIndex := max(pageStart-index, 0) // if index < pageStart we need to skip first items - endIndex := min(pageEnd-index, count) - - if index+count >= pageStart && endIndex > startIndex { // covers case index >= pageStart and index+count >= pageStart - toBeAdded = &protobuff.TransferTransactionsPerTick{ - TickNumber: perTick.GetTickNumber(), - Identity: perTick.GetIdentity(), - Transactions: transactions[startIndex:endIndex], - } - transferTxs = append(transferTxs, toBeAdded) + //Commented for now. Closing the DB will hang for an unknown period of time. If the program is terminated before this finished, the updated list will not be saved, leading to a broken state + //oldStore := s.loadedEpochs[e] + delete(loadedEpochs, e) + //err := oldStore.pebbleDb.Close() + //return errors.Wrapf(err, "closing old epoch store %d", e) } } - index += count - return index, transferTxs, nil -} + s.storeInfo.setLoadedEpochs(loadedEpochs) -func filterTransactions(filter Filterable, perTick *protobuff.TransferTransactionsPerTick) []*protobuff.Transaction { - var transactions []*protobuff.Transaction - if filter.ScOnly { // filter if necessary - transactions = make([]*protobuff.Transaction, 0) - for _, tx := range perTick.GetTransactions() { - if tx.InputType != 0 { - transactions = append(transactions, tx) - } - } - } else { - transactions = perTick.GetTransactions() - } - return transactions -} + epochs = getRelevantEpochListFromMap(loadedEpochs) -func (s *PebbleStore) PutChainDigest(ctx context.Context, tickNumber uint32, digest []byte) error { - key := chainDigestKey(tickNumber) + log.Printf("Epoch list post transition: %v\n", epochs) - err := s.db.Set(key, digest, pebble.Sync) + err = s.SetRelevantEpochList(epochs) if err != nil { - return errors.Wrap(err, "setting chain digest") + return errors.Wrap(err, "saving relevant epoch list") } return nil } -func (s *PebbleStore) GetChainDigest(ctx context.Context, tickNumber uint32) ([]byte, error) { - key := chainDigestKey(tickNumber) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting chain digest") +func getRelevantEpochListFromMap(epochMap map[uint32]*EpochStore) []uint32 { + var epochs []uint32 + for e, _ := range epochMap { + epochs = append(epochs, e) } - defer closer.Close() - - return value, nil -} - -func (s *PebbleStore) PutStoreDigest(ctx context.Context, tickNumber uint32, digest []byte) error { - key := storeDigestKey(tickNumber) - - err := s.db.Set(key, digest, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting chain digest") - } - - return nil -} - -func (s *PebbleStore) GetStoreDigest(ctx context.Context, tickNumber uint32) ([]byte, error) { - key := storeDigestKey(tickNumber) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting chain digest") - } - defer closer.Close() - return value, nil + return epochs } -func (s *PebbleStore) GetTickTransactionsStatus(ctx context.Context, tickNumber uint64) (*protobuff.TickTransactionsStatus, error) { - key := tickTxStatusKey(tickNumber) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound - } - - return nil, errors.Wrap(err, "getting transactions status") - } - defer closer.Close() - - var tts protobuff.TickTransactionsStatus - if err := proto.Unmarshal(value, &tts); err != nil { - return nil, errors.Wrap(err, "unmarshalling tick transactions status") - } - - return &tts, err -} +func (s *PebbleStore) GetTickData(tickNumber uint32) (*protobuff.TickData, error) { -func (s *PebbleStore) GetTransactionStatus(ctx context.Context, txID string) (*protobuff.TransactionStatus, error) { - key := txStatusKey(txID) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + loadedEpochs := s.storeInfo.getLoadedEpochs() + for _, store := range loadedEpochs { + tickData, err := store.GetTickData(tickNumber) + if err != nil { + if errors.Is(err, ErrNotFound) { + continue + } + return nil, errors.Wrap(err, "finding tick data") } - return nil, errors.Wrap(err, "getting transaction status") - } - defer closer.Close() - - var ts protobuff.TransactionStatus - if err := proto.Unmarshal(value, &ts); err != nil { - return nil, errors.Wrap(err, "unmarshalling transaction status") + return tickData, nil } - return &ts, err + return nil, errors.Errorf("unable to find tick data for tick %d", tickNumber) } -func (s *PebbleStore) SetTickTransactionsStatus(ctx context.Context, tickNumber uint64, tts *protobuff.TickTransactionsStatus) error { - key := tickTxStatusKey(tickNumber) - batch := s.db.NewBatchWithSize(len(tts.Transactions) + 1) - defer batch.Close() - - serialized, err := proto.Marshal(tts) - if err != nil { - return errors.Wrap(err, "serializing tts proto") - } +func (s *PebbleStore) GetTickTransactions(tickNumber uint32) ([]*protobuff.Transaction, error) { - err = batch.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting tts data") - } - - for _, tx := range tts.Transactions { - key := txStatusKey(tx.TxId) - - serialized, err := proto.Marshal(tx) - if err != nil { - return errors.Wrap(err, "serializing tx status proto") - } + var transactions []*protobuff.Transaction - err = batch.Set(key, serialized, nil) + loadedEpochs := s.storeInfo.getLoadedEpochs() + for _, store := range loadedEpochs { + tickData, err := store.GetTickData(tickNumber) if err != nil { - return errors.Wrap(err, "setting tx status data") + if errors.Is(err, ErrNotFound) { + continue + } + return nil, errors.Wrap(err, "finding tick transactions") } - } - err = batch.Commit(pebble.Sync) - if err != nil { - return errors.Wrap(err, "committing batch") - } - - return nil -} - -func (s *PebbleStore) getProcessedTickIntervalsPerEpoch(ctx context.Context, epoch uint32) (*protobuff.ProcessedTickIntervalsPerEpoch, error) { - key := processedTickIntervalsPerEpochKey(epoch) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return &protobuff.ProcessedTickIntervalsPerEpoch{Intervals: make([]*protobuff.ProcessedTickInterval, 0), Epoch: epoch}, nil + for _, txId := range tickData.TransactionIds { + transaction, err := store.GetTransaction(txId) + if err != nil { + return nil, errors.Wrapf(err, "fetching transaction for tick %d", tickNumber) + } + transactions = append(transactions, transaction) } - return nil, errors.Wrap(err, "getting processed tick intervals per epoch from store") - } - defer closer.Close() + return transactions, nil - var ptie protobuff.ProcessedTickIntervalsPerEpoch - if err := proto.Unmarshal(value, &ptie); err != nil { - return nil, errors.Wrap(err, "unmarshalling processed tick intervals per epoch") } - return &ptie, nil + return nil, errors.Errorf("unable to find transactions for tick %d", tickNumber) } -func (s *PebbleStore) SetProcessedTickIntervalPerEpoch(ctx context.Context, epoch uint32, ptie *protobuff.ProcessedTickIntervalsPerEpoch) error { - key := processedTickIntervalsPerEpochKey(epoch) - serialized, err := proto.Marshal(ptie) - if err != nil { - return errors.Wrap(err, "serializing ptie proto") - } +func (s *PebbleStore) GetTransaction(txId string) (*protobuff.Transaction, error) { - err = s.db.Set(key, serialized, pebble.Sync) - if err != nil { - return errors.Wrap(err, "setting ptie") - } - - return nil -} - -func (s *PebbleStore) AppendProcessedTickInterval(ctx context.Context, epoch uint32, pti *protobuff.ProcessedTickInterval) error { - existing, err := s.getProcessedTickIntervalsPerEpoch(ctx, epoch) - if err != nil { - return errors.Wrap(err, "getting existing processed tick intervals") - } - - existing.Intervals = append(existing.Intervals, pti) - - err = s.SetProcessedTickIntervalPerEpoch(ctx, epoch, existing) - if err != nil { - return errors.Wrap(err, "setting ptie") - } - - return nil -} - -func (s *PebbleStore) GetProcessedTickIntervals(ctx context.Context) ([]*protobuff.ProcessedTickIntervalsPerEpoch, error) { - upperBound := append([]byte{ProcessedTickIntervals}, []byte(strconv.FormatUint(maxTickNumber, 10))...) - iter, err := s.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{ProcessedTickIntervals}, - UpperBound: upperBound, - }) - if err != nil { - return nil, errors.Wrap(err, "creating iter") - } - defer iter.Close() - - processedTickIntervals := make([]*protobuff.ProcessedTickIntervalsPerEpoch, 0) - for iter.First(); iter.Valid(); iter.Next() { - value, err := iter.ValueAndErr() + loadedEpochs := s.storeInfo.getLoadedEpochs() + for _, store := range loadedEpochs { + transaction, err := store.GetTransaction(txId) if err != nil { - return nil, errors.Wrap(err, "getting value from iter") + if errors.Is(err, ErrNotFound) { + continue + } + return nil, errors.Wrap(err, "finding transaction") } - var ptie protobuff.ProcessedTickIntervalsPerEpoch - err = proto.Unmarshal(value, &ptie) - if err != nil { - return nil, errors.Wrap(err, "unmarshalling iter ptie") - } - processedTickIntervals = append(processedTickIntervals, &ptie) + return transaction, nil } - return processedTickIntervals, nil + return nil, errors.Errorf("unable to find transaction %s", txId) } -func (s *PebbleStore) SetEmptyTicksForEpoch(epoch uint32, emptyTicksCount uint32) error { - key := emptyTicksPerEpochKey(epoch) - - value := make([]byte, 4) - binary.LittleEndian.PutUint32(value, emptyTicksCount) - - err := s.db.Set(key, value, pebble.Sync) - if err != nil { - return errors.Wrapf(err, "saving emptyTickCount for epoch %d", epoch) - } - return nil -} +func (s *PebbleStore) GetLastProcessedTicksPerEpoch() (map[uint32]uint32, error) { -func (s *PebbleStore) GetEmptyTicksForEpoch(epoch uint32) (uint32, error) { - key := emptyTicksPerEpochKey(epoch) + epochLastProcessedTickMap := make(map[uint32]uint32) - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return 0, err + loadedEpochs := s.storeInfo.getLoadedEpochs() + for _, epochStore := range loadedEpochs { + lastProcessedTick, err := epochStore.GetLastProcessedTick() + if err != nil { + return nil, errors.Wrapf(err, "getting last processed tick of epoch %d", epochStore.epoch) } - - return 0, errors.Wrapf(err, "getting emptyTickCount for epoch %d", epoch) + epochLastProcessedTickMap[epochStore.epoch] = lastProcessedTick } - defer closer.Close() - emptyTicksCount := binary.LittleEndian.Uint32(value) - - return emptyTicksCount, nil + return epochLastProcessedTickMap, nil } -func (s *PebbleStore) GetEmptyTicksForEpochs(firstEpoch, lastEpoch uint32) (map[uint32]uint32, error) { - - iter, err := s.db.NewIter(&pebble.IterOptions{ - LowerBound: emptyTicksPerEpochKey(firstEpoch), - UpperBound: emptyTicksPerEpochKey(lastEpoch + 1), // Increment as upper bound is exclusive - }) - if err != nil { - return nil, errors.Wrap(err, "creating iter") - } - defer iter.Close() +func (s *PebbleStore) GetProcessedTickIntervals() ([]*protobuff.ProcessedTickIntervalsPerEpoch, error) { - emptyTickMap := make(map[uint32]uint32) + var processedTickIntervalsPerEpoch []*protobuff.ProcessedTickIntervalsPerEpoch - for iter.First(); iter.Valid(); iter.Next() { - - value, err := iter.ValueAndErr() + loadedEpochs := s.storeInfo.getLoadedEpochs() + for _, epochStore := range loadedEpochs { + processedTickIntervals, err := epochStore.GetProcessedTickIntervals() if err != nil { - return nil, errors.Wrap(err, "getting value from iter") + return nil, errors.Wrapf(err, "getting processed tick intervals of epoch %d", epochStore.epoch) } - - key := iter.Key() - epochNumber := binary.BigEndian.Uint64(key[1:]) - emptyTicksCount := binary.LittleEndian.Uint32(value) - - emptyTickMap[uint32(epochNumber)] = emptyTicksCount + processedTickIntervalsPerEpoch = append(processedTickIntervalsPerEpoch, processedTickIntervals) } + slices.SortFunc(processedTickIntervalsPerEpoch, func(a, b *protobuff.ProcessedTickIntervalsPerEpoch) int { + return cmp.Compare(a.Epoch, b.Epoch) + }) + return processedTickIntervalsPerEpoch, nil - return emptyTickMap, nil } -func (s *PebbleStore) DeleteEmptyTicksKeyForEpoch(epoch uint32) error { - key := emptyTicksPerEpochKey(epoch) - - err := s.db.Delete(key, pebble.Sync) - if err != nil { - return errors.Wrapf(err, "deleting empty ticks key for epoch %d", epoch) - } - return nil +func (s *PebbleStore) Close() { + s.infoDb.Close() + s.storeInfo.closeStores() } -func (s *PebbleStore) SetLastTickQuorumDataPerEpochIntervals(epoch uint32, lastQuorumDataPerEpochIntervals *protobuff.LastTickQuorumDataPerEpochIntervals) error { - - key := lastTickQuorumDataPerEpochIntervalKey(epoch) - - value, err := proto.Marshal(lastQuorumDataPerEpochIntervals) - if err != nil { - return errors.Wrapf(err, "serializing last quorum data per epoch intervals for epoch %d", epoch) - } - - err = s.db.Set(key, value, pebble.Sync) - if err != nil { - return errors.Wrapf(err, "setting last quorum data per epoch intervals for epoch %d", epoch) - } - return nil +type stores struct { + currentEpochStore *EpochStore + loadedEpochs map[uint32]*EpochStore + mutex sync.RWMutex } -func (s *PebbleStore) GetLastTickQuorumDataListPerEpochInterval(epoch uint32) (*protobuff.LastTickQuorumDataPerEpochIntervals, error) { - key := lastTickQuorumDataPerEpochIntervalKey(epoch) - - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return &protobuff.LastTickQuorumDataPerEpochIntervals{ - QuorumDataPerInterval: make(map[int32]*protobuff.QuorumTickData), - }, nil - } - return nil, errors.Wrapf(err, "getting quorum data list for the intervals of epoch %d", epoch) - } - defer closer.Close() - - var lastQuorumDataPerEpochIntervals protobuff.LastTickQuorumDataPerEpochIntervals - err = proto.Unmarshal(value, &lastQuorumDataPerEpochIntervals) - if err != nil { - return nil, errors.Wrapf(err, "de-serializing last quorum data per epoch intervals for epoch %d", epoch) - } +func (s *stores) getCurrentEpochStore() *EpochStore { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.currentEpochStore - return &lastQuorumDataPerEpochIntervals, err } -func (s *PebbleStore) SetQuorumDataForCurrentEpochInterval(epoch uint32, quorumData *protobuff.QuorumTickData) error { - - processedIntervals, err := s.getProcessedTickIntervalsPerEpoch(nil, epoch) - if err != nil { - return errors.Wrapf(err, "getting processed tick intervals for epoch %d", epoch) - } - - intervalIndex := len(processedIntervals.Intervals) - 1 - if intervalIndex < 0 { - intervalIndex = 0 - } - - quorumDataPerIntervals, err := s.GetLastTickQuorumDataListPerEpochInterval(epoch) - if err != nil { - return errors.Wrap(err, "getting last quorum data list for epoch intervals") - } - - quorumDataPerIntervals.QuorumDataPerInterval[int32(intervalIndex)] = quorumData - - err = s.SetLastTickQuorumDataPerEpochIntervals(epoch, quorumDataPerIntervals) - if err != nil { - return errors.Wrap(err, "setting last quorum data list for epoch intervals") - } - - return nil +func (s *stores) setCurrentEpochStore(store *EpochStore) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.currentEpochStore = store } -func (s *PebbleStore) SetEmptyTickListPerEpoch(epoch uint32, emptyTicks []uint32) error { - key := emptyTickListPerEpochKey(epoch) - - value := make([]byte, len(emptyTicks)*4) - for index, tickNumber := range emptyTicks { - binary.LittleEndian.PutUint32(value[index*4:index*4+4], tickNumber) - } - - err := s.db.Set(key, value, pebble.Sync) - if err != nil { - return errors.Wrapf(err, "saving empty tick list for epoch %d", epoch) - } - return nil +func (s *stores) getLoadedEpochs() map[uint32]*EpochStore { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.loadedEpochs } -func (s *PebbleStore) GetEmptyTickListPerEpoch(epoch uint32) ([]uint32, error) { - key := emptyTickListPerEpochKey(epoch) - - value, closer, err := s.db.Get(key) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return nil, err - } - - return nil, errors.Wrapf(err, "getting empty tick list for epoch %d", epoch) - } - defer closer.Close() - - if len(value)%4 != 0 { - return nil, errors.Errorf("corrupted empty tick list for epoch %d. array length mod 4 != 0. length: %d", epoch, len(value)) - } - - var emptyTicks []uint32 - - for index := 0; index < (len(value) / 4); index++ { - tickNumber := binary.LittleEndian.Uint32(value[index*4 : index*4+4]) - emptyTicks = append(emptyTicks, tickNumber) - } - - return emptyTicks, nil - +func (s *stores) setLoadedEpochs(loadedEpochs map[uint32]*EpochStore) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.loadedEpochs = loadedEpochs } -func (s *PebbleStore) AppendEmptyTickToEmptyTickListPerEpoch(epoch uint32, tickNumber uint32) error { - - emptyTicks, err := s.GetEmptyTickListPerEpoch(epoch) - if err != nil { - return errors.Wrapf(err, "getting empty tick list for epoch %d", epoch) - } - - emptyTicks = append(emptyTicks, tickNumber) +func (s *stores) closeStores() error { + s.mutex.Lock() + defer s.mutex.Unlock() - err = s.SetEmptyTickListPerEpoch(epoch, emptyTicks) - if err != nil { - return errors.Wrapf(err, "saving appended empty tick list") + for _, epochStore := range s.loadedEpochs { + err := epochStore.pebbleDb.Close() + if err != nil { + return errors.Wrapf(err, "closing store for epoch %d", epochStore.epoch) + } } - return nil } -func (s *PebbleStore) DeleteEmptyTickListKeyForEpoch(epoch uint32) error { - key := emptyTickListPerEpochKey(epoch) - - err := s.db.Delete(key, pebble.Sync) - if err != nil { - return errors.Wrapf(err, "deleting empty tick list key for epoch %d", epoch) - } - return nil -} +const ( + relevantEpochsKey = 0xb3 +) diff --git a/store/store_paging_test.go b/store/store_paging_test.go.old similarity index 100% rename from store/store_paging_test.go rename to store/store_paging_test.go.old diff --git a/store/store_test.go b/store/store_test.go.old similarity index 100% rename from store/store_test.go rename to store/store_test.go.old diff --git a/validator/chain/chain.go b/validator/chain/chain.go deleted file mode 100644 index ff3625b..0000000 --- a/validator/chain/chain.go +++ /dev/null @@ -1,128 +0,0 @@ -package chain - -import ( - "context" - "github.com/pkg/errors" - "github.com/qubic/go-archiver/protobuff" - "github.com/qubic/go-archiver/store" - "github.com/qubic/go-node-connector/types" -) - -func ComputeAndSave(ctx context.Context, store *store.PebbleStore, initialEpochTick, tickNumber uint32, quorumVote types.QuorumTickVote) error { - prevDigest, err := getPrevChainDigest(ctx, store, initialEpochTick, tickNumber) - if err != nil { - return errors.Wrap(err, "getting prev chain digest") - } - - currentDigest, err := computeCurrentTickDigest(ctx, quorumVote, prevDigest) - if err != nil { - return errors.Wrap(err, "computing current tick digest") - } - - err = store.PutChainDigest(ctx, tickNumber, currentDigest[:]) - if err != nil { - return errors.Wrapf(err, "storing chain digest for tick: %d\n", tickNumber) - } - - return nil -} - -func ComputeStoreAndSave(ctx context.Context, store *store.PebbleStore, initialEpochTick, tickNumber uint32, validTxs []types.Transaction, tickTxsStatus *protobuff.TickTransactionsStatus) error { - if tickNumber < 13752150 { - return nil - } - - prevDigest, err := getPrevStoreDigest(ctx, store, initialEpochTick, tickNumber) - if err != nil { - return errors.Wrap(err, "getting prev chain digest") - } - - currentDigest, err := computeCurrentTickStoreDigest(ctx, validTxs, tickTxsStatus, prevDigest) - if err != nil { - return errors.Wrap(err, "computing current tick digest") - } - - err = store.PutStoreDigest(ctx, tickNumber, currentDigest[:]) - if err != nil { - return errors.Wrapf(err, "storing store digest for tick: %d\n", tickNumber) - } - - return nil -} - -func getPrevStoreDigest(ctx context.Context, store *store.PebbleStore, initialEpochTick, tickNumber uint32) ([32]byte, error) { - if tickNumber == initialEpochTick { - return [32]byte{}, nil - } - - previousTickStoreDigestStored, err := store.GetStoreDigest(ctx, tickNumber-1) - if err != nil { - //returning nil error in order to not fail until epoch change - return [32]byte{}, nil - //return [32]byte{}, errors.Wrapf(err, "getting chain digest for last tick: %d\n", tickNumber-1) - } - - var previousTickStoreDigest [32]byte - copy(previousTickStoreDigest[:], previousTickStoreDigestStored) - - return previousTickStoreDigest, nil -} - -func getPrevChainDigest(ctx context.Context, store *store.PebbleStore, initialEpochTick, tickNumber uint32) ([32]byte, error) { - // if this is the first tick, there is no previous chain digest, so we are using an empty one - if tickNumber == initialEpochTick { - return [32]byte{}, nil - } - - previousTickChainDigestStored, err := store.GetChainDigest(ctx, tickNumber-1) - if err != nil { - return [32]byte{}, errors.Wrapf(err, "getting chain digest for last tick: %d\n", tickNumber-1) - } - - var previousTickChainDigest [32]byte - copy(previousTickChainDigest[:], previousTickChainDigestStored) - - return previousTickChainDigest, nil -} - -func computeCurrentTickDigest(ctx context.Context, vote types.QuorumTickVote, previousTickChainDigest [32]byte) ([32]byte, error) { - chain := Chain{ - Epoch: vote.Epoch, - Tick: vote.Tick, - Millisecond: vote.Millisecond, - Second: vote.Second, - Minute: vote.Minute, - Hour: vote.Hour, - Day: vote.Day, - Month: vote.Month, - Year: vote.Year, - PreviousResourceTestingDigest: vote.PreviousResourceTestingDigest, - PreviousTransactionBodyDigest: vote.PreviousTransactionBodyDigest, - PreviousSpectrumDigest: vote.PreviousSpectrumDigest, - PreviousUniverseDigest: vote.PreviousUniverseDigest, - PreviousComputerDigest: vote.PreviousComputerDigest, - TxDigest: vote.TxDigest, - PreviousTickChainDigest: previousTickChainDigest, - } - - digest, err := chain.Digest() - if err != nil { - return [32]byte{}, errors.Wrap(err, "computing chain digest") - } - return digest, nil -} - -func computeCurrentTickStoreDigest(ctx context.Context, validTxs []types.Transaction, tickTxsStatus *protobuff.TickTransactionsStatus, previousTickChainDigest [32]byte) ([32]byte, error) { - s := Store{ - PreviousTickStoreDigest: previousTickChainDigest, - ValidTxs: validTxs, - TickTxsStatus: tickTxsStatus, - } - - digest, err := s.Digest() - if err != nil { - return [32]byte{}, errors.Wrap(err, "computing store digest") - } - - return digest, nil -} diff --git a/validator/chain/chain_test.go b/validator/chain/chain_test.go deleted file mode 100644 index af2f2c5..0000000 --- a/validator/chain/chain_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package chain - -import ( - "github.com/stretchr/testify/require" - "testing" -) - -func TestChain_Digest(t *testing.T) { - chain := Chain{ - Epoch: 20, - Tick: 30, - Millisecond: 40, - Second: 50, - Minute: 60, - Hour: 70, - Day: 80, - Month: 90, - Year: 10, - PreviousResourceTestingDigest: 13423432, - PreviousTransactionBodyDigest: 23432431, - PreviousSpectrumDigest: [32]byte{}, - PreviousUniverseDigest: [32]byte{}, - PreviousComputerDigest: [32]byte{}, - TxDigest: [32]byte{}, - } - _, err := chain.Digest() - require.NoError(t, err) -} - -func TestChain_MarshallBinary(t *testing.T) { - chain := Chain{ - Epoch: 20, - Tick: 30, - Millisecond: 40, - Second: 50, - Minute: 60, - Hour: 70, - Day: 80, - Month: 90, - Year: 10, - PreviousResourceTestingDigest: 13423432, - PreviousTransactionBodyDigest: 23432431, - PreviousSpectrumDigest: [32]byte{}, - PreviousUniverseDigest: [32]byte{}, - PreviousComputerDigest: [32]byte{}, - TxDigest: [32]byte{}, - } - b, err := chain.MarshallBinary() - require.NoError(t, err) - require.Equal(t, len(b), 184) -} diff --git a/validator/chain/models.go b/validator/chain/models.go deleted file mode 100644 index b427db5..0000000 --- a/validator/chain/models.go +++ /dev/null @@ -1,120 +0,0 @@ -package chain - -import ( - "bytes" - "github.com/pkg/errors" - "github.com/qubic/go-archiver/protobuff" - "github.com/qubic/go-archiver/utils" - "github.com/qubic/go-node-connector/types" - "google.golang.org/protobuf/proto" - "sort" -) - -type Chain struct { - _ uint16 //padding - Epoch uint16 - Tick uint32 - Millisecond uint16 - Second uint8 - Minute uint8 - Hour uint8 - Day uint8 - Month uint8 - Year uint8 - PreviousResourceTestingDigest uint32 - PreviousTransactionBodyDigest uint32 - PreviousSpectrumDigest [32]byte - PreviousUniverseDigest [32]byte - PreviousComputerDigest [32]byte - TxDigest [32]byte - - PreviousTickChainDigest [32]byte -} - -func (c *Chain) Digest() ([32]byte, error) { - b, err := c.MarshallBinary() - if err != nil { - return [32]byte{}, errors.Wrap(err, "serializing vote") - } - - digest, err := utils.K12Hash(b) - if err != nil { - return [32]byte{}, errors.Wrap(err, "hashing vote") - } - - return digest, nil -} - -func (c *Chain) MarshallBinary() ([]byte, error) { - b, err := utils.BinarySerialize(c) - if err != nil { - return nil, errors.Wrap(err, "serializing vote") - } - - return b, nil -} - -type Store struct { - PreviousTickStoreDigest [32]byte - ValidTxs []types.Transaction - TickTxsStatus *protobuff.TickTransactionsStatus -} - -func (s *Store) MarshallBinary() ([]byte, error) { - var buff bytes.Buffer - _, err := buff.Write(s.PreviousTickStoreDigest[:]) - if err != nil { - return nil, errors.Wrap(err, "writing previousTickStoreDigest") - } - - digests := make([][32]byte, 0, len(s.ValidTxs)) - for _, tx := range s.ValidTxs { - digest, err := tx.Digest() - if err != nil { - return nil, errors.Wrap(err, "marshalling tx") - } - - digests = append(digests, digest) - } - - sortByteSlices(digests) - - for _, digest := range digests { - _, err = buff.Write(digest[:]) - if err != nil { - return nil, errors.Wrap(err, "writing digest") - } - } - - b, err := proto.Marshal(s.TickTxsStatus) - if err != nil { - return nil, errors.Wrap(err, "marshalling tickTxsStatus") - } - - _, err = buff.Write(b) - if err != nil { - return nil, errors.Wrap(err, "writing tickTxsStatus") - } - - return buff.Bytes(), nil -} - -func (s *Store) Digest() ([32]byte, error) { - b, err := s.MarshallBinary() - if err != nil { - return [32]byte{}, errors.Wrap(err, "serializing store") - } - - digest, err := utils.K12Hash(b) - if err != nil { - return [32]byte{}, errors.Wrap(err, "hashing vote") - } - - return digest, nil -} - -func sortByteSlices(slice [][32]byte) { - sort.SliceStable(slice, func(i, j int) bool { - return bytes.Compare(slice[i][:], slice[j][:]) == -1 - }) -} diff --git a/validator/computors/validator.go b/validator/computors/validator.go index e05071a..1bc18e9 100644 --- a/validator/computors/validator.go +++ b/validator/computors/validator.go @@ -44,7 +44,7 @@ func Store(ctx context.Context, store *store.PebbleStore, epoch uint16, computor return errors.Wrap(err, "qubic to proto") } - err = store.SetComputors(ctx, uint32(epoch), protoModel) + err = store.GetCurrentEpochStore().SetComputorList(protoModel) if err != nil { return errors.Wrap(err, "set computors") } @@ -53,7 +53,7 @@ func Store(ctx context.Context, store *store.PebbleStore, epoch uint16, computor } func Get(ctx context.Context, store *store.PebbleStore, epoch uint32) (types.Computors, error) { - protoModel, err := store.GetComputors(ctx, epoch) + protoModel, err := store.GetCurrentEpochStore().GetComputorList() if err != nil { return types.Computors{}, errors.Wrap(err, "get computors") } diff --git a/validator/quorum/validator.go b/validator/quorum/validator.go index 9bfdc02..0b89f0e 100644 --- a/validator/quorum/validator.go +++ b/validator/quorum/validator.go @@ -3,7 +3,6 @@ package quorum import ( "context" "github.com/pkg/errors" - "github.com/qubic/go-archiver/store" "github.com/qubic/go-archiver/utils" "github.com/qubic/go-node-connector/types" "log" @@ -155,6 +154,7 @@ func getDigestFromQuorumTickData(data types.QuorumTickVote) ([32]byte, error) { return digest, nil } +/* func Store(ctx context.Context, store *store.PebbleStore, tickNumber uint32, quorumVotes types.QuorumVotes) error { protoModel := qubicToProtoStored(quorumVotes) @@ -171,4 +171,4 @@ func Store(ctx context.Context, store *store.PebbleStore, tickNumber uint32, quo } return nil -} +}*/ diff --git a/validator/tick/empty_tick.go b/validator/tick/empty_tick.go index 1114237..4af1413 100644 --- a/validator/tick/empty_tick.go +++ b/validator/tick/empty_tick.go @@ -1,20 +1,13 @@ package tick import ( - "context" - "fmt" - "github.com/cockroachdb/pebble" - "github.com/pkg/errors" "github.com/qubic/go-archiver/protobuff" - "github.com/qubic/go-archiver/store" - "github.com/qubic/go-node-connector/types" "google.golang.org/protobuf/proto" - "time" ) var emptyTickData = &protobuff.TickData{} -func CalculateEmptyTicksForEpoch(ctx context.Context, ps *store.PebbleStore, epoch uint32) ([]uint32, error) { +/*func CalculateEmptyTicksForEpoch(ctx context.Context, ps *store.PebbleStore, epoch uint32) ([]uint32, error) { epochs, err := ps.GetProcessedTickIntervals(ctx) if err != nil { @@ -48,7 +41,7 @@ func CalculateEmptyTicksForEpoch(ctx context.Context, ps *store.PebbleStore, epo return emptyTicks, err } return make([]uint32, 0), nil -} +}*/ func CheckIfTickIsEmptyProto(tickData *protobuff.TickData) bool { @@ -59,7 +52,7 @@ func CheckIfTickIsEmptyProto(tickData *protobuff.TickData) bool { return false } -func CheckIfTickIsEmpty(tickData types.TickData) (bool, error) { +/*func CheckIfTickIsEmpty(tickData types.TickData) (bool, error) { data, err := qubicToProto(tickData) if err != nil { return false, errors.Wrap(err, "converting tick data to protobuf format") @@ -129,4 +122,4 @@ func ResetEmptyTicksForAllEpochs(ps *store.PebbleStore) error { } return nil -} +}*/ diff --git a/validator/tick/validator.go b/validator/tick/validator.go index cabae88..c2da015 100644 --- a/validator/tick/validator.go +++ b/validator/tick/validator.go @@ -90,7 +90,7 @@ func Store(ctx context.Context, store *store.PebbleStore, tickNumber uint32, tic return errors.Wrap(err, "converting qubic tick data to proto") } - err = store.SetTickData(ctx, tickNumber, protoTickData) + err = store.GetCurrentEpochStore().SetTickData(tickNumber, protoTickData) if err != nil { return errors.Wrap(err, "set tick data") } diff --git a/validator/tx/validator.go b/validator/tx/validator.go index 4d832b4..f2cb9c9 100644 --- a/validator/tx/validator.go +++ b/validator/tx/validator.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "github.com/pkg/errors" - "github.com/qubic/go-archiver/protobuff" "github.com/qubic/go-archiver/store" "github.com/qubic/go-archiver/utils" "github.com/qubic/go-node-connector/types" @@ -105,12 +104,6 @@ func Store(ctx context.Context, store *store.PebbleStore, tickNumber uint32, tra if err != nil { return errors.Wrap(err, "storing tick transactions") } - - err = storeTransferTransactions(ctx, store, tickNumber, transactions) - if err != nil { - return errors.Wrap(err, "storing transfer transactions") - } - return nil } @@ -120,68 +113,10 @@ func storeTickTransactions(ctx context.Context, store *store.PebbleStore, transa return errors.Wrap(err, "converting to proto") } - err = store.SetTransactions(ctx, protoModel) + err = store.GetCurrentEpochStore().SetTransactions(protoModel) if err != nil { return errors.Wrap(err, "storing tick transactions") } return nil } - -func storeTransferTransactions(ctx context.Context, store *store.PebbleStore, tickNumber uint32, transactions types.Transactions) error { - transferTransactions, err := removeNonTransferTransactionsAndConvert(transactions) - if err != nil { - return errors.Wrap(err, "removing non transfer transactions") - } - txsPerIdentity, err := createTransferTransactionsIdentityMap(ctx, transferTransactions) - if err != nil { - return errors.Wrap(err, "filtering transfer transactions") - } - - for id, txs := range txsPerIdentity { - err = store.PutTransferTransactionsPerTick(ctx, id, tickNumber, &protobuff.TransferTransactionsPerTick{TickNumber: uint32(tickNumber), Identity: id, Transactions: txs}) - if err != nil { - return errors.Wrap(err, "storing transfer transactions") - } - } - - return nil -} - -func removeNonTransferTransactionsAndConvert(transactions []types.Transaction) ([]*protobuff.Transaction, error) { - transferTransactions := make([]*protobuff.Transaction, 0) - for _, tx := range transactions { - if tx.Amount == 0 { - continue - } - - protoTx, err := txToProto(tx) - if err != nil { - return nil, errors.Wrap(err, "converting to proto") - } - - transferTransactions = append(transferTransactions, protoTx) - } - - return transferTransactions, nil -} - -func createTransferTransactionsIdentityMap(ctx context.Context, txs []*protobuff.Transaction) (map[string][]*protobuff.Transaction, error) { - txsPerIdentity := make(map[string][]*protobuff.Transaction) - for _, tx := range txs { - _, ok := txsPerIdentity[tx.DestId] - if !ok { - txsPerIdentity[tx.DestId] = make([]*protobuff.Transaction, 0) - } - - _, ok = txsPerIdentity[tx.SourceId] - if !ok { - txsPerIdentity[tx.SourceId] = make([]*protobuff.Transaction, 0) - } - - txsPerIdentity[tx.DestId] = append(txsPerIdentity[tx.DestId], tx) - txsPerIdentity[tx.SourceId] = append(txsPerIdentity[tx.SourceId], tx) - } - - return txsPerIdentity, nil -} diff --git a/validator/tx/validator_test.go b/validator/tx/validator_test.go index 9abe0c3..e4904cd 100644 --- a/validator/tx/validator_test.go +++ b/validator/tx/validator_test.go @@ -2,10 +2,6 @@ package tx import ( "context" - "github.com/cockroachdb/pebble" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/qubic/go-archiver/protobuff" "github.com/qubic/go-archiver/store" "github.com/qubic/go-node-connector/types" "github.com/stretchr/testify/require" @@ -15,7 +11,7 @@ import ( "testing" ) -func Test_CreateTransferTransactionsIdentityMap(t *testing.T) { +/*func Test_CreateTransferTransactionsIdentityMap(t *testing.T) { txs := []*protobuff.Transaction{ { SourceId: "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", @@ -84,7 +80,7 @@ func Test_CreateTransferTransactionsIdentityMap(t *testing.T) { require.NoError(t, err) diff := cmp.Diff(got, expected, cmpopts.IgnoreUnexported(protobuff.Transaction{})) require.Empty(t, diff) -} +}*/ func TestStore(t *testing.T) { ctx := context.Background() @@ -94,12 +90,14 @@ func TestStore(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dbDir) - db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{}) - require.NoError(t, err) - defer db.Close() + testPath := filepath.Join(dbDir, "testdb") logger, _ := zap.NewDevelopment() - s := store.NewPebbleStore(db, logger) + s, err := store.NewPebbleStore(testPath, logger, 10) + require.NoError(t, err) + + err = s.HandleEpochTransition(1) + require.NoError(t, err) firstTick := []types.Transaction{ { @@ -141,7 +139,7 @@ func TestStore(t *testing.T) { err = Store(ctx, s, 1, firstTick) require.NoError(t, err) - expectedFirstTickFirstID := &protobuff.TransferTransactionsPerTick{ + /*expectedFirstTickFirstID := &protobuff.TransferTransactionsPerTick{ TickNumber: 1, Identity: "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", Transactions: []*protobuff.Transaction{ @@ -177,9 +175,9 @@ func TestStore(t *testing.T) { SignatureHex: "00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", }, }, - } + }*/ - got, err := s.GetTransactionsForEntity(ctx, "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", 1, 1) + /*got, err := s.GetTransactionsForEntity(ctx, "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", 1, 1) require.NoError(t, err) diff := cmp.Diff(got, []*protobuff.TransferTransactionsPerTick{expectedFirstTickFirstID}, cmpopts.IgnoreFields(protobuff.Transaction{}, "TxId"), cmpopts.IgnoreUnexported(protobuff.TransferTransactionsPerTick{}, protobuff.Transaction{})) require.Empty(t, diff) @@ -187,12 +185,12 @@ func TestStore(t *testing.T) { got, err = s.GetTransactionsForEntity(ctx, "IXTSDANOXIVIWGNDCNZVWSAVAEPBGLGSQTLSVHHBWEGKSEKPRQGWIJJCTUZB", 1, 1) require.NoError(t, err) diff = cmp.Diff(got, []*protobuff.TransferTransactionsPerTick{expectedFirstTickSecondID}, cmpopts.IgnoreFields(protobuff.Transaction{}, "TxId"), cmpopts.IgnoreUnexported(protobuff.TransferTransactionsPerTick{}, protobuff.Transaction{})) - require.Empty(t, diff) + require.Empty(t, diff)*/ err = Store(ctx, s, 2, secondTick) require.NoError(t, err) - expectedSecondTickFirstID := &protobuff.TransferTransactionsPerTick{ + /*expectedSecondTickFirstID := &protobuff.TransferTransactionsPerTick{ TickNumber: 2, Identity: "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", Transactions: []*protobuff.Transaction{ @@ -209,9 +207,9 @@ func TestStore(t *testing.T) { SignatureHex: "00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", }, }, - } + }*/ - expectedSecondTickSecondID := &protobuff.TransferTransactionsPerTick{ + /*expectedSecondTickSecondID := &protobuff.TransferTransactionsPerTick{ TickNumber: 2, Identity: "IXTSDANOXIVIWGNDCNZVWSAVAEPBGLGSQTLSVHHBWEGKSEKPRQGWIJJCTUZB", Transactions: []*protobuff.Transaction{ @@ -228,9 +226,9 @@ func TestStore(t *testing.T) { SignatureHex: "00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", }, }, - } + }*/ - got, err = s.GetTransactionsForEntity(ctx, "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", 2, 2) + /*got, err = s.GetTransactionsForEntity(ctx, "QJRRSSKMJRDKUDTYVNYGAMQPULKAMILQQYOWBEXUDEUWQUMNGDHQYLOAJMEB", 2, 2) require.NoError(t, err) diff = cmp.Diff(got, []*protobuff.TransferTransactionsPerTick{expectedSecondTickFirstID}, cmpopts.IgnoreFields(protobuff.Transaction{}, "TxId"), cmpopts.IgnoreUnexported(protobuff.TransferTransactionsPerTick{}, protobuff.Transaction{})) require.Empty(t, diff) @@ -250,7 +248,7 @@ func TestStore(t *testing.T) { gotCombined, err = s.GetTransactionsForEntity(ctx, "IXTSDANOXIVIWGNDCNZVWSAVAEPBGLGSQTLSVHHBWEGKSEKPRQGWIJJCTUZB", 1, 2) require.NoError(t, err) diff = cmp.Diff(gotCombined, expectedCombined, cmpopts.IgnoreFields(protobuff.Transaction{}, "TxId"), cmpopts.IgnoreUnexported(protobuff.TransferTransactionsPerTick{}, protobuff.Transaction{})) - require.Empty(t, diff) + require.Empty(t, diff)*/ } func identityToPubkeyNoError(id string) [32]byte { diff --git a/validator/txstatus/models.go b/validator/txstatus/models.go deleted file mode 100644 index 9c5e843..0000000 --- a/validator/txstatus/models.go +++ /dev/null @@ -1,75 +0,0 @@ -package txstatus - -import ( - "github.com/pkg/errors" - "github.com/qubic/go-archiver/protobuff" - "github.com/qubic/go-node-connector/types" - "log" -) - -func qubicToProto(txs types.Transactions, model types.TransactionStatus) (*protobuff.TickTransactionsStatus, error) { - tickTransactions := make([]*protobuff.TransactionStatus, 0, model.TxCount) - txsIdMap, err := createTxsIdMap(txs) - if err != nil { - return nil, errors.Wrap(err, "error creating txs id map") - } - - for index, txDigest := range model.TransactionDigests { - var id types.Identity - id, err := id.FromPubKey(txDigest, true) - if err != nil { - return nil, errors.Wrap(err, "converting digest to id") - } - if _, ok := txsIdMap[id.String()]; !ok { - log.Printf("Skipping tx status with id: %s\n", id.String()) - continue - } - - moneyFlew := getMoneyFlewFromBits(model.MoneyFlew, index) - - tx := &protobuff.TransactionStatus{ - TxId: id.String(), - MoneyFlew: moneyFlew, - } - - tickTransactions = append(tickTransactions, tx) - } - - return &protobuff.TickTransactionsStatus{Transactions: tickTransactions}, nil -} - -func createTxsIdMap(txs types.Transactions) (map[string]struct{}, error) { - txsIdMap := make(map[string]struct{}, len(txs)) - for _, tx := range txs { - digest, err := tx.Digest() - if err != nil { - return nil, errors.Wrapf(err, "creating tx digest for tx with src pubkey %s", tx.SourcePublicKey) - } - - id, err := tx.ID() - if err != nil { - return nil, errors.Wrapf(err, "converting tx with digest %s to id", digest) - } - - txsIdMap[id] = struct{}{} - } - - return txsIdMap, nil -} - -func getMoneyFlewFromBits(input [(types.NumberOfTransactionsPerTick + 7) / 8]byte, digestIndex int) bool { - pos := digestIndex / 8 - bitIndex := digestIndex % 8 - - return getNthBit(input[pos], bitIndex) -} - -func getNthBit(input byte, bitIndex int) bool { - // Shift the input byte to the right by the bitIndex positions - // This isolates the bit at the bitIndex position at the least significant bit position - shifted := input >> bitIndex - - // Extract the least significant bit using a bitwise AND operation with 1 - // If the least significant bit is 1, the result will be 1; otherwise, it will be 0 - return shifted&1 == 1 -} diff --git a/validator/txstatus/models_test.go b/validator/txstatus/models_test.go deleted file mode 100644 index 7dd5c6f..0000000 --- a/validator/txstatus/models_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package txstatus - -import ( - "github.com/qubic/go-node-connector/types" - "log" - "testing" -) - -func TestQubicToProto(t *testing.T) { - tickTransactionStatus := types.TransactionStatus{ - CurrentTickOfNode: 100, - Tick: 100, - TxCount: 3, - MoneyFlew: [128]byte{0b10001000, 0b00000001}, - TransactionDigests: [][32]byte{ - [32]byte{209, 173, 239, 194, 151, 98, 29, 180, 83, 67, 142, 32, 4, 9, 167, 32, 159, 95, 116, 116, 214, 221, 171, 255, 13, 125, 86, 112, 5, 31, 191, 193}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 19}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 21}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 19}, - [32]byte{209, 173, 239, 194, 151, 98, 29, 180, 83, 67, 142, 32, 4, 9, 167, 32, 159, 95, 116, 116, 214, 221, 171, 255, 13, 125, 86, 112, 5, 31, 191, 193}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 19}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 21}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 21}, - [32]byte{230, 252, 58, 173, 75, 89, 77, 130, 191, 49, 3, 161, 16, 22, 216, 13, 232, 131, 222, 135, 59, 206, 196, 142, 144, 57, 98, 134, 80, 59, 38, 19}, - }, - } - - res, err := qubicToProto(types.Transactions{}, tickTransactionStatus) - if err != nil { - t.Fatalf("Got err when converting qubic to proto. err: %s", err.Error()) - } - - log.Println(res) -} - -func TestEqualSlices(t *testing.T) { - slice1 := [][32]byte{ - [32]byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30}, - [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - [32]byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, - } - - slice2 := [][32]byte{ - [32]byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, - [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - [32]byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30}, - } - - if !equalDigests(slice1, slice2) { - t.Fatalf("Slices are not equal") - } - - // check if the original slices were not changed - if slice1[0] != [32]byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30} { - t.Fatalf("Original slice was changed") - } - if slice2[0] != [32]byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20} { - t.Fatalf("Original slice was changed") - } - if slice1[1] != [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { - t.Fatalf("Original slice was changed") - } - if slice2[1] != [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { - t.Fatalf("Original slice was changed") - } - if slice1[2] != [32]byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20} { - t.Fatalf("Original slice was changed") - } - if slice2[2] != [32]byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30} { - t.Fatalf("Original slice was changed") - } - -} diff --git a/validator/txstatus/validator.go b/validator/txstatus/validator.go deleted file mode 100644 index 9eb8f77..0000000 --- a/validator/txstatus/validator.go +++ /dev/null @@ -1,87 +0,0 @@ -package txstatus - -import ( - "bytes" - "context" - "github.com/pkg/errors" - "github.com/qubic/go-archiver/protobuff" - "github.com/qubic/go-archiver/store" - "github.com/qubic/go-node-connector/types" - "sort" -) - -func Validate(ctx context.Context, tickTxStatus types.TransactionStatus, tickTxs types.Transactions) (*protobuff.TickTransactionsStatus, error) { - //if tickTxStatus.TxCount != uint32(len(tickTxs)) { - // return nil, errors.Errorf("Mismatched tx length. Tick tx status count: %d - len(tickTx): %d", tickTxStatus.TxCount, len(tickTxs)) - //} - - //tickTxDigests, err := getTickTxDigests(tickTxs) - //if err != nil { - // return nil, errors.Wrap(err, "getting tick tx digests") - //} - - //if !equalDigests(tickTxDigests, tickTxStatus.TransactionDigests) { - // return nil, errors.New("digests not equal") - //} - - proto, err := qubicToProto(tickTxs, tickTxStatus) - if err != nil { - return nil, errors.Wrap(err, "qubic to proto") - } - - return proto, nil -} - -func getTickTxDigests(tickTxs types.Transactions) ([][32]byte, error) { - tickTxDigests := make([][32]byte, 0, len(tickTxs)) - for index, tx := range tickTxs { - digest, err := tx.Digest() - if err != nil { - return nil, errors.Wrapf(err, "creating digest for tx on index %d", index) - } - - tickTxDigests = append(tickTxDigests, digest) - } - - return tickTxDigests, nil -} - -func copySlice(slice [][32]byte) [][32]byte { - newSlice := make([][32]byte, len(slice)) - copy(newSlice, slice) - return newSlice -} - -func equalDigests(tickTxsDigests [][32]byte, tickTxStatusDigests [][32]byte) bool { - //copy slices to avoid modifying the original slices - copyTxsDigests := copySlice(tickTxsDigests) - copyTxStatusDigests := copySlice(tickTxStatusDigests) - - // Sort the slices - sortByteSlices(copyTxsDigests) - sortByteSlices(copyTxStatusDigests) - - // Compare the sorted slices element by element - for i := range copyTxsDigests { - if !bytes.Equal(copyTxsDigests[i][:], copyTxStatusDigests[i][:]) { - return false - } - } - - return true -} - -func sortByteSlices(slice [][32]byte) { - sort.Slice(slice, func(i, j int) bool { - return bytes.Compare(slice[i][:], slice[j][:]) == -1 - }) -} - -func Store(ctx context.Context, store *store.PebbleStore, tickNumber uint32, approvedTxs *protobuff.TickTransactionsStatus) error { - err := store.SetTickTransactionsStatus(ctx, uint64(tickNumber), approvedTxs) - if err != nil { - return errors.Wrap(err, "setting tts") - } - - return nil -} diff --git a/validator/validator.go b/validator/validator.go index afb3a43..e3a2afa 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -4,17 +4,13 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" - "github.com/cockroachdb/pebble" "github.com/pkg/errors" "github.com/qubic/go-archiver/protobuff" "github.com/qubic/go-archiver/store" - "github.com/qubic/go-archiver/validator/chain" "github.com/qubic/go-archiver/validator/computors" "github.com/qubic/go-archiver/validator/quorum" "github.com/qubic/go-archiver/validator/tick" "github.com/qubic/go-archiver/validator/tx" - "github.com/qubic/go-archiver/validator/txstatus" qubic "github.com/qubic/go-node-connector" "github.com/qubic/go-node-connector/types" "github.com/qubic/go-schnorrq" @@ -85,7 +81,6 @@ func (v *Validator) ValidateTick(ctx context.Context, initialEpochTick, tickNumb var tickData types.TickData var validTxs = make([]types.Transaction, 0) - approvedTxs := &protobuff.TickTransactionsStatus{} if quorumVotes[0].TxDigest != [32]byte{} { td, err := v.qu.GetTickData(ctx, tickNumber) @@ -117,34 +112,13 @@ func (v *Validator) ValidateTick(ctx context.Context, initialEpochTick, tickNumb log.Printf("Validated %d transactions\n", len(validTxs)) - var tickTxStatus types.TransactionStatus - - if disableStatusAddon { - tickTxStatus = types.TransactionStatus{ - CurrentTickOfNode: tickNumber, - Tick: tickNumber, - TxCount: uint32(len(validTxs)), - MoneyFlew: [128]byte{}, - TransactionDigests: nil, - } - } else { - tickTxStatus, err = v.qu.GetTxStatus(ctx, tickNumber) - if err != nil { - return errors.Wrap(err, "getting tx status") - } - } - - approvedTxs, err = txstatus.Validate(ctx, tickTxStatus, validTxs) - if err != nil { - return errors.Wrap(err, "validating tx status") - } } // proceed to storing tick information - err = quorum.Store(ctx, v.store, tickNumber, alignedVotes) - if err != nil { - return errors.Wrap(err, "storing quorum votes") - } + /* err = quorum.Store(ctx, v.store, tickNumber, alignedVotes) + if err != nil { + return errors.Wrap(err, "storing quorum votes") + }*/ log.Printf("Stored %d quorum votes\n", len(alignedVotes)) @@ -162,55 +136,6 @@ func (v *Validator) ValidateTick(ctx context.Context, initialEpochTick, tickNumb log.Printf("Stored %d transactions\n", len(validTxs)) - err = txstatus.Store(ctx, v.store, tickNumber, approvedTxs) - if err != nil { - return errors.Wrap(err, "storing tx status") - } - - err = chain.ComputeAndSave(ctx, v.store, initialEpochTick, tickNumber, alignedVotes[0]) - if err != nil { - return errors.Wrap(err, "computing and saving chain digest") - } - - err = chain.ComputeStoreAndSave(ctx, v.store, initialEpochTick, tickNumber, validTxs, approvedTxs) - if err != nil { - return errors.Wrap(err, "computing and saving store digest") - } - - isEmpty, err := tick.CheckIfTickIsEmpty(tickData) - if err != nil { - return errors.Wrap(err, "checking if tick is empty") - } - - if isEmpty { - emptyTicks, err := v.store.GetEmptyTicksForEpoch(uint32(epoch)) - if err != nil { - if !errors.Is(err, pebble.ErrNotFound) { - return errors.Wrap(err, "getting empty ticks for current epoch") - } - } - - if emptyTicks == 0 { - fmt.Printf("Initializing empty ticks for epoch: %d\n", epoch) - err := v.store.SetEmptyTickListPerEpoch(uint32(epoch), make([]uint32, 0)) - if err != nil { - return errors.Wrapf(err, "initializing empty tick list for epoch %d", epoch) - } - } - - emptyTicks += 1 - - err = v.store.SetEmptyTicksForEpoch(uint32(epoch), emptyTicks) - if err != nil { - return errors.Wrap(err, "setting current ticks for current epoch") - } - fmt.Printf("Empty ticks for epoch %d: %d\n", epoch, emptyTicks) - - err = v.store.AppendEmptyTickToEmptyTickListPerEpoch(uint32(epoch), tickNumber) - if err != nil { - return errors.Wrap(err, "appending tick to empty tick list") - } - } return nil }