Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Common/Model/Notification.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using FHIRcastSandbox.Model;
using Hl7.Fhir.Model;
using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
Expand Down Expand Up @@ -80,7 +79,7 @@ public static Notification FromJson(string jsonString)
}

return notification;
}
}
#endregion

#region Overrides
Expand Down
160 changes: 147 additions & 13 deletions Common/Model/SubscriptionModels.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.WebSockets;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;

namespace Common.Model
Expand Down Expand Up @@ -71,16 +76,45 @@ public override int GetHashCode()
public class SubscriptionRequest : SubscriptionBase
{
#region Properties
[BindRequired]
[BindingBehavior(BindingBehavior.Optional)]
public string Callback { get; set; }

[BindRequired]
[BindingBehavior(BindingBehavior.Optional)]
public string Secret { get; set; }

public SubscriptionChannelType? ChannelType { get; set; }
[BindRequired]
public Channel Channel { get; set; }

[BindNever]
public HubDetails HubDetails { get; set; }

public string WebsocketURL { get; set; }
public WebSocket Websocket { get; set; }
/// <summary>
/// This is the key used to store the subscription in a dictionary (see Hub implementation).
/// </summary>
public string CollectionKey
{
get
{
try
{
if (Channel.Type == SubscriptionChannelType.webhook)
{
return Callback;
}
else
{
return WebsocketURL;
}
}
catch (Exception)
{
return "";
}

}
}
#endregion

#region Public Methods
Expand All @@ -91,14 +125,104 @@ public class SubscriptionRequest : SubscriptionBase
/// <returns>StringContent containing the SubscriptionRequest properties to be used in subscription requests</returns>
public HttpContent BuildPostHttpContent()
{
string content = $"hub.callback={Callback}" +
$"&hub.mode={Mode}" +
$"&hub.topic={Topic}" +
$"&hub.secret={Secret}" +
$"&hub.events={string.Join(",", Events)}" +
$"&hub.lease_seconds={Lease_Seconds}";

return new StringContent(content, Encoding.UTF8, "application/x-www-form-urlencoded");
StringBuilder sb = new StringBuilder();

if (Channel.Type == SubscriptionChannelType.websocket)
{
sb.Append($"hub.channel.type={Channel.Type}");
}
else
{
sb.Append($"hub.callback={Callback}");
}

sb.Append($"&hub.mode={Mode}");
sb.Append($"&hub.topic={Topic}");
sb.Append($"&hub.secret={Secret}");
sb.Append($"&hub.events={string.Join(",", Events)}");
sb.Append($"&hub.lease_seconds={Lease_Seconds}");

return new StringContent(sb.ToString(), Encoding.UTF8, "application/x-www-form-urlencoded");
}

public string GetWebsocketUrl(string hubHost, int? port)
{
if (Channel.Type != SubscriptionChannelType.websocket)
{
throw new Exception("Channel type isn't websocket");
}

string guid = Guid.NewGuid().ToString("n");

Uri uri = new UriBuilder("ws", hubHost, (port == null) ? 0 : port.Value, guid).Uri;
WebsocketURL = (port == null) ? uri.GetComponents(UriComponents.AbsoluteUri & ~UriComponents.Port, UriFormat.UriEscaped) : uri.AbsoluteUri;
return WebsocketURL;
}

public async Task<bool> SendNotificationAsync(string jsonBody)
{
if (Channel.Type == SubscriptionChannelType.webhook)
{
return await SendWebhookNotificationAsync(jsonBody);
}
else if (Channel.Type == SubscriptionChannelType.websocket)
{
return await SendWebsocketNotificationAsync(jsonBody);
}
return false;
}
#endregion

#region Private Methods
private async Task<bool> SendWebsocketNotificationAsync(string jsonBody)
{
try
{
var buffer = Encoding.UTF8.GetBytes(jsonBody);
var segment = new ArraySegment<byte>(buffer);
await Websocket.SendAsync(segment, WebSocketMessageType.Text, true, default(CancellationToken));
return true;
}
catch (Exception)
{
throw;
}
}

private async Task<bool> SendWebhookNotificationAsync(string jsonBody)
{
HttpContent httpContent = new StringContent(jsonBody);

// Add the headers
httpContent.Headers.ContentType = new MediaTypeHeaderValue("application/json");
httpContent.Headers.Add("X-Hub-Signature", XHubSignature(jsonBody));

HttpClient client = new HttpClient();
var response = await client.PostAsync(this.Callback, httpContent);
return response.IsSuccessStatusCode;
}

/// <summary>
/// Calculates and returns the X-Hub-Signature header. Currently uses sha256
/// </summary>
/// <param name="subscription">Subscription to get the secret from</param>
/// <param name="body">Body used to calculate the signature</param>
/// <returns>The sha256 hash of the body using the subscription's secret</returns>
private string XHubSignature(string body)
{
using (HMACSHA256 sha256 = new HMACSHA256(Encoding.ASCII.GetBytes(this.Secret)))
{
byte[] bodyBytes = Encoding.UTF8.GetBytes(body);

byte[] hash = sha256.ComputeHash(bodyBytes);
StringBuilder stringBuilder = new StringBuilder(hash.Length * 2);
foreach (byte b in hash)
{
stringBuilder.AppendFormat("{0:x2}", b);
}

return "sha256=" + stringBuilder.ToString();
}
}
#endregion

Expand Down Expand Up @@ -138,11 +262,13 @@ public override int GetHashCode()
hashCode = hashCode * -1521134295 + base.GetHashCode();
hashCode = hashCode * -1521134295 + EqualityComparer<string>.Default.GetHashCode(Callback);
hashCode = hashCode * -1521134295 + EqualityComparer<string>.Default.GetHashCode(Secret);
hashCode = hashCode * -1521134295 + EqualityComparer<SubscriptionChannelType?>.Default.GetHashCode(ChannelType);
hashCode = hashCode * -1521134295 + EqualityComparer<SubscriptionChannelType?>.Default.GetHashCode(Channel.Type);
hashCode = hashCode * -1521134295 + EqualityComparer<HubDetails>.Default.GetHashCode(HubDetails);
return hashCode;
}
#endregion


