add signal handling
@@ -1,9 +1,11 @@
from multiprocessing.pool import TERMINATE
import os
import sys
import time
import random
import re
import argparse
import signal
import importlib
import concurrent.futures
import multiprocessing as mp
@@ -26,6 +28,7 @@ lproto_label_ttable = str.maketrans({
})

metric_queue = mp.Queue(10000)
terminate = mp.Event()

class RNSRemote:

@@ -58,7 +61,6 @@ class RNSRemote:
    }

    def __init__(self, interval: int, dest_ident_hexhash: str, identity: os.PathLike, configpath: str, verbosity: int, **kwargs):
        self.alive: bool = True
        self.link: RNS.Link
        self.remote_dest: RNS.Destination

@@ -76,7 +78,7 @@ class RNSRemote:
            '.'.join(RNSRemote.ASPECTS), bytes.fromhex(self.dest_ident_hexhash))

        # Initialize Reticulum Instance
        self.rns = RNS.Reticulum(configdir=configpath, verbosity=verbosity)
        RNS.Reticulum(configdir=configpath, verbosity=verbosity)

        self._ensure_path()

@@ -100,7 +102,9 @@ class RNSRemote:

    def run(self):
        print(f"Starting Main")
        while self.alive:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        while not terminate.is_set():
            # try:
            # print(metric_queue.get_nowait())
            # except Empty:
@@ -125,7 +129,7 @@ class RNSRemote:
                RNS.log(f"Error while sending request over the link: {str(e)}")

            time.sleep(self.interval)

        RNS.log(f"Terminating RNSRemote", RNS.LOG_INFO)

    def _ensure_path(self):
        if not RNS.Transport.has_path(self.dest_hash):
@@ -163,11 +167,13 @@ class RNSRemote:
        reason = link.teardown_reason
        if reason == RNS.Link.TIMEOUT:
            RNS.log("The link timed out, reconnecting", RNS.LOG_WARNING)
            self._establish_link()
        elif reason == RNS.Link.DESTINATION_CLOSED:
            RNS.log("The link was closed by the server, reconnecting", RNS.LOG_WARNING)
            self._establish_link()
        else:
            RNS.log("Link closed unexpectedly, terminating", RNS.LOG_ERROR)
            self._establish_link()


    def _on_response(self, response: RNS.RequestReceipt) -> None:
        #pp(response.response) #DEBUG
@@ -182,7 +188,6 @@ class RNSRemote:
        node_labels = {}
        node_metrics = {}
        t = time.time()
        #pp(data[0])

        # link_count isnt labeled >.>
        node_metrics[RNSRemote.NODE_METRICS['link_count']] = data[1]
@@ -226,7 +231,7 @@ class RNSRemote:
            metric_queue.put_nowait(metric)


def validate_hexhash(hexhash: str) -> None:
def validate_hexhash(hexhash: str):
    dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
    if len(hexhash) != dest_len:
        raise TypeError(f"Destination length is invalid, must be {dest_len} hexadecimal characters ({dest_len//2} bytes)")
@@ -243,7 +248,7 @@ class InfluxWriter:
        last_push = time.time()
        print("Started InfluxWriter")
        with open("/Users/lbatalha/src/rnmon/metrics.log", "w", buffering=1) as f:
            while True:
            while not terminate.is_set():
                try:
                    push_queue.append(metric_queue.get_nowait())
                    metric_count += 1
@@ -256,14 +261,11 @@ class InfluxWriter:
                    f.write("\n".join(push_queue))

                time.sleep(0.005)
        print(f"Terminating InfluxWriter")



def main():
    JOB_TYPES = {
        "transport_node": RNSRemote,
        "influx": InfluxWriter
    }

    parser = argparse.ArgumentParser(description="Simple request/response example")
    parser.add_argument('-v', '--verbose', action='count', default=0)
    parser.add_argument("--config", type=str, default=None, \
@@ -274,7 +276,18 @@ def main():

    target_config = safe_load(args.targets)

    JOB_TYPES = {
        "transport_node": RNSRemote,
        "influx": InfluxWriter
    }

    jobs = []
    # Setup InfluxWriter push job
    jobs.append({
        "type": "influx",
        "batch_size": target_config['batch_size'],
        "flush_interval": target_config['flush_interval'],
    })
    # Setup Scraping Jobs
    for target in target_config['targets']:
        jobs.append({
@@ -283,37 +296,37 @@ def main():
            "dest_ident_hexhash": target['dest_identity'],
            "identity": target['rpc_identity'],
            "configpath": args.config,
            "verbosity": args.verbose
            "verbosity": args.verbose,
        })
    # Setup InfluxWriter push job
    jobs.append({
        "type": "influx",
        "batch_size": target_config['batch_size'],
        "flush_interval": target_config['flush_interval'],
    })

    def sig_handler(signum, frame):
        terminate.set()
    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context("fork")) as executor:
        try:
            # I WILL commit crimes >:3
            futures = {executor.submit(JOB_TYPES[job['type']], **job): job for job in jobs}
            while len(futures) > 0:
                new_jobs = {}
                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    if future.exception():
                        print(f"Job exited with exception: \"{future.exception()}\"")
                        job = futures[future]
                        print(f"Job Exception Restart: {JOB_TYPES[job['type']]}")
                        #new_jobs[executor.submit(JOB_TYPES[job['type']], **job)] = job
                    # else:
                #for future in not_done:
                # job = futures[future]
                # print(f"Job Not Done Restart: {JOB_TYPES[job['type']]}")
                # new_jobs[executor.submit(JOB_TYPES[job['type']], **job)] = job
                futures = new_jobs
                if terminate.is_set():
                    print("Terminating processes")
                    for pid, proc in executor._processes.items():
                        proc.terminate()
                    break
                else:
                    for future in done:
                        if future.exception():
                            print(f"Job exited with exception: \"{future.exception()}\"")
                            job = futures[future]
                            print(f"Job Exception Restart: {JOB_TYPES[job['type']]}")
                            new_jobs[executor.submit(JOB_TYPES[job['type']], **job)] = job
                    futures = new_jobs
                time.sleep(1)
        except KeyboardInterrupt:
            RNS.exit()
            sys.exit(0)

    print("Exiting")
    sys.exit(0)

if __name__ == '__main__':
    main()
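
The pattern this commit introduces: each worker process ignores SIGINT/SIGTERM and instead polls a shared multiprocessing.Event in its loop, while the parent installs a handler that sets the event. The children have to opt out explicitly because a terminal Ctrl-C delivers SIGINT to the whole foreground process group, not just the parent. A minimal, self-contained sketch of that pattern follows; it is not the project's code, scrape_worker is a hypothetical stand-in for the RNSRemote/InfluxWriter jobs, and like the commit it relies on the fork start method so the module-level event is inherited by the children.

    import concurrent.futures
    import multiprocessing as mp
    import signal
    import time

    terminate = mp.Event()  # module-level, inherited by fork()ed workers, as in the commit

    def scrape_worker(job_id):
        # Hypothetical stand-in for an RNSRemote/InfluxWriter job.
        # Workers ignore SIGINT/SIGTERM so only the parent decides when to stop;
        # they exit on their own once the shared event is set.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        while not terminate.is_set():
            time.sleep(0.2)  # one scrape/push cycle would go here
        return job_id        # returning lets the future complete normally

    def main():
        def sig_handler(signum, frame):
            terminate.set()  # flip the shared flag; workers notice on their next loop pass

        signal.signal(signal.SIGINT, sig_handler)
        signal.signal(signal.SIGTERM, sig_handler)

        with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context("fork")) as executor:
            futures = [executor.submit(scrape_worker, i) for i in range(3)]
            # Because every worker polls the event, setting it is enough: the futures
            # finish normally and the with-block's shutdown(wait=True) returns.
            for fut in concurrent.futures.as_completed(futures):
                print(f"job {fut.result()} finished")
        print("Exiting")

    if __name__ == '__main__':
        main()

Because every job watches the event, setting it is enough for the futures to complete and for the executor's own shutdown to join its workers; reaching into executor._processes to call terminate(), as the commit's main() does, is only needed for jobs that never check the flag.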
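
For contrast, a sketch of the same shutdown flow without the pool and without fork-inherited globals: the event and queue are passed to each multiprocessing.Process explicitly, and the parent drains the queue (the role InfluxWriter plays here). The names worker and metrics are illustrative, not from the codebase.

    import multiprocessing as mp
    import os
    import signal
    import time
    from queue import Empty

    def worker(terminate, metrics):
        # Ignore SIGINT/SIGTERM; the parent's handler decides when to stop.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        while not terminate.is_set():
            metrics.put(f"metric from pid {os.getpid()}")
            time.sleep(0.5)
        print(f"worker {os.getpid()} exiting cleanly")

    def main():
        terminate = mp.Event()
        metrics = mp.Queue(1000)

        def sig_handler(signum, frame):
            terminate.set()

        signal.signal(signal.SIGINT, sig_handler)
        signal.signal(signal.SIGTERM, sig_handler)

        procs = [mp.Process(target=worker, args=(terminate, metrics)) for _ in range(2)]
        for p in procs:
            p.start()

        # Parent drains the queue until shutdown is requested.
        while not terminate.is_set():
            try:
                print(metrics.get(timeout=1))
            except Empty:
                pass

        for p in procs:
            p.join(timeout=5)  # workers exit by themselves; no terminate() needed
        print("Exiting")

    if __name__ == '__main__':
        main()

Passing the primitives explicitly keeps this sketch start-method agnostic (it also works under spawn), whereas a module-level terminate only stays shared when the workers are forked, matching the mp.get_context("fork") used in main().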