
Commit 0a842ad

johnwhumphreys authored and facebook-github-bot committed
Monarch web-crawler example. (#1359)
Summary:
Pull Request resolved: #1359

This web-crawler example takes a target site and crawls it to a depth of 3. It is initialized by adding the root URL to a central (single) queue actor; a number of parallel crawling actors then pull pages off the queue, crawl them, and add the links they find back onto the queue until the depth is exhausted. The crawlers only follow sub-pages of the base URL (for safety/simplicity) and stop at the target depth.

Reviewed By: colin2328

Differential Revision: D83214162

fbshipit-source-id: 65f6e753bcfc9bce6982e1e329d6abed5f369660
1 parent 847e8f7 commit 0a842ad
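
For orientation, the crawl that this example distributes is an ordinary depth-limited, queue-driven traversal. Below is a minimal single-process sketch of that algorithm; it is not part of this commit, and fetch_links is a hypothetical helper standing in for the requests/BeautifulSoup parsing done in crawler.py.

    # Illustrative sketch only (not from the commit); fetch_links is assumed
    # to return all hrefs found on a page.
    from collections import deque

    def crawl(base_url: str, max_depth: int, fetch_links) -> int:
        seen = {base_url}
        queue = deque([(base_url, max_depth)])   # (url, remaining depth)
        while queue:
            url, depth = queue.popleft()
            for link in fetch_links(url):
                # Only follow links under the base site, and stop at depth 0.
                if depth > 0 and link.startswith(base_url) and link not in seen:
                    seen.add(link)
                    queue.append((link, depth - 1))
        return len(seen)                         # pages discovered

The committed example replaces the local deque with a single QueueActor and runs the loop body concurrently in several CrawlActors.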

File tree

2 files changed (+192, -0 lines)


docs/source/examples/README.rst

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ Python Script Examples
 These examples are formatted for sphinx-gallery and will be automatically converted to HTML documentation:

 - :doc:`ping_pong.py <ping_pong>`: Demonstrates the basics of Monarch's Actor/endpoint API with a ping-pong communication example
+- :doc:`crawler.py <crawler>`: Demonstrates Monarch's actor API and many-to-one communications with a web crawler example
 - :doc:`spmd_ddp.py <spmd_ddp>`: Shows how to run PyTorch's Distributed Data Parallel (DDP) within Monarch actors
 - :doc:`grpo_actor.py <grpo_actor>`: Implements a distributed PPO-like reinforcement learning algorithm using the Monarch actor framework
 - :doc:`distributed_tensors.py <distributed_tensors>`: Shows how to dispatch tensors and tensor level operations to a distributed mesh of workers and GPUs

docs/source/examples/crawler.py

Lines changed: 191 additions & 0 deletions
@@ -0,0 +1,191 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Distributed Web Crawler Example With Actors
==============================================================
This example demonstrates how to build a simple distributed web crawler with
Monarch actors, including:

- Creating a singleton QueueActor
- Providing that QueueActor to multiple CrawlActors
- Having CrawlActors add/remove items from the QueueActor as they crawl
- Retrieving results and cleaning up

The queue is based on asyncio to enable concurrent blocking waits/timeouts.
An auxiliary set is used to avoid duplicates; it does not need to be
thread-safe because each Monarch actor handles its messages sequentially,
finishing one before moving on.
"""

# %%
"""
Import libraries and set tuneable configuration values.
"""
import asyncio
import time
from typing import Optional, Set, Tuple
from urllib.parse import urlparse, urlunparse

import requests
from bs4 import BeautifulSoup
from monarch.actor import Actor, context, endpoint, ProcMesh, this_host

# Configuration
BASE = "https://meta-pytorch.org/monarch/"  # only sub-pages of this URL are crawled
DEPTH = 3  # maximum crawl depth from the base URL
NUM_CRAWLERS = 8  # number of parallel CrawlActor processes
TIMEOUT = 5  # seconds to wait on an empty queue before treating the crawl as done


# %%
class QueueActor(Actor):
    """
    Define the QueueActor class.
    - Holds an asyncio Queue, which enables concurrent sleeps.
    - Provides insert and get functions to add/remove from the queue.
    - Uses the set to avoid duplicates (this would eventually OOM at scale).
    """

    def __init__(self):
        self.q: asyncio.Queue = asyncio.Queue()
        self.seen_links: Set[str] = set()

    @endpoint
    async def insert(self, item, depth):
        if item not in self.seen_links:
            self.seen_links.add(item)
            await self.q.put((item, depth))

    @endpoint
    async def get(self) -> Optional[Tuple[str, int]]:
        try:
            return await asyncio.wait_for(self.q.get(), timeout=TIMEOUT)
        except asyncio.TimeoutError:
            print("Queue has no items, returning done value.")
            return None


# %%
class CrawlActor(Actor):
    """
    Define the CrawlActor class.
    - Takes in all queues, but slices down to only use the first one. This is a temporary
      workaround until ProcMesh.slice is implemented.
    - Runs a long crawl() process that continuously takes items off the central queue, parses them,
      and adds links it finds back to the queue.
    - Crawls to a configured depth and terminates after the queue is empty for a configured number
      of seconds.
    """

    def __init__(self, all_queues: QueueActor):
        self.target_queue: QueueActor = all_queues.slice(procs=slice(0, 1))
        self.processed = 0

    @staticmethod
    def normalize_url(url: str) -> str:
        p = urlparse(url)
        normalized = urlunparse((p.scheme, p.netloc, p.path, p.params, "", ""))
        return normalized

    async def _crawl_internal(self, target, depth):
        response = requests.get(target)
        response_size_kb = len(response.content) / 1024
        print(f" - {target} was {response_size_kb:.2f} KB")
        parsed = BeautifulSoup(response.content, "html.parser")

        anchors = parsed.find_all("a", href=True)
        for a in anchors:
            link = a["href"] if "https://" in a["href"] else BASE + a["href"]

            # Stop at the target depth and only follow links on our base site.
            if depth > 0 and BASE in link:
                normalized_link = CrawlActor.normalize_url(link)
                await self.target_queue.insert.call_one(normalized_link, depth - 1)

    @endpoint
    async def crawl(self):
        rank = context().actor_instance.rank

        while True:
            result = await self.target_queue.get.call_one()
            if result is None:
                break
            url, depth = result
            print(f"Crawler #{rank} found {url} @ depth={depth}.")
            await self._crawl_internal(url, depth)
            self.processed += 1

        return self.processed


# %%
async def main():
    start_time = time.time()

    # Start up a ProcMesh.
    local_proc_mesh: ProcMesh = await this_host().spawn_procs(
        per_host={"procs": NUM_CRAWLERS}
    )

    # Create queues across the mesh and use slice to target the first one; we will not use the rest.
    # TODO: Once ProcMesh::slice is implemented, avoid spawning the extra ones here.
    all_queues = await local_proc_mesh.spawn("queues", QueueActor)
    target_queue = all_queues.slice(procs=slice(0, 1))

    # Prime the queue with the base URL we want to crawl.
    await target_queue.insert.call_one(BASE, DEPTH)

    # Make the crawlers and pass in the queues; crawlers will just use the first one as well.
    crawlers = await local_proc_mesh.spawn("crawlers", CrawlActor, all_queues)

    # Run the crawlers; display the count of documents they crawled when done.
    results = await crawlers.crawl.call()

    # Shut down all our resources.
    await local_proc_mesh.stop()

    # Log results.
    pages = sum(v[1] for v in results.items())
    duration = time.time() - start_time
    print(f"Finished - Found {pages} in {duration:.2f} seconds.\n{results}.")


# %%
"""
Run main in an asyncio context.
"""
asyncio.run(main())

# %%
# Results
# -----------
# With NUM_CRAWLERS=1, this takes around 288 seconds:
#
# .. code-block:: text
#
#    Finished - Found 3123 in 288.07 seconds.
#
#    ValueMesh({procs: 1}):
#    (({'procs': 0/1}, 3123),).
#
# With NUM_CRAWLERS=8, this takes around 45 seconds:
#
# .. code-block:: text
#
#    Finished - Found 3123 in 45.94 seconds.
#
#    ValueMesh({procs: 8}):
#    (({'procs': 0/8}, 393),
#     ({'procs': 1/8}, 393),
#     ({'procs': 2/8}, 397),
#     ({'procs': 3/8}, 394),
#     ({'procs': 4/8}, 383),
#     ({'procs': 5/8}, 393),
#     ({'procs': 6/8}, 393),
#     ({'procs': 7/8}, 377)).
#
# So, we see a near-linear improvement in crawling time from
# the concurrent crawlers using the central queue.
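
As a quick sanity check on the "near-linear" wording, the two timings quoted above work out to roughly a 6x speedup on 8 crawlers. A small sketch using those reported numbers (copied from the runs above, not re-measured):

    # Numbers copied from the Results section above (1 vs. 8 crawlers); illustrative only.
    t_1_crawler, t_8_crawlers = 288.07, 45.94

    speedup = t_1_crawler / t_8_crawlers   # ~6.3x
    efficiency = speedup / 8               # ~0.78 parallel efficiency
    print(f"speedup ~{speedup:.1f}x, efficiency ~{efficiency:.2f}")

The gap from a perfect 8x is plausibly down to the single shared QueueActor and the final TIMEOUT wait before the crawlers decide the queue is drained.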
