Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 70 additions & 45 deletions src/gax-internal/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub mod from_status;
pub mod status;
pub mod tonic;

#[cfg(google_cloud_unstable_tracing)]
use crate::observability::attributes::{self, keys::*, otel_status_codes};
use ::tonic::client::Grpc;
use ::tonic::transport::Channel;
use from_status::to_gax_error;
Expand All @@ -43,21 +45,16 @@ use google_cloud_gax::retry_policy::{
};
use google_cloud_gax::retry_throttler::SharedRetryThrottler;
use http::HeaderMap;
#[cfg(google_cloud_unstable_tracing)]
use opentelemetry_semantic_conventions::{attribute as otel_attr, trace as otel_trace};
use std::sync::Arc;
use std::time::Duration;

// A tonic::transport::Channel always has a Buffer layer.
const DEFAULT_REQUEST_BUFFER_CAPACITY: usize = 1024;

#[cfg(not(google_cloud_unstable_tracing))]
pub type GrpcService = Channel;

#[cfg(google_cloud_unstable_tracing)]
pub type GrpcService = tower::util::Either<
crate::observability::grpc_tracing::TracingTowerService<Channel>,
crate::observability::grpc_tracing::NoTracingTowerService<Channel>,
>;

/// The inner gRPC client type.
pub type InnerClient = Grpc<GrpcService>;

