Browse Source

Refactor: Modularize Tuya MQTT Bridge into 'bridge' package

- Replaced monolithic tuya_mqtt_bridge.py with modular bridge/ package
- Created core, handlers (door, climate, general), and config modules
- Updated Dockerfile.tuya to use new structure
- Updated documentation (README.md, copilot-instructions.md)
- Added inspection tools in tools/
- Restored flow files
master
Matteo Benedetto 3 months ago
parent
commit
4a9002afd4
  1. 6
      .github/copilot-instructions.md
  2. 1
      .gitignore
  3. 7
      porta-lab/Dockerfile.tuya
  4. 6
      porta-lab/README.md
  5. 61
      porta-lab/bridge/README.md
  6. 0
      porta-lab/bridge/__init__.py
  7. 24
      porta-lab/bridge/config.py
  8. 243
      porta-lab/bridge/core.py
  9. 0
      porta-lab/bridge/handlers/__init__.py
  10. 13
      porta-lab/bridge/handlers/base.py
  11. 62
      porta-lab/bridge/handlers/climate.py
  12. 45
      porta-lab/bridge/handlers/door.py
  13. 81
      porta-lab/bridge/handlers/general.py
  14. 9
      porta-lab/bridge/main.py
  15. 23
      porta-lab/bridge/utils.py
  16. 422
      porta-lab/tuya_mqtt_bridge.py

6
.github/copilot-instructions.md

@ -266,20 +266,20 @@ Il servizio `monitor` lo rileva automaticamente e riavvia il container
Il sistema integra una gestione avanzata per dispositivi infrarossi (es. Condizionatori) che richiedono comandi complessi non supportati dallo standard MQTT di base.
### Logica del Bridge (`tuya_mqtt_bridge.py`)
### Logica del Bridge (`porta-lab/bridge/`)
1. **Auto-Discovery Parent**: All'avvio, il bridge scansiona tutti i dispositivi. Se trova un dispositivo `infrared_ac`, cerca il suo "genitore" fisico (IR Blaster) confrontando le `local_key` (i dispositivi virtuali condividono la chiave con il gateway fisico).
2. **Virtual State Management**: Poiché i telecomandi IR inviano lo stato completo (non solo "accendi", ma "accendi a 24°C freddo"), il bridge mantiene una memoria dello stato corrente per ogni AC.
3. **API V2.0**: I comandi vengono inviati tramite l'endpoint Tuya avanzato:
`POST /v2.0/infrareds/{parent_id}/air-conditioners/{device_id}/scenes/command`
### Debugging
Per verificare i comandi IR, utilizzare lo script `porta-lab/test_ac_advanced.py` che bypassa MQTT e invia comandi diretti alle API Tuya.
Per verificare i comandi IR, utilizzare lo script `porta-lab/tools/test_ac_advanced.py` che bypassa MQTT e invia comandi diretti alle API Tuya.
## Integrazione Porta/Tapparella (RF Device)
Il sistema gestisce un dispositivo RF "Curtains" (`bfd206260a90dcb6ec8uah`) che controlla la porta/tapparella del laboratorio. Poiché il controllo diretto via DPS non è affidabile per questo dispositivo RF, l'integrazione utilizza le **Scene Tuya Cloud** (Tap-to-Run).
### Logica del Bridge (`tuya_mqtt_bridge.py`)
### Logica del Bridge (`porta-lab/bridge/handlers/door.py`)
1. **Intercettazione Comandi**: Il bridge ascolta sul topic `tuya/curtains/command`.
2. **Mapping Scene**:
* Payload `open` / `on` → Attiva scena Cloud **OPEN** (`4vxIR8nrcLzri9QW`)

1
.gitignore vendored

@ -39,3 +39,4 @@ check_*.py
inspect_*.py
list_*.py
*.bak

7
porta-lab/Dockerfile.tuya

@ -6,10 +6,11 @@ WORKDIR /app
COPY okp-requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy script
COPY tuya_mqtt_bridge.py .
# Copy application code
COPY bridge ./bridge
# Unbuffered output
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
CMD ["python", "-u", "tuya_mqtt_bridge.py"]
CMD ["python", "-m", "bridge.main"]

6
porta-lab/README.md

@ -8,9 +8,11 @@ Il cuore del sistema è il **Tuya General Bridge**, un servizio Python container
### Componenti Principali
1. **Tuya MQTT Bridge** (`tuya_mqtt_bridge.py`):
1. **Tuya MQTT Bridge** (`bridge/`):
* Applicazione modulare Python.
* Auto-discovery di tutti i dispositivi Tuya.
* Supporto avanzato per dispositivi IR (Condizionatori) con gestione dello stato virtuale.
* Supporto per dispositivi RF (Tende/Porte) tramite Scene Cloud.
* Pubblicazione stato su `tuya/<slug>/status`.
* Ricezione comandi su `tuya/<slug>/command`.
@ -21,7 +23,7 @@ Il cuore del sistema è il **Tuya General Bridge**, un servizio Python container
## Struttura Repository
* `tuya_mqtt_bridge.py`: Codice sorgente del bridge.
* `bridge/`: Codice sorgente modulare del bridge (vedi [bridge/README.md](bridge/README.md)).
* `Dockerfile.tuya`: Definizione container Docker.
* `ac_flow.json`: Flusso Node-RED per il controllo condizionatori.
* `tools/`: Script di utilità (es. `merge_flows.py`, `deploy_door_flow.py`).

