package arbitrum

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/fraktal/mev-beta/internal/logger"
)

// significantSwapThresholdWei is the AmountIn (10^18 base units) at or above
// which IsSignificantSwap reports a swap as significant. It is computed once
// and only ever read.
var significantSwapThresholdWei = new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)

// saturatingUint64 converts v to a uint64. Values that do not fit are clamped
// to the maximum uint64 (negative values to 0) instead of silently keeping
// only the low 64 bits, so an oversized deadline never turns into a tiny one.
func saturatingUint64(v *big.Int) uint64 {
	switch {
	case v.Sign() < 0:
		return 0
	case v.IsUint64():
		return v.Uint64()
	default:
		return ^uint64(0)
	}
}

// L2MessageParser parses Arbitrum L2 messages and the DEX calls contained in
// their transactions.
type L2MessageParser struct {
	logger *logger.Logger

	// uniswapV2RouterABI is populated by loadABIs and used to decode
	// Uniswap V2 style swap arguments.
	uniswapV2RouterABI abi.ABI
	// uniswapV3RouterABI is declared but never populated in this file.
	uniswapV3RouterABI abi.ABI

	// Known DEX contract addresses on Arbitrum, mapped to a display name.
	knownRouters map[common.Address]string
	knownPools   map[common.Address]string
}

// NewL2MessageParser creates a parser with the built-in router/pool address
// tables and ABI definitions loaded. The supplied logger is used by every
// method of the returned parser.
func NewL2MessageParser(logger *logger.Logger) *L2MessageParser {
	parser := &L2MessageParser{
		logger:       logger,
		knownRouters: make(map[common.Address]string),
		knownPools:   make(map[common.Address]string),
	}

	// Initialize known Arbitrum DEX addresses.
	parser.initializeKnownAddresses()

	// Load ABIs for parsing.
	parser.loadABIs()

	return parser
}

