Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions felix/bpf/ut/attach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) {
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep0", ifacemonitor.StateUp, workload0.Attrs().Index))
}
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("hostep1", "1.2.3.4"))
bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
err = bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -188,7 +188,7 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) {
if ipv6Enabled {
// IPv6 address update
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("hostep1", "1::4"))
bpfEpMgr.OnUpdate(&proto.HostMetadataV6Update{Hostname: "uthost", Ipv6Addr: "1::4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv6Addr: "1::4"})
err = bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
Expect(programs.Count()).To(Equal(54))
Expand Down Expand Up @@ -610,7 +610,7 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) {
Expect(err).NotTo(HaveOccurred())
Expect(attached2).To(Equal(attached))

bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep2", ifacemonitor.StateUp, workload2.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("workloadep2", "1.6.6.1"))
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("hostep2", ifacemonitor.StateUp, host2.Attrs().Index))
Expand Down Expand Up @@ -685,7 +685,7 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) {
pm := jumpMapDump(commonMaps.JumpMap)
Expect(pm).To(HaveLen(0))

bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep2", ifacemonitor.StateUp, workload2.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("workloadep2", "1.6.6.1"))
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("hostep2", ifacemonitor.StateUp, host2.Attrs().Index))
Expand Down Expand Up @@ -742,7 +742,7 @@ func TestAttachWithMultipleWorkloadUpdate(t *testing.T) {
workload1 := createVethName("workloadep1")
defer deleteLink(workload1)

bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep1", ifacemonitor.StateUp, workload1.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("workloadep1", "1.6.6.6"))
bpfEpMgr.OnUpdate(&proto.WorkloadEndpointUpdate{
Expand Down Expand Up @@ -919,7 +919,7 @@ func TestRepeatedAttach(t *testing.T) {
regexp.MustCompile("^workloadep[123]"),
)
Expect(err).NotTo(HaveOccurred())
bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep1", ifacemonitor.StateUp, iface.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("workloadep1", "1.6.6.6"))
bpfEpMgr.OnUpdate(&proto.WorkloadEndpointUpdate{
Expand Down Expand Up @@ -1143,7 +1143,7 @@ func TestAttachInterfaceRecreate(t *testing.T) {
}
}()

bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep0", ifacemonitor.StateUp, workload0.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("workloadep0", "1.6.6.6"))
bpfEpMgr.OnUpdate(&proto.WorkloadEndpointUpdate{
Expand Down Expand Up @@ -1234,7 +1234,7 @@ func TestAttachTcx(t *testing.T) {
workload0 := createVethName("workloadep0")
defer deleteLink(workload0)

bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep0", ifacemonitor.StateUp, workload0.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("workloadep0", "1.6.6.6"))
bpfEpMgr.OnUpdate(&proto.WorkloadEndpointUpdate{
Expand Down Expand Up @@ -1352,7 +1352,7 @@ func TestLogFilters(t *testing.T) {
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("hostep1", ifacemonitor.StateUp, host1.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep0", ifacemonitor.StateUp, workload0.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("hostep1", "1.2.3.4"))
bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
err = bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -1382,7 +1382,7 @@ func TestLogFilters(t *testing.T) {
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("hostep1", ifacemonitor.StateUp, host1.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceStateUpdate("workloadep0", ifacemonitor.StateUp, workload0.Attrs().Index))
bpfEpMgr.OnUpdate(linux.NewIfaceAddrsUpdate("hostep1", "1.2.3.4"))
bpfEpMgr.OnUpdate(&proto.HostMetadataUpdate{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
bpfEpMgr.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "uthost", Ipv4Addr: "1.2.3.4"})
err = bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expand Down
4 changes: 0 additions & 4 deletions felix/calc/calc_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ type encapCallbacks interface {
}

type passthruCallbacks interface {
OnHostIPUpdate(hostname string, ip *net.IP)
OnHostIPRemove(hostname string)
OnHostIPv6Update(hostname string, ip *net.IP)
OnHostIPv6Remove(hostname string)
OnHostMetadataUpdate(hostname string, ip4 *net.IPNet, ip6 *net.IPNet, asnumber string, labels map[string]string)
OnHostMetadataRemove(hostname string)
OnIPPoolUpdate(model.IPPoolKey, *model.IPPool)
Expand Down
123 changes: 0 additions & 123 deletions felix/calc/calc_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/projectcalico/calico/felix/config"
extdataplane "github.com/projectcalico/calico/felix/dataplane/external"
"github.com/projectcalico/calico/felix/dataplane/mock"
"github.com/projectcalico/calico/felix/dispatcher"
"github.com/projectcalico/calico/felix/proto"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
Expand Down Expand Up @@ -70,8 +69,6 @@ var _ = DescribeTable("Calculation graph pass-through tests",
switch messageReceived.(type) {
case *proto.IPAMPoolUpdate:
Expect(googleproto.Equal(messageReceived.(*proto.IPAMPoolUpdate), expUpdate.(*proto.IPAMPoolUpdate))).To(BeTrue())
case *proto.HostMetadataUpdate:
Expect(googleproto.Equal(messageReceived.(*proto.HostMetadataUpdate), expUpdate.(*proto.HostMetadataUpdate))).To(BeTrue())
case *proto.GlobalBGPConfigUpdate:
Expect(googleproto.Equal(messageReceived.(*proto.GlobalBGPConfigUpdate), expUpdate.(*proto.GlobalBGPConfigUpdate))).To(BeTrue())
case *proto.WireguardEndpointUpdate:
Expand All @@ -96,8 +93,6 @@ var _ = DescribeTable("Calculation graph pass-through tests",
switch messageReceived.(type) {
case *proto.IPAMPoolRemove:
Expect(googleproto.Equal(messageReceived.(*proto.IPAMPoolRemove), expRemove.(*proto.IPAMPoolRemove))).To(BeTrue())
case *proto.HostMetadataRemove:
Expect(googleproto.Equal(messageReceived.(*proto.HostMetadataRemove), expRemove.(*proto.HostMetadataRemove))).To(BeTrue())
case *proto.GlobalBGPConfigUpdate:
Expect(googleproto.Equal(messageReceived.(*proto.GlobalBGPConfigUpdate), expRemove.(*proto.GlobalBGPConfigUpdate))).To(BeTrue())
case *proto.WireguardEndpointRemove:
Expand Down Expand Up @@ -139,16 +134,6 @@ var _ = DescribeTable("Calculation graph pass-through tests",
&proto.IPAMPoolRemove{
Id: "10.0.0.0-16",
}),
Entry("HostIP",
model.HostIPKey{Hostname: "foo"},
&testIP,
&proto.HostMetadataUpdate{
Hostname: "foo",
Ipv4Addr: "10.0.0.1",
},
&proto.HostMetadataRemove{
Hostname: "foo",
}),
Entry("Global BGPConfiguration",
model.ResourceKey{Kind: v3.KindBGPConfiguration, Name: "default"},
&v3.BGPConfiguration{
Expand Down Expand Up @@ -227,114 +212,6 @@ var _ = DescribeTable("Calculation graph pass-through tests",
),
)

var _ = Describe("Host IP duplicate squashing test", func() {
var eb *EventSequencer
var messagesReceived []interface{}
var cg *dispatcher.Dispatcher

BeforeEach(func() {
// Create a calculation graph/event buffer combo.
eb = NewEventSequencer(nil)
messagesReceived = nil
eb.Callback = func(message interface{}) {
log.WithField("message", message).Info("Received message")
messagesReceived = append(messagesReceived, message)
}
conf := config.New()
lookupsCache := NewLookupsCache()
conf.FelixHostname = "hostname"
cg = NewCalculationGraph(eb, lookupsCache, conf, func() {}).AllUpdDispatcher
})

It("should coalesce duplicate updates", func() {
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
Value: &testIPAs6,
},
})
eb.Flush()
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
Value: &testIPAs4,
},
})
eb.Flush()
Expect(messagesReceived).To(HaveLen(1))
Expect(googleproto.Equal(messagesReceived[0].(*proto.HostMetadataUpdate), &proto.HostMetadataUpdate{
Hostname: "foo",
Ipv4Addr: "10.0.0.1",
})).To(BeTrue())
})
It("should pass on genuine changes", func() {
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
Value: &testIPAs6,
},
})
eb.Flush()
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
Value: &testIP2,
},
})
eb.Flush()
Expect(messagesReceived).To(HaveLen(2))
Expect(googleproto.Equal(messagesReceived[0].(*proto.HostMetadataUpdate), &proto.HostMetadataUpdate{
Hostname: "foo",
Ipv4Addr: "10.0.0.1",
})).To(BeTrue())
Expect(googleproto.Equal(messagesReceived[1].(*proto.HostMetadataUpdate), &proto.HostMetadataUpdate{
Hostname: "foo",
Ipv4Addr: "10.0.0.2",
})).To(BeTrue())
})
It("should pass on delete and recreate", func() {
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
Value: &testIPAs6,
},
})
eb.Flush()
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
},
})
eb.Flush()
cg.OnUpdate(api.Update{
UpdateType: api.UpdateTypeKVNew,
KVPair: model.KVPair{
Key: model.HostIPKey{Hostname: "foo"},
Value: &testIPAs6,
},
})
eb.Flush()
Expect(messagesReceived).To(HaveLen(3))
Expect(googleproto.Equal(messagesReceived[0].(*proto.HostMetadataUpdate), &proto.HostMetadataUpdate{
Hostname: "foo",
Ipv4Addr: "10.0.0.1",
})).To(BeTrue())
Expect(googleproto.Equal(messagesReceived[1].(*proto.HostMetadataRemove), &proto.HostMetadataRemove{
Hostname: "foo",
})).To(BeTrue())
Expect(googleproto.Equal(messagesReceived[2].(*proto.HostMetadataUpdate), &proto.HostMetadataUpdate{
Hostname: "foo",
Ipv4Addr: "10.0.0.1",
})).To(BeTrue())
})
})

var _ = Describe("specific scenario tests", func() {
var validationFilter *ValidationFilter
var calcGraph *CalcGraph
Expand Down
74 changes: 9 additions & 65 deletions felix/calc/dataplane_passthru.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,23 @@ import (
type DataplanePassthru struct {
ipv6Support bool
callbacks passthruCallbacks

hostIPs map[string]*net.IP
hostIPv6s map[string]*net.IP
}

func NewDataplanePassthru(callbacks passthruCallbacks, ipv6Support bool) *DataplanePassthru {
return &DataplanePassthru{
ipv6Support: ipv6Support,
callbacks: callbacks,
hostIPs: map[string]*net.IP{},
hostIPv6s: map[string]*net.IP{},
}
}

func (h *DataplanePassthru) RegisterWith(dispatcher *dispatcher.Dispatcher) {
dispatcher.Register(model.HostIPKey{}, h.OnUpdate)
dispatcher.Register(model.IPPoolKey{}, h.OnUpdate)
dispatcher.Register(model.WireguardKey{}, h.OnUpdate)
dispatcher.Register(model.ResourceKey{}, h.OnUpdate)
}

func (h *DataplanePassthru) OnUpdate(update api.Update) (filterOut bool) {
switch key := update.Key.(type) {
case model.HostIPKey:
hostname := key.Hostname
if update.Value == nil {
log.WithField("update", update).Debug("Passing-through HostIP deletion")
delete(h.hostIPs, hostname)
h.callbacks.OnHostIPRemove(hostname)
} else {
ip := update.Value.(*net.IP)
oldIP := h.hostIPs[hostname]
// libcalico-go's IP struct wraps a standard library IP struct. To
// compare two IPs, we need to unwrap them and use Equal() since standard
// library IPs have multiple, equivalent, representations.
if oldIP != nil && ip.Equal(oldIP.IP) {
log.WithField("update", update).Debug("Ignoring duplicate HostIP update")
return
}
log.WithField("update", update).Debug("Passing-through HostIP update")
h.hostIPs[hostname] = ip
h.callbacks.OnHostIPUpdate(hostname, ip)
}
case model.IPPoolKey:
if update.Value == nil {
log.WithField("update", update).Debug("Passing-through IPPool deletion")
Expand Down Expand Up @@ -108,15 +82,11 @@ func (h *DataplanePassthru) OnUpdate(update api.Update) (filterOut bool) {
h.callbacks.OnServiceUpdate(kubernetesServiceToProto(update.Value.(*kapiv1.Service)))
}
} else if key.Kind == libv3.KindNode {
// Handle node resource to pass-through HostMetadataV6Update/HostMetadataV6Remove messages
// with IPv6 node address updates. IPv4 updates are handled above my model.HostIPKey updates.
log.WithField("update", update).Debug("Passing-through a Node IPv6 address update")
// Handle node resource to pass-through HostMetadataV4V6Update/HostMetadataV4V6Remove messages
// with IPv4 and IPv6 node address updates.
log.WithField("update", update).Debug("Passing-through a Node update")
hostname := key.Name
if update.Value == nil {
log.WithField("update", update).Debug("Passing-through Node IPv6 address remove")
delete(h.hostIPv6s, hostname)
h.callbacks.OnHostIPv6Remove(hostname)
log.WithField("update", update).Debug("Passing-through Node remove")
h.callbacks.OnHostMetadataRemove(hostname)
} else {
Expand All @@ -140,49 +110,23 @@ func (h *DataplanePassthru) OnUpdate(update api.Update) (filterOut bool) {
asnumber = node.Spec.BGP.ASNumber.String()
}
}
h.callbacks.OnHostMetadataUpdate(hostname, bgpIp4net, bgpIp6net, asnumber, node.Labels)
if node.Spec.BGP != nil {
if node.Spec.BGP.IPv6Address != "" {
ip, _, _ := net.ParseCIDR(node.Spec.BGP.IPv6Address)
oldIP := h.hostIPv6s[hostname]
if oldIP != nil && ip.Equal(oldIP.IP) {
log.WithField("update", update).Debug("Ignoring duplicate Node IPv6 address update")
return
}
log.WithField("update", update).Debug("Passing-through Node IPv6 address update")
h.hostIPv6s[hostname] = ip
h.callbacks.OnHostIPv6Update(hostname, ip)
} else if h.hostIPv6s[hostname] != nil {
log.WithField("update", update).Debug("Passing-through Node IPv6 address remove")
delete(h.hostIPv6s, hostname)
h.callbacks.OnHostIPv6Remove(hostname)
}
}

// If BGP is turned off, try to get IPv6 from the node resource.
// This is a similar fallback as for IPv4, see how HostIPKey is generated in
// libcalico-go/lib/backend/syncersv1/updateprocessors/felixnodeprocessor.go
if h.ipv6Support && node.Spec.BGP == nil {
// BGP is turned off, try to get one from the node resource. This is a
// similar fallback as for IPv4, see how HostIPKey is generated in
// libcalico-go/lib/backend/syncersv1/updateprocessors/felixnodeprocessor.go
var ip *net.IP
ip, _ = cresources.FindNodeAddress(node, libv3.InternalIP, 6)
if ip == nil {
ip, _ = cresources.FindNodeAddress(node, libv3.ExternalIP, 6)
}
if ip != nil {
oldIP := h.hostIPv6s[hostname]
if oldIP != nil && ip.Equal(oldIP.IP) {
log.WithField("update", update).Debug("Ignoring duplicate Node IPv6 address update")
return
}
log.WithField("update", update).Debug("Passing-through Node IPv6 address update")
h.hostIPv6s[hostname] = ip
h.callbacks.OnHostIPv6Update(hostname, ip)
} else if h.hostIPv6s[hostname] != nil {
log.WithField("update", update).Debug("Passing-through Node IPv6 address remove")
delete(h.hostIPv6s, hostname)
h.callbacks.OnHostIPv6Remove(hostname)
bgpIp6net = &net.IPNet{}
bgpIp6net.IP = ip.IP
}
}

h.callbacks.OnHostMetadataUpdate(hostname, bgpIp4net, bgpIp6net, asnumber, node.Labels)
}
} else {
log.WithField("key", key).Debugf("Ignoring v3 resource of kind %s", key.Kind)
Expand Down
Loading