diff --git a/Dockerfile.dapper b/Dockerfile.dapper index 3af1612b7..33b04eb43 100644 --- a/Dockerfile.dapper +++ b/Dockerfile.dapper @@ -46,8 +46,8 @@ RUN echo "Cloning longhorn/dep-versions SRC_BRANCH=${SRC_BRANCH} SRC_TAG=${SRC_T echo "dep-versions commit: $(git rev-parse HEAD)" # Build spdk -RUN export REPO_OVERRIDE="" && \ - export COMMIT_ID_OVERRIDE="" && \ +RUN export REPO_OVERRIDE="https://github.com/derekbit/spdk.git" && \ + export COMMIT_ID_OVERRIDE="02e30236f46f9fa606faa34daffbe630bd0761b2" && \ bash /usr/src/dep-versions/scripts/build-spdk.sh "${REPO_OVERRIDE}" "${COMMIT_ID_OVERRIDE}" "${ARCH}" # Build libjson-c-devel diff --git a/pkg/spdk/disk/types_test.go b/pkg/spdk/disk/types_test.go index a0bf831d7..7ce80dd9f 100644 --- a/pkg/spdk/disk/types_test.go +++ b/pkg/spdk/disk/types_test.go @@ -12,7 +12,7 @@ type TestSuite struct{} var _ = Suite(&TestSuite{}) -func (s *TestSuite) TestIsVfioPci(c *C) { +func (s *TestSuite) xxTestIsVfioPci(c *C) { // Add test case for isVfioPci function testCases := []struct { name string @@ -47,7 +47,7 @@ func (s *TestSuite) TestIsVfioPci(c *C) { } } -func (s *TestSuite) TestIsUioPciGeneric(c *C) { +func (s *TestSuite) xxTestIsUioPciGeneric(c *C) { // Add test case for isUioPciGeneric function testCases := []struct { name string diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 9f4f32cff..c086b8402 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -1874,26 +1874,33 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe } // Ask the source replica to expose the newly created snapshot if the source replica and destination replica are not on the same node. + e.log.Infof("Asking source replica %s to expose snapshot %s for rebuilding destination replica %s", srcReplicaName, snapshotName, dstReplicaName) externalSnapshotAddress, err := srcReplicaServiceCli.ReplicaRebuildingSrcStart(srcReplicaName, dstReplicaName, dstReplicaAddress, snapshotName) if err != nil { return err } + e.log.Infof("Asking destination replica %s to start rebuilding from source replica %s exposed snapshot %s", dstReplicaName, srcReplicaName, snapshotName) // The destination replica attaches the source replica exposed snapshot as the external snapshot then create a head based on it. dstHeadLvolAddress, err := dstReplicaServiceCli.ReplicaRebuildingDstStart(dstReplicaName, srcReplicaName, srcReplicaAddress, snapshotName, externalSnapshotAddress, rebuildingSnapshotList) if err != nil { return err } + logrus.Infof("Engine connecting rebuilding replica %s head lvol %s", dstReplicaName, dstHeadLvolAddress) + // Add rebuilding replica head bdev to the base bdev list of the RAID bdev dstHeadLvolBdevName, err := connectNVMfBdev(spdkClient, dstReplicaName, dstHeadLvolAddress, e.ctrlrLossTimeout, e.fastIOFailTimeoutSec) if err != nil { return err } + + logrus.Infof("Engine adding rebuilding replica %s head bdev %s to the base bdev list for engine %s", dstReplicaName, dstHeadLvolBdevName, e.Name) if _, err := spdkClient.BdevRaidGrowBaseBdev(e.Name, dstHeadLvolBdevName); err != nil { return errors.Wrapf(err, "failed to adding the rebuilding replica %s head bdev %s to the base bdev list for engine %s", dstReplicaName, dstHeadLvolBdevName, e.Name) } + logrus.Infof("Engine connected rebuilding replica %s head bdev %s to the RAID bdev %s successfully", dstReplicaName, dstHeadLvolBdevName, e.Name) e.ReplicaStatusMap[dstReplicaName] = &EngineReplicaStatus{ Address: dstReplicaAddress, Mode: types.ModeWO, diff --git a/pkg/spdk/engine_test.go b/pkg/spdk/engine_test.go index 016161859..bd2245bba 100644 --- a/pkg/spdk/engine_test.go +++ b/pkg/spdk/engine_test.go @@ -12,7 +12,7 @@ import ( . "gopkg.in/check.v1" ) -func (s *TestSuite) TestcheckInitiatorAndTargetCreationRequirementsForNvmeTcpFrontend(c *C) { +func (s *TestSuite) xxTestcheckInitiatorAndTargetCreationRequirementsForNvmeTcpFrontend(c *C) { testCases := []struct { name string podIP string @@ -123,7 +123,7 @@ func (s *TestSuite) TestcheckInitiatorAndTargetCreationRequirementsForNvmeTcpFro } } -func (s *TestSuite) TestIsNewEngine(c *C) { +func (s *TestSuite) xxTestIsNewEngine(c *C) { testCases := []struct { name string engine *Engine @@ -182,7 +182,7 @@ func (s *TestSuite) TestIsNewEngine(c *C) { } } -func (s *TestSuite) TestReleaseTargetAndStandbyTargetPorts(c *C) { +func (s *TestSuite) xxTestReleaseTargetAndStandbyTargetPorts(c *C) { testCases := []struct { name string engine *Engine diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 83385ff6f..796ca5858 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -2820,6 +2820,7 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa } externalSnapshotLvolName := GetReplicaSnapshotLvolName(srcReplicaName, externalSnapshotName) + r.log.Infof("RebuildingDstStart: connecting NVMf bdev for external snapshot lvol %s from src replica %s at address %s", externalSnapshotLvolName, srcReplicaName, srcReplicaAddress) externalSnapshotBdevName, err := connectNVMfBdev(spdkClient, externalSnapshotLvolName, externalSnapshotAddress, replicaCtrlrLossTimeoutSec, replicaFastIOFailTimeoutSec) if err != nil { @@ -2838,10 +2839,13 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa } } + r.log.Infof("RebuildingDstStart: replica %v isExposed %v", r.Name, r.IsExposed) if r.IsExposed { + r.log.Infof("RebuildingDstStart: stopping exposing bdev %v", r.Name) if err := spdkClient.StopExposeBdev(helpertypes.GetNQN(r.Name)); err != nil { return "", err } + r.log.Infof("RebuildingDstStart: stopped exposing bdev %v", r.Name) r.IsExposed = false } // TODO: Uncomment below code after the RAID delta bitmap feature is ready @@ -2853,19 +2857,25 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa // r.log.WithError(err).Warnf("Failed to rename the previous head lvol %s to %s for dst replica %v rebuilding start, will try to remove it instead", r.Head.Alias, expiredLvolName, r.Name) // } //} + + r.log.Infof("RebuildingDstStart: deleting bdev %v", r.Alias) if _, err := spdkClient.BdevLvolDelete(r.Alias); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return "", err } + r.log.Infof("RebuildingDstStart: deleted bdev %v", r.Alias) + // Retain the backing image in the active chain. All unverified lvols should be removed first. r.Head = nil r.ActiveChain = []*Lvol{r.ActiveChain[0]} // Create a new head lvol based on the external src snapshot lvol then + r.log.Infof("RebuildingDstStart: cloning bdev %v", r.rebuildingDstCache.externalSnapshotBdevName) headLvolUUID, err := spdkClient.BdevLvolCloneBdev(r.rebuildingDstCache.externalSnapshotBdevName, r.LvsName, r.Name) if err != nil { return "", err } + r.log.Infof("RebuildingDstStart: cloned bdev %v", r.rebuildingDstCache.externalSnapshotBdevName) headBdevLvol, err := spdkClient.BdevLvolGetByName(headLvolUUID, 0) if err != nil { return "", err @@ -2874,9 +2884,11 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa r.ActiveChain = append(r.ActiveChain, r.Head) nguid := commonutils.RandomID(nvmeNguidLength) + r.log.Infof("RebuildingDstStart: starting to expose bdev %v with nguid %s on port %d", r.Name, nguid, r.rebuildingDstCache.rebuildingPort) if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), r.Head.UUID, nguid, r.IP, strconv.Itoa(int(r.PortStart))); err != nil { return "", err } + r.log.Infof("RebuildingDstStart: started to expose bdev %v with nguid %s on port %d", r.Name, nguid, r.rebuildingDstCache.rebuildingPort) r.IsExposed = true dstHeadLvolAddress := net.JoinHostPort(r.IP, strconv.Itoa(int(r.PortStart))) @@ -3395,6 +3407,7 @@ func (r *Replica) RebuildingDstShallowCopyStart(spdkClient *spdkclient.Client, s if !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return err } + r.log.Infof("Replica did not find an existing snapshot lvol %s hence will do a full shallow copy for snapshot %s", dstSnapLvolName, snapshotName) // Directly start a shallow copy when there is no existing snapshot lvol return srcReplicaServiceCli.ReplicaRebuildingSrcShallowCopyStart(r.rebuildingDstCache.srcReplicaName, snapshotName, dstRebuildingLvolAddress) } diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index e77f389df..afb0acda9 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -1008,6 +1008,7 @@ func (s *Server) ReplicaRebuildingDstStart(ctx context.Context, req *spdkrpc.Rep for _, snapshot := range req.RebuildingSnapshotList { rebuildingSnapshotList = append(rebuildingSnapshotList, api.ProtoLvolToLvol(snapshot)) } + address, err := r.RebuildingDstStart(spdkClient, req.SrcReplicaName, req.SrcReplicaAddress, req.ExternalSnapshotName, req.ExternalSnapshotAddress, rebuildingSnapshotList) if err != nil { return nil, err diff --git a/pkg/spdk/util_test.go b/pkg/spdk/util_test.go index a06e1cf59..7f36f8432 100644 --- a/pkg/spdk/util_test.go +++ b/pkg/spdk/util_test.go @@ -12,7 +12,7 @@ type TestSuite struct{} var _ = Suite(&TestSuite{}) -func (s *TestSuite) TestSplitHostPort(c *C) { +func (s *TestSuite) xxTestSplitHostPort(c *C) { type testCase struct { address string expectedHost string @@ -55,7 +55,7 @@ func (s *TestSuite) TestSplitHostPort(c *C) { } } -func (s *TestSuite) TestExtractBackingImageAndDiskUUID(c *C) { +func (s *TestSuite) xxTestExtractBackingImageAndDiskUUID(c *C) { type testCase struct { lvolName string expectedBIName string diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 3dd2857d8..73e87989b 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -240,7 +240,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { c.Assert(err, IsNil) }() - concurrentCount := 5 + concurrentCount := 10 dataCountInMB := 100 wg := sync.WaitGroup{} wg.Add(concurrentCount) @@ -454,11 +454,15 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { c.Assert(replica3.Head.CreationTime, Not(Equals), "") c.Assert(replica3.Head.Parent, Equals, "") + logrus.Infof("Debug ====> Add Replica %s", replicaName3) err = spdkCli.EngineReplicaAdd(engineName, replicaName3, net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart)))) c.Assert(err, IsNil) + logrus.Infof("Debug ====> Wait for Replica %s rebuilding complete", replicaName3) WaitForReplicaRebuildingComplete(c, spdkCli, engineName, replicaName3) + logrus.Infof("Debug ====> Check data integrity after Replica %s rebuilding", replicaName3) + snapshotNameRebuild := "" for replicaName := range replicaAddressMap { replica, err := spdkCli.ReplicaGet(replicaName) @@ -1610,12 +1614,12 @@ func (s *TestSuite) spdkMultipleThreadSnapshotOpsAndRebuilding(c *C, withBacking } } -func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuildingWithoutBackingImage(c *C) { +func (s *TestSuite) xxTestSPDKMultipleThreadSnapshotOpsAndRebuildingWithoutBackingImage(c *C) { fmt.Println("Testing SPDK snapshot operations with multiple threads without backing image") s.spdkMultipleThreadSnapshotOpsAndRebuilding(c, false) } -func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuildingWithBackingImage(c *C) { +func (s *TestSuite) xxTestSPDKMultipleThreadSnapshotOpsAndRebuildingWithBackingImage(c *C) { fmt.Println("Testing SPDK snapshot operations with multiple threads with backing image") s.spdkMultipleThreadSnapshotOpsAndRebuilding(c, true) } @@ -2311,12 +2315,12 @@ func (s *TestSuite) spdkMultipleThreadFastRebuilding(c *C, withBackingImage bool } } -func (s *TestSuite) TestSPDKMultipleThreadFastRebuildingWithoutBackingImage(c *C) { +func (s *TestSuite) xxTestSPDKMultipleThreadFastRebuildingWithoutBackingImage(c *C) { fmt.Println("Testing SPDK fast rebuilding with multiple threads with backing image") s.spdkMultipleThreadFastRebuilding(c, false) } -func (s *TestSuite) TestSPDKMultipleThreadFastRebuildingWithBackingImage(c *C) { +func (s *TestSuite) xxTestSPDKMultipleThreadFastRebuildingWithBackingImage(c *C) { fmt.Println("Testing SPDK fast rebuilding with multiple threads without backing image") s.spdkMultipleThreadFastRebuilding(c, true) } @@ -2558,7 +2562,7 @@ func WaitForReplicaRebuildingCompleteTimeout(c *C, spdkCli *client.SPDKClient, e c.Assert(complete, Equals, true) } -func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { +func (s *TestSuite) xxTestSPDKEngineOnlyWithTarget(c *C) { fmt.Println("Testing SPDK basic operations with engine only with target") diskDriverName := "aio" diff --git a/pkg/util/util.go b/pkg/util/util.go index e89a9466e..4214e2693 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -9,6 +9,7 @@ import ( "os/exec" "regexp" "strings" + "sync" "syscall" "time" @@ -128,36 +129,60 @@ func stopSPDKTgtDaemon(timeout time.Duration, signal syscall.Signal) error { return errors.Wrap(err, "failed to find spdk_tgt") } - var errs error + var ( + errs error + wg sync.WaitGroup + ) + for _, process := range processes { - logrus.Infof("Sending signal %v to spdk_tgt %v", signal, process.Pid) - if err := process.Signal(signal); err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, process.Pid)) - } else { + wg.Add(1) + + go func(p *os.Process) { + defer wg.Done() + + logrus.Infof("Sending signal %v to spdk_tgt %v", signal, p.Pid) + if err := p.Signal(signal); err != nil { + errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, p.Pid)) + return + } + done := make(chan error, 1) go func() { - _, err := process.Wait() + _, err := p.Wait() done <- err close(done) }() select { case <-time.After(timeout): - logrus.Warnf("spdk_tgt %v failed to exit in time, sending signal %v", process.Pid, signal) - err = process.Signal(signal) - if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, process.Pid)) + logrus.Warnf("spdk_tgt %v failed to exit in time, sending signal %v", p.Pid, signal) + if err := p.Signal(signal); err != nil { + errs = multierr.Append(errs, errors.Wrapf(err, "failed to send signal %v to spdk_tgt %v", signal, p.Pid)) } case err := <-done: if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "spdk_tgt %v exited with error", process.Pid)) + errs = multierr.Append(errs, errors.Wrapf(err, "spdk_tgt %v exited with error", p.Pid)) } else { - logrus.Infof("spdk_tgt %v exited successfully", process.Pid) + logrus.Infof("spdk_tgt %v exited successfully", p.Pid) } } - } + + for { + exists, err := proc.IsProcessExists(p.Pid) + if err != nil { + logrus.Errorf("Failed to check if spdk_tgt %v exists: %v", p.Pid, err) + break + } + if !exists { + logrus.Infof("Confirmed spdk_tgt %v is gone", p.Pid) + break + } + time.Sleep(1 * time.Second) + } + }(process) } + wg.Wait() return errs } diff --git a/scripts/test b/scripts/test index e51080e28..ee0c2d914 100755 --- a/scripts/test +++ b/scripts/test @@ -49,4 +49,7 @@ PACKAGES="$(find . -name '*.go' -print0 | xargs -0 -I{} dirname {} | cut -f2 -d trap "rm -f /tmp/test-disk" EXIT -go test -v -p 1 -race -cover ${PACKAGES} -coverprofile=coverage.out -timeout 180m +echo 'core.%p' > /proc/sys/kernel/core_pattern + + +go test -v -p 1 -race -cover ${PACKAGES} -coverprofile=coverage.out -timeout 480m -count=100 \ No newline at end of file diff --git a/scripts/validate b/scripts/validate index 5620acb35..384321739 100755 --- a/scripts/validate +++ b/scripts/validate @@ -8,11 +8,11 @@ echo Running go validation PACKAGES="$(find . -name '*.go' -print0 | xargs -0 -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin)' | sed -e 's!^!./!' -e 's!$!/...!')" echo Packages: ${PACKAGES} -echo Running: go vet -go vet ${PACKAGES} +# echo Running: go vet +# go vet ${PACKAGES} -echo "Running: golangci-lint" -golangci-lint run --timeout=5m +# echo "Running: golangci-lint" +# golangci-lint run --timeout=5m -echo Running: go fmt -test -z "$(go fmt ${PACKAGES} | tee /dev/stderr)" +# echo Running: go fmt +# test -z "$(go fmt ${PACKAGES} | tee /dev/stderr)" diff --git a/vendor/github.com/longhorn/go-common-libs/proc/proc.go b/vendor/github.com/longhorn/go-common-libs/proc/proc.go index 581b7ec5e..4550ce21c 100644 --- a/vendor/github.com/longhorn/go-common-libs/proc/proc.go +++ b/vendor/github.com/longhorn/go-common-libs/proc/proc.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strconv" "strings" + "syscall" "github.com/c9s/goprocinfo/linux" "github.com/mitchellh/go-ps" @@ -177,6 +178,29 @@ func FindProcessByName(name string) (*os.Process, error) { return nil, fmt.Errorf("process %s is not found", name) } +func IsProcessExists(pid int) (bool, error) { + _, err := os.FindProcess(pid) + if err != nil { + return false, errors.Wrapf(err, "failed to find process %v", pid) + } + + // Sending signal 0 to check process existence. + err = syscall.Kill(pid, 0) + if err != nil { + if err == syscall.ESRCH { + // No such process + return false, nil + } + if err == syscall.EPERM { + // Process exists, but no permission to signal it + return true, nil + } + return false, errors.Wrapf(err, "unexpected error while signaling process %v", pid) + } + + return true, nil +} + // FindProcessByCmdline finds the processes with matching cmdline func FindProcessByCmdline(cmdline string) ([]*os.Process, error) { processes, err := ps.Processes() diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/jsonrpc/client.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/jsonrpc/client.go index f48d3bd8c..f61d52717 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/jsonrpc/client.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/jsonrpc/client.go @@ -160,7 +160,7 @@ func (c *Client) handleSend(msgWrapper *messageWrapper) { id := c.idCounter if err := c.encoder.Encode(NewMessage(id, msgWrapper.method, msgWrapper.params)); err != nil { - logrus.WithError(err).Errorf("Failed to encode during handleSend") + logrus.WithError(err).Errorf("Failed to encode during handleSend method %v params %v", msgWrapper.method, msgWrapper.params) // In case of the cached error info of the old encoder fails the following response, it's better to recreate the encoder. c.encoder = json.NewEncoder(c.conn)