// initializeKnownAddresses fills knownRouters and knownPools with the
// hard-coded contract addresses this parser recognizes.
func (p *L2MessageParser) initializeKnownAddresses() {
	// Uniswap V3 on Arbitrum
	p.knownRouters[common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564")] = "UniswapV3"
	p.knownRouters[common.HexToAddress("0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45")] = "UniswapV3Router2"

	// Uniswap V2 on Arbitrum
	// NOTE(review): this appears to be the Ethereum mainnet UniswapV2Router02
	// address - confirm it is the intended router on Arbitrum.
	p.knownRouters[common.HexToAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D")] = "UniswapV2"

	// SushiSwap on Arbitrum
	p.knownRouters[common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506")] = "SushiSwap"

	// Camelot DEX (Arbitrum native)
	p.knownRouters[common.HexToAddress("0xc873fEcbd354f5A56E00E710B90EF4201db2448d")] = "Camelot"

	// GMX
	// NOTE(review): verify this address against the GMX deployment list.
	p.knownRouters[common.HexToAddress("0x327df1e6de05895d2ab08513aadd9317845f20d9")] = "GMX"

	// Balancer V2
	p.knownRouters[common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8")] = "BalancerV2"

	// Curve
	// NOTE(review): verify this address against the Curve deployment list.
	p.knownRouters[common.HexToAddress("0x98EE8517825C0bd778a57471a27555614F97F48D")] = "Curve"

	// Popular pools on Arbitrum
	p.knownPools[common.HexToAddress("0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443")] = "ETH/USDC-0.05%"
	p.knownPools[common.HexToAddress("0x17c14D2c404D167802b16C450d3c99F88F2c4F4d")] = "ETH/USDC-0.3%"
	// NOTE(review): the two entries below reuse the labels above and appear
	// to be Ethereum mainnet pool addresses - confirm they belong here.
	p.knownPools[common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")] = "ETH/USDC-0.05%"
	p.knownPools[common.HexToAddress("0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc")] = "ETH/USDC-0.3%"
}

// loadABIs parses the embedded ABI definitions. A parse failure is logged and
// leaves the corresponding ABI at its zero value, in which case the swap
// decoders that depend on it return an error at call time.
func (p *L2MessageParser) loadABIs() {
	// Simplified ABI loading - in production, load from files.
	uniswapV2RouterABI := `[
		{
			"inputs": [
				{"internalType": "uint256", "name": "amountIn", "type": "uint256"},
				{"internalType": "uint256", "name": "amountOutMin", "type": "uint256"},
				{"internalType": "address[]", "name": "path", "type": "address[]"},
				{"internalType": "address", "name": "to", "type": "address"},
				{"internalType": "uint256", "name": "deadline", "type": "uint256"}
			],
			"name": "swapExactTokensForTokens",
			"outputs": [{"internalType": "uint256[]", "name": "amounts", "type": "uint256[]"}],
			"stateMutability": "nonpayable",
			"type": "function"
		}
	]`

	var err error
	p.uniswapV2RouterABI, err = abi.JSON(bytes.NewReader([]byte(uniswapV2RouterABI)))
	if err != nil {
		p.logger.Error(fmt.Sprintf("Failed to load Uniswap V2 Router ABI: %v", err))
	}
}

// ParseL2Message parses an L2 message and extracts relevant information.
//
// The first four bytes of messageData are read as a big-endian message type:
// 3 is decoded as a single transaction, 7 as a batch submission. Any other
// type is not an error: the message is returned with Type L2Unknown.
//
// An implausible timestamp (before 2021-01-01 or more than a day in the
// future) is logged but does not abort parsing. The returned message keeps a
// reference to messageData rather than a copy.
//
// It returns an error when messageData is nil or shorter than four bytes,
// when messageNumber is nil, or when the payload cannot be decoded.
func (p *L2MessageParser) ParseL2Message(messageData []byte, messageNumber *big.Int, timestamp uint64) (*L2Message, error) {
	const (
		typePrefixLen = 4 // bytes of message type at the front of messageData

		kindTransaction uint32 = 3
		kindBatch       uint32 = 7

		earliestPlausibleTimestamp = 1609459200 // 2021-01-01 00:00:00 UTC
		maxFutureSkewSeconds       = 86400      // one day
	)

	if messageData == nil {
		return nil, fmt.Errorf("message data is nil")
	}
	if len(messageData) < typePrefixLen {
		return nil, fmt.Errorf("message data too short: %d bytes", len(messageData))
	}
	if messageNumber == nil {
		return nil, fmt.Errorf("message number is nil")
	}

	// Suspicious timestamps are only reported; the message is still processed.
	if timestamp > uint64(time.Now().Unix()+maxFutureSkewSeconds) || timestamp < earliestPlausibleTimestamp {
		p.logger.Warn(fmt.Sprintf("Suspicious timestamp: %d", timestamp))
	}

	l2Message := &L2Message{
		MessageNumber: messageNumber,
		Data:          messageData,
		Timestamp:     timestamp,
		Type:          L2Unknown,
	}

	payload := messageData[typePrefixLen:]
	switch msgType := binary.BigEndian.Uint32(messageData[:typePrefixLen]); msgType {
	case kindTransaction:
		return p.parseL2Transaction(l2Message, payload)
	case kindBatch:
		return p.parseL2Batch(l2Message, payload)
	default:
		// Unknown types are returned as-is, still marked L2Unknown.
		p.logger.Debug(fmt.Sprintf("Unknown L2 message type: %d", msgType))
		return l2Message, nil
	}
}

// parseL2Transaction decodes data as a single binary-encoded transaction and
// stores it in l2Message.ParsedTx, marking the message as L2Transaction.
//
// The sender is recovered from the transaction signature on a best-effort
// basis: when recovery fails the failure is logged and Sender is left at its
// zero value, but the message is still returned.
//
// It returns an error when an argument is nil, data is empty, or the
// transaction cannot be decoded.
func (p *L2MessageParser) parseL2Transaction(l2Message *L2Message, data []byte) (*L2Message, error) {
	if l2Message == nil {
		return nil, fmt.Errorf("l2Message is nil")
	}
	if data == nil {
		return nil, fmt.Errorf("transaction data is nil")
	}
	if len(data) == 0 {
		return nil, fmt.Errorf("transaction data is empty")
	}

	tx := new(types.Transaction)
	if err := tx.UnmarshalBinary(data); err != nil {
		return nil, fmt.Errorf("failed to unmarshal transaction: %w", err)
	}

	if tx.Gas() == 0 && len(tx.Data()) == 0 {
		p.logger.Warn("Transaction has zero gas and no data")
	}

	l2Message.Type = L2Transaction
	l2Message.ParsedTx = tx

	// Recover the sender from the signature, using the chain ID carried by
	// the transaction itself to select the signer.
	sender, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx)
	if err != nil {
		p.logger.Debug(fmt.Sprintf("Could not recover sender for transaction %s: %v", tx.Hash().Hex(), err))
	} else {
		l2Message.Sender = sender
	}

	return l2Message, nil
}

// parseL2Batch decodes a batch submission. The expected layout is a 32-byte
// big-endian batch index followed by zero or more entries, each a 4-byte
// big-endian length followed by that many bytes of encoded transaction.
//
// Decoding is tolerant: zero-length entries and entries that fail to decode
// are logged and skipped, and a truncated trailing entry ends the loop while
// keeping the transactions decoded so far. On success InnerTxs is always
// non-nil, and is empty when the batch holds no decodable transaction.
//
// It returns an error when an argument is nil or data is shorter than the
// batch index.
func (p *L2MessageParser) parseL2Batch(l2Message *L2Message, data []byte) (*L2Message, error) {
	const (
		batchIndexSize   = 32
		lengthPrefixSize = 4
	)

	if l2Message == nil {
		return nil, fmt.Errorf("l2Message is nil")
	}
	if data == nil {
		return nil, fmt.Errorf("batch data is nil")
	}
	if len(data) < batchIndexSize {
		return nil, fmt.Errorf("batch data too short: %d bytes", len(data))
	}

	l2Message.Type = L2BatchSubmission
	l2Message.BatchIndex = new(big.Int).SetBytes(data[:batchIndexSize])

	innerTxs := []*types.Transaction{}
	remainingData := data[batchIndexSize:]
	for len(remainingData) > 0 {
		if len(remainingData) < lengthPrefixSize {
			// Incomplete data: keep what was decoded so far.
			p.logger.Warn("Incomplete transaction length prefix in batch")
			break
		}

		txLength := binary.BigEndian.Uint32(remainingData[:lengthPrefixSize])
		remainingData = remainingData[lengthPrefixSize:]

		if txLength == 0 {
			p.logger.Warn("Zero-length transaction in batch")
			continue
		}

		// Compare in 64 bits: adding the prefix size to txLength in uint32
		// wraps around for lengths near the maximum, which would let an
		// oversized length through and panic on the slice below.
		if uint64(txLength) > uint64(len(remainingData)) {
			p.logger.Warn(fmt.Sprintf("Incomplete transaction data in batch: expected %d bytes, got %d", txLength, len(remainingData)))
			break
		}

		txData := remainingData[:txLength]
		remainingData = remainingData[txLength:]

		tx := new(types.Transaction)
		if err := tx.UnmarshalBinary(txData); err != nil {
			// One bad entry must not discard the rest of the batch.
			p.logger.Warn(fmt.Sprintf("Failed to unmarshal transaction in batch: %v", err))
			continue
		}
		innerTxs = append(innerTxs, tx)
	}

	l2Message.InnerTxs = innerTxs
	return l2Message, nil
}

// ParseDEXInteraction extracts DEX interaction details from a transaction.
//
// The transaction must target an address in knownRouters or knownPools; the
// first four bytes of its call data select the decoder. The returned
// interaction has Timestamp set to the current time and MessageNumber set to
// zero; callers are expected to overwrite both.
//
// It returns an error for a nil transaction, a contract creation, an unknown
// target, call data shorter than a selector, an unrecognized selector, or a
// decoding failure. Selectors whose decoder is not implemented yet yield the
// partially filled interaction together with a "not implemented yet" error.
func (p *L2MessageParser) ParseDEXInteraction(tx *types.Transaction) (*DEXInteraction, error) {
	const selectorLen = 4

	if tx == nil {
		return nil, fmt.Errorf("transaction is nil")
	}
	if tx.To() == nil {
		return nil, fmt.Errorf("contract creation transaction")
	}

	to := *tx.To()
	if to == (common.Address{}) {
		return nil, fmt.Errorf("invalid contract address")
	}

	protocol, isDEX := p.knownRouters[to]
	if !isDEX {
		// Also accept direct pool interactions.
		poolName, isPool := p.knownPools[to]
		if !isPool {
			return nil, fmt.Errorf("not a known DEX router or pool")
		}
		protocol = poolName
	}

	data := tx.Data()
	if data == nil {
		return nil, fmt.Errorf("transaction data is nil")
	}
	if len(data) < selectorLen {
		return nil, fmt.Errorf("transaction data too short: %d bytes", len(data))
	}

	selector, args := data[:selectorLen], data[selectorLen:]

	interaction := &DEXInteraction{
		Protocol:      protocol,
		Router:        to,
		Timestamp:     uint64(time.Now().Unix()), // Use current time as default
		MessageNumber: big.NewInt(0),             // Will be set by caller
	}

	switch common.Bytes2Hex(selector) {
	// Uniswap V2 style routers.
	case "38ed1739", // swapExactTokensForTokens
		"5c11d795": // swapExactTokensForTokensSupportingFeeOnTransferTokens
		return p.parseSwapExactTokensForTokens(interaction, args)
	case "8803dbee": // swapTokensForExactTokens
		return p.parseSwapTokensForExactTokens(interaction, args)
	case "7ff36ab5", // swapExactETHForTokens
		"b6f9de95": // swapExactETHForTokensSupportingFeeOnTransferTokens
		return p.parseSwapExactETHForTokens(interaction, args)
	case "18cffa1c":
		// NOTE(review): this selector was previously listed as
		// swapExactETHForTokensSupportingFeeOnTransferTokens, whose selector
		// is b6f9de95. The routing is kept unchanged until it is confirmed
		// which function this value belongs to.
		return p.parseSwapExactETHForTokens(interaction, args)
	case "18cbafe5", // swapExactTokensForETH
		"791ac947": // swapExactTokensForETHSupportingFeeOnTransferTokens
		return p.parseSwapExactTokensForETH(interaction, args)
	case "f305d719":
		// addLiquidityETH: liquidity provisioning rather than a swap. The
		// previous "not implemented yet" result is kept.
		return interaction, fmt.Errorf("not implemented yet")

	// Uniswap V3 SwapRouter.
	case "414bf389": // exactInputSingle
		return p.parseExactInputSingle(interaction, args)
	case "c04b8d59": // exactInput
		return p.parseExactInput(interaction, args)
	case "db3e2198": // exactOutputSingle
		return p.parseExactOutputSingle(interaction, args)
	case "f28c0498": // exactOutput
		return p.parseExactOutput(interaction, args)
	case "04e45aaf":
		// exactInputSingle on SwapRouter02. Its parameter struct has no
		// deadline field, so parseExactInputSingle cannot decode it.
		return interaction, fmt.Errorf("not implemented yet")
	case "5ae401dc", // multicall(uint256,bytes[])
		"ac9650d8": // multicall(bytes[])
		return p.parseMulticall(interaction, args)

	default:
		return nil, fmt.Errorf("unknown DEX function selector: %s", common.Bytes2Hex(selector))
	}
}

// parseSwapExactTokensForTokens decodes arguments laid out as
// (uint256 amountIn, uint256 amountOutMin, address[] path, address to,
// uint256 deadline) and fills AmountIn, TokenIn (first path entry), TokenOut
// (last path entry), Recipient and Deadline on interaction.
//
// A deadline that does not fit in 64 bits is stored as the maximum uint64.
// It returns an error when the ABI is unavailable, the data cannot be
// unpacked, the path has fewer than two entries or contains a zero address,
// or the recipient is the zero address.
func (p *L2MessageParser) parseSwapExactTokensForTokens(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	if interaction == nil {
		return nil, fmt.Errorf("interaction is nil")
	}
	if data == nil {
		return nil, fmt.Errorf("data is nil")
	}

	// Look the method up by selector; this fails when loadABIs could not
	// parse the embedded ABI.
	method, err := p.uniswapV2RouterABI.MethodById(crypto.Keccak256([]byte("swapExactTokensForTokens(uint256,uint256,address[],address,uint256)"))[:4])
	if err != nil {
		return nil, fmt.Errorf("failed to get ABI method: %w", err)
	}

	if len(data) == 0 {
		return nil, fmt.Errorf("data is empty")
	}

	inputs, err := method.Inputs.Unpack(data)
	if err != nil {
		return nil, fmt.Errorf("failed to unpack ABI data: %w", err)
	}
	if len(inputs) < 5 {
		return nil, fmt.Errorf("insufficient swap parameters: got %d, expected 5", len(inputs))
	}

	amountIn, ok := inputs[0].(*big.Int)
	if !ok {
		return nil, fmt.Errorf("amountIn is not a *big.Int")
	}
	if amountIn.Sign() < 0 {
		return nil, fmt.Errorf("negative amountIn")
	}

	// inputs[1] is amountOutMin, which is not recorded.

	path, ok := inputs[2].([]common.Address)
	if !ok {
		return nil, fmt.Errorf("path is not []common.Address")
	}
	if len(path) < 2 {
		return nil, fmt.Errorf("path must contain at least 2 tokens, got %d", len(path))
	}
	for i, addr := range path {
		if addr == (common.Address{}) {
			return nil, fmt.Errorf("zero address in path at index %d", i)
		}
	}

	recipient, ok := inputs[3].(common.Address)
	if !ok {
		return nil, fmt.Errorf("recipient is not common.Address")
	}
	if recipient == (common.Address{}) {
		return nil, fmt.Errorf("recipient address is zero")
	}

	deadline, ok := inputs[4].(*big.Int)
	if !ok {
		return nil, fmt.Errorf("deadline is not a *big.Int")
	}

	interaction.AmountIn = amountIn
	interaction.Recipient = recipient
	interaction.Deadline = saturatingUint64(deadline)
	interaction.TokenIn = path[0]
	interaction.TokenOut = path[len(path)-1]

	return interaction, nil
}

// parseSwapTokensForExactTokens is a placeholder for exact-output Uniswap V2
// swaps. It always returns interaction unchanged together with a
// "not implemented yet" error.
func (p *L2MessageParser) parseSwapTokensForExactTokens(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return interaction, fmt.Errorf("not implemented yet")
}

// parseSwapExactETHForTokens is a placeholder for ETH-to-token swaps. It
// always returns interaction unchanged together with a "not implemented yet"
// error.
func (p *L2MessageParser) parseSwapExactETHForTokens(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return interaction, fmt.Errorf("not implemented yet")
}

// parseSwapExactTokensForETH parses token-to-ETH swaps. Their arguments use
// the same (amountIn, amountOutMin, path, to, deadline) layout as
// swapExactTokensForTokens, so decoding is delegated to
// parseSwapExactTokensForTokens.
func (p *L2MessageParser) parseSwapExactTokensForETH(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return p.parseSwapExactTokensForTokens(interaction, data)
}

// parseExactOutputSingle is a placeholder for Uniswap V3 exact-output
// single-pool swaps. It always returns interaction unchanged together with a
// "not implemented yet" error.
func (p *L2MessageParser) parseExactOutputSingle(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return interaction, fmt.Errorf("not implemented yet")
}

// parseExactOutput is a placeholder for Uniswap V3 exact-output multi-hop
// swaps. It always returns interaction unchanged together with a
// "not implemented yet" error.
func (p *L2MessageParser) parseExactOutput(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return interaction, fmt.Errorf("not implemented yet")
}

// parseMulticall is a placeholder for Uniswap V3 multicall transactions. It
// always returns interaction unchanged together with a "not implemented yet"
// error.
func (p *L2MessageParser) parseMulticall(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return interaction, fmt.Errorf("not implemented yet")
}

// parseExactInputSingle decodes a Uniswap V3 single-pool swap whose arguments
// are eight consecutive 32-byte words:
//
//	struct ExactInputSingleParams {
//	    address tokenIn;           // word 0
//	    address tokenOut;          // word 1
//	    uint24  fee;               // word 2 (not recorded)
//	    address recipient;         // word 3
//	    uint256 deadline;          // word 4
//	    uint256 amountIn;          // word 5
//	    uint256 amountOutMinimum;  // word 6 (not recorded)
//	    uint160 sqrtPriceLimitX96; // word 7 (not recorded)
//	}
//
// It fills TokenIn, TokenOut, Recipient, Deadline and AmountIn, and defaults
// AmountOut to zero when unset. A deadline that does not fit in 64 bits is
// stored as the maximum uint64.
//
// It returns an error when an argument is nil, data is shorter than 256
// bytes, or both token addresses are zero. A single zero token address is
// accepted; callers should validate addresses as their use case requires.
func (p *L2MessageParser) parseExactInputSingle(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	const (
		wordSize    = 32
		paramWords  = 8
		addrPadding = wordSize - common.AddressLength // left padding of an address word
	)

	if interaction == nil {
		return nil, fmt.Errorf("interaction is nil")
	}
	if data == nil {
		return nil, fmt.Errorf("data is nil")
	}
	if len(data) < paramWords*wordSize {
		return nil, fmt.Errorf("insufficient data for exactInputSingle: %d bytes", len(data))
	}

	// word returns the i-th 32-byte argument; the length check above
	// guarantees words 0 through 7 are in range.
	word := func(i int) []byte { return data[i*wordSize : (i+1)*wordSize] }

	tokenIn := common.BytesToAddress(word(0)[addrPadding:])
	tokenOut := common.BytesToAddress(word(1)[addrPadding:])

	// If both are zero, the data is most likely not a valid call.
	if tokenIn == (common.Address{}) && tokenOut == (common.Address{}) {
		return nil, fmt.Errorf("unable to parse token addresses from data")
	}

	interaction.TokenIn = tokenIn
	interaction.TokenOut = tokenOut
	interaction.Recipient = common.BytesToAddress(word(3)[addrPadding:])
	interaction.Deadline = saturatingUint64(new(big.Int).SetBytes(word(4)))
	interaction.AmountIn = new(big.Int).SetBytes(word(5))

	if interaction.AmountOut == nil {
		interaction.AmountOut = big.NewInt(0)
	}

	return interaction, nil
}

// parseExactInput is a placeholder for Uniswap V3 multi-hop swaps. It always
// returns interaction unchanged together with a "not implemented yet" error.
func (p *L2MessageParser) parseExactInput(interaction *DEXInteraction, data []byte) (*DEXInteraction, error) {
	return interaction, fmt.Errorf("not implemented yet")
}

// IsSignificantSwap reports whether a DEX interaction is large enough to
// monitor.
//
// The current check is simplified: it returns true when AmountIn is at least
// 10^18 base units, regardless of the token. minAmountUSD is only validated
// (a negative value yields false); it is not yet used to compute the
// threshold. A nil interaction, nil AmountIn or negative AmountIn yields
// false.
func (p *L2MessageParser) IsSignificantSwap(interaction *DEXInteraction, minAmountUSD float64) bool {
	if interaction == nil {
		p.logger.Warn("IsSignificantSwap called with nil interaction")
		return false
	}
	if minAmountUSD < 0 {
		p.logger.Warn(fmt.Sprintf("Negative minAmountUSD: %f", minAmountUSD))
		return false
	}
	if interaction.AmountIn == nil {
		return false
	}
	if interaction.AmountIn.Sign() < 0 {
		p.logger.Warn("Negative AmountIn in DEX interaction")
		return false
	}

	// Simplified check - in practice, convert to a USD value first.
	return interaction.AmountIn.Cmp(significantSwapThresholdWei) >= 0
}