#!/usr/bin/env python3
"""Replay a JSON-described sequence of frames on FTDI channel A.

The frame list is read from ``RT809H-frames.json`` located next to this
script.  Each frame is a dict with at least ``frame_num`` and ``type``;
the supported ``type`` values are listed in ``send_and_check_frame``.
"""
import json
import os
import sys
import time

import ftd2xx

OPEN_BY_SERIAL_NUMBER = 1
script_dir = os.path.dirname(os.path.abspath(__file__))


def load_frames(json_path):
    """Load the frame list from *json_path*, sorted by ``frame_num``.

    Args:
        json_path: Path to a UTF-8 JSON file holding a list of frame dicts.

    Returns:
        The decoded list, sorted in ascending ``frame_num`` order.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    data.sort(key=lambda x: x["frame_num"])
    return data


def _read_and_check(dev, frame):
    """Perform one read for a ``read`` frame and validate the response.

    The optional ``expected_response`` dict may carry ``length`` (default 64),
    ``starts_with_hex`` (required prefix, spaces allowed) and ``accept_less``
    (tolerate a short read).  Any failure prints a message and exits the
    process with status 1.
    """
    print("[A] Performing read from device.")
    er = frame.get("expected_response", {})
    length = er.get("length", 64)
    starts_with_hex = er.get("starts_with_hex")
    accept_less = er.get("accept_less", False)

    print(f"[A] Single read({length}), relying on FTDI timeouts")
    try:
        received = dev.read(length)
    except ftd2xx.ftd2xx.DeviceError as e:
        print(f"[A] [ERROR] Read failed: {e}")
        sys.exit(1)
    print(f"[A] Received {len(received)} bytes: {received.hex()}")

    if starts_with_hex:
        starts_bytes = bytes.fromhex(starts_with_hex.replace(" ", ""))
        if not received.startswith(starts_bytes):
            print(f"[A] Expected: {starts_bytes.hex()}")
            print(f"[A] Received : {received.hex()}")
            print("[A] Response mismatch! Exiting.")
            sys.exit(1)
        else:
            print("[A] Response matches expectation.")

    if not accept_less and len(received) < length:
        print(f"[A] Expected response length not met (only {len(received)} bytes). Exiting.")
        sys.exit(1)


def _upload_file(dev, frame, fnum):
    """Send the file named by an ``upload_algo`` frame in fixed-size chunks.

    ``file_path`` is resolved relative to the script directory.  After each
    chunk a single byte is read back and logged but not verified.  With
    ``fill_last_chunk`` set, a short final chunk is padded with 0xFF bytes up
    to ``chunk_size``.  Invalid parameters print a message and exit with
    status 1.
    """
    file_path = frame.get("file_path")
    chunk_size = frame.get("chunk_size", 32768)
    fill_last_chunk = frame.get("fill_last_chunk", False)

    if not file_path:
        print(f"[A] Error: 'file_path' missing for frame {fnum}.")
        sys.exit(1)
    # A chunk size of zero would never advance through the file (endless loop)
    # and a negative one would silently send nothing.
    if chunk_size <= 0:
        print(f"[A] Error: invalid 'chunk_size' {chunk_size} for frame {fnum}.")
        sys.exit(1)

    algo_path = os.path.join(script_dir, file_path)
    if not os.path.isfile(algo_path):
        print(f"[A] File {algo_path} does not exist!")
        sys.exit(1)

    with open(algo_path, "rb") as fobj:
        data = fobj.read()
    total_len = len(data)
    print(f"[A] Starting upload of {file_path}, size {total_len} B, chunk_size={chunk_size}.")

    for start in range(0, total_len, chunk_size):
        chunk = data[start:start + chunk_size]
        # Remember the file range before padding so the log shows the real
        # offsets even when the last chunk is filled up.
        end = start + len(chunk)
        if fill_last_chunk and len(chunk) < chunk_size:
            chunk += b'\xFF' * (chunk_size - len(chunk))

        print(f"[A] Sending chunk {len(chunk)} B (offset={start}..{end - 1}).")
        dev.write(chunk)
        print("[A] Chunk sent.")

        ack = dev.read(1)
        if ack:
            print(f"[A] Received ack 1B: {ack.hex()} (no verification)")
        else:
            print("[A] No ack byte received (timeout?). Continuing...")

    print("[A] File upload completed.")


def send_and_check_frame(dev, frame, current_bitmode):
    """Execute a single frame on *dev* and return the resulting bit mode.

    Supported ``type`` values: ``purge``, ``setBitMode``, ``write``, ``read``,
    ``resetDevice``, ``purgeRxTx``, ``no_op``, ``delay``, ``upload_algo`` and
    ``repeat_frames`` (only logged here; the looping itself is done by
    ``initialize_channel_a``).  Unknown types are logged and ignored.

    Args:
        dev: Opened device handle exposing ``purge``, ``setBitMode``,
            ``write``, ``read`` and ``resetDevice``.
        frame: Frame dict; must contain ``frame_num`` and ``type``.
        current_bitmode: ``(bitmask, mode)`` tuple last applied, or ``None``.

    Returns:
        The ``(bitmask, mode)`` tuple in effect after this frame; unchanged
        unless a ``setBitMode`` frame applied a new mode.

    Exits the process with status 1 when a read or upload check fails.
    """
    fnum = frame["frame_num"]
    desc = frame.get("description", f"Frame {fnum}")
    print(f"[A] Packet {fnum} - {desc}")
    ftype = frame["type"]

    if ftype == "purge":
        purge_type = frame.get("purge_type", 1)
        dev.purge(purge_type)
        print(f"[A] PURGE called with type={purge_type}")
    elif ftype == "setBitMode":
        # Both values are given as hex strings in the JSON.
        bitmask = int(frame["bitmask"], 16)
        mode = int(frame["mode"], 16)
        if current_bitmode != (bitmask, mode):
            dev.setBitMode(bitmask, mode)
            current_bitmode = (bitmask, mode)
            print(f"[A] setBitMode(bitmask={bitmask:#04x}, mode={mode:#04x})")
        else:
            print("[A] setBitMode skipped (already in mode).")
    elif ftype == "write":
        payload_hex = frame["payload_hex"]
        payload = bytes.fromhex(payload_hex.replace(" ", ""))
        print(f"[A] Sending: {payload.hex()}")
        dev.write(payload)
        print("[A] Sent.")
    elif ftype == "read":
        _read_and_check(dev, frame)
    elif ftype == "resetDevice":
        dev.resetDevice()
        print("[A] resetDevice called.")
    elif ftype == "purgeRxTx":
        dev.purge(1)
        dev.purge(2)
        print("[A] PURGE_RX and PURGE_TX called.")
    elif ftype == "no_op":
        print("[A] No operation (no action).")
    elif ftype == "delay":
        delay_ms = frame.get("delay_ms", 0)
        print(f"[A] Frame {fnum}: Delay for {delay_ms} ms")
        time.sleep(delay_ms / 1000.0)
    elif ftype == "upload_algo":
        _upload_file(dev, frame, fnum)
    elif ftype == "repeat_frames":
        print("[A] repeat_frames handling occurs in initialize_channel_a (pseudo frame).")
    else:
        print(f"[A] Unknown type: {ftype}")

    return current_bitmode


def initialize_channel_a(dev, frames):
    """Run every frame in *frames* on *dev*, honouring ``repeat_frames``.

    A ``repeat_frames`` pseudo frame jumps back ``back_frames`` entries
    (default 1) as long as its ``repeat_count`` (default 1) is not used up.
    The remaining count of each pseudo frame is tracked locally, so the
    caller's frame dicts are left unmodified and the same list can be
    replayed again.

    Args:
        dev: Opened device handle passed through to ``send_and_check_frame``.
        frames: Ordered list of frame dicts.
    """
    current_bitmode = None
    remaining_repeats = {}  # frame index -> repeats still to perform
    i = 0
    while i < len(frames):
        frame = frames[i]
        ftype = frame["type"]

        if ftype != "repeat_frames":
            current_bitmode = send_and_check_frame(dev, frame, current_bitmode)
            i += 1
            continue

        repeat_count = remaining_repeats.setdefault(i, frame.get("repeat_count", 1))
        back_frames = frame.get("back_frames", 1)
        print(f"[A] repeat_frames: repeat_count={repeat_count}, back_frames={back_frames}")

        if repeat_count <= 0:
            print("[A] repeat_frames: repeat_count <= 0, moving forward.")
            i += 1
            continue

        remaining_repeats[i] = repeat_count - 1
        jump_target = i - back_frames
        if jump_target < 0:
            print("[A] Warning: jump_target out of range, ignoring.")
            i += 1
        else:
            print(f"[A] Jumping back {back_frames} frames to index {jump_target}.")
            i = jump_target


def main():
    """Load the frame file, open channel A by serial number and replay it."""
    json_path = os.path.join(script_dir, "RT809H-frames.json")
    if not os.path.isfile(json_path):
        print(f"[ERROR] JSON file not found at path: {json_path}")
        sys.exit(1)
    frames = load_frames(json_path)

    # Channel B (serial b'ggggggggB') is not opened by this script.
    serial_a = b'ggggggggA'

    try:
        dev_a = ftd2xx.openEx(serial_a, OPEN_BY_SERIAL_NUMBER)
    except ftd2xx.ftd2xx.DeviceError as e:
        print(f"[A] [ERROR] Opening A failed: {e}")
        return
    print(f"[A] Device opened: {serial_a.decode()} -> handle={dev_a}")

    # Always release the handle, including when a frame check calls sys.exit().
    try:
        initialize_channel_a(dev_a, frames)
    finally:
        dev_a.close()
        print("[A] Device closed.")


if __name__ == "__main__":
    main()