Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/
73 changes: 73 additions & 0 deletions labs_api/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import time
import typing as t
import uuid
from multiprocessing import Queue, Process, Manager

import fastapi
import uvicorn
from fastapi import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from prediction_market_agent_tooling.deploy.agent import initialize_langfuse
from prediction_market_agent_tooling.gtypes import HexAddress
Expand All @@ -21,10 +25,33 @@
]


def f(x):
print(f"called f with {x=}")
return x * x


def long_running_task(identifier: str, input_queue: Queue, output_queue: Queue):
"""A dummy function that simulates a long-running task."""
print(f"Process {identifier} started")
while True:
# Check for messages in the queue
if not input_queue.empty():
number = input_queue.get()
result = number**2
print(f"Process {identifier} received {number}, squared it to {result}")
output_queue.put(result)
time.sleep(1) # Prevent tight loop
print(f"Process {identifier} ended") # Only runs if the loop is broken


def create_app() -> fastapi.FastAPI:
config = Config()
initialize_langfuse(config.default_enable_langfuse)

manager = Manager() # thread-safe shared state
manager.queue()
shared_registry = manager.dict()

app = fastapi.FastAPI()
app.add_middleware(
CORSMiddleware,
Expand Down Expand Up @@ -54,6 +81,52 @@ def _market_invalid(market_id: HEX_ADDRESS_VALIDATOR) -> MarketInvalidResponse:
logger.info(f"Invalid for `{market_id}`: {invalid.model_dump()}")
return invalid

@app.post("/start_process")
async def start_process():
"""Starts a new process and registers it."""
process_id = str(uuid.uuid4()) # Unique identifier for the process
input_queue = Queue() # Create an input queue for communication
output_queue = Queue() # Create an output queue for results
process = Process(
target=long_running_task,
args=(process_id, input_queue, output_queue),
daemon=True,
)
process.start()

# Store the process and its queues in the registry
shared_registry[process_id] = {
"process": process,
"input_queue": input_queue,
"output_queue": output_queue,
}
return {"message": "Process started", "process_id": process_id}

@app.post("/end_process/{process_id}")
async def end_process(process_id: str):
"""Ends a specific process by its ID."""
entry = shared_registry.get(process_id)
if not entry:
raise HTTPException(status_code=404, detail="Process not found")

# Terminate the process and remove it from the registry
entry["process"].terminate()
entry["process"].join()
del shared_registry[process_id]
return {"message": f"Process {process_id} terminated"}

@app.post("/send_message_to_process/{process_id}")
async def send_message_to_process(process_id: str, number: int):
"""Sends a number to a specific process to calculate its square."""
entry = shared_registry.get(process_id)
if not entry:
raise HTTPException(status_code=404, detail="Process not found")

# Put the number in the process's queue
queue = entry["queue"]
queue.put(number)
return {"message": f"Sent number {number} to process {process_id}"}

logger.info("API created.")

return app
Expand Down
Loading