-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtx.py
219 lines (192 loc) · 8.78 KB
/
tx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# coding:utf-8
"""
@Time : 2021/6/2 下午4:57
@Author: chuwt
"""
from typing import (
List,
Dict,
Optional,
Any,
)
from blspy import G1Element, PrivateKey, G2Element, AugSchemeMPL
from chia.util.byte_types import hexstr_to_bytes
from chia.consensus.coinbase import create_puzzlehash_for_pk
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program, SerializedProgram
from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import puzzle_for_pk
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint64
from chia.types.coin_solution import CoinSolution
from chia.util.hash import std_hash
from chia.wallet.puzzles.puzzle_utils import (
make_assert_coin_announcement,
make_assert_puzzle_announcement,
make_assert_my_coin_id_condition,
make_assert_absolute_seconds_exceeds_condition,
make_create_coin_announcement,
make_create_puzzle_announcement,
make_create_coin_condition,
make_reserve_fee_condition,
)
from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import solution_for_conditions
from chia.wallet.puzzles.announcement import Announcement
from chia.types.spend_bundle import SpendBundle
from chia.wallet.sign_coin_solutions import sign_coin_solutions, unsigned_coin_solutions
from chia.consensus.coinbase import DEFAULT_HIDDEN_PUZZLE_HASH
from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import calculate_synthetic_secret_key
from util.util import xch_address_to_puzzle_hash
def create_signed_tx(sk: str, to_address: str, amount: uint64, fee: int, coins: List) -> (str, dict):
    """
    Build a transfer transaction and sign it in one step.

    Note: the input coins (UTXOs) must be spendable by ``sk``; otherwise the
    resulting signature will not validate.
    Note 2: the official wallet derives a fresh wallet key (index + 1) after
    each use, whereas this package always uses the wallet key at index=0.
    Any value left over from the inputs is sent back to ``sk`` as change.

    :param sk: hex string of the private key (the wallet key for the index in use)
    :param to_address: destination address
    :param amount: amount to transfer
    :param fee: transaction fee
    :param coins: input UTXOs (each entry is parsed with ``Coin.from_json_dict``)
    :return: ``(spend bundle name as a string, spend bundle as a JSON dict)``
    """
    destination_hash = xch_address_to_puzzle_hash(to_address)

    # Parse the key once; both the synthetic secret key and the public key derive from it.
    secret_key = PrivateKey.from_bytes(hexstr_to_bytes(sk))
    synthetic_key = calculate_synthetic_secret_key(secret_key, DEFAULT_HIDDEN_PUZZLE_HASH)
    public_key = secret_key.get_g1()

    coin_solutions = _create_transaction(public_key, destination_hash, amount, fee, coins)

    # NOTE(review): presumably the network-specific additional data mixed into
    # the signed messages (looks like the mainnet genesis challenge) - confirm.
    additional_data = bytes.fromhex("ccd5bb71183532bff220ba46c268991a3ff07eb358e8255a65c30a2dce0e5fbb")
    # CLVM cost limit, comparable to gasLimit on Ethereum.
    max_cost = 11000000000

    bundle: SpendBundle = sign_coin_solutions(coin_solutions, synthetic_key, additional_data, max_cost)
    bundle_json = bundle.to_json_dict()
    return str(bundle.name()), bundle_json
