From f80fded4559eb7b4a30b15a8dfca31e6dd0464f6 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Tue, 16 Sep 2025 17:48:13 -0700
Subject: [PATCH] args: add separate --dedupStoreUrl to support separate redis
 for dedup indexing

prep:
- move WACZLoader to wacz for reuse
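
Example invocation (a sketch: --redisStoreUrl / --dedupStoreUrl are the
options used and added below; the crawl entrypoint, seed url and redis
hostnames are illustrative):

    crawl --url https://example.com/ \
      --redisStoreUrl redis://state-redis:6379/0 \
      --dedupStoreUrl redis://dedup-redis:6379/0

If the dedup store url is the same as the state store url, the existing
state redis connection is reused; otherwise a second connection is opened
and used only for the dedup index reads/writes in RedisCrawlState.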
---
 src/crawler.ts        | 23 ++++++++++-------------
 src/replaycrawler.ts  | 39 +--------------------------------------
 src/util/argParser.ts |  7 +++++++
 src/util/redis.ts     | 14 ++++++++++++++
 src/util/state.ts     | 11 +++++++----
 src/util/wacz.ts      | 40 +++++++++++++++++++++++++++++++++++++++-
 6 files changed, 78 insertions(+), 56 deletions(-)

diff --git a/src/crawler.ts b/src/crawler.ts
index b777d27e..4cba5a38 100644
--- a/src/crawler.ts
+++ b/src/crawler.ts
@@ -31,7 +31,7 @@ import {
 } from "./util/storage.js";
 import { ScreenCaster, WSTransport } from "./util/screencaster.js";
 import { Screenshots } from "./util/screenshots.js";
-import { initRedis } from "./util/redis.js";
+import { initRedisWaitForSuccess } from "./util/redis.js";
 import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js";
 import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js";
 import { sleep, timedRun, secondsElapsed } from "./util/timing.js";
@@ -342,6 +342,7 @@ export class Crawler {
 
   async initCrawlState() {
     const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
+    const dedupRedisUrl = this.params.dedupStoreUrl || redisUrl;
 
     if (!redisUrl.startsWith("redis://")) {
       logger.fatal(
@@ -349,18 +350,7 @@ export class Crawler {
       );
     }
 
-    let redis;
-
-    while (true) {
-      try {
-        redis = await initRedis(redisUrl);
-        break;
-      } catch (e) {
-        //logger.fatal("Unable to connect to state store Redis: " + redisUrl);
-        logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
-        await sleep(1);
-      }
-    }
+    const redis = await initRedisWaitForSuccess(redisUrl);
 
     logger.debug(
       `Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`,
@@ -368,6 +358,12 @@ export class Crawler {
       "state",
     );
 
+    let dedupRedis = redis;
+
+    if (redisUrl !== dedupRedisUrl) {
+      dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
+    }
+
     logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");
 
     this.crawlState = new RedisCrawlState(
@@ -376,6 +372,7 @@ export class Crawler {
       this.maxPageTime,
       os.hostname(),
       this.params.maxPageRetries,
+      dedupRedis,
     );
 
     if (this.params.logErrorsToRedis) {
diff --git a/src/replaycrawler.ts b/src/replaycrawler.ts
index 819bcf39..13160c60 100644
--- a/src/replaycrawler.ts
+++ b/src/replaycrawler.ts
@@ -10,9 +10,6 @@ import { PageInfoRecord, PageInfoValue, Recorder } from "./util/recorder.js";
 import fsp from "fs/promises";
 import path from "path";
 
-import { ZipRangeReader, createLoader } from "@webrecorder/wabac";
-
-import { AsyncIterReader } from "warcio";
 import { parseArgs } from "./util/argParser.js";
 
 import { PNG } from "pngjs";
@@ -23,6 +20,7 @@ import { MAX_URL_LENGTH } from "./util/reqresp.js";
 import { openAsBlob } from "fs";
 import { WARCWriter } from "./util/warcwriter.js";
 import { parseRx } from "./util/seeds.js";
+import { WACZLoader } from "./util/wacz.js";
 
 // RWP Replay Prefix
 const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";
@@ -784,38 +782,3 @@ export class ReplayCrawler extends Crawler {
     return null;
   }
 }
-
-class WACZLoader {
-  url: string;
-  zipreader: ZipRangeReader | null;
-
-  constructor(url: string) {
-    this.url = url;
-    this.zipreader = null;
-  }
-
-  async init() {
-    if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) {
-      const blob = await openAsBlob(this.url);
-      this.url = URL.createObjectURL(blob);
-    }
-
-    const loader = await createLoader({ url: this.url });
-
-    this.zipreader = new ZipRangeReader(loader);
-  }
-
-  async loadFile(fileInZip: string) {
-    const { reader } = await this.zipreader!.loadFile(fileInZip);
-
-    if (!reader) {
-      return null;
-    }
-
-    if (!reader.iterLines) {
-      return new AsyncIterReader(reader);
-    }
-
-    return reader;
-  }
-}
diff --git a/src/util/argParser.ts b/src/util/argParser.ts
index 78fd35b9..ae0fd2cd 100644
--- a/src/util/argParser.ts
+++ b/src/util/argParser.ts
@@ -445,6 +445,13 @@ class ArgParser {
         default: "redis://localhost:6379/0",
       },
 
+      dedupStoreUrl: {
+        describe:
+          "If set, url for remote redis server to store dedup index. Otherwise, using local redis instance",
+        type: "string",
+        default: "redis://localhost:6379/0",
+      },
+
       saveState: {
         describe:
           "If the crawl state should be serialized to the crawls/ directory. Defaults to 'partial', only saved when crawl is interrupted",
diff --git a/src/util/redis.ts b/src/util/redis.ts
index 56b3bb27..325ce9ed 100644
--- a/src/util/redis.ts
+++ b/src/util/redis.ts
@@ -1,5 +1,6 @@
 import { Redis } from "ioredis";
 import { logger } from "./logger.js";
+import { sleep } from "./timing.js";
 
 const error = console.error;
 
@@ -34,6 +35,19 @@ export async function initRedis(url: string) {
   return redis;
 }
 
+export async function initRedisWaitForSuccess(redisUrl: string, retrySecs = 1) {
+  while (true) {
+    try {
+      return await initRedis(redisUrl);
+      break;
+    } catch (e) {
+      //logger.fatal("Unable to connect to state store Redis: " + redisUrl);
+      logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
+      await sleep(retrySecs);
+    }
+  }
+}
+
 export function setExitOnRedisError() {
   exitOnError = true;
 }
diff --git a/src/util/state.ts b/src/util/state.ts
index 68c46dc2..f1d151d7 100644
--- a/src/util/state.ts
+++ b/src/util/state.ts
@@ -217,6 +217,7 @@ export type SaveState = {
 export class RedisCrawlState {
   redis: Redis;
   maxRetries: number;
+  dedupRedis: Redis;
 
   uid: string;
   key: string;
@@ -248,8 +249,10 @@ export class RedisCrawlState {
     maxPageTime: number,
     uid: string,
     maxRetries?: number,
+    dedupRedis?: Redis,
   ) {
     this.redis = redis;
+    this.dedupRedis = dedupRedis || redis;
 
     this.uid = uid;
     this.key = key;
@@ -1078,7 +1081,7 @@ return inx;
     hash: string,
     url: string,
   ): Promise<{ dupe?: boolean; origDate?: string; origUrl?: string }> {
-    const value = await this.redis.hget(key, hash);
+    const value = await this.dedupRedis.hget(key, hash);
     if (!value) {
       return {};
     }
@@ -1088,7 +1091,7 @@ return inx;
       return { dupe: true };
     }
     // otherwise, check if a revisit entry
-    if (await this.redis.sismember(`${key}:${hash}`, url)) {
+    if (await this.dedupRedis.sismember(`${key}:${hash}`, url)) {
       return { dupe: true };
     }
     return { origUrl: val[1], origDate: val[0] };
@@ -1096,8 +1099,8 @@ return inx;
 
   async addHashDupe(key: string, hash: string, url: string, date: string) {
     const val = date + "|" + url;
-    if (!(await this.redis.hsetnx(key, hash, val))) {
-      await this.redis.sadd(`${key}:${hash}`, url);
+    if (!(await this.dedupRedis.hsetnx(key, hash, val))) {
+      await this.dedupRedis.sadd(`${key}:${hash}`, url);
     }
   }
 
diff --git a/src/util/wacz.ts b/src/util/wacz.ts
index fcf4eabc..3fa28d24 100644
--- a/src/util/wacz.ts
+++ b/src/util/wacz.ts
@@ -1,5 +1,5 @@
 import path, { basename } from "node:path";
-import fs from "node:fs";
+import fs, { openAsBlob } from "node:fs";
 import fsp from "node:fs/promises";
 import { Writable, Readable } from "node:stream";
 import { pipeline } from "node:stream/promises";
"node:stream/promises"; @@ -16,6 +16,8 @@ import { makeZip, InputWithoutMeta } from "client-zip"; import { logger, formatErr } from "./logger.js"; import { streamFinish } from "./warcwriter.js"; import { getDirSize } from "./storage.js"; +import { createLoader, ZipRangeReader } from "@webrecorder/wabac"; +import { AsyncIterReader } from "warcio"; const DATAPACKAGE_JSON = "datapackage.json"; const DATAPACKAGE_DIGEST_JSON = "datapackage-digest.json"; @@ -427,3 +429,39 @@ export async function mergeCDXJ( await removeIndexFile(INDEX_CDXJ); } } + +// ============================================================================ +export class WACZLoader { + url: string; + zipreader: ZipRangeReader | null; + + constructor(url: string) { + this.url = url; + this.zipreader = null; + } + + async init() { + if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) { + const blob = await openAsBlob(this.url); + this.url = URL.createObjectURL(blob); + } + + const loader = await createLoader({ url: this.url }); + + this.zipreader = new ZipRangeReader(loader); + } + + async loadFile(fileInZip: string) { + const { reader } = await this.zipreader!.loadFile(fileInZip); + + if (!reader) { + return null; + } + + if (!reader.iterLines) { + return new AsyncIterReader(reader); + } + + return reader; + } +}