61
porta-lab/bridge/README.md

@ -0,0 +1,61 @@
# Tuya MQTT Bridge Architecture
## Overview
This module provides a robust bridge between Tuya Cloud and MQTT, allowing local control of Tuya devices via standard MQTT messages. It is designed to be modular, extensible, and easy to maintain.
## Directory Structure
```
bridge/
├── __init__.py
├── main.py # Entry point
├── core.py # Main application logic (MQTT loop, device discovery)
├── config.py # Configuration settings (Credentials, Topics)
├── utils.py # Helper functions (Logging, Slugify)
└── handlers/ # Device-specific logic
├── __init__.py
├── base.py # Abstract Base Class for handlers
├── door.py # RF Door/Curtain logic (Scene triggers)
├── climate.py # IR AC logic (Virtual state, v2.0 API)
└── general.py # Generic devices (Lights, Switches, Vacuums)
```
## Core Components
### 1. Core (`core.py`)
The `TuyaBridgeCore` class is the heart of the application. It:
- Connects to Tuya Cloud using `tinytuya`.
- Connects to the MQTT Broker.
- Discovers devices and maps them to slugs (e.g., `tuya/kitchen_light`).
- Dispatches incoming MQTT commands to the appropriate Handler.
- Periodically polls device status and publishes updates.
### 2. Handlers (`handlers/`)
Instead of a monolithic `if/else` block, device logic is separated into classes inheriting from `BaseHandler`.
- **`GeneralHandler`**: Handles standard devices where commands map directly to Tuya codes (e.g., `switch_1`, `led_switch`).
- **`ClimateHandler`**: Manages Infrared ACs. Since IR is stateless, it maintains a "Virtual State" in memory and sends full state updates via the Tuya v2.0 API.
- **`DoorHandler`**: Manages RF Curtains/Doors. Since these devices often don't support direct control via API, this handler triggers specific Cloud Scenes (Tap-to-Run) to execute Open/Close actions.
### 3. Configuration (`config.py`)
All hardcoded values (API Keys, Device IDs, Topic Prefixes) are centralized here.
## Data Flow
1. **Discovery**: On startup, `core.py` fetches all devices from Tuya.
2. **Command**:
- User publishes to `tuya/{device_slug}/command`.
- `core.py` receives message.
- `core.py` identifies the device type and calls `handler.handle(dev_id, payload)`.
- Handler processes the payload and calls Tuya API.
3. **Status**:
- `core.py` polls status every `STATUS_INTERVAL`.
- Publishes result to `tuya/{device_slug}/status`.
## Extending
To add support for a new complex device type:
1. Create a new file in `handlers/` (e.g., `handlers/vacuum.py`).
2. Inherit from `BaseHandler`.
3. Implement the `handle` method.
4. Register the new handler in `core.py`'s `__init__` and `dispatch_command` methods.

0
porta-lab/bridge/__init__.py

24
porta-lab/bridge/config.py

@ -0,0 +1,24 @@
"""
Configuration settings for the Tuya MQTT Bridge.
"""
# MQTT Configuration
MQTT_BROKER = "mqtt"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "tuya_bridge_general"
MQTT_TOPIC_PREFIX = "tuya"
# Tuya Cloud Credentials
TUYA_REGION = "eu"
TUYA_KEY = "3vkr7nafauo7ro69yeps"
TUYA_SECRET = "75c2e0ab536c4029bb1912f89cb5c6d9"
TUYA_DEVICE_ID = "bf6a2ef9072cf7b319qgh1" # Any valid device ID from the account
# Door/Curtain Configuration
DOOR_DEVICE_ID = "bfd206260a90dcb6ec8uah"
# Scene IDs for Door Control (Fallback/Primary method for RF devices)
URI_TRIGGER_OPEN = 'cloud/scene/rule/4vxIR8nrcLzri9QW/actions/trigger'
URI_TRIGGER_CLOSE = 'cloud/scene/rule/OY6pWbCSciVLe6lV/actions/trigger'
# Update Interval (seconds)
STATUS_INTERVAL = 60

243
porta-lab/bridge/core.py