def sign_tx(sk: str, unsigned_tx: dict, msg_list: List[bytes], pk_list: List[bytes]) -> (str, dict):
    """
    Sign an unsigned transaction.

    Each message is signed with the synthetic secret key derived from ``sk``;
    the individual signatures are then aggregated into one signature.

    :param sk: hex string of the wallet private key
    :param unsigned_tx: unsigned transaction dict; its ``"coin_solutions"``
        entry is copied into the signed transaction
    :param msg_list: messages to sign, as produced by ``create_unsigned_tx``
    :param pk_list: public keys matching ``msg_list`` position by position,
        as produced by ``create_unsigned_tx``
    :return: ``(spend bundle name as a string, signed transaction dict)``
    :raises ValueError: if ``msg_list`` and ``pk_list`` differ in length
    :raises AssertionError: if a public key does not match the synthetic key
        derived from ``sk``, or if a signature fails verification
    """
    # Messages and keys are paired by position, so a length mismatch can
    # never be signed correctly; reject it before doing any key work.
    if len(msg_list) != len(pk_list):
        raise ValueError("msg_list and pk_list must have the same length")

    synthetic = calculate_synthetic_secret_key(PrivateKey.from_bytes(hexstr_to_bytes(sk)), DEFAULT_HIDDEN_PUZZLE_HASH)
    # Loop-invariant: serialise the synthetic public key once.
    synthetic_pk_bytes = bytes(synthetic.get_g1())

    # NOTE(review): the checks below are plain asserts (kept so callers still
    # see AssertionError); they are skipped when Python runs with -O.
    signatures: List[G2Element] = []
    # Pair by position instead of msg_list.index(msg): index() is O(n) per
    # lookup and returns the first match, which picks the wrong key when the
    # same message appears more than once.
    for msg, pk in zip(msg_list, pk_list):
        assert synthetic_pk_bytes == bytes(pk)
        signature = AugSchemeMPL.sign(synthetic, msg)
        assert AugSchemeMPL.verify(pk, msg, signature)
        signatures.append(signature)

    aggsig = AugSchemeMPL.aggregate(signatures)
    assert AugSchemeMPL.aggregate_verify(pk_list, msg_list, aggsig)

    signed_tx = {
        "coin_solutions": unsigned_tx["coin_solutions"],
        "aggregated_signature": "0x" + bytes(aggsig).hex(),
    }
    # Rebuild the spend bundle only to compute its name (the transaction id).
    coin_solutions = [CoinSolution.from_json_dict(tx) for tx in signed_tx["coin_solutions"]]
    sb: SpendBundle = SpendBundle(coin_solutions, aggsig)
    return str(sb.name()), signed_tx
def create_unsigned_tx(from_pk: str, to_address: str, amount: uint64, fee: int, coins: List):
    """
    Build an unsigned transaction together with the data needed to sign it.

    :param from_pk: hex string of the public key belonging to the wallet private key
    :param to_address: destination address
    :param amount: amount to transfer
    :param fee: transaction fee
    :param coins: input coins (each entry is parsed with ``Coin.from_json_dict``)
    :return: ``(unsigned_tx, msg_list, pk_list)`` where ``unsigned_tx`` is a dict
        with the serialised coin solutions and an empty aggregated signature, and
        ``msg_list`` / ``pk_list`` are the values returned by
        ``unsigned_coin_solutions`` (the inputs expected by ``sign_tx``)
    """
    destination_hash = xch_address_to_puzzle_hash(to_address)
    sender_key = G1Element.from_bytes(hexstr_to_bytes(from_pk))
    coin_solutions = _create_transaction(sender_key, destination_hash, amount, fee, coins)

    # NOTE(review): presumably the network-specific additional data mixed into
    # the signed messages (looks like the mainnet genesis challenge) - confirm.
    additional_data = bytes.fromhex("ccd5bb71183532bff220ba46c268991a3ff07eb358e8255a65c30a2dce0e5fbb")
    # 11000000000 is the CLVM cost limit, comparable to gasLimit on Ethereum.
    max_cost = 11000000000
    msg_list, pk_list = unsigned_coin_solutions(coin_solutions, additional_data, max_cost)

    serialised_solutions = []
    for coin_solution in coin_solutions:
        serialised_solutions.append(coin_solution.to_json_dict())

    unsigned_tx = {
        "coin_solutions": serialised_solutions,
        "aggregated_signature": "",
    }
    return unsigned_tx, msg_list, pk_list
