uniswap 自动交易程序

admin2023-10-27科学家636
import json
import time
import web3
from uniswap import Uniswap
from uniswap.uniswap import ETH_ADDRESS

#  PROVIDER 可以自己在infura.io上注册一个
PROVIDER = 'https://mainnet.infura.io/v3/......'
EIP20_ABI = json.loads(
    '[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_spender","type":"address"},{"name":"_value","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_from","type":"address"},{"name":"_to","type":"address"},{"name":"_value","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"_owner","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_to","type":"address"},{"name":"_value","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[{"name":"_owner","type":"address"},{"name":"_spender","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"_from","type":"address"},{"indexed":true,"name":"_to","type":"address"},{"indexed":false,"name":"_value","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"_owner","type":"address"},{"indexed":true,"name":"_spender","type":"address"},{"indexed":false,"name":"_value","type":"uint256"}],"name":"Approval","type":"event"}]')  # noqa: 501

# 设置以太地址和私钥
address = "0x............................."
private_key = None

uniswap_wrapper = Uniswap(address, private_key, provider=PROVIDER, version=2)  

eth = ETH_ADDRESS
dai = "0x6b175474e89094c44da98b954eedeac495271d0f"
usdt = "0xdac17f958d2ee523a2206206994597c13d831ec7"


def swap(from_coin, to_coin, from_amount):
    """
    使用 uniswap 进行交易
    :param from_coin:
    :param to_coin:
    :param from_amount: 数量
    :return:
    """
    if from_coin != eth:
        from_coin = uniswap_wrapper.w3.toChecksumAddress(from_coin)
        # 取各币种的 DECIMALS
        from_contract = uniswap_wrapper.w3.eth.contract(from_coin, abi=EIP20_ABI)
        from_decimals = from_contract.functions.decimals().call()
        from_name = from_contract.functions.symbol().call()
    else:
        from_decimals = 18
        from_name = 'ETH'

    if to_coin != eth:
        to_coin = uniswap_wrapper.w3.toChecksumAddress(to_coin)
        to_contract = uniswap_wrapper.w3.eth.contract(to_coin, abi=EIP20_ABI)
        to_decimals = to_contract.functions.decimals().call()
        to_name = to_contract.functions.symbol().call()
    else:
        to_decimals = 18
        to_name = 'ETH'

    per_amount = 1 * 10 ** from_decimals
    try:
        if (from_coin == eth) and (to_coin != eth):
            price = uniswap_wrapper.get_eth_token_input_price(to_coin, per_amount)
        elif (from_coin != eth) and (to_coin == eth):
            price = uniswap_wrapper.get_token_eth_input_price(from_coin, per_amount)
        else:
            price = uniswap_wrapper.get_token_token_input_price(from_coin, to_coin, per_amount)
    except web3.exceptions.ContractLogicError:
        return 1

    price = price / (10 ** to_decimals)
    print(f"当前价格: {price} {to_name}/{from_name}")

    # 设置 gas 价格
    gas = input(f'请设置交易的 gas 价格:')
    try:
        gas = int(gas)
    except ValueError:
        print("输入错误,退出!")
        return -1

    def value_based_gas_price_strategy(web3, transaction_params=None):
        return web3.toWei(gas, 'gwei')

    uniswap_wrapper.w3.eth.setGasPriceStrategy(value_based_gas_price_strategy)

    gprice = uniswap_wrapper.w3.eth.generateGasPrice()
    gprice = uniswap_wrapper.w3.fromWei(gprice, 'gwei')
    print(f"gas 价格设置为: {gprice}")

    answer = input(f'{from_name} => {to_name}, 是否交易? y/n: ')
    if answer not in ('Y', 'y'):
        print('退出交易!')
        return -1

    # 交易
    if private_key is not None:
        amount = int(from_amount * 10 ** from_decimals)
        uniswap_wrapper.make_trade(from_coin, to_coin, amount)
        print('交易已发送!')
        return 0
    else:
        print('没有私钥,不能提交交易!')
        return -1


if __name__ == '__main__':
    # 将 eth 换成 dai
    ret = swap(eth, dai, 0.1)


安装

pip install uniswap-python