import subprocess
import time
import os
import signal
import shutil
import json
import re
from multiprocessing import Process, Queue, current_process
from queue import Empty
# Root folder that contains the numbered working directories.
base_working_directory = '/home/user/dev/backtesting-runs'

# The working directories "1", "2" and "3" below the root; the main section
# hands these out to the worker processes through a queue.
working_directories = [
    os.path.join(base_working_directory, folder_name)
    for folder_name in ('1', '2', '3')
]

# Argument vector passed to subprocess.Popen to start a backtesting run.
command = ['./gunthy-linux']
# Pairs to backtest, written "<first>-<second>". They are grouped here by
# their first symbol; flattening the groups in order yields the exact order
# in which tasks are queued.
_PAIR_GROUPS = (
    ("USDT", (
        "BTC", "ETH", "BNB", "SOL", "XRP", "DOGE", "ADA", "TRX", "AVAX",
        "SHIB", "DOT", "LINK", "BCH", "NEAR", "MATIC", "LTC", "UNI", "PEPE",
    )),
    ("BTC", (
        "ETH", "BNB", "SOL", "XRP", "DOGE", "ADA", "TRX", "AVAX", "DOT",
        "LINK", "BCH", "NEAR", "MATIC", "LTC", "UNI",
    )),
    ("BNB", (
        "SOL", "XRP", "ADA", "TRX", "AVAX", "DOT", "LINK", "BCH", "NEAR",
        "MATIC", "LTC",
    )),
    ("ETH", ("BNB", "XRP", "SOL", "ADA", "TRX", "AVAX")),
)
trading_pairs = [
    f"{first_symbol}-{second_symbol}"
    for first_symbol, second_symbols in _PAIR_GROUPS
    for second_symbol in second_symbols
]
# Backtesting windows as (start, end) pairs of epoch milliseconds.
# Despite the name these are six consecutive 30-day windows, not calendar
# months; each window ends one second before the next one begins.
_FIRST_WINDOW_START_MS = 1704067200000
_WINDOW_LENGTH_MS = 30 * 24 * 60 * 60 * 1000
months = [
    (
        _FIRST_WINDOW_START_MS + window_index * _WINDOW_LENGTH_MS,
        _FIRST_WINDOW_START_MS + (window_index + 1) * _WINDOW_LENGTH_MS - 1000,
    )
    for window_index in range(6)
]
def transform_pair(pair):
    """Rewrite a concatenated symbol such as ``"ETHBTC"`` as ``"BTC-ETH"``.

    The trailing symbol is matched against BTC, ETH, USDT and BNB, in that
    order; the first match is moved to the front and joined to the rest
    with a dash. A string ending in none of them is returned unchanged.
    """
    for suffix in ('BTC', 'ETH', 'USDT', 'BNB'):
        if pair.endswith(suffix):
            return f"{suffix}-{pair[:-len(suffix)]}"
    return pair
def clear_ramdisk(ramdisk_base='/dev/shm/'):
    """Delete every ``backtesting_*`` directory directly under *ramdisk_base*.

    Args:
        ramdisk_base: Folder to clean. Defaults to ``/dev/shm/``, the
            location the workers create their per-process copies in.

    Only directories are removed; plain files whose name happens to start
    with ``backtesting_`` and all other entries are left untouched.

    Raises:
        OSError: If *ramdisk_base* cannot be listed or a directory cannot
            be removed.
    """
    for item in os.listdir(ramdisk_base):
        if not item.startswith('backtesting_'):
            continue
        item_path = os.path.join(ramdisk_base, item)
        if os.path.isdir(item_path):
            shutil.rmtree(item_path)
            print(f"Cleared RAM disk directory: {item_path}")
def delete_folders(working_directory):
    """Remove the ``json`` and ``backtesting`` subfolders of *working_directory*.

    Each folder is deleted recursively if it exists and a confirmation line
    is printed for it; a folder that is absent is skipped silently.
    """
    for folder_name in ('json', 'backtesting'):
        folder_path = os.path.join(working_directory, folder_name)
        if not os.path.exists(folder_path):
            continue
        shutil.rmtree(folder_path)
        print(f"Deleted folder: {folder_path}")
