Python SDK Examples

Comprehensive Python integration examples for Omne blockchain development. Perfect for data science, automation, and backend integration.

Installation & Setup

Installation

Install the Omne Python SDK using pip or your preferred package manager.

# Install via pip
pip install omne-sdk

# Or with additional dependencies for specific use cases.
# The requirement is quoted so that shells such as zsh do not try to
# expand the [...] extras list as a glob pattern.
pip install "omne-sdk[web3,data,async]"

# For development with all extras
pip install "omne-sdk[dev]"

Basic Configuration

Set up the SDK with your network configuration and credentials.

import asyncio
import os

from omne_sdk import OmneClient, NetworkConfig
from omne_sdk.auth import PrivateKeyAuth

# Configure the client
config = NetworkConfig(
    network='mainnet',  # or 'testnet', 'devnet'
    rpc_url='https://mainnet.omne.network',
    chain_id=1,
    gas_price='auto',
    confirmation_blocks=1,
)

# Authentication with private key.
# Fail fast with a clear message instead of handing None to PrivateKeyAuth
# when the environment variable has not been set.
private_key = os.getenv('OMNE_PRIVATE_KEY')
if not private_key:
    raise RuntimeError(
        "OMNE_PRIVATE_KEY environment variable is not set"
    )
auth = PrivateKeyAuth(private_key=private_key)

# Initialize the client
omne = OmneClient(config=config, auth=auth)


async def verify_connection():
    """Verify the connection by fetching the latest block and balance.

    Returns:
        A dict with the keys ``'block'`` (the latest block), ``'balance'``
        (the result of ``omne.get_balance()``) and ``'address'``
        (``omne.address``).
    """
    latest_block = await omne.get_latest_block()
    balance = await omne.get_balance()

    print("Connected to Omne network")
    print(f"Latest block: {latest_block['number']}")
    print(f"Account balance: {balance} OMNE")

    return {
        'block': latest_block,
        'balance': balance,
        'address': omne.address,
    }


# Run verification
connection_info = asyncio.run(verify_connection())

Transaction Operations

Basic Transfers

Send OMNE tokens with automatic gas estimation and transaction monitoring.

import asyncio
import math

from omne_sdk import OmneClient, TransactionParams
from omne_sdk.exceptions import TransactionError, InsufficientFundsError


async def send_omne_tokens():
    """Send OMNE tokens with comprehensive error handling.

    Uses the module-level ``omne`` client created in the configuration
    example.

    Returns:
        A dict with the keys ``'hash'``, ``'receipt'`` and ``'new_balance'``
        when the transfer is confirmed, or ``None`` when any step fails
        (the error is printed rather than re-raised).
    """
    try:
        # Check balance before sending
        balance = await omne.get_balance()
        send_amount = 1.5  # 1.5 OMNE

        if balance < send_amount:
            raise InsufficientFundsError(
                f"Balance {balance} insufficient for {send_amount}"
            )

        # Prepare transaction parameters
        tx_params = TransactionParams(
            to='0x742d35Cc6634C0532925a3b8D0A4E643C8b1c95b',
            value=omne.to_wei(send_amount),
            gas_limit=21000,   # Standard transfer gas limit
            gas_price='auto',  # Automatic gas price estimation
            nonce='auto',      # Automatic nonce management
        )

        # Estimate gas (optional but recommended)
        estimated_gas = await omne.estimate_gas(tx_params)
        # Add a 10% buffer. Gas limits are whole units, so round the
        # buffered value up instead of passing a float.
        tx_params.gas_limit = math.ceil(estimated_gas * 1.1)

        print(f"Sending {send_amount} OMNE...")
        print(f"Estimated gas: {estimated_gas}")

        # Send transaction
        tx_hash = await omne.send_transaction(tx_params)
        print(f"Transaction sent: {tx_hash}")

        # Wait for confirmation
        receipt = await omne.wait_for_transaction(tx_hash, timeout=300)

        # Any status other than 1 is treated as a failed transaction.
        if receipt['status'] != 1:
            raise TransactionError(f"Transaction failed: {tx_hash}")

        print(f"Transaction confirmed in block {receipt['blockNumber']}")
        print(f"Gas used: {receipt['gasUsed']}")

        # Update balance
        new_balance = await omne.get_balance()
        print(f"New balance: {new_balance} OMNE")

        return {
            'hash': tx_hash,
            'receipt': receipt,
            'new_balance': new_balance,
        }

    except InsufficientFundsError as e:
        print(f"Insufficient funds: {e}")
        return None
    except TransactionError as e:
        print(f"Transaction error: {e}")
        return None
    except Exception as e:
        # Example-level boundary: report anything unexpected and give up.
        print(f"Unexpected error: {e}")
        return None


