diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 3eb19ff3c89e..b54c1a665742 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilLegacy; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -49,7 +50,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline @JsonIgnore @Description( "The GoogleCloudStorageReadOptions instance that should be used to read from Google Cloud Storage.") - @Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class) + @Default.InstanceFactory(GcsUtilLegacy.GcsReadOptionsFactory.class) @Hidden GoogleCloudStorageReadOptions getGoogleCloudStorageReadOptions(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 77670eafbb40..0bb4e88fe8d9 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -17,143 +17,70 @@ */ package 
org.apache.beam.sdk.extensions.gcp.util; -import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp; -import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.client.googleapis.batch.BatchRequest; -import com.google.api.client.googleapis.batch.json.JsonBatchCallback; -import com.google.api.client.googleapis.json.GoogleJsonError; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest; -import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.RewriteResponse; import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; -import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.gcsio.CreateObjectOptions; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; -import com.google.cloud.hadoop.gcsio.StorageResourceId; -import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; -import com.google.cloud.hadoop.util.ResilientOperation; -import 
com.google.cloud.hadoop.util.RetryDeterminer; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.reflect.Method; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; -import java.nio.file.AccessDeniedException; -import java.nio.file.FileAlreadyExistsException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; -import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; -import org.apache.beam.runners.core.metrics.ServiceCallMetric; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.extensions.gcp.util.channels.CountingSeekableByteChannel; -import org.apache.beam.sdk.extensions.gcp.util.channels.CountingWritableByteChannel; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.fs.MoveOptions; -import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.MoreFutures; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Provides operations on GCS. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) + public class GcsUtil { + @VisibleForTesting GcsUtilLegacy delegate; + + public static class GcsCountersOptions { + final GcsUtilLegacy.GcsCountersOptions delegate; + + private GcsCountersOptions(GcsUtilLegacy.GcsCountersOptions delegate) { + this.delegate = delegate; + } - @AutoValue - public abstract static class GcsCountersOptions { - public abstract @Nullable String getReadCounterPrefix(); + public @Nullable String getReadCounterPrefix() { + return delegate.getReadCounterPrefix(); + } - public abstract @Nullable String getWriteCounterPrefix(); + public @Nullable String getWriteCounterPrefix() { + return delegate.getWriteCounterPrefix(); + } public boolean hasAnyPrefix() { - return getWriteCounterPrefix() != null || getReadCounterPrefix() != null; + return delegate.hasAnyPrefix(); } public static GcsCountersOptions create( @Nullable String readCounterPrefix, @Nullable String writeCounterPrefix) { - return new AutoValue_GcsUtil_GcsCountersOptions(readCounterPrefix, writeCounterPrefix); + return new GcsCountersOptions( + GcsUtilLegacy.GcsCountersOptions.create(readCounterPrefix, writeCounterPrefix)); } } - public static class GcsReadOptionsFactory - implements 
DefaultValueFactory { - @Override - public GoogleCloudStorageReadOptions create(PipelineOptions options) { - return GoogleCloudStorageReadOptions.DEFAULT; - } - } - - /** - * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport - * flags specified on the {@link PipelineOptions}. - */ public static class GcsUtilFactory implements DefaultValueFactory { - /** - * Returns an instance of {@link GcsUtil} based on the {@link PipelineOptions}. - * - *

If no instance has previously been created, one is created and the value stored in {@code - * options}. - */ @Override public GcsUtil create(PipelineOptions options) { - LOG.debug("Creating new GcsUtil"); GcsOptions gcsOptions = options.as(GcsOptions.class); Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); return new GcsUtil( storageBuilder.build(), storageBuilder.getHttpRequestInitializer(), gcsOptions.getExecutorService(), - hasExperiment(options, "use_grpc_for_gcs"), + org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment( + options, "use_grpc_for_gcs"), gcsOptions.getGcpCredential(), gcsOptions.getGcsUploadBufferSizeBytes(), gcsOptions.getGcsRewriteDataOpBatchLimit(), @@ -164,106 +91,16 @@ public GcsUtil create(PipelineOptions options) { gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() : null), - gcsOptions.getGoogleCloudStorageReadOptions()); - } - - /** Returns an instance of {@link GcsUtil} based on the given parameters. */ - public static GcsUtil create( - PipelineOptions options, - Storage storageClient, - HttpRequestInitializer httpRequestInitializer, - ExecutorService executorService, - Credentials credentials, - @Nullable Integer uploadBufferSizeBytes, - GcsCountersOptions gcsCountersOptions, - GoogleCloudStorageReadOptions gcsReadOptions) { - return new GcsUtil( - storageClient, - httpRequestInitializer, - executorService, - hasExperiment(options, "use_grpc_for_gcs"), - credentials, - uploadBufferSizeBytes, - null, - gcsCountersOptions, - gcsReadOptions); + gcsOptions); } } - private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class); - - /** Maximum number of items to retrieve per Objects.List request. */ - private static final long MAX_LIST_ITEMS_PER_CALL = 1024; - - /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. 
*/ - private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); - - /** Maximum number of requests permitted in a GCS batch request. */ - private static final int MAX_REQUESTS_PER_BATCH = 100; - /** Default maximum number of requests permitted in a GCS batch request where data is copied. */ - private static final int MAX_REQUESTS_PER_COPY_BATCH = 10; - /** Maximum number of concurrent batches of requests executing on GCS. */ - private static final int MAX_CONCURRENT_BATCHES = 256; - - private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1)); - private static final RetryDeterminer RETRY_DETERMINER = - new RetryDeterminer() { - @Override - public boolean shouldRetry(IOException e) { - if (e instanceof GoogleJsonResponseException) { - int statusCode = ((GoogleJsonResponseException) e).getStatusCode(); - return statusCode == 408 // Request Timeout - || statusCode == 429 // Too many requests - || (statusCode >= 500 && statusCode < 600); // Server errors - } - return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); - } - }; - - ///////////////////////////////////////////////////////////////////////////// - - /** Client for the GCS API. */ - private Storage storageClient; - - private Supplier batchRequestSupplier; - - private final HttpRequestInitializer httpRequestInitializer; - /** Buffer size for GCS uploads (in bytes). */ - private final @Nullable Integer uploadBufferSizeBytes; - - // Helper delegate for turning IOExceptions from API calls into higher-level semantics. - private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - - // Unbounded thread pool for codependent pipeline operations that will deadlock the pipeline if - // starved for threads. - // Exposed for testing. 
- final ExecutorService executorService; - - private final Credentials credentials; - - private GoogleCloudStorage googleCloudStorage; - private GoogleCloudStorageOptions googleCloudStorageOptions; - - private final int rewriteDataOpBatchLimit; - - private final GcsCountersOptions gcsCountersOptions; - - /** Rewrite operation setting. For testing purposes only. */ - @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall; - - @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed; - - /** Returns the prefix portion of the glob that doesn't contain wildcards. */ public static String getNonWildcardPrefix(String globExp) { - Matcher m = GLOB_PREFIX.matcher(globExp); - checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); - return m.group("PREFIX"); + return GcsUtilLegacy.getNonWildcardPrefix(globExp); } - /** Returns true if the given {@code spec} contains wildcard. */ public static boolean isWildcard(GcsPath spec) { - return GLOB_PREFIX.matcher(spec.getObject()).matches(); + return GcsUtilLegacy.isWildcard(spec); } @VisibleForTesting @@ -276,1177 +113,283 @@ public static boolean isWildcard(GcsPath spec) { @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, GcsCountersOptions gcsCountersOptions, - GoogleCloudStorageReadOptions gcsReadOptions) { - this.storageClient = storageClient; - this.httpRequestInitializer = httpRequestInitializer; - this.uploadBufferSizeBytes = uploadBufferSizeBytes; - this.executorService = executorService; - this.credentials = credentials; - this.maxBytesRewrittenPerCall = null; - this.numRewriteTokensUsed = null; - googleCloudStorageOptions = - GoogleCloudStorageOptions.builder() - .setAppName("Beam") - .setReadChannelOptions(gcsReadOptions) - .setGrpcEnabled(shouldUseGrpc) - .build(); - googleCloudStorage = - createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); - this.batchRequestSupplier = - () -> { - // Capture reference to 
this so that the most recent storageClient and initializer - // are used. - GcsUtil util = this; - return new BatchInterface() { - final BatchRequest batch = util.storageClient.batch(util.httpRequestInitializer); - - @Override - public void queue( - AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) - throws IOException { - request.queue(batch, cb); - } - - @Override - public void execute() throws IOException { - batch.execute(); - } - - @Override - public int size() { - return batch.size(); - } - }; - }; - this.rewriteDataOpBatchLimit = - rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH : rewriteDataOpBatchLimit; - this.gcsCountersOptions = gcsCountersOptions; + GcsOptions gcsOptions) { + this.delegate = + new GcsUtilLegacy( + storageClient, + httpRequestInitializer, + executorService, + shouldUseGrpc, + credentials, + uploadBufferSizeBytes, + rewriteDataOpBatchLimit, + gcsCountersOptions.delegate, + gcsOptions); } - // Use this only for testing purposes. protected void setStorageClient(Storage storageClient) { - this.storageClient = storageClient; + delegate.setStorageClient(storageClient); } - // Use this only for testing purposes. - protected void setBatchRequestSupplier(Supplier supplier) { - this.batchRequestSupplier = supplier; + protected void setBatchRequestSupplier(Supplier supplier) { + delegate.setBatchRequestSupplier(supplier); } - /** - * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in - * the result. For patterns that only match a single object, we ensure that the object exists. - */ public List expand(GcsPath gcsPattern) throws IOException { - Pattern p = null; - String prefix = null; - if (isWildcard(gcsPattern)) { - // Part before the first wildcard character. - prefix = getNonWildcardPrefix(gcsPattern.getObject()); - p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); - } else { - // Not a wildcard. 
- try { - // Use a get request to fetch the metadata of the object, and ignore the return value. - // The request has strong global consistency. - getObject(gcsPattern); - return ImmutableList.of(gcsPattern); - } catch (FileNotFoundException e) { - // If the path was not found, return an empty list. - return ImmutableList.of(); - } - } - - LOG.debug( - "matching files in bucket {}, prefix {} against pattern {}", - gcsPattern.getBucket(), - prefix, - p.toString()); - - String pageToken = null; - List results = new ArrayList<>(); - do { - Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); - if (objects.getItems() == null) { - break; - } - - // Filter objects based on the regex. - for (StorageObject o : objects.getItems()) { - String name = o.getName(); - // Skip directories, which end with a slash. - if (p.matcher(name).matches() && !name.endsWith("/")) { - LOG.debug("Matched object: {}", name); - results.add(GcsPath.fromObject(o)); - } - } - pageToken = objects.getNextPageToken(); - } while (pageToken != null); - - return results; + return delegate.expand(gcsPattern); } @VisibleForTesting @Nullable Integer getUploadBufferSizeBytes() { - return uploadBufferSizeBytes; - } - - private static BackOff createBackOff() { - return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); + return delegate.getUploadBufferSizeBytes(); } - /** - * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not - * exist. - */ public long fileSize(GcsPath path) throws IOException { - return getObject(path).getSize().longValue(); + return delegate.fileSize(path); } - /** Returns the {@link StorageObject} for the given {@link GcsPath}. 
*/ public StorageObject getObject(GcsPath gcsPath) throws IOException { - return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT); + return delegate.getObject(gcsPath); } @VisibleForTesting StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { - Storage.Objects.Get getObject = - storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); - try { - return ResilientOperation.retry( - getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, sleeper); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { - throw new FileNotFoundException(gcsPath.toString()); - } - throw new IOException( - String.format("Unable to get the file object for path %s.", gcsPath), e); - } + return delegate.getObject(gcsPath, backoff, sleeper); } - /** - * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given {@link - * GcsPath GcsPaths}. 
- */ public List getObjects(List gcsPaths) throws IOException { - if (gcsPaths.isEmpty()) { - return ImmutableList.of(); - } else if (gcsPaths.size() == 1) { - GcsPath path = gcsPaths.get(0); - try { - StorageObject object = getObject(path); - return ImmutableList.of(StorageObjectOrIOException.create(object)); - } catch (IOException e) { - return ImmutableList.of(StorageObjectOrIOException.create(e)); - } catch (Exception e) { - IOException ioException = - new IOException(String.format("Error trying to get %s: %s", path, e)); - return ImmutableList.of(StorageObjectOrIOException.create(ioException)); - } - } - - List results = new ArrayList<>(); - executeBatches(makeGetBatches(gcsPaths, results)); - ImmutableList.Builder ret = ImmutableList.builder(); - for (StorageObjectOrIOException[] result : results) { - ret.add(result[0]); - } - return ret.build(); + List legacy = delegate.getObjects(gcsPaths); + return legacy.stream() + .map(StorageObjectOrIOException::fromLegacy) + .collect(java.util.stream.Collectors.toList()); } public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) throws IOException { - return listObjects(bucket, prefix, pageToken, null); + return delegate.listObjects(bucket, prefix, pageToken); } - /** - * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. - * - *

For more details, see https://cloud.google.com/storage/docs/json_api/v1/objects/list. - */ public Objects listObjects( String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) throws IOException { - // List all objects that start with the prefix (including objects in sub-directories). - Storage.Objects.List listObject = storageClient.objects().list(bucket); - listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); - listObject.setPrefix(prefix); - listObject.setDelimiter(delimiter); - - if (pageToken != null) { - listObject.setPageToken(pageToken); - } - - try { - return ResilientOperation.retry( - listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS, IOException.class); - } catch (Exception e) { - throw new IOException( - String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e); - } + return delegate.listObjects(bucket, prefix, pageToken, delimiter); } - /** - * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not - * exist. - */ @VisibleForTesting List fileSizes(List paths) throws IOException { - List results = getObjects(paths); - - ImmutableList.Builder ret = ImmutableList.builder(); - for (StorageObjectOrIOException result : results) { - ret.add(toFileSize(result)); - } - return ret.build(); - } - - private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) - throws IOException { - if (storageObjectOrIOException.ioException() != null) { - throw storageObjectOrIOException.ioException(); - } else { - return storageObjectOrIOException.storageObject().getSize().longValue(); - } - } - - @VisibleForTesting - void setCloudStorageImpl(GoogleCloudStorage g) { - googleCloudStorage = g; + return delegate.fileSizes(paths); } - @VisibleForTesting - void setCloudStorageImpl(GoogleCloudStorageOptions g) { - googleCloudStorageOptions = g; - } - - /** - * Create an integer consumer that updates the counter identified by a prefix and a bucket name. 
- */ - private static Consumer createCounterConsumer(String counterNamePrefix, String bucket) { - return Metrics.counter(GcsUtil.class, String.format("%s_%s", counterNamePrefix, bucket))::inc; - } - - private WritableByteChannel wrapInCounting( - WritableByteChannel writableByteChannel, String bucket) { - if (writableByteChannel instanceof CountingWritableByteChannel) { - return writableByteChannel; - } - return Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix()) - .map( - prefix -> { - LOG.debug( - "wrapping writable byte channel using counter name prefix {} and bucket {}", - prefix, - bucket); - return new CountingWritableByteChannel( - writableByteChannel, createCounterConsumer(prefix, bucket)); - }) - .orElse(writableByteChannel); - } - - private SeekableByteChannel wrapInCounting( - SeekableByteChannel seekableByteChannel, String bucket) { - if (seekableByteChannel instanceof CountingSeekableByteChannel - || !gcsCountersOptions.hasAnyPrefix()) { - return seekableByteChannel; - } - - return new CountingSeekableByteChannel( - seekableByteChannel, - Optional.ofNullable(gcsCountersOptions.getReadCounterPrefix()) - .map( - prefix -> { - LOG.debug( - "wrapping seekable byte channel with \"bytes read\" counter name prefix {}" - + " and bucket {}", - prefix, - bucket); - return createCounterConsumer(prefix, bucket); - }) - .orElse(null), - Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix()) - .map( - prefix -> { - LOG.debug( - "wrapping seekable byte channel with \"bytes written\" counter name prefix {}" - + " and bucket {}", - prefix, - bucket); - return createCounterConsumer(prefix, bucket); - }) - .orElse(null)); - } - - /** - * Opens an object in GCS. - * - *

Returns a SeekableByteChannel that provides access to data in the bucket. - * - * @param path the GCS filename to read from - * @return a SeekableByteChannel that can read the object data - */ public SeekableByteChannel open(GcsPath path) throws IOException { - String bucket = path.getBucket(); - SeekableByteChannel channel = - googleCloudStorage.open( - new StorageResourceId(path.getBucket(), path.getObject()), - this.googleCloudStorageOptions.getReadChannelOptions()); - return wrapInCounting(channel, bucket); - } - - /** - * Opens an object in GCS. - * - *

Returns a SeekableByteChannel that provides access to data in the bucket. - * - * @param path the GCS filename to read from - * @param readOptions Fine-grained options for behaviors of retries, buffering, etc. - * @return a SeekableByteChannel that can read the object data - */ - @VisibleForTesting - SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions readOptions) - throws IOException { - HashMap baseLabels = new HashMap<>(); - baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage"); - baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsGet"); - baseLabels.put( - MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.cloudStorageBucket(path.getBucket())); - baseLabels.put( - MonitoringInfoConstants.Labels.GCS_PROJECT_ID, - String.valueOf(googleCloudStorageOptions.getProjectId())); - baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET, path.getBucket()); - - ServiceCallMetric serviceCallMetric = - new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); - try { - SeekableByteChannel channel = - googleCloudStorage.open( - new StorageResourceId(path.getBucket(), path.getObject()), readOptions); - serviceCallMetric.call("ok"); - return wrapInCounting(channel, path.getBucket()); - } catch (IOException e) { - if (e.getCause() instanceof GoogleJsonResponseException) { - serviceCallMetric.call(((GoogleJsonResponseException) e.getCause()).getDetails().getCode()); - } - throw e; - } + return delegate.open(path); } /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */ @Deprecated public WritableByteChannel create(GcsPath path, String type) throws IOException { - CreateOptions.Builder builder = CreateOptions.builder().setContentType(type); - return create(path, builder.build()); + return delegate.create(path, type); } /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. 
*/ @Deprecated public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes) throws IOException { - CreateOptions.Builder builder = - CreateOptions.builder() - .setContentType(type) - .setUploadBufferSizeBytes(uploadBufferSizeBytes); - return create(path, builder.build()); + return delegate.create(path, type, uploadBufferSizeBytes); } - @AutoValue - public abstract static class CreateOptions { - /** - * If true, the created file is expected to not exist. Instead of checking for file presence - * before writing a write exception may occur if the file does exist. - */ - public abstract boolean getExpectFileToNotExist(); + public static class CreateOptions { + final GcsUtilLegacy.CreateOptions delegate; - /** - * If non-null, the upload buffer size to be used. If null, the buffer size corresponds to {code - * GCSUtil.getUploadBufferSizeBytes} - */ - public abstract @Nullable Integer getUploadBufferSizeBytes(); + private CreateOptions(GcsUtilLegacy.CreateOptions delegate) { + this.delegate = delegate; + } + + public boolean getExpectFileToNotExist() { + return delegate.getExpectFileToNotExist(); + } + + public @Nullable Integer getUploadBufferSizeBytes() { + return delegate.getUploadBufferSizeBytes(); + } - /** The content type for the created file, eg "text/plain". 
*/ - public abstract @Nullable String getContentType(); + public @Nullable String getContentType() { + return delegate.getContentType(); + } public static Builder builder() { - return new AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false); + return new Builder(GcsUtilLegacy.CreateOptions.builder()); } - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setContentType(String value); + public static class Builder { + private final GcsUtilLegacy.CreateOptions.Builder delegateBuilder; - public abstract Builder setUploadBufferSizeBytes(int value); + private Builder(GcsUtilLegacy.CreateOptions.Builder delegateBuilder) { + this.delegateBuilder = delegateBuilder; + } - public abstract Builder setExpectFileToNotExist(boolean value); + public Builder setContentType(String value) { + delegateBuilder.setContentType(value); + return this; + } - public abstract CreateOptions build(); - } - } + public Builder setUploadBufferSizeBytes(int value) { + delegateBuilder.setUploadBufferSizeBytes(value); + return this; + } - /** - * Creates an object in GCS and prepares for uploading its contents. - * - * @param path the GCS file to write to - * @param options to be used for creating and configuring file upload - * @return a WritableByteChannel that can be used to write data to the object. - */ - public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException { - AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions(); - @Nullable - Integer uploadBufferSizeBytes = - options.getUploadBufferSizeBytes() != null - ? 
options.getUploadBufferSizeBytes() - : getUploadBufferSizeBytes(); - if (uploadBufferSizeBytes != null) { - wcOptions = wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build(); - } - GoogleCloudStorageOptions newGoogleCloudStorageOptions = - googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build(); - GoogleCloudStorage gcpStorage = - createGoogleCloudStorage( - newGoogleCloudStorageOptions, this.storageClient, this.credentials); - StorageResourceId resourceId = - new StorageResourceId( - path.getBucket(), - path.getObject(), - // If we expect the file not to exist, we set a generation id of 0. This avoids a read - // to identify the object exists already and should be overwritten. - // See {@link GoogleCloudStorage#create(StorageResourceId, GoogleCloudStorageOptions)} - options.getExpectFileToNotExist() ? 0L : StorageResourceId.UNKNOWN_GENERATION_ID); - CreateObjectOptions.Builder createBuilder = - CreateObjectOptions.builder().setOverwriteExisting(true); - if (options.getContentType() != null) { - createBuilder = createBuilder.setContentType(options.getContentType()); - } + public Builder setExpectFileToNotExist(boolean value) { + delegateBuilder.setExpectFileToNotExist(value); + return this; + } - HashMap baseLabels = new HashMap<>(); - baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage"); - baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsInsert"); - baseLabels.put( - MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.cloudStorageBucket(path.getBucket())); - baseLabels.put( - MonitoringInfoConstants.Labels.GCS_PROJECT_ID, - String.valueOf(googleCloudStorageOptions.getProjectId())); - baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET, path.getBucket()); - - ServiceCallMetric serviceCallMetric = - new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); - try { - WritableByteChannel channel = 
gcpStorage.create(resourceId, createBuilder.build()); - serviceCallMetric.call("ok"); - return wrapInCounting(channel, path.getBucket()); - } catch (IOException e) { - if (e.getCause() instanceof GoogleJsonResponseException) { - serviceCallMetric.call(((GoogleJsonResponseException) e.getCause()).getDetails().getCode()); + public CreateOptions build() { + return new CreateOptions(delegateBuilder.build()); } - throw e; } } - GoogleCloudStorage createGoogleCloudStorage( - GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { - try { - return new GoogleCloudStorageImpl(options, storage, credentials); - } catch (NoSuchMethodError e) { - // gcs-connector 3.x drops the direct constructor and exclusively uses Builder - // TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x - try { - final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder"); - Object builder = builderMethod.invoke(null); - final Class builderClass = - Class.forName( - "com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder"); - - final Method setOptionsMethod = - builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class); - setOptionsMethod.setAccessible(true); - builder = setOptionsMethod.invoke(builder, options); - - final Method setHttpTransportMethod = - builderClass.getMethod("setHttpTransport", HttpTransport.class); - setHttpTransportMethod.setAccessible(true); - builder = - setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport()); - - final Method setCredentialsMethod = - builderClass.getMethod("setCredentials", Credentials.class); - setCredentialsMethod.setAccessible(true); - builder = setCredentialsMethod.invoke(builder, credentials); - - final Method setHttpRequestInitializerMethod = - builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class); - setHttpRequestInitializerMethod.setAccessible(true); - builder = 
setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer); - - final Method buildMethod = builderClass.getMethod("build"); - buildMethod.setAccessible(true); - return (GoogleCloudStorage) buildMethod.invoke(builder); - } catch (Exception reflectionError) { - throw new RuntimeException( - "Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError); - } - } + public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException { + return delegate.create(path, options.delegate); } - /** - * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws - * exception if the bucket is inaccessible due to permissions or does not exist. - */ public void verifyBucketAccessible(GcsPath path) throws IOException { - verifyBucketAccessible(path, createBackOff(), Sleeper.DEFAULT); + delegate.verifyBucketAccessible(path); } - /** Returns whether the GCS bucket exists and is accessible. */ public boolean bucketAccessible(GcsPath path) throws IOException { - return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT); + return delegate.bucketAccessible(path); } - /** - * Returns the project number of the project which owns this bucket. If the bucket exists, it must - * be accessible otherwise the permissions exception will be propagated. If the bucket does not - * exist, an exception will be thrown. - */ public long bucketOwner(GcsPath path) throws IOException { - return getBucket(path, createBackOff(), Sleeper.DEFAULT).getProjectNumber().longValue(); + return delegate.bucketOwner(path); } - /** - * Creates a {@link Bucket} under the specified project in Cloud Storage or propagates an - * exception. - */ public void createBucket(String projectId, Bucket bucket) throws IOException { - createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT); + delegate.createBucket(projectId, bucket); } - /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. 
*/ - @Nullable - public Bucket getBucket(GcsPath path) throws IOException { - return getBucket(path, createBackOff(), Sleeper.DEFAULT); + public @Nullable Bucket getBucket(GcsPath path) throws IOException { + return delegate.getBucket(path); } - /** Remove an empty {@link Bucket} in Cloud Storage or propagates an exception. */ public void removeBucket(Bucket bucket) throws IOException { - removeBucket(bucket, createBackOff(), Sleeper.DEFAULT); + delegate.removeBucket(bucket); } - /** - * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due - * to permissions. - */ @VisibleForTesting boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - try { - return getBucket(path, backoff, sleeper) != null; - } catch (AccessDeniedException | FileNotFoundException e) { - return false; - } + return delegate.bucketAccessible(path, backoff, sleeper); } - /** - * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath, BackOff, - * Sleeper)}, but throws exception if the bucket is inaccessible due to permissions or does not - * exist. 
- */ @VisibleForTesting void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - getBucket(path, backoff, sleeper); + delegate.verifyBucketAccessible(path, backoff, sleeper); } @VisibleForTesting @Nullable Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - Storage.Buckets.Get getBucket = storageClient.buckets().get(path.getBucket()); - - try { - return ResilientOperation.retry( - getBucket::execute, - backoff, - new RetryDeterminer() { - @Override - public boolean shouldRetry(IOException e) { - if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { - return false; - } - return RETRY_DETERMINER.shouldRetry(e); - } - }, - IOException.class, - sleeper); - } catch (GoogleJsonResponseException e) { - if (errorExtractor.accessDenied(e)) { - throw new AccessDeniedException(path.toString(), null, e.getMessage()); - } - if (errorExtractor.itemNotFound(e)) { - throw new FileNotFoundException(e.getMessage()); - } - throw e; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - String.format( - "Error while attempting to verify existence of bucket gs://%s", path.getBucket()), - e); - } + return delegate.getBucket(path, backoff, sleeper); } @VisibleForTesting void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { - Storage.Buckets.Insert insertBucket = storageClient.buckets().insert(projectId, bucket); - insertBucket.setPredefinedAcl("projectPrivate"); - insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); - - try { - ResilientOperation.retry( - insertBucket::execute, - backoff, - new RetryDeterminer() { - @Override - public boolean shouldRetry(IOException e) { - if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) { - return false; - } - return RETRY_DETERMINER.shouldRetry(e); - } - }, - IOException.class, - sleeper); - return; - } catch 
(GoogleJsonResponseException e) { - if (errorExtractor.accessDenied(e)) { - throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); - } - if (errorExtractor.itemAlreadyExists(e)) { - throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); - } - throw e; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - String.format( - "Error while attempting to create bucket gs://%s for project %s", - bucket.getName(), projectId), - e); - } + delegate.createBucket(projectId, bucket, backoff, sleeper); } @VisibleForTesting void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { - Storage.Buckets.Delete getBucket = storageClient.buckets().delete(bucket.getName()); - - try { - ResilientOperation.retry( - getBucket::execute, - backoff, - new RetryDeterminer() { - @Override - public boolean shouldRetry(IOException e) { - if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { - return false; - } - return RETRY_DETERMINER.shouldRetry(e); - } - }, - IOException.class, - sleeper); - } catch (GoogleJsonResponseException e) { - if (errorExtractor.accessDenied(e)) { - throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); - } - if (errorExtractor.itemNotFound(e)) { - throw new FileNotFoundException(e.getMessage()); - } - throw e; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e); - } + delegate.removeBucket(bucket, backoff, sleeper); } - private static void executeBatches(List batches) throws IOException { - ExecutorService executor = - MoreExecutors.listeningDecorator( - new ThreadPoolExecutor( - MAX_CONCURRENT_BATCHES, - MAX_CONCURRENT_BATCHES, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>())); - - List> futures = new ArrayList<>(); - for (final BatchInterface batch : batches) { - 
futures.add(MoreFutures.runAsync(batch::execute, executor)); - } - - try { - try { - MoreFutures.get(MoreFutures.allOf(futures)); - } catch (ExecutionException e) { - if (e.getCause() instanceof FileNotFoundException) { - throw (FileNotFoundException) e.getCause(); - } - throw new IOException("Error executing batch GCS request", e); - } finally { - // Give the other batches a chance to complete in error cases. - executor.shutdown(); - if (!executor.awaitTermination(5, TimeUnit.MINUTES)) { - LOG.warn("Taking over 5 minutes to flush gcs op batches after error"); - executor.shutdownNow(); - if (!executor.awaitTermination(5, TimeUnit.MINUTES)) { - LOG.warn("Took over 10 minutes to flush gcs op batches after error and interruption."); - } - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while executing batch GCS request", e); - } - } - - /** - * Makes get {@link BatchInterface BatchInterfaces}. - * - * @param paths {@link GcsPath GcsPaths}. - * @param results mutable {@link List} for return values. - * @return {@link BatchInterface BatchInterfaces} to execute. - * @throws IOException - */ @VisibleForTesting - List makeGetBatches( + List makeGetBatches( Collection paths, List results) throws IOException { - List batches = new ArrayList<>(); - for (List filesToGet : - Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { - BatchInterface batch = batchRequestSupplier.get(); - for (GcsPath path : filesToGet) { - results.add(enqueueGetFileSize(path, batch)); - } - batches.add(batch); - } - return batches; - } - - /** - * Wrapper for rewriting that supports multiple calls as well as possibly deleting the source - * file. - * - *

Usage: create, enqueue(), and execute batch. Then, check getReadyToEnqueue() if another - * round of enqueue() and execute is required. Repeat until getReadyToEnqueue() returns false. - */ - class RewriteOp extends JsonBatchCallback { - private final GcsPath from; - private final GcsPath to; - private final boolean deleteSource; - private final boolean ignoreMissingSource; - private boolean readyToEnqueue; - private boolean performDelete; - private @Nullable GoogleJsonError lastError; - @VisibleForTesting Storage.Objects.Rewrite rewriteRequest; - - public boolean getReadyToEnqueue() { - return readyToEnqueue; - } - - public @Nullable GoogleJsonError getLastError() { - return lastError; - } - - public GcsPath getFrom() { - return from; - } - - public GcsPath getTo() { - return to; - } - - public boolean isMetadataOperation() { - return performDelete || from.getBucket().equals(to.getBucket()); - } - - public void enqueue(BatchInterface batch) throws IOException { - if (!readyToEnqueue) { - throw new IOException( - String.format( - "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s", - from, to, readyToEnqueue)); - } - if (!performDelete) { - batch.queue(rewriteRequest, this); - return; - } - Storage.Objects.Delete deleteRequest = - storageClient.objects().delete(from.getBucket(), from.getObject()); - batch.queue( - deleteRequest, - new JsonBatchCallback() { - @Override - public void onSuccess(Void obj, HttpHeaders responseHeaders) { - LOG.debug("Successfully deleted {} after moving to {}", from, to); - readyToEnqueue = false; - lastError = null; - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) - throws IOException { - if (e.getCode() == 404) { - LOG.info( - "Ignoring failed deletion of moved file {} which already does not exist: {}", - from, - e); - readyToEnqueue = false; - lastError = null; - } else { - readyToEnqueue = true; - lastError = e; - } - } - }); - } - - public RewriteOp(GcsPath from, GcsPath to, boolean 
deleteSource, boolean ignoreMissingSource) - throws IOException { - this.from = from; - this.to = to; - this.deleteSource = deleteSource; - this.ignoreMissingSource = ignoreMissingSource; - rewriteRequest = - storageClient - .objects() - .rewrite(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null); - if (maxBytesRewrittenPerCall != null) { - rewriteRequest.setMaxBytesRewrittenPerCall(maxBytesRewrittenPerCall); - } - readyToEnqueue = true; - } + List legacyResults = new java.util.ArrayList<>(); + List legacyBatch = delegate.makeGetBatches(paths, legacyResults); - @Override - public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeaders) - throws IOException { - lastError = null; - if (rewriteResponse.getDone()) { - if (deleteSource) { - readyToEnqueue = true; - performDelete = true; - } else { - readyToEnqueue = false; - } - } else { - LOG.debug( - "Rewrite progress: {} of {} bytes, {} to {}", - rewriteResponse.getTotalBytesRewritten(), - rewriteResponse.getObjectSize(), - from, - to); - rewriteRequest.setRewriteToken(rewriteResponse.getRewriteToken()); - readyToEnqueue = true; - if (numRewriteTokensUsed != null) { - numRewriteTokensUsed.incrementAndGet(); - } + for (GcsUtilLegacy.StorageObjectOrIOException[] legacyResult : legacyResults) { + StorageObjectOrIOException[] result = new StorageObjectOrIOException[legacyResult.length]; + for (int i = 0; i < legacyResult.length; ++i) { + result[i] = StorageObjectOrIOException.fromLegacy(legacyResult[i]); } + results.add(result); } - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { - if (ignoreMissingSource) { - // Treat a missing source as a successful rewrite. - readyToEnqueue = false; - lastError = null; - } else { - throw new FileNotFoundException( - String.format( - "Rewrite from %s to %s has failed. Either source or sink not found. 
" - + "Failed with error: %s", - from.toString(), to.toString(), e.getMessage())); - } - } else if (e.getCode() == 403 - && e.getErrors().size() == 1 - && e.getErrors().get(0).getReason().equals("retentionPolicyNotMet")) { - List srcAndDestObjects = getObjects(Arrays.asList(from, to)); - String srcHash = srcAndDestObjects.get(0).storageObject().getMd5Hash(); - String destHash = srcAndDestObjects.get(1).storageObject().getMd5Hash(); - if (srcHash != null && srcHash.equals(destHash)) { - // Source and destination are identical. Treat this as a successful rewrite - LOG.warn( - "Caught retentionPolicyNotMet error while rewriting to a bucket with retention " - + "policy. Skipping because destination {} and source {} are considered identical " - + "because their MD5 Hashes are equal.", - getFrom(), - getTo()); - - if (deleteSource) { - readyToEnqueue = true; - performDelete = true; - } else { - readyToEnqueue = false; - } - lastError = null; - } else { - // User is attempting to write to a file that hasn't met its retention policy yet. - // Not a transient error so likely will not be fixed by a retry - throw new IOException(e.getMessage()); - } - } else { - lastError = e; - readyToEnqueue = true; - } - } + return legacyBatch; } public void copy(Iterable srcFilenames, Iterable destFilenames) throws IOException { - rewriteHelper( - srcFilenames, - destFilenames, - /*deleteSource=*/ false, - /*ignoreMissingSource=*/ false, - /*ignoreExistingDest=*/ false); + delegate.copy(srcFilenames, destFilenames); } public void rename( Iterable srcFilenames, Iterable destFilenames, MoveOptions... moveOptions) throws IOException { - // Rename is implemented as a rewrite followed by deleting the source. If the new object is in - // the same location, the copy is a metadata-only operation. 
- Set moveOptionSet = Sets.newHashSet(moveOptions); - final boolean ignoreMissingSrc = - moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES); - final boolean ignoreExistingDest = - moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); - rewriteHelper( - srcFilenames, destFilenames, /*deleteSource=*/ true, ignoreMissingSrc, ignoreExistingDest); - } - - private void rewriteHelper( - Iterable srcFilenames, - Iterable destFilenames, - boolean deleteSource, - boolean ignoreMissingSource, - boolean ignoreExistingDest) - throws IOException { - LinkedList rewrites = - makeRewriteOps( - srcFilenames, destFilenames, deleteSource, ignoreMissingSource, ignoreExistingDest); - org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff(); - while (true) { - List batches = makeRewriteBatches(rewrites); // Removes completed rewrite ops. - if (batches.isEmpty()) { - break; - } - Preconditions.checkState(!rewrites.isEmpty()); - RewriteOp sampleErrorOp = - rewrites.stream().filter(op -> op.getLastError() != null).findFirst().orElse(null); - if (sampleErrorOp != null) { - long backOffMillis = backoff.nextBackOffMillis(); - if (backOffMillis == org.apache.beam.sdk.util.BackOff.STOP) { - throw new IOException( - String.format( - "Error completing file copies with retries, sample: from %s to %s due to %s", - sampleErrorOp.getFrom().toString(), - sampleErrorOp.getTo().toString(), - sampleErrorOp.getLastError())); - } - LOG.warn( - "Retrying with backoff unsuccessful copy requests, sample request: from {} to {} due to {}", - sampleErrorOp.getFrom(), - sampleErrorOp.getTo(), - sampleErrorOp.getLastError()); - try { - Thread.sleep(backOffMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - String.format( - "Interrupted backoff of file copies with retries, sample: from %s to %s due to %s", - sampleErrorOp.getFrom().toString(), - sampleErrorOp.getTo().toString(), - sampleErrorOp.getLastError())); - 
} - } - executeBatches(batches); - } + delegate.rename(srcFilenames, destFilenames, moveOptions); } - LinkedList makeRewriteOps( + @VisibleForTesting + @SuppressWarnings("JdkObsolete") + java.util.LinkedList makeRewriteOps( Iterable srcFilenames, Iterable destFilenames, boolean deleteSource, boolean ignoreMissingSource, boolean ignoreExistingDest) throws IOException { - List srcList = Lists.newArrayList(srcFilenames); - List destList = Lists.newArrayList(destFilenames); - checkArgument( - srcList.size() == destList.size(), - "Number of source files %s must equal number of destination files %s", - srcList.size(), - destList.size()); - LinkedList rewrites = Lists.newLinkedList(); - for (int i = 0; i < srcList.size(); i++) { - final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); - final GcsPath destPath = GcsPath.fromUri(destList.get(i)); - if (ignoreExistingDest && !sourcePath.getBucket().equals(destPath.getBucket())) { - throw new UnsupportedOperationException( - "Skipping dest existence is only supported within a bucket."); - } - rewrites.addLast(new RewriteOp(sourcePath, destPath, deleteSource, ignoreMissingSource)); - } - return rewrites; + return delegate.makeRewriteOps( + srcFilenames, destFilenames, deleteSource, ignoreMissingSource, ignoreExistingDest); } - List makeRewriteBatches(LinkedList rewrites) throws IOException { - List batches = new ArrayList<>(); - @Nullable BatchInterface opBatch = null; - boolean useSeparateRewriteDataBatch = this.rewriteDataOpBatchLimit != MAX_REQUESTS_PER_BATCH; - Iterator it = rewrites.iterator(); - List deferredRewriteDataOps = new ArrayList<>(); - while (it.hasNext()) { - RewriteOp rewrite = it.next(); - if (!rewrite.getReadyToEnqueue()) { - it.remove(); - continue; - } - if (useSeparateRewriteDataBatch && !rewrite.isMetadataOperation()) { - deferredRewriteDataOps.add(rewrite); - } else { - if (opBatch != null && opBatch.size() >= MAX_REQUESTS_PER_BATCH) { - opBatch = null; - } - if (opBatch == null) { - opBatch = 
batchRequestSupplier.get(); - batches.add(opBatch); - } - rewrite.enqueue(opBatch); - } - } - for (RewriteOp rewrite : deferredRewriteDataOps) { - if (opBatch != null && opBatch.size() >= this.rewriteDataOpBatchLimit) { - opBatch = null; - } - if (opBatch == null) { - opBatch = batchRequestSupplier.get(); - batches.add(opBatch); - } - rewrite.enqueue(opBatch); - } - return batches; + @VisibleForTesting + @SuppressWarnings("JdkObsolete") + List makeRewriteBatches( + java.util.LinkedList rewrites) throws IOException { + return delegate.makeRewriteBatches(rewrites); } - List makeRemoveBatches(Collection filenames) throws IOException { - List batches = new ArrayList<>(); - for (List filesToDelete : - Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { - BatchInterface batch = batchRequestSupplier.get(); - for (String file : filesToDelete) { - enqueueDelete(GcsPath.fromUri(file), batch); - } - batches.add(batch); - } - return batches; + @VisibleForTesting + List makeRemoveBatches(Collection filenames) + throws IOException { + return delegate.makeRemoveBatches(filenames); } public void remove(Collection filenames) throws IOException { - // TODO(https://github.com/apache/beam/issues/19859): It would be better to add per-file retries - // and backoff - // instead of failing everything if a single operation fails. 
- executeBatches(makeRemoveBatches(filenames)); + delegate.remove(filenames); } - private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchInterface batch) - throws IOException { - final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; - - Storage.Objects.Get getRequest = - storageClient.objects().get(path.getBucket(), path.getObject()); - batch.queue( - getRequest, - new JsonBatchCallback() { - @Override - public void onSuccess(StorageObject response, HttpHeaders httpHeaders) - throws IOException { - ret[0] = StorageObjectOrIOException.create(response); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { - IOException ioException; - if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { - ioException = new FileNotFoundException(path.toString()); - } else { - ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); - } - ret[0] = StorageObjectOrIOException.create(ioException); - } - }); - return ret; - } - - /** A class that holds either a {@link StorageObject} or an {@link IOException}. */ - // It is clear from the name that this class holds either StorageObject or IOException. @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION") - @AutoValue - public abstract static class StorageObjectOrIOException { - - /** Returns the {@link StorageObject}. */ - public abstract @Nullable StorageObject storageObject(); + public static class StorageObjectOrIOException { + final GcsUtilLegacy.StorageObjectOrIOException delegate; - /** Returns the {@link IOException}. 
*/ - public abstract @Nullable IOException ioException(); + private StorageObjectOrIOException(GcsUtilLegacy.StorageObjectOrIOException delegate) { + this.delegate = delegate; + } - @VisibleForTesting public static StorageObjectOrIOException create(StorageObject storageObject) { - return new AutoValue_GcsUtil_StorageObjectOrIOException( - checkNotNull(storageObject, "storageObject"), null /* ioException */); + return new StorageObjectOrIOException( + GcsUtilLegacy.StorageObjectOrIOException.create(storageObject)); } - @VisibleForTesting public static StorageObjectOrIOException create(IOException ioException) { - return new AutoValue_GcsUtil_StorageObjectOrIOException( - null /* storageObject */, checkNotNull(ioException, "ioException")); + return new StorageObjectOrIOException( + GcsUtilLegacy.StorageObjectOrIOException.create(ioException)); } - } - - private void enqueueDelete(final GcsPath file, BatchInterface batch) throws IOException { - Storage.Objects.Delete deleteRequest = - storageClient.objects().delete(file.getBucket(), file.getObject()); - batch.queue( - deleteRequest, - new JsonBatchCallback() { - @Override - public void onSuccess(Void obj, HttpHeaders responseHeaders) { - LOG.debug("Successfully deleted {}", file); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - if (e.getCode() == 404) { - LOG.info( - "Ignoring failed deletion of file {} which already does not exist: {}", file, e); - } else { - throw new IOException(String.format("Error trying to delete %s: %s", file, e)); - } - } - }); - } - @VisibleForTesting - interface BatchInterface { - void queue(AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) - throws IOException; + static StorageObjectOrIOException fromLegacy(GcsUtilLegacy.StorageObjectOrIOException legacy) { + return new StorageObjectOrIOException(legacy); + } - void execute() throws IOException; + public @Nullable StorageObject storageObject() { + return 
delegate.storageObject(); + } - int size(); + public @Nullable IOException ioException() { + return delegate.ioException(); + } } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilLegacy.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilLegacy.java new file mode 100644 index 000000000000..e3aa01034899 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilLegacy.java @@ -0,0 +1,1448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.gcp.util; + +import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.batch.json.JsonBatchCallback; +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.RewriteResponse; +import com.google.api.services.storage.model.StorageObject; +import com.google.auth.Credentials; +import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.gcsio.CreateObjectOptions; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; +import com.google.cloud.hadoop.gcsio.StorageResourceId; +import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; +import com.google.cloud.hadoop.util.ResilientOperation; +import 
com.google.cloud.hadoop.util.RetryDeterminer; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; +import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; +import org.apache.beam.runners.core.metrics.ServiceCallMetric; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.channels.CountingSeekableByteChannel; +import org.apache.beam.sdk.extensions.gcp.util.channels.CountingWritableByteChannel; +import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.MoreFutures; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Provides operations on GCS. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class GcsUtilLegacy { + + @AutoValue + public abstract static class GcsCountersOptions { + public abstract @Nullable String getReadCounterPrefix(); + + public abstract @Nullable String getWriteCounterPrefix(); + + public boolean hasAnyPrefix() { + return getWriteCounterPrefix() != null || getReadCounterPrefix() != null; + } + + public static GcsCountersOptions create( + @Nullable String readCounterPrefix, @Nullable String writeCounterPrefix) { + return new AutoValue_GcsUtilLegacy_GcsCountersOptions(readCounterPrefix, writeCounterPrefix); + } + } + + public static class GcsReadOptionsFactory + implements DefaultValueFactory { + @Override + public GoogleCloudStorageReadOptions create(PipelineOptions options) { + return GoogleCloudStorageReadOptions.DEFAULT; + } + } + + /** + * This is a {@link DefaultValueFactory} able to create a {@link GcsUtilLegacy} using any + * transport flags specified on the {@link PipelineOptions}. + */ + public static class GcsUtilFactory implements DefaultValueFactory { + /** + * Returns an instance of {@link GcsUtilLegacy} based on the {@link PipelineOptions}. + * + *

If no instance has previously been created, one is created and the value stored in {@code + * options}. + */ + @Override + public GcsUtilLegacy create(PipelineOptions options) { + LOG.debug("Creating new GcsUtil"); + GcsOptions gcsOptions = options.as(GcsOptions.class); + Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); + return new GcsUtilLegacy( + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), + gcsOptions.getExecutorService(), + hasExperiment(options, "use_grpc_for_gcs"), + gcsOptions.getGcpCredential(), + gcsOptions.getGcsUploadBufferSizeBytes(), + gcsOptions.getGcsRewriteDataOpBatchLimit(), + GcsCountersOptions.create( + gcsOptions.getEnableBucketReadMetricCounter() + ? gcsOptions.getGcsReadCounterPrefix() + : null, + gcsOptions.getEnableBucketWriteMetricCounter() + ? gcsOptions.getGcsWriteCounterPrefix() + : null), + gcsOptions.getGoogleCloudStorageReadOptions()); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GcsUtilLegacy.class); + + /** Maximum number of items to retrieve per Objects.List request. */ + private static final long MAX_LIST_ITEMS_PER_CALL = 1024; + + /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ + private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); + + /** Maximum number of requests permitted in a GCS batch request. */ + private static final int MAX_REQUESTS_PER_BATCH = 100; + /** Default maximum number of requests permitted in a GCS batch request where data is copied. */ + private static final int MAX_REQUESTS_PER_COPY_BATCH = 10; + /** Maximum number of concurrent batches of requests executing on GCS. 
*/ + private static final int MAX_CONCURRENT_BATCHES = 256; + + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1)); + private static final RetryDeterminer RETRY_DETERMINER = + new RetryDeterminer() { + @Override + public boolean shouldRetry(IOException e) { + if (e instanceof GoogleJsonResponseException) { + int statusCode = ((GoogleJsonResponseException) e).getStatusCode(); + return statusCode == 408 // Request Timeout + || statusCode == 429 // Too many requests + || (statusCode >= 500 && statusCode < 600); // Server errors + } + return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } + }; + + ///////////////////////////////////////////////////////////////////////////// + + /** Client for the GCS API. */ + private Storage storageClient; + + private Supplier batchRequestSupplier; + + private final HttpRequestInitializer httpRequestInitializer; + /** Buffer size for GCS uploads (in bytes). */ + private final @Nullable Integer uploadBufferSizeBytes; + + // Helper delegate for turning IOExceptions from API calls into higher-level semantics. + private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + + // Unbounded thread pool for codependent pipeline operations that will deadlock the pipeline if + // starved for threads. + // Exposed for testing. + final ExecutorService executorService; + + private final Credentials credentials; + + private GoogleCloudStorage googleCloudStorage; + private GoogleCloudStorageOptions googleCloudStorageOptions; + + private final int rewriteDataOpBatchLimit; + + private final GcsCountersOptions gcsCountersOptions; + + /** Rewrite operation setting. For testing purposes only. */ + @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall; + + @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed; + + /** Returns the prefix portion of the glob that doesn't contain wildcards. 
*/ + public static String getNonWildcardPrefix(String globExp) { + Matcher m = GLOB_PREFIX.matcher(globExp); + checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); + return m.group("PREFIX"); + } + + /** Returns true if the given {@code spec} contains wildcard. */ + public static boolean isWildcard(GcsPath spec) { + return GLOB_PREFIX.matcher(spec.getObject()).matches(); + } + + @VisibleForTesting + GcsUtilLegacy( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + Boolean shouldUseGrpc, + Credentials credentials, + @Nullable Integer uploadBufferSizeBytes, + @Nullable Integer rewriteDataOpBatchLimit, + GcsCountersOptions gcsCountersOptions, + GcsOptions gcsOptions) { + this( + storageClient, + httpRequestInitializer, + executorService, + shouldUseGrpc, + credentials, + uploadBufferSizeBytes, + rewriteDataOpBatchLimit, + gcsCountersOptions, + gcsOptions.getGoogleCloudStorageReadOptions()); + } + + @VisibleForTesting + GcsUtilLegacy( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + Boolean shouldUseGrpc, + Credentials credentials, + @Nullable Integer uploadBufferSizeBytes, + @Nullable Integer rewriteDataOpBatchLimit, + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { + this.storageClient = storageClient; + this.httpRequestInitializer = httpRequestInitializer; + this.uploadBufferSizeBytes = uploadBufferSizeBytes; + this.executorService = executorService; + this.credentials = credentials; + this.maxBytesRewrittenPerCall = null; + this.numRewriteTokensUsed = null; + googleCloudStorageOptions = + GoogleCloudStorageOptions.builder() + .setAppName("Beam") + .setReadChannelOptions(gcsReadOptions) + .setGrpcEnabled(shouldUseGrpc) + .build(); + googleCloudStorage = + createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); + this.batchRequestSupplier = 
+ () -> { + // Capture reference to this so that the most recent storageClient and initializer + // are used. + GcsUtilLegacy util = this; + return new BatchInterface() { + final BatchRequest batch = util.storageClient.batch(util.httpRequestInitializer); + + @Override + public void queue( + AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) + throws IOException { + request.queue(batch, cb); + } + + @Override + public void execute() throws IOException { + batch.execute(); + } + + @Override + public int size() { + return batch.size(); + } + }; + }; + this.rewriteDataOpBatchLimit = + rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH : rewriteDataOpBatchLimit; + this.gcsCountersOptions = gcsCountersOptions; + } + + // Use this only for testing purposes. + protected void setStorageClient(Storage storageClient) { + this.storageClient = storageClient; + } + + // Use this only for testing purposes. + protected void setBatchRequestSupplier(Supplier supplier) { + this.batchRequestSupplier = supplier; + } + + /** + * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in + * the result. For patterns that only match a single object, we ensure that the object exists. + */ + public List expand(GcsPath gcsPattern) throws IOException { + Pattern p = null; + String prefix = null; + if (isWildcard(gcsPattern)) { + // Part before the first wildcard character. + prefix = getNonWildcardPrefix(gcsPattern.getObject()); + p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); + } else { + // Not a wildcard. + try { + // Use a get request to fetch the metadata of the object, and ignore the return value. + // The request has strong global consistency. + getObject(gcsPattern); + return ImmutableList.of(gcsPattern); + } catch (FileNotFoundException e) { + // If the path was not found, return an empty list. 
+ return ImmutableList.of(); + } + } + + LOG.debug( + "matching files in bucket {}, prefix {} against pattern {}", + gcsPattern.getBucket(), + prefix, + p.toString()); + + String pageToken = null; + List results = new ArrayList<>(); + do { + Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); + if (objects.getItems() == null) { + break; + } + + // Filter objects based on the regex. + for (StorageObject o : objects.getItems()) { + String name = o.getName(); + // Skip directories, which end with a slash. + if (p.matcher(name).matches() && !name.endsWith("/")) { + LOG.debug("Matched object: {}", name); + results.add(GcsPath.fromObject(o)); + } + } + pageToken = objects.getNextPageToken(); + } while (pageToken != null); + + return results; + } + + @VisibleForTesting + @Nullable + Integer getUploadBufferSizeBytes() { + return uploadBufferSizeBytes; + } + + private static BackOff createBackOff() { + return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); + } + + /** + * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not + * exist. + */ + public long fileSize(GcsPath path) throws IOException { + return getObject(path).getSize().longValue(); + } + + /** Returns the {@link StorageObject} for the given {@link GcsPath}. 
*/ + public StorageObject getObject(GcsPath gcsPath) throws IOException { + return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT); + } + + @VisibleForTesting + StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Objects.Get getObject = + storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); + try { + return ResilientOperation.retry( + getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, sleeper); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { + throw new FileNotFoundException(gcsPath.toString()); + } + throw new IOException( + String.format("Unable to get the file object for path %s.", gcsPath), e); + } + } + + /** + * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given {@link + * GcsPath GcsPaths}. 
+ */ + public List getObjects(List gcsPaths) throws IOException { + if (gcsPaths.isEmpty()) { + return ImmutableList.of(); + } else if (gcsPaths.size() == 1) { + GcsPath path = gcsPaths.get(0); + try { + StorageObject object = getObject(path); + return ImmutableList.of(StorageObjectOrIOException.create(object)); + } catch (IOException e) { + return ImmutableList.of(StorageObjectOrIOException.create(e)); + } catch (Exception e) { + IOException ioException = + new IOException(String.format("Error trying to get %s: %s", path, e)); + return ImmutableList.of(StorageObjectOrIOException.create(ioException)); + } + } + + List results = new ArrayList<>(); + executeBatches(makeGetBatches(gcsPaths, results)); + ImmutableList.Builder ret = ImmutableList.builder(); + for (StorageObjectOrIOException[] result : results) { + ret.add(result[0]); + } + return ret.build(); + } + + public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + return listObjects(bucket, prefix, pageToken, null); + } + + /** + * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. + * + *

For more details, see https://cloud.google.com/storage/docs/json_api/v1/objects/list. + */ + public Objects listObjects( + String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) + throws IOException { + // List all objects that start with the prefix (including objects in sub-directories). + Storage.Objects.List listObject = storageClient.objects().list(bucket); + listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); + listObject.setPrefix(prefix); + listObject.setDelimiter(delimiter); + + if (pageToken != null) { + listObject.setPageToken(pageToken); + } + + try { + return ResilientOperation.retry( + listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS, IOException.class); + } catch (Exception e) { + throw new IOException( + String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e); + } + } + + /** + * Returns the size of each given path, in order, or rethrows the {@link IOException} recorded + * for the first path whose lookup failed. + */ + @VisibleForTesting + List fileSizes(List paths) throws IOException { + List results = getObjects(paths); + + ImmutableList.Builder ret = ImmutableList.builder(); + for (StorageObjectOrIOException result : results) { + ret.add(toFileSize(result)); + } + return ret.build(); + } + + private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) + throws IOException { + if (storageObjectOrIOException.ioException() != null) { + throw storageObjectOrIOException.ioException(); + } else { + return storageObjectOrIOException.storageObject().getSize().longValue(); + } + } + + @VisibleForTesting + void setCloudStorageImpl(GoogleCloudStorage g) { + googleCloudStorage = g; + } + + @VisibleForTesting + void setCloudStorageImpl(GoogleCloudStorageOptions g) { + googleCloudStorageOptions = g; + } + + /** + * Create an integer consumer that updates the counter identified by a prefix and a bucket name. 
+ */ + private static Consumer createCounterConsumer(String counterNamePrefix, String bucket) { + return Metrics.counter(GcsUtil.class, String.format("%s_%s", counterNamePrefix, bucket))::inc; + } + + private WritableByteChannel wrapInCounting( + WritableByteChannel writableByteChannel, String bucket) { + if (writableByteChannel instanceof CountingWritableByteChannel) { + return writableByteChannel; + } + return Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix()) + .map( + prefix -> { + LOG.debug( + "wrapping writable byte channel using counter name prefix {} and bucket {}", + prefix, + bucket); + return new CountingWritableByteChannel( + writableByteChannel, createCounterConsumer(prefix, bucket)); + }) + .orElse(writableByteChannel); + } + + private SeekableByteChannel wrapInCounting( + SeekableByteChannel seekableByteChannel, String bucket) { + if (seekableByteChannel instanceof CountingSeekableByteChannel + || !gcsCountersOptions.hasAnyPrefix()) { + return seekableByteChannel; + } + + return new CountingSeekableByteChannel( + seekableByteChannel, + Optional.ofNullable(gcsCountersOptions.getReadCounterPrefix()) + .map( + prefix -> { + LOG.debug( + "wrapping seekable byte channel with \"bytes read\" counter name prefix {}" + + " and bucket {}", + prefix, + bucket); + return createCounterConsumer(prefix, bucket); + }) + .orElse(null), + Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix()) + .map( + prefix -> { + LOG.debug( + "wrapping seekable byte channel with \"bytes written\" counter name prefix {}" + + " and bucket {}", + prefix, + bucket); + return createCounterConsumer(prefix, bucket); + }) + .orElse(null)); + } + + /** + * Opens an object in GCS. + * + *

Returns a SeekableByteChannel that provides access to data in the bucket. + * + * @param path the GCS filename to read from + * @return a SeekableByteChannel that can read the object data + */ + public SeekableByteChannel open(GcsPath path) throws IOException { + return open(path, this.googleCloudStorageOptions.getReadChannelOptions()); + } + + /** + * Opens an object in GCS. + * + *

Returns a SeekableByteChannel that provides access to data in the bucket. + * + * @param path the GCS filename to read from + * @param readOptions Fine-grained options for behaviors of retries, buffering, etc. + * @return a SeekableByteChannel that can read the object data + */ + @VisibleForTesting + SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions readOptions) + throws IOException { + HashMap baseLabels = new HashMap<>(); + baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage"); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsGet"); + baseLabels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.cloudStorageBucket(path.getBucket())); + baseLabels.put( + MonitoringInfoConstants.Labels.GCS_PROJECT_ID, + String.valueOf(googleCloudStorageOptions.getProjectId())); + baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET, path.getBucket()); + + ServiceCallMetric serviceCallMetric = + new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); + try { + SeekableByteChannel channel = + googleCloudStorage.open( + new StorageResourceId(path.getBucket(), path.getObject()), readOptions); + serviceCallMetric.call("ok"); + return wrapInCounting(channel, path.getBucket()); + } catch (IOException e) { + if (e.getCause() instanceof GoogleJsonResponseException) { + serviceCallMetric.call(((GoogleJsonResponseException) e.getCause()).getDetails().getCode()); + } + throw e; + } + } + + /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */ + @Deprecated + public WritableByteChannel create(GcsPath path, String type) throws IOException { + CreateOptions.Builder builder = CreateOptions.builder().setContentType(type); + return create(path, builder.build()); + } + + /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. 
*/ + @Deprecated + public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes) + throws IOException { + CreateOptions.Builder builder = + CreateOptions.builder() + .setContentType(type) + .setUploadBufferSizeBytes(uploadBufferSizeBytes); + return create(path, builder.build()); + } + + @AutoValue + public abstract static class CreateOptions { + /** + * If true, the created file is expected to not exist. Instead of checking for file presence + * before writing, a write exception may occur if the file does exist. + */ + public abstract boolean getExpectFileToNotExist(); + + /** + * If non-null, the upload buffer size to be used. If null, the buffer size corresponds to {@code + * GcsUtilLegacy.getUploadBufferSizeBytes} + */ + public abstract @Nullable Integer getUploadBufferSizeBytes(); + + /** The content type for the created file, e.g. "text/plain". */ + public abstract @Nullable String getContentType(); + + public static Builder builder() { + return new AutoValue_GcsUtilLegacy_CreateOptions.Builder().setExpectFileToNotExist(false); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setContentType(String value); + + public abstract Builder setUploadBufferSizeBytes(int value); + + public abstract Builder setExpectFileToNotExist(boolean value); + + public abstract CreateOptions build(); + } + } + + /** + * Creates an object in GCS and prepares for uploading its contents. + * + * @param path the GCS file to write to + * @param options to be used for creating and configuring file upload + * @return a WritableByteChannel that can be used to write data to the object. + */ + public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException { + AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions(); + @Nullable + Integer uploadBufferSizeBytes = + options.getUploadBufferSizeBytes() != null + ? 
options.getUploadBufferSizeBytes() + : getUploadBufferSizeBytes(); + if (uploadBufferSizeBytes != null) { + wcOptions = wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build(); + } + GoogleCloudStorageOptions newGoogleCloudStorageOptions = + googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build(); + GoogleCloudStorage gcpStorage = + createGoogleCloudStorage( + newGoogleCloudStorageOptions, this.storageClient, this.credentials); + StorageResourceId resourceId = + new StorageResourceId( + path.getBucket(), + path.getObject(), + // If we expect the file not to exist, we set a generation id of 0. This avoids a read + // to identify the object exists already and should be overwritten. + // See {@link GoogleCloudStorage#create(StorageResourceId, GoogleCloudStorageOptions)} + options.getExpectFileToNotExist() ? 0L : StorageResourceId.UNKNOWN_GENERATION_ID); + CreateObjectOptions.Builder createBuilder = + CreateObjectOptions.builder().setOverwriteExisting(true); + if (options.getContentType() != null) { + createBuilder = createBuilder.setContentType(options.getContentType()); + } + + HashMap baseLabels = new HashMap<>(); + baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage"); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsInsert"); + baseLabels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.cloudStorageBucket(path.getBucket())); + baseLabels.put( + MonitoringInfoConstants.Labels.GCS_PROJECT_ID, + String.valueOf(googleCloudStorageOptions.getProjectId())); + baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET, path.getBucket()); + + ServiceCallMetric serviceCallMetric = + new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); + try { + WritableByteChannel channel = gcpStorage.create(resourceId, createBuilder.build()); + serviceCallMetric.call("ok"); + return wrapInCounting(channel, 
path.getBucket()); + } catch (IOException e) { + if (e.getCause() instanceof GoogleJsonResponseException) { + serviceCallMetric.call(((GoogleJsonResponseException) e.getCause()).getDetails().getCode()); + } + throw e; + } + } + + GoogleCloudStorage createGoogleCloudStorage( + GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { + try { + return new GoogleCloudStorageImpl(options, storage, credentials); + } catch (NoSuchMethodError e) { + // gcs-connector 3.x drops the direct constructor and exclusively uses Builder + // TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x + try { + final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder"); + Object builder = builderMethod.invoke(null); + final Class builderClass = + Class.forName( + "com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder"); + + final Method setOptionsMethod = + builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class); + setOptionsMethod.setAccessible(true); + builder = setOptionsMethod.invoke(builder, options); + + final Method setHttpTransportMethod = + builderClass.getMethod("setHttpTransport", HttpTransport.class); + setHttpTransportMethod.setAccessible(true); + builder = + setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport()); + + final Method setCredentialsMethod = + builderClass.getMethod("setCredentials", Credentials.class); + setCredentialsMethod.setAccessible(true); + builder = setCredentialsMethod.invoke(builder, credentials); + + final Method setHttpRequestInitializerMethod = + builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class); + setHttpRequestInitializerMethod.setAccessible(true); + builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer); + + final Method buildMethod = builderClass.getMethod("build"); + buildMethod.setAccessible(true); + return (GoogleCloudStorage) buildMethod.invoke(builder); + 
} catch (Exception reflectionError) { + throw new RuntimeException( + "Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError); + } + } + } + + /** + * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws + * exception if the bucket is inaccessible due to permissions or does not exist. + */ + public void verifyBucketAccessible(GcsPath path) throws IOException { + verifyBucketAccessible(path, createBackOff(), Sleeper.DEFAULT); + } + + /** Returns whether the GCS bucket exists and is accessible. */ + public boolean bucketAccessible(GcsPath path) throws IOException { + return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT); + } + + /** + * Returns the project number of the project which owns this bucket. If the bucket exists, it must + * be accessible otherwise the permissions exception will be propagated. If the bucket does not + * exist, an exception will be thrown. + */ + public long bucketOwner(GcsPath path) throws IOException { + return getBucket(path, createBackOff(), Sleeper.DEFAULT).getProjectNumber().longValue(); + } + + /** + * Creates a {@link Bucket} under the specified project in Cloud Storage or propagates an + * exception. + */ + public void createBucket(String projectId, Bucket bucket) throws IOException { + createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT); + } + + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ + @Nullable + public Bucket getBucket(GcsPath path) throws IOException { + return getBucket(path, createBackOff(), Sleeper.DEFAULT); + } + + /** Remove an empty {@link Bucket} in Cloud Storage or propagates an exception. */ + public void removeBucket(Bucket bucket) throws IOException { + removeBucket(bucket, createBackOff(), Sleeper.DEFAULT); + } + + /** + * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due + * to permissions. 
+ */ + @VisibleForTesting + boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + try { + return getBucket(path, backoff, sleeper) != null; + } catch (AccessDeniedException | FileNotFoundException e) { + return false; + } + } + + /** + * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath, BackOff, + * Sleeper)}, but throws exception if the bucket is inaccessible due to permissions or does not + * exist. + */ + @VisibleForTesting + void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + getBucket(path, backoff, sleeper); + } + + @VisibleForTesting + @Nullable + Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Buckets.Get getBucket = storageClient.buckets().get(path.getBucket()); + + try { + return ResilientOperation.retry( + getBucket::execute, + backoff, + new RetryDeterminer() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + return false; + } + return RETRY_DETERMINER.shouldRetry(e); + } + }, + IOException.class, + sleeper); + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(path.toString(), null, e.getMessage()); + } + if (errorExtractor.itemNotFound(e)) { + throw new FileNotFoundException(e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format( + "Error while attempting to verify existence of bucket gs://%s", path.getBucket()), + e); + } + } + + @VisibleForTesting + void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) + throws IOException { + Storage.Buckets.Insert insertBucket = storageClient.buckets().insert(projectId, bucket); + insertBucket.setPredefinedAcl("projectPrivate"); + 
insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); + + try { + ResilientOperation.retry( + insertBucket::execute, + backoff, + new RetryDeterminer() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) { + return false; + } + return RETRY_DETERMINER.shouldRetry(e); + } + }, + IOException.class, + sleeper); + return; + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); + } + if (errorExtractor.itemAlreadyExists(e)) { + throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format( + "Error while attempting to create bucket gs://%s for project %s", + bucket.getName(), projectId), + e); + } + } + + @VisibleForTesting + void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Buckets.Delete getBucket = storageClient.buckets().delete(bucket.getName()); + + try { + ResilientOperation.retry( + getBucket::execute, + backoff, + new RetryDeterminer() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + return false; + } + return RETRY_DETERMINER.shouldRetry(e); + } + }, + IOException.class, + sleeper); + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); + } + if (errorExtractor.itemNotFound(e)) { + throw new FileNotFoundException(e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e); + } + } + + private static void executeBatches(List batches) 
throws IOException { + ExecutorService executor = + MoreExecutors.listeningDecorator( + new ThreadPoolExecutor( + MAX_CONCURRENT_BATCHES, + MAX_CONCURRENT_BATCHES, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>())); + + List> futures = new ArrayList<>(); + for (final BatchInterface batch : batches) { + futures.add(MoreFutures.runAsync(batch::execute, executor)); + } + + try { + try { + MoreFutures.get(MoreFutures.allOf(futures)); + } catch (ExecutionException e) { + if (e.getCause() instanceof FileNotFoundException) { + throw (FileNotFoundException) e.getCause(); + } + throw new IOException("Error executing batch GCS request", e); + } finally { + // Give the other batches a chance to complete in error cases. + executor.shutdown(); + if (!executor.awaitTermination(5, TimeUnit.MINUTES)) { + LOG.warn("Taking over 5 minutes to flush gcs op batches after error"); + executor.shutdownNow(); + if (!executor.awaitTermination(5, TimeUnit.MINUTES)) { + LOG.warn("Took over 10 minutes to flush gcs op batches after error and interruption."); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while executing batch GCS request", e); + } + } + + /** + * Makes get {@link BatchInterface BatchInterfaces}. + * + * @param paths {@link GcsPath GcsPaths}. + * @param results mutable {@link List} for return values. + * @return {@link BatchInterface BatchInterfaces} to execute. 
+ * @throws IOException + */ + @VisibleForTesting + List makeGetBatches( + Collection paths, List results) throws IOException { + List batches = new ArrayList<>(); + for (List filesToGet : + Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { + BatchInterface batch = batchRequestSupplier.get(); + for (GcsPath path : filesToGet) { + results.add(enqueueGetFileSize(path, batch)); + } + batches.add(batch); + } + return batches; + } + + /** + * Wrapper for rewriting that supports multiple calls as well as possibly deleting the source + * file. + * + *

Usage: create, enqueue(), and execute batch. Then, check getReadyToEnqueue() if another + * round of enqueue() and execute is required. Repeat until getReadyToEnqueue() returns false. + */ + class RewriteOp extends JsonBatchCallback { + private final GcsPath from; + private final GcsPath to; + private final boolean deleteSource; + private final boolean ignoreMissingSource; + private boolean readyToEnqueue; + private boolean performDelete; + private @Nullable GoogleJsonError lastError; + @VisibleForTesting Storage.Objects.Rewrite rewriteRequest; + + public boolean getReadyToEnqueue() { + return readyToEnqueue; + } + + public @Nullable GoogleJsonError getLastError() { + return lastError; + } + + public GcsPath getFrom() { + return from; + } + + public GcsPath getTo() { + return to; + } + + public boolean isMetadataOperation() { + return performDelete || from.getBucket().equals(to.getBucket()); + } + + public void enqueue(BatchInterface batch) throws IOException { + if (!readyToEnqueue) { + throw new IOException( + String.format( + "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s", + from, to, readyToEnqueue)); + } + if (!performDelete) { + batch.queue(rewriteRequest, this); + return; + } + Storage.Objects.Delete deleteRequest = + storageClient.objects().delete(from.getBucket(), from.getObject()); + batch.queue( + deleteRequest, + new JsonBatchCallback() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully deleted {} after moving to {}", from, to); + readyToEnqueue = false; + lastError = null; + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) + throws IOException { + if (e.getCode() == 404) { + LOG.info( + "Ignoring failed deletion of moved file {} which already does not exist: {}", + from, + e); + readyToEnqueue = false; + lastError = null; + } else { + readyToEnqueue = true; + lastError = e; + } + } + }); + } + + public RewriteOp(GcsPath from, GcsPath to, boolean 
deleteSource, boolean ignoreMissingSource) + throws IOException { + this.from = from; + this.to = to; + this.deleteSource = deleteSource; + this.ignoreMissingSource = ignoreMissingSource; + rewriteRequest = + storageClient + .objects() + .rewrite(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null); + if (maxBytesRewrittenPerCall != null) { + rewriteRequest.setMaxBytesRewrittenPerCall(maxBytesRewrittenPerCall); + } + readyToEnqueue = true; + } + + @Override + public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeaders) + throws IOException { + lastError = null; + if (rewriteResponse.getDone()) { + if (deleteSource) { + readyToEnqueue = true; + performDelete = true; + } else { + readyToEnqueue = false; + } + } else { + LOG.debug( + "Rewrite progress: {} of {} bytes, {} to {}", + rewriteResponse.getTotalBytesRewritten(), + rewriteResponse.getObjectSize(), + from, + to); + rewriteRequest.setRewriteToken(rewriteResponse.getRewriteToken()); + readyToEnqueue = true; + if (numRewriteTokensUsed != null) { + numRewriteTokensUsed.incrementAndGet(); + } + } + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + if (ignoreMissingSource) { + // Treat a missing source as a successful rewrite. + readyToEnqueue = false; + lastError = null; + } else { + throw new FileNotFoundException( + String.format( + "Rewrite from %s to %s has failed. Either source or sink not found. 
" + + "Failed with error: %s", + from.toString(), to.toString(), e.getMessage())); + } + } else if (e.getCode() == 403 + && e.getErrors().size() == 1 + && e.getErrors().get(0).getReason().equals("retentionPolicyNotMet")) { + List srcAndDestObjects = getObjects(Arrays.asList(from, to)); + String srcHash = srcAndDestObjects.get(0).storageObject().getMd5Hash(); + String destHash = srcAndDestObjects.get(1).storageObject().getMd5Hash(); + if (srcHash != null && srcHash.equals(destHash)) { + // Source and destination are identical. Treat this as a successful rewrite + LOG.warn( + "Caught retentionPolicyNotMet error while rewriting to a bucket with retention " + + "policy. Skipping because destination {} and source {} are considered identical " + + "because their MD5 Hashes are equal.", + getTo(), + getFrom()); + + if (deleteSource) { + readyToEnqueue = true; + performDelete = true; + } else { + readyToEnqueue = false; + } + lastError = null; + } else { + // User is attempting to write to a file that hasn't met its retention policy yet. + // Not a transient error so likely will not be fixed by a retry + throw new IOException(e.getMessage()); + } + } else { + lastError = e; + readyToEnqueue = true; + } + } + } + + public void copy(Iterable srcFilenames, Iterable destFilenames) + throws IOException { + rewriteHelper( + srcFilenames, + destFilenames, + /*deleteSource=*/ false, + /*ignoreMissingSource=*/ false, + /*ignoreExistingDest=*/ false); + } + + public void rename( + Iterable srcFilenames, Iterable destFilenames, MoveOptions... moveOptions) + throws IOException { + // Rename is implemented as a rewrite followed by deleting the source. If the new object is in + // the same location, the copy is a metadata-only operation. 
+ Set moveOptionSet = Sets.newHashSet(moveOptions); + final boolean ignoreMissingSrc = + moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES); + final boolean ignoreExistingDest = + moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + rewriteHelper( + srcFilenames, destFilenames, /*deleteSource=*/ true, ignoreMissingSrc, ignoreExistingDest); + } + + private void rewriteHelper( + Iterable srcFilenames, + Iterable destFilenames, + boolean deleteSource, + boolean ignoreMissingSource, + boolean ignoreExistingDest) + throws IOException { + LinkedList rewrites = + makeRewriteOps( + srcFilenames, destFilenames, deleteSource, ignoreMissingSource, ignoreExistingDest); + org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff(); + while (true) { + List batches = makeRewriteBatches(rewrites); // Removes completed rewrite ops. + if (batches.isEmpty()) { + break; + } + Preconditions.checkState(!rewrites.isEmpty()); + RewriteOp sampleErrorOp = + rewrites.stream().filter(op -> op.getLastError() != null).findFirst().orElse(null); + if (sampleErrorOp != null) { + long backOffMillis = backoff.nextBackOffMillis(); + if (backOffMillis == org.apache.beam.sdk.util.BackOff.STOP) { + throw new IOException( + String.format( + "Error completing file copies with retries, sample: from %s to %s due to %s", + sampleErrorOp.getFrom().toString(), + sampleErrorOp.getTo().toString(), + sampleErrorOp.getLastError())); + } + LOG.warn( + "Retrying with backoff unsuccessful copy requests, sample request: from {} to {} due to {}", + sampleErrorOp.getFrom(), + sampleErrorOp.getTo(), + sampleErrorOp.getLastError()); + try { + Thread.sleep(backOffMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format( + "Interrupted backoff of file copies with retries, sample: from %s to %s due to %s", + sampleErrorOp.getFrom().toString(), + sampleErrorOp.getTo().toString(), + sampleErrorOp.getLastError())); + 
} + } + executeBatches(batches); + } + } + + LinkedList makeRewriteOps( + Iterable srcFilenames, + Iterable destFilenames, + boolean deleteSource, + boolean ignoreMissingSource, + boolean ignoreExistingDest) + throws IOException { + List srcList = Lists.newArrayList(srcFilenames); + List destList = Lists.newArrayList(destFilenames); + checkArgument( + srcList.size() == destList.size(), + "Number of source files %s must equal number of destination files %s", + srcList.size(), + destList.size()); + LinkedList rewrites = Lists.newLinkedList(); + for (int i = 0; i < srcList.size(); i++) { + final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); + final GcsPath destPath = GcsPath.fromUri(destList.get(i)); + if (ignoreExistingDest && !sourcePath.getBucket().equals(destPath.getBucket())) { + throw new UnsupportedOperationException( + "Skipping dest existence is only supported within a bucket."); + } + rewrites.addLast(new RewriteOp(sourcePath, destPath, deleteSource, ignoreMissingSource)); + } + return rewrites; + } + + List makeRewriteBatches(LinkedList rewrites) throws IOException { + List batches = new ArrayList<>(); + @Nullable BatchInterface opBatch = null; + boolean useSeparateRewriteDataBatch = this.rewriteDataOpBatchLimit != MAX_REQUESTS_PER_BATCH; + Iterator it = rewrites.iterator(); + List deferredRewriteDataOps = new ArrayList<>(); + while (it.hasNext()) { + RewriteOp rewrite = it.next(); + if (!rewrite.getReadyToEnqueue()) { + it.remove(); + continue; + } + if (useSeparateRewriteDataBatch && !rewrite.isMetadataOperation()) { + deferredRewriteDataOps.add(rewrite); + } else { + if (opBatch != null && opBatch.size() >= MAX_REQUESTS_PER_BATCH) { + opBatch = null; + } + if (opBatch == null) { + opBatch = batchRequestSupplier.get(); + batches.add(opBatch); + } + rewrite.enqueue(opBatch); + } + } + for (RewriteOp rewrite : deferredRewriteDataOps) { + if (opBatch != null && opBatch.size() >= this.rewriteDataOpBatchLimit) { + opBatch = null; + } + if (opBatch == 
null) { + opBatch = batchRequestSupplier.get(); + batches.add(opBatch); + } + rewrite.enqueue(opBatch); + } + return batches; + } + + List makeRemoveBatches(Collection filenames) throws IOException { + List batches = new ArrayList<>(); + for (List filesToDelete : + Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { + BatchInterface batch = batchRequestSupplier.get(); + for (String file : filesToDelete) { + enqueueDelete(GcsPath.fromUri(file), batch); + } + batches.add(batch); + } + return batches; + } + + public void remove(Collection filenames) throws IOException { + // TODO(https://github.com/apache/beam/issues/19859): It would be better to add per-file retries + // and backoff + // instead of failing everything if a single operation fails. + executeBatches(makeRemoveBatches(filenames)); + } + + private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchInterface batch) + throws IOException { + final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; + + Storage.Objects.Get getRequest = + storageClient.objects().get(path.getBucket(), path.getObject()); + batch.queue( + getRequest, + new JsonBatchCallback() { + @Override + public void onSuccess(StorageObject response, HttpHeaders httpHeaders) + throws IOException { + ret[0] = StorageObjectOrIOException.create(response); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { + IOException ioException; + if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + ioException = new FileNotFoundException(path.toString()); + } else { + ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); + } + ret[0] = StorageObjectOrIOException.create(ioException); + } + }); + return ret; + } + + /** A class that holds either a {@link StorageObject} or an {@link IOException}. */ + // It is clear from the name that this class holds either StorageObject or IOException. 
+ @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION") + @AutoValue + public abstract static class StorageObjectOrIOException { + + /** Returns the {@link StorageObject}. */ + public abstract @Nullable StorageObject storageObject(); + + /** Returns the {@link IOException}. */ + public abstract @Nullable IOException ioException(); + + @VisibleForTesting + public static StorageObjectOrIOException create(StorageObject storageObject) { + return new AutoValue_GcsUtilLegacy_StorageObjectOrIOException( + checkNotNull(storageObject, "storageObject"), null /* ioException */); + } + + @VisibleForTesting + public static StorageObjectOrIOException create(IOException ioException) { + return new AutoValue_GcsUtilLegacy_StorageObjectOrIOException( + null /* storageObject */, checkNotNull(ioException, "ioException")); + } + } + + private void enqueueDelete(final GcsPath file, BatchInterface batch) throws IOException { + Storage.Objects.Delete deleteRequest = + storageClient.objects().delete(file.getBucket(), file.getObject()); + batch.queue( + deleteRequest, + new JsonBatchCallback() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully deleted {}", file); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + if (e.getCode() == 404) { + LOG.info( + "Ignoring failed deletion of file {} which already does not exist: {}", file, e); + } else { + throw new IOException(String.format("Error trying to delete %s: %s", file, e)); + } + } + }); + } + + @VisibleForTesting + interface BatchInterface { + void queue(AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) + throws IOException; + + void execute() throws IOException; + + int size(); + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index 6477564f01a1..b6c92ab9369d 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -73,12 +73,12 @@ public void testRewriteMultiPart() throws IOException { gcsOptions.getGcpTempLocation() + String.format( "/GcsUtilIT-%tF-% gcsUtil.open(GcsPath.fromComponents("testbucket", "testbucket"), readOptions)); + IOException.class, () -> gcsUtil.open(GcsPath.fromComponents("testbucket", "testbucket"))); verifyMetricWasSet("my_project", "testbucket", "GcsGet", "permission_denied", 1); } @@ -1029,7 +1039,7 @@ public void testGCSWriteMetricsIsSet() throws IOException { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class); - gcsUtil.setCloudStorageImpl( + gcsUtil.delegate.setCloudStorageImpl( GoogleCloudStorageOptions.builder() .setAppName("Beam") .setGrpcEnabled(true) @@ -1154,7 +1164,7 @@ public void testMakeRewriteOps() throws IOException { public void testMakeRewriteOpsWithOptions() throws IOException { GcsOptions gcsOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = gcsOptions.getGcsUtil(); - gcsUtil.maxBytesRewrittenPerCall = 1337L; + gcsUtil.delegate.maxBytesRewrittenPerCall = 1337L; LinkedList rewrites = gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1), false, false, false); @@ -1617,11 +1627,68 @@ public void testListObjectsException() throws IOException { public static class GcsUtilMock extends GcsUtil { - public GoogleCloudStorage googleCloudStorage; - public static GcsUtilMock createMockWithMockStorage(PipelineOptions options, byte[] readPayload) throws IOException { GcsUtilMock gcsUtilMock = 
createMock(options); + + GcsUtilLegacyMock mockLegacy = + GcsUtilLegacyMock.createMockWithMockStorage(options, readPayload); + gcsUtilMock.delegate = mockLegacy; + + return gcsUtilMock; + } + + public static GcsUtilMock createMock(PipelineOptions options) { + GcsOptions gcsOptions = options.as(GcsOptions.class); + Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); + return new GcsUtilMock( + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), + gcsOptions.getExecutorService(), + hasExperiment(options, "use_grpc_for_gcs"), + gcsOptions.getGcpCredential(), + gcsOptions.getGcsUploadBufferSizeBytes(), + gcsOptions.getGcsRewriteDataOpBatchLimit(), + GcsUtil.GcsCountersOptions.create( + gcsOptions.getEnableBucketReadMetricCounter() + ? gcsOptions.getGcsReadCounterPrefix() + : null, + gcsOptions.getEnableBucketWriteMetricCounter() + ? gcsOptions.getGcsWriteCounterPrefix() + : null), + gcsOptions); + } + + private GcsUtilMock( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + Boolean shouldUseGrpc, + Credentials credentials, + @Nullable Integer uploadBufferSizeBytes, + @Nullable Integer rewriteDataOpBatchLimit, + GcsUtil.GcsCountersOptions gcsCountersOptions, + GcsOptions gcsOptions) { + super( + storageClient, + httpRequestInitializer, + executorService, + shouldUseGrpc, + credentials, + uploadBufferSizeBytes, + rewriteDataOpBatchLimit, + gcsCountersOptions, + gcsOptions); + } + } + + public static class GcsUtilLegacyMock extends GcsUtilLegacy { + + public GoogleCloudStorage googleCloudStorage; + + public static GcsUtilLegacyMock createMockWithMockStorage( + PipelineOptions options, byte[] readPayload) throws IOException { + GcsUtilLegacyMock gcsUtilMock = createMock(options); GoogleCloudStorage googleCloudStorageMock = Mockito.mock(GoogleCloudStorage.class); gcsUtilMock.googleCloudStorage = googleCloudStorageMock; // set the mock in the super object as well @@ -1639,10 
+1706,10 @@ public static GcsUtilMock createMockWithMockStorage(PipelineOptions options, byt return gcsUtilMock; } - public static GcsUtilMock createMock(PipelineOptions options) { + public static GcsUtilLegacyMock createMock(PipelineOptions options) { GcsOptions gcsOptions = options.as(GcsOptions.class); Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); - return new GcsUtilMock( + return new GcsUtilLegacyMock( storageBuilder.build(), storageBuilder.getHttpRequestInitializer(), gcsOptions.getExecutorService(), @@ -1650,7 +1717,7 @@ public static GcsUtilMock createMock(PipelineOptions options) { gcsOptions.getGcpCredential(), gcsOptions.getGcsUploadBufferSizeBytes(), gcsOptions.getGcsRewriteDataOpBatchLimit(), - GcsCountersOptions.create( + GcsUtilLegacy.GcsCountersOptions.create( gcsOptions.getEnableBucketReadMetricCounter() ? gcsOptions.getGcsReadCounterPrefix() : null, @@ -1660,7 +1727,7 @@ public static GcsUtilMock createMock(PipelineOptions options) { gcsOptions.getGoogleCloudStorageReadOptions()); } - private GcsUtilMock( + private GcsUtilLegacyMock( Storage storageClient, HttpRequestInitializer httpRequestInitializer, ExecutorService executorService, @@ -1668,7 +1735,7 @@ private GcsUtilMock( Credentials credentials, @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, - GcsCountersOptions gcsCountersOptions, + GcsUtilLegacy.GcsCountersOptions gcsCountersOptions, GoogleCloudStorageReadOptions gcsReadOptions) { super( storageClient, @@ -1698,7 +1765,9 @@ public void testCreate() throws IOException { GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class); WritableByteChannel mockChannel = Mockito.mock(WritableByteChannel.class); - gcsUtil.googleCloudStorage = mockStorage; + GcsUtilLegacyMock mockLegacy = GcsUtilLegacyMock.createMock(gcsOptions); + mockLegacy.googleCloudStorage = mockStorage; + gcsUtil.delegate = mockLegacy; when(mockStorage.create(any(), any())).thenReturn(mockChannel); @@ 
-1716,7 +1785,9 @@ public void testCreateWithException() throws IOException { GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class); - gcsUtil.googleCloudStorage = mockStorage; + GcsUtilLegacyMock mockLegacy = GcsUtilLegacyMock.createMock(gcsOptions); + mockLegacy.googleCloudStorage = mockStorage; + gcsUtil.delegate = mockLegacy; when(mockStorage.create(any(), any())).thenThrow(new RuntimeException("testException")); @@ -1762,13 +1833,15 @@ private void testReadMetrics(boolean enabled, GoogleCloudStorageReadOptions read GcsOptions gcsOptions = PipelineOptionsFactory.create().as(GcsOptions.class); gcsOptions.setEnableBucketReadMetricCounter(enabled); gcsOptions.setGcsReadCounterPrefix("test_counter"); + if (readOptions != null) { + gcsOptions.setGoogleCloudStorageReadOptions(readOptions); + } byte[] payload = "some_bytes".getBytes(StandardCharsets.UTF_8); GcsUtilMock gcsUtil = GcsUtilMock.createMockWithMockStorage(gcsOptions, payload); String bucketName = "some_bucket"; GcsPath gcsPath = new GcsPath(null, bucketName, "o1"); // act - try (SeekableByteChannel byteChannel = - readOptions != null ? gcsUtil.open(gcsPath, readOptions) : gcsUtil.open(gcsPath)) { + try (SeekableByteChannel byteChannel = gcsUtil.open(gcsPath)) { int bytesReadReportedByChannel = byteChannel.read(ByteBuffer.allocate(payload.length)); long bytesReadReportedByMetric = testMetricsContainer