diff --git a/include/bitcoin/server/protocols/protocol_electrum.hpp b/include/bitcoin/server/protocols/protocol_electrum.hpp index 32291e86..38a7a4cc 100644 --- a/include/bitcoin/server/protocols/protocol_electrum.hpp +++ b/include/bitcoin/server/protocols/protocol_electrum.hpp @@ -233,7 +233,6 @@ class BCS_API protocol_electrum { database::history outpoint{}; database::histories spenders{}; - bool operator==(const outpoint_subscription& other) const NOEXCEPT { return outpoint == other.outpoint && spenders == other.spenders; @@ -286,7 +285,7 @@ class BCS_API protocol_electrum void scripthash_notify(const hash_digest& status, const hash_digest& hash, notify_t type) NOEXCEPT; - code get_scripthash_history(hash_digest& status, address_subscription& sub, + code get_scripthash_history(address_subscription& sub, const hash_digest& hash, size_t limit=max_size_t) NOEXCEPT; /// Outpoint. @@ -337,9 +336,10 @@ class BCS_API protocol_electrum // Transformations. static array_t transform(const unspents& unspents) NOEXCEPT; static array_t transform(const histories& histories) NOEXCEPT; - static hash_digest to_status(const histories& histories) NOEXCEPT; - static std::string to_method_name(notify_t type) NOEXCEPT; + static void write_status(system::writer& writer, + const history& history) NOEXCEPT; static bool is_valid_hint(const std::string& hint) NOEXCEPT; + static std::string to_method_name(notify_t type) NOEXCEPT; static object_t to_outpoint_status(size_t output_height) NOEXCEPT; static object_t to_outpoint_status(size_t output_height, const history& history) NOEXCEPT; diff --git a/src/protocols/electrum/protocol_electrum.cpp b/src/protocols/electrum/protocol_electrum.cpp index 70a1e327..90bd3ea2 100644 --- a/src/protocols/electrum/protocol_electrum.cpp +++ b/src/protocols/electrum/protocol_electrum.cpp @@ -197,6 +197,7 @@ void protocol_electrum::do_reorganized(node::header_t) NOEXCEPT { // writer.flush resets hash accumulator, sub.type remains unchanged. sub.state.writer.flush(); + sub.state.status = {}; sub.cursor = {}; } } diff --git a/src/protocols/electrum/protocol_electrum_outpoints.cpp b/src/protocols/electrum/protocol_electrum_outpoints.cpp index 073a08a6..a09dff48 100644 --- a/src/protocols/electrum/protocol_electrum_outpoints.cpp +++ b/src/protocols/electrum/protocol_electrum_outpoints.cpp @@ -126,7 +126,7 @@ bool protocol_electrum::get_outpoint_history(outpoint_subscription& out, const point& prevout) const NOEXCEPT { const auto& query = archive(); - out.outpoint = query.get_tx_history(prevout.hash()); + out.outpoint = query.get_tx_history(query.to_tx(prevout.hash())); if (!out.outpoint.valid()) { out.spenders.clear(); diff --git a/src/protocols/electrum/protocol_electrum_subscribe.cpp b/src/protocols/electrum/protocol_electrum_subscribe.cpp index 069b9ce4..98a12a7c 100644 --- a/src/protocols/electrum/protocol_electrum_subscribe.cpp +++ b/src/protocols/electrum/protocol_electrum_subscribe.cpp @@ -93,14 +93,18 @@ void protocol_electrum::do_scripthash_subscribe(const hash_digest& hash, if (address_subscriptions_.size() < options_.maximum_subscriptions) { using mid = midstate; - const auto limit = options().maximum_history; - auto& at = *address_subscriptions_.try_emplace(hash, type, mid{}).first; - ec = get_scripthash_history(status, at.second, at.first, limit); - if (ec == database::error::limited) ec = error::maximum_depth; + const auto at = address_subscriptions_.try_emplace(hash, type, mid{}); + + // Initial subscription is limited by configured maximum history. + const auto limit = at.second ? options().maximum_history : max_size_t; + + // Partially-cached result idempotent (new or redundant subscription). + ec = get_scripthash_history(at.first->second, at.first->first, limit); + status = at.first->second.state.status; subscribed_address_.store(true, relaxed); } - POST(complete_scripthash_subscribe, ec, hash, status); + POST(complete_scripthash_subscribe, ec, hash, std::move(status)); } void protocol_electrum::complete_scripthash_subscribe(const code& ec, @@ -114,6 +118,7 @@ void protocol_electrum::complete_scripthash_subscribe(const code& ec, if (ec) { + // Limited is only expected code, not found is success/null_hash. send_code(ec); return; } @@ -196,16 +201,19 @@ void protocol_electrum::do_scripthash(node::header_t) NOEXCEPT for (auto& [key, sub]: address_subscriptions_) { - hash_digest status{}; - if (const auto ec = get_scripthash_history(status, sub, key)) + // Depth limit is never imposed once a subscription is accepted. + if (const auto ec = get_scripthash_history(sub, key)) { - if (ec == database::error::canceled) return; + if (ec == database::error::query_canceled || + ec == error::not_found) + return; + LOGF("Electrum::do_scripthash, " << ec.message()); } else { // Asio-buffered message (small, not under caller control). - POST(scripthash_notify, status, key, sub.type); + POST(scripthash_notify, sub.state.status, key, sub.type); } } } @@ -244,38 +252,87 @@ std::string protocol_electrum::to_method_name(notify_t type) NOEXCEPT // static // Height is zero (rooted) or max_size_t for unconfirmed history txs. -// TODO: this can be implemented as electrum json serializer (see bitcoind). -hash_digest protocol_electrum::to_status(const histories& histories) NOEXCEPT +void protocol_electrum::write_status(system::writer& writer, + const history& history) NOEXCEPT { - if (histories.empty()) - return {}; - - midstate out{}; - for (const auto& record: histories) - { - out.writer.write_string(encode_hash(record.tx.hash())); - out.writer.write_string(":"); - out.writer.write_string(std::to_string(to_signed(record.tx.height()))); - out.writer.write_string(":"); - } - - out.writer.flush(); - return out.status; + writer.write_string(encode_hash(history.tx.hash())); + writer.write_string(":"); + writer.write_string(std::to_string(to_signed(history.tx.height()))); + writer.write_string(":"); } -code protocol_electrum::get_scripthash_history(hash_digest& out, - address_subscription& /* sub */, const hash_digest& hash, - size_t limit) NOEXCEPT +code protocol_electrum::get_scripthash_history(address_subscription& sub, + const hash_digest& hash, size_t limit) NOEXCEPT { - // TODO: use cursors and midstate to optimize succesive queries. - // TODO: limit first pass query depth. - - histories histories{}; - database::address_link cursor{}; + histories records{}; const auto& query = archive(); - const auto ec = query.get_history(stopping_, cursor, histories, hash, limit, turbo_); - if (!ec) out = to_status(histories); - return ec; + + if (sub.cursor.is_terminal()) + { + // Initial scan queries all confirmed and unconfired together. + // Initial scan is depth-limited (based on config), others are not. + if (const auto ec = query.get_history(stopping_, sub.cursor, records, + hash, limit, turbo_)) + return ec; + + // Accumulate confirmed status in order. + auto it = records.cbegin(); + const auto cend = records.cend(); + while (it != cend && it->confirmed()) + write_status(sub.state.writer, *it++); + + BC_ASSERT(std::none_of(it, cend, [](const auto& at) + { return at.confirmed(); })); + + // Copy midstate accumulator and write unconfirmeds. + midstate copy = sub.state; + while (it != cend) + write_status(copy.writer, *it++); + + // Flush, cache and return status (always updated on initial). + copy.writer.flush(); + sub.state.status = std::move(copy.status); + return error::success; + } + else + { + // Update scan queries new (cursor) confirmed independently. + if (const auto ec = query.get_confirmed_history(stopping_, sub.cursor, + records, hash, max_size_t, turbo_)) + return ec; + + // Accumulate confirmed status in order. + auto it = records.cbegin(); + auto cend = records.cend(); + while (it != cend && it->confirmed()) + write_status(sub.state.writer, *it++); + + // Copy midstate accumulator for write of unconfirmeds. + midstate copy = sub.state; + records.clear(); + + // Update scan queries all unconfirmed independently. + if (const auto ec = query.get_unconfirmed_history(stopping_, records, + hash, turbo_)) + return ec; + + // Reinitialize iterator for unconfirmed writer. + it = records.cbegin(); + cend = records.cend(); + + // Accumulate unconfirmed status in order. + while (it != cend) + write_status(copy.writer, *it++); + + // Flush, cache and return not found if no writes. + copy.writer.flush(); + if (sub.state.status == copy.status) + return error::not_found; + + // Set cache into midstate object for next run. + sub.state.status = std::move(copy.status); + return error::success; + } } BC_POP_WARNING()