diff --git a/tests/graphql_test.py b/tests/graphql_test.py index 7ed2f55..09aa133 100644 --- a/tests/graphql_test.py +++ b/tests/graphql_test.py @@ -1,11 +1,13 @@ from decimal import Decimal -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +from celery.result import AsyncResult from pytest_httpx import HTTPXMock from world_boss.app.data_provider import DATA_PROVIDER_URLS from world_boss.app.enums import NetworkType +from world_boss.app.kms import HEADLESS_URLS, MINER_URLS from world_boss.app.models import Transaction, WorldBossReward, WorldBossRewardAmount @@ -67,3 +69,274 @@ def test_check_balance(fx_session, fx_test_client): result = req.json() assert m.call_count == 2 assert result["data"]["checkBalance"] == expected + + +def test_generate_ranking_rewards_csv( + fx_test_client, celery_session_worker, httpx_mock: HTTPXMock, fx_ranking_rewards +): + requested_rewards = [ + { + "raider": { + "address": "01A0b412721b00bFb5D619378F8ab4E4a97646Ca", + "ranking": 101, + }, + "rewards": fx_ranking_rewards, + }, + ] + httpx_mock.add_response( + method="POST", + url=DATA_PROVIDER_URLS[NetworkType.MAIN], + json={"data": {"worldBossRankingRewards": requested_rewards}}, + ) + + httpx_mock.add_response( + method="POST", + url=MINER_URLS[NetworkType.MAIN], + json={ + "data": { + "stateQuery": { + "arg01A0b412721b00bFb5D619378F8ab4E4a97646Ca": { + "agentAddress": "0x9EBD1b4F9DbB851BccEa0CFF32926d81eDf6De52", + }, + } + } + }, + ) + + query = 'mutation { generateRankingRewardsCsv(seasonId: 1, totalUsers: 1, startNonce: 1, channelId: "channel_id") }' + with patch("world_boss.app.tasks.client.files_upload_v2") as m: + req = fx_test_client.post("/graphql", json={"query": query}) + assert req.status_code == 200 + task_id = req.json()["data"]["generateRankingRewardsCsv"] + task: AsyncResult = AsyncResult(task_id) + task.get(timeout=30) + assert task.state == "SUCCESS" + m.assert_called_once() + # skip check file. because file is temp file. + kwargs = m.call_args.kwargs + assert kwargs["file"] + assert kwargs["channels"] == "channel_id" + assert kwargs["title"] == f"world_boss_1_1_1_result" + assert kwargs["filename"] == f"world_boss_1_1_1_result.csv" + + +@pytest.mark.parametrize("has_header", [True, False]) +def test_prepare_transfer_assets( + fx_test_client, + celery_session_worker, + fx_session, + httpx_mock: HTTPXMock, + has_header: bool, +): + assert not fx_session.query(Transaction).first() + assert not fx_session.query(WorldBossReward).first() + assert not fx_session.query(WorldBossRewardAmount).first() + header = "raid_id,ranking,agent_address,avatar_address,amount,ticker,decimal_places,target_nonce\n" + content = """3,25,0x01069aaf336e6aEE605a8A54D0734b43B62f8Fe4,5b65f5D0e23383FA18d74A62FbEa383c7D11F29d,150000,CRYSTAL,18,175 + 3,25,0x01069aaf336e6aEE605a8A54D0734b43B62f8Fe4,5b65f5D0e23383FA18d74A62FbEa383c7D11F29d,560,RUNESTONE_FENRIR1,0,175 + 3,25,0x01069aaf336e6aEE605a8A54D0734b43B62f8Fe4,5b65f5D0e23383FA18d74A62FbEa383c7D11F29d,150,RUNESTONE_FENRIR2,0,175 + 3,25,0x01069aaf336e6aEE605a8A54D0734b43B62f8Fe4,5b65f5D0e23383FA18d74A62FbEa383c7D11F29d,40,RUNESTONE_FENRIR3,0,175 + 3,26,0x1774cd5d2C1C0f72AA75E9381889a1a554797a4c,1F8d5e0D201B7232cE3BC8d630d09E3F9107CceE,150000,CRYSTAL,18,176 + 3,26,0x1774cd5d2C1C0f72AA75E9381889a1a554797a4c,1F8d5e0D201B7232cE3BC8d630d09E3F9107CceE,560,RUNESTONE_FENRIR1,0,176 + 3,26,0x1774cd5d2C1C0f72AA75E9381889a1a554797a4c,1F8d5e0D201B7232cE3BC8d630d09E3F9107CceE,150,RUNESTONE_FENRIR2,0,176 + 3,26,0x1774cd5d2C1C0f72AA75E9381889a1a554797a4c,1F8d5e0D201B7232cE3BC8d630d09E3F9107CceE,40,RUNESTONE_FENRIR3,0,176""" + + expected = [ + { + "nonce": 175, + "ranking": 25, + "agent_address": "0x01069aaf336e6aEE605a8A54D0734b43B62f8Fe4", + "avatar_address": "5b65f5D0e23383FA18d74A62FbEa383c7D11F29d", + }, + { + "nonce": 176, + "ranking": 26, + "agent_address": "0x1774cd5d2C1C0f72AA75E9381889a1a554797a4c", + "avatar_address": "1F8d5e0D201B7232cE3BC8d630d09E3F9107CceE", + }, + ] + reward_amounts = [ + { + "ticker": "CRYSTAL", + "decimal_places": 18, + "amount": 150000, + }, + { + "ticker": "RUNESTONE_FENRIR1", + "decimal_places": 0, + "amount": 560, + }, + { + "ticker": "RUNESTONE_FENRIR2", + "decimal_places": 0, + "amount": 150, + }, + { + "ticker": "RUNESTONE_FENRIR3", + "decimal_places": 0, + "amount": 40, + }, + ] + + private_url = "https://planetariumhq.slack.com/private/files/1/2/test.csv" + mocked_response = MagicMock() + mocked_response.data = { + "file": {"url_private": private_url}, + } + httpx_mock.add_response( + method="GET", + url=private_url, + content=(header + content).encode() if has_header else content.encode(), + ) + + query = 'mutation { prepareTransferAssets(link: "https://planetariumhq.slack.com/files/1/2/test.csv", timeStamp: "2022-12-31", channelId: "channel_id") }' + + with patch( + "world_boss.app.api.client.files_info", return_value=mocked_response + ) as m: + req = fx_test_client.post("/graphql", json={"query": query}) + assert req.status_code == 200 + task_id = req.json()["data"]["prepareTransferAssets"] + task: AsyncResult = AsyncResult(task_id) + task.get(timeout=30) + assert task.state == "SUCCESS" + m.assert_called_once_with(file="2") + assert fx_session.query(Transaction).count() == 2 + assert fx_session.query(WorldBossReward).count() == 2 + assert fx_session.query(WorldBossRewardAmount).count() == 8 + for i, tx in enumerate( + fx_session.query(Transaction).order_by(Transaction.nonce) + ): + assert tx.nonce == expected[i]["nonce"] + assert tx.tx_result is None + assert len(tx.amounts) == 4 + + world_boss_reward = tx.amounts[0].reward + assert world_boss_reward.raid_id == 3 + assert world_boss_reward.ranking == expected[i]["ranking"] + assert world_boss_reward.agent_address == expected[i]["agent_address"] + assert world_boss_reward.avatar_address == expected[i]["avatar_address"] + + for v, world_boss_reward_amount in enumerate(tx.amounts): + assert world_boss_reward_amount.ticker == reward_amounts[v]["ticker"] + assert ( + world_boss_reward_amount.decimal_places + == reward_amounts[v]["decimal_places"] + ) + assert world_boss_reward_amount.amount == reward_amounts[v]["amount"] + + +def test_prepare_reward_assets(fx_test_client, celery_session_worker, fx_session): + result = [] + assets = [ + {"decimalPlaces": 18, "ticker": "CRYSTAL", "quantity": 109380000}, + {"decimalPlaces": 0, "ticker": "RUNESTONE_FENRIR1", "quantity": 406545}, + {"decimalPlaces": 0, "ticker": "RUNESTONE_FENRIR2", "quantity": 111715}, + {"decimalPlaces": 0, "ticker": "RUNESTONE_FENRIR3", "quantity": 23890}, + ] + reward = WorldBossReward() + reward.avatar_address = "avatar_address" + reward.agent_address = "agent_address" + reward.raid_id = 3 + reward.ranking = 1 + + for i, asset in enumerate(assets): + reward_amount = WorldBossRewardAmount() + reward_amount.amount = asset["quantity"] + reward_amount.ticker = asset["ticker"] + reward_amount.decimal_places = asset["decimalPlaces"] + reward_amount.reward = reward + tx_id = i + reward_amount.tx_id = tx_id + transaction = Transaction() + transaction.tx_id = tx_id + transaction.signer = "signer" + transaction.payload = "payload" + transaction.nonce = i + fx_session.add(transaction) + result.append(reward_amount) + fx_session.commit() + query = 'mutation { prepareRewardAssets(seasonId: 3, channelId: "channel_id") }' + with patch("world_boss.app.tasks.client.chat_postMessage") as m: + req = fx_test_client.post("/graphql", json={"query": query}) + assert req.status_code == 200 + task_id = req.json()["data"]["prepareRewardAssets"] + task = AsyncResult(task_id) + task.get(timeout=30) + assert task.state == "SUCCESS" + + m.assert_called_once_with( + channel="channel_id", + text="world boss season 3 prepareRewardAssets\n```plain_value:{'type_id': 'prepare_reward_assets', 'values': {'a': " + "[], 'r': " + "b'%1\\xe5\\xe0l\\xbd\\x11\\xafT\\xf9\\x8d9W\\x89\\x90qo\\xfc}\\xba'}}\n" + "\n" + "6475373a747970655f69647532313a707265706172655f7265776172645f61737365747375363a76616c7565736475313a616c6575313a7232303a2531e5e06cbd11af54f98d39578990716ffc7dba6565```", + ) + + +def test_stage_transactions( + fx_test_client, + celery_session_worker, + fx_session, + fx_transactions, +): + for tx in fx_transactions: + fx_session.add(tx) + fx_session.commit() + network_type = NetworkType.MAIN + query = 'mutation { stageTransactions(channelId: "channel_id") }' + with patch( + "world_boss.app.tasks.signer.stage_transaction", return_value="tx_id" + ) as m, patch("world_boss.app.tasks.client.chat_postMessage") as m2: + req = fx_test_client.post("/graphql", json={"query": query}) + assert req.status_code == 200 + task_id = req.json()["data"]["stageTransactions"] + task: AsyncResult = AsyncResult(task_id) + task.get(timeout=30) + assert m.call_count == len(HEADLESS_URLS[network_type]) * len(fx_transactions) + m2.assert_called_once_with( + channel="channel_id", text=f"stage {len(fx_transactions)} transactions" + ) + + +def test_transaction_result( + fx_test_client, + fx_session, + celery_session_worker, +): + for nonce, tx_id, payload in [ + ( + 1, + "a9c9444bd50b3164b5c251315960272ae1f42f7b2d5b95948a78c608424bbcb2", + "payload_1", + ), + ( + 2, + "db4b916c5c821cbf90356694f231c9f6a6858b67231799dc9ee2d9f2946c4310", + "payload_2", + ), + ]: + transaction = Transaction() + transaction.tx_id = tx_id + transaction.nonce = nonce + transaction.payload = payload + transaction.signer = "0xCFCd6565287314FF70e4C4CF309dB701C43eA5bD" + fx_session.add(transaction) + fx_session.commit() + query = 'mutation { transactionResult(channelId: "channel_id") }' + with patch("world_boss.app.tasks.client.files_upload_v2") as m: + req = fx_test_client.post("/graphql", json={"query": query}) + assert req.status_code == 200 + task_id = req.json()["data"]["transactionResult"] + task: AsyncResult = AsyncResult(task_id) + task.get(timeout=30) + assert task.state == "SUCCESS" + m.assert_called_once() + kwargs = m.call_args.kwargs + assert kwargs["file"] + assert kwargs["channels"] == "channel_id" + assert kwargs["title"] == "world_boss_tx_result" + assert "world_boss_tx_result" in kwargs["filename"] + for tx in fx_session.query(Transaction): + assert tx.tx_result == "INVALID" diff --git a/world_boss/app/graphql.py b/world_boss/app/graphql.py index 39bd4b4..01d1cbf 100644 --- a/world_boss/app/graphql.py +++ b/world_boss/app/graphql.py @@ -1,6 +1,10 @@ +import csv import typing +from io import StringIO +import httpx import strawberry +from celery import chord from fastapi import Depends from strawberry.fastapi import GraphQLRouter from strawberry.types import Info @@ -8,8 +12,26 @@ from world_boss.app.api import get_db from world_boss.app.data_provider import data_provider_client from world_boss.app.enums import NetworkType -from world_boss.app.kms import MINER_URLS, signer -from world_boss.app.raid import get_currencies, get_next_tx_nonce +from world_boss.app.kms import HEADLESS_URLS, MINER_URLS, signer +from world_boss.app.models import Transaction +from world_boss.app.raid import ( + get_currencies, + get_next_tx_nonce, + list_tx_nonce, + row_to_recipient, +) +from world_boss.app.slack import client +from world_boss.app.stubs import Recipient +from world_boss.app.tasks import ( + get_ranking_rewards, + insert_world_boss_rewards, + query_tx_result, + send_slack_message, + sign_transfer_assets, + stage_transaction, + upload_prepare_reward_assets, + upload_tx_result, +) async def get_context(db=Depends(get_db)): @@ -40,5 +62,101 @@ def check_balance(self, info: Info) -> typing.List[str]: return result -schema = strawberry.Schema(Query) +@strawberry.type +class Mutation: + @strawberry.mutation + def generate_ranking_rewards_csv( + self, season_id: int, total_users: int, start_nonce: int, channel_id: str + ) -> str: + task = get_ranking_rewards.delay( + channel_id, season_id, total_users, start_nonce + ) + return task.id + + @strawberry.mutation + def prepare_transfer_assets( + self, link: str, time_stamp: str, channel_id: str, info: Info + ) -> str: + db = info.context["db"] + file_id = link.split("/")[5] + res = client.files_info(file=file_id) + data = typing.cast(dict, res.data) + file = data["file"] + content = httpx.get( + file["url_private"], headers={"Authorization": "Bearer %s" % client.token} + ).content.decode() + stream = StringIO(content) + has_header = csv.Sniffer().has_header(content) + reader = csv.reader(stream) + if has_header: + next(reader, None) + # nonce : recipients for transfer_assets tx + recipient_map: dict[int, list[Recipient]] = {} + max_nonce = get_next_tx_nonce(db) - 1 + exist_nonce = list_tx_nonce(db) + rows = [row for row in reader] + # raid_id,ranking,agent_address,avatar_address,amount,ticker,decimal_places,target_nonce + for row in rows: + nonce = int(row[7]) + recipient = row_to_recipient(row) + + # update recipient_map + if not recipient_map.get(nonce): + recipient_map[nonce] = [] + recipient_map[nonce].append(recipient) + + # sanity check + for k in recipient_map: + assert len(recipient_map[k]) <= 100 + # insert tables + memo = "world boss ranking rewards by world boss signer" + url = MINER_URLS[NetworkType.MAIN] + task = chord( + sign_transfer_assets.s( + time_stamp, + int(nonce), + recipient_map[nonce], + memo, + url, + max_nonce, + exist_nonce, + ) + for nonce in recipient_map + )(insert_world_boss_rewards.si(rows)) + return task.id + + @strawberry.mutation + def prepare_reward_assets(self, season_id: int, channel_id: str) -> str: + task = upload_prepare_reward_assets.delay(channel_id, season_id) + return task.id + + @strawberry.mutation + def stage_transactions(self, channel_id: str, info: Info) -> str: + db = info.context["db"] + nonce_list = ( + db.query(Transaction.nonce) + .filter_by(signer=signer.address, tx_result=None) + .all() + ) + network_type = NetworkType.MAIN + headless_urls = HEADLESS_URLS[network_type] + task = chord( + stage_transaction.s(headless_url, nonce) + for headless_url in headless_urls + for nonce, in nonce_list + )(send_slack_message.si(channel_id, f"stage {len(nonce_list)} transactions")) + return task.id + + @strawberry.mutation + def transaction_result(self, channel_id: str, info: Info) -> str: + db = info.context["db"] + tx_ids = db.query(Transaction.tx_id).filter_by(tx_result=None) + url = MINER_URLS[NetworkType.MAIN] + task = chord(query_tx_result.s(url, str(tx_id)) for tx_id, in tx_ids)( + upload_tx_result.s(channel_id) + ) + return task.id + + +schema = strawberry.Schema(Query, mutation=Mutation) graphql_app = GraphQLRouter(schema, context_getter=get_context)