diff --git a/Common/Model/Notification.cs b/Common/Model/Notification.cs
index 7aacf0d..bc7c700 100644
--- a/Common/Model/Notification.cs
+++ b/Common/Model/Notification.cs
@@ -1,5 +1,4 @@
-using FHIRcastSandbox.Model;
-using Hl7.Fhir.Model;
+using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
@@ -80,7 +79,7 @@ public static Notification FromJson(string jsonString)
}
return notification;
- }
+ }
#endregion
#region Overrides
diff --git a/Common/Model/SubscriptionModels.cs b/Common/Model/SubscriptionModels.cs
index b6578ef..535676b 100644
--- a/Common/Model/SubscriptionModels.cs
+++ b/Common/Model/SubscriptionModels.cs
@@ -4,7 +4,12 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Net.WebSockets;
+using System.Security.Cryptography;
using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
using System.Web;
namespace Common.Model
@@ -71,16 +76,45 @@ public override int GetHashCode()
public class SubscriptionRequest : SubscriptionBase
{
#region Properties
- [BindRequired]
+ [BindingBehavior(BindingBehavior.Optional)]
public string Callback { get; set; }
- [BindRequired]
+ [BindingBehavior(BindingBehavior.Optional)]
public string Secret { get; set; }
- public SubscriptionChannelType? ChannelType { get; set; }
+ [BindRequired]
+ public Channel Channel { get; set; }
[BindNever]
public HubDetails HubDetails { get; set; }
+
+ public string WebsocketURL { get; set; }
+ public WebSocket Websocket { get; set; }
+ ///
+ /// This is the key used to store the subscription in a dictionary (see Hub implementation).
+ ///
+ public string CollectionKey
+ {
+ get
+ {
+ try
+ {
+ if (Channel.Type == SubscriptionChannelType.webhook)
+ {
+ return Callback;
+ }
+ else
+ {
+ return WebsocketURL;
+ }
+ }
+ catch (Exception)
+ {
+ return "";
+ }
+
+ }
+ }
#endregion
#region Public Methods
@@ -91,14 +125,104 @@ public class SubscriptionRequest : SubscriptionBase
/// StringContent containing the SubscriptionRequest properties to be used in subscription requests
public HttpContent BuildPostHttpContent()
{
- string content = $"hub.callback={Callback}" +
- $"&hub.mode={Mode}" +
- $"&hub.topic={Topic}" +
- $"&hub.secret={Secret}" +
- $"&hub.events={string.Join(",", Events)}" +
- $"&hub.lease_seconds={Lease_Seconds}";
-
- return new StringContent(content, Encoding.UTF8, "application/x-www-form-urlencoded");
+ StringBuilder sb = new StringBuilder();
+
+ if (Channel.Type == SubscriptionChannelType.websocket)
+ {
+ sb.Append($"hub.channel.type={Channel.Type}");
+ }
+ else
+ {
+ sb.Append($"hub.callback={Callback}");
+ }
+
+ sb.Append($"&hub.mode={Mode}");
+ sb.Append($"&hub.topic={Topic}");
+ sb.Append($"&hub.secret={Secret}");
+ sb.Append($"&hub.events={string.Join(",", Events)}");
+ sb.Append($"&hub.lease_seconds={Lease_Seconds}");
+
+ return new StringContent(sb.ToString(), Encoding.UTF8, "application/x-www-form-urlencoded");
+ }
+
+ public string GetWebsocketUrl(string hubHost, int? port)
+ {
+ if (Channel.Type != SubscriptionChannelType.websocket)
+ {
+ throw new Exception("Channel type isn't websocket");
+ }
+
+ string guid = Guid.NewGuid().ToString("n");
+
+ Uri uri = new UriBuilder("ws", hubHost, (port == null) ? 0 : port.Value, guid).Uri;
+ WebsocketURL = (port == null) ? uri.GetComponents(UriComponents.AbsoluteUri & ~UriComponents.Port, UriFormat.UriEscaped) : uri.AbsoluteUri;
+ return WebsocketURL;
+ }
+
+ public async Task SendNotificationAsync(string jsonBody)
+ {
+ if (Channel.Type == SubscriptionChannelType.webhook)
+ {
+ return await SendWebhookNotificationAsync(jsonBody);
+ }
+ else if (Channel.Type == SubscriptionChannelType.websocket)
+ {
+ return await SendWebsocketNotificationAsync(jsonBody);
+ }
+ return false;
+ }
+ #endregion
+
+ #region Private Methods
+ private async Task SendWebsocketNotificationAsync(string jsonBody)
+ {
+ try
+ {
+ var buffer = Encoding.UTF8.GetBytes(jsonBody);
+ var segment = new ArraySegment(buffer);
+ await Websocket.SendAsync(segment, WebSocketMessageType.Text, true, default(CancellationToken));
+ return true;
+ }
+ catch (Exception)
+ {
+ throw;
+ }
+ }
+
+ private async Task SendWebhookNotificationAsync(string jsonBody)
+ {
+ HttpContent httpContent = new StringContent(jsonBody);
+
+ // Add the headers
+ httpContent.Headers.ContentType = new MediaTypeHeaderValue("application/json");
+ httpContent.Headers.Add("X-Hub-Signature", XHubSignature(jsonBody));
+
+ HttpClient client = new HttpClient();
+ var response = await client.PostAsync(this.Callback, httpContent);
+ return response.IsSuccessStatusCode;
+ }
+
+ ///
+ /// Calculates and returns the X-Hub-Signature header. Currently uses sha256
+ ///
+ /// Subscription to get the secret from
+ /// Body used to calculate the signature
+ /// The sha256 hash of the body using the subscription's secret
+ private string XHubSignature(string body)
+ {
+ using (HMACSHA256 sha256 = new HMACSHA256(Encoding.ASCII.GetBytes(this.Secret)))
+ {
+ byte[] bodyBytes = Encoding.UTF8.GetBytes(body);
+
+ byte[] hash = sha256.ComputeHash(bodyBytes);
+ StringBuilder stringBuilder = new StringBuilder(hash.Length * 2);
+ foreach (byte b in hash)
+ {
+ stringBuilder.AppendFormat("{0:x2}", b);
+ }
+
+ return "sha256=" + stringBuilder.ToString();
+ }
}
#endregion
@@ -138,11 +262,13 @@ public override int GetHashCode()
hashCode = hashCode * -1521134295 + base.GetHashCode();
hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(Callback);
hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(Secret);
- hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(ChannelType);
+ hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(Channel.Type);
hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(HubDetails);
return hashCode;
}
- #endregion
+
+
+ #endregion
}
///
@@ -219,6 +345,13 @@ public static SubscriptionVerification CreateSubscriptionVerification(Subscripti
}
#endregion
}
+
+ public class Channel
+ {
+ public SubscriptionChannelType Type { get; set; }
+ public string Endpoint { get; set; } //This isn't used yet
+ }
+
public class HubDetails
{
public string HubUrl { get; set; }
@@ -233,6 +366,7 @@ public enum SubscriptionMode
public enum SubscriptionChannelType
{
+ webhook,
websocket
}
}
diff --git a/Hub/Controllers/HubController.cs b/Hub/Controllers/HubController.cs
index 6c8f9c3..96efda3 100644
--- a/Hub/Controllers/HubController.cs
+++ b/Hub/Controllers/HubController.cs
@@ -7,9 +7,9 @@
using System;
using System.Collections.Generic;
using System.IO;
-using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
+using FHIRcastSandbox.Core;
namespace FHIRcastSandbox.Controllers
{
@@ -19,16 +19,16 @@ public class HubController : Controller
private readonly ILogger logger;
private readonly IBackgroundJobClient backgroundJobClient;
private readonly ISubscriptions subscriptions;
- private readonly INotifications notifications;
private readonly IContexts contexts;
+ private readonly InternalHub internalHub;
- public HubController(ILogger logger, IBackgroundJobClient backgroundJobClient, ISubscriptions subscriptions, INotifications notifications, IContexts contexts)
+ public HubController(ILogger logger, IBackgroundJobClient backgroundJobClient, ISubscriptions subscriptions, IContexts contexts, InternalHub internalHub)
{
this.backgroundJobClient = backgroundJobClient ?? throw new ArgumentNullException(nameof(backgroundJobClient));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.subscriptions = subscriptions ?? throw new ArgumentNullException(nameof(subscriptions));
- this.notifications = notifications ?? throw new ArgumentNullException(nameof(notifications));
this.contexts = contexts ?? throw new ArgumentNullException(nameof(contexts));
+ this.internalHub = internalHub;
}
///
@@ -59,9 +59,25 @@ public IActionResult Subscribe([FromForm]SubscriptionRequest hub, bool _cancel =
return BadRequest(ModelState);
}
- backgroundJobClient.Enqueue(job => job.Run(hub, _cancel));
+ // Validate subscription here, if invalid then return error
+ if (!internalHub.topicConnectionIdMapping.ContainsKey(hub.Topic))
+ {
+ logger.LogError($"Could not find topic {hub.Topic}. Denying subscription request.");
+ return NotFound($"hub.topic={hub.Topic}");
+ }
- return Accepted();
+ if (hub.Channel.Type == SubscriptionChannelType.websocket)
+ {
+ string webSocketUrl = hub.GetWebsocketUrl(HttpContext.Request.Host.Host, HttpContext.Request.Host.Port);
+ subscriptions.AddPendingSubscription(hub, webSocketUrl);
+ return Accepted((object)webSocketUrl);
+ }
+ else
+ {
+ subscriptions.AddPendingSubscription(hub, hub.Callback);
+ backgroundJobClient.Enqueue(job => job.Run(hub, _cancel));
+ return Accepted();
+ }
}
///
@@ -103,7 +119,7 @@ public async Task Notify(string topicId)
var success = true;
foreach (var sub in subscriptions)
{
- success |= (await notifications.SendNotification(notification, sub)).IsSuccessStatusCode;
+ success |= await sub.SendNotificationAsync(notification.ToJson());
}
if (!success)
{
@@ -137,5 +153,20 @@ public object GetCurrentcontext(string topicId)
}
}
+
+ //Future update
+ //[Route("debug")]
+ //public IActionResult GetDebug()
+ //{
+ // HubDebugModel model = new HubDebugModel();
+ // model.PendingSubscriptions = (List) subscriptions.GetPendingSubscriptions();
+ // model.ActiveSubscriptions = (List)subscriptions.GetActiveSubscriptions();
+ // model.TopicConnections = new List();
+ // foreach (KeyValuePair keyValuePair in internalHub.topicConnectionIdMapping)
+ // {
+ // model.TopicConnections.Add($"Topic: {keyValuePair.Key} - Connection: {keyValuePair.Value}");
+ // }
+ // return View("Hub", model);
+ //}
}
}
diff --git a/Hub/Core/InternalHub.cs b/Hub/Core/InternalHub.cs
index 657e255..63206a4 100644
--- a/Hub/Core/InternalHub.cs
+++ b/Hub/Core/InternalHub.cs
@@ -25,7 +25,7 @@ public class InternalHub : Hub
/// TODO: Should the topic for the client be assigned by this hub? so when the client first starts up it asks
/// for its topic id which could then be this signalr hub's connectionid meaning we don't need this collection?
///
- Dictionary topicConnectionIdMapping = new Dictionary();
+ public Dictionary topicConnectionIdMapping = new Dictionary();
private readonly ILogger logger;
diff --git a/Hub/Interfaces.cs b/Hub/Interfaces.cs
index 5cc2aa8..10fd5c6 100644
--- a/Hub/Interfaces.cs
+++ b/Hub/Interfaces.cs
@@ -1,5 +1,4 @@
using Common.Model;
-using FHIRcastSandbox.Model;
using FHIRcastSandbox.Rules;
using System.Collections.Generic;
using System.Threading.Tasks;
@@ -11,15 +10,19 @@ public interface ISubscriptionValidator {
}
public interface ISubscriptions {
+ ICollection GetPendingSubscriptions();
ICollection GetActiveSubscriptions();
- void AddSubscription(SubscriptionRequest subscription);
+
void RemoveSubscription(SubscriptionRequest subscription);
ICollection GetSubscriptions(string topic, string notificationEvent);
- }
+ void AddPendingSubscription(SubscriptionRequest subscription, string key);
+ bool ActivatePendedSubscription(string key, out SubscriptionRequest subscription);
+ bool ActivatePendedSubscription(string key);
+ bool UnsubscribeSubscription(string key);
- public interface INotifications {
- Task SendNotification(Notification notification, SubscriptionRequest subscription);
+
}
+
public interface IContexts
{
string addContext();
diff --git a/Hub/Rules/HubSubscriptionCollection.cs b/Hub/Rules/HubSubscriptionCollection.cs
index 9b71e90..5a4d951 100644
--- a/Hub/Rules/HubSubscriptionCollection.cs
+++ b/Hub/Rules/HubSubscriptionCollection.cs
@@ -1,5 +1,6 @@
using Common.Model;
using Microsoft.Extensions.Logging;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
@@ -9,42 +10,99 @@ namespace FHIRcastSandbox.Rules
public class HubSubscriptionCollection : ISubscriptions
{
private readonly ILogger logger;
- private ImmutableHashSet subscriptions = ImmutableHashSet.Empty;
+
+ private ConcurrentDictionary pendedSubscriptions;
+ private ConcurrentDictionary activeSubscriptions;
public HubSubscriptionCollection(ILogger logger)
{
this.logger = logger;
+
+ pendedSubscriptions = new ConcurrentDictionary();
+ activeSubscriptions = new ConcurrentDictionary();
}
+ public ICollection GetPendingSubscriptions()
+ {
+ return pendedSubscriptions.Values.ToList();
+ }
public ICollection GetActiveSubscriptions()
{
- return this.subscriptions;
+ return activeSubscriptions.Values.ToList();
}
+ ///
+ /// Get a list of subscriptions based on the topic and event. Used to get which subscriptions to notify
+ ///
+ ///
+ ///
+ ///
public ICollection GetSubscriptions(string topic, string notificationEvent)
{
- this.logger.LogDebug($"Finding subscriptions for topic: {topic} and event: {notificationEvent}");
- return this.subscriptions
+ logger.LogDebug($"Finding subscriptions for topic: {topic} and event: {notificationEvent}");
+
+ return activeSubscriptions
+ .Select(x => x.Value)
.Where(x => x.Topic == topic)
.Where(x => x.Events.Contains(notificationEvent))
.ToArray();
}
- public SubscriptionRequest GetSubscription(string topic)
+ public void RemoveSubscription(SubscriptionRequest subscription)
{
- return this.subscriptions.Where(x => x.Topic == topic).First();
+ logger.LogInformation($"Removing subscription {subscription}.");
+
+ activeSubscriptions.TryRemove(subscription.CollectionKey, out SubscriptionRequest value);
}
- public void AddSubscription(SubscriptionRequest subscription)
+ ///
+ /// Add a pending subscription. Store these before we validate it (webhook) or receive the websocket connection
+ ///
+ ///
+ ///
+ public void AddPendingSubscription(SubscriptionRequest subscription, string key)
{
- this.logger.LogInformation($"Adding subscription {subscription}.");
- this.subscriptions = this.subscriptions.Add(subscription);
+ pendedSubscriptions.AddOrUpdate(key, subscription, (k, o) => subscription);
}
- public void RemoveSubscription(SubscriptionRequest subscription)
+ ///
+ /// Moves the pending subscription to the active subscription collection. Call this once the webhook subscription
+ /// has been validated or we received the websocket connection.
+ ///
+ ///
+ ///
+ public bool ActivatePendedSubscription(string key)
{
- this.logger.LogInformation($"Removing subscription {subscription}.");
- this.subscriptions = this.subscriptions.Remove(subscription);
+ return ActivatePendedSubscription(key, out SubscriptionRequest sub);
}
+
+ ///
+ /// Moves the pending subscription to the active subscription collection. Call this once the webhook subscription
+ /// has been validated or we received the websocket connection.
+ ///
+ ///
+ ///
+ ///
+ public bool ActivatePendedSubscription(string key, out SubscriptionRequest subscription)
+ {
+ subscription = null;
+ if (!pendedSubscriptions.Remove(key, out SubscriptionRequest pendedSub))
+ {
+ return false;
+ }
+
+ activeSubscriptions.AddOrUpdate(key, pendedSub, (k, o) => pendedSub);
+ subscription = pendedSub;
+
+ return true;
+ }
+
+ public bool UnsubscribeSubscription(string key)
+ {
+ //TODO: Implement unsubscribe
+ return true;
+ }
+
+
}
}
diff --git a/Hub/Rules/Notifications.cs b/Hub/Rules/Notifications.cs
deleted file mode 100644
index 0dbfa8d..0000000
--- a/Hub/Rules/Notifications.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-using Common.Model;
-using FHIRcastSandbox.Model;
-using Microsoft.Extensions.Logging;
-using System;
-using System.Net.Http;
-using System.Net.Http.Headers;
-using System.Security.Cryptography;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace FHIRcastSandbox.Rules
-{
- public class Notifications : INotifications
- {
- private ILogger> logger;
-
- public Notifications(ILogger> logger)
- {
- this.logger = logger;
- }
-
- public async Task SendNotification(Notification notification, SubscriptionRequest subscription)
- {
- // Create the JSON body
- string body = notification.ToJson();
- HttpContent httpContent = new StringContent(body);
-
- // Add the headers
- httpContent.Headers.ContentType = new MediaTypeHeaderValue("application/json");
- httpContent.Headers.Add("X-Hub-Signature", XHubSignature(subscription, body));
-
- this.logger.LogInformation($"Sending notification: " +
- $"{httpContent.Headers.ToString()}" +
- $"{body}");
-
- // Send notification
- HttpClient client = new HttpClient();
- var response = await client.PostAsync(subscription.Callback, httpContent);
-
- this.logger.LogDebug($"Got response from posting notification:{Environment.NewLine}{response}{Environment.NewLine}{await response.Content.ReadAsStringAsync()}.");
- return response;
- }
-
- ///
- /// Calculates and returns the X-Hub-Signature header. Currently uses sha256
- ///
- /// Subscription to get the secret from
- /// Body used to calculate the signature
- /// The sha256 hash of the body using the subscription's secret
- private string XHubSignature(SubscriptionRequest subscription, string body)
- {
- using (HMACSHA256 sha256 = new HMACSHA256(Encoding.ASCII.GetBytes(subscription.Secret)))
- {
- byte[] bodyBytes = Encoding.UTF8.GetBytes(body);
-
- byte[] hash = sha256.ComputeHash(bodyBytes);
- StringBuilder stringBuilder = new StringBuilder(hash.Length * 2);
- foreach (byte b in hash)
- {
- stringBuilder.AppendFormat("{0:x2}", b);
- }
-
- return "sha256=" + stringBuilder.ToString();
- }
- }
- }
-}
diff --git a/Hub/Rules/SubscriptionValidator.cs b/Hub/Rules/SubscriptionValidator.cs
index be35e19..04e59c3 100644
--- a/Hub/Rules/SubscriptionValidator.cs
+++ b/Hub/Rules/SubscriptionValidator.cs
@@ -23,10 +23,12 @@ public async Task ValidateSubscription(SubscriptionRequ
}
SubscriptionVerification verification = SubscriptionVerification.CreateSubscriptionVerification(subscription, (outcome == HubValidationOutcome.Canceled));
+
+ HttpResponseMessage response = new HttpResponseMessage();
Uri verificationUri = verification.VerificationURI();
logger.LogDebug($"Calling callback url: {verificationUri}");
- var response = await new HttpClient().GetAsync(verificationUri);
+ response = await new HttpClient().GetAsync(verificationUri);
if (outcome == HubValidationOutcome.Canceled)
{
@@ -34,21 +36,40 @@ public async Task ValidateSubscription(SubscriptionRequ
}
else
{
- if (!response.IsSuccessStatusCode)
+ if (await ValidVerificationResponseAsync(verification, response))
{
- logger.LogInformation($"Status code was not success but instead {response.StatusCode}");
- return ClientValidationOutcome.NotVerified;
+ return ClientValidationOutcome.Verified;
}
-
- var responseBody = (await response.Content.ReadAsStringAsync());
- if (responseBody != verification.Challenge)
+ else
{
- logger.LogInformation($"Callback result for verification request was not equal to challenge. Response body: '{responseBody}', Challenge: '{verification.Challenge}'.");
return ClientValidationOutcome.NotVerified;
}
+ }
+ }
- return ClientValidationOutcome.Verified;
+ ///
+ /// Validates the subscribing apps response to our verification. Confirms a successful status code
+ /// and the response matches our verification challenge.
+ ///
+ /// Verification intent sent to subscriber
+ /// Subscriber's HTTP response message
+ /// true if valid, else false
+ private async Task ValidVerificationResponseAsync(SubscriptionVerification verification, HttpResponseMessage response)
+ {
+ if (!response.IsSuccessStatusCode)
+ {
+ logger.LogInformation($"Status code was not success but instead {response.StatusCode}");
+ return false;
}
+
+ var responseBody = (await response.Content.ReadAsStringAsync());
+ if (responseBody != verification.Challenge)
+ {
+ logger.LogInformation($"Callback result for verification request was not equal to challenge. Response body: '{responseBody}', Challenge: '{verification.Challenge}'.");
+ return false;
+ }
+
+ return true;
}
}
diff --git a/Hub/Rules/ValidateSubscriptionJob.cs b/Hub/Rules/ValidateSubscriptionJob.cs
index 8901f36..5ccf852 100644
--- a/Hub/Rules/ValidateSubscriptionJob.cs
+++ b/Hub/Rules/ValidateSubscriptionJob.cs
@@ -23,7 +23,18 @@ public ValidateSubscriptionJob(ISubscriptionValidator validator, ISubscriptions
public async Task Run(SubscriptionRequest subscription, bool simulateCancellation)
{
HubValidationOutcome validationOutcome = simulateCancellation ? HubValidationOutcome.Canceled : HubValidationOutcome.Valid;
- var validationResult = await validator.ValidateSubscription(subscription, validationOutcome);
+ ClientValidationOutcome validationResult;
+
+ // Shouldn't have a websocket subscription here, we only need to validate webhook
+ if (subscription.Channel.Type == SubscriptionChannelType.websocket)
+ {
+ validationResult = ClientValidationOutcome.Verified;
+ return;
+ }
+ else
+ {
+ validationResult = await validator.ValidateSubscription(subscription, validationOutcome);
+ }
if (validationResult == ClientValidationOutcome.Verified)
{
@@ -31,7 +42,7 @@ public async Task Run(SubscriptionRequest subscription, bool simulateCancellatio
{
// Add subscription to collection and inform client
logger.LogInformation($"Adding verified subscription: {subscription}.");
- subscriptions.AddSubscription(subscription);
+ subscriptions.ActivatePendedSubscription(subscription.Callback);
await internalHub.NotifyClientOfSubscriber(subscription.Topic, subscription);
}
else if (subscription.Mode == SubscriptionMode.unsubscribe)
diff --git a/Hub/Rules/WebSocketMiddleware.cs b/Hub/Rules/WebSocketMiddleware.cs
new file mode 100644
index 0000000..22a7092
--- /dev/null
+++ b/Hub/Rules/WebSocketMiddleware.cs
@@ -0,0 +1,131 @@
+using System;
+using System.IO;
+using System.Net.WebSockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Common.Model;
+using FHIRcastSandbox.Core;
+using Hangfire;
+using Microsoft.AspNetCore.Http;
+using Microsoft.Extensions.Logging;
+
+namespace FHIRcastSandbox.Rules
+{
+ public class WebSocketMiddleware : IMiddleware
+ {
+ private readonly ISubscriptions subscriptions;
+ private readonly ILogger logger;
+ private readonly string webSocketProtocol;
+ private readonly IBackgroundJobClient backgroundJobClient;
+ private readonly ISubscriptionValidator validator;
+ private readonly InternalHub internalHub;
+
+ public WebSocketMiddleware(ILogger logger, ISubscriptions subscriptions, IBackgroundJobClient backgroundJobClient, ISubscriptionValidator validator, InternalHub internalHub)
+ {
+ this.subscriptions = subscriptions;
+ this.logger = logger;
+ this.backgroundJobClient = backgroundJobClient ?? throw new ArgumentNullException(nameof(backgroundJobClient));
+ this.validator = validator;
+ this.internalHub = internalHub;
+#if DEBUG
+ this.webSocketProtocol = "ws://";
+#else
+ this.webSocketProtocol = "wss://";
+#endif
+ }
+
+ ///
+ /// Websocket connection handler
+ ///
+ public async Task InvokeAsync(HttpContext context, RequestDelegate next)
+ {
+ if (!context.WebSockets.IsWebSocketRequest)
+ {
+ await next(context);
+ }
+ else
+ {
+ string key = $"{webSocketProtocol}{context.Request.Host}" +
+ $"{context.Request.PathBase}{ context.Request.Path.Value}";
+
+ WebSocket webSocket;
+ if (subscriptions.ActivatePendedSubscription(key, out SubscriptionRequest subscription))
+ {
+ webSocket = await context.WebSockets.AcceptWebSocketAsync();
+ subscription.Websocket = webSocket;
+
+ await internalHub.NotifyClientOfSubscriber(subscription.Topic, subscription);
+ }
+ else
+ {
+ return;
+ }
+
+ while (webSocket.State == WebSocketState.Open && !context.RequestAborted.IsCancellationRequested)
+ {
+ //There isn't anything we receive back on the websocket currently
+ //(in this case we are the hub so we only send out notifications over the websocket)
+ #region Read from websocket
+ //await webSocketConnections.ReceiveStringAsync(webSocket);
+ await ReceiveStringAsync(webSocket);
+
+ string socketdata = null;
+ try
+ {
+ socketdata = await ReceiveStringAsync(webSocket); //await webSocketConnections.ReceiveStringAsync(webSocket);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError($"An exception occurred reading from web socket {subscription.WebsocketURL}:{Environment.NewLine}" +
+ $"{ex.Message}");
+ }
+ #endregion
+
+ #region Websocket close
+ if (webSocket.State != WebSocketState.Open)
+ {
+ logger.LogInformation($"Websocket {subscription.WebsocketURL} no longer open. state is {webSocket.State.ToString()}");
+ // gracefully handle aborted and intionally terminated websocket conections
+ if (webSocket.State == WebSocketState.CloseReceived)
+ {
+ logger.LogDebug($"websocket closing...");
+ await webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "client requesting close", new CancellationToken());
+ logger.LogDebug($"websocket closed");
+ }
+ }
+ #endregion
+ }
+ }
+ }
+
+ private async Task ReceiveStringAsync(WebSocket socket, CancellationToken ct = default(CancellationToken))
+ {
+ var buffer = new ArraySegment(new byte[8192]);
+ using (var ms = new MemoryStream())
+ {
+ WebSocketReceiveResult result;
+ do
+ {
+ ct.ThrowIfCancellationRequested();
+
+ result = await socket.ReceiveAsync(buffer, ct);
+ ms.Write(buffer.Array, buffer.Offset, result.Count);
+ }
+ while (!result.EndOfMessage);
+
+ ms.Seek(0, SeekOrigin.Begin);
+ if (result.MessageType != WebSocketMessageType.Text)
+ {
+ return null;
+ }
+ using (var reader = new StreamReader(ms, Encoding.UTF8))
+ {
+ string data = await reader.ReadToEndAsync();
+ System.Diagnostics.Debug.WriteLine($"received data: {data}");
+ return data;
+ }
+ }
+ }
+ }
+}
diff --git a/Hub/Startup.cs b/Hub/Startup.cs
index a1d39d0..4a60b2e 100644
--- a/Hub/Startup.cs
+++ b/Hub/Startup.cs
@@ -6,7 +6,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
-using System.Net.Http;
using FHIRcastSandbox.Core;
namespace FHIRcastSandbox
@@ -24,16 +23,17 @@ public void ConfigureServices(IServiceCollection services) {
services.AddHangfire(config => config
.UseNLogLogProvider()
.UseMemoryStorage());
+
services.AddSignalR();
services.AddTransient();
services.AddSingleton();
- services.AddSingleton, Notifications>();
services.AddSingleton();
services.AddSingleton(typeof(InternalHub));
services.AddTransient();
services.AddTransient();
+ services.AddTransient();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@@ -50,6 +50,14 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env) {
route.MapHub("/internalHub");
});
+ var webSocketOptions = new WebSocketOptions()
+ {
+ KeepAliveInterval = TimeSpan.FromSeconds(120),
+ ReceiveBufferSize = 4 * 1024
+ };
+ app.UseWebSockets(webSocketOptions);
+ app.UseMiddleware();
+
JobActivator.Current = new ServiceProviderJobActivator(app.ApplicationServices);
}
}
diff --git a/Tests/SubscriptionTests.cs b/Tests/SubscriptionTests.cs
index babd009..faf22b7 100644
--- a/Tests/SubscriptionTests.cs
+++ b/Tests/SubscriptionTests.cs
@@ -255,11 +255,12 @@ public void SubscriptionRequest_SubscriptionVerification_EqualCases_ReturnTrue()
#region HTTP Tests
[Fact]
- public async void Post_HubController_FromForm_ValidData_SuccessResponse()
+ public async void Post_HubController_FromForm_ValidData_NotFoundResponse()
{
Dictionary formData = new Dictionary
{
{"hub.callback", "testcallback" },
+ {"hub.channel.type", "webhook" },
{"hub.mode", "subscribe" },
{"hub.topic", "testtopic" },
{"hub.events", "patient-open,patient-close" },
@@ -269,7 +270,7 @@ public async void Post_HubController_FromForm_ValidData_SuccessResponse()
var response = await _hubClient.PostAsync("api/hub", new FormUrlEncodedContent(formData));
- Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
+ Assert.Equal(HttpStatusCode.NotFound, response.StatusCode);
}
[Fact]
diff --git a/WebSubClient/Hubs/IWebSubClient.cs b/WebSubClient/Hubs/IWebSubClient.cs
index f802def..aabb817 100644
--- a/WebSubClient/Hubs/IWebSubClient.cs
+++ b/WebSubClient/Hubs/IWebSubClient.cs
@@ -15,6 +15,7 @@ public interface IWebSubClient
Task SubscriptionsChanged(List subscriptions);
Task SubscriberAdded(SubscriptionRequest subscriber);
Task SubscriberRemoved(SubscriptionRequest subscriber);
+ Task AddWebSocket(SubscriptionRequest subscription);
Task AlertMessage(string message);
}
diff --git a/WebSubClient/Hubs/WebSubClientHub.cs b/WebSubClient/Hubs/WebSubClientHub.cs
index ad6f1e8..2083093 100644
--- a/WebSubClient/Hubs/WebSubClientHub.cs
+++ b/WebSubClient/Hubs/WebSubClientHub.cs
@@ -7,6 +7,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
+using System.IO;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
@@ -57,7 +58,7 @@ private async void InternalHubClient_SubscriberRemoved(object sender, Subscripti
///
///
///
- public async Task Subscribe(string subscriptionUrl, string topic, string events, string[] httpHeaders)
+ public async Task Subscribe(string subscriptionUrl, string topic, string events, bool webSocket, string[] httpHeaders)
{
if (string.IsNullOrEmpty(subscriptionUrl))
{
@@ -96,6 +97,11 @@ public async Task Subscribe(string subscriptionUrl, string topic, string events,
}
};
+ subscription.Channel = new Channel()
+ {
+ Type = (webSocket) ? SubscriptionChannelType.websocket : SubscriptionChannelType.webhook
+ };
+
if (!await PendAndPostSubscription(clientId, subscription))
{
// I don't know do something
@@ -147,14 +153,33 @@ private async Task PendAndPostSubscription(string clientId, SubscriptionRe
{
_subscriptions.RemovePendingSubscription(clientId, subscriptionRequest);
}
- //else
- //{
- // await SubscriptionsChanged(clientId);
- //}
+
+ Stream receiveStream = await response.Content.ReadAsStreamAsync();
+ StreamReader readStream = new StreamReader(receiveStream, Encoding.UTF8);
+ string responseBody = readStream.ReadToEnd();
+
+ if (!String.IsNullOrEmpty(responseBody))
+ {
+ subscriptionRequest.WebsocketURL = responseBody;
+ await AddWebSocket(clientId, subscriptionRequest);
+ }
return response.IsSuccessStatusCode;
}
+ private async Task PostSubscriptionAsync(SubscriptionRequest subscription)
+ {
+ HttpClient client = new HttpClient();
+
+ foreach (string header in subscription.HubDetails.HttpHeaders)
+ {
+ string[] split = header.Split(":");
+ client.DefaultRequestHeaders.Add(split[0], split[1]);
+ }
+
+ return await client.PostAsync(subscription.HubDetails.HubUrl, subscription.BuildPostHttpContent());
+ }
+
///
/// Recieved an update from our client, send that notification to the hub (HubController -> Notify)
/// for it to send out to the awaiting subscriber
@@ -243,6 +268,12 @@ public async Task SubscriptionsChanged(string clientId)
await webSubClientHubContext.Clients.Client(clientId).SubscriptionsChanged(_subscriptions.ClientsSubscriptions(clientId));
}
+ public async Task AddWebSocket(string connectionId, SubscriptionRequest subscription)
+ {
+ logger.LogDebug($"Adding web socket for {connectionId}: {subscription.ToString()}");
+ await webSubClientHubContext.Clients.Client(connectionId).AddWebSocket(subscription);
+ }
+
public async Task AlertMessage(string connectionId, string message)
{
logger.LogDebug($"Alerting {connectionId}: {message}");
diff --git a/WebSubClient/Views/WebSubClient/WebSubClient.cshtml b/WebSubClient/Views/WebSubClient/WebSubClient.cshtml
index ea58710..97798b1 100644
--- a/WebSubClient/Views/WebSubClient/WebSubClient.cshtml
+++ b/WebSubClient/Views/WebSubClient/WebSubClient.cshtml
@@ -69,6 +69,8 @@
Close Study
+
+
diff --git a/WebSubClient/wwwroot/js/site.js b/WebSubClient/wwwroot/js/site.js
index 6563c5c..74c5a4a 100644
--- a/WebSubClient/wwwroot/js/site.js
+++ b/WebSubClient/wwwroot/js/site.js
@@ -52,7 +52,7 @@ connection.on("ReceivedNotification", (notification) => {
var ctrl, value;
if (eventObj.resourceType === eventResources.PATIENT) {
- ctrl = this["patientID"];
+ ctrl = this["patientID"];
} else if (eventObj.resourceType === eventResources.IMAGINGSTUDY) {
ctrl = this["accessionNumber"];
}
@@ -93,6 +93,42 @@ connection.on("SubscriberAdded", (subscription) => {
addSubscriptionToTable(subTable, subscription);
});
+connection.on("AddWebSocket", (subscription) => {
+ //popupNotification("New web socket " + subscription.websocketURL);
+
+ // Create WebSocket connection.
+ const socket = new WebSocket(subscription.websocketURL);
+
+ // Connection opened
+ socket.addEventListener('open', function (event) {
+ popupNotification("Websocket opened: " + subscription.websocketURL);
+ });
+
+ // Listen for messages
+ socket.addEventListener('message', function (event) {
+ //console.log('Message from server ', event.data);
+ var obj = JSON.parse(event.data);
+ popupNotification("Received web socket notification " + obj.event["hub.event"]);
+
+ let eventObj = parseNotificationIntoEventObj(obj);
+ var ctrl, value;
+
+ if (eventObj.resourceType === eventResources.PATIENT) {
+ ctrl = this["patientID"];
+ } else if (eventObj.resourceType === eventResources.IMAGINGSTUDY) {
+ ctrl = this["accessionNumber"];
+ }
+
+ if (eventObj.actionType === eventActions.OPEN) {
+ value = notification.event.context[0].idElement.value; // Just assume the first resource for now.
+ } else if (eventObj.actionType === eventActions.CLOSE) {
+ value = "";
+ }
+
+ ctrl.value = value;
+ });
+});
+
// Handles receiving a message from the hub to be displayed to the user
connection.on("AlertMessage", (message) => {
popupNotification(message);
@@ -204,6 +240,7 @@ $("#subscribe").submit(function (e) {
this["subscriptionUrl"].value,
this["topic"].value,
events,
+ this["useWebsockets"].checked,
headers)
.catch(e => console.error(e));