← 返回
沟通协作 中文

Kiro Realtime Chat

Enables near real-time chat between multiple Kiro instances via a shared JSON file with polling for quick message exchange and task coordination.
通过共享 JSON 文件实现多个 Kiro 实例间的近实时聊天,利用轮询机制快速交换消息并协调任务。
sonerbo
沟通协作 clawhub v1.0.0 1 版本 100000 Key: 无需
★ 0
Stars
📥 490
下载
💾 6
安装
1
版本
#latest

概述

Kiro Realtime Chat

Overview

This skill enables near real-time communication between multiple Kiro instances using a shared JSON file with polling. Each message includes timestamp, sender, and content.

File Location

memory/kiro-realtime.json

JSON Structure

{
  "messages": [
    {
      "id": 1,
      "from": "VPS Kiro",
      "to": "Laptop Kiro",
      "message": "Selam!",
      "timestamp": "2026-03-05T22:58:00Z",
      "read": false
    }
  ],
  "lastCheck": "2026-03-05T22:58:00Z"
}

Usage

Sending a Message

import json
import os
from datetime import datetime

CHAT_FILE = "memory/kiro-realtime.json"

def send_message(from_name, to_name, message):
    # Load or create chat file
    if os.path.exists(CHAT_FILE):
        with open(CHAT_FILE, "r") as f:
            chat = json.load(f)
    else:
        chat = {"messages": [], "lastCheck": datetime.now().isoformat()}
    
    # Add new message
    msg_id = len(chat["messages"]) + 1
    chat["messages"].append({
        "id": msg_id,
        "from": from_name,
        "to": to_name,
        "message": message,
        "timestamp": datetime.now().isoformat(),
        "read": False
    })
    
    # Save
    with open(CHAT_FILE, "w") as f:
        json.dump(chat, f, indent=2)
    
    return msg_id

Checking for New Messages

def check_messages(my_name, since_timestamp=None):
    if not os.path.exists(CHAT_FILE):
        return []
    
    with open(CHAT_FILE, "r") as f:
        chat = json.load(f)
    
    # Update last check time
    chat["lastCheck"] = datetime.now().isoformat()
    with open(CHAT_FILE, "w") as f:
        json.dump(chat, f, indent=2)
    
    # Filter messages my_messages = []
 for me
       for msg in chat["messages"]:
        if msg["to"] == my_name and not msg["read"]:
            my_messages.append(msg)
            msg["read"] = True
    
    # Save read status
    with open(CHAT_FILE, "w") as f:
        json.dump(chat, f, indent=2)
    
    return my_messages

Polling Loop (for near real-time)

import time

def poll_messages(my_name, interval=5):
    """Poll for new messages every N seconds"""
    while True:
        msgs = check_messages(my_name)
        for msg in msgs:
            print(f"{msg['from']}: {msg['message']}")
            # Process and respond here
        time.sleep(interval)

Recommended Polling Interval

  • Fast response needed: 2-3 seconds
  • Normal: 5-10 seconds
  • Low bandwidth: 30-60 seconds

Tips

  • Mark messages as "read" after processing
  • Use timestamps to avoid duplicate processing
  • For better real-time, consider MCP server or WebSocket
  • Keep messages under 1000 characters

版本历史

共 1 个版本

  • v1.0.0 当前
    2026-03-30 14:56 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

communication-collaboration

imap-smtp-email

gzlicanyi
使用IMAP/SMTP读取和发送邮件;检查新/未读邮件、获取内容、搜索邮箱、标记已读/未读、发送带附件的邮件。支持...
★ 114 📥 52,449
communication-collaboration

Gmail

byungkyu
Gmail API 集成,托管 OAuth,支持读取、发送和管理邮件、线程、标签及草稿,适用于需要与 Gmail 交互的场景。
★ 72 📥 37,741
communication-collaboration

Slack

steipete
当需要通过 slack 工具从 Clawdbot 控制 Slack 时使用,包括在频道或私信中回复消息或置顶/取消置顶项目。
★ 157 📥 47,688