diff --git a/.gitignore b/.gitignore index 89c7a72..280fb24 100644 --- a/.gitignore +++ b/.gitignore @@ -160,4 +160,9 @@ cython_debug/ #.idea/ bittensor-subnet-template/ wandb/ -.vscode/ \ No newline at end of file +.vscode/ + +data +wallets +lightning_logs +.scale_batch_size* \ No newline at end of file diff --git a/DOCKER.md b/DOCKER.md new file mode 100644 index 0000000..f98736e --- /dev/null +++ b/DOCKER.md @@ -0,0 +1,75 @@ +# how to use hivetrain for docker + +## install dependencies + +1. [Docker](https://docs.docker.com/engine/install/) +2. [Nvidia Container Toolkit](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html) + +## clone the repo +``` +git clone https://github.com/LuciferianInk/DistributedTraining.git +``` + +## move into the repo +``` +cd DistributedTraining +``` + +## checkout the dev branch +``` +git checkout docker-setup +``` + +## build the docker image +``` +docker compose build +``` + +## make a .env file +Make a file called `.env`, and place it in the root of this project. + +## make a choice +At this point, you must make one of two choices: + +### 1. bootstrap +If you intend to bootstrap a new training run. +``` +docker compose up +``` + +### 2. join +If you intend to join an existing training run, then add this environment variable to your `.env` file: +``` +INITIAL_PEERS="/p2p/12D3KooWE1fyvZHhuc2UQqAN35oXgexHKRpVqgXKo9EUQ4hguny9" +``` +After that, you may join the training run with: +``` +docker compose up +``` + +## final notes + +Your machine will print your own peer ID to the console at startup. It should look like this: +``` +PEER-ID: /p2p/12D3KooWF9KB7PVUdbct4ryCMzDjbNT1q2w5XMw9iVG6tisY4ThB +``` +If Hivemind is under-utilizing your GPU (i.e. it's not using all of your available VRAM), you may try to increase the batch size being used. 
# Image for running the Hivetrain miner on an NVIDIA CUDA base.
FROM nvcr.io/nvidia/cuda:12.2.0-devel-ubuntu22.04

LABEL sponsor="Hivetrain"

# Keep apt from prompting during the build.
ENV DEBIAN_FRONTEND="noninteractive"

RUN apt-get update \
    && apt-get install -y --no-install-recommends \
    git \
    python3-dev \
    python3-pip \
    python3-packaging \
    python3-venv \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Requirements are copied and installed before the source tree so these layers
# stay cached when only application code changes.
COPY requirements.txt requirements.txt

RUN pip install -r requirements.txt && \
    pip cache purge

COPY requirements.docker.txt requirements.docker.txt

RUN pip install -r requirements.docker.txt && \
    pip cache purge

COPY ./ /app

# Exec (JSON array) form. The previous shell form wrapped the whole command in
# quotes, so /bin/sh looked for one executable literally named
# "bash ./entrypoint.sh" and the container failed to start unless the
# entrypoint was overridden (as compose.yml does).
ENTRYPOINT ["bash", "./entrypoint.sh"]
The system evaluates two critical aspects of user behavior: responsiveness and loss values. By applying a set of predefined rules, we aim to foster a healthy and productive network environment where all participants are incentivized to contribute positively. Whilst maintaining network integrity with few gameable variables. +Hivetrain uses a simple score assignment system designed to reward users for their participation and adherence to network guidelines. The system evaluates two critical aspects of user behavior: responsiveness and loss values. By applying a set of predefined rules, we aim to foster a healthy and productive network environment where all participants are incentivized to contribute positively. Whilst maintaining network integrity with few gameable variables. ### 1.0 Users who actively respond to network activities and maintain their losses within an acceptable threshold are awarded a score of 1.0. This top score reflects exemplary user behavior and strict adherence to network standards, highlighting the user as a model participant. 
#!/bin/bash