@ -0,0 +1,243 @@
"""
Core logic for the Tuya MQTT Bridge.
"""
import tinytuya
import paho.mqtt.client as mqtt
import json
import time
import threading
from datetime import datetime
from .config import (
MQTT_BROKER, MQTT_CLIENT_ID, MQTT_TOPIC_PREFIX,
TUYA_REGION, TUYA_KEY, TUYA_SECRET, TUYA_DEVICE_ID,
DOOR_DEVICE_ID, STATUS_INTERVAL
)
from .utils import logger, slugify
from .handlers.door import DoorHandler
from .handlers.climate import ClimateHandler
from .handlers.general import GeneralHandler
class TuyaBridgeCore:
def __init__(self):
self.cloud = tinytuya.Cloud(
apiRegion=TUYA_REGION,
apiKey=TUYA_KEY,
apiSecret=TUYA_SECRET,
apiDeviceID=TUYA_DEVICE_ID
)
self.mqtt = mqtt.Client(MQTT_CLIENT_ID)
self.mqtt.on_connect = self.on_connect
self.mqtt.on_message = self.on_message
self.mqtt.on_disconnect = self.on_disconnect
self.devices = {} # id -> device_info
self.slug_map = {} # slug -> id
self.ac_states = {} # id -> {power, mode, temp, wind}
self.running = True
# Initialize Handlers
self.door_handler = DoorHandler(self)
self.climate_handler = ClimateHandler(self)
self.general_handler = GeneralHandler(self)
def fetch_devices(self):
"""Fetches list of devices from Tuya Cloud"""
logger.info("🔄 Fetching devices from Tuya Cloud...")
try:
devices = self.cloud.getdevices()
if not devices:
logger.warning(" No devices found!")
return
# Pass 1: Map Local Keys to Parent IDs (Gateways/Blasters)
key_map = {}
for dev in devices:
if not dev.get('sub', False) or dev['category'] in ['qt', 'wnykq', 'smart_ir']:
if 'key' in dev:
key_map[dev['key']] = dev['id']
logger.info(f"🔑 Found {len(key_map)} potential parent devices (by key)")
# Pass 2: Process all devices
for dev in devices:
dev_id = dev['id']
name = dev['name']
category = dev['category']
slug = slugify(name)
# Handle duplicate slugs
original_slug = slug
counter = 1
while slug in self.slug_map and self.slug_map[slug] != dev_id:
slug = f"{original_slug}_{counter}"
counter += 1
parent_id = None
if category == 'infrared_ac':
# Try to find parent by key
if 'key' in dev and dev['key'] in key_map:
parent_id = key_map[dev['key']]
logger.info(f"🔗 Linked AC {name} to Parent {parent_id}")
# Initialize virtual state
self.ac_states[dev_id] = {
'power': 0,
'mode': 2, # Auto
'temp': 24,
'wind': 0 # Auto
}
self.devices[dev_id] = {
'name': name,
'slug': slug,
'category': category,
'data': dev,
'parent_id': parent_id
}
self.slug_map[slug] = dev_id
logger.info(f"📱 Discovered: {name} ({category}) -> tuya/{slug}")
except Exception as e:
logger.error(f"❌ Error fetching devices: {e}")
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
logger.info(f"✅ Connected to MQTT: {MQTT_BROKER}")
# Subscribe to all command topics
topic = f"{MQTT_TOPIC_PREFIX}/+/command"
client.subscribe(topic)
logger.info(f"📥 Subscribed to: {topic}")
# Publish bridge status
client.publish(f"{MQTT_TOPIC_PREFIX}/bridge/status", "online", retain=True)
# Publish discovery info
self.publish_discovery()
else:
logger.error(f"❌ MQTT Connection failed: {rc}")
def on_disconnect(self, client, userdata, rc):
logger.warning(f" Disconnected from MQTT (rc: {rc})")
def on_message(self, client, userdata, msg):
try:
topic_parts = msg.topic.split('/')
if len(topic_parts) != 3 or topic_parts[2] != 'command':
return
slug = topic_parts[1]
payload = msg.payload.decode('utf-8').strip()
if slug not in self.slug_map:
logger.warning(f" Unknown device slug: {slug}")
return
dev_id = self.slug_map[slug]
self.dispatch_command(dev_id, payload)
except Exception as e:
logger.error(f"❌ Error handling message: {e}")
def dispatch_command(self, dev_id, payload):
"""Route command to appropriate handler"""
dev_info = self.devices[dev_id]
dev_name = dev_info['name']
category = dev_info['category']
logger.info(f"📨 Command for {dev_name} (ID: {dev_id}): {payload}")
# Special Handling for Door/Curtain
if dev_id == DOOR_DEVICE_ID:
self.door_handler.handle(dev_id, payload)
return
# Special Handling for Infrared AC
if category == 'infrared_ac' and dev_info.get('parent_id'):
self.climate_handler.handle(dev_id, payload)
return
# Default Handler
self.general_handler.handle(dev_id, payload)
def update_device_status(self, dev_id):
"""Fetch and publish status for a single device"""
slug = self.devices[dev_id]['slug']
# For ACs, publish virtual state as data
if self.devices[dev_id]['category'] == 'infrared_ac':
state = self.ac_states.get(dev_id, {})
payload = {
'timestamp': datetime.now().isoformat(),
'online': True,
'data': state
}
topic = f"{MQTT_TOPIC_PREFIX}/{slug}/status"
self.mqtt.publish(topic, json.dumps(payload), retain=True)
return
try:
res = self.cloud.getstatus(dev_id)
if res and 'result' in res:
status = {}
for item in res['result']:
status[item['code']] = item['value']
# Add metadata
payload = {
'timestamp': datetime.now().isoformat(),
'online': True,
'data': status
}
topic = f"{MQTT_TOPIC_PREFIX}/{slug}/status"
self.mqtt.publish(topic, json.dumps(payload), retain=True)
logger.debug(f"Updated status for {slug}")
else:
logger.warning(f" Could not fetch status for {slug}")
except Exception as e:
logger.error(f"❌ Error updating status for {slug}: {e}")
def publish_discovery(self):
"""Publish list of devices to MQTT"""
payload = {
'timestamp': datetime.now().isoformat(),
'devices': []
}
for dev_id, info in self.devices.items():
payload['devices'].append({
'name': info['name'],
'slug': info['slug'],
'category': info['category'],
'id': dev_id
})
self.mqtt.publish(f"{MQTT_TOPIC_PREFIX}/bridge/devices", json.dumps(payload), retain=True)
def run(self):
"""Main loop"""
self.fetch_devices()
logger.info(f"🔌 Connecting to MQTT Broker: {MQTT_BROKER}")
try:
self.mqtt.connect(MQTT_BROKER, 1883, 60)
self.mqtt.loop_start()
except Exception as e:
logger.error(f"❌ MQTT Connection Error: {e}")
return
logger.info("🚀 Bridge Started")
try:
while self.running:
logger.info("🔄 Updating all device statuses...")
for dev_id in self.devices:
self.update_device_status(dev_id)
time.sleep(0.5) # Avoid rate limits
time.sleep(STATUS_INTERVAL)
except KeyboardInterrupt:
logger.info("🛑 Stopping Bridge...")
self.running = False
self.mqtt.loop_stop()
self.mqtt.disconnect()

