スマート監視AI解析端末の開発実践:構築からデプロイまでの完全ガイド

スマート監視AI解析端末は「カメラ+エッジAIゲートウェイ+ローカル推論+アラーム出力」を統合したデバイスであり、リアルタイム動画ストリームの分析(顔認識・異常行動検出・物体カウントなど)をクラウドに依存せずに実現します。本記事では「RK3588開発ボード+YOLOv8+USBカメラ」を例に、ハードウェア接続からソフトウェアデプロイまでの一連のフローを手順通りに解説します。すべてのコードは実際のプロジェクトへの移植が可能です。

一、スマート監視AI解析端末の基礎知識(初心者必見)

  1. 端末の主要構成

スマート監視AI解析端末 = 動画取得モジュール + エッジAI計算モジュール + アラーム出力モジュール + ストレージ/通信モジュール。従来の監視端末との違い:

特性 従来の監視端末 スマート監視AI解析端末
主要機能 動画録画・リアルタイムプレビュー 動画録画+AI認識(顔/物体/行動)+異常アラーム+ローカルストレージ
依存条件 クラウドまたはHDD録画機が必要 ローカルAI推論でオフラインでも動作可能。アラーム結果のみアップロード
応答速度 リアルタイム分析なし、手動確認必要 ローカル推論遅延<100ms、異常時リアルタイムアラーム
主要コンポーネント カメラ+通信モジュール カメラ+AI開発ボード(NPU搭載)+アラーム装置(LED/蜂鳴器)
  1. 主要な応用シーン

  • 安全監視:オフィス/工場の不正侵入検出、見知らぬ人識別、夜間徘徊警告;
  • 工業監視:生産ラインの安全帯未着用検出、機械状態認識;
  • 商業監視:店舗客数計測、商品盗難検出、VIP顧客識別;
  • 家庭監視:高齢者/子供の活動監視、見知らぬ人侵入アラーム。
  1. 必修技術スタック

  • ハードウェア:NPU搭載AI開発ボード、USB/ネットワークカメラ、アラーム装置(LED/蜂鳴器)、電源;
  • オペレーティングシステム:Ubuntu 20.04(開発ボード対応版);
  • AIフレームワーク:Ultralytics YOLOv8(目標検出/行動認識)、OpenCV(動画処理);
  • 開発言語:Python(開発効率)、C++(性能最適化);
  • ストレージ/通信:ローカルSDカード/SSD(アラーム動画保存)、MQTT(アラーム情報クラウド送信)。

二、ハードウェア選定と接続(実践推奨案)

高コストパフォーマンス+エコシステムが整ったハードウェア組み合わせを優先。初心者向けにはRK3588開発ボード(内蔵NPU、算力十分)を推奨し、産業シーンではJetson Orin Nanoへのグレードアップが可能。

  1. ハードウェアリスト(初心者向け/産業向け)

部品タイプ 初心者向け構成(予算2000~3000元) 産業向け構成(予算5000~8000元)
AI開発ボード RK3588開発ボード(8GBメモリ、内蔵6TOPS NPU) 英伟達 Jetson Orin Nano(8GBメモリ、20TOPS NPU)
カメラ USB高解像度カメラ(1080P、MJPGフォーマット対応) 産業ネットワークカメラ(PoE給電、200万画素、RTSPストリーム対応)
アラーム装置 LEDランプ+蜂鳴器(GPIO制御) 音光アラーム装置(RS485インターフェース、産業用防水仕様)
ストレージ装置 64GB高速SDカード 512GB NVMe SSD(アラーム動画保存用)
電源 12V 5A直流電源(安定供給) 12V 10A幅広電源(産業電圧変動対応)
補助部品 ジャンパ線、ブレッドボード、HDMIディスプレイ(デバッグ用) PoEスイッチ、産業用防水接続ボックス、M12コネクタ
  1. ハードウェア接続(RK3588+USBカメラ例)

(1)主要接続図

RK3588開発ボードピン 外付け装置 接続説明
USB 3.0ポート USBカメラ 動画ストリーム伝送(1080P@30fps)
GPIO1_1(物理ピン12) LED正極 高電圧でLED点灯(220Ω抵抗で電流制限)
GPIO1_2(物理ピン13) 蜂鳴器正極 高電圧で蜂鳴器駆動(1kΩ抵抗直列)
GNDピン LED負極+蜂鳴器負極 共通接地
HDMIポート ディスプレイ デバッグ時に動画プレビュー
ネットワークポート ルーター アラーム情報をインターネットに送信