#endregion
}

/// <summary>
Expand Down Expand Up @@ -219,6 +345,13 @@ public static SubscriptionVerification CreateSubscriptionVerification(Subscripti
}
#endregion
}

public class Channel
{
public SubscriptionChannelType Type { get; set; }
public string Endpoint { get; set; } //This isn't used yet
}

public class HubDetails
{
public string HubUrl { get; set; }
Expand All @@ -233,6 +366,7 @@ public enum SubscriptionMode

public enum SubscriptionChannelType
{
webhook,
websocket
}
}
45 changes: 38 additions & 7 deletions Hub/Controllers/HubController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using FHIRcastSandbox.Core;

namespace FHIRcastSandbox.Controllers
{
Expand All @@ -19,16 +19,16 @@ public class HubController : Controller
private readonly ILogger<HubController> logger;
private readonly IBackgroundJobClient backgroundJobClient;
private readonly ISubscriptions subscriptions;
private readonly INotifications<HttpResponseMessage> notifications;
private readonly IContexts contexts;
private readonly InternalHub internalHub;

public HubController(ILogger<HubController> logger, IBackgroundJobClient backgroundJobClient, ISubscriptions subscriptions, INotifications<HttpResponseMessage> notifications, IContexts contexts)
public HubController(ILogger<HubController> logger, IBackgroundJobClient backgroundJobClient, ISubscriptions subscriptions, IContexts contexts, InternalHub internalHub)
{
this.backgroundJobClient = backgroundJobClient ?? throw new ArgumentNullException(nameof(backgroundJobClient));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.subscriptions = subscriptions ?? throw new ArgumentNullException(nameof(subscriptions));
this.notifications = notifications ?? throw new ArgumentNullException(nameof(notifications));
this.contexts = contexts ?? throw new ArgumentNullException(nameof(contexts));
this.internalHub = internalHub;
}

/// <summary>
Expand Down Expand Up @@ -59,9 +59,25 @@ public IActionResult Subscribe([FromForm]SubscriptionRequest hub, bool _cancel =
return BadRequest(ModelState);
}

backgroundJobClient.Enqueue<ValidateSubscriptionJob>(job => job.Run(hub, _cancel));
// Validate subscription here, if invalid then return error
if (!internalHub.topicConnectionIdMapping.ContainsKey(hub.Topic))
{
logger.LogError($"Could not find topic {hub.Topic}. Denying subscription request.");
return NotFound($"hub.topic={hub.Topic}");
}

return Accepted();
if (hub.Channel.Type == SubscriptionChannelType.websocket)
{
string webSocketUrl = hub.GetWebsocketUrl(HttpContext.Request.Host.Host, HttpContext.Request.Host.Port);
subscriptions.AddPendingSubscription(hub, webSocketUrl);
return Accepted((object)webSocketUrl);
}
else
{
subscriptions.AddPendingSubscription(hub, hub.Callback);
backgroundJobClient.Enqueue<ValidateSubscriptionJob>(job => job.Run(hub, _cancel));
return Accepted();
}
}

