
Commit 9f541ab

Support for uploading to S3 (#95)
- support uploading WACZ to S3-compatible storage (via minio client)
- storage config loaded from env vars, enabled when WACZ output is used
- support pinging either an HTTP or a Redis key-based webhook
- webhook: include 'completed' bool to indicate if the crawl fully completed or was partial (eg. interrupted via signal)
- consolidate redis init to redis.js
- support upload filename with custom variables: can interpolate current timestamp (@ts), hostname (@hostname) and user-provided id (@id)
- README: add docs for S3 storage, remove unused args
- update to pywb 2.6.2, browsertrix-behaviors 0.2.4
- fix to `limit` option, ensure limit check uses shared state
- bump version to 0.5.0-beta.1
1 parent f5d0328 commit 9f541ab

8 files changed, +288 −19 lines changed


README.md

Lines changed: 39 additions & 0 deletions
@@ -405,6 +405,45 @@ docker run -p 9037:9037 -v $PWD/crawls:/crawls/ webrecorder/browsertrix-crawler

 will start a crawl with 3 workers, and show the screen of each of the workers from `http://localhost:9037/`.

+### Uploading crawl output to S3-Compatible Storage
+
+Browsertrix Crawler also includes support for uploading WACZ files to S3-compatible storage, and notifying a webhook when the upload succeeds.
+
+(At this time, S3 upload is supported only when WACZ output is enabled, but WARC uploads may be added in the future.)
+
+This feature can currently be enabled by setting environment variables (for security reasons, these settings are not passed in as part of the command-line or YAML config at this time).
+
+<details>
+
+<summary>Environment variables for S3 uploads include:</summary>
+
+- `STORE_ACCESS_KEY` / `STORE_SECRET_KEY` - S3 credentials
+- `STORE_ENDPOINT_URL` - S3 endpoint URL
+- `STORE_PATH` - optional path appended to the endpoint, if provided
+- `STORE_FILENAME` - filename, or template for the filename, to put on S3
+- `STORE_USER` - optional username to pass back as part of the webhook callback
+- `CRAWL_ID` - unique crawl id (defaults to container hostname)
+- `WEBHOOK_URL` - the URL of the webhook (can be http://, https:// or redis://)
+
+</details>
+
+#### Webhook Notification
+
+The webhook URL can be an HTTP URL, which receives a JSON POST request, OR a Redis URL, which specifies a Redis list key to which the JSON data is pushed as a string.
+
+<details>
+
+<summary>Webhook notification JSON includes:</summary>
+
+- `id` - crawl id (value of `CRAWL_ID`)
+- `userId` - user id (value of `STORE_USER`)
+- `filename` - bucket path + filename of the file
+- `size` - size of the WACZ file
+- `hash` - SHA-256 hash of the WACZ file
+- `completed` - boolean indicating whether the crawl fully completed or was partial (e.g. interrupted via signal or other error)
+
+</details>
+
 ## Interrupting and Restarting the Crawl

 With version 0.5.0, a crawl can be gracefully interrupted with Ctrl-C (SIGINT) or a SIGTERM.
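As an illustration of the configuration the new README section describes (an editor's sketch; the endpoint, bucket, credentials, crawl id, and webhook target below are placeholders, not values from the commit), a crawl with S3 upload and a Redis webhook might be launched as:

```bash
docker run -v $PWD/crawls:/crawls/ \
  -e STORE_ENDPOINT_URL="https://s3.example.com/my-bucket/" \
  -e STORE_ACCESS_KEY="ACCESSKEY" \
  -e STORE_SECRET_KEY="SECRETKEY" \
  -e STORE_FILENAME="@ts-@id.wacz" \
  -e CRAWL_ID="my-crawl" \
  -e WEBHOOK_URL="redis://redis-host:6379/0/uploads" \
  webrecorder/browsertrix-crawler crawl --url https://example.com/ --generateWACZ
```

The bucket name and any prefix are carried in `STORE_ENDPOINT_URL` (plus optional `STORE_PATH`), and the upload only runs because `--generateWACZ` enables WACZ output.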

crawler.js

Lines changed: 31 additions & 9 deletions
@@ -1,6 +1,7 @@
 const child_process = require("child_process");
 const path = require("path");
 const fs = require("fs");
+const os = require("os");
 const fsp = require("fs/promises");

 // to ignore HTTPS error for HEAD check
@@ -17,16 +18,17 @@ const { RedisCrawlState, MemoryCrawlState } = require("./util/state");
 const AbortController = require("abort-controller");
 const Sitemapper = require("sitemapper");
 const { v4: uuidv4 } = require("uuid");
-const Redis = require("ioredis");
 const yaml = require("js-yaml");

 const warcio = require("warcio");

 const behaviors = fs.readFileSync(path.join(__dirname, "node_modules", "browsertrix-behaviors", "dist", "behaviors.js"), {encoding: "utf8"});

 const TextExtract = require("./util/textextract");
+const { S3StorageSync } = require("./util/storage");
 const { ScreenCaster } = require("./util/screencaster");
 const { parseArgs } = require("./util/argParser");
+const { initRedis } = require("./util/redis");

 const { getBrowserExe, loadProfile } = require("./util/browser");

@@ -43,9 +45,6 @@ class Crawler {

     this.emulateDevice = null;

-    // links crawled counter
-    this.numLinks = 0;
-
     // pages file
     this.pagesFH = null;

@@ -148,10 +147,10 @@ class Crawler {
       throw new Error("stateStoreUrl must start with redis:// -- Only redis-based store currently supported");
     }

-    const redis = new Redis(redisUrl, {lazyConnect: true});
+    let redis;

     try {
-      await redis.connect();
+      redis = await initRedis(redisUrl);
     } catch (e) {
       throw new Error("Unable to connect to state store Redis: " + redisUrl);
     }
@@ -350,6 +349,25 @@ class Crawler {
       return;
     }

+    if (this.params.generateWACZ && process.env.STORE_ENDPOINT_URL) {
+      const endpointUrl = process.env.STORE_ENDPOINT_URL + (process.env.STORE_PATH || "");
+      const storeInfo = {
+        endpointUrl,
+        accessKey: process.env.STORE_ACCESS_KEY,
+        secretKey: process.env.STORE_SECRET_KEY,
+      };
+
+      const opts = {
+        crawlId: process.env.CRAWL_ID || os.hostname(),
+        webhookUrl: process.env.WEBHOOK_URL,
+        userId: process.env.STORE_USER,
+        filename: process.env.STORE_FILENAME || "@ts-@id.wacz",
+      };
+
+      console.log("Initing Storage...");
+      this.storage = new S3StorageSync(storeInfo, opts);
+    }
+
     // Puppeteer Cluster init and options
     this.cluster = await Cluster.launch({
       concurrency: this.params.newContext,
@@ -436,6 +454,11 @@ class Crawler {
       // Run the wacz create command
       child_process.spawnSync("wacz" , argument_list, {stdio: "inherit"});
       this.debugLog(`WACZ successfully generated and saved to: ${waczPath}`);
+
+      if (this.storage) {
+        const finished = await this.crawlState.finished();
+        await this.storage.uploadCollWACZ(waczPath, finished);
+      }
     }
   }

@@ -543,7 +566,7 @@ class Crawler {
       return false;
     }

-    if (this.numLinks >= this.params.limit && this.params.limit > 0) {
+    if (this.params.limit > 0 && (await this.crawlState.numRealSeen() >= this.params.limit)) {
       this.limitHit = true;
       return false;
     }
@@ -553,7 +576,6 @@ class Crawler {
     }

     await this.crawlState.add(url);
-    this.numLinks++;
     this.cluster.queue({url, seedId, depth});
     return true;
   }
@@ -656,7 +678,7 @@ class Crawler {
   async awaitPendingClear() {
     this.statusLog("Waiting to ensure pending data is written to WARCs...");

-    const redis = new Redis("redis://localhost/0");
+    const redis = await initRedis("redis://localhost/0");

     while (true) {
       const res = await redis.get(`pywb:${this.params.collection}:pending`);
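The `limit` fix above replaces the per-process `this.numLinks` counter with `crawlState.numRealSeen()`, so the check reads shared state (e.g. Redis) rather than one process's local count. The actual implementation lives in `util/state.js`, which is not part of this diff; as a rough sketch of the idea only (class name, method body, and key layout are hypothetical), a Redis-backed state could count a shared set of seen URLs like this:

```js
// Hypothetical sketch -- not the real util/state.js implementation.
class SketchRedisCrawlState {
  constructor(redis, key) {
    this.redis = redis; // ioredis client, e.g. from initRedis()
    this.key = key;     // Redis set of URLs seen across all workers
  }

  async add(url) {
    await this.redis.sadd(this.key, url);
  }

  async numRealSeen() {
    // every worker sees the same count, unlike a per-process counter
    return await this.redis.scard(this.key);
  }
}

module.exports = { SketchRedisCrawlState };
```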

package.json

Lines changed: 3 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "name": "browsertrix-crawler",
-  "version": "0.5.0-beta.0",
+  "version": "0.5.0-beta.1",
   "main": "browsertrix-crawler",
   "repository": "https://github.com/webrecorder/browsertrix-crawler",
   "author": "Ilya Kreymer <[email protected]>, Webrecorder Software",
@@ -10,9 +10,10 @@
   },
   "dependencies": {
     "abort-controller": "^3.0.0",
-    "browsertrix-behaviors": "github:webrecorder/browsertrix-behaviors#skip-mp4-video",
+    "browsertrix-behaviors": "^0.2.4",
     "ioredis": "^4.27.1",
     "js-yaml": "^4.1.0",
+    "minio": "^7.0.18",
     "node-fetch": "^2.6.1",
     "puppeteer-cluster": "github:ikreymer/puppeteer-cluster#async-job-queue",
     "puppeteer-core": "^8.0.0",

screencast/index.html

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
     }
   </style>
   <script>
-    const ws = new WebSocket(window.location.origin.replace("http", "ws") + "/ws");
+    const ws = new WebSocket(window.location.href.replace("http", "ws") + "ws");
    ws.addEventListener("message", (event) => handleMessage(event.data));

     const unusedElems = [];

tests/custom_driver.test.js

Lines changed: 4 additions & 2 deletions
@@ -22,10 +22,12 @@ test("ensure custom driver with custom selector crawls JS files as pages", async
     pages.add(url);
   }

+  console.log(pages);
+
   const expectedPages = new Set([
     "https://www.iana.org/",
-    "https://www.iana.org/_js/2013.1/jquery.js",
-    "https://www.iana.org/_js/2013.1/iana.js"
+    "https://www.iana.org/_js/jquery.js",
+    "https://www.iana.org/_js/iana.js"
   ]);

   expect(pages).toEqual(expectedPages);

util/redis.js

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+const Redis = require("ioredis");
+
+module.exports.initRedis = async function(url) {
+  const redis = new Redis(url, {lazyConnect: true});
+  await redis.connect();
+  return redis;
+};

util/storage.js

Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+const fs = require("fs");
+const os = require("os");
+const { Transform } = require("stream");
+const { createHash } = require("crypto");
+
+const fetch = require("node-fetch");
+const Minio = require("minio");
+
+const { initRedis } = require("./redis");
+
+class S3StorageSync
+{
+  constructor(urlOrData, {filename, webhookUrl, userId, crawlId} = {}) {
+    let url;
+    let accessKey;
+    let secretKey;
+
+    if (typeof(urlOrData) === "string") {
+      url = new URL(urlOrData);
+      accessKey = url.username;
+      secretKey = url.password;
+      url.username = "";
+      url.password = "";
+      this.fullPrefix = url.href;
+
+    } else {
+      url = new URL(urlOrData.endpointUrl);
+      accessKey = urlOrData.accessKey;
+      secretKey = urlOrData.secretKey;
+      this.fullPrefix = url.href;
+    }
+
+    this.client = new Minio.Client({
+      endPoint: url.hostname,
+      port: Number(url.port) || (url.protocol === "https:" ? 443 : 80),
+      useSSL: url.protocol === "https:",
+      accessKey,
+      secretKey
+    });
+
+    this.bucketName = url.pathname.slice(1).split("/")[0];
+
+    this.objectPrefix = url.pathname.slice(this.bucketName.length + 2);
+
+    this.resources = [];
+
+    this.userId = userId;
+    this.crawlId = crawlId;
+    this.webhookUrl = webhookUrl;
+
+    filename = filename.replace("@ts", new Date().toISOString().replace(/[:TZz.]/g, ""));
+    filename = filename.replace("@hostname", os.hostname());
+    filename = filename.replace("@id", this.crawlId);
+
+    this.waczFilename = "data/" + filename;
+  }
+
+  async uploadCollWACZ(filename, completed = true) {
+    const origStream = fs.createReadStream(filename);
+
+    const hash = createHash("sha256");
+    let size = 0;
+    let finalHash;
+
+    const hashTrans = new Transform({
+      transform(chunk, encoding, callback) {
+        size += chunk.length;
+        hash.update(chunk);
+        this.push(chunk);
+        callback();
+      },
+
+      flush(callback) {
+        finalHash = "sha256:" + hash.digest("hex");
+        callback();
+      }
+    });
+
+    const fsStream = origStream.pipe(hashTrans);
+    const res = await this.client.putObject(this.bucketName, this.objectPrefix + this.waczFilename, fsStream);
+    console.log(res);
+
+    const resource = {"path": this.waczFilename, "hash": finalHash, "bytes": size};
+
+    if (this.webhookUrl) {
+      const body = {
+        id: this.crawlId,
+        user: this.userId,
+
+        //filename: `s3://${this.bucketName}/${this.objectPrefix}${this.waczFilename}`,
+        filename: this.fullPrefix + this.waczFilename,
+
+        hash: resource.hash,
+        size: resource.bytes,
+
+        completed
+      };
+
+      console.log("Pinging Webhook: " + this.webhookUrl);
+
+      if (this.webhookUrl.startsWith("http://") || this.webhookUrl.startsWith("https://")) {
+        await fetch(this.webhookUrl, {method: "POST", body: JSON.stringify(body)});
+      } else if (this.webhookUrl.startsWith("redis://")) {
+        const parts = this.webhookUrl.split("/");
+        if (parts.length !== 5) {
+          throw new Error("redis webhook url must be in format: redis://<host>:<port>/<db>/<key>");
+        }
+        const redis = await initRedis(parts.slice(0, 4).join("/"));
+        await redis.rpush(parts[4], JSON.stringify(body));
+      }
+    }
+  }
+}
+
+module.exports.S3StorageSync = S3StorageSync;
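Given the body assembled above, a webhook receiver (or a consumer popping the Redis list key) would see JSON along these lines; all values here are illustrative. Note that the payload uses `user` as the key for the user id, and `filename` is the full endpoint prefix plus the `data/`-prefixed WACZ name produced from the filename template:

```json
{
  "id": "my-crawl",
  "user": "user-123",
  "filename": "https://s3.example.com/my-bucket/data/2021-10-22120000000-my-crawl.wacz",
  "hash": "sha256:5891b5b522d5df086d0ff0b110fbd9d2...",
  "size": 128964001,
  "completed": true
}
```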
