Restructured project for V2 refactor: **Structure Changes:** - Moved all V1 code to orig/ folder (preserved with git mv) - Created docs/planning/ directory - Added orig/README_V1.md explaining V1 preservation **Planning Documents:** - 00_V2_MASTER_PLAN.md: Complete architecture overview - Executive summary of critical V1 issues - High-level component architecture diagrams - 5-phase implementation roadmap - Success metrics and risk mitigation - 07_TASK_BREAKDOWN.md: Atomic task breakdown - 99+ hours of detailed tasks - Every task < 2 hours (atomic) - Clear dependencies and success criteria - Organized by implementation phase **V2 Key Improvements:** - Per-exchange parsers (factory pattern) - Multi-layer strict validation - Multi-index pool cache - Background validation pipeline - Comprehensive observability **Critical Issues Addressed:** - Zero address tokens (strict validation + cache enrichment) - Parsing accuracy (protocol-specific parsers) - No audit trail (background validation channel) - Inefficient lookups (multi-index cache) - Stats disconnection (event-driven metrics) Next Steps: 1. Review planning documents 2. Begin Phase 1: Foundation (P1-001 through P1-010) 3. Implement parsers in Phase 2 4. Build cache system in Phase 3 5. Add validation pipeline in Phase 4 6. Migrate and test in Phase 5 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
691 lines
21 KiB
Go
691 lines
21 KiB
Go
package arbitrum
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common"
|
|
)
|
|
|
|
// EventMonitor monitors DEX events across all protocols for comprehensive MEV detection
|
|
type EventMonitor struct {
|
|
client *RateLimitedClient
|
|
rpcClient *rpc.Client
|
|
logger *logger.Logger
|
|
protocolRegistry *ArbitrumProtocolRegistry
|
|
poolCache *PoolCache
|
|
swapPipeline *SwapEventPipeline
|
|
|
|
// Event signatures for all major DEX protocols
|
|
eventSignatures map[arbcommon.Protocol]map[string]common.Hash
|
|
|
|
// Active monitoring subscriptions
|
|
subscriptions map[string]*subscription
|
|
subMu sync.RWMutex
|
|
|
|
// Metrics
|
|
eventsProcessed uint64
|
|
swapsDetected uint64
|
|
liquidityEvents uint64
|
|
|
|
// Channels for event processing
|
|
swapEvents chan *SwapEvent
|
|
liquidationEvents chan *LiquidationEvent
|
|
liquidityEventsChan chan *LiquidityEvent
|
|
|
|
// Shutdown management
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// subscription represents an active event subscription
|
|
type subscription struct {
|
|
protocol arbcommon.Protocol
|
|
query ethereum.FilterQuery
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewEventMonitor creates a new event monitor
|
|
func NewEventMonitor(
|
|
client *RateLimitedClient,
|
|
rpcClient *rpc.Client,
|
|
logger *logger.Logger,
|
|
protocolRegistry *ArbitrumProtocolRegistry,
|
|
poolCache *PoolCache,
|
|
) *EventMonitor {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
em := &EventMonitor{
|
|
client: client,
|
|
rpcClient: rpcClient,
|
|
logger: logger,
|
|
protocolRegistry: protocolRegistry,
|
|
poolCache: poolCache,
|
|
eventSignatures: make(map[arbcommon.Protocol]map[string]common.Hash),
|
|
subscriptions: make(map[string]*subscription),
|
|
swapEvents: make(chan *SwapEvent, 1000),
|
|
liquidationEvents: make(chan *LiquidationEvent, 100),
|
|
liquidityEventsChan: make(chan *LiquidityEvent, 500),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
// Initialize event signatures for all protocols
|
|
em.initializeEventSignatures()
|
|
|
|
return em
|
|
}
|
|
|
|
// initializeEventSignatures sets up event signatures for all supported protocols
|
|
func (em *EventMonitor) initializeEventSignatures() {
|
|
// Uniswap V2/V3 Swap events
|
|
em.eventSignatures[arbcommon.ProtocolUniswapV2] = map[string]common.Hash{
|
|
"Swap": crypto.Keccak256Hash([]byte("Swap(address,uint256,uint256,uint256,uint256,address)")),
|
|
"Mint": crypto.Keccak256Hash([]byte("Mint(address,uint256,uint256)")),
|
|
"Burn": crypto.Keccak256Hash([]byte("Burn(address,uint256,uint256,address)")),
|
|
}
|
|
|
|
em.eventSignatures[arbcommon.ProtocolUniswapV3] = map[string]common.Hash{
|
|
"Swap": crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)")),
|
|
"Mint": crypto.Keccak256Hash([]byte("Mint(address,int24,int24,uint128,uint256,uint256)")),
|
|
"Burn": crypto.Keccak256Hash([]byte("Burn(address,int24,int24,uint128,uint256,uint256)")),
|
|
}
|
|
|
|
// SushiSwap (same as Uniswap V2)
|
|
em.eventSignatures[arbcommon.ProtocolSushiSwapV2] = em.eventSignatures[arbcommon.ProtocolUniswapV2]
|
|
|
|
// Camelot V3 (Algebra)
|
|
em.eventSignatures[arbcommon.ProtocolCamelotV3] = map[string]common.Hash{
|
|
"Swap": crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)")),
|
|
"Mint": crypto.Keccak256Hash([]byte("Mint(address,int24,int24,uint128,uint256,uint256)")),
|
|
"Burn": crypto.Keccak256Hash([]byte("Burn(address,int24,int24,uint128,uint256,uint256)")),
|
|
}
|
|
|
|
// Balancer V2
|
|
em.eventSignatures[arbcommon.ProtocolBalancerV2] = map[string]common.Hash{
|
|
"Swap": crypto.Keccak256Hash([]byte("Swap(bytes32,address,address,uint256,uint256)")),
|
|
"PoolBalanceChanged": crypto.Keccak256Hash([]byte("PoolBalanceChanged(bytes32,address,address[],int256[],uint256[])")),
|
|
}
|
|
|
|
// Curve Finance
|
|
em.eventSignatures[arbcommon.ProtocolCurve] = map[string]common.Hash{
|
|
"TokenExchange": crypto.Keccak256Hash([]byte("TokenExchange(address,int128,uint256,int128,uint256)")),
|
|
"TokenExchangeUnderlying": crypto.Keccak256Hash([]byte("TokenExchangeUnderlying(address,int128,uint256,int128,uint256)")),
|
|
"AddLiquidity": crypto.Keccak256Hash([]byte("AddLiquidity(address,uint256[],uint256[],uint256,uint256)")),
|
|
"RemoveLiquidity": crypto.Keccak256Hash([]byte("RemoveLiquidity(address,uint256[],uint256)")),
|
|
}
|
|
|
|
// 1inch Aggregator
|
|
em.eventSignatures[arbcommon.Protocol1Inch] = map[string]common.Hash{
|
|
"Swapped": crypto.Keccak256Hash([]byte("Swapped(address,address,address,uint256,uint256)")),
|
|
}
|
|
|
|
// GMX
|
|
em.eventSignatures[arbcommon.ProtocolGMX] = map[string]common.Hash{
|
|
"Swap": crypto.Keccak256Hash([]byte("Swap(address,address,address,uint256,uint256,uint256,uint256)")),
|
|
"LiquidatePosition": crypto.Keccak256Hash([]byte("LiquidatePosition(bytes32,address,address,address,bool,uint256,uint256,uint256,int256,uint256)")),
|
|
}
|
|
|
|
// Radiant Capital
|
|
em.eventSignatures[arbcommon.ProtocolRadiant] = map[string]common.Hash{
|
|
"LiquidationCall": crypto.Keccak256Hash([]byte("LiquidationCall(address,address,address,uint256,uint256,address,bool)")),
|
|
"Deposit": crypto.Keccak256Hash([]byte("Deposit(address,address,address,uint256,uint16)")),
|
|
"Withdraw": crypto.Keccak256Hash([]byte("Withdraw(address,address,address,uint256)")),
|
|
}
|
|
}
|
|
|
|
// Start begins monitoring all DEX events
|
|
func (em *EventMonitor) Start() error {
|
|
em.logger.Info("🚀 Starting comprehensive DEX event monitoring")
|
|
|
|
// Start event processing workers
|
|
go em.processSwapEvents()
|
|
go em.processLiquidationEvents()
|
|
go em.processLiquidityEvents()
|
|
|
|
// Start monitoring all active protocols
|
|
if err := em.startProtocolMonitoring(); err != nil {
|
|
return fmt.Errorf("failed to start protocol monitoring: %w", err)
|
|
}
|
|
|
|
em.logger.Info("✅ DEX event monitoring started successfully")
|
|
return nil
|
|
}
|
|
|
|
// startProtocolMonitoring sets up monitoring for all active protocols
|
|
func (em *EventMonitor) startProtocolMonitoring() error {
|
|
protocols := em.protocolRegistry.GetActiveProtocols()
|
|
|
|
for _, protocol := range protocols {
|
|
// Get all pool addresses for this protocol
|
|
poolAddresses := em.poolCache.GetPoolAddressesByProtocol(arbcommon.Protocol(protocol.Name))
|
|
|
|
// If we don't have pools yet, monitor factory events to discover them
|
|
if len(poolAddresses) == 0 {
|
|
// Monitor factory events for pool creation
|
|
if err := em.monitorFactoryEvents(arbcommon.Protocol(protocol.Name)); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to monitor factory events for %s: %v", protocol.Name, err))
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Monitor swap events for existing pools
|
|
if err := em.monitorSwapEvents(arbcommon.Protocol(protocol.Name), poolAddresses); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to monitor swap events for %s: %v", protocol.Name, err))
|
|
continue
|
|
}
|
|
|
|
// Monitor liquidation events for lending protocols
|
|
if arbcommon.Protocol(protocol.Name) == arbcommon.ProtocolGMX || arbcommon.Protocol(protocol.Name) == arbcommon.ProtocolRadiant {
|
|
if err := em.monitorLiquidationEvents(arbcommon.Protocol(protocol.Name)); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to monitor liquidation events for %s: %v", protocol.Name, err))
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// monitorSwapEvents sets up monitoring for swap events on specific pools
|
|
func (em *EventMonitor) monitorSwapEvents(protocol arbcommon.Protocol, poolAddresses []common.Address) error {
|
|
if len(poolAddresses) == 0 {
|
|
return nil // No pools to monitor
|
|
}
|
|
|
|
// Get event signatures for this protocol
|
|
signatures, exists := em.eventSignatures[protocol]
|
|
if !exists {
|
|
return fmt.Errorf("no event signatures found for protocol %s", protocol)
|
|
}
|
|
|
|
// Get Swap event signature
|
|
swapTopic, exists := signatures["Swap"]
|
|
if !exists {
|
|
return fmt.Errorf("no Swap event signature found for protocol %s", protocol)
|
|
}
|
|
|
|
// Create filter query for Swap events
|
|
query := ethereum.FilterQuery{
|
|
Addresses: poolAddresses,
|
|
Topics: [][]common.Hash{{swapTopic}},
|
|
}
|
|
|
|
// Create subscription
|
|
subCtx, cancel := context.WithCancel(em.ctx)
|
|
sub := &subscription{
|
|
protocol: protocol,
|
|
query: query,
|
|
cancel: cancel,
|
|
}
|
|
|
|
// Store subscription
|
|
em.subMu.Lock()
|
|
em.subscriptions[string(protocol)+"_swap"] = sub
|
|
em.subMu.Unlock()
|
|
|
|
// Start monitoring in a goroutine
|
|
go em.monitorEvents(subCtx, sub, "Swap")
|
|
|
|
em.logger.Info(fmt.Sprintf("🔍 Monitoring Swap events for %d %s pools", len(poolAddresses), protocol))
|
|
return nil
|
|
}
|
|
|
|
// monitorFactoryEvents sets up monitoring for factory events to discover new pools
|
|
func (em *EventMonitor) monitorFactoryEvents(protocol arbcommon.Protocol) error {
|
|
// Get factory addresses for this protocol
|
|
factories := em.protocolRegistry.GetFactoryAddresses(protocol)
|
|
if len(factories) == 0 {
|
|
return nil // No factories to monitor
|
|
}
|
|
|
|
// Get event signatures for this protocol
|
|
signatures, exists := em.eventSignatures[protocol]
|
|
if !exists {
|
|
return fmt.Errorf("no event signatures found for protocol %s", protocol)
|
|
}
|
|
|
|
// Different protocols have different pool creation events
|
|
var creationTopic common.Hash
|
|
switch protocol {
|
|
case arbcommon.ProtocolUniswapV2, arbcommon.ProtocolSushiSwapV2:
|
|
creationTopic, exists = signatures["PairCreated"]
|
|
if !exists {
|
|
// Default to common creation event
|
|
creationTopic = crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)"))
|
|
}
|
|
case arbcommon.ProtocolUniswapV3, arbcommon.ProtocolCamelotV3:
|
|
creationTopic, exists = signatures["PoolCreated"]
|
|
if !exists {
|
|
// Default to common creation event
|
|
creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address,address,uint24,address)"))
|
|
}
|
|
case arbcommon.ProtocolBalancerV2:
|
|
creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address,address,address)"))
|
|
case arbcommon.ProtocolCurve:
|
|
creationTopic = crypto.Keccak256Hash([]byte("PoolAdded(address)"))
|
|
default:
|
|
// For other protocols, use a generic approach
|
|
creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address)"))
|
|
}
|
|
|
|
// Create filter query for pool creation events
|
|
query := ethereum.FilterQuery{
|
|
Addresses: factories,
|
|
Topics: [][]common.Hash{{creationTopic}},
|
|
}
|
|
|
|
// Create subscription
|
|
subCtx, cancel := context.WithCancel(em.ctx)
|
|
sub := &subscription{
|
|
protocol: protocol,
|
|
query: query,
|
|
cancel: cancel,
|
|
}
|
|
|
|
// Store subscription
|
|
em.subMu.Lock()
|
|
em.subscriptions[string(protocol)+"_factory"] = sub
|
|
em.subMu.Unlock()
|
|
|
|
// Start monitoring in a goroutine
|
|
go em.monitorEvents(subCtx, sub, "Factory")
|
|
|
|
em.logger.Info(fmt.Sprintf("🏭 Monitoring factory events for %s pool creation", protocol))
|
|
return nil
|
|
}
|
|
|
|
// monitorLiquidationEvents sets up monitoring for liquidation events
|
|
func (em *EventMonitor) monitorLiquidationEvents(protocol arbcommon.Protocol) error {
|
|
// Get contract addresses for this protocol
|
|
contracts := em.protocolRegistry.GetContractAddresses(protocol)
|
|
if len(contracts) == 0 {
|
|
return nil // No contracts to monitor
|
|
}
|
|
|
|
// Get event signatures for this protocol
|
|
signatures, exists := em.eventSignatures[protocol]
|
|
if !exists {
|
|
return fmt.Errorf("no event signatures found for protocol %s", protocol)
|
|
}
|
|
|
|
// Get Liquidation event signature
|
|
var liquidationTopic common.Hash
|
|
switch protocol {
|
|
case arbcommon.ProtocolGMX:
|
|
liquidationTopic, exists = signatures["LiquidatePosition"]
|
|
case arbcommon.ProtocolRadiant:
|
|
liquidationTopic, exists = signatures["LiquidationCall"]
|
|
default:
|
|
return nil // No liquidation events for this protocol
|
|
}
|
|
|
|
if !exists {
|
|
return fmt.Errorf("no liquidation event signature found for protocol %s", protocol)
|
|
}
|
|
|
|
// Create filter query for liquidation events
|
|
query := ethereum.FilterQuery{
|
|
Addresses: contracts,
|
|
Topics: [][]common.Hash{{liquidationTopic}},
|
|
}
|
|
|
|
// Create subscription
|
|
subCtx, cancel := context.WithCancel(em.ctx)
|
|
sub := &subscription{
|
|
protocol: protocol,
|
|
query: query,
|
|
cancel: cancel,
|
|
}
|
|
|
|
// Store subscription
|
|
em.subMu.Lock()
|
|
em.subscriptions[string(protocol)+"_liquidation"] = sub
|
|
em.subMu.Unlock()
|
|
|
|
// Start monitoring in a goroutine
|
|
go em.monitorEvents(subCtx, sub, "Liquidation")
|
|
|
|
em.logger.Info(fmt.Sprintf("💧 Monitoring liquidation events for %s", protocol))
|
|
return nil
|
|
}
|
|
|
|
// monitorEvents continuously monitors events for a subscription
|
|
func (em *EventMonitor) monitorEvents(ctx context.Context, sub *subscription, eventType string) {
|
|
ticker := time.NewTicker(1 * time.Second) // Poll every second
|
|
defer ticker.Stop()
|
|
|
|
var lastBlock uint64 = 0
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// Get latest block
|
|
latestBlock, err := em.client.BlockNumber(ctx)
|
|
if err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to get latest block: %v", err))
|
|
continue
|
|
}
|
|
|
|
// Set fromBlock to last processed block + 1, or latest - 1000 if starting
|
|
fromBlock := lastBlock + 1
|
|
if fromBlock == 1 || latestBlock-fromBlock > 10000 {
|
|
fromBlock = latestBlock - 10000
|
|
if fromBlock < 1 {
|
|
fromBlock = 1
|
|
}
|
|
}
|
|
|
|
// Set toBlock to latest block
|
|
toBlock := latestBlock
|
|
|
|
// Skip if no new blocks
|
|
if fromBlock > toBlock {
|
|
continue
|
|
}
|
|
|
|
// Update query block range
|
|
query := sub.query
|
|
query.FromBlock = new(big.Int).SetUint64(fromBlock)
|
|
query.ToBlock = new(big.Int).SetUint64(toBlock)
|
|
|
|
// Query logs with circuit breaker and rate limiting
|
|
var logs []types.Log
|
|
err = em.client.CallWithRateLimit(ctx, func() error {
|
|
var innerErr error
|
|
logs, innerErr = em.client.FilterLogs(ctx, query)
|
|
return innerErr
|
|
})
|
|
if err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to filter logs for %s events: %v", eventType, err))
|
|
continue
|
|
}
|
|
|
|
// Process logs
|
|
for _, log := range logs {
|
|
if err := em.processLog(log, sub.protocol, eventType); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to process %s log: %v", eventType, err))
|
|
}
|
|
em.eventsProcessed++
|
|
}
|
|
|
|
// Update last processed block
|
|
lastBlock = toBlock
|
|
}
|
|
}
|
|
}
|
|
|
|
// processLog processes a single log entry
|
|
func (em *EventMonitor) processLog(log types.Log, protocol arbcommon.Protocol, eventType string) error {
|
|
switch eventType {
|
|
case "Swap":
|
|
return em.processSwapLog(log, protocol)
|
|
case "Factory":
|
|
return em.processFactoryLog(log, protocol)
|
|
case "Liquidation":
|
|
return em.processLiquidationLog(log, protocol)
|
|
default:
|
|
// Generic event processing
|
|
em.logger.Debug(fmt.Sprintf("Processing generic %s event from %s", eventType, protocol))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// processSwapLog processes a swap event log
|
|
func (em *EventMonitor) processSwapLog(log types.Log, protocol arbcommon.Protocol) error {
|
|
// Parse swap event based on protocol
|
|
swapEvent, err := em.parseSwapEvent(log, protocol)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse swap event: %w", err)
|
|
}
|
|
|
|
// Send to swap events channel
|
|
select {
|
|
case em.swapEvents <- swapEvent:
|
|
em.swapsDetected++
|
|
default:
|
|
em.logger.Warn("Swap events channel full, dropping event")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseSwapEvent parses a swap event log into a SwapEvent
|
|
func (em *EventMonitor) parseSwapEvent(log types.Log, protocol arbcommon.Protocol) (*SwapEvent, error) {
|
|
// This is a simplified implementation - in practice, you'd parse the actual
|
|
// event data based on the protocol's ABI
|
|
swapEvent := &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
BlockNumber: log.BlockNumber,
|
|
TxHash: log.TxHash.Hex(),
|
|
Protocol: string(protocol),
|
|
Pool: log.Address.Hex(),
|
|
// In a real implementation, you'd parse the actual event parameters here
|
|
TokenIn: "0x0000000000000000000000000000000000000000",
|
|
TokenOut: "0x0000000000000000000000000000000000000000",
|
|
AmountIn: "0",
|
|
AmountOut: "0",
|
|
Sender: "0x0000000000000000000000000000000000000000",
|
|
Recipient: "0x0000000000000000000000000000000000000000",
|
|
GasPrice: "0",
|
|
GasUsed: 0,
|
|
PriceImpact: 0.0,
|
|
MEVScore: 0.0,
|
|
Profitable: false,
|
|
}
|
|
|
|
// Add to pool cache if not already present
|
|
em.poolCache.AddPoolIfNotExists(log.Address, protocol)
|
|
|
|
return swapEvent, nil
|
|
}
|
|
|
|
// processFactoryLog processes a factory event log (pool creation)
|
|
func (em *EventMonitor) processFactoryLog(log types.Log, protocol arbcommon.Protocol) error {
|
|
// Parse pool creation event to extract token addresses
|
|
var token0, token1 common.Address
|
|
|
|
// For Uniswap V2/SushiSwap: PairCreated(address indexed token0, address indexed token1, address pair, uint)
|
|
// For Uniswap V3: PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, int24 tickSpacing, address pool)
|
|
if len(log.Topics) >= 3 {
|
|
token0 = common.BytesToAddress(log.Topics[1].Bytes())
|
|
token1 = common.BytesToAddress(log.Topics[2].Bytes())
|
|
}
|
|
|
|
// Handle empty addresses to prevent slice bounds panic
|
|
token0Display := "unknown"
|
|
token1Display := "unknown"
|
|
addressDisplay := "unknown"
|
|
if len(token0.Hex()) > 0 {
|
|
if len(token0.Hex()) > 8 {
|
|
token0Display = token0.Hex()[:8]
|
|
} else {
|
|
token0Display = token0.Hex()
|
|
}
|
|
}
|
|
if len(token1.Hex()) > 0 {
|
|
if len(token1.Hex()) > 8 {
|
|
token1Display = token1.Hex()[:8]
|
|
} else {
|
|
token1Display = token1.Hex()
|
|
}
|
|
}
|
|
if len(log.Address.Hex()) > 0 {
|
|
if len(log.Address.Hex()) > 10 {
|
|
addressDisplay = log.Address.Hex()[:10]
|
|
} else {
|
|
addressDisplay = log.Address.Hex()
|
|
}
|
|
}
|
|
em.logger.Info(fmt.Sprintf("🆕 New %s pool created: %s/%s at %s",
|
|
protocol,
|
|
token0Display,
|
|
token1Display,
|
|
addressDisplay))
|
|
|
|
// Add to pool cache and start monitoring this pool
|
|
em.poolCache.AddPoolIfNotExists(log.Address, protocol)
|
|
|
|
// CRITICAL FIX: Validate pool address is not zero before creating event
|
|
if log.Address == (common.Address{}) {
|
|
return fmt.Errorf("invalid zero pool address from log")
|
|
}
|
|
|
|
// CRITICAL: Create liquidity event to trigger cross-factory syncing
|
|
liquidityEvent := &LiquidityEvent{
|
|
Timestamp: time.Now(),
|
|
BlockNumber: log.BlockNumber,
|
|
TxHash: log.TxHash.Hex(),
|
|
Protocol: string(protocol),
|
|
Pool: log.Address.Hex(),
|
|
Token0: token0.Hex(),
|
|
Token1: token1.Hex(),
|
|
EventType: "pool_created",
|
|
Amount0: "0",
|
|
Amount1: "0",
|
|
Liquidity: "0",
|
|
PriceAfter: "0",
|
|
ImpactSize: 0.0,
|
|
ArbitrageOpp: false,
|
|
}
|
|
|
|
// Send to liquidity events channel for processing
|
|
select {
|
|
case em.liquidityEventsChan <- liquidityEvent:
|
|
em.liquidityEvents++
|
|
default:
|
|
em.logger.Warn("Liquidity events channel full, dropping pool creation event")
|
|
}
|
|
|
|
// If we have the swap pipeline, trigger cross-factory sync
|
|
if em.swapPipeline != nil {
|
|
// CRITICAL FIX: Validate pool address is not zero before creating event
|
|
if log.Address == (common.Address{}) {
|
|
return nil // Skip processing if zero address
|
|
}
|
|
|
|
// Create a mock swap event to trigger syncing
|
|
mockSwap := &SwapEvent{
|
|
Protocol: string(protocol),
|
|
Pool: log.Address.Hex(),
|
|
TokenIn: token0.Hex(),
|
|
TokenOut: token1.Hex(),
|
|
}
|
|
go em.swapPipeline.SubmitPoolDiscoverySwap(mockSwap)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processLiquidationLog processes a liquidation event log
|
|
func (em *EventMonitor) processLiquidationLog(log types.Log, protocol arbcommon.Protocol) error {
|
|
// Parse liquidation event based on protocol
|
|
liquidationEvent := &LiquidationEvent{
|
|
Timestamp: time.Now(),
|
|
BlockNumber: log.BlockNumber,
|
|
TxHash: log.TxHash.Hex(),
|
|
Protocol: string(protocol),
|
|
// In a real implementation, you'd parse the actual event parameters here
|
|
}
|
|
|
|
// Send to liquidation events channel
|
|
select {
|
|
case em.liquidationEvents <- liquidationEvent:
|
|
default:
|
|
em.logger.Warn("Liquidation events channel full, dropping event")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processSwapEvents processes swap events from the channel
|
|
func (em *EventMonitor) processSwapEvents() {
|
|
for {
|
|
select {
|
|
case <-em.ctx.Done():
|
|
return
|
|
case swapEvent := <-em.swapEvents:
|
|
// Log the swap event
|
|
if err := em.protocolRegistry.LogSwapEvent(swapEvent); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to log swap event: %v", err))
|
|
}
|
|
|
|
// Update metrics
|
|
em.swapsDetected++
|
|
}
|
|
}
|
|
}
|
|
|
|
// processLiquidationEvents processes liquidation events from the channel
|
|
func (em *EventMonitor) processLiquidationEvents() {
|
|
for {
|
|
select {
|
|
case <-em.ctx.Done():
|
|
return
|
|
case liquidationEvent := <-em.liquidationEvents:
|
|
// Log the liquidation event
|
|
if err := em.protocolRegistry.LogLiquidationEvent(liquidationEvent); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to log liquidation event: %v", err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processLiquidityEvents processes liquidity events from the channel
|
|
func (em *EventMonitor) processLiquidityEvents() {
|
|
for {
|
|
select {
|
|
case <-em.ctx.Done():
|
|
return
|
|
case liquidityEvent := <-em.liquidityEventsChan:
|
|
// Log the liquidity event
|
|
if err := em.protocolRegistry.LogLiquidityEvent(liquidityEvent); err != nil {
|
|
em.logger.Error(fmt.Sprintf("Failed to log liquidity event: %v", err))
|
|
}
|
|
|
|
em.liquidityEvents++
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetMetrics returns current monitoring metrics
|
|
func (em *EventMonitor) GetMetrics() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"events_processed": em.eventsProcessed,
|
|
"swaps_detected": em.swapsDetected,
|
|
"liquidity_events": em.liquidityEvents,
|
|
"active_subscriptions": len(em.subscriptions),
|
|
}
|
|
}
|
|
|
|
// Close stops all monitoring and cleans up resources
|
|
func (em *EventMonitor) Close() error {
|
|
em.logger.Info("🛑 Stopping DEX event monitoring")
|
|
|
|
// Cancel all subscriptions
|
|
em.subMu.Lock()
|
|
for _, sub := range em.subscriptions {
|
|
sub.cancel()
|
|
}
|
|
em.subscriptions = make(map[string]*subscription)
|
|
em.subMu.Unlock()
|
|
|
|
// Cancel main context
|
|
em.cancel()
|
|
|
|
em.logger.Info("✅ DEX event monitoring stopped")
|
|
return nil
|
|
}
|