Skip to content
This repository was archived by the owner on Jan 27, 2022. It is now read-only.

Bug fix to address work order receipt flow in work order sync mode execution #440

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ services:
- 5555
command: |
bash -c "
avalon_listener --bind http://avalon-listener:1947 --lmdb_url http://avalon-lmdb:9090
avalon_listener --bind http://avalon-listener:1947 --lmdb_url http://avalon-lmdb:9090 --zmq_url tcp://avalon-enclave-manager:5555
tail -f /dev/null
"
depends_on:
Expand Down
90 changes: 45 additions & 45 deletions enclave_manager/avalon_enclave_manager/work_order_kv_delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,49 +124,49 @@ def update_receipt(self, wo_id, wo_json_resp):
the receipt.
"""
receipt_entry = self._kv_helper.get("wo-receipts", wo_id)
if receipt_entry:
update_type = None
if "error" in wo_json_resp and \
wo_json_resp["error"]["code"] != \
WorkOrderStatus.PENDING.value:
update_type = ReceiptCreateStatus.FAILED.value
else:
update_type = ReceiptCreateStatus.PROCESSED.value
receipt_obj = WorkOrderReceiptRequest()
wo_receipt = receipt_obj.update_receipt(
wo_id,
update_type,
wo_json_resp,
self.private_key
)
updated_receipt = None
# load previous updates to receipt
updates_to_receipt = self._kv_helper.get(
"wo-receipt-updates", wo_id)
# If it is first update to receipt
if updates_to_receipt is None:
updated_receipt = []
else:
updated_receipt = json.loads(updates_to_receipt)
# Get the last update to receipt
last_receipt = updated_receipt[len(updated_receipt) - 1]

# If receipt updateType is completed,
# then no further update allowed
if last_receipt["updateType"] == \
ReceiptCreateStatus.COMPLETED.value:
logger.info(
"Receipt for the workorder id %s is completed " +
"and no further updates are allowed",
wo_id)
return
updated_receipt.append(wo_receipt)

# Since receipts_json is jrpc request updating only params object.
self._kv_helper.set("wo-receipt-updates", wo_id, json.dumps(
updated_receipt))
logger.info("Receipt for the workorder id %s is updated to %s",
wo_id, wo_receipt)
# If receipt is not created yet, add tag "receiptUpdates" to
# Receipt entry and update it
if receipt_entry is None:
receipt_entry = {
"params": {
"receiptUpdates": []
}
}
# load previous updates to receipt
receipt_update_entry = receipt_entry["params"]["receiptUpdates"]
update_type = None
if "error" in wo_json_resp and \
wo_json_resp["error"]["code"] != \
WorkOrderStatus.PENDING.value:
update_type = ReceiptCreateStatus.FAILED.value
else:
logger.info("Work order receipt is not created, " +
"so skipping the update")
update_type = ReceiptCreateStatus.PROCESSED.value
receipt_obj = WorkOrderReceiptRequest()
wo_receipt = receipt_obj.update_receipt(
wo_id,
update_type,
wo_json_resp,
self.private_key
)

# If it is first update to receipt
if len(receipt_update_entry) > 0:
# Get the last update to receipt
last_receipt = receipt_update_entry[len(receipt_update_entry) - 1]
# If receipt updateType is completed,
# then no further update allowed
if last_receipt["updateType"] == \
ReceiptCreateStatus.COMPLETED.value:
logger.info(
"Receipt for the workorder id %s is completed " +
"and no further updates are allowed",
wo_id)
return
receipt_update_entry.append(wo_receipt)

# Since receipts_json is jrpc request updating only params object.
receipt_entry["receiptUpdates"] = receipt_update_entry
self._kv_helper.set("wo-receipts", wo_id, json.dumps(
receipt_entry))
logger.info("Receipt for the workorder id %s is updated to %s",
wo_id, wo_receipt)
26 changes: 24 additions & 2 deletions listener/avalon_listener/tcs_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import sys
import logging
import argparse
from urllib.parse import urlparse

from avalon_listener.tcs_work_order_handler import TCSWorkOrderHandler
from avalon_listener.tcs_work_order_handler_sync import TCSWorkOrderHandlerSync
Expand Down Expand Up @@ -72,8 +73,7 @@ def __init__(self, config):
self.workorder_handler = TCSWorkOrderHandlerSync(
self.kv_helper,
config["Listener"]["max_work_order_count"],
config["Listener"]["zmq_url"],
config["Listener"]["zmq_port"])
config["Listener"]["zmq_url"])
else:
self.workorder_handler = TCSWorkOrderHandler(
self.kv_helper,
Expand Down Expand Up @@ -121,6 +121,13 @@ def parse_command_line(config, args):
'--bind', help='URI to listen for requests ', type=str)
parser.add_argument(
'--lmdb_url', help='DB url to connect to LMDB ', type=str)
# Check if listener is running in sync work load
# execution mode then add additional argument zmq url
is_sync = config["WorkloadExecution"]["sync_workload_execution"]
if is_sync:
parser.add_argument(
'--zmq_url',
help='ZMQ url to connect to enclave manager ', type=str)

options = parser.parse_args(args)

Expand Down Expand Up @@ -150,6 +157,21 @@ def parse_command_line(config, args):
logger.error("Quit : remote_storage_url is not \
present in config for Listener")
sys.exit(-1)
if options.zmq_url:
if not is_sync:
logger.error("Invalid option zmq_url! It should be supported"
"in work order sync mode")
sys.exit(-1)
else:
if config.get("Listener") is None or \
config["Listener"].get("zmq_url") is None:
logger.error("Quit : no zmq_url config found for Listener")
sys.exit(-1)
parse_res = urlparse(options.zmq_url)
if parse_res.scheme != "tcp" or parse_res.port == "":
logger.error("Invalid zmq url. It should tcp://<host>:<port>")
sys.exit(-1)
config["Listener"]["zmq_url"] = options.zmq_url

return host_name, port

Expand Down
7 changes: 3 additions & 4 deletions listener/avalon_listener/tcs_work_order_handler_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,13 @@ class TCSWorkOrderHandlerSync(TCSWorkOrderHandler):
"""
# ------------------------------------------------------------------------------------------------

