From f9f565625047551c41c75c6216ec88f2ac504581 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Tue, 2 Apr 2024 10:58:14 +0200 Subject: [PATCH 1/3] add opentelemetry deps/config --- config/config.exs | 4 ++++ config/dev.exs | 4 ++++ config/prod.exs | 6 ++++++ mix.exs | 3 +++ mix.lock | 12 ++++++++++++ rel/config.exs | 2 ++ 6 files changed, 31 insertions(+) diff --git a/config/config.exs b/config/config.exs index 733f3c7115..80acc18f7c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -211,6 +211,10 @@ config :ex_cldr, config :ex_json_schema, :remote_schema_resolver, {Archethic.Utils, :local_schema_resolver!} +config :opentelemetry, + resource: [service: %{name: "archethic"}], + traces_exporter: :otlp + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config("#{Mix.env()}.exs") diff --git a/config/dev.exs b/config/dev.exs index 68f736225a..49ba396d17 100755 --- a/config/dev.exs +++ b/config/dev.exs @@ -183,3 +183,7 @@ config :archethic, :throttle, period: 1000, limit: System.get_env("ARCHETHIC_THROTTLE_IP_AND_PATH", "5000") |> String.to_integer() ] + +config :opentelemetry_exporter, + otlp_protocol: :http_protobuf, + otlp_endpoint: "http://localhost:4318" diff --git a/config/prod.exs b/config/prod.exs index 2565faf5a8..b46a83288c 100755 --- a/config/prod.exs +++ b/config/prod.exs @@ -293,3 +293,9 @@ config :archethic, :throttle, period: 1000, limit: System.get_env("ARCHETHIC_THROTTLE_IP_AND_PATH", "20") |> String.to_integer() ] + +if System.get_env("ARCHETHIC_OTLP_ENDPOINT") do + config :opentelemetry_exporter, + otlp_protocol: :http_protobuf, + otlp_endpoint: System.fetch("ARCHETHIC_OTLP_ENDPOINT") +end diff --git a/mix.exs b/mix.exs index 0fa08dc82b..501927335b 100644 --- a/mix.exs +++ b/mix.exs @@ -102,6 +102,9 @@ defmodule Archethic.MixProject do {:telemetry_metrics_prometheus_core, "~> 1.1"}, {:telemetry_poller, "~> 1.0"}, {:phoenix_live_dashboard, "~> 0.7"}, + {:opentelemetry_exporter, "~> 1.6"}, + {:opentelemetry, "~> 1.4"}, + {:opentelemetry_api, "~> 1.2"}, # Utils {:crontab, "~> 1.1"}, diff --git a/mix.lock b/mix.lock index 4a00bbca10..3ba3eab97a 100644 --- a/mix.lock +++ b/mix.lock @@ -2,6 +2,7 @@ "absinthe": {:hex, :absinthe, "1.7.0", "36819e7b1fd5046c9c734f27fe7e564aed3bda59f0354c37cd2df88fd32dd014", [:mix], [{:dataloader, "~> 1.0.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "566a5b5519afc9b29c4d367f0c6768162de3ec03e9bf9916f9dc2bcbe7c09643"}, "absinthe_phoenix": {:hex, :absinthe_phoenix, "2.0.2", "e607b438db900049b9b3760f8ecd0591017a46122fffed7057bf6989020992b5", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:absinthe_plug, "~> 1.5", [hex: :absinthe_plug, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.5", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.13 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}], "hexpm", "d36918925c380dc7d2ed7d039c9a3b4182ec36723f7417a68745ade5aab22f8d"}, "absinthe_plug": {:hex, :absinthe_plug, "1.5.8", "38d230641ba9dca8f72f1fed2dfc8abd53b3907d1996363da32434ab6ee5d6ab", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bbb04176647b735828861e7b2705465e53e2cf54ccf5a73ddd1ebd855f996e5a"}, + "acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"}, "artificery": {:hex, :artificery, "0.4.3", "0bc4260f988dcb9dda4b23f9fc3c6c8b99a6220a331534fdf5bf2fd0d4333b02", [:mix], [], "hexpm", "12e95333a30e20884e937abdbefa3e7f5e05609c2ba8cf37b33f000b9ffc0504"}, "benchee": {:hex, :benchee, "1.1.0", "f3a43817209a92a1fade36ef36b86e1052627fd8934a8b937ac9ab3a76c43062", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}], "hexpm", "7da57d545003165a012b587077f6ba90b89210fd88074ce3c60ce239eb5e6d93"}, "benchee_html": {:hex, :benchee_html, "1.0.0", "5b4d24effebd060f466fb460ec06576e7b34a00fc26b234fe4f12c4f05c95947", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:benchee_json, "~> 1.0", [hex: :benchee_json, repo: "hexpm", optional: false]}], "hexpm", "5280af9aac432ff5ca4216d03e8a93f32209510e925b60e7f27c33796f69e699"}, @@ -9,6 +10,7 @@ "blankable": {:hex, :blankable, "1.0.0", "89ab564a63c55af117e115144e3b3b57eb53ad43ba0f15553357eb283e0ed425", [:mix], [], "hexpm", "7cf11aac0e44f4eedbee0c15c1d37d94c090cb72a8d9fddf9f7aec30f9278899"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "castore": {:hex, :castore, "1.0.3", "7130ba6d24c8424014194676d608cb989f62ef8039efd50ff4b3f33286d06db8", [:mix], [], "hexpm", "680ab01ef5d15b161ed6a95449fac5c6b8f60055677a8e79acf01b27baa4390b"}, + "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, "cldr_utils": {:hex, :cldr_utils, "2.21.0", "1bdbb8de3870ab4831f11f877b40cce838a03bf7da272430c232c19726d53f14", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.5", [hex: :certifi, repo: "hexpm", optional: true]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "26f56101663f5aca4e727e0eb983b578ba5b170e2f12e8456df9995809a7a93b"}, "complex": {:hex, :complex, "0.5.0", "af2d2331ff6170b61bb738695e481b27a66780e18763e066ee2cd863d0b1dd92", [:mix], [], "hexpm", "2683bd3c184466cfb94fad74cbfddfaa94b860e27ad4ca1bffe3bff169d91ef1"}, "cors_plug": {:hex, :cors_plug, "3.0.3", "7c3ac52b39624bc616db2e937c282f3f623f25f8d550068b6710e58d04a0e330", [:mix], [{:plug, "~> 1.13", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "3f2d759e8c272ed3835fab2ef11b46bddab8c1ab9528167bd463b6452edf830d"}, @@ -17,6 +19,7 @@ "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, "credo": {:hex, :credo, "1.6.7", "323f5734350fd23a456f2688b9430e7d517afb313fbd38671b8a4449798a7854", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "41e110bfb007f7eda7f897c10bf019ceab9a0b269ce79f015d54b0dcf4fc7dd3"}, "crontab": {:hex, :crontab, "1.1.13", "3bad04f050b9f7f1c237809e42223999c150656a6b2afbbfef597d56df2144c5", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "d67441bec989640e3afb94e123f45a2bc42d76e02988c9613885dc3d01cf7085"}, + "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, "dart_sass": {:hex, :dart_sass, "0.5.1", "d45f20a8e324313689fb83287d4702352793ce8c9644bc254155d12656ade8b6", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "24f8a1c67e8b5267c51a33cbe6c0b5ebf12c2c83ace88b5ac04947d676b4ec81"}, "decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, @@ -48,6 +51,9 @@ "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "git_diff": {:hex, :git_diff, "0.6.4", "ec53ebf8bf83b4527d938d6433e3686b47a3e2a23135a21038f76736c16bb6e0", [:mix], [], "hexpm", "9e05563c136c91e960a306fd296156b2e8d74e294ae60961e69a36e118023a5f"}, "git_hooks": {:hex, :git_hooks, "0.7.3", "09489e94d88dfc767662e22aff2b6208bd7cf555a19dd0e1477cca4683ce0701", [:mix], [{:blankable, "~> 1.0.0", [hex: :blankable, repo: "hexpm", optional: false]}, {:recase, "~> 0.7.0", [hex: :recase, repo: "hexpm", optional: false]}], "hexpm", "d6ddedeb4d3a8602bc3f84e087a38f6150a86d9e790628ed8bc70e6d90681659"}, + "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, + "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, + "hpack": {:hex, :hpack_erl, "0.3.0", "2461899cc4ab6a0ef8e970c1661c5fc6a52d3c25580bc6dd204f84ce94669926", [:rebar3], [], "hexpm", "d6137d7079169d8c485c6962dfe261af5b9ef60fbc557344511c1e65e3d95fb0"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "html_entities": {:hex, :html_entities, "0.5.2", "9e47e70598da7de2a9ff6af8758399251db6dbb7eebe2b013f2bbd2515895c3c", [:mix], [], "hexpm", "c53ba390403485615623b9531e97696f076ed415e8d8058b1dbaa28181f4fdcc"}, "inet_cidr": {:hex, :erl_cidr, "1.2.0", "9205ffb290c0de8d2b82147976602fbf5bfa6d594834e60556afaf3b82856b95", [:rebar3], [], "hexpm", "3505f5dfac7d862806c7051a3dd475363a45bccf39ca1faee8eda6a6b33cf335"}, @@ -68,6 +74,10 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, "nx": {:hex, :nx, "0.5.1", "118134b8c97c2a8f86c87aa8434994c1cbbe139a306b89cca04e08dd46228067", [:mix], [{:complex, "~> 0.5", [hex: :complex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ceb8fbbe19b3c4252a7188d8b0e059fac9da0f4a4f3bb770fc665fdd0b29f0c5"}, "observer_cli": {:hex, :observer_cli, "1.7.4", "3c1bfb6d91bf68f6a3d15f46ae20da0f7740d363ee5bc041191ce8722a6c4fae", [:mix, :rebar3], [{:recon, "~> 2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "50de6d95d814f447458bd5d72666a74624eddb0ef98bdcee61a0153aae0865ff"}, + "opentelemetry": {:hex, :opentelemetry, "1.4.0", "f928923ed80adb5eb7894bac22e9a198478e6a8f04020ae1d6f289fdcad0b498", [:rebar3], [{:opentelemetry_api, "~> 1.3.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "50b32ce127413e5d87b092b4d210a3449ea80cd8224090fe68d73d576a3faa15"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.3.0", "03e2177f28dd8d11aaa88e8522c81c2f6a788170fe52f7a65262340961e663f9", [:mix, :rebar3], [{:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "b9e5ff775fd064fa098dba3c398490b77649a352b40b0b730a6b7dc0bdd68858"}, + "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.7.0", "dec4e90c0667cf11a3642f7fe71982dbc0c6bfbb8725a0b13766830718cf0d98", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.4.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.3.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "d0f25f6439ec43f2561537c3fabbe177b38547cddaa3a692cbb8f4770dbefc1e"}, + "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "0.2.0", "b67fe459c2938fcab341cb0951c44860c62347c005ace1b50f8402576f241435", [:mix, :rebar3], [], "hexpm", "d61fa1f5639ee8668d74b527e6806e0503efc55a42db7b5f39939d84c07d6895"}, "pathex": {:hex, :pathex, "2.5.0", "350ed75b41dd7c579843bc6052463d36d9a35362f5430ff3ad12a13c6a783ce6", [:mix], [], "hexpm", "031a2063c59eae2f697373f41814e9d9076105ab2173bd3a88fbe8789fdb434b"}, "phoenix": {:hex, :phoenix, "1.6.15", "0a1d96bbc10747fd83525370d691953cdb6f3ccbac61aa01b4acb012474b047d", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 1.0 or ~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d70ab9fbf6b394755ea88b644d34d79d8b146e490973151f248cacd122d20672"}, "phoenix_html": {:hex, :phoenix_html, "3.2.0", "1c1219d4b6cb22ac72f12f73dc5fad6c7563104d083f711c3fcd8551a1f4ae11", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "36ec97ba56d25c0136ef1992c37957e4246b649d620958a1f9fa86165f8bc54f"}, @@ -88,12 +98,14 @@ "rustler_precompiled": {:hex, :rustler_precompiled, "0.6.3", "f838d94bc35e1844973ee7266127b156fdc962e9e8b7ff666c8fb4fed7964d23", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "e18ecca3669a7454b3a2be75ae6c3ef01d550bc9a8cf5fbddcfff843b881d7c6"}, "sizeable": {:hex, :sizeable, "1.0.2", "625fe06a5dad188b52121a140286f1a6ae1adf350a942cf419499ecd8a11ee29", [:mix], [], "hexpm", "4bab548e6dfba777b400ca50830a9e3a4128e73df77ab1582540cf5860601762"}, "sobelow": {:hex, :sobelow, "0.11.1", "23438964486f8112b41e743bbfd402da3e5b296fdc9eacab29914b79c48916dd", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9897363a7eff96f4809304a90aad819e2ad5e5d24db547af502885146746a53c"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.1.0", "4e15f6d7dbedb3a4e3aed2262b7e1407f166fcb9c30ca3f96635dfbbef99965c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "0dd10e7fe8070095df063798f82709b0a1224c31b8baf6278b423898d591a069"}, "telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"}, + "tls_certificate_check": {:hex, :tls_certificate_check, "1.22.1", "0f450cc1568a67a65ce5e15df53c53f9a098c3da081c5f126199a72505858dc1", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3092be0babdc0e14c2e900542351e066c0fa5a9cf4b3597559ad1e67f07938c0"}, "websockex": {:hex, :websockex, "0.4.3", "92b7905769c79c6480c02daacaca2ddd49de936d912976a4d3c923723b647bf0", [:mix], [], "hexpm", "95f2e7072b85a3a4cc385602d42115b73ce0b74a9121d0d6dbbf557645ac53e4"}, "xla": {:hex, :xla, "0.4.4", "c3a8ed1f579bda949df505e49ff65415c8281d991fbd6ae1d8f3c5d0fd155f54", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "484f3f9011db3c9f1ff1e98eecefd382f3882a07ada540fd58803db1d2dab671"}, } diff --git a/rel/config.exs b/rel/config.exs index 17297289c1..c277ec1045 100644 --- a/rel/config.exs +++ b/rel/config.exs @@ -69,6 +69,8 @@ release :archethic_node do applications: [ :runtime_tools, :observer_cli, + opentelemetry_exporter: :permanent, + opentelemetry: :temporary, archethic: :permanent ] ) From 4f47a772500e2ea49bb5f3fc696c8901fec24695 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Tue, 2 Apr 2024 11:57:54 +0200 Subject: [PATCH 2/3] Transmit trace in P2P messages --- lib/archethic.ex | 2 +- .../beacon_chain/network_coordinates.ex | 2 +- .../beacon_chain/subset/p2p_sampling.ex | 2 +- lib/archethic/mining/transaction_context.ex | 2 +- lib/archethic/p2p.ex | 62 ++++++++++++------- lib/archethic/p2p/client.ex | 2 +- lib/archethic/p2p/client/connection.ex | 20 ++++-- lib/archethic/p2p/client/default_impl.ex | 19 ++++-- lib/archethic/p2p/listener_protocol.ex | 9 +-- lib/archethic/p2p/message.ex | 7 ++- .../p2p/message/acknowledge_storage.ex | 6 +- .../p2p/message/add_mining_context.ex | 2 +- lib/archethic/p2p/message/cross_validate.ex | 3 +- .../p2p/message/get_beacon_summaries.ex | 4 +- .../message/get_beacon_summaries_aggregate.ex | 4 +- .../p2p/message/get_beacon_summary.ex | 3 +- .../p2p/message/get_bootstraping_nodes.ex | 4 +- .../p2p/message/get_current_summaries.ex | 4 +- .../p2p/message/get_dashboard_data.ex | 4 +- .../p2p/message/get_first_public_key.ex | 4 +- .../message/get_first_transaction_address.ex | 2 + .../p2p/message/get_genesis_address.ex | 4 +- .../p2p/message/get_last_transaction.ex | 3 +- .../message/get_last_transaction_address.ex | 3 +- .../p2p/message/get_network_stats.ex | 4 +- .../p2p/message/get_next_addresses.ex | 3 +- .../p2p/message/get_storage_nonce.ex | 3 +- lib/archethic/p2p/message/get_transaction.ex | 3 +- .../p2p/message/get_transaction_chain.ex | 3 +- .../message/get_transaction_chain_length.ex | 3 +- .../p2p/message/get_transaction_inputs.ex | 3 +- .../p2p/message/get_transaction_summary.ex | 5 +- .../p2p/message/get_unspent_outputs.ex | 3 +- lib/archethic/p2p/message/list_nodes.ex | 4 +- lib/archethic/p2p/message/new_beacon_slot.ex | 5 +- lib/archethic/p2p/message/new_transaction.ex | 3 +- .../p2p/message/notify_end_of_node_sync.ex | 3 +- .../notify_last_transaction_address.ex | 3 +- .../p2p/message/notify_previous_chain.ex | 4 +- .../message/notify_replication_validation.ex | 4 +- lib/archethic/p2p/message/ping.ex | 4 +- .../p2p/message/register_beacon_updates.ex | 3 +- .../p2p/message/replicate_transaction.ex | 3 +- .../p2p/message/request_chain_lock.ex | 3 +- lib/archethic/p2p/message/shard_repair.ex | 3 +- lib/archethic/p2p/message/start_mining.ex | 3 +- lib/archethic/p2p/message/unlock_chain.ex | 3 +- .../message/validate_smart_contract_call.ex | 4 +- lib/archethic/p2p/message_envelop.ex | 40 +++++++----- .../dashboard_metrics_aggregator.ex | 2 +- test/archethic/p2p/client/connection_test.exs | 12 ++-- test/archethic/p2p_test.exs | 14 ++--- .../self_repair/self_repair_test.exs | 4 +- 53 files changed, 204 insertions(+), 127 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index f52b073a74..8769be9ce5 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -186,7 +186,7 @@ defmodule Archethic do transaction_type: type ) - _ -> + :ok -> :ok end end) diff --git a/lib/archethic/beacon_chain/network_coordinates.ex b/lib/archethic/beacon_chain/network_coordinates.ex index 3fcb1cf2a2..dfedfce3ac 100644 --- a/lib/archethic/beacon_chain/network_coordinates.ex +++ b/lib/archethic/beacon_chain/network_coordinates.ex @@ -270,7 +270,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do TaskSupervisor, beacon_nodes, fn node -> - P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, timeout) + P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, timeout: timeout) end, timeout: timeout + 1_000, ordered: false, diff --git a/lib/archethic/beacon_chain/subset/p2p_sampling.ex b/lib/archethic/beacon_chain/subset/p2p_sampling.ex index 34f6abf840..9109fa57e2 100644 --- a/lib/archethic/beacon_chain/subset/p2p_sampling.ex +++ b/lib/archethic/beacon_chain/subset/p2p_sampling.ex @@ -45,7 +45,7 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do defp do_sample_p2p_view(node = %Node{}, timeout) do start_time = System.monotonic_time(:millisecond) - case P2P.send_message(node, %Ping{}, timeout) do + case P2P.send_message(node, %Ping{}, timeout: timeout) do {:ok, %Ok{}} -> end_time = System.monotonic_time(:millisecond) end_time - start_time diff --git a/lib/archethic/mining/transaction_context.ex b/lib/archethic/mining/transaction_context.ex index 10aad1e9e0..df67a63497 100644 --- a/lib/archethic/mining/transaction_context.ex +++ b/lib/archethic/mining/transaction_context.ex @@ -122,7 +122,7 @@ defmodule Archethic.Mining.TransactionContext do TaskSupervisor, node_public_keys, fn node_public_key -> - {node_public_key, P2P.send_message(node_public_key, %Ping{}, 1000)} + {node_public_key, P2P.send_message(node_public_key, %Ping{}, timeout: 1000)} end, on_timeout: :kill_task ) diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 723d27dc21..f065cb7254 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -369,21 +369,25 @@ defmodule Archethic.P2P do For mode details see `send_message/3` """ - @spec send_message!(Crypto.key() | Node.t(), Message.request(), timeout()) :: Message.response() - def send_message!(node, message, timeout \\ 0) - - def send_message!(public_key, message, timeout) when is_binary(public_key) do + @spec send_message!( + node :: Crypto.key() | Node.t(), + request :: Message.request(), + options :: [timeout: timeout(), trace: binary()] + ) :: Message.response() + def send_message!(node, message, opts) + + def send_message!(public_key, message, opts) when is_binary(public_key) do public_key |> get_node_info! - |> send_message!(message, timeout) + |> send_message!(message, opts) end def send_message!( node = %Node{ip: ip, port: port}, message, - timeout + opts ) do - case Client.send_message(node, message, timeout) do + case Client.send_message(node, message, opts) do {:ok, ref} -> ref @@ -398,29 +402,38 @@ defmodule Archethic.P2P do If the exchange fails, the node availability history will decrease and will be locally unavailable until the next exchange """ - @spec send_message(Crypto.key() | Node.t(), Message.request(), timeout()) :: + @spec send_message( + node :: Crypto.key() | Node.t(), + request :: Message.request(), + options :: [timeout: timeout(), trace: binary()] + ) :: {:ok, Message.response()} | {:error, :not_found} | {:error, :timeout} | {:error, :closed} - def send_message(node, message, timeout \\ 0) + def send_message(node, message, opts \\ []) - def send_message(public_key, message, timeout) when is_binary(public_key) do + def send_message(public_key, message, opts) when is_binary(public_key) do case get_node_info(public_key) do {:ok, node} -> - send_message(node, message, timeout) + send_message(node, message, opts) {:error, :not_found} -> {:error, :not_found} end end - def send_message(node, message, timeout) do - timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout - do_send_message(node, message, timeout) + def send_message(node, message, opts) do + opts = + Keyword.update(opts, :timeout, Message.get_timeout(message), fn + 0 -> Message.get_timeout(message) + timeout -> timeout + end) + + do_send_message(node, message, opts) end - defdelegate do_send_message(node, message, timeout), to: Client, as: :send_message + defdelegate do_send_message(node, message, opts), to: Client, as: :send_message @doc """ Return the nearest storages nodes from the local node @@ -603,14 +616,21 @@ defmodule Archethic.P2P do @doc """ Send multiple message at once for the given nodes. """ - @spec broadcast_message(list(Node.t()), Message.request()) :: :ok - def broadcast_message(nodes, message) do - Task.Supervisor.start_child(TaskSupervisor, fn -> do_broadcast_message(nodes, message) end) + @spec broadcast_message( + list(Node.t()), + Message.request(), + opts :: [timeout: timeout(), trace: binary()] + ) :: :ok + def broadcast_message(nodes, message, opts \\ []) do + Task.Supervisor.start_child(TaskSupervisor, fn -> + do_broadcast_message(nodes, message, opts) + end) + :ok end - defp do_broadcast_message(nodes, message) do - Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &send_message(&1, message), + defp do_broadcast_message(nodes, message, opts) do + Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &send_message(&1, message, opts), ordered: false, on_timeout: :kill_task, timeout: Message.get_timeout(message) + 2000 @@ -759,7 +779,7 @@ defmodule Archethic.P2P do Task.Supervisor.async_stream_nolink( TaskSupervisor, group, - &send_message(&1, message, timeout), + &send_message(&1, message, timeout: timeout), ordered: false, on_timeout: :kill_task, timeout: timeout + 2000 diff --git a/lib/archethic/p2p/client.ex b/lib/archethic/p2p/client.ex index ac9870b110..c170329cd6 100644 --- a/lib/archethic/p2p/client.ex +++ b/lib/archethic/p2p/client.ex @@ -22,7 +22,7 @@ defmodule Archethic.P2P.Client do @callback send_message( node :: Node.t(), message :: Message.request(), - timeout :: non_neg_integer() + opts :: [timeout: timeout(), trace: binary()] ) :: {:ok, Message.response()} | {:error, :timeout} diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 32601d416b..8ea4e63b28 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -41,14 +41,19 @@ defmodule Archethic.P2P.Client.Connection do It may returns `{:error, :timeout}` if either the send or the receiving take more than the timeout value provided. It may also returns `{:error, :closed}` is the socket closed or any error in the transport layer """ - @spec send_message(Crypto.key(), Message.request(), timeout()) :: + @spec send_message( + node_public_key :: Crypto.key(), + request :: Message.request(), + opts :: [timeout: timeout(), trace: binary] + ) :: {:ok, Message.response()} | {:error, :timeout} | {:error, :closed} - def send_message(public_key, message, timeout \\ 3_000) do + def send_message(public_key, message, opts \\ []) do ref = make_ref() - GenStateMachine.cast(via_tuple(public_key), {:send_message, ref, self(), message, timeout}) + timeout = Keyword.get(opts, :timeout, 3_000) + GenStateMachine.cast(via_tuple(public_key), {:send_message, ref, self(), message, opts}) receive do {^ref, msg} -> @@ -265,7 +270,7 @@ defmodule Archethic.P2P.Client.Connection do def handle_event( :cast, - {:send_message, ref, from, _msg, _timeout}, + {:send_message, ref, from, _msg, _opts}, :disconnected, _data ) do @@ -275,7 +280,7 @@ defmodule Archethic.P2P.Client.Connection do def handle_event( :cast, - {:send_message, ref, from, message, timeout}, + {:send_message, ref, from, message, opts}, {:connected, socket}, data = %{ request_id: request_id, @@ -299,7 +304,8 @@ defmodule Archethic.P2P.Client.Connection do message: message, message_id: request_id, sender_public_key: Crypto.first_node_public_key(), - signature: signature + signature: signature, + trace: Keyword.get(opts, :trace, "") }, node_public_key ) @@ -326,6 +332,8 @@ defmodule Archethic.P2P.Client.Connection do res end) + timeout = Keyword.get(opts, :timeout, 3_000) + new_data = data |> Map.update!( diff --git a/lib/archethic/p2p/client/default_impl.ex b/lib/archethic/p2p/client/default_impl.ex index 3ad4e8fe36..f164678f8d 100644 --- a/lib/archethic/p2p/client/default_impl.ex +++ b/lib/archethic/p2p/client/default_impl.ex @@ -75,7 +75,14 @@ defmodule Archethic.P2P.Client.DefaultImpl do @doc """ Send a message to the given node using the right connection bearer """ - @spec send_message(Node.t(), message :: Message.request(), timeout :: non_neg_integer()) :: + @spec send_message( + node :: Node.t(), + message :: Message.request(), + options :: [ + timeout: timeout(), + trace: binary() + ] + ) :: {:ok, Message.response()} | {:error, :timeout} | {:error, :closed} @@ -83,13 +90,17 @@ defmodule Archethic.P2P.Client.DefaultImpl do def send_message( %Node{first_public_key: node_public_key}, message, - timeout + opts \\ [] ) do if node_public_key == Crypto.first_node_public_key() do # if the node was itself just process the message - {:ok, Message.process(message, node_public_key)} + {:ok, + Message.process(message, %{ + sender_public_key: node_public_key, + trace: Keyword.get(opts, :trace, "") + })} else - case Connection.send_message(node_public_key, message, timeout) do + case Connection.send_message(node_public_key, message, opts) do {:ok, data} -> {:ok, data} diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 4c98945228..640c846c94 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -74,7 +74,8 @@ defmodule Archethic.P2P.ListenerProtocol do message: message, sender_public_key: sender_pkey, signature: signature, - decrypted_raw_message: decrypted_raw_message + decrypted_raw_message: decrypted_raw_message, + trace: trace }} -> valid_signature? = Crypto.verify?( @@ -85,7 +86,7 @@ defmodule Archethic.P2P.ListenerProtocol do if valid_signature? do message - |> process_msg(sender_pkey) + |> process_msg(%{sender_public_key: sender_pkey, trace: trace}) |> encode_response(message_id, sender_pkey) |> reply(transport, socket, message) else @@ -127,10 +128,10 @@ defmodule Archethic.P2P.ListenerProtocol do {:error, Exception.format(:error, err, __STACKTRACE__)} end - defp process_msg(message, sender_pkey) do + defp process_msg(message, metadata) do start_processing_time = System.monotonic_time() - Message.process(message, sender_pkey) + Message.process(message, metadata) |> tap(fn _ -> :telemetry.execute( [:archethic, :p2p, :handle_message], diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 9a2a4b5e50..3e5b69114b 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -216,6 +216,9 @@ defmodule Archethic.P2P.Message do @doc """ Handle a P2P message by processing it and return list of responses to be streamed back to the client """ - @spec process(request(), Crypto.key()) :: response() - def process(msg, key), do: msg.__struct__.process(msg, key) + @spec process(request(), metadata()) :: + response() + def process(msg, metadata), do: msg.__struct__.process(msg, metadata) + + @type metadata() :: %{sender_public_key: Crypto.key(), trace: binary()} end diff --git a/lib/archethic/p2p/message/acknowledge_storage.ex b/lib/archethic/p2p/message/acknowledge_storage.ex index c1c7ff5450..c0b7eda949 100644 --- a/lib/archethic/p2p/message/acknowledge_storage.ex +++ b/lib/archethic/p2p/message/acknowledge_storage.ex @@ -8,9 +8,9 @@ defmodule Archethic.P2P.Message.AcknowledgeStorage do @enforce_keys [:address, :signature] defstruct [:address, :signature] - alias Archethic.Crypto alias Archethic.Mining alias Archethic.Utils + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok @type t :: %__MODULE__{ @@ -18,13 +18,13 @@ defmodule Archethic.P2P.Message.AcknowledgeStorage do signature: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process( %__MODULE__{ address: address, signature: signature }, - node_public_key + %{sender_public_key: node_public_key} ) do Mining.confirm_replication(address, signature, node_public_key) %Ok{} diff --git a/lib/archethic/p2p/message/add_mining_context.ex b/lib/archethic/p2p/message/add_mining_context.ex index cb08b5b7bb..1ea208928a 100644 --- a/lib/archethic/p2p/message/add_mining_context.ex +++ b/lib/archethic/p2p/message/add_mining_context.ex @@ -40,7 +40,7 @@ defmodule Archethic.P2P.Message.AddMiningContext do previous_storage_nodes_public_keys: list(Crypto.key()) } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process( %__MODULE__{ address: tx_address, diff --git a/lib/archethic/p2p/message/cross_validate.ex b/lib/archethic/p2p/message/cross_validate.ex index de095c5c9d..1c933d2339 100644 --- a/lib/archethic/p2p/message/cross_validate.ex +++ b/lib/archethic/p2p/message/cross_validate.ex @@ -21,6 +21,7 @@ defmodule Archethic.P2P.Message.CrossValidate do alias Archethic.Crypto alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.Mining + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.Utils @@ -36,7 +37,7 @@ defmodule Archethic.P2P.Message.CrossValidate do aggregated_utxos: list(VersionedUnspentOutput.t()) } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process( %__MODULE__{ address: tx_address, diff --git a/lib/archethic/p2p/message/get_beacon_summaries.ex b/lib/archethic/p2p/message/get_beacon_summaries.ex index e594944d79..251fa5976d 100644 --- a/lib/archethic/p2p/message/get_beacon_summaries.ex +++ b/lib/archethic/p2p/message/get_beacon_summaries.ex @@ -4,8 +4,8 @@ defmodule Archethic.P2P.Message.GetBeaconSummaries do """ defstruct [:addresses] - alias Archethic.Crypto alias Archethic.BeaconChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.Utils alias Archethic.Utils.VarInt @@ -14,7 +14,7 @@ defmodule Archethic.P2P.Message.GetBeaconSummaries do addresses: list(binary()) } - @spec process(__MODULE__.t(), Crypto.key()) :: BeaconSummaryList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: BeaconSummaryList.t() def process(%__MODULE__{addresses: addresses}, _) do %BeaconSummaryList{ summaries: BeaconChain.get_beacon_summaries(addresses) diff --git a/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex b/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex index a0da996b55..f030263ac8 100644 --- a/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex +++ b/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex @@ -3,9 +3,9 @@ defmodule Archethic.P2P.Message.GetBeaconSummariesAggregate do Represents a message to get a beacon summary aggregate """ - alias Archethic.Crypto alias Archethic.BeaconChain alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.P2P.Message alias Archethic.P2P.Message.NotFound @enforce_keys [:date] @@ -15,7 +15,7 @@ defmodule Archethic.P2P.Message.GetBeaconSummariesAggregate do date: DateTime.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: SummaryAggregate.t() | NotFound.t() + @spec process(__MODULE__.t(), Message.metadata()) :: SummaryAggregate.t() | NotFound.t() def process(%__MODULE__{date: date}, _) do case BeaconChain.get_summaries_aggregate(date) do {:ok, aggregate} -> diff --git a/lib/archethic/p2p/message/get_beacon_summary.ex b/lib/archethic/p2p/message/get_beacon_summary.ex index bd6c662d0d..417ec8df58 100644 --- a/lib/archethic/p2p/message/get_beacon_summary.ex +++ b/lib/archethic/p2p/message/get_beacon_summary.ex @@ -9,6 +9,7 @@ defmodule Archethic.P2P.Message.GetBeaconSummary do alias Archethic.Crypto alias Archethic.Utils alias Archethic.BeaconChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.NotFound alias Archethic.BeaconChain.Summary @@ -16,7 +17,7 @@ defmodule Archethic.P2P.Message.GetBeaconSummary do address: Crypto.versioned_hash() } - @spec process(__MODULE__.t(), Crypto.key()) :: Summary.t() | NotFound.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Summary.t() | NotFound.t() def process(%__MODULE__{address: address}, _) do case BeaconChain.get_summary(address) do {:ok, summary} -> diff --git a/lib/archethic/p2p/message/get_bootstraping_nodes.ex b/lib/archethic/p2p/message/get_bootstraping_nodes.ex index 4449226679..3ad7fd06f0 100644 --- a/lib/archethic/p2p/message/get_bootstraping_nodes.ex +++ b/lib/archethic/p2p/message/get_bootstraping_nodes.ex @@ -7,8 +7,8 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do """ alias Archethic.P2P + alias Archethic.P2P.Message alias Archethic.P2P.Message.BootstrappingNodes - alias Archethic.Crypto @enforce_keys [:patch] defstruct [:patch] @@ -17,7 +17,7 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do patch: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: BootstrappingNodes.t() + @spec process(__MODULE__.t(), Message.metadata()) :: BootstrappingNodes.t() def process(%__MODULE__{patch: patch}, _) do top_nodes = P2P.authorized_and_available_nodes() diff --git a/lib/archethic/p2p/message/get_current_summaries.ex b/lib/archethic/p2p/message/get_current_summaries.ex index e702912d01..6e434fcab8 100644 --- a/lib/archethic/p2p/message/get_current_summaries.ex +++ b/lib/archethic/p2p/message/get_current_summaries.ex @@ -6,17 +6,17 @@ defmodule Archethic.P2P.Message.GetCurrentSummaries do @enforce_keys [:subsets] defstruct [:subsets] - alias Archethic.Crypto alias Archethic.BeaconChain alias Archethic.BeaconChain.Slot alias Archethic.BeaconChain.Subset + alias Archethic.P2P.Message alias Archethic.P2P.Message.TransactionSummaryList @type t :: %__MODULE__{ subsets: list(binary()) } - @spec process(__MODULE__.t(), Crypto.key()) :: TransactionSummaryList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: TransactionSummaryList.t() def process(%__MODULE__{subsets: subsets}, _) do transaction_summaries = Enum.flat_map(subsets, fn subset -> diff --git a/lib/archethic/p2p/message/get_dashboard_data.ex b/lib/archethic/p2p/message/get_dashboard_data.ex index c1e87ca1f7..38ca0ef9d5 100644 --- a/lib/archethic/p2p/message/get_dashboard_data.ex +++ b/lib/archethic/p2p/message/get_dashboard_data.ex @@ -3,7 +3,7 @@ defmodule Archethic.P2P.Message.GetDashboardData do Represents a message to request the first public key from a transaction chain """ - alias Archethic.Crypto + alias Archethic.P2P.Message alias Archethic.P2P.Message.DashboardData alias ArchethicWeb.DashboardMetrics @@ -13,7 +13,7 @@ defmodule Archethic.P2P.Message.GetDashboardData do since: nil | DateTime.t() } - @spec process(t(), Crypto.key()) :: DashboardData.t() + @spec process(t(), Message.metadata()) :: DashboardData.t() def process(%__MODULE__{since: nil}, _) do %DashboardData{buckets: DashboardMetrics.get_all()} end diff --git a/lib/archethic/p2p/message/get_first_public_key.ex b/lib/archethic/p2p/message/get_first_public_key.ex index 9d2332e39b..045eef218e 100644 --- a/lib/archethic/p2p/message/get_first_public_key.ex +++ b/lib/archethic/p2p/message/get_first_public_key.ex @@ -6,9 +6,9 @@ defmodule Archethic.P2P.Message.GetFirstPublicKey do @enforce_keys [:public_key] defstruct [:public_key] - alias Archethic.Crypto alias Archethic.Utils alias Archethic.TransactionChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.FirstPublicKey @type t() :: %__MODULE__{ @@ -16,7 +16,7 @@ defmodule Archethic.P2P.Message.GetFirstPublicKey do } # Returns the first public_key for a given public_key and if the public_key is used for the first time, return the same public_key. - @spec process(__MODULE__.t(), Crypto.key()) :: FirstPublicKey.t() + @spec process(__MODULE__.t(), Message.metadata()) :: FirstPublicKey.t() def process(%__MODULE__{public_key: public_key}, _) do %FirstPublicKey{ public_key: TransactionChain.get_first_public_key(public_key) diff --git a/lib/archethic/p2p/message/get_first_transaction_address.ex b/lib/archethic/p2p/message/get_first_transaction_address.ex index f2a9bd12f2..01af576515 100644 --- a/lib/archethic/p2p/message/get_first_transaction_address.ex +++ b/lib/archethic/p2p/message/get_first_transaction_address.ex @@ -7,6 +7,7 @@ defmodule Archethic.P2P.Message.GetFirstTransactionAddress do Hash of first public key gives first transaction address """ alias Archethic.Utils + alias Archethic.P2P.Message alias Archethic.P2P.Message.FirstTransactionAddress alias Archethic.TransactionChain alias Archethic.P2P.Message.NotFound @@ -34,6 +35,7 @@ defmodule Archethic.P2P.Message.GetFirstTransactionAddress do {%__MODULE__{address: address}, rest} end + @spec process(t(), Message.metadata()) :: NotFound.t() | FirstTransactionAddress.t() def process(%__MODULE__{address: address}, _) do case TransactionChain.get_first_transaction_address(address) do {:error, :transaction_not_exists} -> diff --git a/lib/archethic/p2p/message/get_genesis_address.ex b/lib/archethic/p2p/message/get_genesis_address.ex index 963fef98f7..c6f129cf8a 100644 --- a/lib/archethic/p2p/message/get_genesis_address.ex +++ b/lib/archethic/p2p/message/get_genesis_address.ex @@ -6,16 +6,16 @@ defmodule Archethic.P2P.Message.GetGenesisAddress do @enforce_keys [:address] defstruct [:address] - alias Archethic.Crypto alias Archethic.Utils alias Archethic.TransactionChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.GenesisAddress @type t() :: %__MODULE__{ address: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: GenesisAddress.t() + @spec process(__MODULE__.t(), Message.metadata()) :: GenesisAddress.t() def process(%__MODULE__{address: address}, _) do genesis_address = TransactionChain.get_genesis_address(address) diff --git a/lib/archethic/p2p/message/get_last_transaction.ex b/lib/archethic/p2p/message/get_last_transaction.ex index 7d1dc2177e..41b4e81967 100644 --- a/lib/archethic/p2p/message/get_last_transaction.ex +++ b/lib/archethic/p2p/message/get_last_transaction.ex @@ -7,6 +7,7 @@ defmodule Archethic.P2P.Message.GetLastTransaction do alias Archethic.Crypto alias Archethic.Utils + alias Archethic.P2P.Message alias Archethic.P2P.Message.Error alias Archethic.P2P.Message.NotFound alias Archethic.TransactionChain @@ -16,7 +17,7 @@ defmodule Archethic.P2P.Message.GetLastTransaction do address: Crypto.versioned_hash() } - @spec process(__MODULE__.t(), Crypto.key()) :: NotFound.t() | Error.t() | Transaction.t() + @spec process(__MODULE__.t(), Message.metadata()) :: NotFound.t() | Error.t() | Transaction.t() def process(%__MODULE__{address: address}, _) do case TransactionChain.get_last_transaction(address) do {:ok, tx} -> diff --git a/lib/archethic/p2p/message/get_last_transaction_address.ex b/lib/archethic/p2p/message/get_last_transaction_address.ex index 3b78b757dd..07cc9fa85c 100644 --- a/lib/archethic/p2p/message/get_last_transaction_address.ex +++ b/lib/archethic/p2p/message/get_last_transaction_address.ex @@ -8,6 +8,7 @@ defmodule Archethic.P2P.Message.GetLastTransactionAddress do alias Archethic.Crypto alias Archethic.Utils alias Archethic.TransactionChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.LastTransactionAddress @type t :: %__MODULE__{ @@ -15,7 +16,7 @@ defmodule Archethic.P2P.Message.GetLastTransactionAddress do timestamp: DateTime.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: LastTransactionAddress.t() + @spec process(__MODULE__.t(), Message.metadata()) :: LastTransactionAddress.t() def process(%__MODULE__{address: address, timestamp: timestamp}, _) do {address, time} = TransactionChain.get_last_address(address, timestamp) %LastTransactionAddress{address: address, timestamp: time} diff --git a/lib/archethic/p2p/message/get_network_stats.ex b/lib/archethic/p2p/message/get_network_stats.ex index 764d8f23f1..176df275ee 100644 --- a/lib/archethic/p2p/message/get_network_stats.ex +++ b/lib/archethic/p2p/message/get_network_stats.ex @@ -8,7 +8,7 @@ defmodule Archethic.P2P.Message.GetNetworkStats do alias Archethic.BeaconChain.NetworkCoordinates alias Archethic.BeaconChain.Subset.StatsCollector - alias Archethic.Crypto + alias Archethic.P2P.Message alias Archethic.P2P.Message.NetworkStats @type t :: %__MODULE__{ @@ -39,7 +39,7 @@ defmodule Archethic.P2P.Message.GetNetworkStats do @doc """ Process the message to get the network stats from the summary cache """ - @spec process(t(), Crypto.key()) :: NetworkStats.t() + @spec process(t(), Message.metadata()) :: NetworkStats.t() def process(%__MODULE__{summary_time: summary_time}, _node_public_key) do %NetworkStats{stats: StatsCollector.get(summary_time, NetworkCoordinates.timeout())} end diff --git a/lib/archethic/p2p/message/get_next_addresses.ex b/lib/archethic/p2p/message/get_next_addresses.ex index 9e91bbb122..ececfa6d7e 100644 --- a/lib/archethic/p2p/message/get_next_addresses.ex +++ b/lib/archethic/p2p/message/get_next_addresses.ex @@ -9,11 +9,12 @@ defmodule Archethic.P2P.Message.GetNextAddresses do alias Archethic.Utils alias Archethic.Utils.VarInt alias Archethic.TransactionChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.AddressList @type t :: %__MODULE__{address: Crypto.prepended_hash()} - @spec process(__MODULE__.t(), Crypto.key()) :: AddressList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: AddressList.t() def process(%__MODULE__{address: address, limit: limit}, _) do %AddressList{addresses: TransactionChain.get_next_addresses(address, limit)} end diff --git a/lib/archethic/p2p/message/get_storage_nonce.ex b/lib/archethic/p2p/message/get_storage_nonce.ex index 44856ca8a3..b3ca80e148 100644 --- a/lib/archethic/p2p/message/get_storage_nonce.ex +++ b/lib/archethic/p2p/message/get_storage_nonce.ex @@ -5,6 +5,7 @@ defmodule Archethic.P2P.Message.GetStorageNonce do This message is used during the node bootstrapping """ + alias Archethic.P2P.Message alias Archethic.P2P.Message.EncryptedStorageNonce alias Archethic.Crypto alias Archethic.Utils @@ -33,7 +34,7 @@ defmodule Archethic.P2P.Message.GetStorageNonce do <> end - @spec process(__MODULE__.t(), Crypto.key()) :: EncryptedStorageNonce.t() + @spec process(__MODULE__.t(), Message.metadata()) :: EncryptedStorageNonce.t() def process(%__MODULE__{public_key: public_key}, _) do %EncryptedStorageNonce{ digest: Crypto.encrypt_storage_nonce(public_key) diff --git a/lib/archethic/p2p/message/get_transaction.ex b/lib/archethic/p2p/message/get_transaction.ex index 41f559181f..aa1b21107e 100644 --- a/lib/archethic/p2p/message/get_transaction.ex +++ b/lib/archethic/p2p/message/get_transaction.ex @@ -8,6 +8,7 @@ defmodule Archethic.P2P.Message.GetTransaction do alias Archethic.Crypto alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction + alias Archethic.P2P.Message alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.Error alias Archethic.Utils @@ -16,7 +17,7 @@ defmodule Archethic.P2P.Message.GetTransaction do address: Crypto.versioned_hash() } - @spec process(__MODULE__.t(), Crypto.key()) :: NotFound.t() | Error.t() | Transaction.t() + @spec process(__MODULE__.t(), Message.metadata()) :: NotFound.t() | Error.t() | Transaction.t() def process(%__MODULE__{address: tx_address}, _) do case TransactionChain.get_transaction(tx_address) do {:ok, tx} -> diff --git a/lib/archethic/p2p/message/get_transaction_chain.ex b/lib/archethic/p2p/message/get_transaction_chain.ex index 64fe8fa8ad..0076a6a19f 100644 --- a/lib/archethic/p2p/message/get_transaction_chain.ex +++ b/lib/archethic/p2p/message/get_transaction_chain.ex @@ -7,6 +7,7 @@ defmodule Archethic.P2P.Message.GetTransactionChain do alias Archethic.Crypto alias Archethic.DB + alias Archethic.P2P.Message alias Archethic.P2P.Message.TransactionList alias Archethic.TransactionChain alias Archethic.Utils @@ -18,7 +19,7 @@ defmodule Archethic.P2P.Message.GetTransactionChain do } # paging_state received contains binary offset for next page, to be used for query - @spec process(__MODULE__.t(), Crypto.key()) :: TransactionList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: TransactionList.t() def process(%__MODULE__{address: tx_address, paging_state: paging_state, order: order}, _) do {chain, more?, paging_address} = case TransactionChain.resolve_paging_state(tx_address, paging_state, order) do diff --git a/lib/archethic/p2p/message/get_transaction_chain_length.ex b/lib/archethic/p2p/message/get_transaction_chain_length.ex index 528c71e68b..2242b06bee 100644 --- a/lib/archethic/p2p/message/get_transaction_chain_length.ex +++ b/lib/archethic/p2p/message/get_transaction_chain_length.ex @@ -8,6 +8,7 @@ defmodule Archethic.P2P.Message.GetTransactionChainLength do alias Archethic.Crypto alias Archethic.TransactionChain alias Archethic.Utils + alias Archethic.P2P.Message alias Archethic.P2P.Message.TransactionChainLength @type t :: %__MODULE__{ @@ -15,7 +16,7 @@ defmodule Archethic.P2P.Message.GetTransactionChainLength do } # Returns the length of the transaction chain - @spec process(__MODULE__.t(), Crypto.key()) :: TransactionChainLength.t() + @spec process(__MODULE__.t(), Message.metadata()) :: TransactionChainLength.t() def process(%__MODULE__{address: address}, _) do %TransactionChainLength{ length: TransactionChain.get_size(address) diff --git a/lib/archethic/p2p/message/get_transaction_inputs.ex b/lib/archethic/p2p/message/get_transaction_inputs.ex index e54ebf5338..675888c12e 100644 --- a/lib/archethic/p2p/message/get_transaction_inputs.ex +++ b/lib/archethic/p2p/message/get_transaction_inputs.ex @@ -8,6 +8,7 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do alias Archethic.Crypto alias Archethic.TransactionChain alias Archethic.TransactionChain.VersionedTransactionInput + alias Archethic.P2P.Message alias Archethic.P2P.Message.TransactionInputList alias Archethic.Utils alias Archethic.Utils.VarInt @@ -18,7 +19,7 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do limit: non_neg_integer() } - @spec process(__MODULE__.t(), Crypto.key()) :: TransactionInputList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: TransactionInputList.t() def process(%__MODULE__{address: address, offset: offset, limit: limit}, _) do {inputs, more?, offset} = address diff --git a/lib/archethic/p2p/message/get_transaction_summary.ex b/lib/archethic/p2p/message/get_transaction_summary.ex index 708f9e2589..676649f0f0 100644 --- a/lib/archethic/p2p/message/get_transaction_summary.ex +++ b/lib/archethic/p2p/message/get_transaction_summary.ex @@ -5,9 +5,9 @@ defmodule Archethic.P2P.Message.GetTransactionSummary do @enforce_keys [:address] defstruct [:address] - alias Archethic.Crypto alias Archethic.Utils alias Archethic.TransactionChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.TransactionSummaryMessage alias Archethic.P2P.Message.NotFound @@ -15,7 +15,8 @@ defmodule Archethic.P2P.Message.GetTransactionSummary do address: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: TransactionSummaryMessage.t() | NotFound.t() + @spec process(__MODULE__.t(), Message.metadata()) :: + TransactionSummaryMessage.t() | NotFound.t() def process(%__MODULE__{address: address}, _) do case TransactionChain.get_transaction_summary(address) do {:ok, summary} -> diff --git a/lib/archethic/p2p/message/get_unspent_outputs.ex b/lib/archethic/p2p/message/get_unspent_outputs.ex index 924b1a232c..afcee9cfd2 100644 --- a/lib/archethic/p2p/message/get_unspent_outputs.ex +++ b/lib/archethic/p2p/message/get_unspent_outputs.ex @@ -6,6 +6,7 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do defstruct [:address, offset: nil, limit: 0] alias Archethic.Crypto + alias Archethic.P2P.Message alias Archethic.P2P.Message.UnspentOutputList alias Archethic.TransactionChain @@ -28,7 +29,7 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do limit: non_neg_integer() } - @spec process(__MODULE__.t(), Crypto.key()) :: UnspentOutputList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: UnspentOutputList.t() def process(%__MODULE__{address: genesis_address, offset: offset, limit: limit}, _) do sorted_utxos = genesis_address diff --git a/lib/archethic/p2p/message/list_nodes.ex b/lib/archethic/p2p/message/list_nodes.ex index 90ae832782..6017f5193b 100644 --- a/lib/archethic/p2p/message/list_nodes.ex +++ b/lib/archethic/p2p/message/list_nodes.ex @@ -2,8 +2,8 @@ defmodule Archethic.P2P.Message.ListNodes do @moduledoc """ Represents a message to fetch the list of nodes """ - alias Archethic.Crypto alias Archethic.P2P + alias Archethic.P2P.Message alias Archethic.P2P.Message.NodeList defstruct [:authorized_and_available?] @@ -12,7 +12,7 @@ defmodule Archethic.P2P.Message.ListNodes do authorized_and_available?: boolean() } - @spec process(__MODULE__.t(), Crypto.key()) :: NodeList.t() + @spec process(__MODULE__.t(), Message.metadata()) :: NodeList.t() def process(%__MODULE__{authorized_and_available?: false}, _), do: %NodeList{nodes: P2P.list_nodes()} diff --git a/lib/archethic/p2p/message/new_beacon_slot.ex b/lib/archethic/p2p/message/new_beacon_slot.ex index 7e2401a3ce..25485e4ea0 100644 --- a/lib/archethic/p2p/message/new_beacon_slot.ex +++ b/lib/archethic/p2p/message/new_beacon_slot.ex @@ -11,6 +11,7 @@ defmodule Archethic.P2P.Message.NewBeaconSlot do alias Archethic.Crypto alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.Utils @@ -21,10 +22,10 @@ defmodule Archethic.P2P.Message.NewBeaconSlot do slot: Slot.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() def process( %__MODULE__{slot: slot = %Slot{subset: subset, slot_time: slot_time}}, - node_public_key + %{sender_public_key: node_public_key} ) do summary_time = BeaconChain.next_summary_date(slot_time) node_list = P2P.authorized_and_available_nodes(summary_time, true) diff --git a/lib/archethic/p2p/message/new_transaction.ex b/lib/archethic/p2p/message/new_transaction.ex index d68cb07eee..2e8543e4e9 100644 --- a/lib/archethic/p2p/message/new_transaction.ex +++ b/lib/archethic/p2p/message/new_transaction.ex @@ -9,6 +9,7 @@ defmodule Archethic.P2P.Message.NewTransaction do alias Archethic.Contracts.Contract alias Archethic.Crypto + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.TransactionChain.Transaction @@ -20,7 +21,7 @@ defmodule Archethic.P2P.Message.NewTransaction do contract_context: nil | Contract.Context.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() def process( %__MODULE__{ transaction: tx, diff --git a/lib/archethic/p2p/message/notify_end_of_node_sync.ex b/lib/archethic/p2p/message/notify_end_of_node_sync.ex index 9533100d13..5fede2fb3d 100644 --- a/lib/archethic/p2p/message/notify_end_of_node_sync.ex +++ b/lib/archethic/p2p/message/notify_end_of_node_sync.ex @@ -11,13 +11,14 @@ defmodule Archethic.P2P.Message.NotifyEndOfNodeSync do alias Archethic.BeaconChain alias Archethic.Utils alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message @type t :: %__MODULE__{ node_public_key: Crypto.key(), timestamp: DateTime.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process(%__MODULE__{node_public_key: public_key, timestamp: timestamp}, _) do BeaconChain.add_end_of_node_sync(public_key, timestamp) %Ok{} diff --git a/lib/archethic/p2p/message/notify_last_transaction_address.ex b/lib/archethic/p2p/message/notify_last_transaction_address.ex index b0abd362de..118ec065bc 100644 --- a/lib/archethic/p2p/message/notify_last_transaction_address.ex +++ b/lib/archethic/p2p/message/notify_last_transaction_address.ex @@ -11,6 +11,7 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do alias Archethic.P2P alias Archethic.TransactionChain alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message @type t :: %__MODULE__{ last_address: Crypto.versioned_hash(), @@ -19,7 +20,7 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do timestamp: DateTime.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process( %__MODULE__{ genesis_address: genesis_address, diff --git a/lib/archethic/p2p/message/notify_previous_chain.ex b/lib/archethic/p2p/message/notify_previous_chain.ex index a0d54efd61..8bf39eb5a3 100644 --- a/lib/archethic/p2p/message/notify_previous_chain.ex +++ b/lib/archethic/p2p/message/notify_previous_chain.ex @@ -5,16 +5,16 @@ defmodule Archethic.P2P.Message.NotifyPreviousChain do defstruct [:address] - alias Archethic.Crypto alias Archethic.Utils alias Archethic.Replication + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok @type t :: %__MODULE__{ address: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process(%__MODULE__{address: address}, _) do Replication.acknowledge_previous_storage_nodes(address) %Ok{} diff --git a/lib/archethic/p2p/message/notify_replication_validation.ex b/lib/archethic/p2p/message/notify_replication_validation.ex index 0fb79b354e..8559f1eeee 100644 --- a/lib/archethic/p2p/message/notify_replication_validation.ex +++ b/lib/archethic/p2p/message/notify_replication_validation.ex @@ -10,9 +10,11 @@ defmodule Archethic.P2P.Message.NotifyReplicationValidation do alias Archethic.Utils alias Archethic.Mining + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok - def process(%__MODULE__{address: address}, node_public_key) do + @spec process(t(), Message.metadata()) :: Ok.t() + def process(%__MODULE__{address: address}, %{sender_public_key: node_public_key}) do Mining.notify_replication_validation(address, node_public_key) %Ok{} end diff --git a/lib/archethic/p2p/message/ping.ex b/lib/archethic/p2p/message/ping.ex index f052a68051..abafbdd1f5 100644 --- a/lib/archethic/p2p/message/ping.ex +++ b/lib/archethic/p2p/message/ping.ex @@ -5,12 +5,12 @@ defmodule Archethic.P2P.Message.Ping do defstruct [] - alias Archethic.Crypto + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok @type t :: %__MODULE__{} - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process(%__MODULE__{}, _), do: %Ok{} @spec serialize(t()) :: bitstring() diff --git a/lib/archethic/p2p/message/register_beacon_updates.ex b/lib/archethic/p2p/message/register_beacon_updates.ex index 3200210d27..cf61a63e5c 100644 --- a/lib/archethic/p2p/message/register_beacon_updates.ex +++ b/lib/archethic/p2p/message/register_beacon_updates.ex @@ -6,6 +6,7 @@ defmodule Archethic.P2P.Message.RegisterBeaconUpdates do alias Archethic.Crypto alias Archethic.Utils alias Archethic.BeaconChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok @enforce_keys [:node_public_key, :subset] @@ -16,7 +17,7 @@ defmodule Archethic.P2P.Message.RegisterBeaconUpdates do subset: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process(%__MODULE__{node_public_key: node_public_key, subset: subset}, _) do BeaconChain.subscribe_for_beacon_updates(subset, node_public_key) %Ok{} diff --git a/lib/archethic/p2p/message/replicate_transaction.ex b/lib/archethic/p2p/message/replicate_transaction.ex index 42d7a8a17c..105333d33d 100644 --- a/lib/archethic/p2p/message/replicate_transaction.ex +++ b/lib/archethic/p2p/message/replicate_transaction.ex @@ -8,6 +8,7 @@ defmodule Archethic.P2P.Message.ReplicateTransaction do alias Archethic.Crypto alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message alias Archethic.P2P.Message.ReplicationError alias Archethic.P2P.Message.Ok alias Archethic.Replication @@ -23,7 +24,7 @@ defmodule Archethic.P2P.Message.ReplicateTransaction do genesis_address: Crypto.prepended_hash() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | ReplicationError.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | ReplicationError.t() def process( %__MODULE__{ transaction: diff --git a/lib/archethic/p2p/message/request_chain_lock.ex b/lib/archethic/p2p/message/request_chain_lock.ex index 1bf9ceeebd..67083985bb 100644 --- a/lib/archethic/p2p/message/request_chain_lock.ex +++ b/lib/archethic/p2p/message/request_chain_lock.ex @@ -8,13 +8,14 @@ defmodule Archethic.P2P.Message.RequestChainLock do alias Archethic.Crypto alias Archethic.Mining.ChainLock + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.Utils @type t :: %__MODULE__{address: Crypto.prepended_hash(), hash: Crypto.versioned_hash()} - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() def process(%__MODULE__{address: address, hash: hash}, _) do case ChainLock.lock(address, hash) do :ok -> %Ok{} diff --git a/lib/archethic/p2p/message/shard_repair.ex b/lib/archethic/p2p/message/shard_repair.ex index 978d610467..12dd194f0e 100644 --- a/lib/archethic/p2p/message/shard_repair.ex +++ b/lib/archethic/p2p/message/shard_repair.ex @@ -11,6 +11,7 @@ defmodule Archethic.P2P.Message.ShardRepair do alias Archethic.Utils alias Archethic.Utils.VarInt + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok @type t :: %__MODULE__{ @@ -19,7 +20,7 @@ defmodule Archethic.P2P.Message.ShardRepair do io_addresses: list(Crypto.prepended_hash()) } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process( %__MODULE__{ first_address: first_address, diff --git a/lib/archethic/p2p/message/start_mining.ex b/lib/archethic/p2p/message/start_mining.ex index 207768067d..e162401479 100644 --- a/lib/archethic/p2p/message/start_mining.ex +++ b/lib/archethic/p2p/message/start_mining.ex @@ -25,6 +25,7 @@ defmodule Archethic.P2P.Message.StartMining do alias Archethic.Mining alias Archethic.Utils alias Archethic.TransactionChain.Transaction + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.SelfRepair.NetworkChain @@ -41,7 +42,7 @@ defmodule Archethic.P2P.Message.StartMining do contract_context: nil | Contract.Context.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() def process( %__MODULE__{ transaction: tx = %Transaction{}, diff --git a/lib/archethic/p2p/message/unlock_chain.ex b/lib/archethic/p2p/message/unlock_chain.ex index 18c41e8be9..a19c7a0c02 100644 --- a/lib/archethic/p2p/message/unlock_chain.ex +++ b/lib/archethic/p2p/message/unlock_chain.ex @@ -4,6 +4,7 @@ defmodule Archethic.P2P.Message.UnlockChain do """ alias Archethic.Crypto alias Archethic.Mining.ChainLock + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.Utils @@ -12,7 +13,7 @@ defmodule Archethic.P2P.Message.UnlockChain do @type t :: %__MODULE__{address: Crypto.prepended_hash()} - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() def process(%__MODULE__{address: address}, _) do ChainLock.unlock(address) %Ok{} diff --git a/lib/archethic/p2p/message/validate_smart_contract_call.ex b/lib/archethic/p2p/message/validate_smart_contract_call.ex index 76748a573f..f6774e5afc 100644 --- a/lib/archethic/p2p/message/validate_smart_contract_call.ex +++ b/lib/archethic/p2p/message/validate_smart_contract_call.ex @@ -11,9 +11,9 @@ defmodule Archethic.P2P.Message.ValidateSmartContractCall do alias Archethic.Contracts.Contract.Failure alias Archethic.Contracts.Contract.ConditionRejected alias Archethic.Contracts.Contract.ActionWithTransaction - alias Archethic.Crypto alias Archethic.Mining alias Archethic.OracleChain + alias Archethic.P2P.Message alias Archethic.P2P.Message.SmartContractCallValidation alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction @@ -61,7 +61,7 @@ defmodule Archethic.P2P.Message.ValidateSmartContractCall do } end - @spec process(t(), Crypto.key()) :: SmartContractCallValidation.t() + @spec process(t(), Message.metadata()) :: SmartContractCallValidation.t() def process( msg = %__MODULE__{ recipient: %Recipient{address: recipient_address}, diff --git a/lib/archethic/p2p/message_envelop.ex b/lib/archethic/p2p/message_envelop.ex index 871c097391..886ca3adf9 100644 --- a/lib/archethic/p2p/message_envelop.ex +++ b/lib/archethic/p2p/message_envelop.ex @@ -9,7 +9,8 @@ defmodule Archethic.P2P.MessageEnvelop do :message, :sender_public_key, :signature, - :decrypted_raw_message + :decrypted_raw_message, + trace: "" ] alias Archethic.Crypto @@ -21,7 +22,8 @@ defmodule Archethic.P2P.MessageEnvelop do message_id: non_neg_integer(), sender_public_key: Crypto.key(), signature: binary(), - decrypted_raw_message: binary() | nil + decrypted_raw_message: binary() | nil, + trace: binary() } @doc """ @@ -32,15 +34,16 @@ defmodule Archethic.P2P.MessageEnvelop do message_id: message_id, sender_public_key: sender_public_key, message: message, - signature: signature + signature: signature, + trace: trace }) do encoded_message = message |> Message.encode() |> Utils.wrap_binary() - <> + <> end @doc """ @@ -52,7 +55,8 @@ defmodule Archethic.P2P.MessageEnvelop do message_id: message_id, sender_public_key: sender_public_key, message: message, - signature: signature + signature: signature, + trace: trace }, recipient_public_key ) @@ -63,8 +67,8 @@ defmodule Archethic.P2P.MessageEnvelop do |> Utils.wrap_binary() |> Crypto.ec_encrypt(recipient_public_key) - <> + <> end @doc """ @@ -76,8 +80,8 @@ defmodule Archethic.P2P.MessageEnvelop do def decode(<>) do key_size = Crypto.key_size(curve_id) - <> = rest + <> = rest {data, _} = Message.decode(message) @@ -88,15 +92,17 @@ defmodule Archethic.P2P.MessageEnvelop do message: data, sender_public_key: sender_public_key, signature: signature, - decrypted_raw_message: message + decrypted_raw_message: message, + trace: trace } end def decode(<>) do key_size = Crypto.key_size(curve_id) - <> = rest + <> = rest message = Crypto.ec_decrypt_with_first_node_key!(encrypted_message) @@ -109,7 +115,8 @@ defmodule Archethic.P2P.MessageEnvelop do message: data, sender_public_key: sender_public_key, signature: signature, - decrypted_raw_message: message + decrypted_raw_message: message, + trace: trace } end @@ -122,8 +129,9 @@ defmodule Archethic.P2P.MessageEnvelop do def decode_raw_message(<>) do key_size = Crypto.key_size(curve_id) - <<_public_key::binary-size(key_size), signature_size::8, - _signature::binary-size(signature_size), encrypted_message::bitstring>> = rest + <<_public_key::binary-size(key_size), trace_size::8, _trace::binary-size(trace_size), + signature_size::8, _signature::binary-size(signature_size), + encrypted_message::bitstring>> = rest {message_id, encrypted_message} end diff --git a/lib/archethic_web/dashboard_metrics_aggregator.ex b/lib/archethic_web/dashboard_metrics_aggregator.ex index 9bbc555f31..41c05fdcdc 100644 --- a/lib/archethic_web/dashboard_metrics_aggregator.ex +++ b/lib/archethic_web/dashboard_metrics_aggregator.ex @@ -136,7 +136,7 @@ defmodule ArchethicWeb.DashboardMetricsAggregator do case P2P.send_message( node, %GetDashboardData{since: since}, - @timeout_seconds * 1000 + timeout: @timeout_seconds * 1000 ) do {:ok, %DashboardData{buckets: buckets}} -> remote_buckets = prefix_buckets(first_public_key, buckets) diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index e3cc0bf040..a6f76f1eda 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -144,7 +144,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do Connection.send_message( Crypto.first_node_public_key(), %GetTransaction{address: ArchethicCase.random_address()}, - 200 + timeout: 200 ) # ensure there was no delay @@ -202,7 +202,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do Connection.send_message( Crypto.first_node_public_key(), %GetTransaction{address: ArchethicCase.random_address()}, - 10 + timeout: 10 ) assert {_, %{messages: %{}}} = :sys.get_state(pid) @@ -424,7 +424,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do Connection.send_message( Crypto.first_node_public_key(), %GetTransaction{address: ArchethicCase.random_address()}, - 1000 + timeout: 1000 ) Process.sleep(10) @@ -441,7 +441,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do Connection.send_message( Crypto.first_node_public_key(), %GetTransaction{address: ArchethicCase.random_address()}, - 1000 + timeout: 1000 ) Process.sleep(10) @@ -502,7 +502,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do Connection.send_message( Crypto.first_node_public_key(), %GetTransaction{address: ArchethicCase.random_address()}, - 1000 + timeout: 1000 ) Process.sleep(10) @@ -544,7 +544,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do Connection.send_message( node_key, %GetTransaction{address: ArchethicCase.random_address()}, - 10 + timeout: 10 ) Process.sleep(10) diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 5e4b7fa310..9dede76d45 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -95,7 +95,7 @@ defmodule Archethic.P2PTest do |> expect( :send_message, 3, - fn _node, %GetTransaction{}, _timeout -> + fn _node, %GetTransaction{}, _opts -> {:ok, %Transaction{}} end ) @@ -106,16 +106,16 @@ defmodule Archethic.P2PTest do test "should run resolver conflicts when the results are different", %{nodes: nodes} do MockClient |> stub(:send_message, fn - %Node{port: 3004}, %GetTransaction{}, _timeout -> + %Node{port: 3004}, %GetTransaction{}, _opts -> {:ok, %Transaction{}} - %Node{port: 3003}, %GetTransaction{}, _timeout -> + %Node{port: 3003}, %GetTransaction{}, _opts -> {:ok, %NotFound{}} - %Node{port: 3002}, %GetTransaction{}, _timeout -> + %Node{port: 3002}, %GetTransaction{}, _opts -> {:ok, %NotFound{}} - _, %GetTransaction{}, _timeout -> + _, %GetTransaction{}, _opts -> {:ok, %Transaction{}} end) @@ -139,14 +139,14 @@ defmodule Archethic.P2PTest do |> expect( :send_message, 4, - fn _node, _message, _timeout -> + fn _node, _message, _opts -> {:ok, %Transaction{}} end ) |> expect( :send_message, 1, - fn _node, _message, _timeout -> + fn _node, _message, _opts -> :timer.sleep(200) {:ok, %NotFound{}} end diff --git a/test/archethic/self_repair/self_repair_test.exs b/test/archethic/self_repair/self_repair_test.exs index 735cfb5a8c..1eb4c62b53 100644 --- a/test/archethic/self_repair/self_repair_test.exs +++ b/test/archethic/self_repair/self_repair_test.exs @@ -63,9 +63,9 @@ defmodule Archethic.SelfRepairTest do end) MockClient - |> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, timeout -> + |> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, opts -> send(me, :get_next_addresses) - DefaultImpl.send_message(node, msg, timeout) + DefaultImpl.send_message(node, msg, opts) end) SelfRepair.update_last_address("Alice2", [node]) From 1e2c1bd8e0486208fe84be9b2d221d12ca6432d5 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Tue, 2 Apr 2024 12:07:27 +0200 Subject: [PATCH 3/3] Use tracing in mining --- lib/archethic.ex | 16 +- lib/archethic/mining.ex | 25 +- lib/archethic/mining/distributed_workflow.ex | 239 +++++++++++++----- lib/archethic/mining/standalone_workflow.ex | 108 ++++++-- .../replicate_pending_transaction_chain.ex | 69 +++-- .../replication_attestation_message.ex | 39 ++- lib/archethic/p2p/message/start_mining.ex | 6 +- .../p2p/message/validate_transaction.ex | 35 ++- lib/archethic/utils.ex | 15 ++ .../explorer/controllers/faucet_controller.ex | 10 + lib/archethic_web/transaction_subscriber.ex | 8 + .../controllers/jsonrpc_controller_test.exs | 4 +- 12 files changed, 432 insertions(+), 142 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index 8769be9ce5..5f5cb68d28 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -33,6 +33,7 @@ defmodule Archethic do alias Archethic.TransactionChain.TransactionInput require Logger + require OpenTelemetry.Tracer @doc """ Returns true if a node is up and false if it is down @@ -123,10 +124,23 @@ defmodule Archethic do contract_context: contract_context } + mining_span = OpenTelemetry.Tracer.start_span("mining") + OpenTelemetry.Tracer.set_current_span(mining_span) + OpenTelemetry.Span.set_attribute(mining_span, "address", Base.encode16(tx.address)) + + OpenTelemetry.Span.set_attribute( + mining_span, + "node", + welcome_node_key |> P2P.get_node_info!() |> Node.endpoint() + ) + + trace = Archethic.Utils.inject_propagated_context() + :persistent_term.put({:initial_mining_span, tx.address}, mining_span) + Task.Supervisor.async_stream_nolink( Archethic.TaskSupervisor, validation_nodes, - &P2P.send_message(&1, message), + &P2P.send_message(&1, message, trace: trace), ordered: false, on_timeout: :kill_task, timeout: Message.get_timeout(message) + 2000 diff --git a/lib/archethic/mining.ex b/lib/archethic/mining.ex index 402e216d38..b2ae0cb07c 100644 --- a/lib/archethic/mining.ex +++ b/lib/archethic/mining.ex @@ -47,9 +47,24 @@ defmodule Archethic.Mining do transaction :: Transaction.t(), welcome_node_public_key :: Crypto.key(), validation_node_public_keys :: list(Crypto.key()), - contract_context :: nil | Contract.Context.t() + contract_context :: nil | Contract.Context.t(), + request_metadata :: %{} ) :: {:ok, pid()} - def start(tx = %Transaction{}, welcome_node_public_key, [_ | []], contract_context) do + def start( + tx, + welcome_node_public_key, + validation_node_keys, + contract_context, + request_metadata \\ %{} + ) + + def start( + tx = %Transaction{}, + welcome_node_public_key, + [_ | []], + contract_context, + _ + ) do StandaloneWorkflow.start_link( transaction: tx, welcome_node: P2P.get_node_info!(welcome_node_public_key), @@ -61,7 +76,8 @@ defmodule Archethic.Mining do tx = %Transaction{}, welcome_node_public_key, validation_node_public_keys, - contract_context + contract_context, + request_metadata ) when is_binary(welcome_node_public_key) and is_list(validation_node_public_keys) do DynamicSupervisor.start_child(WorkerSupervisor, { @@ -70,7 +86,8 @@ defmodule Archethic.Mining do welcome_node: P2P.get_node_info!(welcome_node_public_key), validation_nodes: Enum.map(validation_node_public_keys, &P2P.get_node_info!/1), node_public_key: Crypto.last_node_public_key(), - contract_context: contract_context + contract_context: contract_context, + request_metadata: request_metadata }) end diff --git a/lib/archethic/mining/distributed_workflow.ex b/lib/archethic/mining/distributed_workflow.ex index c78872612e..483ed9215f 100644 --- a/lib/archethic/mining/distributed_workflow.ex +++ b/lib/archethic/mining/distributed_workflow.ex @@ -13,6 +13,7 @@ defmodule Archethic.Mining.DistributedWorkflow do If the atomic commitment is not reached, it starts the malicious detection to ban the dishonest nodes """ + require OpenTelemetry.Tracer alias Archethic.BeaconChain alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.Crypto @@ -54,6 +55,7 @@ defmodule Archethic.Mining.DistributedWorkflow do alias Archethic.Utils require Logger + require OpenTelemetry.Tracer use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary @vsn 1 @@ -173,6 +175,9 @@ defmodule Archethic.Mining.DistributedWorkflow do node_public_key = Keyword.get(opts, :node_public_key) timeout = Keyword.get(opts, :timeout, get_mining_timeout(tx.type)) contract_context = Keyword.get(opts, :contract_context) + %{trace: trace} = Keyword.get(opts, :request_metadata, %{trace: ""}) + + Utils.extract_progagated_context(trace) Registry.register(WorkflowRegistry, tx.address, []) @@ -181,6 +186,15 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_type: tx.type ) + mining_span_context = OpenTelemetry.Tracer.start_span("start mining") + OpenTelemetry.Tracer.set_current_span(mining_span_context) + + OpenTelemetry.Span.set_attribute( + mining_span_context, + "node", + node_public_key |> P2P.get_node_info!() |> Node.endpoint() + ) + next_events = [ {:next_event, :internal, {:start_mining, tx, welcome_node, validation_nodes, contract_context}}, @@ -191,7 +205,8 @@ defmodule Archethic.Mining.DistributedWorkflow do %{ node_public_key: node_public_key, start_time: System.monotonic_time(), - timeout: timeout + timeout: timeout, + mining_span_context: mining_span_context }, next_events} end @@ -199,7 +214,7 @@ defmodule Archethic.Mining.DistributedWorkflow do :internal, {:start_mining, tx, welcome_node, validation_nodes, contract_context}, :idle, - data + data = %{mining_span_context: mining_span_context} ) do validation_time = DateTime.utc_now() |> DateTime.truncate(:millisecond) @@ -211,7 +226,9 @@ defmodule Archethic.Mining.DistributedWorkflow do previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes) {:ok, genesis_address} = - TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + OpenTelemetry.Tracer.with_span "fetch genesis address", %{links: [mining_span_context]} do + TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + end genesis_storage_nodes = Election.chain_storage_nodes(genesis_address, authorized_nodes) @@ -222,7 +239,10 @@ defmodule Archethic.Mining.DistributedWorkflow do authorized_nodes ) - resolved_addresses = TransactionChain.resolve_transaction_addresses!(tx) + resolved_addresses = + OpenTelemetry.Tracer.with_span "resolve addresses" do + TransactionChain.resolve_transaction_addresses!(tx) + end io_storage_nodes = if Transaction.network_type?(tx.type) do @@ -273,8 +293,13 @@ defmodule Archethic.Mining.DistributedWorkflow do ) do role = if node_public_key == coordinator_key, do: :coordinator, else: :cross_validator + pending_validation = + OpenTelemetry.Tracer.with_span "pending validation" do + PendingTransactionValidation.validate(tx, validation_time) + end + new_context = - case PendingTransactionValidation.validate(tx, validation_time) do + case pending_validation do :ok -> Logger.debug("Pending transaction valid", transaction_address: Base.encode16(tx.address), @@ -342,13 +367,15 @@ defmodule Archethic.Mining.DistributedWorkflow do {prev_tx, unspent_outputs, previous_storage_nodes, chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} = - TransactionContext.get( - Transaction.previous_address(tx), - genesis_address, - Enum.map(chain_storage_nodes, & &1.first_public_key), - Enum.map(beacon_storage_nodes, & &1.first_public_key), - Enum.map(io_storage_nodes, & &1.first_public_key) - ) + OpenTelemetry.Tracer.with_span "fetch context" do + TransactionContext.get( + Transaction.previous_address(tx), + genesis_address, + Enum.map(chain_storage_nodes, & &1.first_public_key), + Enum.map(beacon_storage_nodes, & &1.first_public_key), + Enum.map(io_storage_nodes, & &1.first_public_key) + ) + end now = System.monotonic_time() @@ -431,7 +458,10 @@ defmodule Archethic.Mining.DistributedWorkflow do node_public_key: node_public_key, context: context }) do - notify_transaction_context(context, node_public_key) + OpenTelemetry.Tracer.with_span "notify context" do + notify_transaction_context(context, node_public_key) + end + :keep_state_and_data end @@ -476,9 +506,12 @@ defmodule Archethic.Mining.DistributedWorkflow do context: context = %ValidationContext{ transaction: tx - } + }, + mining_span_context: mining_span_context } ) do + OpenTelemetry.Tracer.set_current_span(mining_span_context) + Logger.info("Aggregate mining context", transaction_address: Base.encode16(tx.address), transaction_type: tx.type @@ -553,11 +586,26 @@ defmodule Archethic.Mining.DistributedWorkflow do _ -> new_context = context - |> ValidationContext.create_validation_stamp() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "create stamp" do + ValidationContext.create_validation_stamp(context) + end + end) |> ValidationContext.create_replication_tree() + request_cross_validation_context = + OpenTelemetry.Tracer.start_span("request cross validations") + + OpenTelemetry.Tracer.set_current_span(request_cross_validation_context) + request_cross_validations(new_context) - {:next_state, :wait_cross_validation_stamps, %{data | context: new_context}} + + new_data = + data + |> Map.put(:context, new_context) + |> Map.put(:request_cross_validation_context, request_cross_validation_context) + + {:next_state, :wait_cross_validation_stamps, new_data} end end @@ -571,9 +619,12 @@ defmodule Archethic.Mining.DistributedWorkflow do context: context = %ValidationContext{ transaction: tx - } + }, + mining_span_context: mining_span_context } ) do + OpenTelemetry.Tracer.set_current_span(mining_span_context) + Logger.info("Cross validation", transaction_address: Base.encode16(tx.address), transaction_type: tx.type @@ -585,9 +636,15 @@ defmodule Archethic.Mining.DistributedWorkflow do |> ValidationContext.set_confirmed_validation_nodes(confirmed_cross_validation_nodes) |> ValidationContext.add_validation_stamp(validation_stamp) |> ValidationContext.add_replication_tree(replication_tree, node_public_key) - |> ValidationContext.cross_validate() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "cross validation" do + ValidationContext.cross_validate(context) + end + end) - notify_cross_validation_stamp(new_context) + OpenTelemetry.Tracer.with_span "notify cross validation stamp" do + notify_cross_validation_stamp(new_context) + end confirmed_cross_validation_nodes = ValidationContext.get_confirmed_validation_nodes(new_context) @@ -623,7 +680,9 @@ defmodule Archethic.Mining.DistributedWorkflow do {:add_cross_validation_stamp, cross_validation_stamp = %CrossValidationStamp{}}, :wait_cross_validation_stamps, data = %{ - context: context = %ValidationContext{transaction: tx} + context: context = %ValidationContext{transaction: tx}, + request_cross_validation_context: request_cross_validation_context, + mining_span_context: mining_span_context } ) do Logger.info("Add cross validation stamp", @@ -634,6 +693,9 @@ defmodule Archethic.Mining.DistributedWorkflow do new_context = ValidationContext.add_cross_validation_stamp(context, cross_validation_stamp) if ValidationContext.enough_cross_validation_stamps?(new_context) do + OpenTelemetry.Tracer.end_span(request_cross_validation_context) + OpenTelemetry.Tracer.set_current_span(mining_span_context) + if ValidationContext.atomic_commitment?(new_context) do {:next_state, :replication, %{data | context: new_context}} else @@ -698,9 +760,21 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_type: type ) + request_replication_validation_span_context = + OpenTelemetry.Tracer.start_span("request replication validation") + + OpenTelemetry.Tracer.set_current_span(request_replication_validation_span_context) new_context = request_replication_validation(context, node_public_key) - {:keep_state, %{data | context: new_context}} + new_data = + data + |> Map.put(:context, new_context) + |> Map.put( + :request_replication_validation_span_context, + request_replication_validation_span_context + ) + + {:keep_state, new_data} err -> Logger.info("Skipped replication because validation failed: #{inspect(err)}") @@ -714,18 +788,39 @@ defmodule Archethic.Mining.DistributedWorkflow do :cast, {:add_replication_validation, node_public_key}, :replication, - data = %{context: context} + data = %{ + context: context, + request_replication_validation_span_context: + request_replication_validation_span_context, + mining_span_context: mining_span_context + } ) do validation_nodes = ValidationContext.get_validation_nodes(context) if Utils.key_in_node_list?(validation_nodes, node_public_key) do new_context = ValidationContext.add_replication_validation(context, node_public_key) - if ValidationContext.enough_replication_validations?(new_context) do - request_replication(new_context) - end + request_replication_span_context = + if ValidationContext.enough_replication_validations?(new_context) do + OpenTelemetry.Tracer.end_span(request_replication_validation_span_context) - {:keep_state, %{data | context: new_context}} + OpenTelemetry.Tracer.set_current_span(mining_span_context) + + request_replication_span_context = + OpenTelemetry.Tracer.start_span("request_replication") + + OpenTelemetry.Tracer.set_current_span(request_replication_span_context) + + request_replication(new_context) + request_replication_span_context + end + + new_data = + data + |> Map.put(:context, new_context) + |> Map.put(:request_replication_span_context, request_replication_span_context) + + {:keep_state, new_data} else :keep_state_and_data end @@ -742,9 +837,13 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction: %Transaction{address: address, type: type}, validation_time: validation_time, genesis_address: genesis_address - } + }, + request_replication_span_context: request_replication_span_context, + mining_span_context: mining_span_context } ) do + OpenTelemetry.Tracer.set_current_span(request_replication_span_context) + with {:ok, node_index} <- ValidationContext.get_chain_storage_position(context, node_public_key), validated_tx <- ValidationContext.get_validated_transaction(context), @@ -760,6 +859,9 @@ defmodule Archethic.Mining.DistributedWorkflow do new_context = ValidationContext.add_storage_confirmation(context, node_index, signature) if ValidationContext.enough_storage_confirmations?(new_context) do + OpenTelemetry.Tracer.end_span(request_replication_span_context) + OpenTelemetry.Tracer.set_current_span(mining_span_context) + duration = System.monotonic_time() - start_time # send the mining_completed event @@ -806,26 +908,33 @@ defmodule Archethic.Mining.DistributedWorkflow do validated_tx = ValidationContext.get_validated_transaction(context) tx_summary = TransactionSummary.from_transaction(validated_tx, genesis_address) - message = - ReplicationAttestationMessage.from_replication_attestation(%ReplicationAttestation{ - transaction_summary: tx_summary, - confirmations: confirmations - }) + OpenTelemetry.Tracer.with_span "notify attestation" do + trace = Archethic.Utils.inject_propagated_context() - beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context) + message = + ReplicationAttestationMessage.from_replication_attestation(%ReplicationAttestation{ + transaction_summary: tx_summary, + confirmations: confirmations + }) - [welcome_node | beacon_storage_nodes] - |> P2P.distinct_nodes() - |> P2P.broadcast_message(message) + beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context) - validated_tx = ValidationContext.get_validated_transaction(context) + [welcome_node | beacon_storage_nodes] + |> P2P.distinct_nodes() + |> P2P.broadcast_message(message, trace: trace) - context - |> ValidationContext.get_io_replication_nodes() - |> P2P.broadcast_message(%ReplicateTransaction{ - transaction: validated_tx, - genesis_address: genesis_address - }) + validated_tx = ValidationContext.get_validated_transaction(context) + + context + |> ValidationContext.get_io_replication_nodes() + |> P2P.broadcast_message( + %ReplicateTransaction{ + transaction: validated_tx, + genesis_address: genesis_address + }, + trace: trace + ) + end :keep_state_and_data end @@ -838,15 +947,21 @@ defmodule Archethic.Mining.DistributedWorkflow do context: context = %ValidationContext{ transaction: tx - } + }, + mining_span_context: mining_span_context } ) do unless Transaction.network_type?(tx.type) do - context - |> ValidationContext.get_confirmed_replication_nodes() - |> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address}) + OpenTelemetry.Tracer.with_span "notify previous chain" do + context + |> ValidationContext.get_confirmed_replication_nodes() + |> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address}) + end end + OpenTelemetry.Tracer.set_current_span(mining_span_context) + OpenTelemetry.Tracer.end_span(mining_span_context) + :stop end @@ -1031,15 +1146,19 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_address: Base.encode16(tx_address) ) - P2P.send_message(coordinator_node, %AddMiningContext{ - address: tx_address, - utxos_hashes: Enum.map(unspent_outputs, &VersionedUnspentOutput.hash/1), - validation_node_public_key: node_public_key, - previous_storage_nodes_public_keys: Enum.map(previous_storage_nodes, & &1.last_public_key), - chain_storage_nodes_view: chain_storage_nodes_view, - beacon_storage_nodes_view: beacon_storage_nodes_view, - io_storage_nodes_view: io_storage_nodes_view - }) + P2P.send_message( + coordinator_node, + %AddMiningContext{ + address: tx_address, + utxos_hashes: Enum.map(unspent_outputs, &VersionedUnspentOutput.hash/1), + validation_node_public_key: node_public_key, + previous_storage_nodes_public_keys: + Enum.map(previous_storage_nodes, & &1.last_public_key), + chain_storage_nodes_view: chain_storage_nodes_view, + beacon_storage_nodes_view: beacon_storage_nodes_view, + io_storage_nodes_view: io_storage_nodes_view + } + ) end defp request_cross_validations( @@ -1122,11 +1241,13 @@ defmodule Archethic.Mining.DistributedWorkflow do inputs: aggregated_utxos } + trace = Archethic.Utils.inject_propagated_context() + results = Task.Supervisor.async_stream_nolink( Archethic.TaskSupervisor, storage_nodes, - &P2P.send_message(&1, message), + &P2P.send_message(&1, message, trace: trace), ordered: false, on_timeout: :kill_task, timeout: Message.get_timeout(message) + 2000, @@ -1190,7 +1311,9 @@ defmodule Archethic.Mining.DistributedWorkflow do genesis_address: genesis_address } - P2P.broadcast_message(storage_nodes, message) + P2P.broadcast_message(storage_nodes, message, + trace: Archethic.Utils.inject_propagated_context() + ) end defp notify_error(reason, %{ diff --git a/lib/archethic/mining/standalone_workflow.ex b/lib/archethic/mining/standalone_workflow.ex index d9414c7503..d971b82f5f 100644 --- a/lib/archethic/mining/standalone_workflow.ex +++ b/lib/archethic/mining/standalone_workflow.ex @@ -38,6 +38,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do alias Archethic.TransactionChain.TransactionSummary require Logger + require OpenTelemetry.Tracer @mining_timeout Application.compile_env!(:archethic, [__MODULE__, :global_timeout]) @@ -65,6 +66,10 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction_type: tx.type ) + mining_span_context = OpenTelemetry.Tracer.start_span(:mining) + OpenTelemetry.Span.set_attribute(mining_span_context, "address", Base.encode16(tx.address)) + OpenTelemetry.Tracer.set_current_span(mining_span_context) + validation_time = DateTime.utc_now() |> DateTime.truncate(:millisecond) current_node = P2P.get_node_info() @@ -79,13 +84,18 @@ defmodule Archethic.Mining.StandaloneWorkflow do authorized_nodes ) - resolved_addresses = TransactionChain.resolve_transaction_addresses!(tx) + resolved_addresses = + OpenTelemetry.Tracer.with_span :resolve_address, %{links: [mining_span_context]} do + TransactionChain.resolve_transaction_addresses!(tx) + end previous_address = Transaction.previous_address(tx) previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes) {:ok, genesis_address} = - TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + OpenTelemetry.Tracer.with_span "fetch genesis address" do + TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + end genesis_storage_nodes = Election.chain_storage_nodes(genesis_address, authorized_nodes) @@ -103,13 +113,15 @@ defmodule Archethic.Mining.StandaloneWorkflow do {prev_tx, unspent_outputs, previous_storage_nodes, chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} = - TransactionContext.get( - Transaction.previous_address(tx), - genesis_address, - Enum.map(chain_storage_nodes, & &1.first_public_key), - Enum.map(beacon_storage_nodes, & &1.first_public_key), - Enum.map(io_storage_nodes, & &1.first_public_key) - ) + OpenTelemetry.Tracer.with_span "fetch context" do + TransactionContext.get( + Transaction.previous_address(tx), + genesis_address, + Enum.map(chain_storage_nodes, & &1.first_public_key), + Enum.map(beacon_storage_nodes, & &1.first_public_key), + Enum.map(io_storage_nodes, & &1.first_public_key) + ) + end :telemetry.execute([:archethic, :mining, :fetch_context], %{ duration: System.monotonic_time() - start @@ -130,8 +142,13 @@ defmodule Archethic.Mining.StandaloneWorkflow do genesis_address: genesis_address ) + pending_validation = + OpenTelemetry.Tracer.with_span "pending validation" do + PendingTransactionValidation.validate(tx, validation_time) + end + validation_context = - case PendingTransactionValidation.validate(tx, validation_time) do + case pending_validation do :ok -> ValidationContext.set_pending_transaction_validation(validation_context, true) @@ -152,12 +169,14 @@ defmodule Archethic.Mining.StandaloneWorkflow do |> ValidationContext.add_aggregated_utxos(unspent_outputs) |> validate() - start_replication(validation_context) + request_replication_span = start_replication(validation_context) new_state = state |> Map.put(:context, validation_context) |> Map.put(:confirmations, []) + |> Map.put(:mining_span_context, mining_span_context) + |> Map.put(:request_replication_span, request_replication_span) {:noreply, new_state, @mining_timeout} end @@ -165,9 +184,17 @@ defmodule Archethic.Mining.StandaloneWorkflow do defp validate(context = %ValidationContext{}) do context |> ValidationContext.confirm_validation_node(Crypto.last_node_public_key()) - |> ValidationContext.create_validation_stamp() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "create stamp" do + ValidationContext.create_validation_stamp(context) + end + end) |> ValidationContext.create_replication_tree() - |> ValidationContext.cross_validate() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "cross validate" do + ValidationContext.cross_validate(context) + end + end) end defp start_replication( @@ -194,17 +221,19 @@ defmodule Archethic.Mining.StandaloneWorkflow do } results = - Task.Supervisor.async_stream_nolink( - Archethic.TaskSupervisor, - replication_nodes, - &P2P.send_message(&1, message), - ordered: false, - on_timeout: :kill_task, - timeout: Message.get_timeout(message) + 2000 - ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Stream.map(fn {:ok, {:ok, res}} -> res end) - |> Enum.to_list() + OpenTelemetry.Tracer.with_span "request validation" do + Task.Supervisor.async_stream_nolink( + Archethic.TaskSupervisor, + replication_nodes, + &P2P.send_message(&1, message), + ordered: false, + on_timeout: :kill_task, + timeout: Message.get_timeout(message) + 2000 + ) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Stream.map(fn {:ok, {:ok, res}} -> res end) + |> Enum.to_list() + end if Enum.all?(results, &match?(%Ok{}, &1)) do Logger.info( @@ -213,10 +242,15 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction_type: validated_tx.type ) + replication_request_context = OpenTelemetry.Tracer.start_span("request_replication") + OpenTelemetry.Tracer.set_current_span(replication_request_context) + P2P.broadcast_message(replication_nodes, %ReplicatePendingTransactionChain{ address: validated_tx.address, genesis_address: genesis_address }) + + replication_request_context else errors = Enum.filter(results, &match?(%ReplicationError{}, &1)) @@ -227,6 +261,8 @@ defmodule Archethic.Mining.StandaloneWorkflow do _ -> send(self(), {:replication_error, :invalid_atomic_commitment}) end + + nil end end @@ -293,7 +329,9 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction: %Transaction{address: address, type: type}, validation_time: validation_time, genesis_address: genesis_address - } + }, + mining_span_context: mining_span_context, + request_replication_span: request_replication_span } ) do with {:ok, node_index} <- @@ -307,6 +345,10 @@ defmodule Archethic.Mining.StandaloneWorkflow do new_state = %{state | context: new_context} if ValidationContext.enough_storage_confirmations?(new_context) do + OpenTelemetry.Tracer.end_span(request_replication_span) + + OpenTelemetry.Tracer.set_current_span(mining_span_context) + duration = System.monotonic_time() - start_time # send the mining_completed event @@ -318,6 +360,8 @@ defmodule Archethic.Mining.StandaloneWorkflow do }) notify(new_state) + + OpenTelemetry.Tracer.end_span(mining_span_context) {:stop, :normal, new_state} else {:noreply, new_state} @@ -359,9 +403,17 @@ defmodule Archethic.Mining.StandaloneWorkflow do end defp notify(%{context: context}) do - notify_attestation(context) - notify_io_nodes(context) - notify_previous_chain(context) + OpenTelemetry.Tracer.with_span :notify_attestation do + notify_attestation(context) + end + + OpenTelemetry.Tracer.with_span "notify I/O" do + notify_io_nodes(context) + end + + OpenTelemetry.Tracer.with_span "notify previous chain" do + notify_previous_chain(context) + end :ok end diff --git a/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex b/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex index 3bf00766e9..969849c717 100644 --- a/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex +++ b/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex @@ -17,40 +17,59 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do alias Archethic.TransactionChain.VersionedTransactionInput alias Archethic.TransactionChain.TransactionSummary alias Archethic.P2P + alias Archethic.P2P.Node + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.P2P.Message.AcknowledgeStorage + require OpenTelemetry.Tracer + @type t() :: %__MODULE__{ address: binary(), genesis_address: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() - def process(%__MODULE__{address: address, genesis_address: genesis_address}, sender_public_key) do - case Replication.get_transaction_in_commit_pool(address) do - {:ok, - tx = %Transaction{ - address: tx_address, - validation_stamp: %ValidationStamp{timestamp: validation_time} - }, validation_inputs} -> - Task.Supervisor.start_child(TaskSupervisor, fn -> - authorized_nodes = P2P.authorized_and_available_nodes(validation_time) - - Replication.sync_transaction_chain(tx, genesis_address, authorized_nodes) - - TransactionChain.write_inputs( - tx_address, - convert_unspent_outputs_to_inputs(validation_inputs) - ) - - P2P.send_message(sender_public_key, get_ack_storage(tx, genesis_address)) - end) - - %Ok{} - - {:error, :transaction_not_exists} -> - %Error{reason: :invalid_transaction} + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() + def process( + %__MODULE__{address: address, genesis_address: genesis_address}, + %{ + sender_public_key: sender_public_key, + trace: trace + } + ) do + Utils.extract_progagated_context(trace) + + OpenTelemetry.Tracer.with_span "replicate transaction" do + OpenTelemetry.Tracer.set_attribute( + "node", + P2P.get_node_info() |> Node.endpoint() + ) + + case Replication.get_transaction_in_commit_pool(address) do + {:ok, + tx = %Transaction{ + address: tx_address, + validation_stamp: %ValidationStamp{timestamp: validation_time} + }, validation_inputs} -> + Task.Supervisor.start_child(TaskSupervisor, fn -> + authorized_nodes = P2P.authorized_and_available_nodes(validation_time) + + Replication.sync_transaction_chain(tx, genesis_address, authorized_nodes) + + TransactionChain.write_inputs( + tx_address, + convert_unspent_outputs_to_inputs(validation_inputs) + ) + + P2P.send_message(sender_public_key, get_ack_storage(tx, genesis_address)) + end) + + %Ok{} + + {:error, :transaction_not_exists} -> + %Error{reason: :invalid_transaction} + end end end diff --git a/lib/archethic/p2p/message/replication_attestation_message.ex b/lib/archethic/p2p/message/replication_attestation_message.ex index 6a5d6e4c6d..ade835df07 100644 --- a/lib/archethic/p2p/message/replication_attestation_message.ex +++ b/lib/archethic/p2p/message/replication_attestation_message.ex @@ -5,13 +5,17 @@ defmodule Archethic.P2P.Message.ReplicationAttestationMessage do """ alias Archethic.BeaconChain.ReplicationAttestation - alias Archethic.Crypto alias Archethic.PubSub + alias Archethic.P2P + alias Archethic.P2P.Node + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.TransactionChain.TransactionSummary + alias Archethic.Utils require Logger + require OpenTelemetry.Tracer defstruct replication_attestation: %ReplicationAttestation{} @@ -19,7 +23,7 @@ defmodule Archethic.P2P.Message.ReplicationAttestationMessage do replication_attestation: ReplicationAttestation.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() def process( %__MODULE__{ replication_attestation: @@ -30,20 +34,29 @@ defmodule Archethic.P2P.Message.ReplicationAttestationMessage do } } }, - _ + %{trace: trace} ) do - case ReplicationAttestation.validate(attestation) do - :ok -> - PubSub.notify_replication_attestation(attestation) - %Ok{} + Utils.extract_progagated_context(trace) - {:error, :invalid_confirmations_signatures} -> - Logger.error("Invalid attestation signatures", - transaction_address: Base.encode16(tx_address), - transaction_type: tx_type - ) + OpenTelemetry.Tracer.with_span "replicate attestation" do + OpenTelemetry.Tracer.set_attribute( + "node", + P2P.get_node_info() |> Node.endpoint() + ) - %Error{reason: :invalid_attestation} + case ReplicationAttestation.validate(attestation) do + :ok -> + PubSub.notify_replication_attestation(attestation) + %Ok{} + + {:error, :invalid_confirmations_signatures} -> + Logger.error("Invalid attestation signatures", + transaction_address: Base.encode16(tx_address), + transaction_type: tx_type + ) + + %Error{reason: :invalid_attestation} + end end end diff --git a/lib/archethic/p2p/message/start_mining.ex b/lib/archethic/p2p/message/start_mining.ex index e162401479..265da607ce 100644 --- a/lib/archethic/p2p/message/start_mining.ex +++ b/lib/archethic/p2p/message/start_mining.ex @@ -52,14 +52,16 @@ defmodule Archethic.P2P.Message.StartMining do p2p_view_hash: p2p_view_hash, contract_context: contract_context }, - _ + metadata ) do with :ok <- check_synchronization(network_chains_view_hash, p2p_view_hash), :ok <- check_valid_election(tx, validation_nodes), :ok <- check_current_node_is_elected(validation_nodes), :ok <- check_not_already_mining(tx.address), :ok <- Mining.request_chain_lock(tx) do - {:ok, _} = Mining.start(tx, welcome_node_public_key, validation_nodes, contract_context) + {:ok, _} = + Mining.start(tx, welcome_node_public_key, validation_nodes, contract_context, metadata) + %Ok{} else {:error, :invalid_validation_nodes_election} -> diff --git a/lib/archethic/p2p/message/validate_transaction.ex b/lib/archethic/p2p/message/validate_transaction.ex index 789887b7ef..09dbba60c1 100644 --- a/lib/archethic/p2p/message/validate_transaction.ex +++ b/lib/archethic/p2p/message/validate_transaction.ex @@ -8,28 +8,43 @@ defmodule Archethic.P2P.Message.ValidateTransaction do alias Archethic.TransactionChain.Transaction alias Archethic.P2P.Message.ReplicationError alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message + alias Archethic.P2P + alias Archethic.P2P.Node alias Archethic.Replication - alias Archethic.Crypto alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput alias Archethic.Utils + require OpenTelemetry.Tracer + @type t :: %__MODULE__{ transaction: Transaction.t(), contract_context: nil | Contract.Context.t(), inputs: list(VersionedUnspentOutput.t()) } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | ReplicationError.t() - def process(%__MODULE__{transaction: tx, contract_context: contract_context, inputs: inputs}, _) do - case Replication.validate_transaction(tx, contract_context, inputs) do - :ok -> - Replication.add_transaction_to_commit_pool(tx, inputs) - %Ok{} - - {:error, reason} -> - %ReplicationError{address: tx.address, reason: reason} + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | ReplicationError.t() + def process(%__MODULE__{transaction: tx, contract_context: contract_context, inputs: inputs}, %{ + trace: trace + }) do + Utils.extract_progagated_context(trace) + + OpenTelemetry.Tracer.with_span "validate transaction (storage)" do + OpenTelemetry.Tracer.set_attribute( + "node", + P2P.get_node_info() |> Node.endpoint() + ) + + case Replication.validate_transaction(tx, contract_context, inputs) do + :ok -> + Replication.add_transaction_to_commit_pool(tx, inputs) + %Ok{} + + {:error, reason} -> + %ReplicationError{address: tx.address, reason: reason} + end end end diff --git a/lib/archethic/utils.ex b/lib/archethic/utils.ex index d6678242f1..f4fcfd4999 100644 --- a/lib/archethic/utils.ex +++ b/lib/archethic/utils.ex @@ -1287,4 +1287,19 @@ defmodule Archethic.Utils do {Enum.reverse(items), more?, offset} end + + @spec extract_progagated_context(binary()) :: :ok + def extract_progagated_context(""), do: :ok + + def extract_progagated_context(trace) do + :otel_propagator_text_map.extract([{"traceparent", trace}]) + :otel_ctx.attach(:otel_ctx.get_current()) + end + + @spec inject_propagated_context() :: binary() + def inject_propagated_context do + :otel_propagator_text_map.inject([]) + |> Map.new() + |> Map.get("traceparent") + end end diff --git a/lib/archethic_web/explorer/controllers/faucet_controller.ex b/lib/archethic_web/explorer/controllers/faucet_controller.ex index 9151c61ec2..94e00eb28c 100644 --- a/lib/archethic_web/explorer/controllers/faucet_controller.ex +++ b/lib/archethic_web/explorer/controllers/faucet_controller.ex @@ -13,6 +13,8 @@ defmodule ArchethicWeb.Explorer.FaucetController do alias ArchethicWeb.TransactionSubscriber alias ArchethicWeb.Explorer.FaucetRateLimiter + require OpenTelemetry.Tracer + @pool_seed Application.compile_env(:archethic, [__MODULE__, :seed]) @faucet_rate_limit_expiry Application.compile_env(:archethic, :faucet_rate_limit_expiry) @@ -121,6 +123,14 @@ defmodule ArchethicWeb.Explorer.FaucetController do receive do {:new_transaction, ^tx_address} -> + try do + mining_span = :persistent_term.get({:initial_mining_span, tx_address}) + OpenTelemetry.Tracer.set_current_span(mining_span) + OpenTelemetry.Tracer.end_span(mining_span) + rescue + _ -> :ok + end + FaucetRateLimiter.register(recipient_address, System.monotonic_time()) {:ok, tx_address} after diff --git a/lib/archethic_web/transaction_subscriber.ex b/lib/archethic_web/transaction_subscriber.ex index 4433317a2b..5ed60a7f61 100644 --- a/lib/archethic_web/transaction_subscriber.ex +++ b/lib/archethic_web/transaction_subscriber.ex @@ -102,6 +102,14 @@ defmodule ArchethicWeb.TransactionSubscriber do transaction_confirmed: tx_address ) + try do + mining_span = :persistent_term.get({:initial_mining_span, tx_address}) + OpenTelemetry.Tracer.set_current_span(mining_span) + OpenTelemetry.Tracer.end_span(mining_span) + rescue + _ -> :ok + end + send(from, {:new_transaction, tx_address}) case Map.get(state, tx_address) do diff --git a/test/archethic_web/api/jsonrpc/controllers/jsonrpc_controller_test.exs b/test/archethic_web/api/jsonrpc/controllers/jsonrpc_controller_test.exs index dc8116dcf2..6a462e5249 100644 --- a/test/archethic_web/api/jsonrpc/controllers/jsonrpc_controller_test.exs +++ b/test/archethic_web/api/jsonrpc/controllers/jsonrpc_controller_test.exs @@ -20,7 +20,9 @@ defmodule ArchethicWeb.API.JsonRPCControllerTest do authorization_date: DateTime.utc_now(), available?: true, geo_patch: "AAA", - network_patch: "AAA" + network_patch: "AAA", + ip: {127, 0, 0, 1}, + port: 3_000 }) start_supervised!(NetworkView)