class BaseChainScanner:
    """Scanner for Base blockchain contract deployments.

    Holds a Web3 HTTP client for a Base RPC endpoint together with the
    Basescan API settings and the retry policy used by the scanner.
    """

    def __init__(self, rpc_url: str, basescan_api_key: str, min_contract_size: int = 100):
        """Store the scanner settings and connect to the RPC endpoint.

        Args:
            rpc_url: URL handed to ``Web3.HTTPProvider``.
            basescan_api_key: API key kept for Basescan requests.
            min_contract_size: Bytecode-size threshold kept on the instance
                (default: 100).

        Raises:
            ConnectionError: If the Web3 client reports it is not connected.
        """
        # Retry policy
        self.max_retries = 3
        self.retry_delay = 1.0  # seconds

        # Basescan settings
        self.basescan_url = "https://api.basescan.org/api"
        self.basescan_api_key = basescan_api_key

        # Contract size filter
        self.min_contract_size = min_contract_size

        # RPC client; fail fast when the endpoint is unreachable
        provider = Web3.HTTPProvider(rpc_url)
        self.w3 = Web3(provider)
        if self.w3.is_connected():
            return
        raise ConnectionError(f"Failed to connect to Base RPC at {rpc_url}")
```
**Key Parameters:**
- `rpc_url` - Base blockchain RPC endpoint (e.g., Alchemy, Infura)
- `basescan_api_key` - API key for Basescan contract verification checks
- `min_contract_size` - Minimum bytecode size to filter trivial contracts (default: 100 bytes)
## Contract Detection
### Identifying Contract Deployments
Contract creation transactions are identified by checking if the `to` field is `None`:
```python
def scan_blocks(self, start_block: int, end_block: int) -> list[ContractDeployment]:
    """Scan an inclusive range of blocks for contract deployments.

    Args:
        start_block: First block number to scan.
        end_block: Last block number to scan (inclusive).

    Returns:
        The deployments returned by ``_process_contract_creation`` for each
        contract-creation transaction in the range, in block order. The list
        is empty when ``start_block > end_block``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build the
    # same naive-UTC value from an aware datetime so callers see no change.
    from datetime import timezone

    deployments = []
    for block_num in range(start_block, end_block + 1):
        block = self.w3.eth.get_block(block_num, full_transactions=True)
        block_timestamp = datetime.fromtimestamp(
            block["timestamp"], tz=timezone.utc
        ).replace(tzinfo=None)
        for tx in block["transactions"]:
            # Contract creation: the transaction has no recipient (`to` is None)
            if tx["to"] is not None:
                continue
            deployment = self._process_contract_creation(tx, block_num, block_timestamp)
            if deployment:
                deployments.append(deployment)
    return deployments
```
<Info>
In Ethereum and Base, contract creation transactions have `to: null`, while regular transactions specify a recipient address.
</Info>
### Processing Contract Creations
Once a contract creation is detected, the scanner extracts deployment details:
```python
def _process_contract_creation(
self, tx: dict, block_number: int, timestamp: datetime
) -> Optional[ContractDeployment]:
"""Process a contract creation transaction."""
# Get transaction receipt to find contract address
receipt = self.w3.eth.get_transaction_receipt(tx["hash"])
if receipt["contractAddress"] is None:
return None
contract_address = receipt["contractAddress"]
# Get contract bytecode size
code = self.w3.eth.get_code(contract_address)
bytecode_size = len(code)
# Skip contracts smaller than minimum size
if bytecode_size < self.min_contract_size:
logger.debug(f"Skipping small contract {contract_address} (size: {bytecode_size})")
return None
deployment = ContractDeployment(
address=contract_address,
deployer=tx["from"],
tx_hash=tx["hash"].hex(),
block_number=block_number,
timestamp=timestamp,
bytecode_size=bytecode_size
)
return deployment
```
### ContractDeployment Data Structure
```python
@dataclass
class ContractDeployment:
    """A contract deployment detected by the scanner.

    Attributes:
        address: Contract address (0x...).
        deployer: Deployer address.
        tx_hash: Transaction hash.
        block_number: Block number where deployed.
        timestamp: Block timestamp.
        bytecode_size: Size of deployed bytecode.
    """

    address: str
    deployer: str
    tx_hash: str
    block_number: int
    timestamp: datetime
    bytecode_size: int
```
## Contract Verification
### Checking Verification Status
The scanner checks if contracts are verified on Basescan:
```python
def is_contract_verified(self, address: str) -> bool:
    """Ask Basescan whether an ABI is available for the contract.

    Sends a ``getabi`` request to ``self.basescan_url`` with a 10 second
    timeout.

    Args:
        address: Contract address to look up.

    Returns:
        ``True`` when the JSON response's ``"status"`` is ``"1"``; ``False``
        for any other status.
    """
    query = {
        "module": "contract",
        "action": "getabi",
        "address": address,
        "apikey": self.basescan_api_key,
    }
    payload = requests.get(self.basescan_url, params=query, timeout=10).json()
    # Status "1" means ABI is available (verified)
    status = payload.get("status")
    return status == "1"
```
### Fetching Source Code
For verified contracts, the scanner retrieves the full source code:
```python
def get_contract_source(self, address: str) -> Optional[dict]:
    """Fetch a contract's verified source details from Basescan.

    Sends a ``getsourcecode`` request to ``self.basescan_url`` with a
    10 second timeout and reads the first entry of the response's
    ``"result"`` list.

    Args:
        address: Contract address to look up.

    Returns:
        A dict with the keys ``contract_name``, ``source_code``,
        ``compiler_version``, ``optimization_used``, ``abi``,
        ``constructor_arguments`` and ``implementation``, or ``None`` when
        the status is not ``"1"``, the result is empty, or the entry has no
        source code.
    """
    query = {
        "module": "contract",
        "action": "getsourcecode",
        "address": address,
        "apikey": self.basescan_api_key,
    }
    payload = requests.get(self.basescan_url, params=query, timeout=10).json()

    # Bail out on a failed lookup or an empty result list
    if payload.get("status") != "1" or not payload.get("result"):
        return None

    entry = payload["result"][0]
    # An entry without source code is treated as "no source available"
    if not entry.get("SourceCode"):
        return None

    return {
        "contract_name": entry.get("ContractName", "Unknown"),
        "source_code": entry.get("SourceCode", ""),
        "compiler_version": entry.get("CompilerVersion", ""),
        "optimization_used": entry.get("OptimizationUsed", ""),
        "abi": entry.get("ABI", ""),
        "constructor_arguments": entry.get("ConstructorArguments", ""),
        "implementation": entry.get("Implementation", ""),  # For proxies
    }
```
<Warning>
Basescan returns source code in different formats:
- Single file: Direct source code string
- Multi-file: JSON object with `{{"sources": {...}}}`
The bot handles both formats when extracting GitHub URLs.
</Warning>
### Extracting Metadata
The scanner attempts to extract GitHub URLs from source code comments:
```python
def _extract_github_url(self, source_code: str) -> Optional[str]:
"""Extract GitHub URL from source code comments."""
import re
# Common patterns for GitHub URLs in Solidity source
patterns = [
r'https?://github\.com/[\w\-]+/[\w\-\.]+',
r'github\.com/[\w\-]+/[\w\-\.]+',
]
for pattern in patterns:
match = re.search(pattern, source_code)
if match:
url = match.group(0)
if not url.startswith("http"):
url = "https://" + url
# Clean up URL (remove trailing slashes, file paths)
url = re.sub(r'/blob/.*$', '', url)
url = re.sub(r'/tree/.*$', '', url)
url = url.rstrip("/")
return url
return None
```
## Error Handling and Resilience
### Retry Logic with Exponential Backoff
```python
def _retry_on_failure(self, func, *args, **kwargs):
"""Execute function with retry logic."""
last_error = None
for attempt in range(self.max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < self.max_retries - 1:
delay = self.retry_delay * (2 ** attempt) # Exponential backoff
logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
logger.error(f"All {self.max_retries} attempts failed: {last_error}")
raise last_error
```
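**Retry Pattern** (with the defaults `max_retries = 3` and `retry_delay = 1.0`):
- Attempt 1: Immediate
- Attempt 2: After a 1 second delay
- Attempt 3: After a 2 second delay
- If attempt 3 also fails, the last error is logged and re-raised (there is no fourth attempt)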
### Handling RPC Failures
Web3 calls made through the helper methods below are wrapped with retry logic:
```python
def get_latest_block(self) -> int:
    """Return the latest block number, read through the retry wrapper."""
    def read_block_number():
        return self.w3.eth.block_number

    return self._retry_on_failure(read_block_number)
def get_block_timestamp(self, block_number: int) -> datetime:
    """Get the timestamp of a block as a naive UTC datetime.

    Args:
        block_number: Number of the block to look up.

    Returns:
        The block's ``"timestamp"`` converted to a timezone-naive datetime
        expressed in UTC.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build the
    # same naive-UTC value from an aware datetime so callers see no change.
    from datetime import timezone

    block = self._retry_on_failure(lambda: self.w3.eth.get_block(block_number))
    return datetime.fromtimestamp(block["timestamp"], tz=timezone.utc).replace(tzinfo=None)
```
## Scanning Strategy
### Block Range Selection
The bot maintains state to track the last scanned block:
```python
def _scan_for_contracts(self) -> list[ContractDeployment]:
"""Scan blockchain for new contract deployments."""
latest_block = self.scanner.get_latest_block()
# Determine start block
if self.last_block_scanned > 0:
start_block = self.last_block_scanned + 1
else:
start_block = max(0, latest_block - self.config.blocks_to_scan + 1)
# Don't scan if we're already caught up
if start_block > latest_block:
return []
# Scan blocks
deployments = self.scanner.scan_blocks(start_block, latest_block)
# Update last scanned block
self.last_block_scanned = latest_block
return deployments
```
### Scan Intervals
Configurable via `SCAN_INTERVAL_MINUTES` (default: 15 minutes):
```python
while self.running:
try:
self._run_cycle()
except Exception as e:
logger.error(f"Error in main loop: {e}", exc_info=True)
# Wait for next cycle
time.sleep(self.config.scan_interval_minutes * 60)
```
<Tip>
**Recommended intervals:**
- Development: 5 minutes
- Production: 10-15 minutes
- High frequency: 2-5 minutes (requires higher API rate limits)
</Tip>
## Performance Optimization
### Filtering Small Contracts
The `min_contract_size` parameter filters out trivial contracts:
```python
if bytecode_size < self.min_contract_size:
logger.debug(f"Skipping small contract {contract_address} (size: {bytecode_size})")
return None
```
This avoids auditing:
- Test contracts
- Simple forwarders
- Minimal proxies
- Interface-only contracts
### Batch Processing
The scanner processes multiple blocks in a single cycle:
```python
# Scan up to 100 blocks per cycle (configurable)
deployments = self.scanner.scan_blocks(start_block, end_block)
```
## Integration with Main Bot
The scanner is initialized and used by the main `AuditBot`:
```python
# Initialize scanner
self.scanner = BaseChainScanner(
rpc_url=config.base_rpc_url,
basescan_api_key=config.basescan_api_key,
min_contract_size=config.min_contract_size
)
# Scan for deployments
deployments = self._scan_for_contracts()
# Process each deployment
for deployment in deployments:
self._process_deployment(deployment)
```
## Monitoring Metrics
Key metrics to monitor:
- **Blocks scanned per cycle**: Indicates scanning throughput
- **Deployments found**: Number of new contracts detected
- **Verification rate**: Percentage of verified vs unverified contracts
- **API errors**: RPC and Basescan API failures
- **Scan latency**: Time to process a block range
## Next Steps
<CardGroup cols={2}>
<Card title="GitHub Discovery" icon="github" href="/concepts/github-discovery">
Learn how the bot finds GitHub repositories for contracts
</Card>
<Card title="AI-Powered Auditing" icon="brain" href="/concepts/ai-auditing">
Understand how Claude analyzes contract source code
</Card>
</CardGroup>