# Run the miner from the directory that contains hiveminer.py; stop right away
# if that directory is missing instead of launching python from the wrong place.
cd /app/neurons || exit 1

# The variables are intentionally left unquoted: an unset or empty value must
# expand to nothing so argparse sees the bare flag and falls back to the
# option's default/const (hiveminer.py declares these options with nargs="*"
# or nargs="?"). Quoting them would pass an empty-string argument instead.
#
# `exec` replaces this shell with the python process so that signals sent to
# the container reach the miner directly.
exec python3 hiveminer.py \
    --initial_peers ${INITIAL_PEERS} \
    --batch_size ${BATCH_SIZE} \
    --save_every ${SAVE_EVERY}
def flatten_list(nested_list):
    """Collapse a list of lists into a single flat list.

    Only the first element is inspected to decide whether the input is nested,
    and only one level of nesting is removed. An empty (or otherwise falsy)
    input, or an input whose first element is not a list, is returned
    unchanged (the very same object).
    """
    if not nested_list or not isinstance(nested_list[0], list):
        return nested_list

    flat = []
    for inner in nested_list:
        flat.extend(inner)
    return flat
class StreamingDataset(IterableDataset):
    """Iterable dataset that streams documents and packs them into token blocks.

    Documents are pulled from a streaming dataset, tokenized, and concatenated
    (separated by the tokenizer's EOS token) until at least ``block_size``
    tokens are buffered; the first ``block_size`` tokens are then yielded and
    the buffer is reset.

    Args:
        tokenizer: Callable tokenizer exposing ``eos_token_id``; it is called
            with ``return_tensors="np"`` and its ``"input_ids"`` are used.
        config: Mapping with optional keys ``dataset``, ``split``, ``key``
            (name of the text field in each document) and ``block_size``.
    """

    def __init__(self, tokenizer, config):
        self.tokenizer = tokenizer
        self.config = config
        self.dataset = load_dataset(
            self.config.get("dataset", "tiiuae/falcon-refinedweb"),
            split=self.config.get("split", "train"),
            streaming=True,
            cache_dir="/tmp/pile",
        )

    def __iter__(self):
        """Yield token sequences of exactly ``block_size`` tokens each."""
        # A fresh random seed per iteration so every pass sees a new order.
        shuffled = self.dataset.shuffle(
            seed=random.randint(0, 2**31),
            buffer_size=10000,
        )

        block_size = self.config.get("block_size", 512)
        text_key = self.config.get("key", "default")
        separator = [self.tokenizer.eos_token_id]

        batch = []
        for document in shuffled:
            tokenized = self.tokenizer(
                text=document.get(text_key),
                max_length=block_size,
                stride=0,
                padding=True,
                truncation=True,
                return_overflowing_tokens=True,
                return_tensors="np",
            )["input_ids"]
            # Long documents overflow into several chunks; keep one at random.
            choice = random.choice(tokenized)
            if len(choice) == 0:
                continue

            if len(batch) == 0:
                batch = choice
            else:
                # Join documents with an EOS token. The previous code called
                # np.append() and dropped its return value; np.append returns
                # a new array, so the separator was never actually inserted.
                batch = np.concatenate([batch, separator, choice])

            if len(batch) >= block_size:
                # Tokens beyond block_size are discarded with the reset.
                yield batch[:block_size]
                batch = []
def set_trainable_parameters(model, hparams):
    """Build one optimizer parameter group per trainable parameter.

    Parameters with ``requires_grad`` set to False are skipped. A parameter
    whose name contains "bias" or "LayerNorm.weight" gets a weight decay of
    0.0; every other parameter uses ``hparams["weight_decay"]``.

    Args:
        model: Object exposing ``named_parameters()``.
        hparams: Mapping that provides the ``"weight_decay"`` value.

    Returns:
        A list of ``{"params": [param], "weight_decay": value}`` dicts, in
        the order produced by ``model.named_parameters()``.
    """
    decay_exempt = ("bias", "LayerNorm.weight")
    groups = []

    for name, param in model.named_parameters():
        if param.requires_grad:
            is_exempt = any(marker in name for marker in decay_exempt)
            decay = 0.0 if is_exempt else hparams["weight_decay"]
            groups.append({"params": [param], "weight_decay": decay})

    return groups
class MinerModelSaver(Callback):
    """Callback that writes the wrapped model to disk every N global steps.

    The current step is read from the "step" entry of
    ``trainer.callback_metrics`` after each training batch.
    """

    def __init__(self, save_every, output_dir):
        super().__init__()
        self.save_every = save_every
        self.output_dir = output_dir
        # Step seen on the current batch and on the batch before it.
        self.step = 0
        self.last_step = 0

    @property
    def save_every_check(self):
        """True when the step just changed to a positive multiple of ``save_every``."""
        if not self.step > 0:
            return False
        if not self.save_every > 0:
            # A non-positive interval disables saving.
            return False
        if self.last_step == self.step:
            # Same step as the previous batch: this step was already handled.
            return False
        return self.step % self.save_every == 0

    def on_train_batch_end(self, trainer, lm, outputs, batch, batch_idx):
        super().on_train_batch_end(trainer, lm, outputs, batch, batch_idx)

        current = trainer.callback_metrics.get("step", 0)
        self.step = int(current)

        if self.save_every_check:
            self.save_pytorch_model(trainer, lm)

        # Remember this step so later batches in the same step do not re-save.
        self.last_step = self.step

    def save_pytorch_model(self, trainer, lm):
        """Save ``lm.model`` to ``output_dir`` with ``safe_serialization=True``."""
        lm.model.save_pretrained(self.output_dir, safe_serialization=True)
def get_validator_uids_and_addresses(
    self, metagraph: "bt.metagraph.Metagraph" = None, vpermit_tao_limit: int = 2
):
    """
    Collect the UIDs whose stake is at least ``vpermit_tao_limit``.

    For every UID in the metagraph whose entry in ``S`` meets the limit, the
    axon IP, axon port and hotkey are gathered. No check is made on whether
    the axon is actually serving.

    Args:
        metagraph (bt.metagraph.Metagraph): Metagraph to inspect. Optional;
            when omitted, ``self.metagraph`` is used. (Previously this
            argument was required but ignored, so the argument-less call in
            ``on_train_batch_end`` raised a TypeError.)
        vpermit_tao_limit (int): Minimum stake a UID must hold to be included.

    Returns:
        Tuple[List[dict], List[str]]: A list of dicts with the keys ``uid``,
        ``ip``, ``port`` and ``hotkey`` for each matching UID, and a parallel
        list of strings formatted as 'ip:port'.
    """
    if metagraph is None:
        metagraph = self.metagraph

    available_uid_details = []
    validator_addresses = []  # 'ip:port' strings, parallel to the details
    for uid, stake in enumerate(metagraph.S):
        if stake >= vpermit_tao_limit:
            axon = metagraph.axons[uid]
            available_uid_details.append(
                {
                    "uid": uid,
                    "ip": axon.ip,
                    "port": axon.port,
                    "hotkey": metagraph.hotkeys[uid],
                }
            )
            validator_addresses.append(f"{axon.ip}:{axon.port}")

    return available_uid_details, validator_addresses
def should_sync_metagraph(self):
    """
    Return True when more than ``sync_interval`` seconds of wall-clock time
    have passed since ``last_sync_time``.

    NOTE(review): ``last_sync_time`` is initialised to 0 and the visible
    ``resync_metagraph`` does not update it, so as written this returns True
    on every call — confirm where the timestamp is meant to be refreshed.
    """
    # Imported locally because the module does not import ``time`` at the top
    # of the file; without this the method raised NameError.
    import time

    elapsed = time.time() - self.last_sync_time
    return elapsed > self.sync_interval