(2)接続注意事項

  • GPIOピンは抵抗を直列に接続:LEDは220Ω、蜂鳴器は1kΩ抵抗を使用してピンを保護;
  • 電源極性は逆接続しない:開発ボードとカメラの正負極を区別し、逆接続で機器破損;
  • 産業環境ではシールド線使用:RS485インターフェース、ネットワークケーブルはEMI対策のためにシールド線を選択。

三、開発環境構築(RK3588+Ubuntu 20.04例)

  1. システムインストール

  1. RK3588対応のUbuntu 20.04イメージをダウンロード(公式提供の「Ubuntu 20.04 Server with NPUドライバ」を推奨);

  2. Etcherツールでイメージを64GB SDカードに焼印;

  3. 開発ボードに挿入し、ディスプレイ、キーボード、電源を接続し、起動後初期設定(ユーザー名・パスワード・WiFi設定)を完了。

  4. 核心依存ライブラリのインストール


SSHで開発ボードに接続(または直接ターミナル操作),以下のコマンドでAIフレームワークとツールライブラリをインストール:

# システム更新

sudo apt update && sudo apt upgrade -y

# 基本依存のインストール

sudo apt install python3-pip python3-numpy python3-opencv git cmake -y

# YOLOv8のインストール(目標検出のコアフレームワーク)

pip3 install ultralytics

# GPIO制御ライブラリのインストール(RK3588専用)

pip3 install rpi-gpio  # 不適合の場合:pip3 install gpiod

# MQTT通信ライブラリのインストール(アラーム情報送信用)

pip3 install paho-mqtt

# 動画エンコーディングライブラリのインストール(OpenCVによるカメラ読み込み問題解決)

sudo apt install ffmpeg libavcodec-dev libavformat-dev -y

  1. 環境確認

以下のコードを実行して、カメラ、YOLOv8、GPIOが正常に動作することを確認:

import cv2

from ultralytics import YOLO

import RPi.GPIO as GPIO

import time

# カメラ確認

cap = cv2.VideoCapture(0)  # 0=デフォルトUSBカメラ

cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)

cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)

ret, frame = cap.read()

if ret:

    print("カメラ動作正常、画質:", frame.shape)

else:

    print("カメラ読み込み失敗!")

cap.release()

# YOLOv8確認

model = YOLO('yolov8n.pt')  # 軽量モデル読み込み

results = model(frame) if ret else None

if results:

    print("YOLOv8モデル読み込み成功、検出対象数:", len(results[0].boxes))

# GPIO確認

GPIO.setmode(GPIO.BCM)

GPIO.setup(18, GPIO.OUT)  # GPIO1_1対応BCM番号18

GPIO.output(18, GPIO.HIGH)

time.sleep(1)

GPIO.output(18, GPIO.LOW)

GPIO.cleanup()

print("GPIO制御正常!")

エラーがない場合、環境構築完了です。

四、主要機能開発(四大モジュール完全コード)

スマート監視AI解析端末の主要機能は「動画取得→AI認識→異常アラーム→データ保存/送信」です。以下に各モジュールの実装コードを紹介します。

  1. モジュール1:動画取得(USB/ネットワークカメラ)

USBカメラ(初心者向け)とネットワークカメラ(産業向け)をサポートし、動画ストリーム形式を自動適応:

import cv2

import time

class VideoInputHandler:

    def __init__(self, source=0, is_rtsp=False):

        """
        source:カメラソース(0=USBカメラ、RTSPアドレス=ネットワークカメラ)

        is_rtsp:RTSPストリームか否か(産業カメラでよく使われる)

        """

        self.is_rtsp = is_rtsp

        self.cap = self._initialize_camera(source)

    def _initialize_camera(self, source):

        """カメラ初期化"""

        cap = cv2.VideoCapture(source)

        if self.is_rtsp:

            # ネットワークカメラRTSPストリーム設定(遅延低減)

            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

            cap.set(cv2.CAP_PROP_FPS, 30)

        else:

            # USBカメラ設定(1080P@30fps)

            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)

            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)

            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('MJPG'))

        if not cap.isOpened():

            raise Exception(f"カメラ初期化失敗!ソース:{source}")

        return cap

    def get_frame(self):

        """1フレーム取得"""

        ret, frame = self.cap.read()

        if not ret:

            # 一時的な断流対策

            time.sleep(0.1)

            ret, frame = self.cap.read()

        return ret, frame

    def release(self):

        """カメラリソース解放"""

        self.cap.release()

