< >
Home » OpenClaw-AI助手入门教程 » OpenClaw-AI助手入门教程-配置openclaw-mcp-adapter通过mcp-endpoint-server调用mcp工具

OpenClaw-AI助手入门教程-配置openclaw-mcp-adapter通过mcp-endpoint-server调用mcp工具

说明:

  • 在前面的教程中,我们分别学习了如何配置MCP-adapter来直接调用MCP服务,以及如何部署小智语音的mcp-endpoint-server。现在,我们将两者结合,实现一个更强大的架构:通过OpenClaw的MCP-adapter,经由mcp-endpoint-server调用内网或设备端的MCP工具。

  • 这个架构解决了什么问题?当你的MCP服务运行在内网或设备端(如树莓派、NAS)时,OpenClaw无法直接连接。而mcp-endpoint-server作为公网WebSocket服务器,可以让内网设备主动连接并注册工具。通过一个协议转换代理,OpenClaw就能通过标准的SSE/HTTP协议调用这些工具。整个过程无需公网IP,无需复杂的端口映射。

  • 架构:

  subgraph 公网云服务器
        A[mcp-endpoint-server<br/>WebSocket服务端]
        B[ws2sse代理<br/>WebSocket → SSE]
    end
    subgraph 内网/设备端
        C[mcp_pipe.py + 本地MCP服务]
    end
    subgraph OpenClaw环境
        D[openclaw-mcp-adapter]
    end
    C -- 主动WebSocket连接 --> A
    B -- 连接 --> A
    D -- SSE --> B
  • 工作流程:

内网设备通过mcp_pipe.py将本地MCP服务(如计算器)注册到公网的mcp-endpoint-server。
我们在公网部署一个ws2sse代理,它作为WebSocket客户端连接mcp-endpoint-server,同时对外提供SSE端点。
OpenClaw的mcp-adapter插件通过SSE连接ws2sse代理,发现并注册所有可用的MCP工具。
用户在OpenClaw中通过语音或文字调用工具,请求经mcp-adapter → ws2sse代理 → mcp-endpoint-server → 内网设备,最终执行并返回结果。

  • openclaw的插件mcp-adapter不能直接通过websocket访问mcp-endpoint-server,所以要通过一个ws2sse代理来做中转。
  • 在这里我们建立一个mcp-porter-server的ws2sse代理,使用docker部署在openclaw同一个服务器
  • 1.建立Dockerfile
FROM python:3.12-slim

WORKDIR /app

# 复制依赖文件并安装(仅当 requirements.txt 变化时才会重新安装)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制当前目录所有文件到镜像(会被卷挂载覆盖,但用于构建时保证镜像完整性)
COPY . .

# 容器启动命令
CMD ["python", "ws2sse_proxy.py"]

  • 2.建立docker-compose.yml
networks:
  mcp-shared-network:
    external: true
    name: mcp-endpoint-server_mcp-shared-network

services:
  mcp-porter:
    build: .                      # 使用当前目录的 Dockerfile 构建
    container_name: mcp-porter
    restart: always
    networks:
      - mcp-shared-network
    ports:
      - "8000:8000"
    volumes:
      - ./:/app                    # 将宿主机当前目录挂载到容器 /app,覆盖代码
    environment:
      - TZ=Asia/Shanghai
      - REMOTE_WS_URL=ws://mcp-endpoint-server:8004/mcp_endpoint/call/?token=${MCP_TOKEN}
    # 不再直接写 token,使用 .env 文件

    1. 建立requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
websockets==12.0
python-multipart==0.0.6
pydantic==2.5.0
python-dotenv==1.0.0
configparser==6.0.0
asyncio-mqtt==0.16.1
pycryptodome==3.23.0
loguru==0.7.2
requests
    1. 建立ws2sse_proxy.py
#!/usr/bin/env python3
"""
MCP WebSocket 转 SSE 代理(完整版)
- 作为客户端连接到远程 WebSocket MCP 服务器
- 使用独立接收循环处理消息,支持并发请求
- 对外提供 SSE 端点供 OpenClaw 连接
"""

import asyncio
import json
import os
import logging
from contextlib import asynccontextmanager
from typing import Dict, Optional

import websockets
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
import uvicorn

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-proxy")

