WebSocket

This SDK implementation uses WebSockets to enable real-time interaction with MINOS AI, particularly for ARIADNE’s pool monitoring and Telegram notifications.

import asyncio
import websockets
import json
from typing import Optional

class MinosAIWebSocketSDK:
    """WebSocket-based SDK for real-time MINOS AI trading and notifications.

    Lifecycle: ``await connect()``, call the request methods, then
    ``await close()``.

    NOTE: all methods read from the same underlying socket, and the
    ``websockets`` library does not support two coroutines receiving from one
    connection at the same time. Do not await
    ``execute_deucalion_copy_trade`` while ``subscribe_ariadne_notifications``
    is running as a background task on the same instance; finish the
    request/response call first, or use a second instance.
    """

    def __init__(self, api_key: str, ws_url: str = "wss://ws.minosai.net/v1"):
        """Store the credentials; no network I/O happens until ``connect()``.

        Args:
            api_key: Key sent in the ``auth`` message right after connecting.
            ws_url: WebSocket endpoint to connect to.
        """
        self.api_key = api_key
        self.ws_url = ws_url
        # Live connection object, or None while disconnected.
        self.websocket = None

    def _require_connection(self):
        """Return the open socket, or raise if ``connect()`` has not succeeded."""
        if self.websocket is None:
            raise ConnectionError("WebSocket is not connected; call connect() first")
        return self.websocket

    async def connect(self):
        """Open the WebSocket and perform the API-key auth exchange.

        The server's reply to the ``auth`` message is parsed as JSON and
        printed; its content is not validated here.

        Raises:
            ConnectionError: If the socket cannot be opened, or sending the
                auth message / reading or decoding its reply fails. The
                underlying error is chained as ``__cause__``.
        """
        ws = None
        try:
            ws = await websockets.connect(self.ws_url)
            # Authenticate with API key
            await ws.send(json.dumps({"action": "auth", "api_key": self.api_key}))
            auth_reply = json.loads(await ws.recv())
        except Exception as e:
            # Do not leave a half-open, unauthenticated socket behind.
            if ws is not None:
                try:
                    await ws.close()
                except Exception:
                    # Best-effort cleanup; the original failure is what matters.
                    pass
            raise ConnectionError(f"WebSocket connection failed: {str(e)}") from e
        # Publish the socket only once the auth exchange has completed.
        self.websocket = ws
        print("Connection Response:", auth_reply)

    async def subscribe_ariadne_notifications(self, pool: str = "raydium"):
        """Subscribe to ARIADNE pool notifications and print each one.

        Sends a ``subscribe`` message for ``pool`` and then consumes every
        incoming message until the connection closes (or the task running
        this coroutine is cancelled), printing each decoded payload.

        Args:
            pool: Pool identifier placed in the subscribe request.

        Raises:
            ConnectionError: If called before a successful ``connect()``.
            json.JSONDecodeError: If an incoming message is not valid JSON.
        """
        ws = self._require_connection()
        await ws.send(json.dumps({
            "action": "subscribe",
            "agent": "ariadne",
            "pool": pool
        }))
        async for message in ws:
            data = json.loads(message)
            print(f"ARIADNE Notification: {data}")  # E.g., unusual trading activity alert

    async def execute_deucalion_copy_trade(self, wallet: str, min_buy_size: float):
        """Send a DEUCALION ``execute`` request and return the next message.

        The reply is simply the next message received on the socket; nothing
        here correlates it with the request, so no other coroutine should be
        receiving on this connection at the same time.

        Args:
            wallet: Wallet address sent as ``wallet_address``.
            min_buy_size: Value sent as ``min_buy_size_sol``.

        Returns:
            The JSON-decoded reply.

        Raises:
            ConnectionError: If called before a successful ``connect()``.
        """
        ws = self._require_connection()
        await ws.send(json.dumps({
            "action": "execute",
            "agent": "deucalion",
            "wallet_address": wallet,
            "min_buy_size_sol": min_buy_size
        }))
        response = await ws.recv()
        return json.loads(response)

    async def close(self):
        """Close the WebSocket connection if one is open (safe to call twice)."""
        if self.websocket is None:
            return
        # Clear the attribute first so later calls see "not connected" rather
        # than trying to reuse a closed socket.
        ws, self.websocket = self.websocket, None
        await ws.close()

# Example usage
async def main():
    """Example: connect, run one DEUCALION copy trade, then stream ARIADNE alerts.

    The trade is executed before the subscription task is started because both
    calls receive from the same socket: running them concurrently would let
    one consume the other's messages, and the ``websockets`` library rejects
    concurrent receives on a single connection.
    """
    sdk = MinosAIWebSocketSDK(api_key="your-api-key-here")
    listener = None
    try:
        await sdk.connect()
        # Execute a DEUCALION copy trade
        result = await sdk.execute_deucalion_copy_trade(wallet="wallet_address_X", min_buy_size=10.0)
        print("DEUCALION Trade Result:", result)
        # Subscribe to ARIADNE notifications for Raydium pools. Keep a
        # reference to the task so it is not garbage-collected mid-flight and
        # so it can be cancelled and its outcome inspected below.
        listener = asyncio.create_task(sdk.subscribe_ariadne_notifications(pool="raydium"))
        # Keep running for notifications
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        try:
            if listener is not None:
                listener.cancel()
                # gather(..., return_exceptions=True) hands back the task's
                # outcome instead of raising, so close() below always runs.
                outcome = (await asyncio.gather(listener, return_exceptions=True))[0]
                if isinstance(outcome, Exception) and not isinstance(outcome, asyncio.CancelledError):
                    print("ARIADNE subscription ended with error:", outcome)
        finally:
            await sdk.close()

# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

Last updated