# テスト:USBカメラ取得

if __name__ == "__main__":

    video = VideoInputHandler(source=0, is_rtsp=False)

    while True:

        ret, frame = video.get_frame()

        if ret:

            cv2.imshow("Video Capture", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):

                break

        else:

            print("フレーム取得失敗!")

    video.release()

    cv2.destroyAllWindows()

  1. モジュール2:AI認識(YOLOv8目標検出/行動認識)

「異常侵入検出」を例に、画面中の「人」を認識し、設定区域外または人数超過でアラームを発生:

from ultralytics import YOLO

import cv2

import numpy as np

class AIDetectionEngine:

    def __init__(self, model_path='yolov8n.pt', conf_threshold=0.5):

        """
        model_path:YOLOモデルパス(yolov8n=軽量型、yolov8s=高精度型)

        conf_threshold:信頼度閾値(低信頼度検出結果フィルタ)

        """

        self.model = YOLO(model_path)

        self.conf_threshold = conf_threshold

        self.class_names = self.model.names  # ターゲットカテゴリ名(0=person、1=bicycle等)

    def detect(self, frame):

        """目標検出:検出結果(境界框、カテゴリ、信頼度)を返す"""

        results = self.model(frame, conf=self.conf_threshold, device=0)  # device=0でNPU加速

        detections = []

        for r in results:

            boxes = r.boxes.data.cpu().numpy()  # 境界框:[x1,y1,x2,y2,conf,class]

            for box in boxes:

                x1, y1, x2, y2, conf, cls = box

                detections.append({

                    "class": self.class_names[int(cls)],

                    "confidence": float(conf),

                    "bbox": [int(x1), int(y1), int(x2), int(y2)]

                })

        return detections

    def draw_detections(self, frame, detections):

        """検出結果を画面に描画(境界框+ラベル)"""

        for det in detections:

            x1, y1, x2, y2 = det["bbox"]

            label = f"{det['class']} {det['confidence']:.2f}"

            # 境界框描画(赤:人、緑:他のターゲット)

            color = (0, 0, 255) if det["class"] == "person" else (0, 255, 0)

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # ラベル背景

            cv2.rectangle(frame, (x1, y1-20), (x2, y1), color, -1)

            cv2.putText(frame, label, (x1+5, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)

        return frame

    def check_abnormal(self, detections, max_person=1, restricted_area=None):

        """

        異常検出:

        - max_person:最大許容人数

        - restricted_area:禁止区域(例:[500,300,1500,800]、即x1=500,y1=300,x2=1500,y2=800)

        """

        abnormal = False

        abnormal_msg = ""

        # 人数統計

        person_count = len([d for d in detections if d["class"] == "person"])

        if person_count > max_person:

            abnormal = True

            abnormal_msg = f"人数超過:{person_count}/{max_person}"

        # 禁止区域への侵入検出

        if restricted_area and person_count > 0:

            rx1, ry1, rx2, ry2 = restricted_area

            for det in detections:

                if det["class"] == "person":

                    x1, y1, x2, y2 = det["bbox"]

                    # 境界框と禁止区域の重複判定

                    if not (x2 > rx2 or y2 > ry2):

                        abnormal = True

                        abnormal_msg = f"禁止区域侵入!"

        return abnormal, abnormal_msg

# テスト:AI認識と異常検出

if __name__ == "__main__":

    video = VideoInputHandler(source=0)

    ai_detector = AIDetectionEngine(model_path='yolov8n.pt', conf_threshold=0.5)

    restricted_area = [500, 300, 1500, 800]  # 禁止区域(実際の画面に応じて調整)

    while True:

        ret, frame = video.get_frame()

        if not ret:

            continue

        # AI検出

        detections = ai_detector.detect(frame)

        # 検出結果描画

        frame_with_detections = ai_detector.draw_detections(frame, detections)

        # 禁止区域描画(赤い点線枠)

        cv2.rectangle(frame_with_detections, (restricted_area[0], restricted_area[1]),

                      (restricted_area[2], restricted_area[3]), (0,0,255), 2, cv2.LINE_AA)

        # 異常検出

        abnormal, abnormal_msg = ai_detector.check_abnormal(detections, max_person=1, restricted_area=restricted_area)

        # 異常メッセージ表示

        if abnormal:

            cv2.putText(frame_with_detections, abnormal_msg, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)

        cv2.imshow("AI Detection", frame_with_detections)

        if cv2.waitKey(1) & 0xFF == ord('q'):

            break

    video.release()

    cv2.destroyAllWindows()

  1. モジュール3:異常アラーム(LED+蜂鳴器+動画保存)

異常発生時にローカルの音光アラームを発生させ、アラーム動画を保存(10秒前+10秒後の映像を含む):

import RPi.GPIO as GPIO

import cv2

import time

import os

from datetime import datetime

class AlarmSystem:

    def __init__(self, led_pin=18, buzzer_pin=13, video_save_path="./alarm_videos"):

        """
        led_pin:LED GPIOピン(BCM番号)

        buzzer_pin:蜂鳴器GPIOピン(BCM番号)

        video_save_path:アラーム動画保存パス

        """

        # GPIO初期化

        GPIO.setmode(GPIO.BCM)

        GPIO.setup(led_pin, GPIO.OUT)

        GPIO.setup(buzzer_pin, GPIO.OUT)

        self.led_pin = led_pin

        self.buzzer_pin = buzzer_pin

        # 動画保存初期化

        self.video_save_path = video_save_path

        if not os.path.exists(video_save_path):

            os.makedirs(video_save_path)

        # 最近10秒のフレームをキャッシュ(アラーム動画用)

        self.frame_buffer = []

        self.buffer_size = 300  # 30fps×10秒=300フレーム

    def add_frame_to_buffer(self, frame):

        """フレームをバッファに追加"""

        self.frame_buffer.append(frame)

        if len(self.frame_buffer) > self.buffer_size:

            self.frame_buffer.pop(0)

    def trigger_alarm(self, duration=5):

        """音光アラームを発生(duration秒間)"""

        print(f"アラーム発生!{duration}秒間継続")

        for _ in range(duration):

            GPIO.output(self.led_pin, GPIO.HIGH)

            GPIO.output(self.buzzer_pin, GPIO.HIGH)

            time.sleep(0.5)

            GPIO.output(self.led_pin, GPIO.LOW)

            GPIO.output(self.buzzer_pin, GPIO.LOW)

            time.sleep(0.5)

    def save_alarm_video(self, abnormal_msg):

        """アラーム動画を保存(キャッシュ10秒+後続10秒)"""

        # 動画ファイル名:時間_異常タイプ.mp4

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        filename = f"{timestamp}_{abnormal_msg.replace(' ', '_')}.mp4"

        save_path = os.path.join(self.video_save_path, filename)

        # 動画書き込み初期化

        fourcc = cv2.VideoWriter_fourcc('mp4v')

        height, width = self.frame_buffer[0].shape[:2]

        out = cv2.VideoWriter(save_path, fourcc, 30.0, (width, height))

        # キャッシュフレーム書き込み

        for frame in self.frame_buffer:

            out.write(frame)

        # さらに10秒間のリアルタイムフレームを取得

        cap = cv2.VideoCapture(0)

        for _ in range(300):

            ret, frame = cap.read()

            if ret:

                out.write(frame)

            time.sleep(1/30)

        cap.release()

        out.release()

        print(f"アラーム動画保存成功:{save_path}")

    def cleanup(self):

        """リソース解放"""

        GPIO.output(self.led_pin, GPIO.LOW)

        GPIO.output(self.buzzer_pin, GPIO.LOW)

        GPIO.cleanup()

# テスト:アラームモジュール

if __name__ == "__main__":

    alarm = AlarmSystem(led_pin=18, buzzer_pin=13)

    video = VideoInputHandler(source=0)

    # フレームキャッシュ

    for _ in range(300):

        ret, frame = video.get_frame()

        if ret:

            alarm.add_frame_to_buffer(frame)

        time.sleep(1/30)

    # アラーム発生と動画保存

    alarm.trigger_alarm(duration=3)

    alarm.save_alarm_video("テストアラーム")

    # リソース解放

    alarm.cleanup()

    video.release()

  1. モジュール4:データアップロード(MQTT+クラウドアラーム通知)

アラーム情報(時間、異常タイプ、動画パス)をクラウドプラットフォーム(例:アリババクラウドIoT、ThingsBoard)にアップロード:

import paho.mqtt.client as mqtt

import json

import time

import os

class MQTTUploader:

    def __init__(self, broker, port, client_id, username, password, topic="smart_monitor/alarm"):

        """
        broker:MQTTサーバーのアドレス

        port:MQTTポート(デフォルト1883)

        client_id:クライアントID(ユニーク)

        username/password:MQTT認証情報

        topic:アラーム情報アップロードトピック

        """

        self.client = mqtt.Client(client_id=client_id)

        self.client.username_pw_set(username, password)

        self.broker = broker

        self.port = port

        self.topic = topic

        # 接続コールバック

        self.client.on_connect = self._on_connect

        self.client.on_disconnect = self._on_disconnect

        # MQTTサーバー接続

        self.connect()

    def _on_connect(self, client, userdata, flags, rc):

        if rc == 0:

            print("MQTT接続成功")

        else:

            print(f"MQTT接続失敗、エラーコード:{rc}")

    def _on_disconnect(self, client, userdata, rc):

        print("MQTT接続切断、再接続中...")

        time.sleep(5)

        self.connect()

    def connect(self):

        """MQTTサーバー接続"""

        try:

            self.client.connect(self.broker, self.port, keepalive=60)

            self.client.loop_start()  # バックグラウンドループ開始

        except Exception as e:

            print(f"MQTT接続異常:{e}")

    def upload_alarm(self, abnormal_msg, video_path):

        """アラーム情報アップロード"""

        alarm_data = {

            "device_id": "smart_monitor_001",

            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),

            "abnormal_type": abnormal_msg,

            "video_path": video_path,

            "status": "alarm"

        }

        try:

            payload = json.dumps(alarm_data)

            result = self.client.publish(self.topic, payload, qos=1)

            result.wait_for_publish()

            if result.is_published():

                print(f"アラーム情報アップロード成功:{payload}")

            else:

                print("アラーム情報アップロード失敗、ローカルにキャッシュ")

                self._cache_alarm(alarm_data)

        except Exception as e:

            print(f"アップロード異常:{e}")

            self._cache_alarm(alarm_data)

    def _cache_alarm(self, alarm_data):

        """ネットワーク切断時にアラーム情報をローカルにキャッシュ"""

        cache_path = "./alarm_cache"

        if not os.path.exists(cache_path):

            os.makedirs(cache_path)

        filename = f"cache_{int(time.time())}.json"

        with open(os.path.join(cache_path, filename), 'w') as f:

            json.dump(alarm_data, f)

