"""
Joystick, EMGによるロボットカー制御:送信側 (Sender, Client)
20250831-robotcar-sender.py
(修正:SSD1306のtext座標描画で3行固定表示:CMD / EMG / Thr)
"""
import network
import espnow
from machine import Pin, ADC, I2C
import time
from oledcs import OLEDCS_I2C
from ssd1306 import SSD1306_I2C # ★ 追加:座標指定描画に使用
# ===== 定数設定 =====
AVERAGE = 100
THRESHOLD = 10
JOY_LOW = 1000
JOY_HIGH = 3000
KEEPALIVE_MS = 300 # 受信側FAILSAFE=700ms未満
OLED_MIN_INTERVAL_MS = 200 # OLED間引き更新の最短間隔
CHANNEL = 6
# I2C OLED初期化 (SSD1306)
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000) # ESP32用
oled = OLEDCS_I2C(128, 64, i2c) # 既存のまま(不要なら使わなくてもOK)
ssd = SSD1306_I2C(128, 64, i2c) # ★ 追加:座標描画用の純正ドライバ
# RobotCar3およびチョロQのコントロールモニターLEDピンの初期化
led_forward = Pin(15, Pin.OUT)
led_backward = Pin(2, Pin.OUT)
led_left = Pin(4, Pin.OUT)
led_right = Pin(17, Pin.OUT)
# チョロQ制御ピンの初期化
ChoroQ_forward = Pin(18, Pin.OUT)
ChoroQ_backward = Pin(19, Pin.OUT)
# 筋電位計測用ピンの初期化
EMG_left = ADC(Pin(36))
EMG_right = ADC(Pin(39))
try:
EMG_left.atten(ADC.ATTN_11DB)
EMG_right.atten(ADC.ATTN_11DB)
except:
pass
# ジョイスティックのx, y計測用ピンの初期化
joystick_x = ADC(Pin(34))
joystick_y = ADC(Pin(35))
joystick_x.atten(ADC.ATTN_11DB)
joystick_y.atten(ADC.ATTN_11DB)
# Wi-FiをSTAモードに設定
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
try:
wlan.config(channel=CHANNEL) # 受信側とチャネル一致
except:
pass
# ESP-NOWの初期化
esp = espnow.ESPNow()
esp.active(True)
# 受信側のMACアドレスを登録(XIAO ESP32-C3)
peer_mac = b'\x9c\x9en\xf7\x17H'
try:
esp.add_peer(peer_mac)
except Exception as e:
print("Failed to add peer:", e)
# LEDとモーターをリセットする関数
def reset_leds():
led_forward.off()
led_backward.off()
led_left.off()
led_right.off()
ChoroQ_forward.off()
ChoroQ_backward.off()
# ==== OLED 点滅防止(差分+間引き、3行固定位置に描画) ====
_last_lines = ["","",""]
_last_oled_ms = 0
def oled_update_3lines(line1, line2, line3):
global _last_lines, _last_oled_ms
now = time.ticks_ms()
new = [line1, line2, line3]
if (new != _last_lines) and (time.ticks_diff(now, _last_oled_ms) >= OLED_MIN_INTERVAL_MS):
try:
ssd.fill(0) # 画面クリア
ssd.text(line1, 0, 0) # 1行目 (y=0)
ssd.text(line2, 0, 16) # 2行目 (y=16)
ssd.text(line3, 0, 32) # 3行目 (y=32)
ssd.show()
except Exception as e:
# 万一ssdが失敗した時のフォールバック(1行ずつprint)
try:
oled.clear()
oled.print(line1)
oled.print(line2)
oled.print(line3)
except:
pass
_last_lines = new[:]
_last_oled_ms = now
# ==== KeepAlive送信 ====
_last_cmd = None
_last_send_ms = 0
# メインループ
while True:
# ジョイスティックの値を取得
x_value = joystick_x.read()
y_value = joystick_y.read()
# 筋電位の平均値を計算(元ルール)
val_left = sum(EMG_left.read() for _ in range(AVERAGE)) // AVERAGE
val_right = sum(EMG_right.read() for _ in range(AVERAGE)) // AVERAGE
# --- GND時の微小ノイズ対策:閾値未満は0扱い ---
if val_left < THRESHOLD: val_left = 0
if val_right < THRESHOLD: val_right = 0
# 状態の初期化
reset_leds()
decision = "stop"
# ===== 判定 =====
joystick_home = (JOY_LOW <= x_value <= JOY_HIGH) and (JOY_LOW <= y_value <= JOY_HIGH)
if not joystick_home:
# ジョイスティック優先
if y_value < JOY_LOW:
decision = "forward"; led_forward.on(); ChoroQ_forward.on()
elif y_value > JOY_HIGH:
decision = "backward"; led_backward.on(); ChoroQ_backward.on()
elif x_value < JOY_LOW:
decision = "left_turn"; led_left.on()
elif x_value > JOY_HIGH:
decision = "right_turn"; led_right.on()
else:
# デッドゾーン内 → EMGでフォールバック(元ルール)
if (val_left >= THRESHOLD) and (val_right < THRESHOLD):
decision = "forward"; led_forward.on(); ChoroQ_forward.on()
elif (val_right >= THRESHOLD) and (val_left < THRESHOLD):
decision = "backward"; led_backward.on(); ChoroQ_backward.on()
else:
decision = "stop"
else:
# ジョイスティックがホームでも EMG が閾値超なら命令(元ルール)
if (val_left >= THRESHOLD) and (val_right < THRESHOLD):
decision = "forward"; led_forward.on(); ChoroQ_forward.on()
elif (val_right >= THRESHOLD) and (val_left < THRESHOLD):
decision = "backward"; led_backward.on(); ChoroQ_backward.on()
else:
decision = "stop"
# ★ OLEDを3行で確実に改行表示(CMD / EMG / Thr)
oled_update_3lines(
"CMD: " + decision,
"EMG L:%4d R:%4d" % (val_left, val_right),
"Thr=%d" % THRESHOLD
)
# ESP-NOWでコマンドを送信(変化時は即、同一継続時も一定間隔で再送)
need_send = False
now = time.ticks_ms()
if decision != _last_cmd:
need_send = True
elif time.ticks_diff(now, _last_send_ms) >= KEEPALIVE_MS:
need_send = True
if need_send:
try:
esp.send(peer_mac, decision.encode())
print("Command Sent:", decision)
_last_cmd = decision
_last_send_ms = now
except Exception as e:
print("Failed to send:", e)
# デバッグ出力(元形式)
print("x=%d, y=%d, Left EMG=%d, Right EMG=%d, Decision=%s" %
(x_value, y_value, val_left, val_right, decision))
time.sleep(0.1)