def __init__(self, kv_helper, max_wo_count, zmq_url, zmq_port):
def __init__(self, kv_helper, max_wo_count, zmq_url):
"""
Function to perform init activity
Parameters:
- kv_helper is a object of lmdb database
"""
self.zmq_url = zmq_url
self.zmq_port_number = zmq_port
super(TCSWorkOrderHandlerSync, self).__init__(kv_helper, max_wo_count)

# ---------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -145,11 +144,11 @@ def WorkOrderSubmit(self, **params):
# ZeroMQ for sync workorder processing
try:
socket = context.socket(zmq.REQ)
socket.connect(self.zmq_url + self.zmq_port_number)
socket.connect(self.zmq_url)
socket.send_string(wo_id, flags=0, encoding='utf-8')
replymessage = socket.recv()
logger.info(replymessage)
socket.disconnect(self.zmq_url + self.zmq_port_number)
socket.disconnect(self.zmq_url)
except Exception as er:
raise JSONRPCDispatchException(
WorkOrderStatus.UNKNOWN_ERROR,
Expand Down
57 changes: 40 additions & 17 deletions listener/avalon_listener/tcs_workorder_receipt_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def WorkOrderReceiptCreate(self, **params):
wo_id = params["workOrderId"]
input_json_str = params["raw"]
input_value = json.loads(input_json_str)

# Check if work order id exists
wo_request = self.kv_helper.get("wo-requests", wo_id)
if wo_request is None:
raise JSONRPCDispatchException(
Expand All @@ -82,12 +82,22 @@ def WorkOrderReceiptCreate(self, **params):
)
else:
wo_receipt = self.kv_helper.get("wo-receipts", wo_id)
if wo_receipt is None:
wo_receipt = json.loads(wo_receipt)
if wo_receipt and "workOrderId" not in wo_receipt["params"]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under what scenario will workOrderId be not part of wo_receipt ? My understanding is receipt is for a specific work order. So a receipt does not exists without a work order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WorkOrderId is exists if work order receipt is created. Refer the git issues for description #412

status, err_msg = \
self.__validate_work_order_receipt_create_req(
input_value, wo_request)
if status is True:
self.kv_helper.set("wo-receipts", wo_id, input_json_str)
# If receiptUpdates doesn't exists then
# create an entry with empty array
if "receiptUpdates" not in wo_receipt["params"]:
input_value["params"]["receiptUpdates"] = []
else:
input_value["params"]["receiptUpdates"] = \
wo_receipt["receiptUpdates"]
self.kv_helper.set("wo-receipts", wo_id, json.dumps(
input_value
))
raise JSONRPCDispatchException(
JRPCErrorCodes.SUCCESS,
"Receipt created successfully"
Expand Down Expand Up @@ -186,11 +196,14 @@ def WorkOrderReceiptUpdate(self, **params):
status, err_msg = self.__validate_work_order_receipt_update_req(
input_value)
if status is True:
value = json.loads(value)
# Load previous updates to receipt
updates_to_receipt = \
self.kv_helper.get("wo-receipt-updates", wo_id)
if "receiptUpdates" in value["params"]:
updates_to_receipt = value["params"]["receiptUpdates"]
else:
updates_to_receipt = []
# If it is first update to receipt
if updates_to_receipt is None:
if len(updates_to_receipt) == 0:
updated_receipt = []
else:
updated_receipt = json.loads(updates_to_receipt)
Expand Down Expand Up @@ -220,8 +233,9 @@ def WorkOrderReceiptUpdate(self, **params):
" is not allowed"
)
updated_receipt.append(input_value)
self.kv_helper.set("wo-receipt-updates", wo_id,
json.dumps(updated_receipt))
input_value["receiptUpdates"] = updated_receipt
self.kv_helper.set("wo-receipts", wo_id,
json.dumps(input_value))
raise JSONRPCDispatchException(
JRPCErrorCodes.SUCCESS,
"Receipt updated successfully"
Expand Down Expand Up @@ -377,15 +391,22 @@ def WorkOrderReceiptRetrieve(self, **params):
value = self.kv_helper.get("wo-receipts", wo_id)
if value:
receipt = json.loads(value)
receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id)
if receipt_updates is None:
if "receiptUpdates" in receipt["params"]:
receipt_updates = receipt["params"]["receiptUpdates"]
# Remove "receiptUpdates" key from the receipt entry
# It has updates to receipt.
del receipt["params"]["receiptUpdates"]
else:
receipt_updates = []
if len(receipt_updates) == 0:
# If there is no updates to receipt
# then current status is same as create status
receipt["params"]["receiptCurrentStatus"] = \
receipt["params"]["receiptCreateStatus"]
else:
receipt_updates_json = json.loads(receipt_updates)
# Get the recent update to receipt
last_receipt = receipt_updates_json[len(receipt_updates_json)
- 1]
# Get the latest update to receipt
last_receipt = receipt_updates[len(receipt_updates)
- 1]
receipt["params"]["receiptCurrentStatus"] = \
last_receipt["updateType"]
return receipt["params"]
Expand Down Expand Up @@ -420,10 +441,12 @@ def WorkOrderReceiptUpdateRetrieve(self, **params):
# starts from 1
update_index = input_params["updateIndex"]
# Load list of updates to the receipt
receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id)
receipt_entry = self.kv_helper.get("wo-receipts", wo_id)