Expand All @@ -75,6 +72,8 @@ pub struct Client {
inner: InnerClient,
#[cfg(google_cloud_unstable_tracing)]
metric: crate::observability::TransportMetric,
#[cfg(google_cloud_unstable_tracing)]
tracing_attributes: Option<TracingAttributes>,
credentials: Credentials,
retry_policy: Arc<dyn RetryPolicy>,
backoff_policy: Arc<dyn BackoffPolicy>,
Expand Down Expand Up @@ -122,13 +121,15 @@ impl Client {
let inner = Self::make_inner(&config, default_endpoint, tracing_enabled).await?;

#[cfg(google_cloud_unstable_tracing)]
let (inner, _) =
let (inner, tracing_attributes) =
Self::make_inner(&config, default_endpoint, tracing_enabled, instrumentation).await?;

Ok(Self {
inner,
#[cfg(google_cloud_unstable_tracing)]
metric: crate::observability::TransportMetric::new(instrumentation),
#[cfg(google_cloud_unstable_tracing)]
tracing_attributes,
credentials,
retry_policy: config.retry_policy.clone().unwrap_or_else(|| {
Arc::new(
Expand Down Expand Up @@ -365,33 +366,62 @@ impl Client {
options: &RequestOptions,
remaining_time: Option<std::time::Duration>,
headers: HeaderMap,
prior_attempt_count: i64,
_prior_attempt_count: i64,
) -> Result<tonic::Response<Response>>
where
Request: prost::Message + 'static,
Response: prost::Message + std::default::Default + 'static,
{
let headers = self.add_auth_headers(headers).await?;
let metadata = tonic::MetadataMap::from_headers(headers);
let mut request = ::tonic::Request::from_parts(metadata, extensions, request);

#[cfg(google_cloud_unstable_tracing)]
{
use crate::observability::grpc_tracing::{AttemptCount, ResourceName};
use google_cloud_gax::options::internal::{
RequestOptionsExt, ResourceName as GaxResourceName,
let span = if let Some(attrs) = &self.tracing_attributes {
let rpc_method = path.path().trim_start_matches('/');
let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation {
(
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
)
} else {
(None, None, None, None)
};
request
.extensions_mut()
.insert(AttemptCount::new(prior_attempt_count));
if let Some(n) = options.get_extension::<GaxResourceName>() {
request
.extensions_mut()
.insert(ResourceName::new(n.0.to_string()));
}
}
#[cfg(not(google_cloud_unstable_tracing))]
let _ = prior_attempt_count;
let resend_count = if _prior_attempt_count > 0 {
Some(_prior_attempt_count)
} else {
None
};

tracing::info_span!(
"grpc.request",
{ OTEL_NAME } = rpc_method,
{ RPC_SYSTEM_NAME } = attributes::RPC_SYSTEM_GRPC,
{ OTEL_KIND } = attributes::OTEL_KIND_CLIENT,
{ otel_trace::RPC_METHOD } = rpc_method,
{ otel_trace::SERVER_ADDRESS } = attrs.server_address,
{ otel_trace::SERVER_PORT } = attrs.server_port,
{ otel_attr::URL_DOMAIN } = attrs.url_domain,
{ RPC_RESPONSE_STATUS_CODE } = tracing::field::Empty,
{ OTEL_STATUS_CODE } = otel_status_codes::UNSET,
{ otel_trace::ERROR_TYPE } = tracing::field::Empty,
{ GCP_CLIENT_SERVICE } = service,
{ GCP_CLIENT_VERSION } = version,
{ GCP_CLIENT_REPO } = repo,
{ GCP_CLIENT_ARTIFACT } = artifact,
{ GCP_GRPC_RESEND_COUNT } = resend_count,
{ GCP_RESOURCE_DESTINATION_ID } = tracing::field::Empty,
)
} else {
tracing::Span::none()
};

#[allow(unused_mut)]
let mut headers = self.add_auth_headers(headers).await?;

#[cfg(google_cloud_unstable_tracing)]
crate::observability::propagation::inject_context(&span, &mut headers);

let metadata = tonic::MetadataMap::from_headers(headers);
let mut request = ::tonic::Request::from_parts(metadata, extensions, request);

if let Some(timeout) = effective_timeout(options, remaining_time) {
request.set_timeout(timeout);
Expand All @@ -411,12 +441,20 @@ impl Client {
let result = pending.await;
#[cfg(google_cloud_unstable_tracing)]
let result = {
use crate::observability::{WithTransportLogging, WithTransportMetric};
use crate::observability::{
WithTransportLogging, WithTransportMetric, WithTransportSpan,
};

let pending =
WithTransportMetric::new(self.metric.clone(), pending, prior_attempt_count as u32);
WithTransportMetric::new(self.metric.clone(), pending, _prior_attempt_count as u32);
let pending = WithTransportLogging::new(pending);
pending.await
let pending = WithTransportSpan::new(span, pending);

if let Some(recorder) = crate::observability::RequestRecorder::current() {
recorder.scope(pending).await
} else {
pending.await
}
};

result
Expand Down Expand Up @@ -493,23 +531,10 @@ impl Client {
instrumentation,
};

let inner_client = InnerClient::new(channel);
if tracing_enabled {
use tower::Layer;
let layer = crate::observability::grpc_tracing::TracingTowerLayer::new(
uri,
default_host,
instrumentation,
);
let service = layer.layer(channel);
let service = tower::util::Either::Left(service);
let inner_client = InnerClient::new(service);
Ok((inner_client, Some(attrs)))
} else {
use tower::Layer;
let layer = crate::observability::grpc_tracing::NoTracingTowerLayer;
let service = layer.layer(channel);
let service = tower::util::Either::Right(service);
let inner_client = InnerClient::new(service);
Ok((inner_client, None))
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/gax-internal/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ pub use client_signals::{
WithClientMetric, WithClientSpan, WithTransportMetric,
};

#[cfg(all(google_cloud_unstable_tracing, feature = "_internal-http-client"))]
#[cfg(all(
google_cloud_unstable_tracing,
any(feature = "_internal-http-client", feature = "_internal-grpc-client")
))]
pub use client_signals::{WithTransportLogging, WithTransportSpan};

#[doc(hidden)]
Expand Down
1 change: 0 additions & 1 deletion src/gax-internal/src/observability/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ pub mod error_type_values {
/// A client-configured timeout was reached.
pub const CLIENT_TIMEOUT: &str = "CLIENT_TIMEOUT";
/// The operation was cancelled by the caller.
#[cfg(feature = "_internal-grpc-client")]
pub const CLIENT_CANCELLED: &str = "CLIENT_CANCELLED";
/// Failure to establish the network connection (DNS, TCP, TLS).
pub const CLIENT_CONNECTION_ERROR: &str = "CLIENT_CONNECTION_ERROR";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,32 @@ use opentelemetry_semantic_conventions::attribute::HTTP_RESPONSE_STATUS_CODE;
use opentelemetry_semantic_conventions::trace::{
ERROR_TYPE, HTTP_REQUEST_RESEND_COUNT, HTTP_RESPONSE_BODY_SIZE, URL_SCHEME,
};
use pin_project::pin_project;
use pin_project::{pin_project, pinned_drop};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tracing::Span;

/// A future instrumented to add span attributes for transport attempts.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
#[pin_project(PinnedDrop)]
pub struct WithTransportSpan<F> {
#[pin]
inner: F,
span: Span,
completed: bool,
}

impl<F, R> WithTransportSpan<F>
where
F: Future<Output = Result<R, Error>>,
{
pub fn new(span: Span, inner: F) -> Self {
Self { inner, span }
Self {
inner,
span,
completed: false,
}
}
}

Expand All @@ -58,40 +63,92 @@ where
let span = self.span.clone();
let this = self.project();
let output = futures::ready!(this.inner.poll(cx));
*this.completed = true;

let Some(snapshot) = RequestRecorder::current().map(|r| r.client_snapshot()) else {
let Some(recorder) = RequestRecorder::current() else {
return Poll::Ready(output);
};
let snapshot = recorder.client_snapshot();

if let Some(rn) = snapshot.resource_name() {
span.record(GCP_RESOURCE_DESTINATION_ID, rn);
}

match &output {
Ok(_) => {
tracing::record_all!(
span,
{ HTTP_RESPONSE_STATUS_CODE } = snapshot.http_status_code().map(|v| v as i64),
{ NETWORK_PEER_ADDRESS } = snapshot.network_peer_address(),
{ NETWORK_PEER_PORT } = snapshot.network_peer_port(),
{ HTTP_RESPONSE_BODY_SIZE } = snapshot.http_response_body_size(),
{ HTTP_REQUEST_RESEND_COUNT } = snapshot.http_resend_count().map(|v| v as i64),
{ URL_SCHEME } = snapshot.url_scheme()
);
if snapshot.rpc_system() == Some("grpc") {
tracing::record_all!(
span,
{ RPC_RESPONSE_STATUS_CODE } = "OK",
{ HTTP_RESPONSE_STATUS_CODE } =
snapshot.http_status_code().map(|v| v as i64),
{ NETWORK_PEER_ADDRESS } = snapshot.network_peer_address(),
{ NETWORK_PEER_PORT } = snapshot.network_peer_port(),
{ HTTP_RESPONSE_BODY_SIZE } = snapshot.http_response_body_size(),
{ HTTP_REQUEST_RESEND_COUNT } =
snapshot.http_resend_count().map(|v| v as i64),
{ URL_SCHEME } = snapshot.url_scheme()
);
} else {
tracing::record_all!(
span,
{ HTTP_RESPONSE_STATUS_CODE } =
snapshot.http_status_code().map(|v| v as i64),
{ NETWORK_PEER_ADDRESS } = snapshot.network_peer_address(),
{ NETWORK_PEER_PORT } = snapshot.network_peer_port(),
{ HTTP_RESPONSE_BODY_SIZE } = snapshot.http_response_body_size(),
{ HTTP_REQUEST_RESEND_COUNT } =
snapshot.http_resend_count().map(|v| v as i64),
{ URL_SCHEME } = snapshot.url_scheme()
);
}
}
Err(err) => {
let error_type = ErrorType::from_gax_error(err);
tracing::record_all!(
span,
{ OTEL_STATUS_CODE } = otel_status_codes::ERROR,
{ HTTP_RESPONSE_STATUS_CODE } = err.http_status_code().map(|v| v as i64),
{ ERROR_TYPE } = error_type.as_str(),
{ OTEL_STATUS_DESCRIPTION } = err.to_string(),
{ HTTP_REQUEST_RESEND_COUNT } = snapshot.http_resend_count().map(|v| v as i64)
);
let rpc_status_code = err.status().map(|s| s.code.name());

if snapshot.rpc_system() == Some("grpc") {
tracing::record_all!(
span,
{ OTEL_STATUS_CODE } = otel_status_codes::ERROR,
{ RPC_RESPONSE_STATUS_CODE } = rpc_status_code,
{ ERROR_TYPE } = error_type.as_str(),
{ OTEL_STATUS_DESCRIPTION } = err.to_string(),
{ HTTP_REQUEST_RESEND_COUNT } =
snapshot.http_resend_count().map(|v| v as i64)
);
} else {
tracing::record_all!(
span,
{ OTEL_STATUS_CODE } = otel_status_codes::ERROR,
{ HTTP_RESPONSE_STATUS_CODE } = err.http_status_code().map(|v| v as i64),
{ ERROR_TYPE } = error_type.as_str(),
{ OTEL_STATUS_DESCRIPTION } = err.to_string(),
{ HTTP_REQUEST_RESEND_COUNT } =
snapshot.http_resend_count().map(|v| v as i64)
);
}
crate::observability::errors::emit_error_log(&span, err);
}
}
Poll::Ready(output)
}
}

#[pinned_drop]
impl<F> PinnedDrop for WithTransportSpan<F> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if !*this.completed {
this.span.record(OTEL_STATUS_CODE, otel_status_codes::ERROR);
this.span.record(
ERROR_TYPE,
crate::observability::attributes::error_type_values::CLIENT_CANCELLED,
);
}
}
}

#[cfg(test)]
mod tests {
use super::super::super::http_tracing::create_http_attempt_span;
Expand Down
Loading
Loading