Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/partial-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,16 @@ Mirror this checklist in issue #435.
`PublishAction<PeerState>` (with `nextPeerState`),
`PublishActionsFn<PeerState>`, `PartialMessagesPeerFeedback`, and
`GroupState` container with TTL + DoS caps. No routing yet.
- [ ] **Step 3** — Inbound `RPC.partial` dispatch: replace the stub at
- [x] **Step 3** — Inbound `RPC.partial` dispatch: replace the stub at
`GossipRouter.kt:476` with the full flow (validate caps, create/update
group state, call `onIncomingRpc`).
- [ ] **Step 4** — Outbound `publishPartial(...)` on the `Gossip` facade;
- [x] **Step 4** — Outbound `publishPartial(...)` on the `Gossip` facade;
route through `GossipRpcPartsQueue` (do **not** bypass — PR #433 got
this wrong). Enforce the "omit `partialMessage` when peer supports but
didn't request" MUST.
- [ ] **Step 5** — End-to-end integration test with a trivial bitmap-based
- [x] **Step 5** — End-to-end integration test with a trivial bitmap-based
handler. Exercises Steps 1-4 before any routing changes.
- [ ] **Step 6** — Routing: full-message suppression (§5.1).
- [x] **Step 6** — Routing: full-message suppression (§5.1).
- [ ] **Step 7** — Routing: IDONTWANT suppression (§5.2).
- [ ] **Step 8** — Heartbeat tick + TTL GC + cleanup hooks (§6.4).
- [ ] **Step 9** — Routing: IHAVE replacement with `onEmitGossip` (§5.3).
Expand Down
42 changes: 38 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,22 @@ abstract class AbstractRouter(
override fun onPeerActive(peer: PeerHandler) {
val partsQueue = pendingRpcParts.getQueue(peer)
subscribedTopics.forEach {
partsQueue.addSubscribe(it)
enqueueSubscribe(partsQueue, it)
}
flushPending(peer)
}

/**
* Enqueues a subscribe announcement for [topic] onto [partsQueue].
*
* The default implementation emits a bare subscribe with no per-topic options.
* Subclasses (e.g. GossipRouter) override this to attach per-topic options
* such as partial-message flags.
*/
protected open fun enqueueSubscribe(partsQueue: RpcPartsQueue, topic: Topic) {
partsQueue.addSubscribe(topic)
}

protected open fun notifyMalformedMessage(peer: PeerHandler) {}
protected open fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {}
protected open fun notifyNonSubscribedMessage(peer: PeerHandler, msg: Rpc.Message) {}
Expand All @@ -172,7 +183,17 @@ abstract class AbstractRouter(
}

try {
val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) }
val subscriptions = msg.subscriptionsList.map {
// Per partial-messages spec: flags MUST be ignored on subscribe=false, and the
// receiving side coerces supportsSendingPartial := requestsPartial || supportsSendingPartial.
// The coercion rule is also applied on the outbound side by GossipRouter.
PubsubSubscription(
topic = it.topicid,
subscribe = it.subscribe,
requestsPartial = it.subscribe && it.requestsPartial,
supportsSendingPartial = it.subscribe && (it.supportsSendingPartial || it.requestsPartial)
)
}
subscriptionFilter
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
.forEach { handleMessageSubscriptions(peer, it) }
Expand Down Expand Up @@ -301,7 +322,20 @@ abstract class AbstractRouter(
}
}