0
porta-lab/bridge/handlers/__init__.py

13
porta-lab/bridge/handlers/base.py

@ -0,0 +1,13 @@
"""
Base handler interface for device commands.
"""
from abc import ABC, abstractmethod
class BaseHandler(ABC):
def __init__(self, bridge):
self.bridge = bridge
@abstractmethod
def handle(self, dev_id, payload):
"""Process a command for a specific device."""
pass

62
porta-lab/bridge/handlers/climate.py

@ -0,0 +1,62 @@
"""
Handler for Infrared AC devices using Tuya v2.0 API.
"""
import json
from ..utils import logger
from .base import BaseHandler
class ClimateHandler(BaseHandler):
def handle(self, dev_id, payload):
"""Handle commands for IR AC using v2.0 API."""
dev_info = self.bridge.devices.get(dev_id)
if not dev_info or not dev_info.get('parent_id'):
logger.error(f"❌ Missing parent_id for AC device {dev_id}")
return
parent_id = dev_info['parent_id']
# Get current state or default
state = self.bridge.ac_states.get(dev_id, {'power': 0, 'mode': 2, 'temp': 24, 'wind': 0})
try:
data = json.loads(payload)
except json.JSONDecodeError:
logger.error(f"❌ Invalid JSON for AC command: {payload}")
return
# Update Virtual State
# Handle standard Tuya codes if present
if 'PowerOn' in data:
state['power'] = 1
elif 'PowerOff' in data:
state['power'] = 0
elif 'T' in data:
state['temp'] = int(data['T'])
elif 'M' in data:
state['mode'] = int(data['M'])
elif 'F' in data:
state['wind'] = int(data['F'])
# Handle direct keys if sent (e.g. from Node-RED direct injection)
if 'power' in data: state['power'] = int(data['power'])
if 'temp' in data: state['temp'] = int(data['temp'])
if 'mode' in data: state['mode'] = int(data['mode'])
if 'wind' in data: state['wind'] = int(data['wind'])
# Save state back to bridge memory
self.bridge.ac_states[dev_id] = state
# Construct API Call
url = f"/v2.0/infrareds/{parent_id}/air-conditioners/{dev_id}/scenes/command"
logger.info(f"🚀 Sending AC State to {url}: {state}")
try:
res = self.bridge.cloud.cloudrequest(url, post=state)
if res and res.get('success'):
logger.info(f"✅ AC Command executed for {dev_info['name']}")
# Publish updated status back to MQTT so UI stays in sync
self.bridge.update_device_status(dev_id)
else:
logger.error(f"❌ AC Command failed: {res}")
except Exception as e:
logger.error(f"❌ Error sending AC command: {e}")