# テスト:MQTTアップロード

if __name__ == "__main__":

    # クラウドMQTTパラメータに置き換えてください

    mqtt_uploader = MQTTUploader(

        broker="mqtt.aliyuncs.com",

        port=1883,

        client_id="smart_monitor_001",

        username="your_username",

        password="your_password"

    )

    # テストアラーム情報アップロード

    mqtt_uploader.upload_alarm("テスト異常", "./alarm_videos/test.mp4")

    time.sleep(2)

五、実践ケース:オフィス異常侵入検出端末

シナリオ説明

オフィスの終了後(18:00-08:00)、禁止区域(例:会計室入り口)に「人」が検出されると、音光アラームを発生させ、アラーム動画を保存し、クラウドに通知して管理者に連絡します。

完全なフローのコード

import time

from datetime import datetime

from VideoInputHandler import VideoInputHandler

from AIDetectionEngine import AIDetectionEngine

from AlarmSystem import AlarmSystem

from MQTTUploader import MQTTUploader

if __name__ == "__main__":

    # 1. 各モジュールの初期化

    video = VideoInputHandler(source=0, is_rtsp=False)

    ai_detector = AIDetectionEngine(model_path='yolov8n.pt', conf_threshold=0.5)

    alarm = AlarmSystem(led_pin=18, buzzer_pin=13)

    mqtt_uploader = MQTTUploader(

        broker="mqtt.aliyuncs.com",

        port=1883,

        client_id="smart_monitor_office",

        username="your_username",

        password="your_password"

    )

    # 2. パラメータ設定

    restricted_area = [500, 300, 1500, 800]  # 禁止区域(会計室入り口)

    alarm_hours = [18, 8]  # アラーム時間帯:18:00-08:00

    max_person = 0  # アラーム時間帯内は誰も入ってはいけない

    alarm_cooldown = 300  # アラーム冷却時間(5分、重複アラームを防ぐため)

    last_alarm_time = 0

    try:

        print("スマート監視AI解析端末起動成功!")

        while True:

            # 現在時間を取得

            now = datetime.now()

            current_hour = now.hour

            # アラーム時間帯かどうかを判定

            in_alarm_hour = (current_hour >= alarm_hours[0]) or (current_hour < alarm_hours[1])

            # 1. 動画取得とキャッシュ

            ret, frame = video.get_frame()

            if not ret:

                time.sleep(0.1)

                continue

            alarm.add_frame_to_buffer(frame)

            # 2. AI認識(アラーム時間帯のみ有効、算力節約)

            if in_alarm_hour:

                detections = ai_detector.detect(frame)

                # 検出結果と禁止区域を描画

                frame_with_detections = ai_detector.draw_detections(frame, detections)

                cv2.rectangle(frame_with_detections, (restricted_area[0], restricted_area[1]),

                              (restricted_area[2], restricted_area[3]), (0,0,255), 2, cv2.LINE_AA)

                # 異常検出

                abnormal, abnormal_msg = ai_detector.check_abnormal(

                    detections, max_person=max_person, restricted_area=restricted_area

                )

                # 状態情報表示

                status = f"監視中 | 時間帯:{'アラーム時間帯' if in_alarm_hour else '通常時間帯'}"

                cv2.putText(frame_with_detections, status, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)

                if abnormal:

                    cv2.putText(frame_with_detections, abnormal_msg, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)

                    # 冷却時間内に再アラームを防ぐ

                    current_time = time.time()

                    if current_time - last_alarm_time > alarm_cooldown:

                        # アラーム発生

                        alarm.trigger_alarm(duration=5)

                        # アラーム動画保存

                        video_path = alarm.save_alarm_video(abnormal_msg)

                        # クラウドにアップロード

                        mqtt_uploader.upload_alarm(abnormal_msg, video_path)

                        last_alarm_time = current_time

            else:

                # 通常時間帯は監視画面のみ表示

                frame_with_detections = frame

                status = f"監視中 | 時間帯:通常時間帯"

                cv2.putText(frame_with_detections, status, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)

            # 実時プレビュー(qで終了)

            cv2.imshow("Smart Monitor AI Terminal", frame_with_detections)

            if cv2.waitKey(1) & 0xFF == ord('q'):

                break

    except Exception as e:

        print(f"端末運行異常:{e}")

    finally:

        # リソース解放

        video.release()

        alarm.cleanup()

        cv2.destroyAllWindows()

        print("スマート監視AI解析端末終了")