# 从环境变量读取远程 WebSocket 地址
REMOTE_WS_URL = os.getenv(
    "REMOTE_WS_URL",
    "ws://127.0.0.1:8004/mcp_endpoint/call/?token=2svzrombSJCGKEdlm6znodXSkJlwtfDcjsBV9mfNLtA="
)

# 全局状态
remote_ws: Optional[websockets.WebSocketClientProtocol] = None
remote_tools: list = []
pending_requests: Dict[int, asyncio.Future] = {}
connection_lock = asyncio.Lock()
receive_task: Optional[asyncio.Task] = None


async def _receive_loop():
    """后台接收远程 WebSocket 消息,根据 id 分发结果"""
    global remote_ws, pending_requests  # 必须放在最前面
    try:
        async for message in remote_ws:
            data = json.loads(message)
            logger.debug(f"Received from remote: {data}")
            req_id = data.get("id")
            if req_id is not None and req_id in pending_requests:
                future = pending_requests.pop(req_id)
                future.set_result(data)
            else:
                logger.info(f"Ignored message (no matching request): {data}")
    except websockets.exceptions.ConnectionClosed as e:
        logger.warning(f"Remote connection closed: {e}")
        # 取消所有等待的请求
        for req_id, future in pending_requests.items():
            future.set_exception(Exception("Remote connection closed"))
        pending_requests.clear()
        remote_ws = None
    except Exception as e:
        logger.error(f"Receive loop error: {e}", exc_info=True)


async def connect_to_remote():
    """连接到远程 WebSocket 服务器并完成 MCP 握手"""
    global remote_ws, remote_tools, receive_task
    async with connection_lock:
        if remote_ws and not remote_ws.closed:
            return

        logger.info(f"Connecting to remote server: {REMOTE_WS_URL}")
        try:
            remote_ws = await websockets.connect(REMOTE_WS_URL)
            logger.info("WebSocket connected, starting MCP handshake...")

            # 1. 发送 initialize 请求
            init_req = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "mcp-ws-proxy", "version": "2.0"}
                }
            }
            await remote_ws.send(json.dumps(init_req))
            # 手动等待初始化响应(因为接收循环尚未启动)
            init_resp = json.loads(await remote_ws.recv())
            logger.info(f"Initialize response: {init_resp}")

            # 2. 发送 initialized 通知
            notif = {"jsonrpc": "2.0", "method": "notifications/initialized"}
            await remote_ws.send(json.dumps(notif))

            # 3. 请求工具列表(同样手动等待)
            tools_req = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
            await remote_ws.send(json.dumps(tools_req))
            tools_resp = json.loads(await remote_ws.recv())
            remote_tools = tools_resp.get("result", {}).get("tools", [])
            logger.info(f"Discovered {len(remote_tools)} tools: {[t['name'] for t in remote_tools]}")

            # 启动后台接收循环
            receive_task = asyncio.create_task(_receive_loop())

        except Exception as e:
            logger.error(f"Failed to connect to remote server: {e}")
            remote_ws = None
            raise