45
porta-lab/bridge/handlers/door.py

@ -0,0 +1,45 @@
"""
Handler for RF Door/Curtain devices using Scene Triggers.
"""
import json
from ..config import URI_TRIGGER_OPEN, URI_TRIGGER_CLOSE
from ..utils import logger
from .base import BaseHandler
class DoorHandler(BaseHandler):
def handle(self, dev_id, payload):
"""Handle commands for Door/Curtain using Scene Triggers."""
cmd = payload.lower().strip()
uri = None
# Parse JSON if needed
try:
data = json.loads(payload)
if 'commands' in data:
# Check for scene codes if passed as JSON
for c in data['commands']:
if c.get('code') == 'scene_1': # Assuming scene_1 maps to Open
uri = URI_TRIGGER_OPEN
elif c.get('code') == 'scene_2': # Assuming scene_2 maps to Close
uri = URI_TRIGGER_CLOSE
elif 'open' in data: # Simple JSON {"open": true}
uri = URI_TRIGGER_OPEN if data['open'] else URI_TRIGGER_CLOSE
except json.JSONDecodeError:
pass
# Simple String Commands
if cmd in ['open', 'on', 'true']:
uri = URI_TRIGGER_OPEN
elif cmd in ['close', 'off', 'false']:
uri = URI_TRIGGER_CLOSE
if uri:
logger.info(f"🚪 Triggering Door Scene: {uri}")
try:
# Accessing the cloud object from the bridge instance
self.bridge.cloud._tuyaplatform(uri=uri, ver="v2.0", action="POST")
logger.info("✅ Door Scene Triggered Successfully")
except Exception as e:
logger.error(f"❌ Error triggering door scene: {e}")
else:
logger.warning(f" Unknown door command: {payload}")

81
porta-lab/bridge/handlers/general.py

@ -0,0 +1,81 @@
"""
Handler for general Tuya devices (Lights, Switches, Vacuums).
"""
import json
import time
from ..utils import logger
from .base import BaseHandler
class GeneralHandler(BaseHandler):
def handle(self, dev_id, payload):
"""Process generic command for a device."""
dev_info = self.bridge.devices.get(dev_id)
if not dev_info:
return
dev_name = dev_info['name']
category = dev_info['category']
commands = {}
# Try parsing JSON
try:
data = json.loads(payload)
if 'commands' in data:
commands = data # Already in Tuya format
elif 'code' in data and 'value' in data:
commands = {'commands': [data]}
else:
# Assume simple key-value pairs
cmds = []
for k, v in data.items():
cmds.append({'code': k, 'value': v})
commands = {'commands': cmds}
except json.JSONDecodeError:
# Handle simple string commands (shortcuts)
cmd_str = payload.lower()
# Robot Vacuum Shortcuts
if category == 'sd':
if cmd_str in ['start', 'clean']:
commands = {'commands': [{'code': 'power_go', 'value': True}]}
elif cmd_str in ['stop', 'pause']:
commands = {'commands': [{'code': 'power_go', 'value': False}]}
if cmd_str == 'pause':
commands = {'commands': [{'code': 'pause', 'value': True}]}
elif cmd_str in ['home', 'charge']:
commands = {'commands': [{'code': 'switch_charge', 'value': True}]}
elif cmd_str == 'seek':
commands = {'commands': [{'code': 'seek', 'value': True}]}
# Light Shortcuts
elif category == 'dj':
if cmd_str == 'on':
commands = {'commands': [{'code': 'switch_led', 'value': True}]}
elif cmd_str == 'off':
commands = {'commands': [{'code': 'switch_led', 'value': False}]}
# Switch/Socket Shortcuts
elif category in ['cz', 'kg']:
if cmd_str == 'on':
commands = {'commands': [{'code': 'switch_1', 'value': True}]}
elif cmd_str == 'off':
commands = {'commands': [{'code': 'switch_1', 'value': False}]}
if not commands:
logger.warning(f" Could not parse command for {dev_name}: {payload}")
return
# Send to Tuya
try:
logger.info(f"🚀 Sending to Tuya: {commands}")
res = self.bridge.cloud.sendcommand(dev_id, commands)
if res and res.get('success'):
logger.info(f"✅ Command executed for {dev_name}")
# Force immediate status update
time.sleep(1)
self.bridge.update_device_status(dev_id)
else:
logger.error(f"❌ Command failed: {res}")
except Exception as e:
logger.error(f"❌ Error sending command: {e}")

9
porta-lab/bridge/main.py