# Execute the transfer
result = asyncio.run(send_omne_tokens())

Batch Transactions

Execute multiple transactions efficiently with batch processing and parallel execution.

import asyncio
from typing import List, Dict, Any

from omne_sdk import OmneClient, TransactionParams
from omne_sdk.batch import BatchProcessor


class BatchTransactionManager:
    """Send many OMNE payments or token transfers through a BatchProcessor."""

    def __init__(self, omne_client: OmneClient):
        self.omne = omne_client
        self.batch_processor = BatchProcessor(omne_client)

    async def send_batch_payments(
        self, recipients: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Send multiple payments in a single batch.

        Args:
            recipients: Dicts with an ``'address'`` and an ``'amount'`` key;
                the amount is converted with ``self.omne.to_wei``.

        Returns:
            A dict with the keys ``'successful'`` and ``'failed'`` (lists of
            batch results split by their ``'status'``) and
            ``'total_gas_used'`` (sum of ``'gas_used'`` over the successful
            results, counting a missing value as 0).
        """
        # Prepare all transactions
        transactions = [
            TransactionParams(
                to=recipient['address'],
                value=self.omne.to_wei(recipient['amount']),
                gas_limit=21000,
                gas_price='auto',
            )
            for recipient in recipients
        ]

        print(f"Preparing batch of {len(transactions)} transactions...")

        # Estimate total gas needed
        total_gas_estimate = await self.batch_processor.estimate_batch_gas(
            transactions
        )
        # NOTE(review): this adds the estimate directly to wei values, which
        # is only meaningful if estimate_batch_gas returns a cost in wei
        # rather than gas units — confirm against the SDK.
        total_cost = sum(tx.value for tx in transactions) + total_gas_estimate

        print(f"Total cost estimate: {self.omne.from_wei(total_cost)} OMNE")

        # Execute batch
        batch_result = await self.batch_processor.execute_batch(
            transactions,
            max_concurrent=5,  # Limit concurrent transactions
            retry_failed=True,
            confirmation_blocks=1,
        )

        # Process results
        successful = [r for r in batch_result if r['status'] == 'success']
        failed = [r for r in batch_result if r['status'] == 'failed']

        print(
            f"Batch complete: {len(successful)} successful, "
            f"{len(failed)} failed"
        )

        return {
            'successful': successful,
            'failed': failed,
            'total_gas_used': sum(r.get('gas_used', 0) for r in successful),
        }

    async def distribute_tokens(
        self, token_address: str, distributions: List[Dict]
    ) -> Dict:
        """Distribute ERC-20 tokens to multiple recipients.

        Args:
            token_address: Address of the token contract.
            distributions: Dicts with an ``'address'`` and an ``'amount'``
                key.

        Returns:
            Whatever ``BatchProcessor.execute_contract_batch`` returns.

        Raises:
            ValueError: If the sender's token balance is lower than the sum
                of all requested amounts.
        """
        from omne_sdk.contracts import ERC20Contract

        token_contract = ERC20Contract(self.omne, token_address)

        # Check total distribution amount
        total_amount = sum(d['amount'] for d in distributions)
        token_balance = await token_contract.balance_of(self.omne.address)

        if token_balance < total_amount:
            raise ValueError(
                f"Insufficient token balance: "
                f"{token_balance} < {total_amount}"
            )

        # Prepare transfer transactions
        transfer_txs = []
        for dist in distributions:
            tx_data = await token_contract.prepare_transfer(
                to=dist['address'],
                amount=dist['amount'],
            )
            transfer_txs.append(tx_data)

        # Execute batch transfers
        results = await self.batch_processor.execute_contract_batch(
            transfer_txs,
            max_concurrent=3,
        )

        return results


# Usage example
async def example_batch_operations():
    """Run one batch of OMNE payments and one token distribution."""
    batch_manager = BatchTransactionManager(omne)

    # Example 1: Batch OMNE payments
    recipients = [
        {'address': '0x123...', 'amount': 0.1},
        {'address': '0x456...', 'amount': 0.2},
        {'address': '0x789...', 'amount': 0.15},
    ]

    payment_results = await batch_manager.send_batch_payments(recipients)
    print("Batch payments result:", payment_results)

    # Example 2: Token distribution
    token_distributions = [
        {'address': '0x123...', 'amount': 1000},
        {'address': '0x456...', 'amount': 2000},
        {'address': '0x789...', 'amount': 1500},
    ]

    token_results = await batch_manager.distribute_tokens(
        token_address='0xToken...',
        distributions=token_distributions,
    )
    print("Token distribution result:", token_results)


# Run batch operations
asyncio.run(example_batch_operations())

Smart Contract Integration

Contract Deployment

Deploy and interact with smart contracts using Python with full ABI support.

import asyncio
import json
import math
from typing import Optional

from omne_sdk.contracts import ContractFactory, Contract
from omne_sdk.compiler import SolidityCompiler


class SmartContractManager:
    """Compile, deploy and call smart contracts through an Omne client."""

    def __init__(self, omne_client):
        self.omne = omne_client
        self.compiler = SolidityCompiler()

    async def deploy_contract(
        self, contract_source: str, constructor_args: Optional[list] = None
    ):
        """Deploy a smart contract from Solidity source.

        Only the first contract in the compiler output is deployed.

        Args:
            contract_source: Solidity source code to compile.
            constructor_args: Positional constructor arguments; ``None`` is
                treated as an empty list.

        Returns:
            The contract object returned by ``ContractFactory.deploy``.
        """
        args = constructor_args or []

        # Compile the contract
        compiled = await self.compiler.compile(contract_source)
        contract_name, contract_data = list(compiled['contracts'].items())[0]

        print(f"Deploying contract: {contract_name}")
        print(f"Bytecode size: {len(contract_data['bytecode']) // 2} bytes")

        # Create contract factory
        factory = ContractFactory(
            self.omne,
            abi=contract_data['abi'],
            bytecode=contract_data['bytecode'],
        )

        # Estimate deployment gas
        deploy_gas = await factory.estimate_gas_for_deployment(args)
        print(f"Estimated deployment gas: {deploy_gas}")

        # Deploy contract with a 20% buffer. Gas limits are whole units, so
        # the buffered value is rounded up rather than passed as a float.
        contract = await factory.deploy(
            args,
            gas_limit=math.ceil(deploy_gas * 1.2),
            gas_price='auto',
        )

        print(f"Contract deployed at: {contract.address}")
        print(f"Deployment transaction: {contract.deployment_tx}")

        return contract

    async def interact_with_contract(self, contract_address: str, abi: list):
        """Demonstrate contract interaction patterns.

        Reads ``name``, ``symbol`` and ``totalSupply``, then sends a
        ``transfer`` and prints the resulting ``Transfer`` events. Errors in
        either phase are printed, not raised.

        Returns:
            The ``Contract`` wrapper created for ``contract_address``.
        """
        contract = Contract(self.omne, contract_address, abi)
        recipient = '0x742d35Cc6634C0532925a3b8D0A4E643C8b1c95b'

        # Read-only function calls (view/pure)
        try:
            name = await contract.functions.name().call()
            symbol = await contract.functions.symbol().call()
            total_supply = await contract.functions.totalSupply().call()

            print(f"Token Info: {name} ({symbol})")
            print(f"Total Supply: {total_supply}")
        except Exception as e:
            print(f"Error reading contract state: {e}")

        # State-changing function call
        try:
            # Estimate gas for the transaction
            gas_estimate = await contract.functions.transfer(
                recipient, 1000
            ).estimate_gas({'from': self.omne.address})

            # Execute transaction with a 10% gas buffer, rounded up to an
            # integer.
            tx_hash = await contract.functions.transfer(
                recipient, 1000
            ).transact({
                'gas': math.ceil(gas_estimate * 1.1),
                'gasPrice': 'auto',
            })

            # Wait for confirmation
            receipt = await self.omne.wait_for_transaction(tx_hash)
            print(f"Transfer successful: {tx_hash}")

            # Parse events from receipt
            events = contract.events.Transfer().process_receipt(receipt)
            for event in events:
                print(f"Transfer event: {event['args']}")

        except Exception as e:
            print(f"Error executing transaction: {e}")

        return contract


# Example ERC-20 contract source.
# The line breaks matter: the SPDX header is a `//` line comment, so it must
# end before the `pragma` line or it would comment out the whole contract.
ERC20_CONTRACT = '''
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract SimpleToken {
    string public name = "Omne Test Token";
    string public symbol = "OTT";
    uint8 public decimals = 18;
    uint256 public totalSupply = 1000000 * 10**18;

    mapping(address => uint256) public balanceOf;
    mapping(address => mapping(address => uint256)) public allowance;

    event Transfer(address indexed from, address indexed to, uint256 value);
    event Approval(address indexed owner, address indexed spender, uint256 value);

    constructor() {
        balanceOf[msg.sender] = totalSupply;
    }

    function transfer(address to, uint256 value) public returns (bool) {
        require(balanceOf[msg.sender] >= value, "Insufficient balance");
        balanceOf[msg.sender] -= value;
        balanceOf[to] += value;
        emit Transfer(msg.sender, to, value);
        return true;
    }

    function approve(address spender, uint256 value) public returns (bool) {
        allowance[msg.sender][spender] = value;
        emit Approval(msg.sender, spender, value);
        return true;
    }
}
'''


# Deploy and interact with contract
async def contract_example():
    """Deploy ``ERC20_CONTRACT`` and exercise the deployed instance."""
    manager = SmartContractManager(omne)

    # Deploy new contract
    contract = await manager.deploy_contract(ERC20_CONTRACT)

    # Interact with deployed contract
    await manager.interact_with_contract(contract.address, contract.abi)


asyncio.run(contract_example())

Event Monitoring

Monitor blockchain events in real-time with filtering and persistence.

import asyncio
import inspect
import logging
import time
from typing import Dict, List, Callable

from omne_sdk.events import EventMonitor, EventFilter
from omne_sdk.storage import EventStorage


class BlockchainEventMonitor:
    """Watch contract events, persist them and dispatch them to handlers."""

    def __init__(self, omne_client, storage_backend='sqlite'):
        self.omne = omne_client
        self.monitor = EventMonitor(omne_client)
        self.storage = EventStorage(storage_backend)
        # Maps an event signature to the handlers registered for it.
        self.event_handlers: Dict[str, List[Callable]] = {}

    def register_handler(self, event_signature: str, handler: Callable):
        """Register an event handler function.

        Handlers are called with the event dict. Both coroutine functions
        and plain callables are accepted.
        """
        self.event_handlers.setdefault(event_signature, []).append(handler)

    async def start_monitoring(self, contracts: List[Dict], from_block='latest'):
        """Start monitoring events from specified contracts.

        Args:
            contracts: Dicts with an ``'address'`` key and an ``'events'``
                key listing the event names to watch on that address.
            from_block: Passed unchanged to every ``EventFilter``.

        Runs for as long as ``EventMonitor.watch_events`` keeps yielding.
        """
        print(f"Starting event monitoring from block: {from_block}")

        # Set up one filter per (contract, event) pair
        filters = [
            EventFilter(
                contract_address=contract_info['address'],
                event_signature=event_name,
                from_block=from_block,
            )
            for contract_info in contracts
            for event_name in contract_info['events']
        ]

        # Start monitoring with filters
        async for event in self.monitor.watch_events(filters):
            await self.process_event(event)

    async def process_event(self, event: Dict):
        """Process a single blockchain event.

        Stores the event, then calls every handler registered for
        ``event['event']``. A failing handler is logged and does not stop
        the remaining handlers; any other failure is logged as well.
        """
        try:
            # Store event in database
            await self.storage.store_event(event)

            # Call registered handlers
            event_signature = event['event']
            for handler in self.event_handlers.get(event_signature, []):
                try:
                    outcome = handler(event)
                    # Only await when the handler actually returned an
                    # awaitable, so synchronous handlers work too.
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception as e:
                    # Not every callable has __name__ (e.g. functools.partial).
                    handler_name = getattr(handler, '__name__', repr(handler))
                    logging.error(f"Error in event handler {handler_name}: {e}")

            logging.info(f"Processed event: {event_signature} from {event['address']}")

        except Exception as e:
            logging.error(f"Error processing event: {e}")


# Event handler functions
async def handle_transfer_event(event):
    """Handle ERC-20 Transfer events."""
    args = event['args']
    amount = args['value'] / 10**18  # Convert from wei

    print("Transfer detected:")
    print(f" From: {args['from']}")
    print(f" To: {args['to']}")
    print(f" Amount: {amount} tokens")
    print(f" Block: {event['blockNumber']}")
    print(f" Transaction: {event['transactionHash']}")

    # Custom logic for large transfers
    if amount > 10000:
        await send_alert(f"Large transfer detected: {amount} tokens")


async def handle_approval_event(event):
    """Handle ERC-20 Approval events."""
    args = event['args']
    amount = args['value'] / 10**18

    print("Approval detected:")
    print(f" Owner: {args['owner']}")
    print(f" Spender: {args['spender']}")
    print(f" Amount: {amount} tokens")


async def handle_new_block(event):
    """Handle new block events."""
    block_number = event['blockNumber']
    timestamp = event['timestamp']

    print(f"New block: {block_number} at {timestamp}")

    # Update monitoring statistics
    await update_monitoring_stats(block_number)


async def send_alert(message: str):
    """Send alert notification."""
    # Implement your alert system (email, Slack, Discord, etc.)
    print(f"🚨 ALERT: {message}")


async def update_monitoring_stats(block_number: int):
    """Update monitoring statistics."""
    # Track monitoring performance
    pass


# Usage example
async def start_event_monitoring():
    """Register the example handlers and start watching two contracts."""
    monitor = BlockchainEventMonitor(omne)

    # Register event handlers
    monitor.register_handler('Transfer', handle_transfer_event)
    monitor.register_handler('Approval', handle_approval_event)
    monitor.register_handler('NewBlock', handle_new_block)

    # Define contracts to monitor
    contracts_to_monitor = [
        {
            'address': '0x...',  # ERC-20 token contract
            'events': ['Transfer', 'Approval'],
        },
        {
            'address': '0x...',  # DEX contract
            'events': ['Swap', 'Mint', 'Burn'],
        },
    ]

    # Start monitoring (this will run indefinitely)
    await monitor.start_monitoring(contracts_to_monitor, from_block=-100)  # Last 100 blocks


# Historical event analysis
async def analyze_historical_events():
    """Analyze historical events for patterns and insights.

    Prints the transfer count, total volume, number of unique addresses and
    the ten most active addresses for the last 24 hours of stored
    ``Transfer`` events (at most 1000 events are fetched).
    """
    storage = EventStorage('sqlite')

    # Get transfer events from last 24 hours
    recent_transfers = await storage.get_events(
        event_type='Transfer',
        from_timestamp=int(time.time()) - 86400,
        limit=1000,
    )

    # Analyze transfer patterns in a single pass: total volume, the set of
    # addresses seen, and the volume each address sent or received.
    transfer_volume = 0
    unique_addresses = set()
    address_volumes = {}

    for event in recent_transfers:
        sender = event['args']['from']
        receiver = event['args']['to']
        amount = event['args']['value']

        transfer_volume += amount
        unique_addresses.add(sender)
        unique_addresses.add(receiver)
        address_volumes[sender] = address_volumes.get(sender, 0) + amount
        address_volumes[receiver] = address_volumes.get(receiver, 0) + amount

    print("24h Transfer Analysis:")
    print(f" Total transfers: {len(recent_transfers)}")
    print(f" Total volume: {transfer_volume / 10**18:,.2f} tokens")
    print(f" Unique addresses: {len(unique_addresses)}")

    # Find top holders by transfer activity
    top_addresses = sorted(
        address_volumes.items(), key=lambda x: x[1], reverse=True
    )[:10]

    print("\nTop 10 Most Active Addresses:")
    for addr, volume in top_addresses:
        print(f" {addr}: {volume / 10**18:,.2f} tokens")


# Run monitoring (choose one)
# asyncio.run(start_event_monitoring())  # Real-time monitoring
# asyncio.run(analyze_historical_events())  # Historical analysis

Blockchain Data Analytics

Transaction Analysis

Analyze blockchain data with pandas and create comprehensive reports.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

from omne_sdk.analytics import BlockchainAnalyzer, TransactionAnalyzer


def _normalize_address(address):
    """Return *address* lower-cased, or ``None`` when it is missing."""
    return address.lower() if address else None


class OmneAnalytics:
    """Pandas-based reporting on blocks, transactions and addresses."""

    # Columns of the per-block frame built by analyze_network_activity.
    _BLOCK_COLUMNS = [
        'block_number', 'timestamp', 'transaction_count', 'gas_used',
        'gas_limit', 'block_size', 'difficulty',
    ]

    def __init__(self, omne_client):
        self.omne = omne_client
        self.analyzer = BlockchainAnalyzer(omne_client)
        self.tx_analyzer = TransactionAnalyzer(omne_client)

    async def analyze_network_activity(self, days: int = 30) -> pd.DataFrame:
        """Analyze network activity over specified time period.

        Args:
            days: Size of the look-back window, converted to a block count
                assuming a 15 second block time.

        Returns:
            One row per fetched block with the ``_BLOCK_COLUMNS`` columns
            plus ``date``, ``hour`` and ``gas_utilization``
            (``gas_used / gas_limit``, a fraction). The frame is empty but
            has these columns when no blocks were fetched.
        """
        # Get block data
        end_block = await self.omne.get_latest_block_number()
        # Assuming 15s block time (4 blocks per minute). Clamp at 0 so a
        # chain younger than the window does not yield negative numbers.
        start_block = max(0, end_block - (days * 24 * 60 * 4))

        print(f"Analyzing blocks {start_block} to {end_block}")

        # Fetch block data in chunks
        # NOTE(review): consecutive chunks share a boundary block number;
        # confirm whether get_blocks_range treats `end` as exclusive.
        block_data = []
        chunk_size = 1000

        for start in range(start_block, end_block, chunk_size):
            end = min(start + chunk_size, end_block)
            blocks = await self.analyzer.get_blocks_range(start, end)

            for block in blocks:
                block_data.append({
                    'block_number': block['number'],
                    'timestamp': datetime.fromtimestamp(block['timestamp']),
                    'transaction_count': len(block['transactions']),
                    'gas_used': block['gasUsed'],
                    'gas_limit': block['gasLimit'],
                    'block_size': block['size'],
                    'difficulty': block['difficulty'],
                })

        # Explicit columns keep the frame well-formed when no blocks came
        # back; to_datetime gives the .dt accessor a datetime column even
        # for an empty frame.
        df = pd.DataFrame(block_data, columns=self._BLOCK_COLUMNS)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['date'] = df['timestamp'].dt.date
        df['hour'] = df['timestamp'].dt.hour
        df['gas_utilization'] = df['gas_used'] / df['gas_limit']

        return df

    async def transaction_volume_analysis(self, df: pd.DataFrame):
        """Analyze transaction volume patterns.

        Prints daily and hourly aggregates, draws four charts, saves them to
        ``omne_network_analysis.png`` and shows the figure.

        Args:
            df: Frame produced by ``analyze_network_activity``.

        Returns:
            A ``(daily_stats, hourly_stats)`` tuple of DataFrames.
        """
        # Daily aggregations
        daily_stats = df.groupby('date').agg({
            'transaction_count': ['sum', 'mean', 'std'],
            'gas_used': ['sum', 'mean'],
            'gas_utilization': 'mean',
            'block_size': 'mean',
        }).round(2)

        # Flatten the (column, aggregate) MultiIndex into readable names.
        daily_stats.columns = [
            'total_txs', 'avg_txs_per_block', 'tx_std',
            'total_gas', 'avg_gas_per_block',
            'avg_utilization', 'avg_block_size',
        ]

        print("Daily Transaction Statistics:")
        print(daily_stats.tail(7))  # Last 7 days

        # Hourly patterns
        hourly_stats = df.groupby('hour').agg({
            'transaction_count': 'mean',
            'gas_utilization': 'mean',
        }).round(2)

        print("\nHourly Activity Patterns:")
        print(hourly_stats)

        # Create visualizations
        plt.figure(figsize=(15, 10))

        # Transaction volume over time
        plt.subplot(2, 2, 1)
        daily_stats['total_txs'].plot(kind='line', marker='o')
        plt.title('Daily Transaction Volume')
        plt.ylabel('Total Transactions')
        plt.xticks(rotation=45)

        # Gas utilization over time
        plt.subplot(2, 2, 2)
        daily_stats['avg_utilization'].plot(kind='line', marker='s', color='orange')
        plt.title('Daily Average Gas Utilization')
        plt.ylabel('Gas Utilization %')
        plt.xticks(rotation=45)

        # Hourly transaction pattern
        plt.subplot(2, 2, 3)
        hourly_stats['transaction_count'].plot(kind='bar')
        plt.title('Average Transactions by Hour')
        plt.ylabel('Avg Transactions per Block')
        plt.xlabel('Hour of Day')

        # Gas utilization by hour
        plt.subplot(2, 2, 4)
        hourly_stats['gas_utilization'].plot(kind='bar', color='green')
        plt.title('Average Gas Utilization by Hour')
        plt.ylabel('Gas Utilization %')
        plt.xlabel('Hour of Day')

        plt.tight_layout()
        plt.savefig('omne_network_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()

        return daily_stats, hourly_stats

    async def address_activity_analysis(self, addresses: list, days: int = 30):
        """Analyze activity patterns for specific addresses.

        Args:
            addresses: Addresses to report on.
            days: Look-back window passed to ``get_address_transactions``
                and used for the per-day average.

        Returns:
            A DataFrame with one row per address.
        """
        address_data = []

        for address in addresses:
            # Get transaction history
            transactions = await self.tx_analyzer.get_address_transactions(
                address, days=days
            )
            own = _normalize_address(address)

            # Calculate metrics. A transaction's 'to' can be None (contract
            # creation), so addresses are compared through the None-safe
            # helper instead of calling .lower() directly.
            total_value_sent = 0
            total_value_received = 0
            counterparties = set()

            for tx in transactions:
                is_sender = _normalize_address(tx['from']) == own
                if is_sender:
                    total_value_sent += tx['value']
                if _normalize_address(tx['to']) == own:
                    total_value_received += tx['value']

                other = tx['to'] if is_sender else tx['from']
                if other is not None:
                    counterparties.add(other)

            tx_count = len(transactions)
            gas_prices = [tx['gasPrice'] for tx in transactions]
            # NaN marks "no data" without triggering numpy's empty-mean
            # warning.
            avg_gas_price = np.mean(gas_prices) if gas_prices else float('nan')

            address_data.append({
                'address': address,
                'tx_count': tx_count,
                'total_sent': self.omne.from_wei(total_value_sent),
                'total_received': self.omne.from_wei(total_value_received),
                'net_flow': self.omne.from_wei(
                    total_value_received - total_value_sent
                ),
                'avg_gas_price': avg_gas_price,
                'unique_counterparties': len(counterparties),
                'avg_tx_per_day': tx_count / days,
            })

        df_addresses = pd.DataFrame(address_data)

        print("Address Activity Analysis:")
        print(df_addresses.to_string(index=False))

        return df_addresses

    async def generate_network_report(self, save_path: str = 'omne_network_report.html'):
        """Generate comprehensive network analysis report.

        Analyzes the last 30 days, writes an HTML report to ``save_path``
        and returns the underlying data.

        Returns:
            A dict with the keys ``'daily_stats'``, ``'hourly_stats'``,
            ``'latest_block'`` and ``'report_path'``.
        """
        print("Generating comprehensive network analysis report...")

        # Analyze last 30 days
        df = await self.analyze_network_activity(30)
        daily_stats, hourly_stats = await self.transaction_volume_analysis(df)

        # Additional metrics
        latest_block = await self.omne.get_latest_block()
        network_hash_rate = await self.analyzer.get_network_hashrate()
        active_validators = await self.analyzer.get_active_validators()

        # Create HTML report (literal CSS braces are doubled for the f-string)
        html_report = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Omne Network Analysis Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        .metric {{ background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; }}
        .highlight {{ color: #2196F3; font-weight: bold; }}
        table {{ border-collapse: collapse; width: 100%; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
        th {{ background-color: #f2f2f2; }}
    </style>
</head>
<body>
    <h1>Omne Network Analysis Report</h1>
    <p>Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>

    <h2>Network Overview</h2>
    <div class="metric">
        <strong>Latest Block:</strong> <span class="highlight">#{latest_block['number']:,}</span><br>
        <strong>Network Hash Rate:</strong> <span class="highlight">{network_hash_rate:,.2f} H/s</span><br>
        <strong>Active Validators:</strong> <span class="highlight">{active_validators}</span>
    </div>

    <h2>Transaction Statistics (Last 30 Days)</h2>
    <div class="metric">
        <strong>Total Transactions:</strong> <span class="highlight">{daily_stats['total_txs'].sum():,}</span><br>
        <strong>Average Daily Transactions:</strong> <span class="highlight">{daily_stats['total_txs'].mean():,.0f}</span><br>
        <strong>Average Gas Utilization:</strong> <span class="highlight">{daily_stats['avg_utilization'].mean():.1%}</span>
    </div>

    <h2>Recent Daily Statistics</h2>
    {daily_stats.tail(7).to_html(classes='table')}

    <h2>Hourly Activity Patterns</h2>
    {hourly_stats.to_html(classes='table')}
</body>
</html>
"""

        # Explicit encoding so the output does not depend on the platform
        # default.
        with open(save_path, 'w', encoding='utf-8') as f:
            f.write(html_report)

        print(f"Report saved to {save_path}")

        return {
            'daily_stats': daily_stats,
            'hourly_stats': hourly_stats,
            'latest_block': latest_block,
            'report_path': save_path,
        }


# Usage examples
async def run_analytics():
    """Generate the network report and analyze a few example addresses."""
    analytics = OmneAnalytics(omne)

    # Generate comprehensive report
    report_data = await analytics.generate_network_report()

    # Analyze specific addresses
    important_addresses = [
        '0x...',  # Exchange address
        '0x...',  # Large holder
        '0x...',  # DeFi protocol
    ]

    address_analysis = await analytics.address_activity_analysis(important_addresses)

    return report_data, address_analysis


# Run analytics
# report, addresses = asyncio.run(run_analytics())

Python SDK Best Practices

🔄 Async/Await Patterns

  • Always use async/await for blockchain operations to prevent blocking
  • Implement proper connection pooling for high-throughput applications
  • Use asyncio.gather() for parallel operations when possible
  • Handle rate limiting with exponential backoff strategies
  • Set appropriate timeouts for all network operations

🛡️ Error Handling

  • Implement comprehensive exception handling for network failures
  • Use transaction receipts to verify successful execution
  • Implement retry logic with jitter for transient failures
  • Log all transaction hashes for debugging and audit trails
  • Validate input parameters before sending transactions

⚡ Performance Optimization

  • Use batch operations for multiple transactions
  • Cache contract ABIs and frequently used data
  • Implement efficient event filtering to reduce data transfer
  • Use connection pooling for high-frequency applications
  • Monitor gas prices and optimize transaction timing

🔐 Security Guidelines

  • Store private keys securely using environment variables or key management systems
  • Validate all contract addresses before interaction
  • Implement transaction simulation before execution
  • Use multi-signature wallets for high-value operations
  • Regularly audit and update dependencies

Related Examples