async def forward_request_to_remote(request_body: dict) -> dict:
    """将请求转发给远程服务器,并等待响应"""
    global remote_ws, pending_requests
    if not remote_ws or remote_ws.closed:
        await connect_to_remote()

    req_id = request_body.get("id")
    if req_id is None:
        raise ValueError("Request missing 'id' field")

    # 创建 Future 并存入字典
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    pending_requests[req_id] = future

    try:
        await remote_ws.send(json.dumps(request_body))
        logger.debug(f"Sent to remote: {request_body}")
        # 等待响应,设置超时
        response = await asyncio.wait_for(future, timeout=30.0)
        return response
    except asyncio.TimeoutError:
        pending_requests.pop(req_id, None)
        raise Exception(f"Request {req_id} timed out")
    except Exception as e:
        pending_requests.pop(req_id, None)
        raise e


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用启动时连接远程服务器"""
    try:
        await connect_to_remote()
    except Exception as e:
        logger.error(f"Startup connection failed: {e}")
    yield
    if receive_task:
        receive_task.cancel()
    if remote_ws and not remote_ws.closed:
        await remote_ws.close()


app = FastAPI(lifespan=lifespan)


@app.get("/sse")
async def sse_endpoint(request: Request):
    """SSE 端点,用于 OpenClaw 连接"""
    async def event_generator():
        # 发送初始化事件(符合 MCP over SSE 规范)
        init_event = {
            "jsonrpc": "2.0",
            "id": 1,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "serverInfo": {"name": "mcp-ws-proxy", "version": "2.0"}
            }
        }
        yield f"data: {json.dumps(init_event)}\n\n"

        # 保持连接,发送心跳
        while not await request.is_disconnected():
            await asyncio.sleep(1)
            yield ": heartbeat\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.post("/sse")
async def sse_post(request: Request):
    """处理来自 OpenClaw 的 JSON-RPC 请求"""
    body = await request.json()
    print("=== 收到请求体 ===")          # 添加直接打印
    print(json.dumps(body, indent=2))   # 格式化打印
    logger.info(f"Received request: {body}")

    method = body.get("method")
    req_id = body.get("id")

    # tools/list 直接返回缓存的工具列表
    if method == "tools/list":
        return JSONResponse({
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {"tools": remote_tools}
        })

    # initialize 返回本地信息
    if method == "initialize":
        return JSONResponse({
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "serverInfo": {"name": "mcp-ws-proxy", "version": "2.0"}
            }
        })

    # 其他请求(如 tools/call)转发给远程服务器
    try:
        resp = await forward_request_to_remote(body)
        return JSONResponse(resp)
    except Exception as e:
        logger.error(f"Forwarding failed: {e}", exc_info=True)
        return JSONResponse({
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": -32000, "message": str(e)}
        }, status_code=502)


@app.get("/health")
async def health():
    """健康检查"""
    return {
        "status": "ok",
        "remote_connected": remote_ws is not None and not remote_ws.closed,
        "tools_count": len(remote_tools)
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
  • 构建docker
docker compose build mcp-porter
  • 启动docker
docker compose mcp-porter up -d 

验证是否能通过mcp-porter-server来访问计算工具

  • 构建测试脚本test_proxy.py
#!/usr/bin/env python3
"""
增强版 MCP 代理测试脚本
- 动态获取工具参数 schema,自动构造合适的测试参数
- 支持任意工具,无需硬编码参数名
- 提供详细的请求/响应日志
"""

import json
import requests
import argparse
import sys
from typing import Dict, Any

def send_jsonrpc(url: str, method: str, params: Dict = None, req_id: int = 1) -> Dict:
    """发送 JSON-RPC 请求并返回响应"""
    payload = {
        "jsonrpc": "2.0",
        "id": req_id,
        "method": method
    }
    if params is not None:
        payload["params"] = params

    headers = {"Content-Type": "application/json"}
    print(f"\n>>> 发送请求 [{req_id}]: {method}")
    print(json.dumps(payload, indent=2, ensure_ascii=False))

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        resp_json = response.json()
        print(f"<<< 收到响应:")
        print(json.dumps(resp_json, indent=2, ensure_ascii=False))
        return resp_json
    except requests.exceptions.RequestException as e:
        print(f" 请求失败: {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"响应状态码: {e.response.status_code}")
            print(f"响应内容: {e.response.text}")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f" 响应不是有效的 JSON: {response.text}")
        sys.exit(1)

def generate_test_arguments(schema: Dict) -> Dict:
    """
    根据工具的 inputSchema 生成测试参数
    schema 格式示例:
    {
        "type": "object",
        "properties": {
            "python_expression": {"type": "string", "description": "..."}
        },
        "required": ["python_expression"]
    }
    """
    if not schema:
        return {}

    properties = schema.get("properties", {})
    required = schema.get("required", [])

    args = {}
    for field in required:
        prop = properties.get(field, {})
        prop_type = prop.get("type", "string")
        # 根据类型生成合适的测试值
        if prop_type == "number":
            args[field] = 10  # 示例数字
        elif prop_type == "integer":
            args[field] = 5
        elif prop_type == "boolean":
            args[field] = True
        elif prop_type == "array":
            args[field] = []  # 空数组
        elif prop_type == "object":
            args[field] = {}  # 空对象
        else:  # 默认字符串
            args[field] = "test" if field != "python_expression" else "10 + 5"

    # 如果没有 required 字段,尝试取第一个属性作为示例
    if not required and properties:
        field = next(iter(properties))
        prop = properties[field]
        prop_type = prop.get("type", "string")
        if prop_type == "number":
            args[field] = 10
        elif prop_type == "integer":
            args[field] = 5
        else:
            args[field] = "example"

    return args

def main():
    parser = argparse.ArgumentParser(description="增强版 MCP 代理测试")
    parser.add_argument("--url", default="http://127.0.0.1:8000/sse",
                        help="代理的 SSE 端点 URL (默认: http://127.0.0.1:8000/sse)")
    parser.add_argument("--tool", help="指定要测试的工具名称,不指定则使用第一个工具")
    args = parser.parse_args()

    print(f" 测试代理: {args.url}")

    # 1. 获取工具列表
    print("\n 正在获取工具列表...")
    tools_resp = send_jsonrpc(args.url, "tools/list", req_id=1)
    if "error" in tools_resp:
        print(f" 获取工具列表失败: {tools_resp['error']}")
        sys.exit(1)

    tools = tools_resp.get("result", {}).get("tools", [])
    if not tools:
        print("  代理未返回任何工具")
        sys.exit(0)

    print(f" 发现 {len(tools)} 个工具:")
    for idx, tool in enumerate(tools, 1):
        name = tool.get("name", "未知")
        desc = tool.get("description", "")
        print(f"  {idx}. {name}: {desc}")

    # 2. 选择要测试的工具
    target_tool = None
    if args.tool:
        for tool in tools:
            if tool.get("name") == args.tool:
                target_tool = tool
                break
        if not target_tool:
            print(f" 未找到指定工具: {args.tool}")
            sys.exit(1)
    else:
        # 默认选择第一个工具
        target_tool = tools[0]
        print(f"\n  未指定工具,将使用第一个工具: {target_tool['name']}")

    tool_name = target_tool["name"]
    tool_schema = target_tool.get("inputSchema", {})

    print(f"\n 工具 '{tool_name}' 的输入 schema:")
    print(json.dumps(tool_schema, indent=2, ensure_ascii=False))

    # 3. 生成测试参数
    test_args = generate_test_arguments(tool_schema)
    print(f"\n 生成的测试参数: {test_args}")

    # 4. 调用工具
    params = {"name": tool_name, "arguments": test_args}
    call_resp = send_jsonrpc(args.url, "tools/call", params, req_id=2)

    if "error" in call_resp:
        print(f"\n 调用失败: {call_resp['error']}")
        sys.exit(1)

    result = call_resp.get("result", {})
    print(f"\n 调用成功,原始结果:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

    # 尝试提取文本内容(如果存在)
    content = result.get("content", [])
    for item in content:
        if item.get("type") == "text":
            print(f"\n 计算结果文本: {item['text']}")

if __name__ == "__main__":
    main()
  • 测试运行

python3 test_proxy.py

  • 效果
$ ./run_test.sh

测试代理: http://127.0.0.1:8000/sse

正在获取工具列表...

>>> 发送请求 [1]: tools/list
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/list"
}
<<< 收到响应:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [
      {
        "name": "calculator",
        "description": "For mathamatical calculation, always use this tool to calculate the result of a python expression. You can use 'math' or 'random' directly, without 'import'.",
        "inputSchema": {
          "properties": {
            "python_expression": {
              "type": "string"
            }
          },
          "required": [
            "python_expression"
          ],
          "type": "object"
        },
        "outputSchema": {
          "additionalProperties": true,
          "type": "object"
        },
        "_meta": {
          "_fastmcp": {
            "tags": []
          }
        }
      }
    ]
  }
}
发现 1 个工具:
  1. calculator: For mathamatical calculation, always use this tool to calculate the result of a python expression. You can use 'math' or 'random' directly, without 'import'.

未指定工具,将使用第一个工具: calculator

工具 'calculator' 的输入 schema:
{
  "properties": {
    "python_expression": {
      "type": "string"
    }
  },
  "required": [
    "python_expression"
  ],
  "type": "object"
}

生成的测试参数: {'python_expression': '10 + 5'}

>>> 发送请求 [2]: tools/call
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/call",
  "params": {
    "name": "calculator",
    "arguments": {
      "python_expression": "10 + 5"
    }
  }
}
<<< 收到响应:
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"success\":true,\"result\":15}"
      }
    ],
    "structuredContent": {
      "success": true,
      "result": 15
    },
    "isError": false
  }
}

调用成功,原始结果:
{
  "content": [
    {
      "type": "text",
      "text": "{\"success\":true,\"result\":15}"
    }
  ],
  "structuredContent": {
    "success": true,
    "result": 15
  },
  "isError": false
}

计算结果文本: {"success":true,"result":15}
  • 因为openclaw是docker里面,所有需要访问docker版的mcp-endpoint-server和mcp-porter-server,需要他们都在一个网络里面,这样才能访问访问。

  • 我们在mcp-endpoint-server增加一个docker网络,

  • mcp-endpoint-server改为

$ cat docker-compose.yml
# Docker安装全模块

version: '3'
services:
  # MCP 接入点服务
  mcp-endpoint-server:
    image: ghcr.nju.edu.cn/xinnan-tech/mcp-endpoint-server:latest
    container_name: mcp-endpoint-server
    restart: always
    networks:
      - mcp-shared-network  # 改为共享网络
    ports:
      - "8004:8004"
    security_opt:
      - seccomp:unconfined
    environment:
      - TZ=Asia/Shanghai
    volumes:
      # 配置文件目录
      - ./data:/app/data
networks:
  mcp-shared-network:
    driver: bridge
  • 增加一个mcp-shared-network网络
  • mcp-porter-server使用这个网络
$ cat docker-compose.yml
networks:
  mcp-shared-network:
    external: true
    name: mcp-endpoint-server_mcp-shared-network

services:
  mcp-porter:
    build: .                      # 使用当前目录的 Dockerfile 构建
    container_name: mcp-porter
    restart: always
    networks:
      - mcp-shared-network
    ports:
      - "8000:8000"
    volumes:
      - ./:/app                    # 将宿主机当前目录挂载到容器 /app,覆盖代码
    environment:
      - TZ=Asia/Shanghai
      - REMOTE_WS_URL=ws://mcp-endpoint-server:8004/mcp_endpoint/call/?token=${MCP_TOKEN}
    # 不再直接写 token,使用 .env 文件
  • openclaw也使用这个网络
$ cat docker-compose.yml
networks:
  mcp-shared-network:
    external: true
    name: mcp-endpoint-server_mcp-shared-network
services:
  openclaw-cli:
    entrypoint:
    - node
    - dist/index.js
    environment:
      BROWSER: echo
      HOME: /home/node
      OPENCLAW_GATEWAY_TOKEN: ${OPENCLAW_GATEWAY_TOKEN}
      TERM: xterm-256color
    image: ${OPENCLAW_IMAGE:-openclaw:local}
    init: true
    networks:
    - default
    - mcp-shared-network
    stdin_open: true
    tty: true
    volumes:
    - ${OPENCLAW_CONFIG_DIR}:/home/node/.openclaw
    - ${OPENCLAW_WORKSPACE_DIR}:/home/node/.openclaw/workspace
  openclaw-gateway:
    command:
    - node
    - dist/index.js
    - gateway
    - --bind
    - ${OPENCLAW_GATEWAY_BIND:-lan}
    - --port
    - '18789'
    env_file:
    - ./.env
    environment:
      GATEWAY_TRUSTED_PROXIES: ${GATEWAY_TRUSTED_PROXIES}
      HOME: /home/node
      OPENCLAW_GATEWAY_TOKEN: ${OPENCLAW_GATEWAY_TOKEN}
      TERM: xterm-256color
    image: ${OPENCLAW_IMAGE:-openclaw:local}
    init: true
    networks:
    - default
    - mcp-shared-network
    ports:
    - ${OPENCLAW_GATEWAY_PORT:-18789}:18789
    - ${OPENCLAW_BRIDGE_PORT:-18790}:18790
    restart: unless-stopped
    volumes:
    - ${OPENCLAW_CONFIG_DIR}:/home/node/.openclaw
    - ${OPENCLAW_WORKSPACE_DIR}:/home/node/.openclaw/workspace
    - /etc/localtime:/etc/localtime:ro
    - /etc/timezone:/etc/timezone:ro
  • 配置openclaw.json使用容器名称即可
 "plugins": {
    "entries": {
      "feishu": {
        "enabled": true
      },
      "openclaw-mcp-adapter": {
        "enabled": true,
        "config": {
          "servers": [
            {
              "name": "mcp-porter-calculator",
              "transport": "http",
              "url": "http://mcp-porter:8000/sse"
            }
          ],
          "toolPrefix": true
        }
      }
    }
  }
  • 重启三个docker,即可通过openclaw来调用虾哥的计算器功能
  • 在web-ui的chat界面发指令:
使用mcp工具来计算10+10

纠错,疑问,交流: 请进入讨论区点击加入Q群

获取最新文章: 扫一扫右上角的二维码加入“创客智造”公众号


标签: openclaw-ai助手入门教程