def read_config(working_directory):
    """Load ``config.js`` from *working_directory* and return it parsed.

    A leading ``module.exports =`` and a trailing ``;`` (with any trailing
    whitespace) are stripped before the remaining text is parsed as JSON,
    so both a JS-module wrapper and a plain JSON file are accepted.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the remaining text is not valid JSON.
    """
    with open(os.path.join(working_directory, 'config.js'), 'r') as config_file:
        raw_text = config_file.read()
    without_prefix = re.sub(r'^module\.exports\s*=\s*', '', raw_text)
    json_text = re.sub(r';\s*$', '', without_prefix)
    return json.loads(json_text)
def write_config(config, working_directory):
    """Overwrite ``config.js`` in *working_directory* with *config* as JSON.

    The file receives plain JSON indented by four spaces; no
    ``module.exports =`` prefix is written.
    """
    # Serialise before opening the file so that a serialisation error
    # cannot leave a truncated config behind.
    serialized = json.dumps(config, indent=4)
    with open(os.path.join(working_directory, 'config.js'), 'w') as config_file:
        config_file.write(serialized)
def ensure_pair_config(config, pair):
    """Make *pair* the only configured pair of the ``binance`` exchange.

    Any pair entries previously stored under ``config['pairs']['binance']``
    are discarded and replaced by a single entry for *pair* built from the
    default template below. Entries of other exchanges are not touched, and
    a missing ``'pairs'`` section is created.

    NOTE(review): the reset of the exchange section is unconditional, which
    leaves exactly one pair per run. This looks intentional (one backtest
    per pair and window) -- confirm before relying on previously stored
    pair settings being preserved.

    Args:
        config: Parsed configuration dictionary; modified in place.
        pair: Pair name used as the key of the new entry, e.g. ``"USDT-BTC"``.
    """
    exchange = 'binance'
    # A fresh dict literal is built on every call, so entries created by
    # different calls never share mutable state.
    config.setdefault('pairs', {})[exchange] = {
        pair: {
            "strategy": "channelmaestro",
            "enabled": True,
            "override": {
                "INITIAL_FUNDS": "500",
                "BUY_METHOD": "channelmaestro",
                "SELL_METHOD": "channelmaestro",
                "COMPOUND_RATIO": "1",
                "COMPOUND_PROFITS_SINCE": "0",
                "USE_STOP_AFTER_PROFIT": False,
                "PROFIT_TARGET_PCT": "7.5",
                "USE_AUTO_GAIN": True,
                "GAIN_PARTIAL": "0.5",
                "GAIN": "2",
                "BUY_ENABLED": True,
                "SELL_ENABLED": True,
                "STOP_AFTER_SELL": False,
                "MIN_VOLUME_TO_SELL": 10,
                "MAX_INVESTMENT": "999999999999999",
                "PERIOD": "5",
                "PERIOD_MEDIUM": "15",
                "PERIOD_LONG": "30",
                "IGNORE_TRADES_BEFORE": "0",
                # Placeholders; callers fill in the real window afterwards.
                "BF_SINCE": 0,
                "BF_UNTIL": 0,
                "USE_EXPERIMENTS": False,
            },
        },
    }