@ -0,0 +1,9 @@
#!/usr/bin/env python3
"""
Entry point for the Tuya MQTT Bridge.
"""
from bridge.core import TuyaBridgeCore
if __name__ == "__main__":
bridge = TuyaBridgeCore()
bridge.run()

23
porta-lab/bridge/utils.py

@ -0,0 +1,23 @@
"""
Utility functions for the Tuya MQTT Bridge.
"""
import re
import logging
def setup_logging():
"""Configures the logging for the application."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
return logging.getLogger("TuyaBridge")
logger = setup_logging()
def slugify(text):
"""Converts text to a valid MQTT topic slug."""
if not text:
return "unknown"
text = text.lower().strip()
text = re.sub(r'[^a-z0-9]+', '_', text)
return text.strip('_')

422
porta-lab/tuya_mqtt_bridge.py

@ -1,422 +0,0 @@
#!/usr/bin/env python3
"""
Tuya Cloud MQTT Bridge (General Purpose)
Publishes status and handles commands for ALL Tuya devices via MQTT.
"""
import paho.mqtt.client as mqtt
import tinytuya
import json
import time
import logging
import re
import threading
from datetime import datetime
# Configuration
MQTT_BROKER = "mqtt"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "tuya_bridge_general"
MQTT_TOPIC_PREFIX = "tuya"
# Tuya Cloud Credentials
TUYA_REGION = "eu"
TUYA_KEY = "3vkr7nafauo7ro69yeps"
TUYA_SECRET = "75c2e0ab536c4029bb1912f89cb5c6d9"
TUYA_DEVICE_ID = "bf6a2ef9072cf7b319qgh1" # Any valid device ID from the account
# Door/Curtain Configuration
DOOR_DEVICE_ID = "bfd206260a90dcb6ec8uah"
URI_TRIGGER_OPEN = 'cloud/scene/rule/4vxIR8nrcLzri9QW/actions/trigger'
URI_TRIGGER_CLOSE = 'cloud/scene/rule/OY6pWbCSciVLe6lV/actions/trigger'
# Update Interval (seconds)
STATUS_INTERVAL = 60
# Logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def slugify(text):
"""Converts text to a valid MQTT topic slug"""
text = text.lower().strip()
text = re.sub(r'[^a-z0-9]+', '_', text)
return text.strip('_')
class TuyaGeneralBridge:
def __init__(self):
self.cloud = tinytuya.Cloud(
apiRegion=TUYA_REGION,
apiKey=TUYA_KEY,
apiSecret=TUYA_SECRET,
apiDeviceID=TUYA_DEVICE_ID
)
self.mqtt = mqtt.Client(MQTT_CLIENT_ID)
self.mqtt.on_connect = self.on_connect
self.mqtt.on_message = self.on_message
self.mqtt.on_disconnect = self.on_disconnect
self.devices = {} # id -> device_info
self.slug_map = {} # slug -> id
self.ac_states = {} # id -> {power, mode, temp, wind}
self.running = True
def fetch_devices(self):
"""Fetches list of devices from Tuya Cloud"""
logger.info("🔄 Fetching devices from Tuya Cloud...")
try:
devices = self.cloud.getdevices()
if not devices:
logger.warning(" No devices found!")
return
# Pass 1: Map Local Keys to Parent IDs (Gateways/Blasters)
key_map = {}
for dev in devices:
if not dev.get('sub', False) or dev['category'] in ['qt', 'wnykq', 'smart_ir']:
if 'key' in dev:
key_map[dev['key']] = dev['id']
logger.info(f"🔑 Found {len(key_map)} potential parent devices (by key)")
# Pass 2: Process all devices
for dev in devices:
dev_id = dev['id']
name = dev['name']
category = dev['category']
slug = slugify(name)
# Handle duplicate slugs
original_slug = slug
counter = 1
while slug in self.slug_map and self.slug_map[slug] != dev_id:
slug = f"{original_slug}_{counter}"
counter += 1
parent_id = None
if category == 'infrared_ac':
# Try to find parent by key
if 'key' in dev and dev['key'] in key_map:
parent_id = key_map[dev['key']]
logger.info(f"🔗 Linked AC {name} to Parent {parent_id}")
# Initialize virtual state
self.ac_states[dev_id] = {
'power': 0,
'mode': 2, # Auto
'temp': 24,
'wind': 0 # Auto
}
self.devices[dev_id] = {
'name': name,
'slug': slug,
'category': category,
'data': dev,
'parent_id': parent_id
}
self.slug_map[slug] = dev_id
logger.info(f"📱 Discovered: {name} ({category}) -> tuya/{slug}")
except Exception as e:
logger.error(f"❌ Error fetching devices: {e}")
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
logger.info(f"✅ Connected to MQTT: {MQTT_BROKER}")
# Subscribe to all command topics
topic = f"{MQTT_TOPIC_PREFIX}/+/command"
client.subscribe(topic)
logger.info(f"📥 Subscribed to: {topic}")
# Publish bridge status
client.publish(f"{MQTT_TOPIC_PREFIX}/bridge/status", "online", retain=True)
# Publish discovery info
self.publish_discovery()
else:
logger.error(f"❌ MQTT Connection failed: {rc}")
def on_disconnect(self, client, userdata, rc):
logger.warning(f" Disconnected from MQTT (rc: {rc})")
def on_message(self, client, userdata, msg):
try:
topic_parts = msg.topic.split('/')
if len(topic_parts) != 3 or topic_parts[2] != 'command':
return
slug = topic_parts[1]
payload = msg.payload.decode('utf-8').strip()
if slug not in self.slug_map:
logger.warning(f" Unknown device slug: {slug}")
return
dev_id = self.slug_map[slug]
self.handle_command(dev_id, payload)
except Exception as e:
logger.error(f"❌ Error handling message: {e}")
def handle_command(self, dev_id, payload):
"""Process command for a device"""
dev_info = self.devices[dev_id]
dev_name = dev_info['name']
category = dev_info['category']
logger.info(f"📨 Command for {dev_name} (ID: {dev_id}): {payload}")
logger.info(f"🔍 Checking against DOOR_DEVICE_ID: {DOOR_DEVICE_ID}")
# Special Handling for Door/Curtain (Scene Trigger)
if dev_id == DOOR_DEVICE_ID:
logger.info("🚪 Match found! Handling door command...")
self.handle_door_command(payload)
return
# Special Handling for Infrared AC
if category == 'infrared_ac' and dev_info.get('parent_id'):
self.handle_ac_command(dev_id, payload)
return
commands = {}
# Try parsing JSON
try:
data = json.loads(payload)
if 'commands' in data:
commands = data # Already in Tuya format
elif 'code' in data and 'value' in data:
commands = {'commands': [data]}
else:
# Assume simple key-value pairs
cmds = []
for k, v in data.items():
cmds.append({'code': k, 'value': v})
commands = {'commands': cmds}
except json.JSONDecodeError:
# Handle simple string commands (shortcuts)
cmd_str = payload.lower()
category = self.devices[dev_id]['category']
# Robot Vacuum Shortcuts
if category == 'sd':
if cmd_str in ['start', 'clean']:
commands = {'commands': [{'code': 'power_go', 'value': True}]}
elif cmd_str in ['stop', 'pause']:
commands = {'commands': [{'code': 'power_go', 'value': False}]} # Or pause
# Note: Some robots use 'pause' code, others power_go=False
# Based on previous OKP analysis: 'pause' code exists
if cmd_str == 'pause':
commands = {'commands': [{'code': 'pause', 'value': True}]}
elif cmd_str in ['home', 'charge']:
commands = {'commands': [{'code': 'switch_charge', 'value': True}]}
elif cmd_str == 'seek':
commands = {'commands': [{'code': 'seek', 'value': True}]}
# Light Shortcuts
elif category == 'dj':
if cmd_str == 'on':
commands = {'commands': [{'code': 'switch_led', 'value': True}]}
elif cmd_str == 'off':
commands = {'commands': [{'code': 'switch_led', 'value': False}]}
# Switch/Socket Shortcuts
elif category in ['cz', 'kg']:
if cmd_str == 'on':
commands = {'commands': [{'code': 'switch_1', 'value': True}]}
elif cmd_str == 'off':
commands = {'commands': [{'code': 'switch_1', 'value': False}]}
if not commands:
logger.warning(f" Could not parse command for {dev_name}: {payload}")
return
# Send to Tuya
try:
logger.info(f"🚀 Sending to Tuya: {commands}")
res = self.cloud.sendcommand(dev_id, commands)
if res and res.get('success'):
logger.info(f"✅ Command executed for {dev_name}")
# Force immediate status update
time.sleep(1)
self.update_device_status(dev_id)
else:
logger.error(f"❌ Command failed: {res}")
except Exception as e:
logger.error(f"❌ Error sending command: {e}")
def handle_door_command(self, payload):
"""Handle commands for Door/Curtain using Scene Triggers"""
cmd = payload.lower().strip()
uri = None
# Parse JSON if needed
try:
data = json.loads(payload)
if 'commands' in data:
# Check for scene codes if passed as JSON
for c in data['commands']:
if c.get('code') == 'scene_1': # Assuming scene_1 maps to Open
uri = URI_TRIGGER_OPEN
elif c.get('code') == 'scene_2': # Assuming scene_2 maps to Close
uri = URI_TRIGGER_CLOSE
elif 'open' in data: # Simple JSON {"open": true}
uri = URI_TRIGGER_OPEN if data['open'] else URI_TRIGGER_CLOSE
except:
pass
# Simple String Commands
if cmd in ['open', 'on', 'true']:
uri = URI_TRIGGER_OPEN
elif cmd in ['close', 'off', 'false']:
uri = URI_TRIGGER_CLOSE
if uri:
logger.info(f"🚪 Triggering Door Scene: {uri}")
try:
self.cloud._tuyaplatform(uri=uri, ver="v2.0", action="POST")
logger.info("✅ Door Scene Triggered Successfully")
except Exception as e:
logger.error(f"❌ Error triggering door scene: {e}")
else:
logger.warning(f" Unknown door command: {payload}")
def handle_ac_command(self, dev_id, payload):
"""Handle commands for IR AC using v2.0 API"""
parent_id = self.devices[dev_id]['parent_id']
state = self.ac_states.get(dev_id, {'power': 0, 'mode': 2, 'temp': 24, 'wind': 0})
try:
data = json.loads(payload)
except:
logger.error(f"❌ Invalid JSON for AC command: {payload}")
return
# Update Virtual State
# Handle standard Tuya codes if present
if 'PowerOn' in data:
state['power'] = 1
elif 'PowerOff' in data:
state['power'] = 0
elif 'T' in data:
state['temp'] = int(data['T'])
elif 'M' in data:
state['mode'] = int(data['M'])
elif 'F' in data:
state['wind'] = int(data['F'])
# Handle direct keys if sent (e.g. from Node-RED direct injection)
if 'power' in data: state['power'] = int(data['power'])
if 'temp' in data: state['temp'] = int(data['temp'])
if 'mode' in data: state['mode'] = int(data['mode'])
if 'wind' in data: state['wind'] = int(data['wind'])
# Save state
self.ac_states[dev_id] = state
# Construct API Call
url = f"/v2.0/infrareds/{parent_id}/air-conditioners/{dev_id}/scenes/command"
logger.info(f"🚀 Sending AC State to {url}: {state}")
try:
res = self.cloud.cloudrequest(url, post=state)
if res and res.get('success'):
logger.info(f"✅ AC Command executed for {self.devices[dev_id]['name']}")
# Publish updated status back to MQTT so UI stays in sync
self.update_device_status(dev_id)
else:
logger.error(f"❌ AC Command failed: {res}")
except Exception as e:
logger.error(f"❌ Error sending AC command: {e}")
def update_device_status(self, dev_id):
"""Fetch and publish status for a single device"""
slug = self.devices[dev_id]['slug']
# For ACs, publish virtual state as data
if self.devices[dev_id]['category'] == 'infrared_ac':
state = self.ac_states.get(dev_id, {})
payload = {
'timestamp': datetime.now().isoformat(),
'online': True,
'data': state
}
topic = f"{MQTT_TOPIC_PREFIX}/{slug}/status"
self.mqtt.publish(topic, json.dumps(payload), retain=True)
return
try:
res = self.cloud.getstatus(dev_id)
if res and 'result' in res:
status = {}
for item in res['result']:
status[item['code']] = item['value']
# Add metadata
payload = {
'timestamp': datetime.now().isoformat(),
'online': True, # If we got status, it's likely online (cloud-wise)
'data': status
}
topic = f"{MQTT_TOPIC_PREFIX}/{slug}/status"
self.mqtt.publish(topic, json.dumps(payload), retain=True)
# logger.debug(f"📤 Updated {slug}")
else:
# Offline or error
payload = {
'timestamp': datetime.now().isoformat(),
'online': False,
'error': 'No result from cloud'
}
self.mqtt.publish(f"{MQTT_TOPIC_PREFIX}/{slug}/status", json.dumps(payload), retain=True)
except Exception as e:
logger.error(f"❌ Error updating {slug}: {e}")
def publish_discovery(self):
"""Publish list of devices"""
discovery_data = []
for dev_id, info in self.devices.items():
discovery_data.append({
'name': info['name'],
'slug': info['slug'],
'category': info['category'],
'id': dev_id
})
self.mqtt.publish(f"{MQTT_TOPIC_PREFIX}/discovery", json.dumps(discovery_data), retain=True)
def run(self):
logger.info("🚀 Starting Tuya General Bridge...")
self.fetch_devices()
# Connect MQTT
try:
self.mqtt.connect(MQTT_BROKER, MQTT_PORT, 60)
self.mqtt.loop_start()
except Exception as e:
logger.error(f"❌ MQTT Connection Error: {e}")
return
# Main Loop
try:
while self.running:
logger.info("🔄 Updating all devices...")
for dev_id in self.devices:
self.update_device_status(dev_id)
time.sleep(0.5) # Avoid rate limits
time.sleep(STATUS_INTERVAL)
except KeyboardInterrupt:
logger.info("👋 Stopping...")
finally:
self.mqtt.loop_stop()
self.mqtt.disconnect()
if __name__ == "__main__":
bridge = TuyaGeneralBridge()
bridge.run()
Loading…
Cancel
Save