add removing option to also remove unused crawls if doing a full sync, disabled by default

Ilya Kreymer
2025-10-25 15:41:31 -07:00
parent 7c9317e3dc
commit 460badf8c7
2 changed files with 80 additions and 11 deletions
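When the new removing flag is set and a full list of sources is imported, every crawl id seen in the source list is marked as kept; once the import finishes, the crawls to drop are computed as the set difference between all known crawls and the kept set. Below is a minimal sketch of that set logic, assuming an ioredis client and illustrative key names (the real constants, such as DUPE_ALL_CRAWLS, live in util/state):

import Redis from "ioredis";

// illustrative key names; the actual constants live in util/state
const ALL_CRAWLS_KEY = "allcrawls";
const NOREMOVE_KEY = "noremove";

async function computeRemoveSet(
  redis: Redis,
  importedCrawlIds: string[],
): Promise<Set<string>> {
  // every crawl present in the current (full) source list is kept
  for (const crawlId of importedCrawlIds) {
    await redis.sadd(NOREMOVE_KEY, crawlId);
  }
  // anything the index knows about that the import did not mention is stale
  const stale = await redis.sdiff(ALL_CRAWLS_KEY, NOREMOVE_KEY);
  await redis.del(NOREMOVE_KEY);
  return new Set<string>(stale);
}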

View File

@@ -9,6 +9,7 @@ import { initRedisWaitForSuccess } from "./util/redis.js";
import { AsyncIterReader } from "warcio";
import { RedisDedupeIndex } from "./util/state.js";
import { basename } from "node:path";
import { sleep } from "./util/timing.js";
export type DedupeIndexEntry = {
name: string;
@@ -42,6 +43,13 @@ export class CrawlIndexer {
type: "string",
required: false,
},
removing: {
describe: "If set, also remove unused crawls/hashes from index",
type: "boolean",
required: false,
default: false,
},
})
.parseSync();
}
@@ -62,16 +70,24 @@ export class CrawlIndexer {
for await (const entry of this.iterWACZ({
url: params.sourceUrl,
name: params.sourceCrawlId || params.sourceUrl,
name: basename(params.sourceUrl),
crawlId: params.sourceCrawlId,
})) {
await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry));
if (params.removing && entry.crawlId) {
await dedupeIndex.markNotRemoved(entry.crawlId);
}
}
let count = 0;
let total = 0;
let res;
while ((res = await dedupeIndex.nextQueuedImportSource())) {
const { name, entry, total } = res;
const { name, entry, remaining } = res;
if (!total) {
total = remaining;
}
const { url, crawlId, size, hash } = JSON.parse(
entry,
) as DedupeIndexEntry;
@@ -107,7 +123,15 @@ export class CrawlIndexer {
await dedupeIndex.markImportSourceDone(name, crawlIdReal);
}
if (params.removing) {
const removeset = await dedupeIndex.getRemoveSet();
if (removeset.size > 0) {
await dedupeIndex.removeCrawlIds(removeset);
}
}
logger.info("Done!");
await sleep(30);
await dedupeIndex.markImportFinishedTS();
process.exit(ExitCodes.Success);
}
@@ -180,7 +204,6 @@ export class CrawlIndexer {
}
async *iterWACZ(entry: DedupeIndexEntry): AsyncIterable<DedupeIndexEntry> {
const { name } = entry;
let { url } = entry;
let path = url;
@@ -191,8 +214,7 @@ export class CrawlIndexer {
}
if (path.endsWith(".wacz")) {
console.log({ ...entry, name: basename(name || url) });
yield { ...entry, name: basename(name || url) };
yield entry;
} else if (path.endsWith(".json")) {
if (!url.startsWith("http://") && !url.startsWith("https://")) {
const blob = await openAsBlob(url);

View File

@@ -288,12 +288,12 @@ export class RedisDedupeIndex {
for await (const hashes of this.dedupeRedis.hscanStream(
`h:${this.crawlId}`,
)) {
let value = false;
let isValue = false;
for (const hash of hashes) {
if (!value) {
if (!isValue) {
await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, this.crawlId);
}
value = !value;
isValue = !isValue;
}
}
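The value to isValue rename above follows the shape of what ioredis hscanStream yields: each chunk is a flat array of alternating field/value entries, so the boolean toggles between "this element is a field" and "this element is a value". A small sketch of the same pairwise walk, assuming an ioredis client and an arbitrary hash key:

import Redis from "ioredis";

async function walkHashPairs(redis: Redis, hashKey: string) {
  for await (const chunk of redis.hscanStream(hashKey)) {
    // chunk is [field1, value1, field2, value2, ...]
    for (let i = 0; i < chunk.length; i += 2) {
      const field = chunk[i];
      const value = chunk[i + 1];
      console.log(field, value);
    }
  }
}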
@@ -396,14 +396,58 @@ export class RedisDedupeIndex {
await this.dedupeRedis.lrem(this.pendingQ, 1, res);
const { name } = JSON.parse(res);
const total = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
const remaining = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
await this.dedupeRedis.setex(this.pendingPrefix + name, "1", 300);
return { name, entry: res, total };
return { name, entry: res, remaining };
}
async markImportFinishedTS() {
await this.dedupeRedis.set("last_update_ts", new Date().toISOString());
}
// REMOVE ON IMPORT
async markNotRemoved(crawlId: string) {
await this.dedupeRedis.sadd("noremove", crawlId);
}
async getRemoveSet() {
const removeSet = await this.dedupeRedis.sdiff(DUPE_ALL_CRAWLS, "noremove");
await this.dedupeRedis.del("noremove");
return new Set<string>(removeSet);
}
async removeCrawlIds(toRemove: Set<string>) {
for await (const hashes of this.dedupeRedis.hscanStream(
DUPE_ALL_HASH_KEY,
)) {
let isValue = false;
let key = "";
for (const hash of hashes) {
if (!isValue) {
key = hash;
}
if (key && isValue && toRemove.has(hash)) {
await this.dedupeRedis.hdel(DUPE_ALL_HASH_KEY, key);
}
isValue = !isValue;
}
}
for (const crawlId of toRemove) {
const allWACZ = await this.dedupeRedis.lrange(`c:${crawlId}:wacz`, 0, -1);
for (const waczdata of allWACZ) {
try {
const { filename } = JSON.parse(waczdata);
await this.dedupeRedis.srem(this.sourceDone, filename);
} catch (e) {
// ignore
}
}
await this.dedupeRedis.del(`h:${crawlId}`, `c:${crawlId}:wacz`);
await this.dedupeRedis.srem(DUPE_ALL_CRAWLS, crawlId);
}
}
}
// ============================================================================
@@ -1399,7 +1443,10 @@ return inx;
async markProfileUploaded(result: UploadResult & { modified?: string }) {
result.modified = this._timestamp();
await this.redis.set(`${this.crawlId}:profileUploaded`, JSON.stringify(result));
await this.redis.set(
`${this.crawlId}:profileUploaded`,
JSON.stringify(result),
);
}
// DEPENDENT CRAWLS FOR DEDUPE