[과제명][학교][이름]을 바꿔주세요. 과제 태그는 선생님 설명을 듣고 넣어주세요.
웹 AI 모델로 피코 제어하기
# 통합된 BLE 서보모터 및 LED 제어 시스템
# ble_advertising.py, ble_simple_peripheral.py, main.py를 하나로 합친 버전
from machine import Pin, PWM
from micropython import const
import bluetooth
import struct
import time
import random
# ========== BLE advertising helpers ==========
# Advertising payloads are repeated packets of the following form:
#   1 byte data length (N + 1)
#   1 byte type (see constants below)
#   N bytes type-specific data
#
# Field type codes written into (and matched against) the "type" byte.
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
# "COMPLETE" UUID list types, one per UUID width (2, 4 and 16 bytes).
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
# "MORE" UUID list types; not referenced by the visible code in this file.
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    """Build a BLE advertising payload.

    The payload is a sequence of fields, each encoded as one length byte
    (data length + 1), one type byte and the field data.

    Args:
        limited_disc: When true the flags field uses bit 0x01, otherwise 0x02.
        br_edr: When true the flags field adds 0x18, otherwise 0x04.
        name: Device name as ``str`` or a bytes-like object; omitted when
            empty or None. A ``str`` is encoded as UTF-8.
        services: Iterable of UUID objects convertible with ``bytes()``.
            Only UUIDs that are 2, 4 or 16 bytes long are emitted.
        appearance: Unsigned 16-bit appearance value; omitted when 0.

    Returns:
        A ``bytearray`` with the fields in the order flags, name, services,
        appearance.
    """
    payload = bytearray()

    def _append(adv_type, value):
        nonlocal payload
        payload += struct.pack("BB", len(value) + 1, adv_type) + value

    _append(
        _ADV_TYPE_FLAGS,
        struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
    )

    if name:
        # Encode first so the length byte counts bytes, not characters
        # (they differ for non-ASCII names).
        if isinstance(name, str):
            name = name.encode("utf-8")
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    if appearance:
        # Unsigned 16-bit: a signed "<h" cannot represent values >= 0x8000.
        _append(_ADV_TYPE_APPEARANCE, struct.pack("<H", appearance))

    return payload
def decode_field(payload, adv_type):
    """Collect the data of every field in *payload* whose type is *adv_type*.

    Walks the length/type/data records of an advertising payload and returns
    the data slices of the matching records, in payload order.
    """
    matches = []
    pos = 0
    end = len(payload)
    while pos + 1 < end:
        field_len = payload[pos]
        if payload[pos + 1] == adv_type:
            # The length byte counts the type byte plus the data bytes.
            matches.append(payload[pos + 2 : pos + field_len + 1])
        pos += field_len + 1
    return matches
def decode_name(payload):
    """Return the first name field of *payload* decoded as UTF-8, or "" if none."""
    names = decode_field(payload, _ADV_TYPE_NAME)
    if not names:
        return ""
    return str(names[0], "utf-8")
