Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ type BackupAzure struct {
}

const (
lastSetFilePrefix string = "last-binlog-set-" // filename prefix for object where the last binlog set will stored
gtidPostfix string = "-gtid-set" // filename postfix for files with GTID set
timelinePath string = "/tmp/pitr-timeline" // path to file with timeline
lastSetFilePrefix string = "last-binlog-set-" // filename prefix for object where the last binlog set will be stored
gtidPostfix string = "-gtid-set" // filename postfix for files with GTID set
)

func New(ctx context.Context, c Config) (*Collector, error) {
Expand Down Expand Up @@ -320,11 +319,12 @@ func (c *Collector) filterBinLogs(ctx context.Context, logs []pxc.Binlog, lastBi
}

func createGapFile(gtidSet pxc.GTIDSet) error {
p := "/tmp/gap-detected"
p := naming.GapDetected
f, err := os.Create(p)
if err != nil {
return errors.Wrapf(err, "create %s", p)
}
defer f.Close()

_, err = f.WriteString(gtidSet.Raw())
if err != nil {
Expand All @@ -346,10 +346,11 @@ func fileExists(name string) (bool, error) {
}

func createTimelineFile(firstTs string) error {
f, err := os.Create(timelinePath)
f, err := os.Create(naming.TimelinePath)
if err != nil {
return errors.Wrapf(err, "create %s", timelinePath)
return errors.Wrapf(err, "create %s", naming.TimelinePath)
}
defer f.Close()

_, err = f.WriteString(firstTs)
if err != nil {
Expand All @@ -360,9 +361,9 @@ func createTimelineFile(firstTs string) error {
}

func updateTimelineFile(lastTs string) error {
f, err := os.OpenFile(timelinePath, os.O_RDWR, 0o644)
f, err := os.OpenFile(naming.TimelinePath, os.O_RDWR, 0o644)
if err != nil {
return errors.Wrapf(err, "open %s", timelinePath)
return errors.Wrapf(err, "open %s", naming.TimelinePath)
}
defer f.Close()

Expand All @@ -373,7 +374,7 @@ func updateTimelineFile(lastTs string) error {
}

if err := scanner.Err(); err != nil {
return errors.Wrapf(err, "scan %s", timelinePath)
return errors.Wrapf(err, "scan %s", naming.TimelinePath)
}

if len(lines) > 1 {
Expand All @@ -383,11 +384,11 @@ func updateTimelineFile(lastTs string) error {
}

if _, err := f.Seek(0, 0); err != nil {
return errors.Wrapf(err, "seek %s", timelinePath)
return errors.Wrapf(err, "seek %s", naming.TimelinePath)
}

if err := f.Truncate(0); err != nil {
return errors.Wrapf(err, "truncate %s", timelinePath)
return errors.Wrapf(err, "truncate %s", naming.TimelinePath)
}

_, err = f.WriteString(strings.Join(lines, "\n"))
Expand Down Expand Up @@ -575,7 +576,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {
return nil
}

if exists, err := fileExists(timelinePath); !exists && err == nil {
if exists, err := fileExists(naming.TimelinePath); !exists && err == nil {
firstTs, err := c.db.GetBinLogFirstTimestamp(ctx, binlogList[0].Name)
if err != nil {
return errors.Wrap(err, "get first timestamp")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: pxc.percona.com/v1
kind: PerconaXtraDBClusterRestore
metadata:
name: restore-on-pitr-minio-gap-bs
spec:
pxcCluster: pitr-gap-errors
pitr:
type: latest
backupSource:
storageName: "minio-binlogs"
backupSource:
destination: #destination
s3:
bucket: operator-testing
credentialsSecret: minio-secret
endpointUrl: https://minio-service.#namespace:9000/
region: us-east-1
verifyTLS: false

16 changes: 16 additions & 0 deletions e2e-tests/pitr-gap-errors/run
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ check_binlog_gap_restore() {
exit 1
fi
kubectl_bin delete -f "$test_dir/conf/${restore}.yaml"
elif [ "$type" == "backup-source" ]; then
local restore=restore-on-pitr-minio-gap-bs
destination=$(kubectl_bin get pxc-backup on-pitr-minio-gap -o jsonpath='{.status.destination}')
cat "${test_dir}/conf/${restore}.yaml" \
| $sed -e "s~#destination~${destination}~" \
| $sed -e "s~minio-service.#namespace~minio-service.${namespace}~" \
| kubectl_bin apply -f -

wait_backup_restore "${restore}" "Failed"
local backup_error=$(kubectl_bin get pxc-restore ${restore} -ojsonpath='{.status.comments}' | grep -c "Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with percona.com/unsafe-pitr to force it.")
if [[ $backup_error -eq 0 ]]; then
echo "ERROR: Backup does not have pitr-not-ready file in the storage."
exit 1
fi
kubectl_bin delete -f "$test_dir/conf/${restore}.yaml"
elif [ "$type" == "force" ]; then
local restore=restore-on-pitr-minio-gap-force
kubectl_bin apply -f "$test_dir/conf/${restore}.yaml"
Expand Down Expand Up @@ -354,6 +369,7 @@ main() {
run_backup "$cluster" "on-pitr-minio-gap"
create_binlog_gap
check_binlog_gap_error
check_binlog_gap_restore "backup-source"
check_binlog_gap_restore "error"
check_binlog_gap_restore "force"
check_binlog_gap_restore "no-pitr"
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/pxc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app/statefulset"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage"
"github.com/percona/percona-xtradb-cluster-operator/pkg/util"
"github.com/percona/percona-xtradb-cluster-operator/pkg/version"
)
Expand Down Expand Up @@ -462,7 +463,7 @@ func (r *ReconcilePerconaXtraDBCluster) Reconcile(ctx context.Context, request r
return reconcile.Result{}, err
}

err = backup.CheckPITRErrors(ctx, r.client, r.clientcmd, o)
err = backup.CheckPITRErrors(ctx, r.client, r.clientcmd, o, storage.NewClient)
if err != nil {
return reconcile.Result{}, err
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/controller/pxcrestore/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8sretry "k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -266,17 +264,22 @@ func (r *ReconcilePerconaXtraDBClusterRestore) reconcileStateNew(ctx context.Con
}

if cr.Spec.PITR != nil {
if err := backup.CheckPITRErrors(ctx, r.client, r.clientcmd, cluster); err != nil {
if err := backup.CheckPITRErrors(ctx, r.client, r.clientcmd, cluster, r.newStorageClientFunc); err != nil {
return reconcile.Result{}, err
}

annotations := cr.GetAnnotations()
_, unsafePITR := annotations[api.AnnotationUnsafePITR]
cond := meta.FindStatusCondition(bcp.Status.Conditions, api.BackupConditionPITRReady)
if cond != nil && cond.Status == metav1.ConditionFalse && !unsafePITR {
cr.Status.Comments = fmt.Sprintf("Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with %s to force it.", api.AnnotationUnsafePITR)
cr.Status.State = api.RestoreFailed
return reconcile.Result{}, nil
if !unsafePITR {
ready, err := r.isPITRReady(ctx, cluster, bcp)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "is pitr ready")
}
if !ready {
cr.Status.Comments = fmt.Sprintf("Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with %s to force it.", api.AnnotationUnsafePITR)
cr.Status.State = api.RestoreFailed
return reconcile.Result{}, nil
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/pxcrestore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1"
"github.com/percona/percona-xtradb-cluster-operator/pkg/naming"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage"
)

func getBackup(ctx context.Context, cl client.Client, cr *api.PerconaXtraDBClusterRestore) (*api.PerconaXtraDBClusterBackup, error) {
Expand Down Expand Up @@ -95,3 +98,32 @@ func isJobFinished(checkJob *batchv1.Job) (bool, error) {
}
return false, nil
}

func (r *ReconcilePerconaXtraDBClusterRestore) isPITRReady(ctx context.Context, cluster *api.PerconaXtraDBCluster, bcp *api.PerconaXtraDBClusterBackup) (bool, error) {
cond := meta.FindStatusCondition(bcp.Status.Conditions, api.BackupConditionPITRReady)
if cond != nil && cond.Status == metav1.ConditionFalse {
return false, nil
}

opts, err := storage.GetOptionsFromBackup(ctx, r.client, cluster, bcp)
if err != nil {
return false, errors.Wrap(err, "failed to get storage options")
}

stg, err := r.newStorageClientFunc(ctx, opts)
if err != nil {
return false, errors.Wrap(err, "new storage")
}

filepath := bcp.Status.Destination.BackupName() + "." + naming.PITRNotReady
objReader, err := stg.GetObject(ctx, filepath)
if err == nil {
objReader.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we handle the error this returns?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least, we could log it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We handle it below

return false, nil
}
if errors.Is(err, storage.ErrObjectNotFound) {
return true, nil
}

return false, errors.Wrap(err, "get pitr-not-ready file from storage")
}
7 changes: 7 additions & 0 deletions pkg/naming/filepaths.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ const (
BackupStorageCAFileDirectory = "/etc/s3/certs"
BackupStorageCAFileName = "ca.crt"
)

const (
PITRNotReady = "pitr-not-ready"
GapDetected = "/tmp/gap-detected"
TimelinePath = "/tmp/pitr-timeline" // path to file with timeline
LatestBackupPath = "/tmp/latest-backup"
)
3 changes: 1 addition & 2 deletions pkg/pxc/app/binlogcollector/binlog-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ var GapFileNotFound = errors.New("gap file not found")

func RemoveGapFile(c *clientcmd.Client, pod *corev1.Pod) error {
stderrBuf := &bytes.Buffer{}
err := c.Exec(pod, "pitr", []string{"/bin/bash", "-c", "rm /tmp/gap-detected"}, nil, nil, stderrBuf, false)
err := c.Exec(pod, "pitr", []string{"/bin/bash", "-c", "rm " + naming.GapDetected}, nil, nil, stderrBuf, false)
if err != nil {
if strings.Contains(stderrBuf.String(), "No such file or directory") {
return GapFileNotFound
Expand Down Expand Up @@ -424,5 +424,4 @@ func InvalidateCache(
"file", gtidCacheKey)

return stg.DeleteObject(ctx, gtidCacheKey)

}
27 changes: 25 additions & 2 deletions pkg/pxc/backup/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1"
"github.com/percona/percona-xtradb-cluster-operator/pkg/naming"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app/binlogcollector"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage"
)

func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Client, cr *api.PerconaXtraDBCluster) error {
func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Client, cr *api.PerconaXtraDBCluster, storageFunc storage.NewClientFunc) error {
log := logf.FromContext(ctx)

if cr.Spec.Backup == nil || !cr.Spec.Backup.PITR.Enabled {
Expand Down Expand Up @@ -64,7 +65,7 @@ func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Cli
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}

err = clcmd.Exec(collectorPod, "pitr", []string{"/bin/bash", "-c", "cat /tmp/gap-detected || true"}, nil, stdoutBuf, stderrBuf, false)
err = clcmd.Exec(collectorPod, "pitr", []string{"/bin/bash", "-c", "cat " + naming.GapDetected + " || true"}, nil, stdoutBuf, stderrBuf, false)
if err != nil {
return errors.Wrapf(err, "exec binlog collector pod %s", collectorPod.Name)
}
Expand All @@ -85,6 +86,10 @@ func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Cli
}
meta.SetStatusCondition(&backup.Status.Conditions, condition)

if err := addPitrNotReadyFileToBackup(ctx, cl, cr, backup, storageFunc); err != nil {
return errors.Wrap(err, "add pitr-not-ready file")
}

if err := cl.Status().Update(ctx, backup); err != nil {
return errors.Wrap(err, "update backup status")
}
Expand Down Expand Up @@ -164,6 +169,24 @@ func UpdatePITRTimeline(ctx context.Context, cl client.Client, clcmd *clientcmd.
return nil
}

// addPitrNotReadyFileToBackup uploads a zero-length marker object named
// "<backup name>.<naming.PITRNotReady>" to the storage the given backup
// lives in.
//
// Storage options are resolved from the backup with
// storage.GetOptionsFromBackup and the client is created through storageFunc.
// The first failing step is returned wrapped with a message naming that step;
// nil is returned once the object has been written.
func addPitrNotReadyFileToBackup(ctx context.Context, cl client.Client, cr *api.PerconaXtraDBCluster, backup *api.PerconaXtraDBClusterBackup, storageFunc storage.NewClientFunc) error {
	storageOpts, err := storage.GetOptionsFromBackup(ctx, cl, cr, backup)
	if err != nil {
		return errors.Wrap(err, "failed to get storage options")
	}

	stg, err := storageFunc(ctx, storageOpts)
	if err != nil {
		return errors.Wrap(err, "failed to create storage client")
	}

	// The marker is written with an empty body and an explicit size of 0.
	markerName := backup.Status.Destination.BackupName() + "." + naming.PITRNotReady
	emptyBody := bytes.NewBuffer([]byte{})

	putErr := stg.PutObject(ctx, markerName, emptyBody, 0)
	if putErr == nil {
		return nil
	}

	return errors.Wrap(putErr, "put pitr-not-ready object")
}

var ErrNoBackups = errors.New("No backups found")

func getLatestSuccessfulBackup(ctx context.Context, cl client.Client, cr *api.PerconaXtraDBCluster) (*api.PerconaXtraDBClusterBackup, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pxc/backup/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *S3) GetObject(ctx context.Context, objectName string) (io.ReadCloser, e

// minio client returns error only on Read() method, so we need to call it to see if object exists
_, err = oldObj.Read([]byte{})
if err != nil {
if err != nil && err != io.EOF {
if minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey" {
return nil, ErrObjectNotFound
}
Expand Down
Loading