diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj
index 6f0566be60..634074adac 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj
+++ b/Builds/VisualStudio/stellar-core.vcxproj
@@ -506,6 +506,7 @@ exit /b 0
+
@@ -938,6 +939,7 @@ exit /b 0
+
diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters
index d34f209046..e07ad5f3ca 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj.filters
+++ b/Builds/VisualStudio/stellar-core.vcxproj.filters
@@ -762,6 +762,9 @@
historyWork
+
+ historyWork
+
historyWork
@@ -1916,6 +1919,9 @@
historyWork
+
+ historyWork
+
historyWork
diff --git a/src/catchup/ApplyBufferedLedgersWork.cpp b/src/catchup/ApplyBufferedLedgersWork.cpp
index 6b5865b344..41306794a5 100644
--- a/src/catchup/ApplyBufferedLedgersWork.cpp
+++ b/src/catchup/ApplyBufferedLedgersWork.cpp
@@ -55,7 +55,9 @@ ApplyBufferedLedgersWork::onRun()
lcd.getTxSet()->sizeTxTotal(), lcd.getTxSet()->sizeOpTotalForLogging(),
stellarValueToString(mApp.getConfig(), lcd.getValue()));
- auto applyLedger = std::make_shared(mApp, lcd);
+ // Pass `nullptr` for `hEntries` because SCP messages of buffered ledgers
+ // have already been logged.
+ auto applyLedger = std::make_shared(mApp, lcd, nullptr);
auto predicate = [](Application& app) {
auto& bl = app.getBucketManager().getBucketList();
diff --git a/src/catchup/ApplyCheckpointWork.cpp b/src/catchup/ApplyCheckpointWork.cpp
index 4cc8007263..9cfdb48522 100644
--- a/src/catchup/ApplyCheckpointWork.cpp
+++ b/src/catchup/ApplyCheckpointWork.cpp
@@ -24,10 +24,9 @@
namespace stellar
{
-ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
- TmpDir const& downloadDir,
- LedgerRange const& range,
- OnFailureCallback cb)
+ApplyCheckpointWork::ApplyCheckpointWork(
+ Application& app, TmpDir const& downloadDir, LedgerRange const& range,
+ OnFailureCallback cb, std::shared_ptr& scpDownloadDirs)
: BasicWork(app,
"apply-ledgers-" + fmt::format(FMT_STRING("{}-{}"),
range.mFirst, range.limit()),
@@ -36,6 +35,7 @@ ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
, mLedgerRange(range)
, mCheckpoint(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
+ , mSCPDownloadDirs(scpDownloadDirs)
, mOnFailure(cb)
{
// Ledger range check to enforce application of a single checkpoint
@@ -69,6 +69,11 @@ ApplyCheckpointWork::closeFiles()
{
mHdrIn.close();
mTxIn.close();
+ for (auto& scpInfo : mSCPCheckpointInfo)
+ {
+ scpInfo.scpHistoryIn.close();
+ }
+ mSCPCheckpointInfo.clear();
mFilesOpen = false;
}
@@ -83,8 +88,7 @@ void
ApplyCheckpointWork::openInputFiles()
{
ZoneScoped;
- mHdrIn.close();
- mTxIn.close();
+ closeFiles();
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpoint);
@@ -95,6 +99,32 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
+
+ if (mSCPDownloadDirs)
+ {
+ // Initialize `SCPCheckpointInfo`s for each SCP history download
+ // directory.
+ for (auto const& scpDir : *mSCPDownloadDirs)
+ {
+ FileTransferInfo si(*scpDir, HISTORY_FILE_TYPE_SCP, mCheckpoint);
+ CLOG_DEBUG(History, "Saving SCP messages from {}",
+ si.localPath_nogz());
+ mSCPCheckpointInfo.emplace_back();
+ SCPCheckpointInfo& scpInfo = mSCPCheckpointInfo.back();
+ scpInfo.scpHistoryEntry = std::make_shared();
+ try
+ {
+ scpInfo.scpHistoryIn.open(si.localPath_nogz());
+ }
+ catch (FileSystemException const&)
+ {
+ // File doesn't exist for this checkpoint. That's ok. Skip it.
+ mSCPCheckpointInfo.pop_back();
+ continue;
+ }
+ }
+ }
+
mFilesOpen = true;
}
@@ -251,6 +281,49 @@ ApplyCheckpointWork::getNextLedgerCloseData()
std::make_optional(mHeaderHistoryEntry.hash));
}
+std::unique_ptr
+ApplyCheckpointWork::getNextSCPHistoryEntries()
+{
+ ZoneScoped;
+ auto ret = std::make_unique();
+ uint32_t ledgerSeq = mApp.getLedgerManager().getLastClosedLedgerNum();
+ for (SCPCheckpointInfo& info : mSCPCheckpointInfo)
+ {
+ XDRInputFileStream& in = info.scpHistoryIn;
+ std::shared_ptr& entry = info.scpHistoryEntry;
+ do
+ {
+ uint32 scpHistSeq = entry->v0().ledgerMessages.ledgerSeq;
+
+ if (scpHistSeq <= ledgerSeq)
+ {
+ // Catching up to `ledgerSeq + 1`
+ CLOG_DEBUG(History, "Skipping SCP messages for ledger {}",
+ scpHistSeq);
+ }
+ else if (scpHistSeq == ledgerSeq + 1)
+ {
+ // Caught up
+ CLOG_DEBUG(History, "Loaded SCP messages for ledger {}",
+ ledgerSeq);
+ ret->push_back(entry);
+ break;
+ }
+ else
+ {
+ // Ahead. This archive does not have messages for `ledgerSeq+1`
+ // TODO: Log which archive is missing the messages (also applies
+ // to other logging statements in this function)
+ CLOG_WARNING(History,
+ "Archive missing SCP messages for ledger {}",
+ scpHistSeq);
+ break;
+ }
+ } while (in && in.readOne(*entry));
+ }
+ return ret;
+}
+
BasicWork::State
ApplyCheckpointWork::onRun()
{
@@ -308,7 +381,8 @@ ApplyCheckpointWork::onRun()
return State::WORK_RUNNING;
}
- auto applyLedger = std::make_shared(mApp, *lcd);
+ auto applyLedger = std::make_shared(
+ mApp, *lcd, getNextSCPHistoryEntries());
auto predicate = [](Application& app) {
auto& bl = app.getBucketManager().getBucketList();
diff --git a/src/catchup/ApplyCheckpointWork.h b/src/catchup/ApplyCheckpointWork.h
index 93c4168b21..4c7db14411 100644
--- a/src/catchup/ApplyCheckpointWork.h
+++ b/src/catchup/ApplyCheckpointWork.h
@@ -4,6 +4,7 @@
#pragma once
+#include "catchup/ApplyLedgerWork.h"
#include "herder/LedgerCloseData.h"
#include "herder/TxSetFrame.h"
#include "history/HistoryArchive.h"
@@ -20,6 +21,18 @@ namespace stellar
class TmpDir;
struct LedgerHeaderHistoryEntry;
+using TmpDirVec = std::vector>;
+
+// This struct stores information about SCP messages in a single history
+// checkpoint
+struct SCPCheckpointInfo
+{
+ // Input stream holding `SCPHistoryEntry`s
+ XDRInputFileStream scpHistoryIn;
+ // Most recent SCP history entry read from `scpHistoryIn`
+ std::shared_ptr scpHistoryEntry;
+};
+
/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
@@ -45,11 +58,17 @@ class ApplyCheckpointWork : public BasicWork
TmpDir const& mDownloadDir;
LedgerRange const mLedgerRange;
uint32_t const mCheckpoint;
+ // The directories containing downloaded SCP history. May be null if no such
+ // directories exist.
+ std::shared_ptr mSCPDownloadDirs;
XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
+ // Vector containing each archive's SCP messages for the current checkpoint
+ // being processed
+ std::vector mSCPCheckpointInfo;
OnFailureCallback mOnFailure;
bool mFilesOpen{false};
@@ -61,11 +80,17 @@ class ApplyCheckpointWork : public BasicWork
std::shared_ptr getNextLedgerCloseData();
+ // Returns a vector holding SCP messages from each archive for the ledger
+ // being processed. This vector may be smaller than the total number of
+ // archives if some archives did not contain messages for the ledger.
+ std::unique_ptr getNextSCPHistoryEntries();
+
void closeFiles();
public:
ApplyCheckpointWork(Application& app, TmpDir const& downloadDir,
- LedgerRange const& range, OnFailureCallback cb);
+ LedgerRange const& range, OnFailureCallback cb,
+ std::shared_ptr& scpDownloadDirs);
~ApplyCheckpointWork() = default;
std::string getStatus() const override;
void onFailureRaise() override;
diff --git a/src/catchup/ApplyLedgerWork.cpp b/src/catchup/ApplyLedgerWork.cpp
index 5d910f8bf5..58811fce27 100644
--- a/src/catchup/ApplyLedgerWork.cpp
+++ b/src/catchup/ApplyLedgerWork.cpp
@@ -3,6 +3,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
#include "catchup/ApplyLedgerWork.h"
+#include "herder/HerderPersistence.h"
#include "ledger/LedgerManager.h"
#include "main/Application.h"
#include
@@ -10,12 +11,15 @@
namespace stellar
{
-ApplyLedgerWork::ApplyLedgerWork(Application& app,
- LedgerCloseData const& ledgerCloseData)
+ApplyLedgerWork::ApplyLedgerWork(
+ Application& app, LedgerCloseData const& ledgerCloseData,
+ std::unique_ptr hEntries)
: BasicWork(
app, "apply-ledger-" + std::to_string(ledgerCloseData.getLedgerSeq()),
BasicWork::RETRY_NEVER)
+ , mApp(app)
, mLedgerCloseData(ledgerCloseData)
+ , mHEntries(std::move(hEntries))
{
}
@@ -24,6 +28,11 @@ ApplyLedgerWork::onRun()
{
ZoneScoped;
mApp.getLedgerManager().closeLedger(mLedgerCloseData);
+ if (mHEntries)
+ {
+ mApp.getHerderPersistence().copySCPHistoryFromEntries(
+ *mHEntries, mLedgerCloseData.getLedgerSeq());
+ }
return BasicWork::State::WORK_SUCCESS;
}
diff --git a/src/catchup/ApplyLedgerWork.h b/src/catchup/ApplyLedgerWork.h
index 558aad27e1..fbf965cc3a 100644
--- a/src/catchup/ApplyLedgerWork.h
+++ b/src/catchup/ApplyLedgerWork.h
@@ -6,16 +6,23 @@
#include "herder/LedgerCloseData.h"
#include "work/Work.h"
+#include
namespace stellar
{
+using SCPHistoryEntryVec = std::vector>;
+
class ApplyLedgerWork : public BasicWork
{
+ Application& mApp;
LedgerCloseData const mLedgerCloseData;
+ // SCP messages for the ledger to be applied
+ std::unique_ptr mHEntries;
public:
- ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData);
+ ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData,
+ std::unique_ptr hEntries);
std::string getStatus() const override;
diff --git a/src/catchup/CatchupWork.cpp b/src/catchup/CatchupWork.cpp
index 760c15436c..55ae1d9fe5 100644
--- a/src/catchup/CatchupWork.cpp
+++ b/src/catchup/CatchupWork.cpp
@@ -14,8 +14,10 @@
#include "catchup/VerifyLedgerChainWork.h"
#include "herder/Herder.h"
#include "history/FileTransferInfo.h"
+#include "history/HistoryArchiveManager.h"
#include "history/HistoryManager.h"
#include "historywork/BatchDownloadWork.h"
+#include "historywork/BestEffortBatchDownloadWork.h"
#include "historywork/DownloadBucketsWork.h"
#include "historywork/DownloadVerifyTxResultsWork.h"
#include "historywork/GetAndUnzipRemoteFileWork.h"
@@ -81,7 +83,7 @@ CatchupWork::CatchupWork(Application& app,
std::shared_ptr archive)
: Work(app, "catchup", BasicWork::RETRY_NEVER)
, mLocalState{app.getLedgerManager().getLastClosedLedgerHAS()}
- , mDownloadDir{std::make_unique(
+ , mDownloadDir{std::make_shared(
mApp.getTmpDirManager().tmpDir(getName()))}
, mCatchupConfiguration{catchupConfiguration}
, mArchive{archive}
@@ -150,6 +152,7 @@ CatchupWork::doReset()
mHAS.reset();
mBucketHAS.reset();
mRetainedBuckets.clear();
+ mSCPDownloadDirs->clear();
}
void
@@ -166,7 +169,7 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
// Batch download has default retries ("a few") to ensure we rotate through
// archives
auto getLedgers = std::make_shared(
- mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *mDownloadDir,
+ mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, mDownloadDir,
mArchive);
mRangeEndPromise = std::promise();
mRangeEndFuture = mRangeEndPromise.get_future().share();
@@ -179,9 +182,32 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
mApp, *mDownloadDir, verifyRange, mLastClosedLedgerHashPair,
mRangeEndFuture, std::move(fatalFailurePromise));
+ std::vector> seq{getLedgers, mVerifyLedgers};
+
+ Config const& cfg = mApp.getConfig();
+ // TODO: Test with this flag set to `false`. An empty `mSCPDownloadDirs`
+ // should prevent any further stages from running.
+ if (cfg.MODE_STORES_HISTORY_MISC)
+ {
+ for (std::string const& scpHistoryArchive : cfg.SCP_HISTORY_ARCHIVES)
+ {
+ CLOG_DEBUG(History, "Downloading SCP history from {}",
+ scpHistoryArchive);
+ std::shared_ptr scpArchive =
+ mApp.getHistoryArchiveManager().getHistoryArchive(
+ scpHistoryArchive);
+
+ auto scpDownloadDir =
+ std::make_shared(mApp.getTmpDirManager().tmpDir(
+ "scp-history-" + scpHistoryArchive));
+ mSCPDownloadDirs->push_back(scpDownloadDir);
+ seq.emplace_back(std::make_shared(
+ mApp, checkpointRange, HISTORY_FILE_TYPE_SCP, scpDownloadDir,
+ scpArchive));
+ }
+ }
// Never retry the sequence: downloads already have retries, and there's no
// point retrying verification
- std::vector> seq{getLedgers, mVerifyLedgers};
mDownloadVerifyLedgersSeq = addWork(
"download-verify-ledgers-seq", seq, BasicWork::RETRY_NEVER);
mCurrentWork = mDownloadVerifyLedgersSeq;
@@ -305,7 +331,8 @@ CatchupWork::downloadApplyTransactions(CatchupRange const& catchupRange)
auto waitForPublish = mCatchupConfiguration.offline();
auto range = catchupRange.getReplayRange();
mTransactionsVerifyApplySeq = std::make_shared(
- mApp, *mDownloadDir, range, mLastApplied, waitForPublish, mArchive);
+ mApp, *mDownloadDir, range, mLastApplied, waitForPublish,
+ mSCPDownloadDirs, mArchive);
}
BasicWork::State
diff --git a/src/catchup/CatchupWork.h b/src/catchup/CatchupWork.h
index ed36c75f5c..286f4eed8c 100644
--- a/src/catchup/CatchupWork.h
+++ b/src/catchup/CatchupWork.h
@@ -4,6 +4,7 @@
#pragma once
+#include "catchup/ApplyCheckpointWork.h"
#include "catchup/CatchupConfiguration.h"
#include "catchup/VerifyLedgerChainWork.h"
#include "history/HistoryArchive.h"
@@ -46,8 +47,10 @@ class CatchupWork : public Work
{
protected:
HistoryArchiveState mLocalState;
- std::unique_ptr mDownloadDir;
+ std::shared_ptr mDownloadDir;
std::map> mBuckets;
+ // Download directories for SCP message history
+ std::shared_ptr mSCPDownloadDirs = std::make_shared();
void doReset() override;
BasicWork::State doWork() override;
diff --git a/src/catchup/DownloadApplyTxsWork.cpp b/src/catchup/DownloadApplyTxsWork.cpp
index 1746060d69..83188ed862 100644
--- a/src/catchup/DownloadApplyTxsWork.cpp
+++ b/src/catchup/DownloadApplyTxsWork.cpp
@@ -23,6 +23,7 @@ namespace stellar
DownloadApplyTxsWork::DownloadApplyTxsWork(
Application& app, TmpDir const& downloadDir, LedgerRange const& range,
LedgerHeaderHistoryEntry& lastApplied, bool waitForPublish,
+ std::shared_ptr scpDownloadDirs,
std::shared_ptr archive)
: BatchWork(app, "download-apply-ledgers")
, mRange(range)
@@ -31,6 +32,7 @@ DownloadApplyTxsWork::DownloadApplyTxsWork(
, mCheckpointToQueue(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
, mWaitForPublish(waitForPublish)
+ , mSCPDownloadDirs(std::move(scpDownloadDirs))
, mArchive(archive)
{
}
@@ -76,7 +78,8 @@ DownloadApplyTxsWork::yieldMoreWork()
};
auto apply = std::make_shared(
- mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);
+ mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb,
+ mSCPDownloadDirs);
std::vector> seq{getAndUnzip};
diff --git a/src/catchup/DownloadApplyTxsWork.h b/src/catchup/DownloadApplyTxsWork.h
index e5a81d9240..97bee2bdc3 100644
--- a/src/catchup/DownloadApplyTxsWork.h
+++ b/src/catchup/DownloadApplyTxsWork.h
@@ -4,6 +4,7 @@
#pragma once
+#include "catchup/ApplyCheckpointWork.h"
#include "ledger/LedgerRange.h"
#include "util/XDRStream.h"
#include "work/BatchWork.h"
@@ -29,6 +30,8 @@ class DownloadApplyTxsWork : public BatchWork
uint32_t mCheckpointToQueue;
std::shared_ptr mLastYieldedWork;
bool const mWaitForPublish;
+ // Download directories for SCP message history
+ std::shared_ptr mSCPDownloadDirs;
std::shared_ptr mArchive;
public:
@@ -36,6 +39,7 @@ class DownloadApplyTxsWork : public BatchWork
LedgerRange const& range,
LedgerHeaderHistoryEntry& lastApplied,
bool waitForPublish,
+ std::shared_ptr mSCPDownloadDirs,
std::shared_ptr archive = nullptr);
std::string getStatus() const override;
diff --git a/src/catchup/ReplayDebugMetaWork.cpp b/src/catchup/ReplayDebugMetaWork.cpp
index 2d2dcd7fde..a461cb969c 100644
--- a/src/catchup/ReplayDebugMetaWork.cpp
+++ b/src/catchup/ReplayDebugMetaWork.cpp
@@ -19,6 +19,24 @@
namespace stellar
{
+namespace
+{
+// Extract the `SCPHistoryEntry`s within a `LedgerCloseMeta`
+std::unique_ptr
+scpHistoryEntriesFromLedgerCloseMeta(LedgerCloseMeta const& lcm)
+{
+ xdr::xvector const& hist =
+ lcm.v() == 0 ? lcm.v0().scpInfo : lcm.v1().scpInfo;
+
+ auto ret = std::make_unique();
+ for (auto const& e : hist)
+ {
+ ret->emplace_back(std::make_shared(e));
+ }
+ return ret;
+}
+} // namespace
+
// Helper class to apply ledgers from a single debug meta file
class ApplyLedgersFromMetaWork : public Work
{
@@ -122,7 +140,9 @@ class ApplyLedgersFromMetaWork : public Work
lh.header.scpValue);
releaseAssert(!mApplyLedgerWork);
- mApplyLedgerWork = addWork(ledgerCloseData);
+ // TODO: Test
+ mApplyLedgerWork = addWork(
+ ledgerCloseData, scpHistoryEntriesFromLedgerCloseMeta(lcm));
return BasicWork::State::WORK_RUNNING;
}
diff --git a/src/herder/HerderPersistence.h b/src/herder/HerderPersistence.h
index 354e58c013..8ae2830ffc 100644
--- a/src/herder/HerderPersistence.h
+++ b/src/herder/HerderPersistence.h
@@ -4,6 +4,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+#include "catchup/ApplyLedgerWork.h"
#include "herder/QuorumTracker.h"
#include "overlay/Peer.h"
#include "xdr/Stellar-SCP.h"
@@ -40,6 +41,13 @@ class HerderPersistence
uint32_t ledgerSeq,
uint32_t ledgerCount,
XDROutputFileStream& scpHistory);
+
+ // Given a set of SCP history entries from multiple archives, merge the
+ // entries--taking the newest entry for each node--and store the result in
+ // the database.
+ virtual void copySCPHistoryFromEntries(SCPHistoryEntryVec const& hEntries,
+ uint32_t ledgerSeq) = 0;
+
// quorum information lookup
static std::optional
getNodeQuorumSet(Database& db, soci::session& sess, NodeID const& nodeID);
diff --git a/src/herder/HerderPersistenceImpl.cpp b/src/herder/HerderPersistenceImpl.cpp
index 1afe36cc6a..3bb6ad643b 100644
--- a/src/herder/HerderPersistenceImpl.cpp
+++ b/src/herder/HerderPersistenceImpl.cpp
@@ -34,6 +34,22 @@ HerderPersistenceImpl::~HerderPersistenceImpl()
{
}
+void
+HerderPersistenceImpl::clearSCPHistoryAtSeq(uint32_t seq)
+{
+ auto& db = mApp.getDatabase();
+ auto prepClean =
+ db.getPreparedStatement("DELETE FROM scphistory WHERE ledgerseq =:l");
+
+ auto& st = prepClean.statement();
+ st.exchange(soci::use(seq));
+ st.define_and_bind();
+ {
+ ZoneNamedN(deleteSCPHistoryZone, "delete scphistory", true);
+ st.execute(true);
+ }
+}
+
void
HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
std::vector const& envs,
@@ -49,19 +65,7 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
auto& db = mApp.getDatabase();
soci::transaction txscope(db.getSession());
-
- {
- auto prepClean = db.getPreparedStatement(
- "DELETE FROM scphistory WHERE ledgerseq =:l");
-
- auto& st = prepClean.statement();
- st.exchange(soci::use(seq));
- st.define_and_bind();
- {
- ZoneNamedN(deleteSCPHistoryZone, "delete scphistory", true);
- st.execute(true);
- }
- }
+ clearSCPHistoryAtSeq(seq);
for (auto const& e : envs)
{
auto const& qHash =
@@ -82,9 +86,9 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
"(:n, :l, :e)");
auto& st = prepEnv.statement();
- st.exchange(soci::use(nodeIDStrKey));
- st.exchange(soci::use(seq));
- st.exchange(soci::use(envelopeEncoded));
+ st.exchange(soci::use(nodeIDStrKey, "n"));
+ st.exchange(soci::use(seq, "l"));
+ st.exchange(soci::use(envelopeEncoded, "e"));
st.define_and_bind();
{
ZoneNamedN(insertSCPHistoryZone, "insert scphistory", true);
@@ -140,7 +144,18 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
}
}
// save quorum sets
- for (auto const& p : usedQSets)
+ saveQuorumSets(seq, usedQSets);
+
+ txscope.commit();
+}
+
+void
+HerderPersistenceImpl::saveQuorumSets(
+ uint32_t seq, UnorderedMap const& qsets)
+{
+ ZoneScoped;
+ auto& db = mApp.getDatabase();
+ for (auto const& p : qsets)
{
std::string qSetH = binToHex(p.first);
@@ -207,8 +222,6 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
}
}
}
-
- txscope.commit();
}
size_t
@@ -294,9 +307,111 @@ HerderPersistence::copySCPHistoryToStream(Database& db, soci::session& sess,
}
}
+ // TODO: What about quoruminfo table? That doesn't seem to be recorded. Do
+ // we care about it?
+
return n;
}
+namespace
+{
+// Merge the info in an `hEntry` with the info in `envs` and `qsets` by
+// overwriting older entries (as determined by BallotProtocol::isNewerStatement)
+// with newer ones.
+void
+mergeSCPHistory(SCPHistoryEntry const& hEntry, uint32_t ledgerSeq,
+ UnorderedMap& envs,
+ UnorderedMap& qsets)
+{
+ SCPHistoryEntryV0 const& hEntryV0 = hEntry.v0();
+ releaseAssert(hEntryV0.ledgerMessages.ledgerSeq == ledgerSeq);
+ for (SCPEnvelope const& e : hEntryV0.ledgerMessages.messages)
+ {
+ NodeID const& nodeID = e.statement.nodeID;
+ SCPStatement const& statement = e.statement;
+ auto it = envs.find(nodeID);
+ if (it == envs.end())
+ {
+ envs[nodeID] = e;
+ }
+ else if (BallotProtocol::isNewerStatement(it->second.statement,
+ statement))
+ {
+ it->second = e;
+ }
+ }
+
+ // Merge qset info
+ for (auto const& qset : hEntryV0.quorumSets)
+ {
+ qsets.try_emplace(xdrSha256(qset),
+ std::make_shared(qset));
+ }
+}
+} // namespace
+
+void
+HerderPersistenceImpl::copySCPHistoryFromEntries(
+ SCPHistoryEntryVec const& hEntries, uint32_t ledgerSeq)
+{
+ ZoneScoped;
+
+ if (hEntries.empty())
+ {
+ return;
+ }
+
+ // Merge entries
+ UnorderedMap envs;
+ UnorderedMap qsets;
+ for (auto const& hEntry : hEntries)
+ {
+ mergeSCPHistory(*hEntry, ledgerSeq, envs, qsets);
+ }
+
+ // TODO: Dedup with saveSCPHistory after changes to it merge
+ std::vector nodeIDs;
+ std::vector seqs(envs.size(), ledgerSeq);
+ std::vector envelopes;
+ for (auto const& kv : envs)
+ {
+ nodeIDs.emplace_back(KeyUtils::toStrKey(kv.first));
+ envelopes.emplace_back(
+ decoder::encode_b64(xdr::xdr_to_opaque(kv.second)));
+ }
+
+ CLOG_DEBUG(Herder, "Copying {} SCP history entries from ledger {}",
+ envs.size(), ledgerSeq);
+
+ Database& db = mApp.getDatabase();
+ soci::transaction txScope(db.getSession());
+ clearSCPHistoryAtSeq(ledgerSeq);
+ if (!envs.empty())
+ {
+ // Perform multi-row insert into scphistory
+ auto prepEnv =
+ db.getPreparedStatement("INSERT INTO scphistory "
+ "(nodeid, ledgerseq, envelope) VALUES "
+ "(:n, :l, :e)");
+ auto& st = prepEnv.statement();
+ st.exchange(soci::use(nodeIDs, "n"));
+ st.exchange(soci::use(seqs, "l"));
+ st.exchange(soci::use(envelopes, "e"));
+ st.define_and_bind();
+ {
+ ZoneNamedN(insertSCPHistoryZone, "insert scphistory", true);
+ st.execute(true);
+ }
+ if (st.get_affected_rows() != envs.size())
+ {
+ throw std::runtime_error("Could not update data in SQL");
+ }
+ }
+
+ saveQuorumSets(ledgerSeq, qsets);
+ txScope.commit();
+}
+
std::optional
HerderPersistence::getNodeQuorumSet(Database& db, soci::session& sess,
NodeID const& nodeID)
diff --git a/src/herder/HerderPersistenceImpl.h b/src/herder/HerderPersistenceImpl.h
index 968d66bb1a..688b2fdeeb 100644
--- a/src/herder/HerderPersistenceImpl.h
+++ b/src/herder/HerderPersistenceImpl.h
@@ -20,7 +20,17 @@ class HerderPersistenceImpl : public HerderPersistence
void saveSCPHistory(uint32_t seq, std::vector const& envs,
QuorumTracker::QuorumMap const& qmap) override;
+ void copySCPHistoryFromEntries(SCPHistoryEntryVec const& hEntries,
+ uint32_t ledgerSeq) override;
+
private:
Application& mApp;
+
+ // Save quorum sets at a given sequence number
+ void saveQuorumSets(uint32_t seq,
+ UnorderedMap const& qsets);
+
+ // Delete `scphistory` entries at a given sequence number.
+ void clearSCPHistoryAtSeq(uint32_t seq);
};
}
diff --git a/src/history/test/HistoryTests.cpp b/src/history/test/HistoryTests.cpp
index 7e4821f039..25681bab5c 100644
--- a/src/history/test/HistoryTests.cpp
+++ b/src/history/test/HistoryTests.cpp
@@ -375,14 +375,16 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]")
auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(2);
catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger);
- auto tmpDir =
- catchupSimulation.getApp().getTmpDirManager().tmpDir("tx-results-test");
+ auto tmpDirPtr = std::make_shared(
+ catchupSimulation.getApp().getTmpDirManager().tmpDir(
+ "tx-results-test"));
+ auto& tmpDir = *tmpDirPtr;
auto& wm = catchupSimulation.getApp().getWorkScheduler();
CheckpointRange range{LedgerRange::inclusive(1, checkpointLedger),
catchupSimulation.getApp().getHistoryManager()};
auto verifyHeadersWork = wm.executeWork(
- range, HISTORY_FILE_TYPE_LEDGER, tmpDir);
+ range, HISTORY_FILE_TYPE_LEDGER, tmpDirPtr);
REQUIRE(verifyHeadersWork->getState() == BasicWork::State::WORK_SUCCESS);
SECTION("basic")
{
@@ -431,7 +433,7 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]")
SECTION("invalid result entries")
{
auto getResults = wm.executeWork(
- range, HISTORY_FILE_TYPE_RESULTS, tmpDir);
+ range, HISTORY_FILE_TYPE_RESULTS, tmpDirPtr);
REQUIRE(getResults->getState() == BasicWork::State::WORK_SUCCESS);
FileTransferInfo ft(tmpDir, HISTORY_FILE_TYPE_RESULTS, range.last());
@@ -1490,6 +1492,9 @@ TEST_CASE("Catchup failure recovery with buffered checkpoint",
TEST_CASE("Change ordering of buffered ledgers", "[history][catchup]")
{
+ // TODO: Fix `cp` failures this test spits out. SCP history is optional so
+ // there shouldn't be errors when it doesn't exist.
+
CatchupSimulation catchupSimulation{};
auto app = catchupSimulation.createCatchupApplication(
diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp
index 8eb6c39ad9..0d7928cf73 100644
--- a/src/history/test/HistoryTestsUtils.cpp
+++ b/src/history/test/HistoryTestsUtils.cpp
@@ -7,6 +7,7 @@
#include "catchup/CatchupRange.h"
#include "crypto/Hex.h"
#include "crypto/Random.h"
+#include "herder/HerderPersistence.h"
#include "herder/TxSetFrame.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryArchiveManager.h"
@@ -16,6 +17,7 @@
#include "ledger/LedgerTxnHeader.h"
#include "lib/catch.hpp"
#include "main/ApplicationUtils.h"
+#include "scp/LocalNode.h"
#include "test/TestAccount.h"
#include "test/TestUtils.h"
#include "test/TxTests.h"
@@ -64,6 +66,8 @@ TmpDirHistoryConfigurator::configure(Config& cfg, bool writable) const
}
cfg.HISTORY[d] = HistoryArchiveConfiguration{d, getCmd, putCmd, mkdirCmd};
+ // TODO: Probably want different variations of this vv
+ cfg.SCP_HISTORY_ARCHIVES = {d};
cfg.TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE = true;
return cfg;
}
@@ -408,6 +412,22 @@ CatchupSimulation::getLastCheckpointLedger(uint32_t checkpointIndex) const
1;
}
+namespace
+{
+// Make an envelope with enough detail for
+// `HerderPersistenceImpl::saveSCPHistory`
+SCPEnvelope
+makeEnvelope(HerderImpl& herder)
+{
+ SCPEnvelope result;
+ result.statement.nodeID = PubKeyUtils::pseudoRandomForTesting();
+ result.statement.pledges.type(SCP_ST_EXTERNALIZE);
+ result.statement.pledges.externalize().commitQuorumSetHash =
+ herder.getSCP().getLocalNode()->getQuorumSetHash();
+ return result;
+}
+} // namespace
+
void
CatchupSimulation::generateRandomLedger(uint32_t version)
{
@@ -525,6 +545,21 @@ CatchupSimulation::generateRandomLedger(uint32_t version)
mLedgerCloseDatas.emplace_back(ledgerSeq, txSet, sv);
+ mEnvelopes.emplace_back();
+ auto& envs = mEnvelopes.back();
+ auto& herder = dynamic_cast(mApp.getHerder());
+ for (int i = 0; i < 5; ++i)
+ {
+ // Generate at least one envelope
+ if (i == 0 || rand_flip())
+ {
+ envs.push_back(makeEnvelope(herder));
+ }
+ }
+ // Save SCP history to the database
+ mApp.getHerderPersistence().saveSCPHistory(ledgerSeq, envs,
+ QuorumTracker::QuorumMap());
+
auto& txsSucceeded =
mApp.getMetrics().NewCounter({"ledger", "apply", "success"});
auto lastSucceeded = txsSucceeded.count();
@@ -903,7 +938,8 @@ void
CatchupSimulation::externalizeLedger(HerderImpl& herder, uint32_t ledger)
{
// Remember the vectors count from 2, not 0.
- if (ledger - 2 >= mLedgerCloseDatas.size())
+ if (ledger - 2 >= mLedgerCloseDatas.size() ||
+ ledger - 2 >= mEnvelopes.size())
{
return;
}
@@ -1033,6 +1069,22 @@ CatchupSimulation::validateCatchup(Application::pointer app)
CHECK(haveCarolSeq == wantCarolSeq);
CHECK(haveEveSeq == wantEveSeq);
CHECK(haveStrpSeq == wantStrpSeq);
+
+ // Check that `scphistory` has the expected number of entries
+ size_t j;
+ size_t count;
+ auto st = mApp.getDatabase()
+ .getPreparedStatement(
+ "SELECT COUNT(*) FROM scphistory WHERE ledgerseq = :seq")
+ .statement();
+ st.exchange(soci::into(count));
+ st.exchange(soci::use(j));
+ st.define_and_bind();
+ for (j = 2; j < nextLedger - 1; ++j)
+ {
+ st.execute(true);
+ CHECK(count == mEnvelopes.at(j - 2).size());
+ }
}
CatchupPerformedWork
diff --git a/src/history/test/HistoryTestsUtils.h b/src/history/test/HistoryTestsUtils.h
index 863a0f9fed..ba2eb36c6a 100644
--- a/src/history/test/HistoryTestsUtils.h
+++ b/src/history/test/HistoryTestsUtils.h
@@ -188,6 +188,7 @@ class CatchupSimulation
BucketList mBucketListAtLastPublish;
std::vector mLedgerCloseDatas;
+ std::vector> mEnvelopes;
std::vector mLedgerSeqs;
std::vector mLedgerHashes;
diff --git a/src/historywork/BatchDownloadWork.cpp b/src/historywork/BatchDownloadWork.cpp
index f74a9a6aa4..3e2655ee83 100644
--- a/src/historywork/BatchDownloadWork.cpp
+++ b/src/historywork/BatchDownloadWork.cpp
@@ -16,7 +16,7 @@ namespace stellar
{
BatchDownloadWork::BatchDownloadWork(Application& app, CheckpointRange range,
std::string const& type,
- TmpDir const& downloadDir,
+ std::shared_ptr downloadDir,
std::shared_ptr archive)
: BatchWork(app,
fmt::format(FMT_STRING("batch-download-{:s}-{:08x}-{:08x}"),
@@ -52,7 +52,7 @@ BatchDownloadWork::yieldMoreWork()
return nullptr;
}
- FileTransferInfo ft(mDownloadDir, mFileType, mNext);
+ FileTransferInfo ft(*mDownloadDir, mFileType, mNext);
CLOG_DEBUG(History, "Downloading and unzipping {} for checkpoint {}",
mFileType, mNext);
auto getAndUnzip =
diff --git a/src/historywork/BatchDownloadWork.h b/src/historywork/BatchDownloadWork.h
index 3e355599df..539075984f 100644
--- a/src/historywork/BatchDownloadWork.h
+++ b/src/historywork/BatchDownloadWork.h
@@ -23,12 +23,13 @@ class BatchDownloadWork : public BatchWork
CheckpointRange const mRange;
uint32_t mNext;
std::string const mFileType;
- TmpDir const& mDownloadDir;
+ std::shared_ptr mDownloadDir;
std::shared_ptr mArchive;
public:
BatchDownloadWork(Application& app, CheckpointRange range,
- std::string const& type, TmpDir const& downloadDir,
+ std::string const& type,
+ std::shared_ptr downloadDir,
std::shared_ptr archive = nullptr);
~BatchDownloadWork() = default;
std::string getStatus() const override;
diff --git a/src/historywork/BestEffortBatchDownloadWork.cpp b/src/historywork/BestEffortBatchDownloadWork.cpp
new file mode 100644
index 0000000000..24c1318553
--- /dev/null
+++ b/src/historywork/BestEffortBatchDownloadWork.cpp
@@ -0,0 +1,20 @@
+#include "historywork/BestEffortBatchDownloadWork.h"
+
+namespace stellar
+{
+BestEffortBatchDownloadWork::BestEffortBatchDownloadWork(
+ Application& app, CheckpointRange range, std::string const& type,
+ std::shared_ptr downloadDir,
+ std::shared_ptr archive)
+ : BatchDownloadWork(app, range, type, downloadDir, archive)
+{
+}
+
+BasicWork::State
+BestEffortBatchDownloadWork::onChildFailure()
+{
+ // TODO: Emit a warning about the failed download
+ abortSuccess();
+ return State::WORK_RUNNING;
+}
+} // namespace stellar
\ No newline at end of file
diff --git a/src/historywork/BestEffortBatchDownloadWork.h b/src/historywork/BestEffortBatchDownloadWork.h
new file mode 100644
index 0000000000..042a2f710f
--- /dev/null
+++ b/src/historywork/BestEffortBatchDownloadWork.h
@@ -0,0 +1,27 @@
+#pragma once
+
+// Copyright 2023 Stellar Development Foundation and contributors. Licensed
+// under the Apache License, Version 2.0. See the COPYING file at the root
+// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+
+#include "historywork/BatchDownloadWork.h"
+#include "work/BasicWork.h"
+#include
+
+namespace stellar
+{
+// A version of BatchDownloadWork that does not fail if a download fails.
+class BestEffortBatchDownloadWork : public BatchDownloadWork
+{
+ public:
+ BestEffortBatchDownloadWork(
+ Application& app, CheckpointRange range, std::string const& type,
+ std::shared_ptr downloadDir,
+ std::shared_ptr archive = nullptr);
+ ~BestEffortBatchDownloadWork() = default;
+
+ protected:
+ State onChildFailure() override;
+};
+
+} // namespace stellar
\ No newline at end of file
diff --git a/src/historywork/FetchRecentQsetsWork.cpp b/src/historywork/FetchRecentQsetsWork.cpp
index 6b38200061..2aad41bd06 100644
--- a/src/historywork/FetchRecentQsetsWork.cpp
+++ b/src/historywork/FetchRecentQsetsWork.cpp
@@ -28,7 +28,7 @@ FetchRecentQsetsWork::doReset()
mGetHistoryArchiveStateWork.reset();
mDownloadSCPMessagesWork.reset();
mDownloadDir =
- std::make_unique(mApp.getTmpDirManager().tmpDir(getName()));
+ std::make_shared(mApp.getTmpDirManager().tmpDir(getName()));
}
BasicWork::State
@@ -62,7 +62,7 @@ FetchRecentQsetsWork::doWork()
firstSeq, lastSeq);
auto range = CheckpointRange::inclusive(firstSeq, lastSeq, step);
mDownloadSCPMessagesWork = addWork(
- range, HISTORY_FILE_TYPE_SCP, *mDownloadDir);
+ range, HISTORY_FILE_TYPE_SCP, mDownloadDir);
return State::WORK_RUNNING;
}
else if (mDownloadSCPMessagesWork->getState() != State::WORK_SUCCESS)
diff --git a/src/historywork/FetchRecentQsetsWork.h b/src/historywork/FetchRecentQsetsWork.h
index 168341f191..4c4c376180 100644
--- a/src/historywork/FetchRecentQsetsWork.h
+++ b/src/historywork/FetchRecentQsetsWork.h
@@ -15,7 +15,7 @@ class TmpDir;
class FetchRecentQsetsWork : public Work
{
- std::unique_ptr mDownloadDir;
+ std::shared_ptr mDownloadDir;
uint32_t mLedgerNum;
std::shared_ptr mGetHistoryArchiveStateWork;
std::shared_ptr mDownloadSCPMessagesWork;
diff --git a/src/historywork/WriteVerifiedCheckpointHashesWork.cpp b/src/historywork/WriteVerifiedCheckpointHashesWork.cpp
index 200c0f9e64..537fff8096 100644
--- a/src/historywork/WriteVerifiedCheckpointHashesWork.cpp
+++ b/src/historywork/WriteVerifiedCheckpointHashesWork.cpp
@@ -122,7 +122,7 @@ WriteVerifiedCheckpointHashesWork::yieldMoreWork()
auto tmpDir = std::make_shared(
mApp.getTmpDirManager().tmpDir("verify-" + checkpointStr));
auto getWork = std::make_shared(
- mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *tmpDir, mArchive);
+ mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, tmpDir, mArchive);
// When we have a previous-work, we grab a future attached to the promise it
// will fulfill when it runs. This promise might not have a value _yet_ but
diff --git a/src/main/Config.cpp b/src/main/Config.cpp
index 703eeba1c8..939e7974f2 100644
--- a/src/main/Config.cpp
+++ b/src/main/Config.cpp
@@ -1349,6 +1349,12 @@ Config::processConfig(std::shared_ptr t)
throw std::invalid_argument("incomplete HISTORY block");
}
}
+ else if (item.first == "SCP_HISTORY_ARCHIVES")
+ {
+ // TODO: Check for entries that aren't in HISTORY
+ // TODO: Deduplicate. Maybe just make SCP_HISTORY_ARCHIVES a set
+ SCP_HISTORY_ARCHIVES = readArray(item);
+ }
else if (item.first == "DATABASE")
{
DATABASE = SecretValue{readString(item)};
diff --git a/src/main/Config.h b/src/main/Config.h
index 1a2da8b65b..b5980c3e84 100644
--- a/src/main/Config.h
+++ b/src/main/Config.h
@@ -532,6 +532,10 @@ class Config : public std::enable_shared_from_this
// History config
std::map HISTORY;
+ // Archives to use for downloading SCP history
+ // TODO: Document in user facing documentation
+ std::vector SCP_HISTORY_ARCHIVES;
+
// Database config
SecretValue DATABASE;
diff --git a/src/work/BatchWork.cpp b/src/work/BatchWork.cpp
index 1a2d4a16f7..cd9f76b70f 100644
--- a/src/work/BatchWork.cpp
+++ b/src/work/BatchWork.cpp
@@ -29,7 +29,7 @@ BatchWork::doWork()
ZoneScoped;
if (anyChildRaiseFailure())
{
- return State::WORK_FAILURE;
+ return onChildFailure();
}
// Clean up completed children
@@ -83,4 +83,10 @@ BatchWork::addMoreWorkIfNeeded()
mBatch.insert(std::make_pair(w->getName(), w));
}
}
+
+BasicWork::State
+BatchWork::onChildFailure()
+{
+ return State::WORK_FAILURE;
+}
}
diff --git a/src/work/BatchWork.h b/src/work/BatchWork.h
index 76195fc12d..71c0c0470f 100644
--- a/src/work/BatchWork.h
+++ b/src/work/BatchWork.h
@@ -41,5 +41,9 @@ class BatchWork : public Work
virtual bool hasNext() const = 0;
virtual std::shared_ptr yieldMoreWork() = 0;
virtual void resetIter() = 0;
+
+ // Function to call when child work fails. Returns the state that `doWork`
+ // should return.
+ virtual State onChildFailure();
};
}
diff --git a/src/work/Work.cpp b/src/work/Work.cpp
index da53ab5c0c..46d8729162 100644
--- a/src/work/Work.cpp
+++ b/src/work/Work.cpp
@@ -52,7 +52,22 @@ Work::onRun()
if (mAbortChildrenButNotSelf)
{
// Stop whatever work was doing, just wait for children to abort
- return onAbort() ? State::WORK_FAILURE : State::WORK_RUNNING;
+ if (onAbort())
+ {
+ if (mReportSuccessOnAbort)
+ {
+ clearChildren();
+ return State::WORK_SUCCESS;
+ }
+ else
+ {
+ return State::WORK_FAILURE;
+ }
+ }
+ else
+ {
+ return State::WORK_RUNNING;
+ }
}
auto child = yieldNextRunningChild();
@@ -85,6 +100,14 @@ Work::onRun()
}
}
+void
+Work::abortSuccess()
+{
+ mReportSuccessOnAbort = true;
+ mAbortChildrenButNotSelf = true;
+ shutdownChildren();
+}
+
bool
Work::onAbort()
{
diff --git a/src/work/Work.h b/src/work/Work.h
index 7c946f62df..845a523404 100644
--- a/src/work/Work.h
+++ b/src/work/Work.h
@@ -123,6 +123,9 @@ class Work : public BasicWork
// Provide additional cleanup logic for reset
virtual void doReset();
+ // Abort, but report success
+ void abortSuccess();
+
private:
std::list> mChildren;
std::list>::const_iterator mNextChild;
@@ -136,6 +139,7 @@ class Work : public BasicWork
void shutdownChildren();
bool mAbortChildrenButNotSelf{false};
+ bool mReportSuccessOnAbort{false};
};
namespace WorkUtils