デプロイとデバッグ手順

  1. ハードウェアデプロイ:接続図に従ってカメラ、LED、蜂鳴器を接続し、電源が安定していることを確認してください;
  2. ソフトウェアデプロイ
  • 全モジュールコードを同一ディレクトリに配置(import文に応じて命名);
  • YOLOv8モデルをダウンロード:wget ``https://github.com/ultralytics/assets/releases/download/v8.1.0/yolov8n.pt
  • MQTTパラメータをクラウドプラットフォームの情報を置き換えてください;
  1. デバッグテクニック
  • 通常時間帯にカメラとAI認識をテストし、restricted_area座標を実際の画面に合わせて調整;
  • 手動で異常を発生させ(禁止区域に入り)、アラームと動画保存が正常かを確認;
  • htopコマンドでCPU/NPU使用率を監視し、フレートが低ければyolov8n.pt軽量モデルに切り替える。

六、高度な最適化:端末性能と安定性向上

  1. AI推論フレート向上

  • モデル最適化:YOLOv8モデルをINT8形式に量子化(Ultralytics量子化ツールを使用)、推論速度を2倍に向上;
  • ハードウェア加速:開発ボードのNPUが正常に動作していることを確認(RK3588は公式NPUドライバをインストール、JetsonはJetPackをインストール);
  • 画像下採样:カメラの解像度を720Pに調整し、AI推論の計算量を削減。
  1. 安定性向上

  • watchdog監視プロセス:supervisorツールで端末プログラムを監視し、クラッシュした場合は自動再起動;
  • 電源管理:産業環境では幅広電源を選択し、バックアップバッテリーを追加(停電による監視停止を防ぐ);
  • ストレージ管理:過期したアラーム動画を定期的に削除(30日保存)、SDカード/SSDがいっぱいになるのを防ぐ。
  1. 拡張機能

  • 多カメラ連携:4路ネットワークカメラをサポート(マルチスレッド並列処理);
  • 顔認識:YOLOv8-Faceモデルをロードし、特定人物(従業員/見知らぬ人)を識別;
  • クラウド管理プラットフォーム:ThingsBoardに接続し、リモートでパラメータ設定、アラーム記録を確認する。

タグ: YOLOv8 Ubuntu Python OpenCV MQTT

6月16日 18:35 投稿