private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
/**
* Applies a single filtered inbound subscription to the router's state.
*
* Called once per `SubOpts` on the pubsub event loop, after
* [SubscriptionFilter.filterIncomingSubscriptions] has run. Subclasses may
* override to react to subscription state changes (for example, to track
* per-topic capability flags). Overrides MUST call `super` so that
* [peersTopics] stays in sync.
*
* [msg] carries the protocol-level flags already normalised by the caller:
* for `subscribe=false` frames, extension flags are zeroed before reaching
* this method.
*/
protected open fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
if (msg.subscribe) {
peersTopics.add(peer, msg.topic)
} else {
Expand All @@ -319,7 +353,7 @@ abstract class AbstractRouter(
}

protected open fun subscribe(topic: Topic) {
activePeers.forEach { pendingRpcParts.getQueue(it).addSubscribe(topic) }
activePeers.forEach { enqueueSubscribe(pendingRpcParts.getQueue(it), topic) }
subscribedTopics += topic
}

Expand Down
7 changes: 6 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ typealias Topic = String
typealias MessageId = WBytes
typealias PubsubMessageFactory = (Rpc.Message) -> PubsubMessage

data class PubsubSubscription(val topic: Topic, val subscribe: Boolean)
data class PubsubSubscription(
val topic: Topic,
val subscribe: Boolean,
val requestsPartial: Boolean = false,
val supportsSendingPartial: Boolean = false
)

interface PubsubMessage {
val protobufMessage: Rpc.Message
Expand Down
41 changes: 32 additions & 9 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/RpcPartsQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,23 @@ interface RpcPartsQueue {
fun addPublish(message: Rpc.Message)

fun addSubscribe(topic: Topic) {
addSubscription(topic, SubscriptionStatus.Subscribed)
addSubscribe(topic, requestsPartial = false, supportsSendingPartial = false)
}

fun addSubscribe(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
addSubscription(topic, SubscriptionStatus.Subscribed, requestsPartial, supportsSendingPartial)
}

fun addUnsubscribe(topic: Topic) {
addSubscription(topic, SubscriptionStatus.Unsubscribed)
addSubscription(topic, SubscriptionStatus.Unsubscribed, requestsPartial = false, supportsSendingPartial = false)
}

fun addSubscription(topic: Topic, status: SubscriptionStatus)
fun addSubscription(
topic: Topic,
status: SubscriptionStatus,
requestsPartial: Boolean,
supportsSendingPartial: Boolean
)

fun takeMerged(): List<Rpc.RPC>
}
Expand All @@ -38,11 +47,20 @@ open class DefaultRpcPartsQueue : RpcPartsQueue {
}
}

protected data class SubscriptionPart(val topic: Topic, val status: RpcPartsQueue.SubscriptionStatus) : AbstractPart {
protected data class SubscriptionPart(
val topic: Topic,
val status: RpcPartsQueue.SubscriptionStatus,
val requestsPartial: Boolean = false,
val supportsSendingPartial: Boolean = false
) : AbstractPart {
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
builder.addSubscriptionsBuilder().apply {
setTopicid(topic)
setSubscribe(status == RpcPartsQueue.SubscriptionStatus.Subscribed)
val subBuilder = builder.addSubscriptionsBuilder()
subBuilder.topicid = topic
subBuilder.subscribe = status == RpcPartsQueue.SubscriptionStatus.Subscribed
// Per spec: partial flags MUST NOT be sent on unsubscribe (subscribe=false).
if (status == RpcPartsQueue.SubscriptionStatus.Subscribed) {
if (requestsPartial) subBuilder.requestsPartial = true
if (supportsSendingPartial) subBuilder.supportsSendingPartial = true
}
}
}
Expand All @@ -57,8 +75,13 @@ open class DefaultRpcPartsQueue : RpcPartsQueue {
addPart(PublishPart(message))
}

override fun addSubscription(topic: Topic, status: RpcPartsQueue.SubscriptionStatus) {
addPart(SubscriptionPart(topic, status))
override fun addSubscription(
topic: Topic,
status: RpcPartsQueue.SubscriptionStatus,
requestsPartial: Boolean,
supportsSendingPartial: Boolean
) {
addPart(SubscriptionPart(topic, status, requestsPartial, supportsSendingPartial))
}

override fun takeMerged(): List<Rpc.RPC> {
Expand Down
16 changes: 16 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import io.libp2p.core.multistream.ProtocolDescriptor
import io.libp2p.core.pubsub.PubsubApi
import io.libp2p.pubsub.PubsubApiImpl
import io.libp2p.pubsub.PubsubProtocol
import io.libp2p.pubsub.Topic
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
import io.netty.channel.ChannelHandler
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture
Expand All @@ -32,6 +34,20 @@ class Gossip @JvmOverloads constructor(
return router.score.getCachedScore(peerId)
}

/**
* Queues outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPCs for [topic]/[groupId]
* by invoking the client's [actionsFn] on the current group state.
*
* Submits to the pubsub event thread; the returned future completes when the RPCs
* have been enqueued and flushed.
*/
fun publishPartial(
topic: Topic,
groupId: ByteArray,
actionsFn: PublishActionsFn<*>
): CompletableFuture<Unit> =
router.submitOnEventThread { router.publishPartial(topic, groupId, actionsFn) }

override val protocolDescriptor =
when (router.protocol) {
PubsubProtocol.Gossip_V_1_3 -> {
Expand Down
101 changes: 95 additions & 6 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.*
import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.*
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesAdapter
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
import io.libp2p.pubsub.gossip.partialmessages.toGroupId
import org.slf4j.LoggerFactory
import pubsub.pb.Rpc
import java.time.Duration
Expand Down Expand Up @@ -134,6 +137,56 @@ open class GossipRouter(
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }

val gossipExtensionsState = GossipExtensionsState(gossipExtensionsConfig)
val partialSubscriptionState = PartialSubscriptionState()
internal var partialMessages: PartialMessagesAdapter? = null

/**
* Local per-topic subscription options that affect outbound subscribe announcements.
* Accessed only on the pubsub event loop.
*/
private val localTopicPartialFlags: MutableMap<Topic, PartialSubFlags> = mutableMapOf()

/**
* Configures the partial-messages flags advertised on this node's subscribe
* announcements for [topic]. Must be called before [subscribe] for the flags
* to take effect on the initial announcement; a subsequent call will affect
* later re-announcements (e.g. on new peer activation).
*
* Per spec, the send-side also applies the coercion
* `supportsSendingPartial := requestsPartial || supportsSendingPartial`.
*/
fun setTopicPartialFlags(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
runOnEventThread {
val coerced = PartialSubFlags.coerce(requestsPartial, supportsSendingPartial)
if (coerced == PartialSubFlags.NONE) {
localTopicPartialFlags -= topic
} else {
localTopicPartialFlags[topic] = coerced
}
}
}

/**
* Queues outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPCs for [topic]/[groupId]
* by invoking the client's [actionsFn] on the current group state.
*
* Must be called on the pubsub event thread.
*/
fun publishPartial(topic: Topic, groupId: ByteArray, actionsFn: PublishActionsFn<*>) {
val adapter = partialMessages ?: return
val gid = groupId.toGroupId()

fun peerRequestsPartial(peerId: PeerId) =
partialSubscriptionState.peerRequestsPartial(topic, peerId)

fun enqueue(peerId: PeerId, partialMessage: ByteArray?, partsMetadata: ByteArray?) {
val peerHandler = activePeers.find { it.peerId == peerId } ?: return
pendingRpcParts.getQueue(peerHandler).addPartialMessage(topic, groupId, partialMessage, partsMetadata)
}

adapter.publishPartial(topic, gid, actionsFn, ::peerRequestsPartial, ::enqueue)
flushAllPending()
}

private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
Expand Down Expand Up @@ -161,9 +214,28 @@ open class GossipRouter(
acceptRequestsWhitelist -= peer
pendingRpcParts.popQueue(peer) // discard them
gossipExtensionsState.onPeerDisconnected(peer.peerId)
partialSubscriptionState.onPeerDisconnected(peer.peerId)
super.onPeerDisconnected(peer)
}

override fun enqueueSubscribe(partsQueue: RpcPartsQueue, topic: Topic) {
val flags = localTopicPartialFlags[topic] ?: PartialSubFlags.NONE
partsQueue.addSubscribe(topic, flags.requestsPartial, flags.supportsSendingPartial)
}

override fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
super.handleMessageSubscriptions(peer, msg)
if (msg.subscribe) {
partialSubscriptionState.setPeerFlags(
msg.topic,
peer.peerId,
PartialSubFlags(msg.requestsPartial, msg.supportsSendingPartial)
)
} else {
partialSubscriptionState.removePeerFlags(msg.topic, peer.peerId)
}
}

override fun onPeerActive(peer: PeerHandler) {
super.onPeerActive(peer)
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
Expand Down Expand Up @@ -477,12 +549,19 @@ open class GossipRouter(
partialMessagesExtension: Rpc.PartialMessagesExtension,
receivedFrom: PeerHandler
) {
logger.trace(
"Processing partial message extension message {} from {}",
partialMessagesExtension.toString(),
receivedFrom.peerId
)
// TODO: implement partial message handling (https://github.com/libp2p/jvm-libp2p/issues/435)
val topic = partialMessagesExtension.topicID
if (!partialMessagesExtension.hasTopicID() || topic.isEmpty()) {
logger.debug("Dropping partial message from {}: missing topicID", receivedFrom.peerId)
return
}

if (!partialMessagesExtension.hasGroupID() || partialMessagesExtension.groupID.isEmpty) {
logger.debug("Dropping partial message from {}: missing groupID", receivedFrom.peerId)
return
}

logger.trace("Processing partial message extension for topic {} from {}", topic, receivedFrom.peerId)
partialMessages?.onIncomingRpc(topic, receivedFrom.peerId, partialMessagesExtension)
}

override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
Expand All @@ -500,6 +579,7 @@ open class GossipRouter(
.plus(peersFromMesh)
.distinct()
.minus(receivedFrom)
.filterNot { peerRequestsPartialForMessage(it, pubMsg.topics) }
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
.forEach { submitPublishMessage(it, pubMsg) }
mCache += pubMsg
Expand All @@ -524,6 +604,7 @@ open class GossipRouter(
return if (peers.isNotEmpty()) {
iDontWant(msg)
val publishedMessages = peers
.filterNot { peerRequestsPartialForMessage(it, msg.topics) }
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }
if (publishedMessages.isEmpty()) {
Expand Down Expand Up @@ -615,6 +696,8 @@ open class GossipRouter(
super.unsubscribe(topic)
mesh[topic]?.copy()?.forEach { prune(it, topic) }
mesh -= topic
localTopicPartialFlags -= topic
partialSubscriptionState.removeTopic(topic)
}

private fun catchingHeartbeat() {
Expand Down Expand Up @@ -751,6 +834,12 @@ open class GossipRouter(
}
}

private fun peerRequestsPartialForMessage(peer: PeerHandler, topics: Collection<Topic>): Boolean {
if (!gossipExtensionsState.partialMessagesEnabled()) return false
if (!gossipExtensionsState.peerSupportsPartialMessages(peer.peerId)) return false
return topics.any { partialSubscriptionState.peerRequestsPartial(it, peer.peerId) }
}

private fun peerDoesNotWantMessage(peer: PeerHandler, messageId: MessageId): Boolean {
return peerIDontWant[peer]?.messageIdsAndTimeReceived?.contains(messageId) == true
}
Expand Down
Loading
Loading