mirror of https://github.com/webrecorder/browsertrix-crawler.git
synced 2025-12-24 19:10:15 +00:00

args: add separate --dedupIndexUrl to support separate redis for dedup indexing
prep:
- move WACZLoader to wacz for reuse
src/crawler.ts

@@ -31,7 +31,7 @@ import {
 } from "./util/storage.js";
 import { ScreenCaster, WSTransport } from "./util/screencaster.js";
 import { Screenshots } from "./util/screenshots.js";
-import { initRedis } from "./util/redis.js";
+import { initRedisWaitForSuccess } from "./util/redis.js";
 import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js";
 import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js";
 import { sleep, timedRun, secondsElapsed } from "./util/timing.js";

@@ -342,6 +342,7 @@ export class Crawler {
 
   async initCrawlState() {
     const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
+    const dedupRedisUrl = this.params.dedupStoreUrl || redisUrl;
 
     if (!redisUrl.startsWith("redis://")) {
       logger.fatal(

@@ -349,18 +350,7 @@ export class Crawler {
       );
     }
 
-    let redis;
-
-    while (true) {
-      try {
-        redis = await initRedis(redisUrl);
-        break;
-      } catch (e) {
-        //logger.fatal("Unable to connect to state store Redis: " + redisUrl);
-        logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
-        await sleep(1);
-      }
-    }
+    const redis = await initRedisWaitForSuccess(redisUrl);
 
     logger.debug(
       `Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`,

@@ -368,6 +358,12 @@ export class Crawler {
       "state",
     );
 
+    let dedupRedis = redis;
+
+    if (redisUrl !== dedupRedisUrl) {
+      dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
+    }
+
     logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");
 
     this.crawlState = new RedisCrawlState(

@@ -376,6 +372,7 @@ export class Crawler {
       this.maxPageTime,
       os.hostname(),
       this.params.maxPageRetries,
+      dedupRedis,
     );
 
     if (this.params.logErrorsToRedis) {
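Condensed, the initCrawlState changes above amount to the following flow. This is an illustrative sketch rather than the method itself: params is reduced to the two options involved, and the RedisCrawlState construction is omitted.

// Sketch of the new connection flow (illustrative only).
import { initRedisWaitForSuccess } from "./util/redis.js";

declare const params: { redisStoreUrl?: string; dedupStoreUrl?: string };

const redisUrl = params.redisStoreUrl || "redis://localhost:6379/0";
const dedupRedisUrl = params.dedupStoreUrl || redisUrl;

// Block until the state store is reachable.
const redis = await initRedisWaitForSuccess(redisUrl);

// Reuse the same connection unless dedup points at a different server.
let dedupRedis = redis;
if (redisUrl !== dedupRedisUrl) {
  dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
}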
src/replaycrawler.ts

@@ -10,9 +10,6 @@ import { PageInfoRecord, PageInfoValue, Recorder } from "./util/recorder.js";
 import fsp from "fs/promises";
 import path from "path";
 
-import { ZipRangeReader, createLoader } from "@webrecorder/wabac";
-
-import { AsyncIterReader } from "warcio";
 import { parseArgs } from "./util/argParser.js";
 
 import { PNG } from "pngjs";

@@ -23,6 +20,7 @@ import { MAX_URL_LENGTH } from "./util/reqresp.js";
 import { openAsBlob } from "fs";
 import { WARCWriter } from "./util/warcwriter.js";
 import { parseRx } from "./util/seeds.js";
+import { WACZLoader } from "./util/wacz.js";
 
 // RWP Replay Prefix
 const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";

@@ -784,38 +782,3 @@ export class ReplayCrawler extends Crawler {
     return null;
   }
 }
-
-class WACZLoader {
-  url: string;
-  zipreader: ZipRangeReader | null;
-
-  constructor(url: string) {
-    this.url = url;
-    this.zipreader = null;
-  }
-
-  async init() {
-    if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) {
-      const blob = await openAsBlob(this.url);
-      this.url = URL.createObjectURL(blob);
-    }
-
-    const loader = await createLoader({ url: this.url });
-
-    this.zipreader = new ZipRangeReader(loader);
-  }
-
-  async loadFile(fileInZip: string) {
-    const { reader } = await this.zipreader!.loadFile(fileInZip);
-
-    if (!reader) {
-      return null;
-    }
-
-    if (!reader.iterLines) {
-      return new AsyncIterReader(reader);
-    }
-
-    return reader;
-  }
-}
src/util/argParser.ts

@@ -445,6 +445,13 @@ class ArgParser {
       default: "redis://localhost:6379/0",
     },
 
+    dedupStoreUrl: {
+      describe:
+        "If set, url for remote redis server to store dedup state. Otherwise, using local redis instance",
+      type: "string",
+      default: "redis://localhost:6379/0",
+    },
+
     saveState: {
       describe:
         "If the crawl state should be serialized to the crawls/ directory. Defaults to 'partial', only saved when crawl is interrupted",
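The option objects above follow yargs conventions (describe/type/default). The following is a standalone sketch of how such an option surfaces at runtime, not the crawler's actual parser wiring:

// Minimal yargs sketch; the option shape mirrors dedupStoreUrl above.
import yargs from "yargs";
import { hideBin } from "yargs/helpers";

const argv = await yargs(hideBin(process.argv))
  .options({
    dedupStoreUrl: {
      describe: "redis server for dedup state",
      type: "string" as const,
      default: "redis://localhost:6379/0",
    },
  })
  .parse();

console.log(argv.dedupStoreUrl);

One consequence of giving the option a default: the `this.params.dedupStoreUrl || redisUrl` fallback in initCrawlState only takes effect if the option resolves to an empty value.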
src/util/redis.ts

@@ -1,5 +1,6 @@
 import { Redis } from "ioredis";
 import { logger } from "./logger.js";
+import { sleep } from "./timing.js";
 
 const error = console.error;
 

@@ -34,6 +35,18 @@ export async function initRedis(url: string) {
   return redis;
 }
 
+export async function initRedisWaitForSuccess(redisUrl: string, retrySecs = 1) {
+  while (true) {
+    try {
+      return await initRedis(redisUrl);
+    } catch (e) {
+      //logger.fatal("Unable to connect to state store Redis: " + redisUrl);
+      logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
+      await sleep(retrySecs);
+    }
+  }
+}
+
 export function setExitOnRedisError() {
   exitOnError = true;
 }
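A minimal usage sketch of the new helper, with a made-up key and a 3-second retry interval; the call blocks, logging a warning per attempt, until a connection succeeds:

// Illustrative only: wait for redis, retrying every 3 seconds, then use
// the resulting ioredis client.
import { initRedisWaitForSuccess } from "./util/redis.js";

const redis = await initRedisWaitForSuccess("redis://localhost:6379/0", 3);
await redis.set("example:key", "ok");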
src/util/state.ts

@@ -217,6 +217,7 @@ export type SaveState = {
 export class RedisCrawlState {
   redis: Redis;
   maxRetries: number;
+  dedupRedis: Redis;
 
   uid: string;
   key: string;

@@ -248,8 +249,10 @@
     maxPageTime: number,
     uid: string,
     maxRetries?: number,
+    dedupRedis?: Redis,
   ) {
     this.redis = redis;
+    this.dedupRedis = dedupRedis || redis;
 
     this.uid = uid;
     this.key = key;

@@ -1078,7 +1081,7 @@ return inx;
     hash: string,
     url: string,
   ): Promise<{ dupe?: boolean; origDate?: string; origUrl?: string }> {
-    const value = await this.redis.hget(key, hash);
+    const value = await this.dedupRedis.hget(key, hash);
     if (!value) {
       return {};
     }

@@ -1088,7 +1091,7 @@ return inx;
       return { dupe: true };
     }
     // otherwise, check if a revisit entry
-    if (await this.redis.sismember(`${key}:${hash}`, url)) {
+    if (await this.dedupRedis.sismember(`${key}:${hash}`, url)) {
       return { dupe: true };
     }
     return { origUrl: val[1], origDate: val[0] };

@@ -1096,8 +1099,8 @@ return inx;
 
   async addHashDupe(key: string, hash: string, url: string, date: string) {
     const val = date + "|" + url;
-    if (!(await this.redis.hsetnx(key, hash, val))) {
-      await this.redis.sadd(`${key}:${hash}`, url);
+    if (!(await this.dedupRedis.hsetnx(key, hash, val))) {
+      await this.dedupRedis.sadd(`${key}:${hash}`, url);
     }
   }
 
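Together, isHashDupe and addHashDupe imply a simple layout on the (now separable) dedup redis: a hash mapping each payload digest to the "date|url" of its original capture, plus a per-digest set of revisit URLs. A standalone sketch of that layout; the key name, digest, and URLs are invented:

import { Redis } from "ioredis";

// Illustrative only.
const dedupRedis = new Redis("redis://localhost:6379/1");
const key = "dedup:example";
const hash = "sha256:abc123";

// The first capture of a digest wins the hash slot ("date|url")...
await dedupRedis.hsetnx(key, hash, "2025-01-01T00:00:00Z|https://example.com/");
// ...later URLs with the same digest land in a per-digest revisit set.
await dedupRedis.sadd(`${key}:${hash}`, "https://example.com/copy");

// isHashDupe-style lookup: recover the original capture for a digest.
const [origDate, origUrl] = (await dedupRedis.hget(key, hash))!.split("|");
console.log({ origDate, origUrl });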
src/util/wacz.ts

@@ -1,5 +1,5 @@
 import path, { basename } from "node:path";
-import fs from "node:fs";
+import fs, { openAsBlob } from "node:fs";
 import fsp from "node:fs/promises";
 import { Writable, Readable } from "node:stream";
 import { pipeline } from "node:stream/promises";

@@ -16,6 +16,8 @@ import { makeZip, InputWithoutMeta } from "client-zip";
 import { logger, formatErr } from "./logger.js";
 import { streamFinish } from "./warcwriter.js";
 import { getDirSize } from "./storage.js";
+import { createLoader, ZipRangeReader } from "@webrecorder/wabac";
+import { AsyncIterReader } from "warcio";
 
 const DATAPACKAGE_JSON = "datapackage.json";
 const DATAPACKAGE_DIGEST_JSON = "datapackage-digest.json";

@@ -427,3 +429,39 @@ export async function mergeCDXJ(
     await removeIndexFile(INDEX_CDXJ);
   }
 }
+
+// ============================================================================
+export class WACZLoader {
+  url: string;
+  zipreader: ZipRangeReader | null;
+
+  constructor(url: string) {
+    this.url = url;
+    this.zipreader = null;
+  }
+
+  async init() {
+    if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) {
+      const blob = await openAsBlob(this.url);
+      this.url = URL.createObjectURL(blob);
+    }
+
+    const loader = await createLoader({ url: this.url });
+
+    this.zipreader = new ZipRangeReader(loader);
+  }
+
+  async loadFile(fileInZip: string) {
+    const { reader } = await this.zipreader!.loadFile(fileInZip);
+
+    if (!reader) {
+      return null;
+    }
+
+    if (!reader.iterLines) {
+      return new AsyncIterReader(reader);
+    }
+
+    return reader;
+  }
+}
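With WACZLoader exported from the shared wacz module, other components can stream individual files out of a local or remote WACZ. A hypothetical use, reading pages/pages.jsonl line by line (the archive URL is made up; loadFile wraps plain readers in AsyncIterReader, so iterLines() is always available on a non-null result):

import { WACZLoader } from "./util/wacz.js";

// Hypothetical archive URL; a local file path also works, since init()
// converts non-http(s) paths to a blob URL via openAsBlob.
const loader = new WACZLoader("https://example.com/archive.wacz");
await loader.init();

const reader = await loader.loadFile("pages/pages.jsonl");
if (reader) {
  for await (const line of reader.iterLines()) {
    console.log(line);
  }
}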