diff --git a/README.md b/README.md
index 3c62c37..7bd9b70 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,30 @@ This tutorial demonstrates how to use [Google Cloud Dataflow](http://cloud.googl
For details about how the tutorial works, see [Processing Logs at Scale Using Cloud Dataflow](http://cloud.google.com/solutions/processing-logs-at-scale-using-dataflow) on the Google Cloud Platform website.
+[TOC levels=3]: # " "
+
+- [Architecture Overview](#architecture-overview)
+- [Prerequisites](#prerequisites)
+- [Sample Web Applications](#sample-web-applications)
+ - [Deploy the Container Engine cluster](#deploy-the-container-engine-cluster)
+ - [Set up Cloud Logging](#set-up-cloud-logging)
+ - [Generate requests](#generate-requests)
+ - [Examining logs](#examining-logs)
+- [Cloud Dataflow Pipeline](#cloud-dataflow-pipeline)
+ - [Create the BigQuery dataset](#create-the-bigquery-dataset)
+ - [Run the pipeline](#run-the-pipeline)
+ - [Monitoring the pipeline](#monitoring-the-pipeline)
+ - [View BigQuery data](#view-bigquery-data)
+- [Cleaning Up](#cleaning-up)
+
+
+
+## Architecture Overview
+
+In this solution, a set of sample microservices run on [Google Kubernetes Engine](https://cloud.google.com/kubernetes-engine/) to implement a website. [Stackdriver Logging](https://cloud.google.com/logging/) collects logs from these services and then saves them to [Google Cloud Storage](https://cloud.google.com/storage/) buckets. [Google Cloud Dataflow](https://cloud.google.com/dataflow/) then processes the logs by extracting metadata and computing basic aggregations. The Cloud Dataflow pipeline is designed to process the log elements daily to generate aggregate metrics for server response times, based on the logs for each day. Finally, the output from Cloud Dataflow is loaded into [Google BigQuery](https://cloud.google.com/bigquery/) tables, where it can be analyzed to provide business intelligence.
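+
+As a rough sketch of the data flow: the Cloud Logging export sinks (created in a later step) write hourly JSON batches into day-partitioned paths of the Cloud Storage bucket, and the Dataflow pipeline later globs over exactly those paths (see `dataflow/pipeline.sh`). The path names below are illustrative only:
+
+```bash
+# Each sink (one per microservice) writes batches like:
+#   gs://BUCKET_NAME/sample-home-service/YYYY/MM/DD/<hourly shard>.json
+#
+# The pipeline reads them back with wildcards over year/month/day:
+#   --homeLogSource=gs://BUCKET_NAME/sample-home-service/*/*/*/*.json
+gsutil ls -r gs://BUCKET_NAME/sample-home-service/ | head
+```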
+
+
+
## Prerequisites
* [Java JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html) (version 1.7 or greater)
@@ -20,10 +44,31 @@ After installing the Google Cloud SDK, run `gcloud components update` to install
* gcloud app Python Extensions
* kubectl
+Define names for the resources used throughout this tutorial:
+
+```bash
+# cd ~/projects/gcp/misc/processing-logs-using-dataflow
+export PROJECT_HOME=$(pwd)
+
+export PROJECT_ID="new-logs-demo" ## pick your favorite name here
+export CLUSTER_NAME="my-first-cluster-9995"
+export BUCKET_NAME=${PROJECT_ID}-data-bucket
+
+# BigQuery dataset name, must be alphanumeric (plus underscores)
+export DATASET_NAME=$(echo ${PROJECT_ID}-bq-data | tr "-" "_" )
+```
+
+
Set your preferred zone and project:
+```bash
+# Configure gcloud defaults for this project
+# gcloud config set compute/zone ZONE   # e.g. europe-west1
+gcloud config set compute/zone us-central1-f
+
+gcloud config set project ${PROJECT_ID}
+```
- $ gcloud config set compute/zone ZONE
- $ gcloud config set project PROJECT-ID
Ensure the following APIs are enabled in the [Google Cloud Console](https://console.developers.google.com/). Navigate to **API Manager** and enable:
@@ -34,35 +79,128 @@ Ensure the following APIs are enabled in the [Google Cloud Console](https://cons
* Google Cloud Storage
* Google Container Engine
+
+Alternatively, enable them from the shell (see https://cloud.google.com/endpoints/docs/openapi/enable-api):
+
+```bash
+gcloud services list --available | grep container
+
+gcloud services enable container.googleapis.com
+gcloud services enable containerregistry.googleapis.com
+gcloud services enable dataflow.googleapis.com
+```
+
+The remaining APIs are typically already enabled by default on a test GCP account.
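+
+To double-check which APIs are already active (an optional verification):
+
+```bash
+gcloud services list --enabled | grep -E "bigquery|container|dataflow|logging|storage"
+```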
+
## Sample Web Applications
-The `services` folder contains three simple applications built using [Go](http://golang.org) and the [Gin](https://github.com/gin-gonic/gin) HTTP web framework. These applications generate the logs to be analyzed by the Dataflow pipeline. The applications have been packaged as Docker images and are available through [Google Container Registry](https://gcr.io). **Note:** If you are interested in editing/updating these applications, refer to the [README](https://github.com/GoogleCloudPlatform/dataflow-log-analytics/tree/master/services).
+The `services` folder contains three simple applications built using [Go](http://golang.org) and the [Gin](https://github.com/gin-gonic/gin) HTTP web framework. **These applications generate the logs to be analyzed by the Dataflow pipeline.** The applications have been packaged as Docker images and are available through [Google Container Registry](https://gcr.io). **Note:** If you are interested in editing/updating these applications, refer to the [README](https://github.com/GoogleCloudPlatform/dataflow-log-analytics/tree/master/services).
In the `services` folder, there are several scripts you can use to facilitate deployment, configuration, and testing of the sample web applications.
### Deploy the Container Engine cluster
-First, change the current directory to `services`:
+First, we need to build the project and [push the images](https://cloud.google.com/container-registry/docs/pushing-and-pulling) to the registry:
+
+```bash
+cd ${PROJECT_HOME}/services
+
+gcloud auth configure-docker
- $ cd dataflow-log-analytics/services
+make build image tag push
+```
+
+We can check the pushed images in the _Container Registry_ UI:
+```bash
+open https://console.cloud.google.com/gcr/images/${PROJECT_ID}?project=${PROJECT_ID}
+```
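+
+Or list them from the shell (assuming the images were pushed to `gcr.io/${PROJECT_ID}` by the `make` targets above):
+
+```bash
+gcloud container images list --repository=gcr.io/${PROJECT_ID}
+```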
Next, deploy the Container Engine cluster with the sample web applications:
- $ ./cluster.sh PROJECT-ID CLUSTER-NAME up
+
+```bash
+./cluster.sh ${PROJECT_ID} ${CLUSTER_NAME} up
+
+## to shut it down use
+# ./cluster.sh ${PROJECT_ID} ${CLUSTER_NAME} down
+
+## to inspect the cluster structure in the console
+open https://console.cloud.google.com/kubernetes/workload_/gcloud/us-central1-f/${CLUSTER_NAME}?project=${PROJECT_ID}
+
+## to test the services, look up the external IPs (`kubectl get services`) and e.g. do
+curl http://35.224.38.74:8100/browse/category/23
+
+## to reconnect to an existing cluster
+## see https://cloud.google.com/kubernetes-engine/docs/quickstart#create_cluster
+#gcloud container clusters list
+gcloud container clusters get-credentials ${CLUSTER_NAME}
+kubectl get pods ## or do something else with the cluster
+```
+
The script will deploy a single-node Container Engine cluster, deploy the web applications, and expose the applications as Kubernetes services.
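+
+The external IPs used in the `curl` examples above can be looked up once the service load balancers are provisioned (the IPs will differ in your project):
+
+```bash
+# EXTERNAL-IP stays <pending> until the load balancer is ready
+kubectl get services -o wide
+```
+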
+Check whether the deployment ran into any issues. The cluster may not provide enough CPU: a g1-small node has only 0.5 vCPU, while the default CPU request per container is 100m, i.e. 10% of a core. Also see https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu
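+
+If pods stay in `Pending`, a quick way to spot CPU pressure (a sketch; the exact output depends on your node pool):
+
+```bash
+# pods that could not be scheduled remain Pending
+kubectl get pods
+
+# look for "Insufficient cpu" in the scheduling events
+kubectl describe pods | grep -A 5 Events
+
+# requested vs. allocatable CPU per node
+kubectl describe nodes | grep -A 6 "Allocated resources"
+```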
+
+
### Set up Cloud Logging
-The next step is to configure Cloud Logging to export the web application logs to Google Cloud Storage. The following script first creates a Cloud Storage bucket, configures the appropriate permissions, and sets up automated export from the web applications to Cloud Storage. **Note:** the `BUCKET-NAME` should not be an existing Cloud Storage bucket.
- $ ./logging.sh PROJECT-ID BUCKET-NAME batch up
+By default, Stackdriver Logging collects only your container's standard output and standard error streams. It can be configured to also collect any logs your application writes to a file.
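+
+To peek at what is actually being collected from the cluster (an optional sanity check; `resource.type="container"` is the legacy GKE logging resource that `logging.sh` also filters on):
+
+```bash
+gcloud logging read 'resource.type="container"' --project=${PROJECT_ID} --limit=5
+```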
+
+
+
+The next step is to configure Cloud Logging to export the web application logs to Google Cloud Storage. The following script first creates a Cloud Storage bucket, configures the appropriate permissions, and sets up automated export from the web applications to Cloud Storage. **Note:** the `${BUCKET_NAME}` should not be an existing Cloud Storage bucket.
+
+For details see https://cloud.google.com/kubernetes-engine/docs/how-to/logging, https://cloud.google.com/storage/docs/gsutil/commands/mb and https://cloud.google.com/sdk/gcloud/reference/beta/logging/sinks/create
+
+
+
+```bash
+cd ${PROJECT_HOME}/services
+
+./logging.sh ${PROJECT_ID} ${BUCKET_NAME} batch up
+
+# ./logging.sh ${PROJECT_ID} ${BUCKET_NAME} batch down
+
+## to check that the sinks were correctly created run
+gcloud logging sinks list
+```
+
+We can list the created buckets with `gsutil ls`.
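+
+For example, to confirm that the bucket exists and that the Cloud Logging group was granted access to it (an optional check):
+
+```bash
+gsutil ls
+gsutil acl get gs://${BUCKET_NAME}
+```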
+
+The sinks are listed in the GCP UI under
+```bash
+open https://console.cloud.google.com/logs/exports?project=${PROJECT_ID}
+```
+
+
+To verify that data is being logged, we can trigger a few requests and follow the pod logs:
+```bash
+## pick a pod name from `kubectl get pods`
+kubectl logs -f browse-service-2djpb &
+
+## adjust the IPs based on `kubectl get services`
+curl http://35.224.38.74:8100/browse/category/67
+curl http://35.202.0.152:8200/locate/66?zipcode=12345
+```
+
+
+Finally, we can open the bucket directly:
+```bash
+open https://console.cloud.google.com/storage/browser/${BUCKET_NAME}
+```
+
### Generate requests
Now that the applications have been deployed and are logging through Cloud Logging, you can use the following script to generate requests against the applications:
- $ ./load.sh REQUESTS CONCURRENCY
+```bash
+# Usage: ./load.sh REQUESTS CONCURRENCY
+./load.sh 100 2
+```
This script uses Apache Bench [ab](https://httpd.apache.org/docs/2.2/programs/ab.html) to generate load against the deployed web applications. `REQUESTS` controls how many requests are issued to each application and `CONCURRENCY` controls how many concurrent requests are issued. The logs from the applications are sent to Cloud Storage in hourly batches, and it can take up to two hours before log entries start to appear. For more information, see the [Cloud Logging documentation](https://cloud.google.com/logging/docs/export/using_exported_logs).
@@ -70,7 +208,17 @@ This script uses Apache Bench [ab](https://httpd.apache.org/docs/2.2/programs/ab
For information on examining logs or log structure in Cloud Storage, see the [Cloud Logging documentation](https://cloud.google.com/logging/docs/export/using_exported_logs#log_entries_in_google_cloud_storage).
-## Cloud Dataflow pipeline
+Most importantly, we need some patience here because, [as documented](https://cloud.google.com/logging/docs/export/using_exported_logs#gcs-availability):
+> Log entries going to Cloud Storage are batched and sent out approximately every hour.
+
+It will take an hour or so before the logs actually arrive in our bucket.
+
+After an hour or so, we can check whether the logs have arrived in the _GCP Logs Viewer_: `open https://console.cloud.google.com/logs/viewer?project=${PROJECT_ID}`
+
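+To check from the shell whether the hourly export batches have landed yet (it is normal for this to still be empty during the first hour):
+
+```bash
+gsutil ls -r gs://${BUCKET_NAME} | grep "\.json$" | head
+```
+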
+Troubleshooting hints: https://cloud.google.com/logging/docs/export/#troubleshooting
+
+
+## Cloud Dataflow Pipeline
The following diagram shows the structure and flow of the example Dataflow pipeline:
@@ -78,23 +226,37 @@ The following diagram shows the structure and flow of the example Dataflow pipel
### Create the BigQuery dataset
-Before deploying the pipeline, create the BigQuery dataset where output from the Cloud Dataflow pipeline will be stored:
+Before deploying the pipeline, [create the BigQuery dataset](https://cloud.google.com/bigquery/docs/datasets) where output from the Cloud Dataflow pipeline will be stored:
- $ gcloud alpha bigquery datasets create DATASET-NAME
+```bash
+bq mk ${DATASET_NAME}
+```
### Run the pipeline
First, change the current directory to `dataflow`:
- $ cd dataflow-log-analytics/dataflow
+```bash
+cd ${PROJECT_HOME}/dataflow
+```
+
+Next, run the pipeline, passing the same `${BUCKET_NAME}` you used for the logging setup:
-Next, Run the pipeline. Replace `BUCKET-NAME` with the same name you used for the logging setup:
+```bash
+./pipeline.sh ${PROJECT_ID} ${DATASET_NAME} ${BUCKET_NAME} run
+```
- $ ./pipeline.sh PROJECT-ID DATASET-NAME BUCKET-NAME run
-This command builds the code for the Cloud Dataflow pipeline, uploads it to the specified staging area, and launches the job. To see all options available for this pipeline, run the following command:
- $ ./pipeline.sh
+This command builds the code for the Cloud Dataflow pipeline, uploads it to the specified staging area, and launches the job. Since we're using legacy APIs, we need to point the job at service-account credentials (see https://cloud.google.com/storage/docs/reference/libraries#setting_up_authentication). To see all options available for this pipeline, run the following command:
+
+```bash
+export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"
+
+./pipeline.sh
+```
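+
+One way to obtain such a key file, sketched here with a hypothetical service-account name `dataflow-tutorial` (any name works; see the authentication guide linked above):
+
+```bash
+gcloud iam service-accounts create dataflow-tutorial
+
+gcloud projects add-iam-policy-binding ${PROJECT_ID} \
+    --member "serviceAccount:dataflow-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
+    --role "roles/editor"
+
+gcloud iam service-accounts keys create ~/dataflow-tutorial-key.json \
+    --iam-account dataflow-tutorial@${PROJECT_ID}.iam.gserviceaccount.com
+
+export GOOGLE_APPLICATION_CREDENTIALS=~/dataflow-tutorial-key.json
+```
+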
### Monitoring the pipeline
@@ -106,20 +268,35 @@ While the pipeline is running, you can see its status in the [Google Developers
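+The job status can also be checked from the command line (an optional alternative to the console):
+
+```bash
+gcloud dataflow jobs list --project=${PROJECT_ID}
+
+# details for a specific job, using a JOB_ID from the list above
+gcloud dataflow jobs describe JOB_ID --project=${PROJECT_ID}
+```
+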
After the job has completed, you can see the output in the [BigQuery console](https://bigquery.cloud.google.com) and compose and run queries against the data.
-## Cleaning up
+```bash
+open https://console.cloud.google.com/bigquery?project=${PROJECT_ID}
+```
+
+And run queries, e.g.:
+```sql
+SELECT destination, count(destination) FROM `new-logs-demo.new_logs_demo_bq_data.all_logs_table` GROUP BY destination LIMIT 1000
+```
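+
+The same query can also be run from the shell, assuming the `bq` CLI is configured for this project (the table name follows from the variables defined earlier):
+
+```bash
+bq query --use_legacy_sql=false \
+    "SELECT destination, count(destination) AS hits FROM \`${PROJECT_ID}.${DATASET_NAME}.all_logs_table\` GROUP BY destination LIMIT 1000"
+```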
+
+## Cleaning Up
To clean up and remove all resources used in this example:
1. Delete the BigQuery dataset:
- $ gcloud alpha bigquery datasets delete DATASET-NAME
+    $ bq rm -r -f ${DATASET_NAME}
1. Deactivate the Cloud Logging exports. This step deletes the exports and the specified Cloud Storage bucket:
$ cd dataflow-log-analytics/services
- $ ./logging.sh PROJECT-ID BUCKET-NAME batch down
+ $ ./logging.sh ${PROJECT_ID} ${BUCKET_NAME} batch down
1. Delete the Container Engine cluster used to run the sample web applications:
$ cd dataflow-log-analytics/services
- $ ./cluster.sh PROJECT-ID CLUSTER-NAME down
+ $ ./cluster.sh ${PROJECT_ID} ${CLUSTER_NAME} down
+
+
+Alternatively, we can delete the entire project in a single step:
+```bash
+gcloud projects delete ${PROJECT_ID}
+```
diff --git a/dataflow/pipeline.sh b/dataflow/pipeline.sh
index ce13eaf..68536cb 100755
--- a/dataflow/pipeline.sh
+++ b/dataflow/pipeline.sh
@@ -42,10 +42,12 @@ case "$COMMAND" in
-Dexec.args="\
--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging \
- --runner=BlockingDataflowPipelineRunner \
+ --runner=DataflowRunner \
--homeLogSource=gs://${BUCKET_NAME}/sample-home-service/*/*/*/*.json \
--browseLogSource=gs://${BUCKET_NAME}/sample-browse-service/*/*/*/*.json \
--locateLogSource=gs://${BUCKET_NAME}/sample-locate-service/*/*/*/*.json \
+ --gcpTempLocation=gs://${BUCKET_NAME}/temp \
+ --tempLocation=gs://${BUCKET_NAME}/temp \
--allLogsTableName=${DATASET_NAME}.all_logs_table \
--maxRespTimeTableName=${DATASET_NAME}.max_response_time_table \
--meanRespTimeTableName=${DATASET_NAME}.mean_response_time_table"
diff --git a/dataflow/pom.xml b/dataflow/pom.xml
index 88a7b7f..c65cf0a 100644
--- a/dataflow/pom.xml
+++ b/dataflow/pom.xml
@@ -25,23 +25,25 @@ limitations under the License.
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
- <version>LATEST</version>
+
+ <version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-logging</artifactId>
- <version>v1beta3-rev10-1.21.0</version>
+ <version>v2-rev608-1.25.0</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
- <version>1.20.0</version>
+ <version>1.27.0</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson2</artifactId>
- <version>1.20.0</version>
+ <version>1.27.0</version>
</dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -67,6 +69,14 @@ limitations under the License.
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>7</source>
+ <target>7</target>
+ </configuration>
+ </plugin>
diff --git a/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipeline.java b/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipeline.java
index 3f4be5a..c49a9d1 100644
--- a/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipeline.java
+++ b/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipeline.java
@@ -1,17 +1,17 @@
-/**
-Copyright Google Inc. 2015
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-**/
+/*
+ Copyright Google Inc. 2015
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
package com.google.cloud.solutions;
@@ -20,19 +20,22 @@
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.logging.model.LogEntry;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import com.google.api.services.logging.v2.model.LogEntry;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
@@ -55,7 +58,7 @@ public class LogAnalyticsPipeline {
* - The input String is a Cloud Logging LogEntry JSON object
* - The "structPayload.log" field contains the log message to be parsed
*/
- private static class EmitLogMessageFn extends DoFn<String,LogMessage> {
+ private static class EmitLogMessageFn extends DoFn<String, LogMessage> {
private boolean outputWithTimestamp;
private String regexPattern;
@@ -64,15 +67,15 @@ public EmitLogMessageFn(boolean outputWithTimestamp, String regexPattern) {
this.regexPattern = regexPattern;
}
- @Override
- public void processElement(ProcessContext c) {
- LogMessage logMessage = parseEntry(c.element());
- if(logMessage != null) {
- if(this.outputWithTimestamp) {
- c.outputWithTimestamp(logMessage, logMessage.getTimestamp());
- }
- else {
- c.output(logMessage);
+
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<LogMessage> receiver) {
+ LogMessage logMessage = parseEntry(element);
+ if (logMessage != null) {
+ if (this.outputWithTimestamp) {
+ receiver.outputWithTimestamp(logMessage, logMessage.getTimestamp());
+ } else {
+ receiver.output(logMessage);
}
}
}
@@ -84,30 +87,26 @@ private LogMessage parseEntry(String entry) {
JsonParser parser = new JacksonFactory().createJsonParser(entry);
LogEntry logEntry = parser.parse(LogEntry.class);
logString = logEntry.getTextPayload();
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("IOException parsing entry: " + e.getMessage());
- }
- catch(NullPointerException e) {
+ } catch (NullPointerException e) {
LOG.error("NullPointerException parsing entry: " + e.getMessage());
}
Pattern p = Pattern.compile(this.regexPattern);
Matcher m = p.matcher(logString);
- if(m.find()) {
+ if (m.find()) {
DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy/MM/dd - HH:mm:ss");
Instant timestamp = fmt.parseDateTime(m.group("timestamp")).toInstant();
int httpStatusCode = Integer.valueOf(m.group("httpStatusCode"));
double responseTime = 0;
- if(m.group("resolution").equals("ns")) {
+ if (m.group("resolution").equals("ns")) {
responseTime = Double.valueOf(m.group("responseTime")) / 1e9;
- }
- else if(m.group("resolution").equals("µs")) {
+ } else if (m.group("resolution").equals("µs")) {
responseTime = Double.valueOf(m.group("responseTime")) / 1e6;
- }
- else if(m.group("resolution").equals("ms")) {
+ } else if (m.group("resolution").equals("ms")) {
responseTime = Double.valueOf(m.group("responseTime")) / 1e3;
}
@@ -116,8 +115,7 @@ else if(m.group("resolution").equals("ms")) {
String destination = m.group("destination");
return new LogMessage(timestamp, httpStatusCode, responseTime, source, httpMethod, destination);
- }
- else {
+ } else {
return null;
}
}
@@ -128,19 +126,17 @@ else if(m.group("resolution").equals("ms")) {
* - Transforms LogMessage objects to BigQuery TableRow objects
*/
private static class LogMessageTableRowFn extends DoFn<LogMessage, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
- LogMessage msg = c.element();
-
+ @ProcessElement
+ public void processElement(@Element LogMessage msg, OutputReceiver<TableRow> receiver) {
TableRow row = new TableRow()
- .set("timestamp", msg.getTimestamp().toString())
- .set("httpStatusCode", msg.getHttpStatusCode())
- .set("responseTime", msg.getResponseTime())
- .set("source", msg.getSource())
- .set("httpMethod", msg.getHttpMethod())
- .set("destination", msg.getDestination());
-
- c.output(row);
+ .set("timestamp", msg.getTimestamp().toString())
+ .set("httpStatusCode", msg.getHttpStatusCode())
+ .set("responseTime", msg.getResponseTime())
+ .set("source", msg.getSource())
+ .set("httpMethod", msg.getHttpMethod())
+ .set("destination", msg.getDestination());
+
+ receiver.output(row);
}
}
@@ -150,7 +146,7 @@ public void processElement(ProcessContext c) {
* - Creates a BigQuery TableSchema from an input String
* - Writes the output PCollection to BigQuery
*/
- private static class TableRowOutputTransform extends PTransform<PCollection<KV<String,Double>>,PCollection<TableRow>> {
+ private static class TableRowOutputTransform extends PTransform<PCollection<KV<String, Double>>, PCollection<TableRow>> {
private String tableSchema;
private String tableName;
@@ -163,7 +159,7 @@ public static TableSchema createTableSchema(String schema) {
String[] fieldTypePairs = schema.split(",");
List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
- for(String entry : fieldTypePairs) {
+ for (String entry : fieldTypePairs) {
String[] fieldAndType = entry.split(":");
fields.add(new TableFieldSchema().setName(fieldAndType[0]).setType(fieldAndType[1]));
}
@@ -171,28 +167,35 @@ public static TableSchema createTableSchema(String schema) {
return new TableSchema().setFields(fields);
}
+
@Override
- public PCollection<TableRow> apply(PCollection<KV<String,Double>> input) {
+ public PCollection<TableRow> expand(PCollection<KV<String, Double>> input) {
PCollection<TableRow> output = input.
- apply(ParDo.named("aggregateToTableRow").of(new DoFn<KV<String,Double>, TableRow>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<String,Double> e = c.element();
-
- TableRow row = new TableRow()
- .set("destination", e.getKey())
- .set("aggResponseTime", e.getValue());
-
- c.output(row);
- }
- }));
-
- output.apply(BigQueryIO.Write
- .named("tableRowToBigQuery")
- .to(this.tableName)
- .withSchema(createTableSchema(this.tableSchema))
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
+ apply(ParDo.of(new DoFn<KV<String, Double>, TableRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<String, Double> e = c.element();
+
+ TableRow row = new TableRow()
+ .set("destination", e.getKey())
+ .set("aggResponseTime", e.getValue());
+
+ c.output(row);
+ }
+ }));
+
+ output.apply(BigQueryIO.write()
+ .to(this.tableName)
+ .withSchema(createTableSchema(this.tableSchema))
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withFormatFunction(new SimpleFunction<TableRow, TableRow>() {
+ @Override
+ public TableRow apply(TableRow input) {
+ return input;
+ }
+ }));
+
return output;
}
@@ -201,9 +204,9 @@ public void processElement(ProcessContext c) {
public static void main(String[] args) {
PipelineOptionsFactory.register(LogAnalyticsPipelineOptions.class);
LogAnalyticsPipelineOptions options = PipelineOptionsFactory
- .fromArgs(args)
- .withValidation()
- .as(LogAnalyticsPipelineOptions.class);
+ .fromArgs(args)
+ .withValidation()
+ .as(LogAnalyticsPipelineOptions.class);
Pipeline p = Pipeline.create(options);
@@ -212,106 +215,117 @@ public static void main(String[] args) {
PCollection<String> locateLogs;
boolean outputWithTimestamp;
- /**
- * If the pipeline is started in "streaming" mode, treat the input sources as Pub/Sub subscriptions
+ /*
+ If the pipeline is started in "streaming" mode, treat the input sources as Pub/Sub subscriptions
*/
- if(options.isStreaming()) {
+ if (options.isStreaming()) {
outputWithTimestamp = false;
- homeLogs = p.apply(PubsubIO.Read.named("homeLogsPubSubRead").subscription(options.getHomeLogSource()));
- browseLogs = p.apply(PubsubIO.Read.named("browseLogsPubSubRead").subscription(options.getBrowseLogSource()));
- locateLogs = p.apply(PubsubIO.Read.named("locateLogsPubSubRead").subscription(options.getLocateLogSource()));
+ homeLogs = p.apply(PubsubIO.readStrings().fromSubscription(options.getHomeLogSource()));
+ browseLogs = p.apply(PubsubIO.readStrings().fromSubscription(options.getBrowseLogSource()));
+ locateLogs = p.apply(PubsubIO.readStrings().fromSubscription(options.getLocateLogSource()));
}
- /**
- * If the pipeline is not started in "streaming" mode, treat the input sources as Cloud Storage paths
+ /*
+ If the pipeline is not started in "streaming" mode, treat the input sources as Cloud Storage paths
*/
else {
outputWithTimestamp = true;
// [START readingData]
- homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
- browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
- locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));
+ homeLogs = p.apply(TextIO.read().from(options.getHomeLogSource()));
+ browseLogs = p.apply(TextIO.read().from(options.getBrowseLogSource()));
+ locateLogs = p.apply(TextIO.read().from(options.getLocateLogSource()));
// [END readingData]
}
- /**
- * Flatten all input PCollections into a single PCollection
+ /*
+ Flatten all input PCollections into a single PCollection
*/
// [START flattenCollections]
PCollection<String> allLogs = PCollectionList
- .of(homeLogs)
- .and(browseLogs)
- .and(locateLogs)
- .apply(Flatten.pCollections());
+ .of(homeLogs)
+ .and(browseLogs)
+ .and(locateLogs)
+ .apply(Flatten.pCollections());
// [END flattenCollections]
- /**
- * Transform "allLogs" PCollection<String> to PCollection<LogMessage> and apply custom windowing scheme
+ /*
+ Transform "allLogs" PCollection<String> to PCollection<LogMessage> and apply custom windowing scheme
*/
// [START transformStringToLogMessage]
PCollection<LogMessage> allLogMessages = allLogs
- .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));
+ .apply(ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));
// [END transformStringToLogMessage]
// [START applyWindowing]
PCollection<LogMessage> allLogMessagesDaily = allLogMessages
- .apply(Window.named("allLogMessageToDaily").into(FixedWindows.of(Duration.standardDays(1))));
+ .apply(Window.into(FixedWindows.of(Duration.standardDays(1))));
// [END applyWindowing]
- /**
- * Transform "allLogs" PCollection<LogMessage> to PCollection<TableRow>
+ //language=RegExp
+ String foo = "\\[GIN\\]\\s+(?<timestamp>\\d{4}/\\d{2}/\\d{2} \\- \\d{2}\\:\\d{2}\\:\\d{2}).*? (?<httpStatusCode>\\d{3}) .*?(?<responseTime>\\d+\\.?\\d*)(?<resolution>\\S{1,}) \\| (?<source>[0-9\\.:]+?) \\|\\S+?\\s+?\\S+?\\s+?(?<httpMethod>\\w+?)\\s+?(?<destination>[a-z0-9/]+)";
+ /*
+ Transform "allLogs" PCollection<LogMessage> to PCollection<TableRow>
*/
// [START logMessageToTableRow]
PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
- .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));
+ .apply(ParDo.of(new LogMessageTableRowFn()));
// [END logMessageToTableRow]
- /**
- * Output "allLogs" PCollection to BigQuery
+ /*
+ Output "allLogs" PCollection to BigQuery
*/
TableSchema allLogsTableSchema = TableRowOutputTransform.createTableSchema(options.getAllLogsTableSchema());
// [START allLogsToBigQuery]
- logsAsTableRows.apply(BigQueryIO.Write
- .named("allLogsToBigQuery")
- .to(options.getAllLogsTableName())
- .withSchema(allLogsTableSchema)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
+ logsAsTableRows.apply(BigQueryIO.write()
+ .withFormatFunction(new SimpleFunction<TableRow, TableRow>() {
+ @Override
+ public TableRow apply(TableRow input) {
+ return input;
+// return super.apply(input);
+ }
+ })
+ .to(options.getAllLogsTableName())
+ .withSchema(allLogsTableSchema)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
// [END allLogsToBigQuery]
- /**
- * Create new PCollection<KV<String,Double>>
- * - Contains "destination->responseTime" key-value pairs
- * - Used for computing responseTime aggregations
+ /*
+ Create new PCollection<KV<String, Double>>
+ - Contains "destination->responseTime" key-value pairs
+ - Used for computing responseTime aggregations
*/
- PCollection<KV<String,Double>> destResponseTimeCollection = allLogMessagesDaily
- .apply(ParDo.named("logMessageToDestRespTime").of(new DoFn<LogMessage, KV<String,Double>>() {
- @Override
- public void processElement(ProcessContext processContext) throws Exception {
- LogMessage l = processContext.element();
- processContext.output(KV.of(l.getDestination(), l.getResponseTime()));
- }
- }));
-
- /**
- * Transform PCollection<KV<String,Double>> to PCollection<TableRow>
- * - First aggregate "destination->responseTime" key-value pairs into
- * - destination->maxResponseTime and destination->meanResponseTime
- * - Use custom PTransform to output PCollection to BigQuery
+ PCollection<KV<String, Double>> destResponseTimeCollection = allLogMessagesDaily
+ .apply(ParDo.of(new DoFn<LogMessage, KV<String, Double>>() {
+
+ @ProcessElement
+ public void processElement(ProcessContext processContext) throws Exception {
+ LogMessage l = processContext.element();
+ processContext.output(KV.of(l.getDestination(), l.getResponseTime()));
+ }
+ }));
+
+ /*
+ Transform PCollection<KV<String, Double>> to PCollection<TableRow>
+ - First aggregate "destination->responseTime" key-value pairs into
+ - destination->maxResponseTime and destination->meanResponseTime
+ - Use custom PTransform to output PCollection to BigQuery
*/
// [START computeAggregations]
- PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
- .apply(Combine.perKey(new Max.MaxDoubleFn()));
+ PCollection<KV<String, Double>> destMaxRespTime = destResponseTimeCollection
+ .apply(Combine.perKey(Max.ofDoubles()));
- PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
- .apply(Mean.perKey());
+ PCollection<KV<String, Double>> destMeanRespTime = destResponseTimeCollection
+ .apply(Mean.perKey());
// [END computeAggregations]
+
+ // see https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
PCollection<TableRow> destMaxRespTimeRows = destMaxRespTime
- .apply(new TableRowOutputTransform(options.getMaxRespTimeTableSchema(), options.getMaxRespTimeTableName()));
+ .apply(new TableRowOutputTransform(options.getMaxRespTimeTableSchema(), options.getMaxRespTimeTableName()));
PCollection<TableRow> destMeanRespTimeRows = destMeanRespTime
- .apply(new TableRowOutputTransform(options.getMeanRespTimeTableSchema(), options.getMeanRespTimeTableName()));
+ .apply(new TableRowOutputTransform(options.getMeanRespTimeTableSchema(), options.getMeanRespTimeTableName()));
PipelineResult r = p.run();
diff --git a/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipelineOptions.java b/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipelineOptions.java
index 999cfc0..8d9acfe 100644
--- a/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipelineOptions.java
+++ b/dataflow/src/main/java/com/google/cloud/solutions/LogAnalyticsPipelineOptions.java
@@ -15,9 +15,9 @@
package com.google.cloud.solutions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
@SuppressWarnings("unused")
public interface LogAnalyticsPipelineOptions extends DataflowPipelineOptions {
@@ -34,7 +34,7 @@ public interface LogAnalyticsPipelineOptions extends DataflowPipelineOptions {
void setLocateLogSource(String locateLogSource);
@Description("Regular expression pattern used to parse embedded log messages inside Cloud Logging entries")
- @Default.String("\\[GIN\\]\\s+(?<timestamp>\\d{4}/\\d{2}/\\d{2} \\- \\d{2}\\:\\d{2}\\:\\d{2}).*? (?<httpStatusCode>\\d{3}) .*?(?<responseTime>\\d+\\.?\\d*)(?<resolution>\\S{1,}) \\| (?<source>[0-9\\.:]+?) \\|\\S+?\\s+?\\S+?\\s+?(?<httpMethod>\\w+?)\\s+?(?<destination>[a-z0-9/]+)")
+ @Default.String(value = "\\[GIN\\]\\s+(?<timestamp>\\d{4}/\\d{2}/\\d{2} \\- \\d{2}\\:\\d{2}\\:\\d{2}).*? (?<httpStatusCode>\\d{3}) .*?(?<responseTime>\\d+\\.?\\d*)(?<resolution>\\S{1,}) \\|\\s+(?<source>[0-9\\.:]+)\\s\\|\\s(?<httpMethod>[A-Z]+)\\s*(?<destination>[a-z0-9/]+)")
String getLogRegexPattern();
void setLogRegexPattern(String logRegexPattern);
diff --git a/dataflow/src/main/java/com/google/cloud/solutions/LogMessage.java b/dataflow/src/main/java/com/google/cloud/solutions/LogMessage.java
index 4e2096a..e075edd 100644
--- a/dataflow/src/main/java/com/google/cloud/solutions/LogMessage.java
+++ b/dataflow/src/main/java/com/google/cloud/solutions/LogMessage.java
@@ -15,9 +15,9 @@
package com.google.cloud.solutions;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
import org.joda.time.Instant;
@DefaultCoder(AvroCoder.class)
diff --git a/images/overview.png b/images/overview.png
new file mode 100644
index 0000000..35509da
Binary files /dev/null and b/images/overview.png differ
diff --git a/images/stackdriver_gke_logs.png b/images/stackdriver_gke_logs.png
new file mode 100644
index 0000000..1e05daa
Binary files /dev/null and b/images/stackdriver_gke_logs.png differ
diff --git a/services/Makefile b/services/Makefile
index c122e00..6e36e98 100644
--- a/services/Makefile
+++ b/services/Makefile
@@ -22,15 +22,15 @@ build:
image:
$(foreach s, $(services), \
- docker build -t crcsmnky/sample-$(s)-service $(s) ;)
+ docker build -t ${USER}/sample-$(s)-service $(s) ;)
tag:
$(foreach s, $(services), \
- docker tag -f crcsmnky/sample-$(s)-service gcr.io/$(project)/sample-$(s)-service ;)
+ docker tag ${USER}/sample-$(s)-service gcr.io/${PROJECT_ID}/sample-$(s)-service ;)
push:
$(foreach s, $(services), \
- gcloud docker --project $(project) push gcr.io/$(project)/sample-$(s)-service ;)
+ docker push gcr.io/${PROJECT_ID}/sample-$(s)-service ;)
clean:
$(foreach s, $(services), \
diff --git a/services/cluster.sh b/services/cluster.sh
index 1d20aad..4374ac4 100755
--- a/services/cluster.sh
+++ b/services/cluster.sh
@@ -15,9 +15,15 @@
set -e
-MACHINE_TYPE=g1-small
ZONE=us-central1-f
-NUM_NODES=1
+
+MACHINE_TYPE=f1-micro
+NUM_NODES=3
+
+#MACHINE_TYPE=g1-small
+#MACHINE_TYPE=n1-standard-1
+#NUM_NODES=1
+
PROJECT_ID=${1}
CLUSTER_NAME=${2}
COMMAND=${3}
@@ -41,8 +47,8 @@ fi
case "$COMMAND" in
up )
echo "* Creating Google Container Engine cluster ${CLUSTER_NAME} under project ${PROJECT_ID}..."
+ # --scopes monitoring,logging-write \
gcloud container clusters create ${CLUSTER_NAME} \
- --scopes monitoring,logging-write \
--project ${PROJECT_ID} \
--machine-type ${MACHINE_TYPE} \
--zone ${ZONE} \
@@ -52,7 +58,8 @@ case "$COMMAND" in
echo -e "\n* Deploying microservices Replication Controllers..."
for f in `ls kubernetes/*-controller.yaml`; do
- kubectl create -f $f
+ # DEBUG f=kubernetes/browse-controller.yaml
+ envsubst < $f | kubectl create -f -
done
echo -e "\n* Deploying microservices Services..."
diff --git a/services/kubernetes/browse-controller.yaml b/services/kubernetes/browse-controller.yaml
index d4dcb52..a68b9de 100644
--- a/services/kubernetes/browse-controller.yaml
+++ b/services/kubernetes/browse-controller.yaml
@@ -33,7 +33,7 @@ spec:
spec:
containers:
- name: sample-browse-service
- image: gcr.io/cloud-solutions-images/sample-browse-service:latest
+ image: gcr.io/${PROJECT_ID}/sample-browse-service:latest
ports:
- name: browse-http
containerPort: 8100
diff --git a/services/kubernetes/home-controller.yaml b/services/kubernetes/home-controller.yaml
index 7508342..1d479c5 100644
--- a/services/kubernetes/home-controller.yaml
+++ b/services/kubernetes/home-controller.yaml
@@ -33,7 +33,9 @@ spec:
spec:
containers:
- name: sample-home-service
- image: gcr.io/cloud-solutions-images/sample-home-service:latest
+ ## note that kubectl will not perform this variable substitution
+ ## see https://serverfault.com/questions/791715/using-environment-variables-in-kubernetes-deployment-spec
+ image: gcr.io/${PROJECT_ID}/sample-home-service:latest
ports:
- name: home-http
containerPort: 8000
diff --git a/services/kubernetes/locate-controller.yaml b/services/kubernetes/locate-controller.yaml
index 57aa4ba..affbd57 100644
--- a/services/kubernetes/locate-controller.yaml
+++ b/services/kubernetes/locate-controller.yaml
@@ -33,7 +33,9 @@ spec:
spec:
containers:
- name: sample-locate-service
- image: gcr.io/cloud-solutions-images/sample-locate-service:latest
+ ## note that kubectl will not perform this variable substitution
+ ## see https://serverfault.com/questions/791715/using-environment-variables-in-kubernetes-deployment-spec
+ image: gcr.io/${PROJECT_ID}/sample-locate-service:latest
ports:
- name: locate-http
containerPort: 8200
diff --git a/services/logging.sh b/services/logging.sh
index 6de55af..b11ea5c 100755
--- a/services/logging.sh
+++ b/services/logging.sh
@@ -119,17 +119,29 @@ case "$MODE" in
# error_exit "gs://${GCS_BUCKET} exists, please choose a new bucket name"
# fi
+ # Create a Cloud Storage Bucket
gsutil -q mb gs://${GCS_BUCKET}
+
+ # Allow Stackdriver Logging access to the bucket
+ ## https://cloud.google.com/storage/docs/gsutil/commands/acl
gsutil -q acl ch -g cloud-logs@google.com:O gs://${GCS_BUCKET}
+
echo "done"
+ # For each microservice, set up Stackdriver Logging exports
echo -n "* Creating Log Export sinks..."
for s in ${SERVICE_NAMES[@]}; do
+ #eg s=sample-home-service
gcloud beta logging sinks create ${s} \
storage.googleapis.com/${GCS_BUCKET} \
- --log="${s}" \
+ --log-filter="resource.type=\"container\" \"${s}\"" \
--project=${PROJECT_ID} \
--quiet >/dev/null || error_exit "Error creating Log Export sinks"
+
+ # we need to grant the associated service account access to the bucket
+ serviceAccount=$(gcloud beta logging sinks describe --format "value[](writerIdentity)" ${s} | sed 's/serviceAccount://g')
+ gsutil acl ch -u ${serviceAccount}:W gs://${GCS_BUCKET}
done
echo "done"
;;
@@ -137,8 +149,6 @@ case "$MODE" in
echo -n "* Deleting Log Export sinks..."
for s in ${SERVICE_NAMES[@]}; do
gcloud beta logging sinks delete ${s} \
- --log="${s}" \
- --project=${PROJECT_ID} \
--quiet >/dev/null || error_exit "Error deleting Log Export Sinks"
done
echo "done"