def _create_transaction(pk: G1Element, to_puzzle_hash: str, amount: uint64, fee: int, coins: List):
    """
    Build the list of coin solutions that spends ``coins`` to pay ``amount``.

    The first coin visited creates the outputs (payment plus change, if any),
    reserves the fee and creates a coin announcement. Every other coin only
    asserts that announcement, which ties all spends together.

    :param pk: public key whose puzzle locks every input coin; the change
        output is also sent to the puzzle hash derived from this key
    :param to_puzzle_hash: hex string of the destination puzzle hash
    :param amount: amount to transfer, must be greater than 0
    :param fee: transaction fee, must not be negative
    :param coins: input coins as JSON dicts (parsed with ``Coin.from_json_dict``);
        duplicates are collapsed
    :return: list of ``CoinSolution``, one per distinct input coin
    :raises ValueError: on an empty destination, a non-positive amount, a
        negative fee, insufficient input value, or when no spend was produced
    """
    if not to_puzzle_hash:
        raise ValueError("Address is null in send list")
    if amount <= 0:
        raise ValueError("Amount must greater than 0")
    # A negative fee would otherwise only fail later, at the assert in make_solution.
    if fee < 0:
        raise ValueError("Fee must not be negative")

    # address to puzzle hash
    to_puzzle_hash = hexstr_to_bytes(to_puzzle_hash)
    total_amount = amount + fee

    # Balance check: the inputs must cover the amount plus the fee.
    coins = {Coin.from_json_dict(coin_json) for coin_json in coins}
    spend_value = sum(coin.amount for coin in coins)
    change = spend_value - total_amount
    if change < 0:
        raise ValueError("Insufficient balance")

    # Every input coin is locked by the same puzzle, so build it once.
    puzzle: Program = puzzle_for_pk(pk)

    transaction: List[CoinSolution] = []
    primary_announcement_hash: Optional[bytes32] = None
    for coin in coins:
        if primary_announcement_hash is None:
            primaries = [{"puzzlehash": to_puzzle_hash, "amount": amount}]
            if change > 0:
                # The upstream source derives a new wallet private key by index here,
                # converts it to a public key and builds the puzzle_hash from it; these
                # are recorded in the database so the index (and thus the public key)
                # can later be looked up from the puzzle_hash.
                # TODO: here we directly use the puzzle_hash built from the index=0
                # public key; it is unclear whether this causes problems.
                change_puzzle_hash: bytes32 = create_puzzlehash_for_pk(pk)
                primaries.append({"puzzlehash": change_puzzle_hash, "amount": change})
            # The announcement message commits to every input coin id and to the
            # id of every coin this spend creates.
            message_list: List[bytes32] = [c.name() for c in coins]
            for primary in primaries:
                message_list.append(Coin(coin.name(), primary["puzzlehash"], primary["amount"]).name())
            message: bytes32 = std_hash(b"".join(message_list))
            solution: Program = make_solution(primaries=primaries, fee=fee, coin_announcements=[message])
            primary_announcement_hash = Announcement(coin.name(), message).name()
        else:
            solution = make_solution(coin_announcements_to_assert=[primary_announcement_hash])
        transaction.append(
            CoinSolution(
                coin, SerializedProgram.from_bytes(bytes(puzzle)), SerializedProgram.from_bytes(bytes(solution))
            )
        )
    if not transaction:
        raise ValueError("spends is zero")
    return transaction
def make_solution(
    primaries: Optional[List[Dict[str, Any]]] = None,
    min_time=0,
    me=None,
    coin_announcements: Optional[List[bytes32]] = None,
    coin_announcements_to_assert: Optional[List[bytes32]] = None,
    puzzle_announcements=None,
    puzzle_announcements_to_assert=None,
    fee=0,
) -> Program:
    """
    Assemble a list of conditions and wrap it with ``solution_for_conditions``.

    Conditions are emitted in this fixed order: create-coin (one per primary),
    absolute-seconds assertion, my-coin-id assertion, reserve-fee, created coin
    announcements, asserted coin announcements, created puzzle announcements,
    asserted puzzle announcements. Arguments that are empty, ``None`` or zero
    contribute nothing.

    :param primaries: outputs to create; each dict provides ``"puzzlehash"`` and ``"amount"``
    :param min_time: adds an absolute-seconds assertion when greater than 0
    :param me: when truthy, its ``"id"`` entry is asserted as this coin's id
    :param coin_announcements: coin announcements to create
    :param coin_announcements_to_assert: coin announcement hashes to assert
    :param puzzle_announcements: puzzle announcements to create
    :param puzzle_announcements_to_assert: puzzle announcement hashes to assert
    :param fee: fee to reserve; must be >= 0 (checked with ``assert``)
    :return: the result of ``solution_for_conditions`` for the collected conditions
    """
    assert fee >= 0

    conditions = []

    # ``x or ()`` lets a missing/empty argument fall through as an empty iteration.
    conditions.extend(
        make_create_coin_condition(output["puzzlehash"], output["amount"]) for output in (primaries or ())
    )

    if min_time > 0:
        conditions.append(make_assert_absolute_seconds_exceeds_condition(min_time))
    if me:
        conditions.append(make_assert_my_coin_id_condition(me["id"]))
    if fee:
        conditions.append(make_reserve_fee_condition(fee))

    conditions.extend(make_create_coin_announcement(item) for item in (coin_announcements or ()))
    conditions.extend(make_assert_coin_announcement(item) for item in (coin_announcements_to_assert or ()))
    conditions.extend(make_create_puzzle_announcement(item) for item in (puzzle_announcements or ()))
    conditions.extend(make_assert_puzzle_announcement(item) for item in (puzzle_announcements_to_assert or ()))

    return solution_for_conditions(conditions)