def decode_services(payload):
    """Return the service UUIDs found in the complete-UUID fields of *payload*.

    Returns:
        A list of ``bluetooth.UUID`` objects: 16-bit entries first, then
        32-bit, then 128-bit, each group in payload order.
    """
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        # Unsigned unpack: a signed "<h" would turn UUIDs >= 0x8000 negative.
        services.append(bluetooth.UUID(struct.unpack("<H", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        # The field holds 4 bytes, so the former "<d" (8-byte double) unpack
        # could never succeed. Pass the raw bytes through, as the 128-bit
        # branch below does.
        # NOTE(review): assumes bluetooth.UUID accepts a 4-byte buffer the
        # same way it accepts the 16-byte one - confirm on target firmware.
        services.append(bluetooth.UUID(u))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services
# ========== BLE simple peripheral ==========
# Event codes compared against the `event` argument of BLESimplePeripheral._irq.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
# Characteristic property flags OR-ed together in the TX/RX definitions below.
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
# Service UUID; also placed in the advertising payload by BLESimplePeripheral.
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
# TX characteristic: the handle BLESimplePeripheral.send() notifies on.
_UART_TX = (
    bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_READ | _FLAG_NOTIFY,
)
# RX characteristic: writes to this handle are forwarded to the write callback.
_UART_RX = (
    bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE,
)
# Service tuple passed to gatts_register_services(): (uuid, (characteristics...)).
_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)
class BLESimplePeripheral:
    """BLE peripheral exposing the UART service with a TX and an RX characteristic.

    On construction it activates the BLE object, registers the service and
    starts advertising. Connections are tracked so that send() can notify
    every connected central, and advertising restarts after each disconnect.
    """

    def __init__(self, ble, name="SJM"):
        """Activate *ble*, register the UART service and advertise as *name*."""
        self._ble = ble
        # Create the state _irq touches before the handler is registered, so
        # an early event can never hit a missing attribute.
        self._connections = set()
        self._write_callback = None
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
        self._payload = advertising_payload(name=name, services=[_UART_UUID])
        self._advertise()

    def _irq(self, event, data):
        """BLE event handler: track connections and forward RX writes."""
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            print("New connection", conn_handle)
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)
            # discard(): an untracked handle must not raise and thereby skip
            # the re-advertise below.
            self._connections.discard(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            value = self._ble.gatts_read(value_handle)
            if value_handle == self._handle_rx and self._write_callback:
                self._write_callback(value)

    def send(self, data):
        """Notify every tracked connection with *data* on the TX characteristic."""
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle, self._handle_tx, data)

    def is_connected(self):
        """Return True while at least one central is connected."""
        return len(self._connections) > 0

    def _advertise(self, interval_us=500000):
        """Start advertising the stored payload at *interval_us* microseconds."""
        print("Starting advertising")
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    def on_write(self, callback):
        """Set the callable invoked with the value of each RX write."""
        self._write_callback = callback
# ========== 간단한 큐 클래스 ==========
class SimpleQueue:
    """Bounded first-in, first-out queue backed by a plain list."""

    def __init__(self, maxsize):
        """Create an empty queue that holds at most *maxsize* items."""
        self.maxsize = maxsize
        self.queue = []

    def put(self, item):
        """Append *item*; raise OverflowError when the queue is already full."""
        if self.full():
            raise OverflowError("Queue is full")
        self.queue.append(item)

    def get(self):
        """Remove and return the oldest item; raise IndexError when empty."""
        if self.empty():
            raise IndexError("Queue is empty")
        return self.queue.pop(0)

    def empty(self):
        """Return True when no items are queued."""
        return not self.queue

    def full(self):
        """Return True when the queue holds maxsize items or more."""
        return len(self.queue) >= self.maxsize
# ========== 제어 명령 처리 함수들 (위쪽 배치) ==========
def handle_servo_left():
    """Command 'a': step servo 1 ten degrees down, never below 0."""
    global servo1_angle
    target = servo1_angle - 10
    servo1_angle = target if target > 0 else 0
    servo1.duty_u16(angle_to_duty(servo1_angle))
    print(f"Servo1 left: {servo1_angle}°")
def handle_servo_right():
    """Command 'd': step servo 1 ten degrees up, never above 180."""
    global servo1_angle
    target = servo1_angle + 10
    servo1_angle = target if target < 180 else 180
    servo1.duty_u16(angle_to_duty(servo1_angle))
    print(f"Servo1 right: {servo1_angle}°")
def handle_servo2_up():
    """Command 'w': step servo 2 ten degrees up, never above 180."""
    global servo2_angle
    target = servo2_angle + 10
    servo2_angle = target if target < 180 else 180
    servo2.duty_u16(angle_to_duty(servo2_angle))
    print(f"Servo2 up: {servo2_angle}°")
def handle_servo2_down():
    """Command 's': step servo 2 ten degrees down, never below 0."""
    global servo2_angle
    target = servo2_angle - 10
    servo2_angle = target if target > 0 else 0
    servo2.duty_u16(angle_to_duty(servo2_angle))
    print(f"Servo2 down: {servo2_angle}°")
def handle_key_1():
    """Command '1': turn the onboard LED on."""
    onboard_led.on()
    print("Onboard LED ON")
def handle_key_2():
    """Command '2': turn the onboard LED off."""
    onboard_led.off()
    print("Onboard LED OFF")
def handle_key_3():
    """Command '3': toggle external LED blinking.

    Flips led_blink_active; when blinking is switched off the external LED
    is turned off as well.
    """
    global led_blink_active
    blinking = not led_blink_active
    led_blink_active = blinking
    if not blinking:
        external_led.off()
    print(f"External LED blinking: {'ON' if led_blink_active else 'OFF'}")
def handle_key_4():
    """Command '4': set the RGB LED to red (red at full duty, others at zero)."""
    for channel, level in ((rgb_red, 65535), (rgb_green, 0), (rgb_blue, 0)):
        channel.duty_u16(level)
    print("RGB LED: Red")
def handle_key_5():
    """Command '5': set the RGB LED to green (green at full duty, others at zero)."""
    for channel, level in ((rgb_red, 0), (rgb_green, 65535), (rgb_blue, 0)):
        channel.duty_u16(level)
    print("RGB LED: Green")
def handle_key_6():
    """Command '6': set the RGB LED to blue (blue at full duty, others at zero)."""
    for channel, level in ((rgb_red, 0), (rgb_green, 0), (rgb_blue, 65535)):
        channel.duty_u16(level)
    print("RGB LED: Blue")
def handle_key_7():
    """Command '7': toggle RGB colour cycling by flipping rgb_cycle_active."""
    global rgb_cycle_active
    cycling = not rgb_cycle_active
    rgb_cycle_active = cycling
    print(f"RGB LED cycling: {'ON' if rgb_cycle_active else 'OFF'}")
def handle_key_8():
    """Command '8': reserved; currently does nothing."""
    return None
def handle_key_9():
    """Command '9': reserved; currently does nothing."""
    return None
def handle_reset_all():
    """Command 'x': return every output to its initial state.

    Stops blinking and colour cycling, moves both servos to 90 degrees,
    switches both LEDs off and sets all three RGB channels to zero duty.
    """
    global servo1_angle, servo2_angle, led_blink_active, rgb_cycle_active

    # Stop the periodic effects first.
    led_blink_active = rgb_cycle_active = False

    # Centre both servos.
    servo1_angle = servo2_angle = 90
    servo1.duty_u16(angle_to_duty(servo1_angle))
    servo2.duty_u16(angle_to_duty(servo2_angle))

    # Switch every LED off.
    onboard_led.off()
    external_led.off()
    for channel in (rgb_red, rgb_green, rgb_blue):
        channel.duty_u16(0)

    print("All systems reset")
# ========== Main application ==========
# BLE initialisation. Constructing BLESimplePeripheral activates BLE,
# registers the UART service and starts advertising.
ble = bluetooth.BLE()
sp = BLESimplePeripheral(ble)
# Pin initialisation
# Servo motors (changed pins), both driven at 50 Hz PWM
servo1 = PWM(Pin(15)) # servo 1
servo2 = PWM(Pin(16)) # servo 2
servo1.freq(50)
servo2.freq(50)
# LED pins
onboard_led = Pin("LED", Pin.OUT) # onboard LED (on a Pico W)
external_led = Pin(1, Pin.OUT) # external LED
# RGB LED pins (changed pins), one PWM channel per colour at 1 kHz
rgb_red = PWM(Pin(19))
rgb_green = PWM(Pin(20))
rgb_blue = PWM(Pin(21))
rgb_red.freq(1000)
rgb_green.freq(1000)
rgb_blue.freq(1000)
# Current servo angles in degrees (updated by the servo handlers)
servo1_angle = 90
servo2_angle = 90
# Control state
led_blink_active = False  # external LED blinking enabled
rgb_cycle_active = False  # RGB colour cycling enabled
led_blink_state = False  # last value written to the external LED while blinking
rgb_color_index = 0  # index into rgb_colors of the next colour to show
# Timing state: time.ticks_ms() stamps of the last blink toggle / colour change
last_led_blink = 0
last_rgb_change = 0
# Queue of received BLE writes awaiting processing
data_queue = SimpleQueue(maxsize=10)
# RGB colour table as (red, green, blue) duty_u16 values
rgb_colors = [
    (65535, 0, 0), # red
    (0, 65535, 0), # green
    (0, 0, 65535), # blue
    (65535, 65535, 0), # yellow
    (65535, 0, 65535), # magenta
    (0, 65535, 65535), # cyan
    (32768, 32768, 32768), # white (each channel at about half duty)
]
# Convert an angle to a servo duty value
def angle_to_duty(angle):
    """Map *angle* in degrees to a duty_u16 value, truncated to int.

    The mapping is linear: 3277 at 0 degrees plus 3277 per 90 degrees, giving
    6554 at 90 degrees and 9831 at 180 degrees. An earlier comment here
    stated that 180 degrees maps to 6553, which this formula does not do.

    NOTE(review): with a 50 Hz PWM these values would be roughly 1.0 ms,
    2.0 ms and 3.0 ms pulses - confirm the upper end against the servo's
    rated pulse range.
    """
    base_duty = 3277
    offset = angle * 3277 / 90
    return int(base_duty + offset)
# Periodic background work
def handle_periodic_tasks():
    """Advance the time-driven LED effects; meant to be called repeatedly.

    While led_blink_active is set, the external LED is toggled once more than
    500 ms have passed since the previous toggle. While rgb_cycle_active is
    set, the RGB LED shows the next rgb_colors entry once more than 1000 ms
    have passed since the previous change.
    """
    global led_blink_state, last_led_blink
    global rgb_color_index, last_rgb_change

    now = time.ticks_ms()

    # External LED blink (500 ms interval)
    blink_due = led_blink_active and time.ticks_diff(now, last_led_blink) > 500
    if blink_due:
        led_blink_state = not led_blink_state
        external_led.value(led_blink_state)
        last_led_blink = now

    # RGB colour step (1000 ms interval)
    cycle_due = rgb_cycle_active and time.ticks_diff(now, last_rgb_change) > 1000
    if cycle_due:
        red_level, green_level, blue_level = rgb_colors[rgb_color_index]
        rgb_red.duty_u16(red_level)
        rgb_green.duty_u16(green_level)
        rgb_blue.duty_u16(blue_level)
        rgb_color_index = (rgb_color_index + 1) % len(rgb_colors)
        last_rgb_change = now
# Process received data
def process_data():
    """Drain data_queue, running the handler for each one-byte command.

    Items that are not exactly one byte long, and bytes with no handler, are
    ignored. An exception raised while handling one item is printed and the
    loop moves on to the next item.
    """
    handlers = {
        # Servo control
        'a': handle_servo_left,
        'd': handle_servo_right,
        'w': handle_servo2_up,
        's': handle_servo2_down,
        # Number keys 1-9
        '1': handle_key_1,
        '2': handle_key_2,
        '3': handle_key_3,
        '4': handle_key_4,
        '5': handle_key_5,
        '6': handle_key_6,
        '7': handle_key_7,
        '8': handle_key_8,
        '9': handle_key_9,
        # Reset
        'x': handle_reset_all,
    }
    while not data_queue.empty():
        try:
            data = data_queue.get()
            print("Processing data:", data)
            # Only single-character commands are recognised.
            if len(data) != 1:
                continue
            handler = handlers.get(chr(data[0]))
            if handler is not None:
                handler()
        except Exception as e:
            print("Error processing data:", e)
# BLE receive handler
def on_rx(data):
    """Queue *data* for process_data(); drop it with a message if the queue is full.

    Any exception raised while logging or queueing is printed, not propagated.
    """
    try:
        print("Received:", data)
        if data_queue.full():
            print("Data queue is full, dropping data")
            return
        data_queue.put(data)  # store the data in the queue
    except Exception as e:
        print("Error receiving data:", e)
# Startup: put every output into its initial state, then print the command help.
handle_reset_all()
print("\n".join((
    "BLE Controller Started",
    "=== Control Commands ===",
    "Servo Control:",
    " a: Servo1 Left (-10°)",
    " d: Servo1 Right (+10°)",
    " w: Servo2 Up (+10°)",
    " s: Servo2 Down (-10°)",
    "LED Control:",
    " 1: Onboard LED ON",
    " 2: Onboard LED OFF",
    " 3: External LED Blink Toggle",
    "RGB LED Control:",
    " 4: RGB Red",
    " 5: RGB Green",
    " 6: RGB Blue",
    " 7: RGB Cycle Toggle",
    "Reserved:",
    " 8: (Reserved)",
    " 9: (Reserved)",
    "System:",
    " x: Reset All",
    "Waiting for connections...",
)))
# Main loop
# Register the receive callback once, before the loop. Registering it only
# after the loop noticed a connection left a window (up to the 0.1 s sleep
# below) in which writes from a freshly connected central were dropped, and
# re-registered the same callback on every iteration.
sp.on_write(on_rx)
while True:
    if sp.is_connected():
        process_data()  # handle commands queued by on_rx
        handle_periodic_tasks()  # LED blink / RGB cycle timing
    else:
        time.sleep(0.1)  # idle while no central is connected
Python
복사