if receipt_updates:
receipt_updates_json = json.loads(receipt_updates)
if receipt_entry:
receipt_entry_json = json.loads(receipt_entry)
receipt_updates_json = \
receipt_entry_json["params"]["receiptUpdates"]
total_updates = len(receipt_updates_json)
if update_index <= 0:
raise JSONRPCDispatchException(
Expand Down
3 changes: 1 addition & 2 deletions listener/listener_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ bind = "http://localhost:1947"
max_work_order_count = 10
# ZMQ configurations the listener would connect to
# Same as the url and port of enclave manager socket
zmq_url = "tcp://avalon-enclave-manager:"
zmq_port = '5555'
zmq_url = "tcp://localhost:5555"

# ------------------------------------------------------------------
# Work load execution-settings for workload execution(synchronous/asynchronous)
Expand Down
22 changes: 18 additions & 4 deletions scripts/tcs_startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ COMPONENTS="$ENCLAVE_MANAGER" # #KV_STORAGE added if -s passed
START_STOP_AVALON_SERVICES=0 # default if -s not passed
LMDB_URL="http://localhost:9090" # -l default
LISTENER_URL="http://localhost:1947"
ENCLAVE_ZMQ_URL="tcp://localhost:5555"
# Trap handler
trap 'stop_avalon_components' HUP INT QUIT ABRT ALRM TERM

is_sync_mode()
{
return grep "sync_workload_execution" ${TCF_HOME}/listener/listener_config.toml | awk -F'=' '{print $2}'
}

start_avalon_components()
{
if [ $START_STOP_AVALON_SERVICES = 1 ] ; then
echo "Starting Avalon KV Storage $VERSION ..."
$KV_STORAGE --bind $LMDB_URL &
echo "Avalon KV Storage started"

echo "Starting Avalon Listener $VERSION ..."
$LISTENER --bind $LISTENER_URL --lmdb_url $LMDB_URL &
echo "Avalon Listener started"
fi

# START_STOP_AVALON_SERVICES doesn't control enclave manager. It will be
Expand All @@ -44,6 +46,18 @@ start_avalon_components()
python3 $ENCLAVE_MANAGER --lmdb_url $LMDB_URL &
echo "Avalon Enclave Manager started"

if [ $START_STOP_AVALON_SERVICES = 1 ] ; then
echo "Starting Avalon Listener $VERSION ..."
is_sync_mode
is_sync_mode_on=$?
if [ "$is_sync_mode_on" -eq "1" ]; then
$LISTENER --bind $LISTENER_URL --lmdb_url $LMDB_URL --zmq_url $ENCLAVE_ZMQ_URL &
else
$LISTENER --bind $LISTENER_URL --lmdb_url $LMDB_URL &
fi
echo "Avalon Listener started"
fi

sleep 5s
check_avalon_components

Expand Down
3 changes: 1 addition & 2 deletions tools/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,10 @@ done

#yell "Start testing echo client with reading registry from blockchain................"
#yell "#------------------------------------------------------------------------------------------------"
#try $echo_client_path/echo_client.py -m "Hello world" -rs -dh

yell "Start testing echo client with service uri ................"
yell "#------------------------------------------------------------------------------------------------"
try $echo_client_path/echo_client.py -m "Hello world" -s "http://$LISTENER_URL:1947" -dh
try $echo_client_path/echo_client.py -m "Hello world" -s "http://$LISTENER_URL:1947" -dh -rs

yell "Start testing generic client for echo workload ................"
yell "#------------------------------------------------------------------------------------------------"
Expand Down