/// <summary>
Expand Down Expand Up @@ -103,7 +119,7 @@ public async Task<IActionResult> Notify(string topicId)
var success = true;
foreach (var sub in subscriptions)
{
success |= (await notifications.SendNotification(notification, sub)).IsSuccessStatusCode;
success |= await sub.SendNotificationAsync(notification.ToJson());
}
if (!success)
{
Expand Down Expand Up @@ -137,5 +153,20 @@ public object GetCurrentcontext(string topicId)
}

}

//Future update
//[Route("debug")]
//public IActionResult GetDebug()
//{
// HubDebugModel model = new HubDebugModel();
// model.PendingSubscriptions = (List<SubscriptionRequest>) subscriptions.GetPendingSubscriptions();
// model.ActiveSubscriptions = (List<SubscriptionRequest>)subscriptions.GetActiveSubscriptions();
// model.TopicConnections = new List<string>();
// foreach (KeyValuePair<string, string> keyValuePair in internalHub.topicConnectionIdMapping)
// {
// model.TopicConnections.Add($"Topic: {keyValuePair.Key} - Connection: {keyValuePair.Value}");
// }
// return View("Hub", model);
//}
}
}
2 changes: 1 addition & 1 deletion Hub/Core/InternalHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class InternalHub : Hub<IInternalHubClient>
/// TODO: Should the topic for the client be assigned by this hub? so when the client first starts up it asks
/// for its topic id which could then be this signalr hub's connectionid meaning we don't need this collection?
/// </summary>
Dictionary<string, string> topicConnectionIdMapping = new Dictionary<string, string>();
public Dictionary<string, string> topicConnectionIdMapping = new Dictionary<string, string>();

private readonly ILogger<InternalHub> logger;

Expand Down
13 changes: 8 additions & 5 deletions Hub/Interfaces.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Common.Model;
using FHIRcastSandbox.Model;
using FHIRcastSandbox.Rules;
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -11,15 +10,19 @@ public interface ISubscriptionValidator {
}

public interface ISubscriptions {
ICollection<SubscriptionRequest> GetPendingSubscriptions();
ICollection<SubscriptionRequest> GetActiveSubscriptions();
void AddSubscription(SubscriptionRequest subscription);

void RemoveSubscription(SubscriptionRequest subscription);
ICollection<SubscriptionRequest> GetSubscriptions(string topic, string notificationEvent);
}
void AddPendingSubscription(SubscriptionRequest subscription, string key);
bool ActivatePendedSubscription(string key, out SubscriptionRequest subscription);
bool ActivatePendedSubscription(string key);
bool UnsubscribeSubscription(string key);

public interface INotifications<T> {
Task<T> SendNotification(Notification notification, SubscriptionRequest subscription);

}

public interface IContexts
{
string addContext();
Expand Down
Loading