def update_config(config, pair, start, end):
    """Prepare *config* for backtesting *pair* over the window *start*..*end*.

    Switches the bot's ``BACKFESTER`` flag on, (re)creates the pair entry
    via ``ensure_pair_config`` and then overrides its backtesting window
    and sizing values:

    * ``BF_SINCE`` / ``BF_UNTIL``: *start* and *end* as given.
    * ``INITIAL_FUNDS``: the simulator balance of the pair's first symbol.
    * ``TRADING_LIMIT``: that balance divided by 30.
    * ``MIN_VOLUME_TO_SELL``: 30% of the trading limit.

    Args:
        config: Parsed configuration dictionary; modified in place.
        pair: Pair name of the form ``"<first>-<second>"``.
        start: Start of the backtesting window (stored unchanged).
        end: End of the backtesting window (stored unchanged).

    Raises:
        KeyError: If ``config['bot']`` or the simulator balance for the
            pair's first symbol is missing.
    """
    bot_settings = config['bot']
    # .get(): a config that lacks the flag altogether is exactly the case in
    # which it has to be switched on, so it must not raise KeyError.
    # NOTE: 'BACKFESTER' is the key's spelling in the config; keep it as is.
    if not bot_settings.get('BACKFESTER'):
        bot_settings['BACKFESTER'] = True

    # Balances are looked up by the symbol in front of the dash.
    base = pair.split('-')[0]
    balance = float(bot_settings['simulatorBalances']['binance'][base])
    trading_limit = balance / 30

    ensure_pair_config(config, pair)
    config['pairs']['binance'][pair]['override'].update({
        'BF_SINCE': start,
        'BF_UNTIL': end,
        'TRADING_LIMIT': trading_limit,
        'MIN_VOLUME_TO_SELL': trading_limit * 0.3,
        'INITIAL_FUNDS': balance,
    })
def copy_to_ram(working_directory, ram_directory):
    """Replace *ram_directory* with a fresh recursive copy of *working_directory*.

    Anything already present at *ram_directory* is deleted first, so the
    destination mirrors the source afterwards.
    """
    print(f"Copying {working_directory} to {ram_directory}")
    stale_copy_present = os.path.exists(ram_directory)
    if stale_copy_present:
        shutil.rmtree(ram_directory)
    shutil.copytree(working_directory, ram_directory)
def copy_backtesting_reports_from_ram(working_directory, ram_directory):
    """Replace the reports of *working_directory* with those of *ram_directory*.

    If ``<ram_directory>/backtestingReports`` exists, any existing
    ``<working_directory>/backtestingReports`` is deleted and the RAM copy
    is copied in its place. If the RAM copy has no reports folder, nothing
    is changed.
    """
    print(f"Copying backtestingReports from {ram_directory} back to {working_directory}")
    source_reports = os.path.join(ram_directory, 'backtestingReports')
    target_reports = os.path.join(working_directory, 'backtestingReports')
    if not os.path.exists(source_reports):
        return
    if os.path.exists(target_reports):
        shutil.rmtree(target_reports)
    shutil.copytree(source_reports, target_reports)
def _start_backtester(command, ram_directory):
    """Start *command* in *ram_directory* and stream its output into a queue.

    Returns a ``(process, lines)`` tuple. ``lines`` is a thread-safe queue
    that receives every output line of the process and finally ``None``
    once the output stream has reached end-of-file.
    """
    import queue
    import threading

    process = subprocess.Popen(
        command,
        cwd=ram_directory,
        stdout=subprocess.PIPE,
        # stderr is merged into stdout: a separate pipe that nobody reads
        # fills up eventually and then blocks the child forever.
        stderr=subprocess.STDOUT,
        text=True,
    )
    lines = queue.Queue()

    def pump_output():
        try:
            for line in process.stdout:
                lines.put(line)
        finally:
            lines.put(None)  # end-of-output marker

    # Reading happens on a helper thread so that the caller can wait for a
    # line with a timeout instead of blocking inside readline().
    threading.Thread(target=pump_output, daemon=True).start()
    return process, lines


def _stop_backtester(process):
    """Terminate *process* if it is still running and reap it.

    Returns True when the process had to be stopped, False when it had
    already exited on its own.
    """
    if process.poll() is not None:
        return False
    process.terminate()
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        # SIGTERM was ignored; force the exit so no process is left behind.
        process.kill()
        process.wait()
    return True


