Skip to content
Draft

WIP #402

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile.dapper
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ RUN echo "Cloning longhorn/dep-versions SRC_BRANCH=${SRC_BRANCH} SRC_TAG=${SRC_T
echo "dep-versions commit: $(git rev-parse HEAD)"

# Build spdk
RUN export REPO_OVERRIDE="" && \
export COMMIT_ID_OVERRIDE="" && \
RUN export REPO_OVERRIDE="https://github.com/derekbit/spdk.git" && \
export COMMIT_ID_OVERRIDE="02e30236f46f9fa606faa34daffbe630bd0761b2" && \
bash /usr/src/dep-versions/scripts/build-spdk.sh "${REPO_OVERRIDE}" "${COMMIT_ID_OVERRIDE}" "${ARCH}"

# Build libjson-c-devel
Expand Down
4 changes: 2 additions & 2 deletions pkg/spdk/disk/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type TestSuite struct{}

var _ = Suite(&TestSuite{})

func (s *TestSuite) TestIsVfioPci(c *C) {
func (s *TestSuite) xxTestIsVfioPci(c *C) {
// Add test case for isVfioPci function
testCases := []struct {
name string
Expand Down Expand Up @@ -47,7 +47,7 @@ func (s *TestSuite) TestIsVfioPci(c *C) {
}
}

func (s *TestSuite) TestIsUioPciGeneric(c *C) {
func (s *TestSuite) xxTestIsUioPciGeneric(c *C) {
// Add test case for isUioPciGeneric function
testCases := []struct {
name string
Expand Down
7 changes: 7 additions & 0 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1874,26 +1874,33 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe
}

// Ask the source replica to expose the newly created snapshot if the source replica and destination replica are not on the same node.
e.log.Infof("Asking source replica %s to expose snapshot %s for rebuilding destination replica %s", srcReplicaName, snapshotName, dstReplicaName)
externalSnapshotAddress, err := srcReplicaServiceCli.ReplicaRebuildingSrcStart(srcReplicaName, dstReplicaName, dstReplicaAddress, snapshotName)
if err != nil {
return err
}

e.log.Infof("Asking destination replica %s to start rebuilding from source replica %s exposed snapshot %s", dstReplicaName, srcReplicaName, snapshotName)
// The destination replica attaches the source replica exposed snapshot as the external snapshot then create a head based on it.
dstHeadLvolAddress, err := dstReplicaServiceCli.ReplicaRebuildingDstStart(dstReplicaName, srcReplicaName, srcReplicaAddress, snapshotName, externalSnapshotAddress, rebuildingSnapshotList)
if err != nil {
return err
}

logrus.Infof("Engine connecting rebuilding replica %s head lvol %s", dstReplicaName, dstHeadLvolAddress)

// Add rebuilding replica head bdev to the base bdev list of the RAID bdev
dstHeadLvolBdevName, err := connectNVMfBdev(spdkClient, dstReplicaName, dstHeadLvolAddress, e.ctrlrLossTimeout, e.fastIOFailTimeoutSec)
if err != nil {
return err
}

logrus.Infof("Engine adding rebuilding replica %s head bdev %s to the base bdev list for engine %s", dstReplicaName, dstHeadLvolBdevName, e.Name)
if _, err := spdkClient.BdevRaidGrowBaseBdev(e.Name, dstHeadLvolBdevName); err != nil {
return errors.Wrapf(err, "failed to adding the rebuilding replica %s head bdev %s to the base bdev list for engine %s", dstReplicaName, dstHeadLvolBdevName, e.Name)
}

logrus.Infof("Engine connected rebuilding replica %s head bdev %s to the RAID bdev %s successfully", dstReplicaName, dstHeadLvolBdevName, e.Name)
e.ReplicaStatusMap[dstReplicaName] = &EngineReplicaStatus{
Address: dstReplicaAddress,
Mode: types.ModeWO,
Expand Down
6 changes: 3 additions & 3 deletions pkg/spdk/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
. "gopkg.in/check.v1"
)

func (s *TestSuite) TestcheckInitiatorAndTargetCreationRequirementsForNvmeTcpFrontend(c *C) {
func (s *TestSuite) xxTestcheckInitiatorAndTargetCreationRequirementsForNvmeTcpFrontend(c *C) {
testCases := []struct {
name string
podIP string
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s *TestSuite) TestcheckInitiatorAndTargetCreationRequirementsForNvmeTcpFro
}
}

func (s *TestSuite) TestIsNewEngine(c *C) {
func (s *TestSuite) xxTestIsNewEngine(c *C) {
testCases := []struct {
name string
engine *Engine
Expand Down Expand Up @@ -182,7 +182,7 @@ func (s *TestSuite) TestIsNewEngine(c *C) {
}
}

func (s *TestSuite) TestReleaseTargetAndStandbyTargetPorts(c *C) {
func (s *TestSuite) xxTestReleaseTargetAndStandbyTargetPorts(c *C) {
testCases := []struct {
name string
engine *Engine
Expand Down
13 changes: 13 additions & 0 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2820,6 +2820,7 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa
}

externalSnapshotLvolName := GetReplicaSnapshotLvolName(srcReplicaName, externalSnapshotName)
r.log.Infof("RebuildingDstStart: connecting NVMf bdev for external snapshot lvol %s from src replica %s at address %s", externalSnapshotLvolName, srcReplicaName, srcReplicaAddress)
externalSnapshotBdevName, err := connectNVMfBdev(spdkClient, externalSnapshotLvolName, externalSnapshotAddress,
replicaCtrlrLossTimeoutSec, replicaFastIOFailTimeoutSec)
if err != nil {
Expand All @@ -2838,10 +2839,13 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa
}
}

r.log.Infof("RebuildingDstStart: replica %v isExposed %v", r.Name, r.IsExposed)
if r.IsExposed {
r.log.Infof("RebuildingDstStart: stopping exposing bdev %v", r.Name)
if err := spdkClient.StopExposeBdev(helpertypes.GetNQN(r.Name)); err != nil {
return "", err
}
r.log.Infof("RebuildingDstStart: stopped exposing bdev %v", r.Name)
r.IsExposed = false
}
// TODO: Uncomment below code after the RAID delta bitmap feature is ready
Expand All @@ -2853,19 +2857,25 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa
// r.log.WithError(err).Warnf("Failed to rename the previous head lvol %s to %s for dst replica %v rebuilding start, will try to remove it instead", r.Head.Alias, expiredLvolName, r.Name)
// }
//}

r.log.Infof("RebuildingDstStart: deleting bdev %v", r.Alias)
if _, err := spdkClient.BdevLvolDelete(r.Alias); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) {
return "", err
}

r.log.Infof("RebuildingDstStart: deleted bdev %v", r.Alias)

// Retain the backing image in the active chain. All unverified lvols should be removed first.
r.Head = nil
r.ActiveChain = []*Lvol{r.ActiveChain[0]}

// Create a new head lvol based on the external src snapshot lvol then
r.log.Infof("RebuildingDstStart: cloning bdev %v", r.rebuildingDstCache.externalSnapshotBdevName)
headLvolUUID, err := spdkClient.BdevLvolCloneBdev(r.rebuildingDstCache.externalSnapshotBdevName, r.LvsName, r.Name)
if err != nil {
return "", err
}
r.log.Infof("RebuildingDstStart: cloned bdev %v", r.rebuildingDstCache.externalSnapshotBdevName)
headBdevLvol, err := spdkClient.BdevLvolGetByName(headLvolUUID, 0)
if err != nil {
return "", err
Expand All @@ -2874,9 +2884,11 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa
r.ActiveChain = append(r.ActiveChain, r.Head)

nguid := commonutils.RandomID(nvmeNguidLength)
r.log.Infof("RebuildingDstStart: starting to expose bdev %v with nguid %s on port %d", r.Name, nguid, r.rebuildingDstCache.rebuildingPort)
if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), r.Head.UUID, nguid, r.IP, strconv.Itoa(int(r.PortStart))); err != nil {
return "", err
}
r.log.Infof("RebuildingDstStart: started to expose bdev %v with nguid %s on port %d", r.Name, nguid, r.rebuildingDstCache.rebuildingPort)
r.IsExposed = true
dstHeadLvolAddress := net.JoinHostPort(r.IP, strconv.Itoa(int(r.PortStart)))

Expand Down Expand Up @@ -3395,6 +3407,7 @@ func (r *Replica) RebuildingDstShallowCopyStart(spdkClient *spdkclient.Client, s
if !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) {
return err
}
r.log.Infof("Replica did not find an existing snapshot lvol %s hence will do a full shallow copy for snapshot %s", dstSnapLvolName, snapshotName)
// Directly start a shallow copy when there is no existing snapshot lvol
return srcReplicaServiceCli.ReplicaRebuildingSrcShallowCopyStart(r.rebuildingDstCache.srcReplicaName, snapshotName, dstRebuildingLvolAddress)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ func (s *Server) ReplicaRebuildingDstStart(ctx context.Context, req *spdkrpc.Rep
for _, snapshot := range req.RebuildingSnapshotList {
rebuildingSnapshotList = append(rebuildingSnapshotList, api.ProtoLvolToLvol(snapshot))
}

address, err := r.RebuildingDstStart(spdkClient, req.SrcReplicaName, req.SrcReplicaAddress, req.ExternalSnapshotName, req.ExternalSnapshotAddress, rebuildingSnapshotList)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/spdk/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type TestSuite struct{}

var _ = Suite(&TestSuite{})

func (s *TestSuite) TestSplitHostPort(c *C) {
func (s *TestSuite) xxTestSplitHostPort(c *C) {
type testCase struct {
address string
expectedHost string
Expand Down Expand Up @@ -55,7 +55,7 @@ func (s *TestSuite) TestSplitHostPort(c *C) {
}
}

func (s *TestSuite) TestExtractBackingImageAndDiskUUID(c *C) {
func (s *TestSuite) xxTestExtractBackingImageAndDiskUUID(c *C) {
type testCase struct {
lvolName string
expectedBIName string
Expand Down
16 changes: 10 additions & 6 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
c.Assert(err, IsNil)
}()

concurrentCount := 5
concurrentCount := 10
dataCountInMB := 100
wg := sync.WaitGroup{}
wg.Add(concurrentCount)
Expand Down Expand Up @@ -454,11 +454,15 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
c.Assert(replica3.Head.CreationTime, Not(Equals), "")
c.Assert(replica3.Head.Parent, Equals, "")

logrus.Infof("Debug ====> Add Replica %s", replicaName3)
err = spdkCli.EngineReplicaAdd(engineName, replicaName3, net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart))))
c.Assert(err, IsNil)

logrus.Infof("Debug ====> Wait for Replica %s rebuilding complete", replicaName3)
WaitForReplicaRebuildingComplete(c, spdkCli, engineName, replicaName3)

logrus.Infof("Debug ====> Check data integrity after Replica %s rebuilding", replicaName3)

snapshotNameRebuild := ""
for replicaName := range replicaAddressMap {
replica, err := spdkCli.ReplicaGet(replicaName)
Expand Down Expand Up @@ -1610,12 +1614,12 @@ func (s *TestSuite) spdkMultipleThreadSnapshotOpsAndRebuilding(c *C, withBacking
}
}

func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuildingWithoutBackingImage(c *C) {
func (s *TestSuite) xxTestSPDKMultipleThreadSnapshotOpsAndRebuildingWithoutBackingImage(c *C) {
fmt.Println("Testing SPDK snapshot operations with multiple threads without backing image")
s.spdkMultipleThreadSnapshotOpsAndRebuilding(c, false)
}

func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuildingWithBackingImage(c *C) {
func (s *TestSuite) xxTestSPDKMultipleThreadSnapshotOpsAndRebuildingWithBackingImage(c *C) {
fmt.Println("Testing SPDK snapshot operations with multiple threads with backing image")
s.spdkMultipleThreadSnapshotOpsAndRebuilding(c, true)
}
Expand Down Expand Up @@ -2311,12 +2315,12 @@ func (s *TestSuite) spdkMultipleThreadFastRebuilding(c *C, withBackingImage bool
}
}

func (s *TestSuite) TestSPDKMultipleThreadFastRebuildingWithoutBackingImage(c *C) {
func (s *TestSuite) xxTestSPDKMultipleThreadFastRebuildingWithoutBackingImage(c *C) {
fmt.Println("Testing SPDK fast rebuilding with multiple threads with backing image")
s.spdkMultipleThreadFastRebuilding(c, false)
}

func (s *TestSuite) TestSPDKMultipleThreadFastRebuildingWithBackingImage(c *C) {
func (s *TestSuite) xxTestSPDKMultipleThreadFastRebuildingWithBackingImage(c *C) {
fmt.Println("Testing SPDK fast rebuilding with multiple threads without backing image")
s.spdkMultipleThreadFastRebuilding(c, true)
}
Expand Down Expand Up @@ -2558,7 +2562,7 @@ func WaitForReplicaRebuildingCompleteTimeout(c *C, spdkCli *client.SPDKClient, e
c.Assert(complete, Equals, true)
}

func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) {
func (s *TestSuite) xxTestSPDKEngineOnlyWithTarget(c *C) {
fmt.Println("Testing SPDK basic operations with engine only with target")

diskDriverName := "aio"
Expand Down
51 changes: 38 additions & 13 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"regexp"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -128,36 +129,60 @@ func stopSPDKTgtDaemon(timeout time.Duration, signal syscall.Signal) error {
return errors.Wrap(err, "failed to find spdk_tgt")
}

var errs error
var (
errs error
wg sync.WaitGroup
)

for _, process := range processes {
logrus.Infof("Sending signal %v to spdk_tgt %v", signal, process.Pid)
if err := process.Signal(signal); err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, process.Pid))
} else {
wg.Add(1)

go func(p *os.Process) {
defer wg.Done()

logrus.Infof("Sending signal %v to spdk_tgt %v", signal, p.Pid)
if err := p.Signal(signal); err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, p.Pid))
return
}

done := make(chan error, 1)
go func() {
_, err := process.Wait()
_, err := p.Wait()
done <- err
close(done)
}()

select {
case <-time.After(timeout):
logrus.Warnf("spdk_tgt %v failed to exit in time, sending signal %v", process.Pid, signal)
err = process.Signal(signal)
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, process.Pid))
logrus.Warnf("spdk_tgt %v failed to exit in time, sending signal %v", p.Pid, signal)
if err := p.Signal(signal); err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, p.Pid))
}
case err := <-done:
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "spdk_tgt %v exited with error", process.Pid))
errs = multierr.Append(errs, errors.Wrapf(err, "spdk_tgt %v exited with error", p.Pid))
} else {
logrus.Infof("spdk_tgt %v exited successfully", process.Pid)
logrus.Infof("spdk_tgt %v exited successfully", p.Pid)
}
}
}

