Python SDK Examples
Comprehensive Python integration examples for Omne blockchain development. Perfect for data science, automation, and backend integration.
Installation & Setup
Installation
Install the Omne Python SDK using pip or your preferred package manager.
# Install via pip
pip install omne-sdk
# Or with additional dependencies for specific use cases
pip install omne-sdk[web3,data,async]
# For development with all extras
pip install omne-sdk[dev]
Basic Configuration
Set up the SDK with your network configuration and credentials.
import os
from omne_sdk import OmneClient, NetworkConfig
from omne_sdk.auth import PrivateKeyAuth
# Configure the client
config = NetworkConfig(
network='mainnet', # or 'testnet', 'devnet'
rpc_url='https://mainnet.omne.network',
chain_id=1,
gas_price='auto',
confirmation_blocks=1
)
# Authentication with private key
auth = PrivateKeyAuth(private_key=os.getenv('OMNE_PRIVATE_KEY'))
# Initialize the client
omne = OmneClient(config=config, auth=auth)
# Verify connection
async def verify_connection():
latest_block = await omne.get_latest_block()
balance = await omne.get_balance()
print(f"Connected to Omne network")
print(f"Latest block: {latest_block['number']}")
print(f"Account balance: {balance} OMNE")
return {
'block': latest_block,
'balance': balance,
'address': omne.address
}
# Run verification
import asyncio
connection_info = asyncio.run(verify_connection())
Transaction Operations
Basic Transfers
Send OMNE tokens with automatic gas estimation and transaction monitoring.
import asyncio
from omne_sdk import OmneClient, TransactionParams
from omne_sdk.exceptions import TransactionError, InsufficientFundsError
async def send_omne_tokens():
"""Send OMNE tokens with comprehensive error handling"""
try:
# Check balance before sending
balance = await omne.get_balance()
send_amount = 1.5 # 1.5 OMNE
if balance < send_amount:
raise InsufficientFundsError(f"Balance {balance} insufficient for {send_amount}")
# Prepare transaction parameters
tx_params = TransactionParams(
to='0x742d35Cc6634C0532925a3b8D0A4E643C8b1c95b',
value=omne.to_wei(send_amount),
gas_limit=21000, # Standard transfer gas limit
gas_price='auto', # Automatic gas price estimation
nonce='auto' # Automatic nonce management
)
# Estimate gas (optional but recommended)
estimated_gas = await omne.estimate_gas(tx_params)
tx_params.gas_limit = estimated_gas * 1.1 # Add 10% buffer
print(f"Sending {send_amount} OMNE...")
print(f"Estimated gas: {estimated_gas}")
# Send transaction
tx_hash = await omne.send_transaction(tx_params)
print(f"Transaction sent: {tx_hash}")
# Wait for confirmation
receipt = await omne.wait_for_transaction(tx_hash, timeout=300)
if receipt['status'] == 1:
print(f"Transaction confirmed in block {receipt['blockNumber']}")
print(f"Gas used: {receipt['gasUsed']}")
# Update balance
new_balance = await omne.get_balance()
print(f"New balance: {new_balance} OMNE")
return {
'hash': tx_hash,
'receipt': receipt,
'new_balance': new_balance
}
else:
raise TransactionError(f"Transaction failed: {tx_hash}")
except InsufficientFundsError as e:
print(f"Insufficient funds: {e}")
return None
except TransactionError as e:
print(f"Transaction error: {e}")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
# Execute the transfer
result = asyncio.run(send_omne_tokens())
Batch Transactions
Execute multiple transactions efficiently with batch processing and parallel execution.
import asyncio
from typing import List, Dict, Any
from omne_sdk import OmneClient, TransactionParams
from omne_sdk.batch import BatchProcessor
class BatchTransactionManager:
def __init__(self, omne_client: OmneClient):
self.omne = omne_client
self.batch_processor = BatchProcessor(omne_client)
async def send_batch_payments(self, recipients: List[Dict[str, Any]]) -> List[Dict]:
"""Send multiple payments in a single batch"""
# Prepare all transactions
transactions = []
for recipient in recipients:
tx_params = TransactionParams(
to=recipient['address'],
value=self.omne.to_wei(recipient['amount']),
gas_limit=21000,
gas_price='auto'
)
transactions.append(tx_params)
print(f"Preparing batch of {len(transactions)} transactions...")
# Estimate total gas needed
total_gas_estimate = await self.batch_processor.estimate_batch_gas(transactions)
total_cost = sum(tx.value for tx in transactions) + total_gas_estimate
print(f"Total cost estimate: {self.omne.from_wei(total_cost)} OMNE")
# Execute batch
batch_result = await self.batch_processor.execute_batch(
transactions,
max_concurrent=5, # Limit concurrent transactions
retry_failed=True,
confirmation_blocks=1
)
# Process results
successful = [r for r in batch_result if r['status'] == 'success']
failed = [r for r in batch_result if r['status'] == 'failed']
print(f"Batch complete: {len(successful)} successful, {len(failed)} failed")
return {
'successful': successful,
'failed': failed,
'total_gas_used': sum(r.get('gas_used', 0) for r in successful)
}
async def distribute_tokens(self, token_address: str, distributions: List[Dict]) -> Dict:
"""Distribute ERC-20 tokens to multiple recipients"""
from omne_sdk.contracts import ERC20Contract
token_contract = ERC20Contract(self.omne, token_address)
# Check total distribution amount
total_amount = sum(d['amount'] for d in distributions)
token_balance = await token_contract.balance_of(self.omne.address)
if token_balance < total_amount:
raise ValueError(f"Insufficient token balance: {token_balance} < {total_amount}")
# Prepare transfer transactions
transfer_txs = []
for dist in distributions:
tx_data = await token_contract.prepare_transfer(
to=dist['address'],
amount=dist['amount']
)
transfer_txs.append(tx_data)
# Execute batch transfers
results = await self.batch_processor.execute_contract_batch(
transfer_txs,
max_concurrent=3
)
return results
# Usage example
async def example_batch_operations():
batch_manager = BatchTransactionManager(omne)
# Example 1: Batch OMNE payments
recipients = [
{'address': '0x123...', 'amount': 0.1},
{'address': '0x456...', 'amount': 0.2},
{'address': '0x789...', 'amount': 0.15}
]
payment_results = await batch_manager.send_batch_payments(recipients)
print("Batch payments result:", payment_results)
# Example 2: Token distribution
token_distributions = [
{'address': '0x123...', 'amount': 1000},
{'address': '0x456...', 'amount': 2000},
{'address': '0x789...', 'amount': 1500}
]
token_results = await batch_manager.distribute_tokens(
token_address='0xToken...',
distributions=token_distributions
)
print("Token distribution result:", token_results)
# Run batch operations
asyncio.run(example_batch_operations())
Smart Contract Integration
Contract Deployment
Deploy and interact with smart contracts using Python with full ABI support.
import json
from omne_sdk.contracts import ContractFactory, Contract
from omne_sdk.compiler import SolidityCompiler
class SmartContractManager:
def __init__(self, omne_client):
self.omne = omne_client
self.compiler = SolidityCompiler()
async def deploy_contract(self, contract_source: str, constructor_args: list = None):
"""Deploy a smart contract from Solidity source"""
# Compile the contract
compiled = await self.compiler.compile(contract_source)
contract_name = list(compiled['contracts'].keys())[0]
contract_data = compiled['contracts'][contract_name]
print(f"Deploying contract: {contract_name}")
print(f"Bytecode size: {len(contract_data['bytecode']) // 2} bytes")
# Create contract factory
factory = ContractFactory(
self.omne,
abi=contract_data['abi'],
bytecode=contract_data['bytecode']
)
# Estimate deployment gas
deploy_gas = await factory.estimate_gas_for_deployment(constructor_args or [])
print(f"Estimated deployment gas: {deploy_gas}")
# Deploy contract
contract = await factory.deploy(
constructor_args or [],
gas_limit=deploy_gas * 1.2, # 20% buffer
gas_price='auto'
)
print(f"Contract deployed at: {contract.address}")
print(f"Deployment transaction: {contract.deployment_tx}")
return contract
async def interact_with_contract(self, contract_address: str, abi: list):
"""Demonstrate contract interaction patterns"""
contract = Contract(self.omne, contract_address, abi)
# Read-only function call (view/pure)
try:
name = await contract.functions.name().call()
symbol = await contract.functions.symbol().call()
total_supply = await contract.functions.totalSupply().call()
print(f"Token Info: {name} ({symbol})")
print(f"Total Supply: {total_supply}")
except Exception as e:
print(f"Error reading contract state: {e}")
# State-changing function call
try:
# Estimate gas for the transaction
gas_estimate = await contract.functions.transfer(
'0x742d35Cc6634C0532925a3b8D0A4E643C8b1c95b',
1000
).estimate_gas({'from': self.omne.address})
# Execute transaction
tx_hash = await contract.functions.transfer(
'0x742d35Cc6634C0532925a3b8D0A4E643C8b1c95b',
1000
).transact({
'gas': gas_estimate * 1.1,
'gasPrice': 'auto'
})
# Wait for confirmation
receipt = await self.omne.wait_for_transaction(tx_hash)
print(f"Transfer successful: {tx_hash}")
# Parse events from receipt
events = contract.events.Transfer().process_receipt(receipt)
for event in events:
print(f"Transfer event: {event['args']}")
except Exception as e:
print(f"Error executing transaction: {e}")
return contract
# Example ERC-20 contract source
ERC20_CONTRACT = '''
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract SimpleToken {
string public name = "Omne Test Token";
string public symbol = "OTT";
uint8 public decimals = 18;
uint256 public totalSupply = 1000000 * 10**18;
mapping(address => uint256) public balanceOf;
mapping(address => mapping(address => uint256)) public allowance;
event Transfer(address indexed from, address indexed to, uint256 value);
event Approval(address indexed owner, address indexed spender, uint256 value);
constructor() {
balanceOf[msg.sender] = totalSupply;
}
function transfer(address to, uint256 value) public returns (bool) {
require(balanceOf[msg.sender] >= value, "Insufficient balance");
balanceOf[msg.sender] -= value;
balanceOf[to] += value;
emit Transfer(msg.sender, to, value);
return true;
}
function approve(address spender, uint256 value) public returns (bool) {
allowance[msg.sender][spender] = value;
emit Approval(msg.sender, spender, value);
return true;
}
}
'''
# Deploy and interact with contract
async def contract_example():
manager = SmartContractManager(omne)
# Deploy new contract
contract = await manager.deploy_contract(ERC20_CONTRACT)
# Interact with deployed contract
await manager.interact_with_contract(contract.address, contract.abi)
asyncio.run(contract_example())
Event Monitoring
Monitor blockchain events in real-time with filtering and persistence.
import asyncio
import logging
from typing import Dict, List, Callable
from omne_sdk.events import EventMonitor, EventFilter
from omne_sdk.storage import EventStorage
class BlockchainEventMonitor:
def __init__(self, omne_client, storage_backend='sqlite'):
self.omne = omne_client
self.monitor = EventMonitor(omne_client)
self.storage = EventStorage(storage_backend)
self.event_handlers: Dict[str, List[Callable]] = {}
def register_handler(self, event_signature: str, handler: Callable):
"""Register an event handler function"""
if event_signature not in self.event_handlers:
self.event_handlers[event_signature] = []
self.event_handlers[event_signature].append(handler)
async def start_monitoring(self, contracts: List[Dict], from_block='latest'):
"""Start monitoring events from specified contracts"""
print(f"Starting event monitoring from block: {from_block}")
# Set up event filters
filters = []
for contract_info in contracts:
for event_name in contract_info['events']:
filter_config = EventFilter(
contract_address=contract_info['address'],
event_signature=event_name,
from_block=from_block
)
filters.append(filter_config)
# Start monitoring with filters
async for event in self.monitor.watch_events(filters):
await self.process_event(event)
async def process_event(self, event: Dict):
"""Process a single blockchain event"""
try:
# Store event in database
await self.storage.store_event(event)
# Call registered handlers
event_signature = event['event']
if event_signature in self.event_handlers:
for handler in self.event_handlers[event_signature]:
try:
await handler(event)
except Exception as e:
logging.error(f"Error in event handler {handler.__name__}: {e}")
logging.info(f"Processed event: {event_signature} from {event['address']}")
except Exception as e:
logging.error(f"Error processing event: {e}")
# Event handler functions
async def handle_transfer_event(event):
"""Handle ERC-20 Transfer events"""
args = event['args']
amount = args['value'] / 10**18 # Convert from wei
print(f"Transfer detected:")
print(f" From: {args['from']}")
print(f" To: {args['to']}")
print(f" Amount: {amount} tokens")
print(f" Block: {event['blockNumber']}")
print(f" Transaction: {event['transactionHash']}")
# Custom logic for large transfers
if amount > 10000:
await send_alert(f"Large transfer detected: {amount} tokens")
async def handle_approval_event(event):
"""Handle ERC-20 Approval events"""
args = event['args']
amount = args['value'] / 10**18
print(f"Approval detected:")
print(f" Owner: {args['owner']}")
print(f" Spender: {args['spender']}")
print(f" Amount: {amount} tokens")
async def handle_new_block(event):
"""Handle new block events"""
block_number = event['blockNumber']
timestamp = event['timestamp']
print(f"New block: {block_number} at {timestamp}")
# Update monitoring statistics
await update_monitoring_stats(block_number)
async def send_alert(message: str):
"""Send alert notification"""
# Implement your alert system (email, Slack, Discord, etc.)
print(f"🚨 ALERT: {message}")
async def update_monitoring_stats(block_number: int):
"""Update monitoring statistics"""
# Track monitoring performance
pass
# Usage example
async def start_event_monitoring():
monitor = BlockchainEventMonitor(omne)
# Register event handlers
monitor.register_handler('Transfer', handle_transfer_event)
monitor.register_handler('Approval', handle_approval_event)
monitor.register_handler('NewBlock', handle_new_block)
# Define contracts to monitor
contracts_to_monitor = [
{
'address': '0x...', # ERC-20 token contract
'events': ['Transfer', 'Approval']
},
{
'address': '0x...', # DEX contract
'events': ['Swap', 'Mint', 'Burn']
}
]
# Start monitoring (this will run indefinitely)
await monitor.start_monitoring(contracts_to_monitor, from_block=-100) # Last 100 blocks
# Historical event analysis
async def analyze_historical_events():
"""Analyze historical events for patterns and insights"""
storage = EventStorage('sqlite')
# Get transfer events from last 24 hours
recent_transfers = await storage.get_events(
event_type='Transfer',
from_timestamp=int(time.time()) - 86400,
limit=1000
)
# Analyze transfer patterns
transfer_volume = sum(event['args']['value'] for event in recent_transfers)
unique_addresses = set()
for event in recent_transfers:
unique_addresses.add(event['args']['from'])
unique_addresses.add(event['args']['to'])
print(f"24h Transfer Analysis:")
print(f" Total transfers: {len(recent_transfers)}")
print(f" Total volume: {transfer_volume / 10**18:,.2f} tokens")
print(f" Unique addresses: {len(unique_addresses)}")
# Find top holders by transfer activity
address_volumes = {}
for event in recent_transfers:
sender = event['args']['from']
receiver = event['args']['to']
amount = event['args']['value']
address_volumes[sender] = address_volumes.get(sender, 0) + amount
address_volumes[receiver] = address_volumes.get(receiver, 0) + amount
top_addresses = sorted(address_volumes.items(), key=lambda x: x[1], reverse=True)[:10]
print("\nTop 10 Most Active Addresses:")
for addr, volume in top_addresses:
print(f" {addr}: {volume / 10**18:,.2f} tokens")
# Run monitoring (choose one)
# asyncio.run(start_event_monitoring()) # Real-time monitoring
# asyncio.run(analyze_historical_events()) # Historical analysis
Blockchain Data Analytics
Transaction Analysis
Analyze blockchain data with pandas and create comprehensive reports.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from omne_sdk.analytics import BlockchainAnalyzer, TransactionAnalyzer
class OmneAnalytics:
def __init__(self, omne_client):
self.omne = omne_client
self.analyzer = BlockchainAnalyzer(omne_client)
self.tx_analyzer = TransactionAnalyzer(omne_client)
async def analyze_network_activity(self, days: int = 30) -> pd.DataFrame:
"""Analyze network activity over specified time period"""
# Get block data
end_block = await self.omne.get_latest_block_number()
start_block = end_block - (days * 24 * 60 * 4) # Assuming 15s block time
print(f"Analyzing blocks {start_block} to {end_block}")
# Fetch block data in chunks
block_data = []
chunk_size = 1000
for start in range(start_block, end_block, chunk_size):
end = min(start + chunk_size, end_block)
blocks = await self.analyzer.get_blocks_range(start, end)
for block in blocks:
block_data.append({
'block_number': block['number'],
'timestamp': datetime.fromtimestamp(block['timestamp']),
'transaction_count': len(block['transactions']),
'gas_used': block['gasUsed'],
'gas_limit': block['gasLimit'],
'block_size': block['size'],
'difficulty': block['difficulty']
})
df = pd.DataFrame(block_data)
df['date'] = df['timestamp'].dt.date
df['hour'] = df['timestamp'].dt.hour
df['gas_utilization'] = df['gas_used'] / df['gas_limit']
return df
async def transaction_volume_analysis(self, df: pd.DataFrame):
"""Analyze transaction volume patterns"""
# Daily aggregations
daily_stats = df.groupby('date').agg({
'transaction_count': ['sum', 'mean', 'std'],
'gas_used': ['sum', 'mean'],
'gas_utilization': 'mean',
'block_size': 'mean'
}).round(2)
daily_stats.columns = ['total_txs', 'avg_txs_per_block', 'tx_std',
'total_gas', 'avg_gas_per_block', 'avg_utilization', 'avg_block_size']
print("Daily Transaction Statistics:")
print(daily_stats.tail(7)) # Last 7 days
# Hourly patterns
hourly_stats = df.groupby('hour').agg({
'transaction_count': 'mean',
'gas_utilization': 'mean'
}).round(2)
print("\nHourly Activity Patterns:")
print(hourly_stats)
# Create visualizations
plt.figure(figsize=(15, 10))
# Transaction volume over time
plt.subplot(2, 2, 1)
daily_stats['total_txs'].plot(kind='line', marker='o')
plt.title('Daily Transaction Volume')
plt.ylabel('Total Transactions')
plt.xticks(rotation=45)
# Gas utilization over time
plt.subplot(2, 2, 2)
daily_stats['avg_utilization'].plot(kind='line', marker='s', color='orange')
plt.title('Daily Average Gas Utilization')
plt.ylabel('Gas Utilization %')
plt.xticks(rotation=45)
# Hourly transaction pattern
plt.subplot(2, 2, 3)
hourly_stats['transaction_count'].plot(kind='bar')
plt.title('Average Transactions by Hour')
plt.ylabel('Avg Transactions per Block')
plt.xlabel('Hour of Day')
# Gas utilization by hour
plt.subplot(2, 2, 4)
hourly_stats['gas_utilization'].plot(kind='bar', color='green')
plt.title('Average Gas Utilization by Hour')
plt.ylabel('Gas Utilization %')
plt.xlabel('Hour of Day')
plt.tight_layout()
plt.savefig('omne_network_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
return daily_stats, hourly_stats
async def address_activity_analysis(self, addresses: list, days: int = 30):
"""Analyze activity patterns for specific addresses"""
address_data = []
for address in addresses:
# Get transaction history
transactions = await self.tx_analyzer.get_address_transactions(
address,
days=days
)
# Calculate metrics
tx_count = len(transactions)
total_value_sent = sum(tx['value'] for tx in transactions if tx['from'].lower() == address.lower())
total_value_received = sum(tx['value'] for tx in transactions if tx['to'].lower() == address.lower())
avg_gas_price = np.mean([tx['gasPrice'] for tx in transactions])
unique_counterparties = len(set([tx['to'] if tx['from'].lower() == address.lower() else tx['from'] for tx in transactions]))
address_data.append({
'address': address,
'tx_count': tx_count,
'total_sent': self.omne.from_wei(total_value_sent),
'total_received': self.omne.from_wei(total_value_received),
'net_flow': self.omne.from_wei(total_value_received - total_value_sent),
'avg_gas_price': avg_gas_price,
'unique_counterparties': unique_counterparties,
'avg_tx_per_day': tx_count / days
})
df_addresses = pd.DataFrame(address_data)
print("Address Activity Analysis:")
print(df_addresses.to_string(index=False))
return df_addresses
async def generate_network_report(self, save_path: str = 'omne_network_report.html'):
"""Generate comprehensive network analysis report"""
print("Generating comprehensive network analysis report...")
# Analyze last 30 days
df = await self.analyze_network_activity(30)
daily_stats, hourly_stats = await self.transaction_volume_analysis(df)
# Additional metrics
latest_block = await self.omne.get_latest_block()
network_hash_rate = await self.analyzer.get_network_hashrate()
active_validators = await self.analyzer.get_active_validators()
# Create HTML report
html_report = f"""
<!DOCTYPE html>
<html>
<head>
<title>Omne Network Analysis Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
.metric {{ background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; }}
.highlight {{ color: #2196F3; font-weight: bold; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<h1>Omne Network Analysis Report</h1>
<p>Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<h2>Network Overview</h2>
<div class="metric">
<strong>Latest Block:</strong> <span class="highlight">#{latest_block['number']:,}</span><br>
<strong>Network Hash Rate:</strong> <span class="highlight">{network_hash_rate:,.2f} H/s</span><br>
<strong>Active Validators:</strong> <span class="highlight">{active_validators}</span>
</div>
<h2>Transaction Statistics (Last 30 Days)</h2>
<div class="metric">
<strong>Total Transactions:</strong> <span class="highlight">{daily_stats['total_txs'].sum():,}</span><br>
<strong>Average Daily Transactions:</strong> <span class="highlight">{daily_stats['total_txs'].mean():,.0f}</span><br>
<strong>Average Gas Utilization:</strong> <span class="highlight">{daily_stats['avg_utilization'].mean():.1%}</span>
</div>
<h2>Recent Daily Statistics</h2>
{daily_stats.tail(7).to_html(classes='table')}
<h2>Hourly Activity Patterns</h2>
{hourly_stats.to_html(classes='table')}
</body>
</html>
"""
with open(save_path, 'w') as f:
f.write(html_report)
print(f"Report saved to {save_path}")
return {
'daily_stats': daily_stats,
'hourly_stats': hourly_stats,
'latest_block': latest_block,
'report_path': save_path
}
# Usage examples
async def run_analytics():
analytics = OmneAnalytics(omne)
# Generate comprehensive report
report_data = await analytics.generate_network_report()
# Analyze specific addresses
important_addresses = [
'0x...', # Exchange address
'0x...', # Large holder
'0x...' # DeFi protocol
]
address_analysis = await analytics.address_activity_analysis(important_addresses)
return report_data, address_analysis
# Run analytics
# report, addresses = asyncio.run(run_analytics())
Python SDK Best Practices
🔄 Async/Await Patterns
- • Always use async/await for blockchain operations to prevent blocking
- • Implement proper connection pooling for high-throughput applications
- • Use asyncio.gather() for parallel operations when possible
- • Handle rate limiting with exponential backoff strategies
- • Set appropriate timeouts for all network operations
🛡️ Error Handling
- • Implement comprehensive exception handling for network failures
- • Use transaction receipts to verify successful execution
- • Implement retry logic with jitter for transient failures
- • Log all transaction hashes for debugging and audit trails
- • Validate input parameters before sending transactions
⚡ Performance Optimization
- • Use batch operations for multiple transactions
- • Cache contract ABIs and frequently used data
- • Implement efficient event filtering to reduce data transfer
- • Use connection pooling for high-frequency applications
- • Monitor gas prices and optimize transaction timing
🔐 Security Guidelines
- • Store private keys securely using environment variables or key management systems
- • Validate all contract addresses before interaction
- • Implement transaction simulation before execution
- • Use multi-signature wallets for high-value operations
- • Regularly audit and update dependencies