import asyncio
import base64
import os

import httpx
from dynamic_wallet_sdk import DynamicSvmWalletClient
from solders.hash import Hash
from solders.message import Message
from solders.pubkey import Pubkey
from solders.signature import Signature
from solders.system_program import transfer, TransferParams
from solders.transaction import Transaction


async def _rpc_call(rpc_url: str, method: str, params: list):
    """POST one JSON-RPC 2.0 request to *rpc_url* and return its ``result``.

    Raises:
        RuntimeError: If the response body carries a JSON-RPC ``error``
            member instead of a ``result``.
    """
    async with httpx.AsyncClient() as http:
        resp = await http.post(rpc_url, json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        })
    payload = resp.json()
    # A failed JSON-RPC call has no "result" key; surface the node's own
    # error object rather than an uninformative KeyError.
    if "error" in payload:
        raise RuntimeError(f"{method} failed: {payload['error']}")
    return payload["result"]


async def send_sol(
    from_address: str,
    to_address: str,
    lamports: int,
    rpc_url: str,
    password: str | None = None,
):
    """Build, remotely sign, and submit a system-program SOL transfer.

    The transfer message is built locally, signed through
    ``DynamicSvmWalletClient`` (configured from the ``DYNAMIC_ENV_ID`` and
    ``DYNAMIC_API_TOKEN`` environment variables), and submitted to the RPC
    node at *rpc_url*.

    Args:
        from_address: Base58 address of the paying account; also the fee
            payer and the address passed to the signing client.
        to_address: Base58 address of the recipient.
        lamports: Amount to transfer, in lamports.
        rpc_url: URL of the JSON-RPC endpoint.
        password: Optional password forwarded unchanged to
            ``client.sign_transaction``.

    Returns:
        The ``result`` member of the ``sendTransaction`` response (per the
        Solana JSON-RPC docs, the transaction signature as a base58 string).

    Raises:
        KeyError: If a required environment variable is missing.
        RuntimeError: If either RPC call returns a JSON-RPC error.
    """
    from_pubkey = Pubkey.from_string(from_address)
    to_pubkey = Pubkey.from_string(to_address)

    latest = await _rpc_call(
        rpc_url,
        "getLatestBlockhash",
        [{"commitment": "finalized"}],
    )
    blockhash = latest["value"]["blockhash"]

    instruction = transfer(TransferParams(
        from_pubkey=from_pubkey,
        to_pubkey=to_pubkey,
        lamports=lamports,
    ))
    message = Message.new_with_blockhash(
        [instruction],
        from_pubkey,
        Hash.from_string(blockhash),
    )

    async with DynamicSvmWalletClient(os.environ["DYNAMIC_ENV_ID"]) as client:
        await client.authenticate_api_token(os.environ["DYNAMIC_API_TOKEN"])
        sig_hex = await client.sign_transaction(
            address=from_address,
            message_bytes=bytes(message),
            password=password,
        )

    # NOTE(review): assumes sign_transaction returns the signature as a
    # plain hex string (as the original code did) -- confirm against the SDK.
    # Transaction.populate takes Signature objects, not raw bytes.
    signature = Signature.from_bytes(bytes.fromhex(sig_hex))
    tx = Transaction.populate(message, [signature])

    # The payload must actually be encoded the way the "encoding" option
    # declares; the RPC only accepts base58 or base64, never hex.
    encoded_tx = base64.b64encode(bytes(tx)).decode("ascii")
    return await _rpc_call(
        rpc_url,
        "sendTransaction",
        [
            encoded_tx,
            {"encoding": "base64", "skipPreflight": False},
        ],
    )