def launch_and_kill_process(command, working_directory, ram_directory, stall_timeout=8):
    """Run one backtest from a RAM copy of *working_directory*.

    The working directory is copied to *ram_directory* and *command* is
    started there. Its output is echoed line by line until one of the
    following happens:

    * a line contains ``Backtesting report created successfully``;
    * a line contains the "Backtester completed the job" message (followed
      by a three second grace period);
    * the process closes its output.

    If no line arrives for *stall_timeout* seconds the process is stopped
    and started again. Afterwards the process is terminated if it is still
    running, the reports are copied back to *working_directory* and the
    ``json`` / ``backtesting`` folders of the RAM copy are deleted. That
    cleanup also runs when the monitoring loop raises.

    Args:
        command: Argument list for ``subprocess.Popen``.
        working_directory: Source directory; also receives the reports.
        ram_directory: Scratch copy the process runs in.
        stall_timeout: Seconds without output after which the process is
            restarted. Defaults to 8.
    """
    copy_to_ram(working_directory, ram_directory)
    process, lines = _start_backtester(command, ram_directory)
    print(f"Process {process.pid} started in {ram_directory}.")
    try:
        while True:
            try:
                output = lines.get(timeout=stall_timeout)
            except Empty:
                print(f"No log output for {stall_timeout} seconds, restarting process {process.pid}")
                _stop_backtester(process)
                # A new queue comes with the new process, so lines of the
                # stopped one can no longer be mistaken for fresh output.
                process, lines = _start_backtester(command, ram_directory)
                continue
            if output is None:
                break
            print(output.strip())
            if 'Backtesting report created successfully' in output:
                break
            if 'Backtester completed the job: your data will be available soon on your GUI' in output:
                time.sleep(3)
                break
    finally:
        # Stop and reap before copying, so the reports are not read while
        # the process may still be writing them.
        if _stop_backtester(process):
            print(f"Process {process.pid} in {ram_directory} has been terminated.")
        copy_backtesting_reports_from_ram(working_directory, ram_directory)
        delete_folders(ram_directory)
def run_backtest(pair, start, end, working_directory, ram_directory, queue):
    """Configure and run one backtest, then release *working_directory*.

    The config in *working_directory* is read, updated for *pair* and the
    window *start*..*end*, and written back before the backtest is launched
    from *ram_directory*.

    Args:
        pair: Pair name to backtest.
        start: Start of the backtesting window.
        end: End of the backtesting window.
        working_directory: Directory holding ``config.js``.
        ram_directory: Scratch directory the backtest runs in.
        queue: Queue of free working directories; *working_directory* is
            put back on it when this function is done.
    """
    try:
        config = read_config(working_directory)
        update_config(config, pair, start, end)
        write_config(config, working_directory)
        launch_and_kill_process(command, working_directory, ram_directory)
        time.sleep(1)
    finally:
        # Hand the directory back even if a step above failed; otherwise it
        # would be missing from the pool for the rest of the run.
        queue.put(working_directory)
def save_queue(task_queue, filename='task_queue.json'):
    """Write the tasks currently in *task_queue* to *filename* as JSON.

    The queue is drained, its contents are dumped as a JSON list, and every
    task is put back in the original order. Tasks are put back even if
    writing the file fails.

    Args:
        task_queue: Queue to snapshot; must support ``get(timeout=...)``
            and ``put``.
        filename: Path of the JSON file to (over)write.
    """
    tasks = []
    while True:
        try:
            # A short timed get() is used instead of empty(): on a
            # multiprocessing queue empty() may still report True right
            # after put(), and a plain get() would block forever if another
            # consumer took the last item in the meantime.
            tasks.append(task_queue.get(timeout=0.1))
        except Empty:
            break
    try:
        # Open the file only after draining, so a failure while reading the
        # queue cannot leave a truncated snapshot behind.
        with open(filename, 'w') as file:
            json.dump(tasks, file)
    finally:
        for task in tasks:
            task_queue.put(task)
