[backend] refactor & split concerns for stream (#13272)
@@ -90,7 +90,8 @@ import {
  X_DETECTION,
  X_WORKFLOW_ID,
} from '../schema/identifier';
-import { notify, redisAddDeletions, storeCreateEntityEvent, storeCreateRelationEvent, storeDeleteEvent, storeMergeEvent, storeUpdateEvent } from './redis';
+import { notify, redisAddDeletions } from './redis';
+import { storeCreateEntityEvent, storeCreateRelationEvent, storeDeleteEvent, storeMergeEvent, storeUpdateEvent } from './stream/stream-handler';
import { cleanStixIds } from './stix';
import {
  ABSTRACT_BASIC_RELATIONSHIP,
@@ -214,7 +214,6 @@ export const connectorConfig = (id, listen_callback_uri = undefined) => ({
});

export const listenRouting = (connectorId) => `${RABBIT_QUEUE_PREFIX}listen_routing_${connectorId}`;

export const pushRouting = (connectorId) => `${RABBIT_QUEUE_PREFIX}push_routing_${connectorId}`;

export const registerConnectorQueues = async (id, name, type, scope) => {
opencti-platform/opencti-graphql/src/database/redis-stream.ts (new file, 246 lines)
@@ -0,0 +1,246 @@
import { Cluster, Redis } from 'ioredis';
import * as R from 'ramda';
import conf, { logApp, REDIS_PREFIX } from '../config/conf';
import type { ActivityStreamEvent, BaseEvent, DataEvent, SseEvent, StreamNotifEvent } from '../types/event';
import {
  ACTIVITY_STREAM_NAME,
  type FetchEventRangeOption,
  LIVE_STREAM_NAME,
  NOTIFICATION_STREAM_NAME,
  type RawStreamClient,
  type StreamProcessor,
  type StreamProcessorOption,
} from './stream/stream-utils';
import { createRedisClient, getClientBase, getClientXRANGE } from './redis';
import { isEmptyField, wait, waitInSec } from './utils';
import { utcDate } from '../utils/format';
import { UnsupportedError } from '../config/errors';
import { asyncMap } from '../utils/data-processing';

// region opencti data stream
const REDIS_LIVE_STREAM_NAME = `${REDIS_PREFIX}${LIVE_STREAM_NAME}`;
const REDIS_NOTIFICATION_STREAM_NAME = `${REDIS_PREFIX}${NOTIFICATION_STREAM_NAME}`;
const REDIS_ACTIVITY_STREAM_NAME = `${REDIS_PREFIX}${ACTIVITY_STREAM_NAME}`;
const streamTrimming = conf.get('redis:trimming') || 0;

const convertStreamName = (streamName = LIVE_STREAM_NAME) => {
  switch (streamName) {
    case ACTIVITY_STREAM_NAME:
      return REDIS_ACTIVITY_STREAM_NAME;
    case NOTIFICATION_STREAM_NAME:
      return REDIS_NOTIFICATION_STREAM_NAME;
    case LIVE_STREAM_NAME:
      return REDIS_LIVE_STREAM_NAME;
    default:
      throw UnsupportedError('Cannot recognize stream name', streamName);
  }
};

const mapJSToStream = (event: any) => {
  const cmdArgs: Array<string> = [];
  Object.keys(event).forEach((key) => {
    const value = event[key];
    if (value !== undefined) {
      cmdArgs.push(key);
      cmdArgs.push(JSON.stringify(value));
    }
  });
  return cmdArgs;
};
const mapStreamToJS = ([id, data]: any): SseEvent<any> => {
  const count = data.length / 2;
  const obj: any = {};
  for (let i = 0; i < count; i += 1) {
    obj[data[2 * i]] = JSON.parse(data[2 * i + 1]);
  }
  return { id, event: obj.type, data: obj };
};

const rawPushToStream = async <T extends BaseEvent> (event: T) => {
  const redisClient = getClientBase();
  const eventStreamData = mapJSToStream(event);
  if (streamTrimming) {
    await redisClient.call('XADD', REDIS_LIVE_STREAM_NAME, 'MAXLEN', '~', streamTrimming, '*', ...eventStreamData);
  } else {
    await redisClient.call('XADD', REDIS_LIVE_STREAM_NAME, '*', ...eventStreamData);
  }
};
const processStreamResult = async (results: Array<any>, callback: any, withInternal: boolean | undefined) => {
  const transform = (r: any) => mapStreamToJS(r);
  const filter = (s: any) => (withInternal ? true : (s.data.scope ?? 'external') === 'external');
  const events = await asyncMap(results, transform, filter);
  const lastEventId = events.length > 0 ? R.last(events)?.id : `${new Date().valueOf()}-0`;
  await callback(events, lastEventId);
  return lastEventId;
};
const rawFetchStreamInfo = async (streamName = LIVE_STREAM_NAME) => {
  const redisStreamName = convertStreamName(streamName);
  const res: any = await getClientBase().xinfo('STREAM', redisStreamName);
  const info: any = R.fromPairs(R.splitEvery(2, res) as any);
  const firstId = info['first-entry'][0];
  const firstEventDate = utcDate(parseInt(firstId.split('-')[0], 10)).toISOString();
  const lastId = info['last-entry'][0];
  const lastEventDate = utcDate(parseInt(lastId.split('-')[0], 10)).toISOString();
  return { lastEventId: lastId, firstEventId: firstId, firstEventDate, lastEventDate, streamSize: info.length };
};

const STREAM_BATCH_TIME = 5000;
const MAX_RANGE_MESSAGES = 100;

const rawCreateStreamProcessor = <T extends BaseEvent> (
  provider: string,
  callback: (events: Array<SseEvent<T>>, lastEventId: string) => Promise<void>,
  opts: StreamProcessorOption = {},
): StreamProcessor => {
  let client: Cluster | Redis;
  let startEventId: string;
  let processingLoopPromise: Promise<void>;
  let streamListening = true;
  const { streamName = LIVE_STREAM_NAME } = opts;
  const redisStreamName = convertStreamName(streamName);

  const processStep = async () => {
    // since previous call is async (and blocking) we should check if we are still running before processing the message
    if (!streamListening) {
      return false;
    }
    try {
      // Consume the data stream
      const streamResult = await client.call(
        'XREAD',
        'COUNT',
        MAX_RANGE_MESSAGES,
        'BLOCK',
        STREAM_BATCH_TIME,
        'STREAMS',
        redisStreamName,
        startEventId,
      ) as any[];
      // Process the event results
      if (streamResult && streamResult.length > 0) {
        const [, results] = streamResult[0];
        const lastElementId = await processStreamResult(results, callback, opts.withInternal);
        startEventId = lastElementId || startEventId;
      } else {
        await processStreamResult([], callback, opts.withInternal);
      }
      await wait(opts.bufferTime ?? 50);
    } catch (err) {
      logApp.error('Redis stream consume fail', { cause: err, provider });
      if (opts.autoReconnect) {
        await waitInSec(5);
      } else {
        return false;
      }
    }
    return streamListening;
  };
  const processingLoop = async () => {
    while (streamListening) {
      if (!(await processStep())) {
        streamListening = false;
        break;
      }
    }
  };
  return {
    info: async () => rawFetchStreamInfo(streamName),
    running: () => streamListening,
    start: async (start = 'live') => {
      if (streamListening) {
        let fromStart = start;
        if (isEmptyField(fromStart)) {
          fromStart = 'live';
        }
        startEventId = fromStart === 'live' ? '$' : fromStart;
        logApp.info('[STREAM] Starting stream processor', { provider, startEventId });
        processingLoopPromise = (async () => {
          client = await createRedisClient(provider, opts.autoReconnect); // Create client for this processing loop
          try {
            await processingLoop();
          } finally {
            logApp.info('[STREAM] Stream processor terminated, closing Redis client');
            client.disconnect();
          }
        })();
      }
    },
    shutdown: async () => {
      logApp.info('[STREAM] Shutdown stream processor', { provider });
      streamListening = false;
      if (processingLoopPromise) {
        await processingLoopPromise;
      }
      logApp.info('[STREAM] Stream processor current promise terminated');
    },
  };
};
// endregion

// region fetch stream event range
const rawFetchStreamEventsRangeFromEventId = async (
  startEventId: string,
  callback: (events: Array<SseEvent<DataEvent>>, lastEventId: string) => void,
  opts: FetchEventRangeOption = {},
) => {
  const { streamBatchSize = MAX_RANGE_MESSAGES, streamName = LIVE_STREAM_NAME, withInternal } = opts;
  const redisStreamName = convertStreamName(streamName);
  let effectiveStartEventId = startEventId;
  const redisClient = getClientXRANGE();
  try {
    // Consume streamBatchSize number of stream events from startEventId (excluded)
    const streamResult = await redisClient.call(
      'XRANGE',
      redisStreamName,
      `(${startEventId}`, // ( prefix to exclude startEventId
      '+',
      'COUNT',
      streamBatchSize,
    ) as any[];
    // Process the event results
    if (streamResult && streamResult.length > 0) {
      const lastStreamResultId = R.last(streamResult)[0]; // id of last event fetched (internal or external)
      await processStreamResult(streamResult, callback, withInternal); // process the stream events of the range
      if (lastStreamResultId) {
        effectiveStartEventId = lastStreamResultId;
      }
    } else {
      await processStreamResult([], callback, withInternal);
    }
  } catch (err) {
    logApp.error('Redis stream consume fail', { cause: err });
  }
  return { lastEventId: effectiveStartEventId };
};

// region opencti notification stream
const notificationTrimming = conf.get('redis:notification_trimming') || 50000;
const rawStoreNotificationEvent = async <T extends StreamNotifEvent> (event: T) => {
  const eventStreamData = mapJSToStream(event);
  await getClientBase().call('XADD', REDIS_NOTIFICATION_STREAM_NAME, 'MAXLEN', '~', notificationTrimming, '*', ...eventStreamData);
};
const rawFetchRangeNotifications = async <T extends StreamNotifEvent> (start: Date, end: Date): Promise<Array<T>> => {
  const streamResult = await getClientBase().call('XRANGE', REDIS_NOTIFICATION_STREAM_NAME, start.getTime(), end.getTime()) as any[];
  const streamElements: Array<SseEvent<T>> = streamResult.map((r) => mapStreamToJS(r));
  return streamElements.filter((s) => s.event === 'live').map((e) => e.data);
};
// endregion

// region opencti audit stream
const auditTrimming = conf.get('redis:activity_trimming') || 50000;
const rawStoreActivityEvent = async (event: ActivityStreamEvent) => {
  const eventStreamData = mapJSToStream(event);
  await getClientBase().call('XADD', REDIS_ACTIVITY_STREAM_NAME, 'MAXLEN', '~', auditTrimming, '*', ...eventStreamData);
};
// endregion

export const rawRedisStreamClient: RawStreamClient = {
  initializeStreams: async () => {},
  rawPushToStream,
  rawFetchStreamInfo,
  rawCreateStreamProcessor,
  rawFetchStreamEventsRangeFromEventId,
  rawStoreNotificationEvent,
  rawFetchRangeNotifications,
  rawStoreActivityEvent,
};
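Note on the encoding used throughout this file: `mapJSToStream` flattens an event object into the alternating field/value argument list that `XADD` expects, JSON-encoding each value, and `mapStreamToJS` is its inverse, rebuilding an `SseEvent` from the `[id, [field, value, ...]]` entries that `XREAD`/`XRANGE` return. A minimal standalone sketch of that round trip (the helper names and sample event below are illustrative, not part of the platform):

// Standalone sketch mirroring mapJSToStream / mapStreamToJS above.
const toStreamArgs = (event: Record<string, unknown>): string[] => {
  const args: string[] = [];
  Object.entries(event).forEach(([key, value]) => {
    if (value !== undefined) {
      args.push(key, JSON.stringify(value)); // every value is JSON-encoded
    }
  });
  return args;
};
const fromStreamEntry = ([id, data]: [string, string[]]) => {
  const obj: Record<string, any> = {};
  for (let i = 0; i < data.length; i += 2) {
    obj[data[i]] = JSON.parse(data[i + 1]); // decode alternating field/value pairs
  }
  return { id, event: obj.type, data: obj };
};
// Round trip: the args are what gets spread into XADD; the entry is what Redis returns.
const args = toStreamArgs({ type: 'create', scope: 'external' });
// args === ['type', '"create"', 'scope', '"external"']
const parsed = fromStreamEntry(['1735689600000-0', args]);
// parsed.event === 'create', parsed.data.scope === 'external'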
@@ -1,59 +1,27 @@
import { SEMATTRS_DB_NAME } from '@opentelemetry/semantic-conventions';
import { Cluster, Redis } from 'ioredis';
import type { ChainableCommander, CommonRedisOptions, ClusterOptions, RedisOptions, SentinelAddress, SentinelConnectionOptions } from 'ioredis';
import { Redlock } from '@sesamecare-oss/redlock';
import * as jsonpatch from 'fast-json-patch';
import { RedisPubSub } from 'graphql-redis-subscriptions';
import * as R from 'ramda';
import conf, { booleanConf, configureCA, DEV_MODE, getStoppingState, loadCert, logApp, REDIS_PREFIX } from '../config/conf';
import { asyncListTransformation, EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE, isEmptyField, isNotEmptyField, wait, waitInSec } from './utils';
import { INTERNAL_EXPORTABLE_TYPES, isStixExportableInStreamData } from '../schema/stixCoreObject';
import { DatabaseError, LockTimeoutError, TYPE_LOCK_ERROR, UnsupportedError } from '../config/errors';
import { mergeDeepRightAll, now, utcDate } from '../utils/format';
import type { BasicStoreCommon, StoreObject, StoreRelation } from '../types/store';
import type { AuthContext, AuthUser } from '../types/user';
import type {
  ActivityStreamEvent,
  BaseEvent,
  Change,
  CreateEventOpts,
  DataEvent,
  DeleteEvent,
  EventOpts,
  MergeEvent,
  SseEvent,
  StreamDataEvent,
  UpdateEvent,
  UpdateEventOpts,
} from '../types/event';
import type { StixCoreObject, StixObject } from '../types/stix-2-1-common';
import { isNotEmptyField } from './utils';
import { DatabaseError, LockTimeoutError, TYPE_LOCK_ERROR } from '../config/errors';
import { mergeDeepRightAll, now } from '../utils/format';
import type { BasicStoreCommon } from '../types/store';
import type { AuthUser } from '../types/user';
import type { EditContext } from '../generated/graphql';
import { telemetry } from '../config/tracing';
import { filterEmpty } from '../types/type-utils';
import type { ClusterConfig } from '../types/clusterConfig';
import type { ExecutionEnvelop } from '../types/playbookExecution';
import { generateCreateMessage, generateDeleteMessage, generateMergeMessage, generateRestoreMessage } from './generate-message';
import { INPUT_OBJECTS } from '../schema/general';
import { enrichWithRemoteCredentials } from '../config/credentials';
import { getDraftContext } from '../utils/draftContext';
import type { ExclusionListCacheItem } from './exclusionListCache';
import { refreshLocalCacheForEntity } from './cache';
import { asyncMap } from '../utils/data-processing';
import { STIX_EXT_OCTI } from '../types/stix-2-1-extensions';

import { convertStoreToStix_2_1 } from './stix-2-1-converter';

const USE_SSL = booleanConf('redis:use_ssl', false);
const REDIS_CA = conf.get('redis:ca').map((path: string) => loadCert(path));
export const REDIS_STREAM_NAME = `${REDIS_PREFIX}stream.opencti`;
const PLAYBOOK_LOG_MAX_SIZE = conf.get('playbook_manager:log_max_size') || 10000;

export const EVENT_CURRENT_VERSION = '4';

const isStreamPublishable = (opts: EventOpts) => {
  return opts.publishStreamEvent === undefined || opts.publishStreamEvent;
};

const connectionName = (provider: string) => `${REDIS_PREFIX}${provider.replaceAll(' ', '_')}`;

const redisOptions = async (provider: string, autoReconnect = false): Promise<RedisOptions> => {
@@ -162,7 +130,7 @@ export const createRedisClient = async (provider: string, autoReconnect = false)

// region Initialization of clients
type RedisConnection = Cluster | Redis;
-interface RedisClients { base: RedisConnection; lock: RedisConnection; pubsub: RedisPubSub }
+interface RedisClients { base: RedisConnection; xrange: RedisConnection; lock: RedisConnection; pubsub: RedisPubSub }

let redisClients: RedisClients;
// Method reserved for lock child process
@@ -171,15 +139,17 @@ export const initializeOnlyRedisLockClient = async () => {
  // Disable typescript check for this specific use case.
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  // @ts-ignore
-  redisClients = { lock, base: null, pubsub: null };
+  redisClients = { lock, base: null, pubsub: null, xrange: null };
};
export const initializeRedisClients = async () => {
  const base = await createRedisClient('base', true);
+  const xrange = await createRedisClient('xrange', true);
  const lock = await createRedisClient('lock', true);
  const publisher = await createRedisClient('publisher', true);
  const subscriber = await createRedisClient('subscriber', true);
  redisClients = {
    base,
+    xrange,
    lock,
    pubsub: new RedisPubSub({
      publisher,
@@ -199,7 +169,8 @@ export const shutdownRedisClients = () => {
// endregion

// region pubsub
-const getClientBase = (): Cluster | Redis => redisClients.base;
+export const getClientBase = (): Cluster | Redis => redisClients.base;
+export const getClientXRANGE = (): Cluster | Redis => redisClients.xrange;
const getClientLock = (): Cluster | Redis => redisClients.lock;
const getClientPubSub = (): RedisPubSub => redisClients.pubsub;
export const pubSubAsyncIterator = (topic: string | string[]) => {
@@ -256,7 +227,6 @@ const keysFromList = async (listId: string, expirationTime?: number) => {
  }
  const instances = await getClientBase().zrange(listId, 0, -1);
  if (instances && instances.length > 0) {
    // eslint-disable-next-line newline-per-chained-call
    const fetchKey = (key: string) => getClientBase().multi().ttl(key).get(key).exec();
    const instancesConfig = await Promise.all(instances.map((i) => fetchKey(i)
      .then((results) => {
@@ -431,7 +401,6 @@ export const lockResource = async (resources: Array<string>, opts: LockOptions =
  const queue = () => {
    timeout = setTimeout(
      () => {
        // eslint-disable-next-line @typescript-eslint/no-use-before-define
        extension = extend();
      },
      lock.expiration - Date.now() - 2 * automaticExtensionThreshold,
@@ -444,7 +413,7 @@ export const lockResource = async (resources: Array<string>, opts: LockOptions =
      }
      lock = await lock.extend(maxTtl);
      queue();
-    } catch (_error) {
+    } catch {
      logApp.error('Execution timeout, error extending resources', { locks });
      if (process.send) {
        // If process.send, we use a child process
@@ -487,7 +456,7 @@ export const lockResource = async (resources: Array<string>, opts: LockOptions =
      try {
        // Finally try to unlock
        await lock.release();
-      } catch (_e) {
+      } catch {
        // Nothing to do here
      }
    },
@@ -495,405 +464,6 @@ export const lockResource = async (resources: Array<string>, opts: LockOptions =
};
// endregion

// region opencti data stream
const streamTrimming = conf.get('redis:trimming') || 0;
const mapJSToStream = (event: any) => {
  const cmdArgs: Array<string> = [];
  Object.keys(event).forEach((key) => {
    const value = event[key];
    if (value !== undefined) {
      cmdArgs.push(key);
      cmdArgs.push(JSON.stringify(value));
    }
  });
  return cmdArgs;
};
const pushToStream = async (context: AuthContext, user: AuthUser, client: Cluster | Redis, event: BaseEvent, opts: EventOpts = {}) => {
  const draftContext = getDraftContext(context, user);
  const eventToPush = { ...event, event_id: context.eventId };
  if (!draftContext && isStreamPublishable(opts)) {
    const pushToStreamFn = async () => {
      if (streamTrimming) {
        await client.call('XADD', REDIS_STREAM_NAME, 'MAXLEN', '~', streamTrimming, '*', ...mapJSToStream(eventToPush));
      } else {
        await client.call('XADD', REDIS_STREAM_NAME, '*', ...mapJSToStream(eventToPush));
      }
    };
    await telemetry(context, user, 'INSERT STREAM', {
      [SEMATTRS_DB_NAME]: 'stream_engine',
    }, pushToStreamFn);
  }
};

// Merge
const buildMergeEvent = async (user: AuthUser, previous: StoreObject, instance: StoreObject, sourceEntities: Array<StoreObject>): Promise<MergeEvent> => {
  const message = generateMergeMessage(instance, sourceEntities);
  const previousStix = convertStoreToStix_2_1(previous) as StixCoreObject;
  const currentStix = convertStoreToStix_2_1(instance) as StixCoreObject;
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_MERGE,
    scope: 'external',
    message,
    origin: user.origin,
    data: currentStix,
    context: {
      patch: jsonpatch.compare(previousStix, currentStix),
      reverse_patch: jsonpatch.compare(currentStix, previousStix),
      sources: await asyncListTransformation<StixObject>(sourceEntities, convertStoreToStix_2_1) as StixCoreObject[],
    },
  };
};
export const storeMergeEvent = async (
  context: AuthContext,
  user: AuthUser,
  initialInstance: StoreObject,
  mergedInstance: StoreObject,
  sourceEntities: Array<StoreObject>,
  opts: EventOpts,
) => {
  try {
    const event = await buildMergeEvent(user, initialInstance, mergedInstance, sourceEntities);
    await pushToStream(context, user, getClientBase(), event, opts);
    return event;
  } catch (e) {
    throw DatabaseError('Error in store merge event', { cause: e });
  }
};
// Update
export const buildStixUpdateEvent = (
  user: AuthUser,
  previousStix: StixCoreObject,
  stix: StixCoreObject,
  message: string,
  changes: Change[],
  opts: UpdateEventOpts = {},
): UpdateEvent => {
  // Build and send the event
  const patch = jsonpatch.compare(previousStix, stix);
  const previousPatch = jsonpatch.compare(stix, previousStix);
  if (patch.length === 0 || previousPatch.length === 0) {
    throw UnsupportedError('Update event must contains a valid previous patch');
  }
  if (patch.length === 1 && patch[0].path === '/modified' && !opts.allow_only_modified) {
    throw UnsupportedError('Update event must contains more operation than just modified/updated_at value');
  }
  const entityType = stix.extensions[STIX_EXT_OCTI].type;
  const scope = INTERNAL_EXPORTABLE_TYPES.includes(entityType) ? 'internal' : 'external';
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_UPDATE,
    scope,
    message,
    origin: user.origin,
    data: stix,
    commit: opts.commit,
    noHistory: opts.noHistory,
    context: {
      patch,
      reverse_patch: previousPatch,
      related_restrictions: opts.related_restrictions,
      pir_ids: opts.pir_ids,
      changes,
    },
  };
};
export const publishStixToStream = async (context: AuthContext, user: AuthUser, event: StreamDataEvent) => {
  await pushToStream(context, user, getClientBase(), event);
};
const buildUpdateEvent = (user: AuthUser, previous: StoreObject, instance: StoreObject, message: string, changes: Change[], opts: UpdateEventOpts): UpdateEvent => {
  // Build and send the event
  const stix = convertStoreToStix_2_1(instance) as StixCoreObject;
  const previousStix = convertStoreToStix_2_1(previous) as StixCoreObject;
  return buildStixUpdateEvent(user, previousStix, stix, message, changes, opts);
};
export const storeUpdateEvent = async (
  context: AuthContext,
  user: AuthUser,
  previous: StoreObject,
  instance: StoreObject,
  message: string,
  changes: Change[],
  opts: UpdateEventOpts = {},
) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const event = buildUpdateEvent(user, previous, instance, message, changes, opts);
      await pushToStream(context, user, getClientBase(), event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store update event', { cause: e });
  }
};
// Create
export const buildCreateEvent = (user: AuthUser, instance: StoreObject, message: string): StreamDataEvent => {
  const stix = convertStoreToStix_2_1(instance) as StixCoreObject;
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_CREATE,
    scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external',
    message,
    origin: user.origin,
    data: stix,
  };
};
export const storeCreateRelationEvent = async (context: AuthContext, user: AuthUser, instance: StoreRelation, opts: CreateEventOpts = {}) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const { withoutMessage = false, restore = false } = opts;
      let message = '-';
      if (!withoutMessage) {
        message = restore ? generateRestoreMessage(instance) : generateCreateMessage(instance);
      }
      const event = buildCreateEvent(user, instance, message);
      await pushToStream(context, user, getClientBase(), event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store create relation event', { cause: e });
  }
};
export const storeCreateEntityEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, message: string, opts: CreateEventOpts = {}) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const event = buildCreateEvent(user, instance, message);
      await pushToStream(context, user, getClientBase(), event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store create entity event', { cause: e });
  }
};

// Delete
export const buildDeleteEvent = async (
  user: AuthUser,
  instance: StoreObject,
  message: string,
): Promise<DeleteEvent> => {
  const stix = convertStoreToStix_2_1(instance) as StixCoreObject;
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_DELETE,
    scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external',
    message,
    origin: user.origin,
    data: stix,
  };
};
export const storeDeleteEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, opts: EventOpts = {}) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const message = generateDeleteMessage(instance);
      const event = await buildDeleteEvent(user, instance, message);
      await pushToStream(context, user, getClientBase(), event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store delete event', { cause: e });
  }
};

const mapStreamToJS = ([id, data]: any): SseEvent<any> => {
  const count = data.length / 2;
  const obj: any = {};
  for (let i = 0; i < count; i += 1) {
    obj[data[2 * i]] = JSON.parse(data[2 * i + 1]);
  }
  return { id, event: obj.type, data: obj };
};
export const fetchStreamInfo = async (streamName = REDIS_STREAM_NAME) => {
  const res: any = await getClientBase().xinfo('STREAM', streamName);
  const info: any = R.fromPairs(R.splitEvery(2, res) as any);
  const firstId = info['first-entry'][0];
  const firstEventDate = utcDate(parseInt(firstId.split('-')[0], 10)).toISOString();
  const lastId = info['last-entry'][0];
  const lastEventDate = utcDate(parseInt(lastId.split('-')[0], 10)).toISOString();
  return { lastEventId: lastId, firstEventId: firstId, firstEventDate, lastEventDate, streamSize: info.length };
};

const processStreamResult = async (results: Array<any>, callback: any, withInternal: boolean | undefined) => {
  const transform = (r: any) => mapStreamToJS(r);
  const filter = (s: any) => (withInternal ? true : (s.data.scope ?? 'external') === 'external');
  const events = await asyncMap(results, transform, filter);
  const lastEventId = events.length > 0 ? R.last(events)?.id : `${new Date().valueOf()}-0`;
  await callback(events, lastEventId);
  return lastEventId;
};

const STREAM_BATCH_TIME = 5000;
const MAX_RANGE_MESSAGES = 100;

export interface StreamProcessor {
  info: () => Promise<object>;
  start: (from: string | undefined) => Promise<void>;
  shutdown: () => Promise<void>;
  running: () => boolean;
}

interface StreamOption {
  withInternal?: boolean;
  bufferTime?: number;
  autoReconnect?: boolean;
  streamName?: string;
  streamBatchSize?: number;
}

export const createStreamProcessor = <T extends BaseEvent> (
  _user: AuthUser,
  provider: string,
  callback: (events: Array<SseEvent<T>>, lastEventId: string) => void,
  opts: StreamOption = {},
): StreamProcessor => {
  let client: Cluster | Redis;
  let startEventId: string;
  let processingLoopPromise: Promise<void>;
  let streamListening = true;
  const streamName = opts.streamName ?? REDIS_STREAM_NAME;

  const processStep = async () => {
    // since previous call is async (and blocking) we should check if we are still running before processing the message
    if (!streamListening) {
      return false;
    }
    try {
      // Consume the data stream
      const streamResult = await client.call(
        'XREAD',
        'COUNT',
        MAX_RANGE_MESSAGES,
        'BLOCK',
        STREAM_BATCH_TIME,
        'STREAMS',
        streamName,
        startEventId,
      ) as any[];
      // Process the event results
      if (streamResult && streamResult.length > 0) {
        const [, results] = streamResult[0];
        const lastElementId = await processStreamResult(results, callback, opts.withInternal);
        startEventId = lastElementId || startEventId;
      } else {
        await processStreamResult([], callback, opts.withInternal);
      }
      await wait(opts.bufferTime ?? 50);
    } catch (err) {
      logApp.error('Redis stream consume fail', { cause: err, provider });
      if (opts.autoReconnect) {
        await waitInSec(5);
      } else {
        return false;
      }
    }
    return streamListening;
  };
  const processingLoop = async () => {
    while (streamListening) {
      if (!(await processStep())) {
        streamListening = false;
        break;
      }
    }
  };
  return {
    info: async () => fetchStreamInfo(streamName),
    running: () => streamListening,
    start: async (start = 'live') => {
      if (streamListening) {
        let fromStart = start;
        if (isEmptyField(fromStart)) {
          fromStart = 'live';
        }
        startEventId = fromStart === 'live' ? '$' : fromStart;
        logApp.info('[STREAM] Starting stream processor', { provider, startEventId });
        processingLoopPromise = (async () => {
          client = await createRedisClient(provider, opts.autoReconnect); // Create client for this processing loop
          try {
            await processingLoop();
          } finally {
            logApp.info('[STREAM] Stream processor terminated, closing Redis client');
            client.disconnect();
          }
        })();
      }
    },
    shutdown: async () => {
      logApp.info('[STREAM] Shutdown stream processor', { provider });
      streamListening = false;
      if (processingLoopPromise) {
        await processingLoopPromise;
      }
      logApp.info('[STREAM] Stream processor current promise terminated');
    },
  };
};
// endregion

// region fetch stream event range
export const fetchStreamEventsRangeFromEventId = async (
  client: Cluster | Redis,
  startEventId: string,
  callback: (events: Array<SseEvent<DataEvent>>, lastEventId: string) => void,
  opts: StreamOption = {},
) => {
  const { streamBatchSize = MAX_RANGE_MESSAGES } = opts;
  let effectiveStartEventId = startEventId;
  try {
    // Consume streamBatchSize number of stream events from startEventId (excluded)
    const streamResult = await client.call(
      'XRANGE',
      opts.streamName ?? REDIS_STREAM_NAME,
      `(${startEventId}`, // ( prefix to exclude startEventId
      '+',
      'COUNT',
      streamBatchSize,
    ) as any[];
    // Process the event results
    if (streamResult && streamResult.length > 0) {
      const lastStreamResultId = R.last(streamResult)[0]; // id of last event fetched (internal or external)
      await processStreamResult(streamResult, callback, opts.withInternal); // process the stream events of the range
      if (lastStreamResultId) {
        effectiveStartEventId = lastStreamResultId;
      }
    } else {
      await processStreamResult([], callback, opts.withInternal);
    }
  } catch (err) {
    logApp.error('Redis stream consume fail', { cause: err });
    if (opts.autoReconnect) {
      await waitInSec(2);
    }
  }
  return { lastEventId: effectiveStartEventId };
};

// region opencti notification stream
export const NOTIFICATION_STREAM_NAME = `${REDIS_PREFIX}stream.notification`;
const notificationTrimming = conf.get('redis:notification_trimming') || 50000;
export const storeNotificationEvent = async (context: AuthContext, event: any) => {
  await getClientBase().call('XADD', NOTIFICATION_STREAM_NAME, 'MAXLEN', '~', notificationTrimming, '*', ...mapJSToStream(event));
};
export const fetchRangeNotifications = async <T extends BaseEvent> (start: Date, end: Date): Promise<Array<T>> => {
  const streamResult = await getClientBase().call('XRANGE', NOTIFICATION_STREAM_NAME, start.getTime(), end.getTime()) as any[];
  const streamElements: Array<SseEvent<T>> = R.map((r) => mapStreamToJS(r), streamResult);
  return streamElements.filter((s) => s.event === 'live').map((e) => e.data);
};
// endregion

// region opencti audit stream
export const EVENT_ACTIVITY_VERSION = '1';
export const ACTIVITY_STREAM_NAME = `${REDIS_PREFIX}stream.activity`;
const auditTrimming = conf.get('redis:activity_trimming') || 50000;
export const storeActivityEvent = async (event: ActivityStreamEvent) => {
  await getClientBase().call('XADD', ACTIVITY_STREAM_NAME, 'MAXLEN', '~', auditTrimming, '*', ...mapJSToStream(event));
};
// endregion

// region work handling
export const redisDeleteWorks = async (internalIds: Array<string>) => {
  const ids = Array.isArray(internalIds) ? internalIds : [internalIds];
@@ -1032,7 +602,7 @@ export const redisGetExclusionListCache = async () => {
  const rawCache = await getClientBase().get(EXCLUSION_LIST_CACHE_KEY);
  try {
    return rawCache ? JSON.parse(rawCache) : [];
-  } catch (_e) {
+  } catch {
    logApp.error('Exclusion cache could not be parsed properly. Asking for a cache refresh.', { rawCache });
    await redisUpdateExclusionListStatus({ last_refresh_ask_date: (new Date()).toString() });
    return [];
opencti-platform/opencti-graphql/src/database/stream/stream-handler.ts (new file, 161 lines)
@@ -0,0 +1,161 @@
import { SEMATTRS_DB_NAME } from '@opentelemetry/semantic-conventions';
import type { AuthContext, AuthUser } from '../../types/user';
import type { StoreObject, StoreRelation } from '../../types/store';
import type { ActivityStreamEvent, BaseEvent, Change, CreateEventOpts, EventOpts, SseEvent, StreamDataEvent, StreamNotifEvent, UpdateEventOpts } from '../../types/event';
import { isStixExportableInStreamData } from '../../schema/stixCoreObject';
import { generateCreateMessage, generateDeleteMessage, generateRestoreMessage } from '../generate-message';
import {
  buildCreateEvent,
  buildDeleteEvent,
  buildMergeEvent,
  buildUpdateEvent,
  type FetchEventRangeOption,
  isStreamPublishable,
  LIVE_STREAM_NAME,
  type RawStreamClient,
  type StreamProcessor,
  type StreamProcessorOption,
} from './stream-utils';
import { DatabaseError } from '../../config/errors';
import { getDraftContext } from '../../utils/draftContext';
import { rawRedisStreamClient } from '../redis-stream';
import { telemetry } from '../../config/tracing';

const streamClient: RawStreamClient = rawRedisStreamClient;
export const initializeStreamStack = async () => {
  if (streamClient.initializeStreams) {
    await streamClient.initializeStreams();
  }
};

const pushToStream = async <T extends BaseEvent> (context: AuthContext, user: AuthUser, event: T, opts: EventOpts = {}) => {
  const draftContext = getDraftContext(context, user);
  const eventToPush = { ...event, event_id: context.eventId };
  if (!draftContext && isStreamPublishable(opts)) {
    const pushToStreamFn = async () => {
      await streamClient.rawPushToStream(eventToPush);
    };
    await telemetry(context, user, 'INSERT STREAM', {
      [SEMATTRS_DB_NAME]: 'stream_engine',
    }, pushToStreamFn);
  }
};

export const publishStixToStream = async (context: AuthContext, user: AuthUser, event: StreamDataEvent) => {
  await pushToStream(context, user, event);
};

export const storeMergeEvent = async (
  context: AuthContext,
  user: AuthUser,
  initialInstance: StoreObject,
  mergedInstance: StoreObject,
  sourceEntities: Array<StoreObject>,
  opts: EventOpts,
) => {
  try {
    const event = await buildMergeEvent(user, initialInstance, mergedInstance, sourceEntities);
    await pushToStream(context, user, event, opts);
    return event;
  } catch (e) {
    throw DatabaseError('Error in store merge event', { cause: e });
  }
};
export const storeUpdateEvent = async (
  context: AuthContext,
  user: AuthUser,
  previous: StoreObject,
  instance: StoreObject,
  message: string,
  changes: Change[],
  opts: UpdateEventOpts = {},
) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const event = buildUpdateEvent(user, previous, instance, message, changes, opts);
      await pushToStream(context, user, event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store update event', { cause: e });
  }
};

export const storeCreateRelationEvent = async (context: AuthContext, user: AuthUser, instance: StoreRelation, opts: CreateEventOpts = {}) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const { withoutMessage = false, restore = false } = opts;
      let message = '-';
      if (!withoutMessage) {
        message = restore ? generateRestoreMessage(instance) : generateCreateMessage(instance);
      }
      const event = buildCreateEvent(user, instance, message);
      await pushToStream(context, user, event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store create relation event', { cause: e });
  }
};

export const storeCreateEntityEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, message: string, opts: CreateEventOpts = {}) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const event = buildCreateEvent(user, instance, message);
      await pushToStream(context, user, event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store create entity event', { cause: e });
  }
};
export const storeDeleteEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, opts: EventOpts = {}) => {
  try {
    if (isStixExportableInStreamData(instance)) {
      const message = generateDeleteMessage(instance);
      const event = await buildDeleteEvent(user, instance, message);
      await pushToStream(context, user, event, opts);
      return event;
    }
    return undefined;
  } catch (e) {
    throw DatabaseError('Error in store delete event', { cause: e });
  }
};

export const createStreamProcessor = <T extends BaseEvent> (
  provider: string,
  callback: (events: Array<SseEvent<T>>, lastEventId: string) => Promise<void>,
  opts: StreamProcessorOption = {},
): StreamProcessor => {
  return streamClient.rawCreateStreamProcessor(provider, callback, opts);
};

export const fetchStreamInfo = async (streamName = LIVE_STREAM_NAME) => {
  return streamClient.rawFetchStreamInfo(streamName);
};

export const fetchStreamEventsRangeFromEventId = async <T extends BaseEvent> (
  startEventId: string,
  callback: (events: Array<SseEvent<T>>, lastEventId: string) => void,
  opts: FetchEventRangeOption = {},
) => {
  return streamClient.rawFetchStreamEventsRangeFromEventId(startEventId, callback, opts);
};

// region opencti notification stream
export const storeNotificationEvent = async <T extends StreamNotifEvent>(_context: AuthContext, event: T) => {
  await streamClient.rawStoreNotificationEvent(event);
};
export const fetchRangeNotifications = async <T extends StreamNotifEvent>(start: Date, end: Date): Promise<Array<T>> => {
  return streamClient.rawFetchRangeNotifications<T>(start, end);
};
// endregion
// region opencti audit stream
export const storeActivityEvent = async (event: ActivityStreamEvent) => {
  await streamClient.rawStoreActivityEvent(event);
};
// endregion
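This handler is now the single entry point for stream writes and processors; as the consumer hunks further down show, call sites change only by dropping the leading user argument. A minimal usage sketch under that assumption (the manager name and handler body are illustrative, import paths as seen from a module under src/managers):

import { createStreamProcessor } from '../database/stream/stream-handler';
import { type StreamProcessor } from '../database/stream/stream-utils';
import type { DataEvent, SseEvent } from '../types/event';

// Illustrative handler: with the new signature the callback must return a Promise.
const demoStreamHandler = async (events: Array<SseEvent<DataEvent>>, lastEventId: string) => {
  // ... process the batch, then persist lastEventId for resumption
};

const runDemoProcessor = async () => {
  const processor: StreamProcessor = createStreamProcessor('Demo manager', demoStreamHandler, { bufferTime: 5000 });
  await processor.start('live'); // 'live' maps to '$' (only new events); pass a stored event id to resume
  // ... keep running until a shutdown is requested
  await processor.shutdown(); // waits for the internal processing loop promise to settle
};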
opencti-platform/opencti-graphql/src/database/stream/stream-utils.ts (new file, 172 lines)
@@ -0,0 +1,172 @@
import * as jsonpatch from 'fast-json-patch';
import type { AuthUser } from '../../types/user';
import type { StoreObject } from '../../types/store';
import { generateMergeMessage } from '../generate-message';
import { convertStoreToStix_2_1 } from '../stix-2-1-converter';
import type { StixCoreObject, StixObject } from '../../types/stix-2-1-common';
import { asyncListTransformation, EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE } from '../utils';
import { UnsupportedError } from '../../config/errors';
import { INTERNAL_EXPORTABLE_TYPES } from '../../schema/stixCoreObject';
import type {
  ActivityStreamEvent,
  BaseEvent,
  Change,
  DeleteEvent,
  EventOpts,
  MergeEvent,
  SseEvent,
  StreamDataEvent,
  StreamNotifEvent,
  UpdateEvent,
  UpdateEventOpts,
} from '../../types/event';
import { STIX_EXT_OCTI } from '../../types/stix-2-1-extensions';

export const LIVE_STREAM_NAME = 'stream.opencti';
export const NOTIFICATION_STREAM_NAME = 'stream.notification';
export const ACTIVITY_STREAM_NAME = 'stream.activity';
export const EVENT_CURRENT_VERSION = '4';
export const EVENT_ACTIVITY_VERSION = '1';

export interface StreamProcessor {
  info: () => Promise<object>;
  start: (from: string | undefined) => Promise<void>;
  shutdown: () => Promise<void>;
  running: () => boolean;
}

export interface FetchEventRangeOption {
  withInternal?: boolean;
  streamName?: string;
  streamBatchSize?: number;
}

export interface StreamProcessorOption {
  withInternal?: boolean;
  bufferTime?: number;
  autoReconnect?: boolean;
  streamName?: string;
  streamBatchSize?: number;
}

export type StreamInfo = {
  lastEventId: string;
  firstEventId: string;
  firstEventDate: string;
  lastEventDate: string;
  streamSize: number;
};

export interface RawStreamClient {
  initializeStreams: () => Promise<void>;
  rawPushToStream: <T extends BaseEvent> (event: T) => Promise<void>;
  rawFetchStreamInfo: (streamName?: string) => Promise<StreamInfo>;
  rawCreateStreamProcessor: <T extends BaseEvent> (
    provider: string,
    callback: (events: Array<SseEvent<T>>, lastEventId: string) => Promise<void>,
    opts?: StreamProcessorOption,
  ) => StreamProcessor;
  rawFetchStreamEventsRangeFromEventId: <T extends BaseEvent> (
    startEventId: string,
    callback: (events: Array<SseEvent<T>>, lastEventId: string) => void,
    opts?: FetchEventRangeOption,
  ) => Promise<{ lastEventId: string }>;
  rawStoreNotificationEvent: <T extends StreamNotifEvent> (event: T) => Promise<void>;
  rawFetchRangeNotifications: <T extends StreamNotifEvent> (start: Date, end: Date) => Promise<Array<T>>;
  rawStoreActivityEvent: (event: ActivityStreamEvent) => Promise<void>;
}

export const isStreamPublishable = (opts: EventOpts) => {
  return opts.publishStreamEvent === undefined || opts.publishStreamEvent;
};
// Merge
export const buildMergeEvent = async (user: AuthUser, previous: StoreObject, instance: StoreObject, sourceEntities: Array<StoreObject>): Promise<MergeEvent> => {
  const message = generateMergeMessage(instance, sourceEntities);
  const previousStix = convertStoreToStix_2_1(previous) as StixCoreObject;
  const currentStix = convertStoreToStix_2_1(instance) as StixCoreObject;
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_MERGE,
    scope: 'external',
    message,
    origin: user.origin,
    data: currentStix,
    context: {
      patch: jsonpatch.compare(previousStix, currentStix),
      reverse_patch: jsonpatch.compare(currentStix, previousStix),
      sources: await asyncListTransformation<StixObject>(sourceEntities, convertStoreToStix_2_1) as StixCoreObject[],
    },
  };
};
// Update
export const buildStixUpdateEvent = (
  user: AuthUser,
  previousStix: StixCoreObject,
  stix: StixCoreObject,
  message: string,
  changes: Change[],
  opts: UpdateEventOpts = {},
): UpdateEvent => {
  // Build and send the event
  const patch = jsonpatch.compare(previousStix, stix);
  const previousPatch = jsonpatch.compare(stix, previousStix);
  if (patch.length === 0 || previousPatch.length === 0) {
    throw UnsupportedError('Update event must contains a valid previous patch');
  }
  if (patch.length === 1 && patch[0].path === '/modified' && !opts.allow_only_modified) {
    throw UnsupportedError('Update event must contains more operation than just modified/updated_at value');
  }
  const entityType = stix.extensions[STIX_EXT_OCTI].type;
  const scope = INTERNAL_EXPORTABLE_TYPES.includes(entityType) ? 'internal' : 'external';
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_UPDATE,
    scope,
    message,
    origin: user.origin,
    data: stix,
    commit: opts.commit,
    noHistory: opts.noHistory,
    context: {
      patch,
      reverse_patch: previousPatch,
      related_restrictions: opts.related_restrictions,
      pir_ids: opts.pir_ids,
      changes,
    },
  };
};
export const buildUpdateEvent = (user: AuthUser, previous: StoreObject, instance: StoreObject, message: string, changes: Change[], opts: UpdateEventOpts): UpdateEvent => {
  // Build and send the event
  const stix = convertStoreToStix_2_1(instance) as StixCoreObject;
  const previousStix = convertStoreToStix_2_1(previous) as StixCoreObject;
  return buildStixUpdateEvent(user, previousStix, stix, message, changes, opts);
};
// Create
export const buildCreateEvent = (user: AuthUser, instance: StoreObject, message: string): StreamDataEvent => {
  const stix = convertStoreToStix_2_1(instance) as StixCoreObject;
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_CREATE,
    scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external',
    message,
    origin: user.origin,
    data: stix,
  };
};
// Delete
export const buildDeleteEvent = async (
  user: AuthUser,
  instance: StoreObject,
  message: string,
): Promise<DeleteEvent> => {
  const stix = convertStoreToStix_2_1(instance) as StixCoreObject;
  return {
    version: EVENT_CURRENT_VERSION,
    type: EVENT_TYPE_DELETE,
    scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external',
    message,
    origin: user.origin,
    data: stix,
  };
};
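`RawStreamClient` is the seam this refactor introduces: `stream-handler` depends only on this interface, with `rawRedisStreamClient` as its sole implementation in this commit. To illustrate the contract, a hypothetical in-memory client (a test-double sketch, not part of this commit; all names below are made up) could look like:

import type { ActivityStreamEvent, SseEvent } from '../../types/event';
import type { RawStreamClient, StreamProcessor } from './stream-utils';

// Hypothetical in-memory client, e.g. for unit tests; keeps pushed events in a local array.
const entries: Array<SseEvent<any>> = [];
const lastId = () => (entries.length > 0 ? entries[entries.length - 1].id : '0-0');

export const inMemoryStreamClient: RawStreamClient = {
  initializeStreams: async () => {},
  rawPushToStream: async (event) => {
    // Mimic the Redis id format '<millis>-<seq>' so range filtering still works.
    entries.push({ id: `${Date.now()}-${entries.length}`, event: (event as any).type, data: event });
  },
  rawFetchStreamInfo: async () => ({
    firstEventId: entries.length > 0 ? entries[0].id : '0-0',
    lastEventId: lastId(),
    firstEventDate: '', // simplified: dates omitted in this stub
    lastEventDate: '',
    streamSize: entries.length,
  }),
  rawCreateStreamProcessor: (provider, callback, _opts = {}) => {
    let listening = true;
    const processor: StreamProcessor = {
      info: async () => ({ provider }),
      running: () => listening,
      start: async () => { await callback(entries as any, lastId()); }, // single pass instead of a loop
      shutdown: async () => { listening = false; },
    };
    return processor;
  },
  rawFetchStreamEventsRangeFromEventId: async (startEventId, callback) => {
    const batch = entries.filter((e) => e.id > startEventId); // lexicographic: simplified vs real Redis ids
    const last = batch.length > 0 ? batch[batch.length - 1].id : startEventId;
    callback(batch as any, last);
    return { lastEventId: last };
  },
  rawStoreNotificationEvent: async (_event) => {},
  rawFetchRangeNotifications: async () => [],
  rawStoreActivityEvent: async (_event: ActivityStreamEvent) => {},
};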
@@ -19,7 +19,8 @@ import {
  storeLoadByIds,
} from '../database/middleware-loader';
import { findStixCoreRelationshipsPaginated } from './stixCoreRelationship';
-import { delEditContext, notify, setEditContext, storeUpdateEvent } from '../database/redis';
+import { delEditContext, notify, setEditContext } from '../database/redis';
+import { storeUpdateEvent } from '../database/stream/stream-handler';
import conf, { BUS_TOPICS, logApp } from '../config/conf';
import { ForbiddenAccess, FunctionalError, LockTimeoutError, ResourceNotFoundError, TYPE_LOCK_ERROR, UnsupportedError } from '../config/errors';
import { isStixCoreObject, stixCoreObjectOptions } from '../schema/stixCoreObject';
@@ -4,7 +4,7 @@ import { LRUCache } from 'lru-cache';
import { now } from 'moment';
import conf, { basePath, logApp } from '../config/conf';
import { TAXIIAPI } from '../domain/user';
-import { createStreamProcessor, EVENT_CURRENT_VERSION } from '../database/redis';
+import { createStreamProcessor } from '../database/stream/stream-handler';
import { generateInternalId } from '../schema/identifier';
import { stixLoadById, storeLoadByIdsWithRefs } from '../database/middleware';
import { elCount, elList } from '../database/engine';
@@ -45,6 +45,7 @@ import { asyncMap, uniqAsyncMap } from '../utils/data-processing';
import { isStixMatchFilterGroup } from '../utils/filtering/filtering-stix/stix-filtering';
import { STIX_CORE_RELATIONSHIPS } from '../schema/stixCoreRelationship';
import { createAuthenticatedContext } from '../http/httpAuthenticatedContext';
+import { EVENT_CURRENT_VERSION } from '../database/stream/stream-utils';

import { convertStoreToStix_2_1 } from '../database/stix-2-1-converter';
@@ -331,7 +332,7 @@ const createSseMiddleware = () => {
    }
    const { client } = createSseChannel(req, res, startStreamId);
    const opts = { autoReconnect: true };
-    const processor = createStreamProcessor(user, user.user_email, async (elements, lastEventId) => {
+    const processor = createStreamProcessor(user.user_email, async (elements, lastEventId) => {
      // Process the event messages
      for (let index = 0; index < elements.length; index += 1) {
        const { id: eventId, event, data } = elements[index];
@@ -575,7 +576,7 @@ const createSseMiddleware = () => {
    let error;
    const userEmail = user.user_email;
    const opts = { autoReconnect: true };
-    const processor = createStreamProcessor(user, userEmail, async (elements, lastEventId) => {
+    const processor = createStreamProcessor(userEmail, async (elements, lastEventId) => {
      // Default Live collection doesn't have a stored Object associated
      if (!error && (!collection || collection.stream_live)) {
        // Process the stream elements
@@ -23,6 +23,7 @@ import { initExclusionListCache } from './database/exclusionListCache';
import { initFintelTemplates } from './modules/fintelTemplate/fintelTemplate-domain';
import { lockResources } from './lock/master-lock';
import { loadEntityMetricsConfiguration } from './modules/metrics/metrics-utils';
+import { initializeStreamStack } from './database/stream/stream-handler';

// region Platform constants
const PLATFORM_LOCK_ID = 'platform_init_lock';
@@ -100,6 +101,7 @@ const platformInit = async (withMarkings = true) => {
  if (!alreadyExists) {
    logApp.info('[INIT] New platform detected, initialization...');
    await initializeInternalQueues();
+    await initializeStreamStack();
    await initializeBucket();
    await initializeSchema();
    if (ES_IS_INIT_MIGRATION) {
@@ -122,6 +124,7 @@ const platformInit = async (withMarkings = true) => {
  await patchPlatformId(context);
  await refreshMappingsAndIndices();
  await initializeInternalQueues();
+  await initializeStreamStack();
  await enforceQueuesConsistency(context, SYSTEM_USER);
  await isCompatiblePlatform(context);
  await initializeAdminUser(context);
@@ -17,7 +17,7 @@ import { LRUCache } from 'lru-cache';
import { type ActionHandler, type ActionListener, registerUserActionListener, type UserAction, type UserReadAction } from '../listener/UserActionListener';
import conf, { auditLogTypes, logAudit } from '../config/conf';
import type { BasicStoreSettings } from '../types/settings';
-import { EVENT_ACTIVITY_VERSION, storeActivityEvent } from '../database/redis';
+import { storeActivityEvent } from '../database/stream/stream-handler';
import { getEntityFromCache } from '../database/cache';
import { ENTITY_TYPE_SETTINGS, isInternalObject } from '../schema/internalObject';
import { executionContext, SYSTEM_USER } from '../utils/access';
@@ -26,6 +26,7 @@ import { isStixCoreRelationship } from '../schema/stixCoreRelationship';
import { isStixCoreObject } from '../schema/stixCoreObject';
import { REDACTED_INFORMATION } from '../database/utils';
import type { ActivityStreamEvent } from '../types/event';
+import { EVENT_ACTIVITY_VERSION } from '../database/stream/stream-utils';

const INTERNAL_READ_ENTITIES = [ENTITY_TYPE_WORKSPACE];
const LOGS_SENSITIVE_FIELDS = conf.get('app:app_logs:logs_redacted_inputs') ?? [];
@@ -14,7 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
-import { ACTIVITY_STREAM_NAME, createStreamProcessor, storeNotificationEvent, type StreamProcessor } from '../database/redis';
+import { createStreamProcessor, storeNotificationEvent } from '../database/stream/stream-handler';
import conf, { booleanConf, ENABLED_DEMO_MODE, logApp } from '../config/conf';
import { INDEX_HISTORY, isEmptyField } from '../database/utils';
import { TYPE_LOCK_ERROR } from '../config/errors';
@@ -35,6 +35,7 @@ import { convertToNotificationUser, EVENT_NOTIFICATION_VERSION, getNotifications
import { isActivityEventMatchFilterGroup } from '../utils/filtering/filtering-activity-event/activity-event-filtering';
import { ENTITY_TYPE_MARKING_DEFINITION } from '../schema/stixMetaObject';
import { lockResources } from '../lock/master-lock';
+import { ACTIVITY_STREAM_NAME, type StreamProcessor } from '../database/stream/stream-utils';

const ACTIVITY_ENGINE_KEY = conf.get('activity_manager:lock_key');
const SCHEDULE_TIME = 10000;
@@ -158,7 +159,7 @@ const initActivityManager = () => {
|
||||
running = true;
|
||||
logApp.info('[OPENCTI-MODULE] Running activity manager');
|
||||
const streamOpts = { streamName: ACTIVITY_STREAM_NAME, bufferTime: 5000 };
|
||||
streamProcessor = createStreamProcessor(SYSTEM_USER, 'Activity manager', activityStreamHandler, streamOpts);
|
||||
streamProcessor = createStreamProcessor('Activity manager', activityStreamHandler, streamOpts);
|
||||
await streamProcessor.start(lastEventId);
|
||||
while (!shutdown && streamProcessor.running()) {
|
||||
lock.signal.throwIfAborted();
|
||||
|
||||
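Across this and the manager hunks that follow, the same mechanical change recurs: createStreamProcessor loses its leading user argument (previously SYSTEM_USER or RULE_MANAGER_USER) and is imported from stream/stream-handler, while stream-name constants and the StreamProcessor type move to stream/stream-utils. A minimal sketch of the new call shape, assuming only what these hunks show — the handler body, label, and loop comment are illustrative, not actual manager code:

import { createStreamProcessor } from '../database/stream/stream-handler';
import { ACTIVITY_STREAM_NAME, type StreamProcessor } from '../database/stream/stream-utils';
import type { DataEvent, SseEvent } from '../types/event';

// Illustrative handler: each real manager plugs in its own stream handler.
const demoHandler = async (events: Array<SseEvent<DataEvent>>, lastEventId: string): Promise<void> => {
  // process the batch of stream events, then persist lastEventId for resumption
};

const runDemoManager = async () => {
  const streamOpts = { streamName: ACTIVITY_STREAM_NAME, bufferTime: 5000 };
  // Before the refactor: createStreamProcessor(SYSTEM_USER, 'Demo manager', demoHandler, streamOpts)
  const streamProcessor: StreamProcessor = createStreamProcessor('Demo manager', demoHandler, streamOpts);
  await streamProcessor.start('live'); // or a persisted lastEventId, as in the managers above
  while (streamProcessor.running()) {
    // real managers sleep here and check their lock / shutdown flag each iteration
  }
};
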
@@ -19,7 +19,8 @@ import * as R from 'ramda';
import type { BasicStoreSettings } from '../types/settings';
import { EVENT_TYPE_UPDATE, isEmptyField, waitInSec } from '../database/utils';
import conf, { ENABLED_FILE_INDEX_MANAGER, logApp } from '../config/conf';
import { createStreamProcessor, type StreamProcessor } from '../database/redis';
import { createStreamProcessor } from '../database/stream/stream-handler';
import { type StreamProcessor } from '../database/stream/stream-utils';
import { lockResources } from '../lock/master-lock';
import { executionContext, SYSTEM_USER } from '../utils/access';
import { getEntityFromCache } from '../database/cache';
@@ -185,7 +186,7 @@ const initFileIndexManager = () => {
lock = await lockResources([FILE_INDEX_MANAGER_STREAM_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running file index manager stream handler');
streamProcessor = createStreamProcessor(SYSTEM_USER, 'File index manager', handleStreamEvents, { bufferTime: 5000 });
streamProcessor = createStreamProcessor('File index manager', handleStreamEvents, { bufferTime: 5000 });
await streamProcessor.start('live');
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();

@@ -1,7 +1,8 @@
import * as R from 'ramda';
import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import * as jsonpatch from 'fast-json-patch';
import { createStreamProcessor, type StreamProcessor } from '../database/redis';
import { createStreamProcessor } from '../database/stream/stream-handler';
import { type StreamProcessor } from '../database/stream/stream-utils';
import { lockResources } from '../lock/master-lock';
import conf, { booleanConf, ENABLED_DEMO_MODE, logApp } from '../config/conf';
import { EVENT_TYPE_UPDATE, INDEX_HISTORY, isEmptyField, isNotEmptyField } from '../database/utils';
@@ -293,7 +294,7 @@ const initHistoryManager = () => {
lock = await lockResources([HISTORY_ENGINE_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running history manager');
streamProcessor = createStreamProcessor(SYSTEM_USER, 'History manager', historyStreamHandler, { bufferTime: 5000, withInternal: true });
streamProcessor = createStreamProcessor('History manager', historyStreamHandler, { bufferTime: 5000, withInternal: true });
await streamProcessor.start(lastEventId);
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();

@@ -1,12 +1,12 @@
import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import { clearIntervalAsync as clearDynamicIntervalAsync, setIntervalAsync as setDynamicIntervalAsync } from 'set-interval-async/dynamic';
import moment from 'moment/moment';
import { createStreamProcessor, type StreamProcessor } from '../database/redis';
import { createStreamProcessor } from '../database/stream/stream-handler';
import { type StreamProcessor } from '../database/stream/stream-utils';
import { lockResources } from '../lock/master-lock';
import type { BasicStoreSettings } from '../types/settings';
import { logApp } from '../config/conf';
import { TYPE_LOCK_ERROR } from '../config/errors';
import { SYSTEM_USER } from '../utils/access';
import { utcDate } from '../utils/format';
import { wait } from '../database/utils';
import type { DataEvent, SseEvent } from '../types/event';
@@ -27,7 +27,7 @@ export interface ManagerCronScheduler {
}

export interface ManagerStreamScheduler {
handler: (streamEvents: Array<SseEvent<DataEvent>>, lastEventId: string) => void;
handler: (streamEvents: Array<SseEvent<DataEvent>>, lastEventId: string) => Promise<void>;
interval: number;
lockKey: string;
streamOpts?: { withInternal?: boolean; streamName?: string; bufferTime: number };
@@ -103,7 +103,7 @@ const initManager = (manager: ManagerDefinition) => {
lock = await lockResources([manager.streamSchedulerHandler.lockKey], { retryCount: 0 });
running = true;
logApp.info(`[OPENCTI-MODULE] Running ${manager.label} stream handler`);
streamProcessor = createStreamProcessor(SYSTEM_USER, manager.label, manager.streamSchedulerHandler.handler, manager.streamSchedulerHandler.streamOpts);
streamProcessor = createStreamProcessor(manager.label, manager.streamSchedulerHandler.handler, manager.streamSchedulerHandler.streamOpts);
const startFrom = manager.streamSchedulerHandler.streamProcessorStartFrom();
await streamProcessor.start(startFrom);
while (!shutdown && streamProcessor.running()) {

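The interface change above is small but load-bearing: ManagerStreamScheduler.handler must now return Promise<void>, so the generic initManager can await each batch before polling the next. A sketch of a conforming scheduler definition, assuming only the fields shown in the interface — the values and the handler body are invented for illustration:

import type { DataEvent, SseEvent } from '../types/event';

const demoStreamScheduler = {
  // Must be async now: initManager awaits the batch before reading further events.
  handler: async (streamEvents: Array<SseEvent<DataEvent>>, lastEventId: string): Promise<void> => {
    for (const event of streamEvents) {
      // handle each event in order; lastEventId marks the resume point
    }
  },
  interval: 10000, // illustrative polling interval
  lockKey: 'demo_manager_lock', // illustrative lock key
  streamOpts: { withInternal: false, bufferTime: 5000 },
};
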
@@ -2,7 +2,8 @@ import * as R from 'ramda';
import * as jsonpatch from 'fast-json-patch';
import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import type { Moment } from 'moment';
import { createStreamProcessor, fetchRangeNotifications, storeNotificationEvent, type StreamProcessor } from '../database/redis';
import { type StreamProcessor } from '../database/stream/stream-utils';
import { fetchRangeNotifications, storeNotificationEvent, createStreamProcessor } from '../database/stream/stream-handler';
import { lockResources } from '../lock/master-lock';
import conf, { booleanConf, logApp } from '../config/conf';
import { FunctionalError, TYPE_LOCK_ERROR } from '../config/errors';
@@ -659,7 +660,7 @@ const initNotificationManager = () => {
lock = await lockResources([NOTIFICATION_LIVE_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running notification manager (live)');
streamProcessor = createStreamProcessor(SYSTEM_USER, 'Notification manager', notificationLiveStreamHandler);
streamProcessor = createStreamProcessor('Notification manager', notificationLiveStreamHandler);
await streamProcessor.start('live');
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();

@@ -25,7 +25,7 @@ import { FunctionalError } from '../config/errors';
import { type BasicStoreEntityPir, ENTITY_TYPE_PIR, type ParsedPir, type ParsedPirCriterion, type StoreEntityPir } from '../modules/pir/pir-types';
import { constructFinalPirFilters, parsePir } from '../modules/pir/pir-utils';
import { getEntitiesListFromCache } from '../database/cache';
import { createRedisClient, fetchStreamEventsRangeFromEventId } from '../database/redis';
import { fetchStreamEventsRangeFromEventId } from '../database/stream/stream-handler';
import { updatePir } from '../modules/pir/pir-domain';
import { pushToWorkerForConnector } from '../database/rabbitmq';
import convertEntityPirToStix from '../modules/pir/pir-converter';
@@ -109,7 +109,6 @@ export const checkEventOnPir = async (context: AuthContext, event: SseEvent<any>
// 2. Check Pir criteria one by one (because we need to know which one matches or not).
const matchingCriteria: typeof pir_criteria = [];
if (eventMatchesPirFilters) {
// eslint-disable-next-line no-restricted-syntax
for (const pirCriterion of pir_criteria) {
const isMatch = await isStixMatchFilterGroup(context, PIR_MANAGER_USER, data, pirCriterion.filters);
if (isMatch) matchingCriteria.push(pirCriterion);
@@ -165,29 +164,22 @@ const processStreamEventsForPir = (context: AuthContext, pir: BasicStoreEntityPi
* Handler called every {PIR_MANAGER_INTERVAL} and studying a range of stream events.
*/
const pirManagerHandler = async () => {
const redisClient = await createRedisClient(PIR_MANAGER_LABEL, false);
try {
const context = executionContext(PIR_MANAGER_CONTEXT);
const allPirs = await getEntitiesListFromCache<BasicStoreEntityPir>(context, PIR_MANAGER_USER, ENTITY_TYPE_PIR);
const context = executionContext(PIR_MANAGER_CONTEXT);
const allPirs = await getEntitiesListFromCache<BasicStoreEntityPir>(context, PIR_MANAGER_USER, ENTITY_TYPE_PIR);

// Loop through all Pirs by group
await BluePromise.map(allPirs, async (pir) => {
// Fetch stream events since last event id caught by the Pir.
const { lastEventId } = await fetchStreamEventsRangeFromEventId(
redisClient,
pir.lastEventId,
processStreamEventsForPir(context, pir),
{ streamBatchSize: PIR_MANAGER_STREAM_BATCH_SIZE },
);
// Loop through all Pirs by group
await BluePromise.map(allPirs, async (pir) => {
// Fetch stream events since last event id caught by the Pir.
const { lastEventId } = await fetchStreamEventsRangeFromEventId(
pir.lastEventId,
processStreamEventsForPir(context, pir),
{ streamBatchSize: PIR_MANAGER_STREAM_BATCH_SIZE },
);
// Update pir last event id.
if (lastEventId !== pir.lastEventId) {
await updatePir(context, PIR_MANAGER_USER, pir.id, [{ key: 'lastEventId', value: [lastEventId] }], { auditLogEnabled: false });
}
}, { concurrency: PIR_MANAGER_MAX_CONCURRENCY });
} finally {
// close redis client connexion
redisClient.disconnect();
}
if (lastEventId !== pir.lastEventId) {
await updatePir(context, PIR_MANAGER_USER, pir.id, [{ key: 'lastEventId', value: [lastEventId] }], { auditLogEnabled: false });
}
}, { concurrency: PIR_MANAGER_MAX_CONCURRENCY });
};

// Configuration of the manager.

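The net effect of this hunk: pirManagerHandler no longer manages its own Redis client, so the createRedisClient call, the try/finally, and the explicit disconnect() all disappear; fetchStreamEventsRangeFromEventId, now exported from stream/stream-handler, is assumed to own the connection lifecycle internally. A sketch of the calling pattern before and after — argument order follows the diff, while the callback body and batch size are illustrative:

import { fetchStreamEventsRangeFromEventId } from '../database/stream/stream-handler';

const catchUpFrom = async (fromEventId: string) => {
  // Before: a client was created with createRedisClient(label, false), passed as the
  // first argument, and disconnected by the caller in a finally block.
  const { lastEventId } = await fetchStreamEventsRangeFromEventId(
    fromEventId, // resume point persisted by the caller (e.g. pir.lastEventId)
    async (events: Array<unknown>) => {
      // process the fetched range of stream events
    },
    { streamBatchSize: 100 }, // illustrative batch size
  );
  return lastEventId; // caller stores it back when the cursor moved forward
};
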
@@ -16,7 +16,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import { v4 as uuidv4 } from 'uuid';
import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import type { Moment } from 'moment/moment';
import { createStreamProcessor, type StreamProcessor } from '../../database/redis';
import { createStreamProcessor } from '../../database/stream/stream-handler';
import { type StreamProcessor } from '../../database/stream/stream-utils';
import { lockResources } from '../../lock/master-lock';
import conf, { booleanConf, logApp } from '../../config/conf';
import { FunctionalError, TYPE_LOCK_ERROR } from '../../config/errors';
@@ -199,7 +200,7 @@ const initPlaybookManager = () => {
lock = await lockResources([PLAYBOOK_LIVE_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running playbook manager');
streamProcessor = createStreamProcessor(SYSTEM_USER, 'Playbook manager', playbookStreamHandler, { withInternal: true });
streamProcessor = createStreamProcessor('Playbook manager', playbookStreamHandler, { withInternal: true });
await streamProcessor.start('live');
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();

@@ -2,7 +2,7 @@ import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from
import conf, { booleanConf, getBaseUrl, logApp } from '../config/conf';
import { FunctionalError, TYPE_LOCK_ERROR, UnsupportedError } from '../config/errors';
import { getEntitiesListFromCache, getEntitiesMapFromCache, getEntityFromCache } from '../database/cache';
import { createStreamProcessor, NOTIFICATION_STREAM_NAME, type StreamProcessor } from '../database/redis';
import { createStreamProcessor } from '../database/stream/stream-handler';
import { lockResources } from '../lock/master-lock';
import { sendMail, smtpComputeFrom, smtpIsAlive } from '../database/smtp';
import type { NotifierTestInput } from '../generated/graphql';
@@ -40,6 +40,7 @@ import { extractStixRepresentativeForUser } from '../database/stix-representativ
import { EVENT_TYPE_UPDATE } from '../database/utils';
import { sanitizeNotificationData } from '../utils/templateContextSanitizer';
import { safeRender } from '../utils/safeEjs.client';
import { NOTIFICATION_STREAM_NAME, type StreamProcessor } from '../database/stream/stream-utils';

const DOC_URI = 'https://docs.opencti.io';
const PUBLISHER_ENGINE_KEY = conf.get('publisher_manager:lock_key');
@@ -525,7 +526,7 @@ const initPublisherManager = () => {
running = true;
logApp.info('[OPENCTI-PUBLISHER] Running publisher manager');
const opts = { withInternal: false, streamName: NOTIFICATION_STREAM_NAME, bufferTime: 5000 };
streamProcessor = createStreamProcessor(SYSTEM_USER, 'Publisher manager', publisherStreamHandler, opts);
streamProcessor = createStreamProcessor('Publisher manager', publisherStreamHandler, opts);
await streamProcessor.start('live');
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();

@@ -3,7 +3,7 @@ import * as R from 'ramda';
import type { Operation } from 'fast-json-patch';
import * as jsonpatch from 'fast-json-patch';
import { clearIntervalAsync, setIntervalAsync, type SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import { buildCreateEvent, createStreamProcessor, EVENT_CURRENT_VERSION, REDIS_STREAM_NAME, type StreamProcessor } from '../database/redis';
import { createStreamProcessor } from '../database/stream/stream-handler';
import { lockResources } from '../lock/master-lock';
import conf, { booleanConf, logApp } from '../config/conf';
import { createEntity, patchAttribute, stixLoadById, storeLoadByIdWithRefs } from '../database/middleware';
@@ -29,6 +29,7 @@ import { executionContext, RULE_MANAGER_USER, SYSTEM_USER } from '../utils/acces
import { isModuleActivated } from '../database/cluster-module';
import { elList } from '../database/engine';
import { isStixObject } from '../schema/stixCoreObject';
import { buildCreateEvent, EVENT_CURRENT_VERSION, LIVE_STREAM_NAME, type StreamProcessor } from '../database/stream/stream-utils';

const MIN_LIVE_STREAM_EVENT_VERSION = 4;

@@ -296,8 +297,8 @@ const initRuleManager = () => {
const { lastEventId } = ruleManager;
logApp.info(`[OPENCTI-MODULE] Running rule manager from ${lastEventId ?? 'start'}`);
// Start the stream listening
const opts = { withInternal: true, streamName: REDIS_STREAM_NAME };
streamProcessor = createStreamProcessor(RULE_MANAGER_USER, 'Rule manager', ruleStreamHandler, opts);
const opts = { withInternal: true, streamName: LIVE_STREAM_NAME };
streamProcessor = createStreamProcessor('Rule manager', ruleStreamHandler, opts);
await streamProcessor.start(lastEventId);
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();

@@ -6,7 +6,6 @@ import { TYPE_LOCK_ERROR } from '../config/errors';
import Queue from '../utils/queue';
import { ENTITY_TYPE_SYNC } from '../schema/internalObject';
import { patchSync } from '../domain/connector';
import { EVENT_CURRENT_VERSION } from '../database/redis';
import { lockResources } from '../lock/master-lock';
import { STIX_EXT_OCTI } from '../types/stix-2-1-extensions';
import { utcDate } from '../utils/format';
@@ -16,6 +15,7 @@ import { pushToWorkerForConnector } from '../database/rabbitmq';
import { OPENCTI_SYSTEM_UUID } from '../schema/general';
import { getHttpClient } from '../utils/http-client';
import { createSyncHttpUri, httpBase } from '../domain/connector-utils';
import { EVENT_CURRENT_VERSION } from '../database/stream/stream-utils';

const SYNC_MANAGER_KEY = conf.get('sync_manager:lock_key') || 'sync_manager_lock';
const SCHEDULE_TIME = conf.get('sync_manager:interval') || 10000;

@@ -10,7 +10,7 @@ import { ENTITY_TYPE_SETTINGS } from '../../../schema/internalObject';
import { convertToNotificationUser, type DigestEvent, EVENT_NOTIFICATION_VERSION } from '../../../manager/notificationManager';
import { generateCreateMessage, generateDeleteMessage } from '../../../database/generate-message';
import { convertStixToInternalTypes } from '../../../schema/schemaUtils';
import { storeNotificationEvent } from '../../../database/redis';
import { storeNotificationEvent } from '../../../database/stream/stream-handler';
import { convertMembersToUsers, extractBundleBaseElement } from '../playbook-utils';
import { isEventInPirRelationship } from '../../../manager/playbookManager/playbookManagerUtils';
import { extractEntityRepresentativeName } from '../../../database/entity-representative';

@@ -67,7 +67,7 @@ import { schemaRelationsRefDefinition } from '../../schema/schema-relationsRef';
import { stixLoadByIds } from '../../database/middleware';
import { usableNotifiers } from '../notifier/notifier-domain';
import { convertToNotificationUser, type DigestEvent, EVENT_NOTIFICATION_VERSION } from '../../manager/notificationManager';
import { storeNotificationEvent } from '../../database/redis';
import { storeNotificationEvent } from '../../database/stream/stream-handler';
import { ENTITY_TYPE_SETTINGS } from '../../schema/internalObject';
import { isStixCyberObservable } from '../../schema/stixCyberObservable';
import { createStixPattern } from '../../python/pythonBridge';

@@ -41,7 +41,8 @@ import { findById as findStatusById } from '../../domain/status';
import { type BasicStoreEntityEntitySetting } from '../entitySetting/entitySetting-types';
import { findById as findGroupById } from '../../domain/group';
import { getDraftContext } from '../../utils/draftContext';
import { notify, storeNotificationEvent } from '../../database/redis';
import { notify } from '../../database/redis';
import { storeNotificationEvent } from '../../database/stream/stream-handler';
import { publishUserAction } from '../../listener/UserActionListener';
import { verifyRequestAccessEnabled } from './requestAccessUtils';
import { isEmptyField, isNotEmptyField } from '../../database/utils';

@@ -8,7 +8,7 @@ import {
streamCollectionCleanContext,
} from '../domain/stream';
import { getAuthorizedMembers } from '../utils/authorizedMembers';
import { fetchStreamInfo } from '../database/redis';
import { fetchStreamInfo } from '../database/stream/stream-handler';

const streamResolvers = {
Query: {

@@ -16,10 +16,11 @@ import type { RelationCreation, UpdateEvent } from '../types/event';
import { READ_DATA_INDICES, UPDATE_OPERATION_ADD, UPDATE_OPERATION_REMOVE } from '../database/utils';
import type { AuthContext } from '../types/user';
import { executionContext, RULE_MANAGER_USER } from '../utils/access';
import { buildStixUpdateEvent, publishStixToStream } from '../database/redis';
import { publishStixToStream } from '../database/stream/stream-handler';
import { INPUT_DOMAIN_TO, INPUT_OBJECTS, RULE_PREFIX } from '../schema/general';
import { FilterMode, FilterOperator } from '../generated/graphql';
import { asyncFilter } from '../utils/data-processing';
import { buildStixUpdateEvent } from '../database/stream/stream-utils';

const buildContainerRefsRule = (ruleDefinition: RuleDefinition, containerType: string, relationTypes: RelationTypes): RuleRuntime => {
const { id } = ruleDefinition;

@@ -3,7 +3,7 @@ import * as middlewareLoader from '../../../../src/database/middleware-loader';
import * as cache from '../../../../src/database/cache';
import * as utils from '../../../../src/utils/access';
import * as playbookUtils from '../../../../src/modules/playbook/playbook-utils';
import * as redis from '../../../../src/database/redis';
import * as streamHandler from '../../../../src/database/stream/stream-handler';
import * as notificationManager from '../../../../src/manager/notificationManager';
import * as schemaUtils from '../../../../src/schema/schemaUtils';
import * as generateMessage from '../../../../src/database/generate-message';
@@ -48,7 +48,7 @@ describe('PLAYBOOK_NOTIFIER_COMPONENT', () => {
vi.spyOn(schemaUtils, 'convertStixToInternalTypes').mockReturnValue('Indicator');
vi.spyOn(generateMessage, 'generateCreateMessage').mockReturnValue('generated create message');
vi.spyOn(generateMessage, 'generateDeleteMessage').mockReturnValue('generated delete message');
vi.spyOn(redis, 'storeNotificationEvent').mockResolvedValue(undefined);
vi.spyOn(streamHandler, 'storeNotificationEvent').mockResolvedValue(undefined);
vi.spyOn(entityRepresentative, 'extractEntityRepresentativeName').mockReturnValue('name');
});

@@ -82,7 +82,7 @@ describe('PLAYBOOK_NOTIFIER_COMPONENT', () => {
event: mockEventPirDelete
} as unknown as ExecutorParameters<NotifierConfiguration>);

expect(redis.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(streamHandler.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(result).toEqual({ output_port: undefined, bundle: mockBundle });
});

@@ -114,7 +114,7 @@ describe('PLAYBOOK_NOTIFIER_COMPONENT', () => {
event: mockEventUpdate
} as unknown as ExecutorParameters<NotifierConfiguration>);

expect(redis.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(streamHandler.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(result).toEqual({ output_port: undefined, bundle: mockBundle });
});

@@ -147,7 +147,7 @@ describe('PLAYBOOK_NOTIFIER_COMPONENT', () => {
event: mockEventCreate
} as unknown as ExecutorParameters<NotifierConfiguration>);

expect(redis.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(streamHandler.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
});

it('should call storeNotificationEvent with generated delete message if event type is deleted and not in pir', async () => {
@@ -179,7 +179,7 @@ describe('PLAYBOOK_NOTIFIER_COMPONENT', () => {
event: mockEventDelete
} as unknown as ExecutorParameters<NotifierConfiguration>);

expect(redis.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(streamHandler.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
});

it('should call storeNotificationEvent with generated message if event is undefined', async () => {
@@ -207,7 +207,7 @@ describe('PLAYBOOK_NOTIFIER_COMPONENT', () => {
event: undefined
} as unknown as ExecutorParameters<NotifierConfiguration>);

expect(redis.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
expect(streamHandler.storeNotificationEvent).toHaveBeenCalledWith(mockContext, expectedNotificationEvent);
});
});
});

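For test suites, the split means spies must target the module that now owns the function: stubbing storeNotificationEvent on the old redis barrel would no longer intercept the call. A minimal vitest sketch of the migrated pattern, using only spy calls that appear verbatim in this diff — the test body is a placeholder:

import { describe, it, vi } from 'vitest';
import * as streamHandler from '../../../../src/database/stream/stream-handler';

describe('a component storing notifications', () => {
  it('spies on the new owner module', async () => {
    // The spy now lives on stream/stream-handler, not database/redis.
    const spy = vi.spyOn(streamHandler, 'storeNotificationEvent').mockResolvedValue(undefined);
    // ... invoke the code under test, then assert:
    // expect(spy).toHaveBeenCalledWith(context, expectedNotificationEvent);
  });
});
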
@@ -20,6 +20,7 @@ import { deleteAllBucketContent } from '../../src/database/file-storage';
import { initExclusionListCache } from '../../src/database/exclusionListCache';
import { initLockFork } from '../../src/lock/master-lock';
import { ADMIN_USER, createTestUsers, isPlatformAlive, testContext } from '../utils/testQuery';
import { initializeStreamStack } from '../../src/database/stream/stream-handler';

/**
* This is run once before all tests (for setup) and after all (for teardown).
@@ -40,6 +41,7 @@ const initializePlatform = async () => {
const stopTime = new Date().getTime();

await initializeInternalQueues();
await initializeStreamStack();
await initializeBucket();
await initializeSchema();
await initializeData(context, true);

@@ -6,7 +6,7 @@ import { isNotEmptyField, READ_INDEX_HISTORY, READ_INDEX_INFERRED_ENTITIES, READ
import { ENTITY_TYPE_BACKGROUND_TASK } from '../../src/schema/internalObject';
import { internalFindByIds, internalLoadById, topEntitiesList } from '../../src/database/middleware-loader';
import { queryAsAdmin, testContext } from './testQuery';
import { fetchStreamInfo } from '../../src/database/redis';
import { fetchStreamInfo } from '../../src/database/stream/stream-handler';
import { logApp } from '../../src/config/conf';
import { TASK_TYPE_RULE } from '../../src/domain/backgroundTask-common';