Websocket
This SDK implementation uses WebSockets to enable real-time interaction with MINOS AI, particularly for ARIADNE’s pool monitoring and Telegram notifications.
import asyncio
import websockets
import json
from typing import Optional
class MinosAIWebSocketSDK:
"""WebSocket-based SDK for real-time MINOS AI trading and notifications."""
def __init__(self, api_key: str, ws_url: str = "wss://ws.minosai.net/v1"):
"""Initialize SDK with API key and WebSocket URL."""
self.api_key = api_key
self.ws_url = ws_url
self.websocket = None
async def connect(self):
"""Establish WebSocket connection."""
try:
self.websocket = await websockets.connect(self.ws_url)
# Authenticate with API key
await self.websocket.send(json.dumps({"action": "auth", "api_key": self.api_key}))
response = await self.websocket.recv()
print("Connection Response:", json.loads(response))
except Exception as e:
raise Exception(f"WebSocket connection failed: {str(e)}")
async def subscribe_ariadne_notifications(self, pool: str = "raydium"):
"""Subscribe to ARIADNE's real-time pool activity notifications."""
await self.websocket.send(json.dumps({
"action": "subscribe",
"agent": "ariadne",
"pool": pool
}))
async for message in self.websocket:
data = json.loads(message)
print(f"ARIADNE Notification: {data}") # E.g., unusual trading activity alert
async def execute_deucalion_copy_trade(self, wallet: str, min_buy_size: float):
"""Execute a DEUCALION copy trade in real-time."""
await self.websocket.send(json.dumps({
"action": "execute",
"agent": "deucalion",
"wallet_address": wallet,
"min_buy_size_sol": min_buy_size
}))
response = await self.websocket.recv()
return json.loads(response)
async def close(self):
"""Close WebSocket connection."""
if self.websocket:
await self.websocket.close()
# Example usage
async def main():
sdk = MinosAIWebSocketSDK(api_key="your-api-key-here")
try:
await sdk.connect()
# Subscribe to ARIADNE notifications for Raydium pools
asyncio.create_task(sdk.subscribe_ariadne_notifications(pool="raydium"))
# Execute a DEUCALION copy trade
result = await sdk.execute_deucalion_copy_trade(wallet="wallet_address_X", min_buy_size=10.0)
print("DEUCALION Trade Result:", result)
# Keep running for notifications
await asyncio.sleep(60) # Run for 60 seconds
finally:
await sdk.close()
if __name__ == "__main__":
asyncio.run(main())
Last updated