def load_queue(filename='task_queue.json'):
    """Return a new queue filled with ``(pair, start, end)`` tasks.

    If *filename* exists, the tasks are read from it (each JSON list is
    converted to a tuple). Otherwise one task is generated for every
    combination of ``trading_pairs`` and ``months``, pair-major.
    """
    task_queue = Queue()
    if os.path.exists(filename):
        with open(filename, 'r') as file:
            pending = [tuple(entry) for entry in json.load(file)]
    else:
        pending = [
            (pair, start, end)
            for pair in trading_pairs
            for start, end in months
        ]
    for task in pending:
        task_queue.put(task)
    return task_queue
def _load_completed_tasks(completed_tasks_filename):
    """Return the recorded tasks as a list of ``(pair, start, end)`` tuples.

    JSON has no tuple type, so the stored lists are converted back to
    tuples; a missing file yields an empty list.
    """
    if not os.path.exists(completed_tasks_filename):
        return []
    with open(completed_tasks_filename, 'r') as file:
        return [tuple(task) for task in json.load(file)]


def _record_completed_task(task, completed_tasks_filename):
    """Add *task* to the shared completed-tasks file without losing entries.

    Several worker processes write the same file, so the read-merge-write
    cycle runs under an exclusive lock on a sidecar ``.lock`` file and the
    result is moved into place with an atomic rename.
    """
    import fcntl  # POSIX-only, like the /dev/shm paths used by the workers

    with open(completed_tasks_filename + '.lock', 'w') as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)  # released when the file closes
        recorded = _load_completed_tasks(completed_tasks_filename)
        if task not in recorded:
            recorded.append(task)
        temporary_filename = f"{completed_tasks_filename}.{os.getpid()}.tmp"
        with open(temporary_filename, 'w') as file:
            json.dump(recorded, file)
        os.replace(temporary_filename, completed_tasks_filename)


def worker(task_queue, working_directory_queue, completed_tasks_filename='completed_tasks.json', idle_timeout=5):
    """Process backtesting tasks from *task_queue* until it stays empty.

    Tasks already listed in *completed_tasks_filename* when the worker
    starts are skipped. For every other task a free working directory is
    taken from *working_directory_queue*, the backtest is run from a
    per-process folder under ``/dev/shm``, and the task is then recorded in
    the completed-tasks file.

    Args:
        task_queue: Queue of ``(pair, start, end)`` tasks.
        working_directory_queue: Queue of free working directories.
        completed_tasks_filename: JSON file listing finished tasks.
        idle_timeout: Seconds to wait for a task before the worker exits.
            Defaults to 5.
    """
    # A set of tuples: entries loaded from JSON must compare equal to the
    # tuples built from the queue items, otherwise nothing is ever skipped.
    completed_tasks = set(_load_completed_tasks(completed_tasks_filename))
    while True:
        try:
            pair, start, end = task_queue.get(timeout=idle_timeout)
        except Empty:
            break
        task = (pair, start, end)
        if task in completed_tasks:
            continue
        working_directory = working_directory_queue.get()
        ram_directory = os.path.join('/dev/shm', f'backtesting_{current_process().pid}')
        print(f"Worker {current_process().pid} processing {pair} from {start} to {end} in {working_directory}")
        run_backtest(pair, start, end, working_directory, ram_directory, working_directory_queue)
        completed_tasks.add(task)
        _record_completed_task(task, completed_tasks_filename)
def main():
    """Run all queued backtests with three parallel worker processes.

    Stale RAM-disk folders are cleared, the task queue is loaded and
    snapshotted to disk, the working directories are offered to the workers
    through a queue, and three workers are started half a second apart.
    Once all workers have exited, the remaining queue content is saved
    again.
    """
    clear_ramdisk()
    task_queue = load_queue()
    working_directory_queue = Queue()
    save_queue(task_queue)
    for directory in working_directories:
        working_directory_queue.put(directory)

    worker_count = 3
    workers = []
    for _ in range(worker_count):
        process = Process(target=worker, args=(task_queue, working_directory_queue))
        process.start()
        workers.append(process)
        # Stagger the start-up of the workers.
        time.sleep(0.5)

    for process in workers:
        process.join()
    save_queue(task_queue)


if __name__ == "__main__":
    main()