diff --git a/src/indexer.ts b/src/indexer.ts index 74dfaaa2..f08e59ca 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -60,7 +60,10 @@ export class CrawlIndexer { const redis = await initRedisWaitForSuccess(params.redisDedupeUrl); const dedupeIndex = new RedisDedupeIndex(redis, ""); - for await (const entry of this.iterWACZ(params.sourceUrl)) { + for await (const entry of this.iterWACZ({ + url: params.sourceUrl, + name: params.sourceCrawlId || params.sourceUrl, + })) { await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry)); } @@ -160,8 +163,7 @@ export class CrawlIndexer { } if (url && date && hash) { - await dedupeIndex.addHashDupe(hash, url, date, crawlId); - await dedupeIndex.addImportedForCrawl(hash, crawlId); + await dedupeIndex.addHashDupe(hash, url, date, crawlId, true); } else { logger.warn("Skipping invalid CDXJ, data missing", { url, @@ -177,8 +179,10 @@ export class CrawlIndexer { logger.debug("Processed", { count }); } - async *iterWACZ(url: string, name?: string): AsyncIterable { - let path: string = url; + async *iterWACZ(entry: DedupeIndexEntry): AsyncIterable { + const { name } = entry; + let { url } = entry; + let path = url; try { path = new URL(url).pathname; @@ -187,7 +191,7 @@ } if (path.endsWith(".wacz")) { - yield { name: basename(name || url), url }; + yield { ...entry, name: basename(name || url) }; } else if (path.endsWith(".json")) { if (!url.startsWith("http://") && !url.startsWith("https://")) { const blob = await openAsBlob(url); @@ -198,13 +203,8 @@ const json = await resp.json(); for (const entry of json.resources) { - const url = entry.path; - if (url && url.endsWith(".wacz")) { - const { size, hash, crawlId, name } = entry; - yield { crawlId, name, url, size, hash }; - } else { - yield* this.iterWACZ(entry.path, entry.name); - } + entry.url = entry.path; + yield* this.iterWACZ(entry); } } else { 
logger.warn("Unknown source", { url }, "replay"); diff --git a/src/util/state.ts b/src/util/state.ts index f63d050b..bcfe537b 100644 --- a/src/util/state.ts +++ b/src/util/state.ts @@ -324,11 +324,20 @@ export class RedisDedupeIndex { return { origUrl: val[2], origDate: val[1], index: val[0], crawlId }; } - async addHashDupe(hash: string, url: string, date: string, crawlId?: string) { + async addHashDupe( + hash: string, + url: string, + date: string, + crawlId?: string, + commit = false, + ) { date = date.replace(/[^\d]/g, ""); hash = hash.split(":").at(-1)!; const val = `${this.dedupeKeyIndex} ${date} ${url}`; - await this.dedupeRedis.hsetnx(`h:${crawlId || this.crawlId}`, hash, val); + crawlId = crawlId || this.crawlId; + if ((await this.dedupeRedis.hsetnx(`h:${crawlId}`, hash, val)) && commit) { + await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, crawlId); + } } // IMPORT @@ -341,10 +350,6 @@ export class RedisDedupeIndex { await this.dedupeRedis.lpush(this.sourceQ, data); } - async addImportedForCrawl(hash: string, crawlId: string) { - await this.dedupeRedis.hset(DUPE_ALL_HASH_KEY, hash, crawlId); - } - async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) { return ( (await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1