for {
exists, err := proc.IsProcessExists(p.Pid)
if err != nil {
logrus.Errorf("Failed to check if spdk_tgt %v exists: %v", p.Pid, err)
break
}
if !exists {
logrus.Infof("Confirmed spdk_tgt %v is gone", p.Pid)
break
}
time.Sleep(1 * time.Second)
}
}(process)
}

wg.Wait()
return errs
}

Expand Down
5 changes: 4 additions & 1 deletion scripts/test
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ PACKAGES="$(find . -name '*.go' -print0 | xargs -0 -I{} dirname {} | cut -f2 -d

trap "rm -f /tmp/test-disk" EXIT

go test -v -p 1 -race -cover ${PACKAGES} -coverprofile=coverage.out -timeout 180m
echo 'core.%p' > /proc/sys/kernel/core_pattern


go test -v -p 1 -race -cover ${PACKAGES} -coverprofile=coverage.out -timeout 480m -count=100
12 changes: 6 additions & 6 deletions scripts/validate
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ echo Running go validation
PACKAGES="$(find . -name '*.go' -print0 | xargs -0 -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin)' | sed -e 's!^!./!' -e 's!$!/...!')"
echo Packages: ${PACKAGES}

echo Running: go vet
go vet ${PACKAGES}
# echo Running: go vet
# go vet ${PACKAGES}

echo "Running: golangci-lint"
golangci-lint run --timeout=5m
# echo "Running: golangci-lint"
# golangci-lint run --timeout=5m

echo Running: go fmt
test -z "$(go fmt ${PACKAGES} | tee /dev/stderr)"
# echo Running: go fmt
# test -z "$(go fmt ${PACKAGES} | tee /dev/stderr)"
24 changes: 24 additions & 0 deletions vendor/github.com/longhorn/go-common-libs/proc/proc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading