diff --git a/Makefile b/Makefile index 0867ca99..2be0856d 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,10 @@ fund-validator-wallet: fund-miner-wallet: btcli wallet faucet --wallet.name miner --subtensor.$(SUBTENSOR_ENVIRONMENT) +## Send TAO +send: + btcli w transfer --subtensor.$(SUBTENSOR_ENVIRONMENT) + ## Subnet creation create-subnet: btcli subnet create --wallet.name owner --subtensor.$(SUBTENSOR_ENVIRONMENT) diff --git a/masa/miner/web/scraper.py b/masa/miner/web/scraper.py index fc52af83..df4e0762 100644 --- a/masa/miner/web/scraper.py +++ b/masa/miner/web/scraper.py @@ -28,9 +28,8 @@ def scrape_web(self, query: WebScraperQuery) -> WebScraperObject: def format_scraped_data(self, data: requests.Response) -> WebScraperObject: bt.logging.info(f"Formatting scraped data: {data}") - scraped_data = json.loads( - data.json()["data"] - ) # Convert stringified json to dict - formatted_scraped_data = WebScraperObject(**scraped_data) + json_data = data.json()["data"] + + formatted_scraped_data = WebScraperObject(**json_data) return formatted_scraped_data diff --git a/masa/types/web.py b/masa/types/web.py index b8ac1656..c021a403 100644 --- a/masa/types/web.py +++ b/masa/types/web.py @@ -10,4 +10,4 @@ class Section(TypedDict, total=False): class WebScraperObject(TypedDict): sections: Optional[List[Section]] - pages: List[str] + pages: Optional[List[str]] diff --git a/masa/utils/uids.py b/masa/utils/uids.py index e7220a4b..2113ec6b 100644 --- a/masa/utils/uids.py +++ b/masa/utils/uids.py @@ -20,6 +20,7 @@ def check_uid_availability( """ # Filter non serving axons. if not metagraph.axons[uid].is_serving: + bt.logging.info(f"UID: {uid} is not serving") return False # Filter out non validator permit. @@ -116,26 +117,28 @@ async def get_random_uids(self, k: int, exclude: List[int] = None) -> torch.Long """ dendrite = bt.dendrite(wallet=self.wallet) - print("get random uids") - try: # Generic sanitation avail_uids = get_available_uids( self.metagraph, self.config.neuron.vpermit_tao_limit ) - candidate_uids = remove_excluded_uids(avail_uids, exclude) + # healthy_uids = remove_excluded_uids(avail_uids, exclude) + + # healthy_uids, _ = await ping_uids(dendrite, self.metagraph, candidate_uids) + + # guard against deployed validators not finding any healthy ids via ping... + # if (len(healthy_uids) == 0): + # healthy_uids = candidate_uids - healthy_uids, _ = await ping_uids(dendrite, self.metagraph, candidate_uids) # filtered_uids = filter_duplicated_axon_ips_for_uids( # healthy_uids, self.metagraph # ) - k = min(k, len(healthy_uids)) + # k = min(k, len(healthy_uids)) # Random sampling - random_sample = random.sample(healthy_uids, k) - print(f"Random sample: {random_sample}") + # random_sample = random.sample(healthy_uids, k) - uids = torch.tensor(random_sample) + uids = torch.tensor(avail_uids) return uids except Exception as e: bt.logging.error(f"Failed to get random miner uids: {e}") diff --git a/masa/validator/discord/all_guilds/forward.py b/masa/validator/discord/all_guilds/forward.py index 96ec151e..b4d575cd 100644 --- a/masa/validator/discord/all_guilds/forward.py +++ b/masa/validator/discord/all_guilds/forward.py @@ -21,8 +21,8 @@ from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder from masa.validator.discord.all_guilds.parser import all_guilds_parser -from masa.validator.discord.all_guilds.reward import get_rewards from masa.miner.masa_protocol_request import REQUEST_TIMEOUT_IN_SECONDS +from masa.miner.discord.all_guilds import DiscordAllGuildsRequest class DiscordAllGuildsForwarder(Forwarder): @@ -32,11 +32,14 @@ def __init__(self, validator): async def forward_query(self): try: + + def source_method(query): + return DiscordAllGuildsRequest().get_discord_all_guilds() return await self.forward( request=Request(type=RequestType.DISCORD_ALL_GUILDS.value), - get_rewards=get_rewards, parser_method=all_guilds_parser, timeout=REQUEST_TIMEOUT_IN_SECONDS, + source_method=source_method ) except Exception as e: diff --git a/masa/validator/discord/channel_messages/forward.py b/masa/validator/discord/channel_messages/forward.py index d49d16ef..aa4f1a6c 100644 --- a/masa/validator/discord/channel_messages/forward.py +++ b/masa/validator/discord/channel_messages/forward.py @@ -21,7 +21,7 @@ from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder from masa.validator.discord.channel_messages.parser import channel_messages_parser -from masa.validator.discord.channel_messages.reward import get_rewards +from masa.miner.discord.channel_messages import DiscordChannelMessagesRequest class DiscordChannelMessagesForwarder(Forwarder): @@ -35,8 +35,8 @@ async def forward_query(self, query): request=Request( query=query, type=RequestType.DISCORD_CHANNEL_MESSAGES.value ), - get_rewards=get_rewards, parser_method=channel_messages_parser, + source_method=DiscordChannelMessagesRequest().get_discord_channel_messages ) except Exception as e: diff --git a/masa/validator/discord/guild_channels/forward.py b/masa/validator/discord/guild_channels/forward.py index ee8da68e..4eb805c2 100644 --- a/masa/validator/discord/guild_channels/forward.py +++ b/masa/validator/discord/guild_channels/forward.py @@ -21,7 +21,7 @@ from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder from masa.validator.discord.guild_channels.parser import guild_channels_parser -from masa.validator.discord.guild_channels.reward import get_rewards +from masa.miner.discord.guild_channels import DiscordGuildChannelsRequest class DiscordGuildChannelsForwarder(Forwarder): @@ -35,8 +35,8 @@ async def forward_query(self, query): request=Request( query=query, type=RequestType.DISCORD_GUILD_CHANNELS.value ), - get_rewards=get_rewards, parser_method=guild_channels_parser, + source_method=DiscordGuildChannelsRequest().get_discord_guild_channels ) except Exception as e: diff --git a/masa/validator/discord/profile/forward.py b/masa/validator/discord/profile/forward.py index a71197fe..04c9310c 100644 --- a/masa/validator/discord/profile/forward.py +++ b/masa/validator/discord/profile/forward.py @@ -21,7 +21,7 @@ from masa.api.request import Request, RequestType from masa.types.discord import DiscordProfileObject from masa.validator.forwarder import Forwarder -from masa.validator.discord.profile.reward import get_rewards +from masa.miner.discord.profile import DiscordProfileRequest class DiscordProfileForwarder(Forwarder): @@ -33,8 +33,8 @@ async def forward_query(self, query): try: return await self.forward( request=Request(query=query, type=RequestType.DISCORD_PROFILE.value), - get_rewards=get_rewards, parser_object=DiscordProfileObject, + source_method=DiscordProfileRequest().get_profile ) except Exception as e: diff --git a/masa/validator/discord/user_guilds/forward.py b/masa/validator/discord/user_guilds/forward.py index 87770538..2d71d21b 100644 --- a/masa/validator/discord/user_guilds/forward.py +++ b/masa/validator/discord/user_guilds/forward.py @@ -21,7 +21,7 @@ from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder from masa.validator.discord.user_guilds.parser import user_guilds_parser -from masa.validator.discord.user_guilds.reward import get_rewards +from masa.miner.discord.user_guilds import DiscordUserGuildsRequest class DiscordUserGuildsForwarder(Forwarder): @@ -33,8 +33,8 @@ async def forward_query(self): try: return await self.forward( request=Request(type=RequestType.DISCORD_USER_GUILDS.value), - get_rewards=get_rewards, parser_method=user_guilds_parser, + source_method=DiscordUserGuildsRequest().get_discord_user_guilds ) except Exception as e: diff --git a/masa/validator/forwarder.py b/masa/validator/forwarder.py index 98a201b1..41b1daf2 100644 --- a/masa/validator/forwarder.py +++ b/masa/validator/forwarder.py @@ -19,32 +19,35 @@ from masa.utils.uids import get_random_uids import bittensor as bt +import torch +from collections import defaultdict +import math +from sklearn.cluster import KMeans +# this forwarder needs to able to handle multiple requests, driven off of an API request -# this forwarder needs to able to handle multiple requests, driven off of an API request class Forwarder: def __init__(self, validator): self.validator = validator + self.minimum_accepted_score = 0.8 - async def forward( - self, request, get_rewards, parser_object=None, parser_method=None, timeout=5 - ): - # TODO: This should live inside each endpoint to enable us to filter miners by different parameters in the future - # like blacklisting miners only on a specific endpoint like profiles or followers - miner_uids = await get_random_uids( - self.validator, k=self.validator.config.neuron.sample_size - ) + async def forward(self, request, parser_object=None, parser_method=None, timeout=5, source_method=None): + miner_uids = await get_random_uids(self.validator, k=self.validator.config.neuron.sample_size) + bt.logging.info("Calling UIDS -----------------------------------------") + bt.logging.info(miner_uids) if miner_uids is None: return [] - responses = await self.validator.dendrite( + synapses = await self.validator.dendrite( axons=[self.validator.metagraph.axons[uid] for uid in miner_uids], synapse=request, - deserialize=True, - timeout=timeout, + deserialize=False, + timeout=timeout ) + responses = [synapse.response for synapse in synapses] + # Filter and parse valid responses valid_responses, valid_miner_uids = self.sanitize_responses_and_uids( responses, miner_uids=miner_uids @@ -58,13 +61,16 @@ async def forward( elif parser_method: parsed_responses = parser_method(valid_responses) - # Score responses - rewards = get_rewards( - self.validator, query=request.query, responses=parsed_responses - ) + process_times = [synapse.dendrite.process_time for synapse, + uid in zip(synapses, miner_uids) if uid in valid_miner_uids] - # Update the scores based on the rewards + source_of_truth = await self.get_source_of_truth( + responses=parsed_responses, miner_uids=miner_uids, source_method=source_method, query=request.query) + # Score responses + rewards = self.get_rewards(responses=parsed_responses, source_of_truth=source_of_truth + ) + # Update the scores based on the rewards if len(valid_miner_uids) > 0: self.validator.update_scores(rewards, valid_miner_uids) if self.validator.should_set_weights(): @@ -72,19 +78,89 @@ async def forward( self.validator.set_weights() except Exception as e: bt.logging.error(f"Failed to set weights: {e}") - + # Add corresponding uid to each response - response_with_uids = [ - {"response": response, "uid": int(uid.item()), "score": score.item()} - for response, uid, score in zip(parsed_responses, valid_miner_uids, rewards) + responses_with_metadata = [ + {"response": response, "uid": int( + uid.item()), "score": score.item(), "latency": latency} + for response, latency, uid, score in zip(parsed_responses, process_times, valid_miner_uids, rewards) + ] + + responses_with_metadata.sort(key=lambda x: (-x["score"], x["latency"])) + return responses_with_metadata + + def get_rewards( + self, + responses: dict, + source_of_truth: dict + ) -> torch.FloatTensor: + + combined_responses = responses.copy() + combined_responses.append(source_of_truth) + + embeddings = self.validator.model.encode( + [str(response) for response in combined_responses]) + + num_clusters = min(len(combined_responses), 2) + clustering_model = KMeans(n_clusters=num_clusters) + clustering_model.fit(embeddings) + cluster_labels = clustering_model.labels_ + + source_of_truth_label = cluster_labels[-1] if len(cluster_labels) > 0 else None + bt.logging.info("Source of truth -----------------------------------------") + bt.logging.info(source_of_truth) + bt.logging.info(f"Source of truth label: {source_of_truth_label}") + bt.logging.info(f"labels: {cluster_labels}") + rewards_list = [ + 1 if cluster_labels[i] == source_of_truth_label else self.calculate_reward( + response, source_of_truth) + for i, response in enumerate(responses) ] - - response_with_uids.sort(key=lambda x: x["score"], reverse=True) - - print("FINAL RESPONSES ------------------------------------------------") - print(response_with_uids) - - return response_with_uids + + bt.logging.info("REWARDS LIST ----------------------------------------------") + bt.logging.info(rewards_list) + + return torch.FloatTensor(rewards_list).to( + self.validator.device + ) + + def score_dicts_difference(self, initialScore, dict1, dict2): + score = initialScore + + if not isinstance(dict1, dict) and not isinstance(dict2, dict): + if dict1 != dict2: + return max(score - 0.1, 0) + else: + return max(score, 0) + + for key in dict1.keys(): + if key not in dict2 or dict2[key] is None: + score -= 0.1 + elif isinstance(dict1[key], dict) and isinstance(dict2[key], dict): + score = self.score_dicts_difference(score, dict1[key], dict2[key]) + elif isinstance(dict1[key], list) and isinstance(dict2[key], list): + if len(dict1[key]) != len(dict2[key]): + length_difference = abs(len(dict1[key]) - len(dict2[key])) + score -= 0.1 * (1 + length_difference) + else: + for item1, item2 in zip(dict1[key], dict2[key]): + score = self.score_dicts_difference(score, item1, item2) + elif str(dict1[key]) != str(dict2[key]): + score -= 0.1 + + return max(score, 0) + + def calculate_reward(self, response: dict, source_of_truth: dict) -> float: + + # Return a reward of 0.0 if the response is None + if response is None: + return 0.0 + + bt.logging.info(f"Getting username from {response}") + response = {'response': response} + + score = self.score_dicts_difference(1, source_of_truth, response) + return max(score, 0) # Ensure the score doesn't go below 0 def sanitize_responses_and_uids(self, responses, miner_uids): valid_responses = [response for response in responses if response is not None] @@ -94,3 +170,36 @@ def sanitize_responses_and_uids(self, responses, miner_uids): if response is not None ] return valid_responses, valid_miner_uids + + async def get_source_of_truth(self, responses, miner_uids, source_method, query): + responses_str = [str(response) for response in responses] + weighted_responses = defaultdict(float) + most_common_response = None + count_high_score_uids = sum( + 1 for uid in miner_uids if self.validator.scores[uid] >= self.minimum_accepted_score) + bt.logging.info( + f"Number of UIDs with score greater than the minimum accepted: {count_high_score_uids}") + + if (count_high_score_uids > 10): + for response, uid in zip(responses_str, miner_uids): + score = self.validator.scores[uid] + exponential_weight = math.exp(score) + + weighted_responses[response] += exponential_weight + + most_common_response = max(weighted_responses, key=weighted_responses.get) + else: + if source_method: + most_common_response = source_method(query) + + if isinstance(most_common_response, str): + try: + most_common_response = eval(most_common_response) + except Exception as e: + bt.logging.error( + f"Failed to transform most_common_response to dict: {e}") + most_common_response = {} + + most_common_response = {'response': most_common_response} + + return most_common_response diff --git a/masa/validator/twitter/followers/forward.py b/masa/validator/twitter/followers/forward.py index 84497d43..1ac7195d 100644 --- a/masa/validator/twitter/followers/forward.py +++ b/masa/validator/twitter/followers/forward.py @@ -20,8 +20,8 @@ import bittensor as bt from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder -from masa.validator.twitter.followers.reward import get_rewards from masa.validator.twitter.followers.parser import followers_parser +from masa.miner.twitter.followers import TwitterFollowersRequest class TwitterFollowersForwarder(Forwarder): @@ -33,8 +33,8 @@ async def forward_query(self, query): try: return await self.forward( request=Request(query=query, type=RequestType.TWITTER_FOLLOWERS.value), - get_rewards=get_rewards, parser_method=followers_parser, + source_method=TwitterFollowersRequest().get_followers ) except Exception as e: diff --git a/masa/validator/twitter/profile/forward.py b/masa/validator/twitter/profile/forward.py index 4479a6d1..78dd51bd 100644 --- a/masa/validator/twitter/profile/forward.py +++ b/masa/validator/twitter/profile/forward.py @@ -20,8 +20,8 @@ import bittensor as bt from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder -from masa.validator.twitter.profile.reward import get_rewards from masa.types.twitter import TwitterProfileObject +from masa.miner.twitter.profile import TwitterProfileRequest class TwitterProfileForwarder(Forwarder): @@ -33,8 +33,8 @@ async def forward_query(self, query): try: return await self.forward( request=Request(query=query, type=RequestType.TWITTER_PROFILE.value), - get_rewards=get_rewards, parser_object=TwitterProfileObject, + source_method=TwitterProfileRequest().get_profile ) except Exception as e: diff --git a/masa/validator/twitter/profile/reward.py b/masa/validator/twitter/profile/reward.py index 3c261d34..5d0ce4b3 100644 --- a/masa/validator/twitter/profile/reward.py +++ b/masa/validator/twitter/profile/reward.py @@ -17,39 +17,29 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -import torch import bittensor as bt from masa.types.twitter import TwitterProfileObject -def reward(query: str, response: TwitterProfileObject) -> float: +def calculate_reward(query: str, response: TwitterProfileObject, source_of_truth: TwitterProfileObject) -> float: + # Return a reward of 0.0 if the response is None if response is None: return 0.0 + bt.logging.info(f"Getting username from {response}") # Extract username and userID from the response, defaulting to empty string and None respectively - username = response.get("Username", "") - userID = response.get("UserID", None) - bt.logging.info(f"Calculating reward for response {username.lower()}") # Return a reward of 1 if the username matches the query and userID is not None, otherwise return 0 score = 1.0 - required_keys = TwitterProfileObject.__annotations__.keys() # Get all required keys from TwitterProfileObject - missing_keys = sum(1 for key in required_keys if key not in response or response[key] is None) + # Get all required keys from TwitterProfileObject + required_keys = TwitterProfileObject.__annotations__.keys() + missing_keys = sum( + 1 for key in required_keys if key not in response or response[key] is None) score -= 0.1 * missing_keys - if username.lower() == query.lower() and userID is not None: - return max(score, 0) # Ensure the score doesn't go below 0 - else: - return 0 - - -def get_rewards( - self, - query: str, - responses: TwitterProfileObject, -) -> torch.FloatTensor: - bt.logging.info("Getting rewards...") - return torch.FloatTensor([reward(query, response) for response in responses]).to( - self.device - ) + for key in required_keys: + if key in response and key in source_of_truth and response[key] != source_of_truth[key]: + score -= 0.1 + + return max(score, 0) diff --git a/masa/validator/twitter/tweets/forward.py b/masa/validator/twitter/tweets/forward.py index e3f36e61..2de43784 100644 --- a/masa/validator/twitter/tweets/forward.py +++ b/masa/validator/twitter/tweets/forward.py @@ -20,8 +20,8 @@ import bittensor as bt from masa.api.request import Request, RequestType from masa.validator.forwarder import Forwarder -from masa.validator.twitter.tweets.reward import get_rewards from masa.validator.twitter.tweets.parser import tweets_parser +from masa.miner.twitter.tweets import TwitterTweetsRequest class TwitterTweetsForwarder(Forwarder): @@ -31,14 +31,17 @@ def __init__(self, validator): async def forward_query(self, tweet_query): try: + def source_method(query): + return TwitterTweetsRequest().get_recent_tweets(query=tweet_query) + return await self.forward( request=Request( query=tweet_query.query, count=tweet_query.count, type=RequestType.TWITTER_TWEETS.value, ), - get_rewards=get_rewards, parser_method=tweets_parser, + source_method=source_method ) except Exception as e: diff --git a/masa/validator/web/forward.py b/masa/validator/web/forward.py index 35b3a7aa..6d289783 100644 --- a/masa/validator/web/forward.py +++ b/masa/validator/web/forward.py @@ -21,8 +21,8 @@ from masa.api.request import Request, RequestType from masa.miner.web.scraper import WebScraperQuery from masa.validator.forwarder import Forwarder -from masa.validator.web.reward import get_rewards from masa.validator.web.parser import web_scraper_parser +from masa.miner.web.scraper import WebScraperRequest class WebScraperForwarder(Forwarder): @@ -32,15 +32,17 @@ def __init__(self, validator): async def forward_query(self, web_scraper_query: WebScraperQuery): try: + def source_method(query): + return WebScraperRequest().scrape_web(web_scraper_query) + return await self.forward( request=Request( url=web_scraper_query.url, depth=web_scraper_query.depth, type=RequestType.WEB_SCRAPER.value, ), - get_rewards=get_rewards, parser_method=web_scraper_parser, - ) + source_method=source_method) except Exception as e: bt.logging.error( diff --git a/masa/validator/web/parser.py b/masa/validator/web/parser.py index a4a48534..c67f15e2 100644 --- a/masa/validator/web/parser.py +++ b/masa/validator/web/parser.py @@ -1,5 +1,2 @@ -from masa.types.web import WebScraperObject - - def web_scraper_parser(web_scraper_responses): - return [WebScraperObject(**response) for response in web_scraper_responses] + return web_scraper_responses diff --git a/neurons/miner.py b/neurons/miner.py index f960deb4..42be3cbc 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -45,8 +45,7 @@ def __init__(self, config=None): bt.logging.info("Miner initialized with config: {}".format(config)) async def forward(self, synapse: Request) -> Request: - print(f"Sleeping for rate limiting purposes: {delay}s") - time.sleep(delay) + print(f"Getting request: {synapse.type}") try: self.handle_request(synapse) diff --git a/neurons/validator.py b/neurons/validator.py index e093001c..39f0eab8 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -25,11 +25,13 @@ # Bittensor Validator Template: from masa.base.validator import BaseValidatorNeuron from masa.api.validator_api import ValidatorAPI +from sentence_transformers import SentenceTransformer class Validator(BaseValidatorNeuron): def __init__(self, config=None): super(Validator, self).__init__(config=config) + self.model = SentenceTransformer('all-MiniLM-L6-v2') # Load a pre-trained model for embeddings self.API = ValidatorAPI(self) bt.logging.info("Validator initialized with config: {}".format(config)) diff --git a/requirements.txt b/requirements.txt index 1274ccea..8287aff0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,6 @@ bittensor==7.1.0 loguru==0.7.2 python-dotenv==0.21.0 torch==2.3.0 -watchfiles==0.22.0 \ No newline at end of file +watchfiles==0.22.0 +scikit-learn==1.5.1 +sentence-transformers==3.0.1