Skip to content

S2Conn

sorm2-connector — промежуточный сервис (gateway), соединяющий физическое оборудование СОРМ-2 с остальными компонентами системы через Apache ActiveMQ. Он транслирует команды от бэкенда в бинарный протокол СОРМ-2 и возвращает перехваченный трафик обратно в очередь сообщений.


Место в общей архитектуре

Section titled “Место в общей архитектуре”
graph LR
    subgraph HW["Оборудование СОРМ-2"]
        SORM["СОРМ-2\n10.10.77.3"]
        CMD_PORT["Командный канал\n:16118"]
        DATA_PORT["Канал данных\n:16117"]
        SORM --- CMD_PORT
        SORM --- DATA_PORT
    end

    subgraph CONN["sorm2-connector"]
        direction TB
        OUT["comOutputThread\n(команды → СОРМ)"]
        IN["comInputThread\n(ответы ← СОРМ)"]
        DATA["dataThread\n(трафик ← СОРМ)"]
        MISS["missionStatusThread\n(опрос миссий, опц.)"]
    end

    subgraph MQ["Apache ActiveMQ"]
        QC["channel-command\n(вход)"]
        QA["channel-answer\n(выход)"]
        QD["channel-data\n(выход)"]
        QM["channel-missions\n(выход)"]
    end

    subgraph BACK["sorm2-backend"]
        BE["Spring Boot\nбэкенд"]
    end

    subgraph REASM["sorm-2-reasm"]
        RE["Сервис переборки\nпакетов"]
    end

    BE -->|команды| QC
    QC --> OUT
    OUT -->|бинарный протокол| CMD_PORT
    CMD_PORT -->|ответы| IN
    IN -->|ответы| QA
    QA --> BE

    DATA_PORT -->|перехваченный трафик| DATA
    DATA -->|payload| QD
    QD --> RE
    QD --> BE

    MISS -->|список миссий| QM
    QM --> BE

Инициализация и подключение

Section titled “Инициализация и подключение”
  1. Загрузка конфигурацииConfigBuilder читает config/config.properties

  2. TCP-подключение к СОРМ-2Network устанавливает два сокета:

    • :16118 — командный канал (двунаправленный)
    • :16117 — канал данных (только чтение)
  3. Подключение к ActiveMQAmq создаёт JMS-соединение и 4 очереди

  4. Запуск 4 потоковApp запускает параллельные обработчики

  5. АутентификацияcomOutputThread отправляет auth-сообщение, ждёт ответ кода 0x81

  6. Сброс состояния (если sorm.state.reset=true) — отправляет reset 0x07, ждёт 0x87

sequenceDiagram
    participant R as Runner
    participant CB as ConfigBuilder
    participant N as Network
    participant AMQ as Amq
    participant APP as App
    participant SORM as СОРМ-2 :16118

    R->>CB: build() → Configurer
    R->>N: new Network(host, 16118, 16117)
    N->>SORM: TCP connect
    R->>AMQ: new Amq(activemq.address)
    AMQ-->>R: JMS connection + 4 queues
    R->>APP: new App(configurer, network, amq)
    APP->>APP: Запуск 4 потоков

    APP->>SORM: Auth [0x01, iDent=0x0001, len=15, "ABC"]
    SORM-->>APP: [0x81, iDent, len, payload]
    Note over APP: authLock.notify() — аутентификация пройдена

    alt sorm.state.reset = true
        APP->>SORM: Reset [0x07, iDent=0x0002]
        SORM-->>APP: [0x87, ...]
        Note over APP: resetLock.notify() — сброс выполнен
        APP->>SORM: Auth (повторно)
        SORM-->>APP: [0x81, ...]
    end

    Note over APP: Система готова к работе

Бинарный протокол СОРМ-2

Section titled “Бинарный протокол СОРМ-2”
┌──────────┬──────────────┬─────────────────┬──────────────────────────┐
│ Code │ iDent │ Length │ Payload │
│ (1 байт) │ (2 байта) │ (4 байта) │ (Length байт) │
└──────────┴──────────────┴─────────────────┴──────────────────────────┘
КодHexНаправлениеОписание
10x01→ СОРМАутентификация
70x07→ СОРМСброс состояния (reset)
1250x7D← СОРМKeepalive / проверка доступности
1290x81← СОРМОтвет переменной длины (auth OK)
1300x82← СОРМОтвет переменной длины
1310x83← СОРМОтвет переменной длины
1320x84← СОРМФиксированный 8-байтный ответ
1330x85← СОРМФиксированный 8-байтный ответ
1350x87← СОРМПодтверждение сброса
1400x8C← СОРМОтвет переменной длины
2550xFF→ СОРМПодтверждение получения данных (ACK)

Структура кадра данных (канал :16117)

Section titled “Структура кадра данных (канал :16117)”
┌──────┬──────┬──────┬──────────────┬──────────────────────────────┐
│ Code │ FRp │ FRs │ Length │ Payload │
│ 1 б │ 1 б │ 1 б │ 4 байта │ содержит CNN, NodeID, данные │
└──────┴──────┴──────┴──────────────┴──────────────────────────────┘

FRp (Frame Rate packet) и FRs (Frame Rate stream) — параметры управления потоком.

CNN-байт (классификация блоков)

Section titled “CNN-байт (классификация блоков)”

Байт CNN определяет тип блока в потоке данных:

graph TD
    CNN["CNN байт"] --> B0{"Биты 7-5"}
    B0 -->|"000 (0x00)"| BEG["Begin stream block\n(начало потока)"]
    B0 -->|"001 (0x20)"| BEG
    B0 -->|"010 (0x40)"| AUX["Auxiliary block\n(вспомогательный)"]
    B0 -->|"011 (0x60)"| AUX
    B0 -->|"100 (0x80)"| DAT["Data block\n(данные)"]
    B0 -->|"101 (0xA0)"| DAT
    B0 -->|"110 (0xC0)"| DAT
    B0 -->|"111 (0xE0)"| DAT

    CNN --> B4{"Бит 4 (0x10)"}
    B4 -->|"= 1"| END["Флаг End of Stream\n(конец потока)"]

Поток 1 — Команды (Backend → СОРМ)

Section titled “Поток 1 — Команды (Backend → СОРМ)”
sequenceDiagram
    participant BE as sorm2-backend
    participant MQ as ActiveMQ\nchannel-command
    participant OT as comOutputThread
    participant SORM as СОРМ-2 :16118

    BE->>MQ: BytesMessage [payload команды]
    loop Бесконечный цикл
        MQ-->>OT: receive() → BytesMessage
        OT->>OT: iDent++
        OT->>OT: Обернуть в протокольный кадр:\n[код, iDent, length, payload]
        OT->>SORM: Запись в tcpComSock
    end

Поток 2 — Ответы (СОРМ → Backend)

Section titled “Поток 2 — Ответы (СОРМ → Backend)”
sequenceDiagram
    participant SORM as СОРМ-2 :16118
    participant IT as comInputThread
    participant MSG as Messenger
    participant MQ as ActiveMQ\nchannel-answer
    participant BE as sorm2-backend

    loop Мониторинг сокета
        SORM->>IT: байты ответа
        IT->>MSG: readAnswer(inputStream)
        MSG->>MSG: Читать code (1 байт)
        alt code = 129/130/131/140
            MSG->>MSG: Читать iDent (2 байта)\n+ length (4 байта)\n+ payload (length байт)
        else code = 132/133
            MSG->>MSG: Читать фиксированные 8 байт
        end
        MSG-->>IT: Answer{code, data[]}

        alt code = 0x81 (auth OK)
            IT->>IT: authLock.notify()
        else code = 0x87 (reset OK)
            IT->>IT: resetLock.notify()
        else прочие коды
            IT->>MQ: send(BytesMessage с полным кадром)
            MQ->>BE: BytesMessage
        end
    end

Поток 3 — Перехваченный трафик (СОРМ → Очередь)

Section titled “Поток 3 — Перехваченный трафик (СОРМ → Очередь)”
sequenceDiagram
    participant SORM as СОРМ-2 :16117
    participant DT as dataThread
    participant NM as NotificationMessage
    participant MQ as ActiveMQ\nchannel-data
    participant REASM as sorm-2-reasm

    loop Непрерывное чтение
        SORM->>DT: кадр данных
        DT->>NM: readNotify(inputStream)
        NM->>NM: Читать code (1 байт)
        NM->>NM: Читать FRp, FRs (по 1 байту)
        NM->>NM: Читать length (4 байта)
        NM->>NM: Читать payload (length байт)

        alt code = 125 (keepalive, 0x7D)
            DT->>SORM: ACK: [0xFF, FRp+1, FRs]
        else payload существует
            NM->>NM: Извлечь CNN байт
            NM->>NM: Извлечь Node ID (5 байт, смещение 12)
            NM->>NM: Определить тип блока (begin/data/aux/end)
            DT->>MQ: send(BytesMessage с payload)
            Note over DT,MQ: count++

            alt count % 90 == 0
                DT->>SORM: Групповой ACK [0xFF, FRp+1, FRs]
            end
        end
    end

    MQ->>REASM: BytesMessage (перехваченный трафик)

Поток 4 — Статус миссий (опционально)

Section titled “Поток 4 — Статус миссий (опционально)”
sequenceDiagram
    participant MT as missionStatusThread
    participant HTTP as HTTP :9090/rules
    participant MA as MissionActualization
    participant MQ as ActiveMQ\nchannel-missions
    participant BE as sorm2-backend

    loop Каждые 60 секунд
        MT->>HTTP: GET /rules
        HTTP-->>MT: текстовый ответ построчно
        MT->>MA: parse(lines)
        loop Каждая строка
            MA->>MA: Regex: "^Rule\D*(\d*).*"
            MA->>MA: Извлечь UCI (числовой ID)
            MA->>MA: Создать MissionInfo{id, createdAt}
        end
        MA-->>MT: MissionInfo[]
        MT->>MT: Сериализовать в JSON
        MT->>MQ: send(BytesMessage с JSON)
        MQ->>BE: BytesMessage → List<MissionInfo>
    end

Потоковая модель (Threading)

Section titled “Потоковая модель (Threading)”
graph TB
    APP["App.run()"]

    subgraph T1["comOutputThread"]
        direction TB
        O1["Ждать authLock\n(аутентификация)"]
        O2["Читать channel-command"]
        O3["Обернуть в кадр"]
        O4["Запись в :16118"]
        O1 --> O2 --> O3 --> O4 --> O2
    end

    subgraph T2["comInputThread"]
        direction TB
        I1["Читать :16118"]
        I2["Messenger.readAnswer()"]
        I3{"code?"}
        I4["authLock.notify()"]
        I5["resetLock.notify()"]
        I6["Запись в channel-answer"]
        I1 --> I2 --> I3
        I3 -->|0x81| I4
        I3 -->|0x87| I5
        I3 -->|прочие| I6
        I4 --> I1
        I5 --> I1
        I6 --> I1
    end

    subgraph T3["dataThread"]
        direction TB
        D1["Читать :16117"]
        D2["NotificationMessage.readNotify()"]
        D3{"code?"}
        D4["ACK → :16117"]
        D5["Запись в channel-data"]
        D6{"count % 90 == 0?"}
        D7["Групповой ACK"]
        D1 --> D2 --> D3
        D3 -->|0x7D keepalive| D4 --> D1
        D3 -->|данные| D5 --> D6
        D6 -->|да| D7 --> D1
        D6 -->|нет| D1
    end

    subgraph T4["missionStatusThread (опц.)"]
        direction TB
        M1["HTTP GET /rules"]
        M2["Парсинг regex"]
        M3["Запись в channel-missions"]
        M4["Пауза 60 сек"]
        M1 --> M2 --> M3 --> M4 --> M1
    end

    APP --> T1
    APP --> T2
    APP --> T3
    APP --> T4

ОчередьНаправлениеDeliveryОписание
channel-command→ Connector (вход)PERSISTENTКоманды от бэкенда к СОРМ-2
channel-answer← Connector (выход)PERSISTENTОтветы СОРМ-2 на команды
channel-data← Connector (выход)NON_PERSISTENTПерехваченный трафик (высокая частота)
channel-missions← Connector (выход)PERSISTENTОбновления списка активных миссий

classDiagram
    class Runner {
        +main(args)
        -retryLoop() : 5 сек задержка при ошибке
    }

    class App {
        -Configurer configurer
        -Network network
        -Amq amq
        -int iDent
        -Object authLock
        -Object resetLock
        +run()
        -comOutputThread()
        -comInputThread()
        -dataThread()
        -missionStatusThread()
    }

    class Configurer {
        +String hostSorm
        +int portComChannel
        +int portDataChannel
        +String activeMqAddress
        +boolean resetState
        +boolean missionStatusState
        +String missionStatusUrl
    }

    class Network {
        -Socket tcpComSock
        -Socket tcpDataSock
        +connect()
        +getComIn() InputStream
        +getComOut() OutputStream
        +getDataIn() InputStream
        +close()
    }

    class Amq {
        -Connection connection
        -Session session
        +Queue commandQueue
        +Queue answerQueue
        +Queue dataQueue
        +Queue missionsQueue
        +MessageConsumer commandConsumer
        +MessageProducer answerProducer
        +MessageProducer dataProducer
        +MessageProducer missionsProducer
        +createConnection()
    }

    class Messenger {
        +readAnswer(InputStream) Answer
    }

    class Answer {
        +int code
        +byte[] data
    }

    class NotificationMessage {
        -int code
        -byte FRp
        -byte FRs
        -int length
        -byte[] payload
        -boolean existPayload
        -int cnn
        -byte[] nodeId
        +readNotify(InputStream)
    }

    class Additional {
        +createAuthMessage(iDent) byte[]
        +createResetMessage(iDent) byte[]
        +makeAskData(FRp, FRs) byte[]
    }

    class Utils {
        +getBlockType(cnn) String
        +isEndOfStream(cnn) boolean
    }

    class MissionActualization {
        +parse(lines) List~MissionInfo~
    }

    class MissionInfo {
        +Short id
        +Date createdAt
    }

    Runner --> App
    App --> Configurer
    App --> Network
    App --> Amq
    App --> Messenger
    App --> NotificationMessage
    App --> Additional
    App --> MissionActualization
    Messenger --> Answer
    MissionActualization --> MissionInfo

Устойчивость и обработка ошибок

Section titled “Устойчивость и обработка ошибок”
graph TD
    START["Runner.main()"]
    BUILD["app.build()\n(подключение к СОРМ + ActiveMQ)"]
    RUN["app.run()\n(запуск потоков)"]
    ERR{"Ошибка?\n(IOException/JMSException)"}
    CLOSE["Закрыть Network + Amq"]
    WAIT["Ждать 5 секунд"]
    RETRY["Повторить"]

    START --> BUILD
    BUILD --> ERR
    ERR -->|нет| RUN
    ERR -->|да| CLOSE
    CLOSE --> WAIT
    WAIT --> RETRY
    RETRY --> BUILD

    subgraph "Внутри dataThread"
        D_READ["readNotify()"]
        D_ERR{"Исключение?"}
        D_EOF{"EOF (-1)?"}
        D_CONT["Продолжить\n(existPayload=false)"]
        D_SLEEP["Пауза 1 сек"]

        D_READ --> D_ERR
        D_ERR -->|да| D_CONT
        D_READ --> D_EOF
        D_EOF -->|да| D_SLEEP
        D_SLEEP --> D_READ
    end

Файл: config/config.properties

ПараметрЗначение по умолчаниюОписание
sorm.host10.10.77.3IP-адрес оборудования СОРМ-2
sorm.port.command16118Порт командного канала TCP
sorm.port.data16117Порт канала данных TCP
sorm.state.resetfalseОтправить reset при старте
activemq.addresstcp://10.10.77.11:61616?wireFormat.maxInactivityDuration=0URL брокера ActiveMQ
mission-status.statefalseВключить опрос статуса миссий
mission-status.urlhttp://localhost:9090/rulesHTTP-эндпоинт списка миссий

Файл конфигурации: src/main/resources/log4j2.xml

ПараметрЗначение
Файл логаdata/logs/S2Conn.log
РотацияПо размеру 8 МБ
Архивирование.gz сжатие
Уровень (пакет com.codeyz)DEBUG
Уровень (root)DEBUG
ВыводФайл + консоль
Формат%d{yyyy-MM-dd HH:mm:ss} %p %m%n

КомпонентТехнология
ЯзыкJava 11
СборкаMaven 4.0.0
АртефактFat JAR (maven-assembly-plugin)
Брокер сообщенийApache ActiveMQ 5.7.0 (activemq-core)
Сериализация JSONJackson Databind 2.11.0
ЛогированиеLog4j2 2.17.2
ТестыJUnit 5.4.2, Mockito 4.0.0
КонтейнерDocker (OpenJDK 11)

  • Directorysorm2-connector
    • pom.xml
    • Dockerfile
    • Directoryconfig
      • config.properties ← Конфигурация (хост СОРМ, ActiveMQ)
    • Directorysrc
      • Directorymain
        • Directoryjava
          • Directorycom
            • Directorycodeyz
              • Runner.java ← Точка входа, retry-петля
              • App.java ← Оркестратор 4 потоков
              • Configurer.java ← Хранитель конфигурации
              • ConfigBuilder.java ← Загрузчик config.properties
              • Directoryapp
                • Network.java ← TCP-сокеты к СОРМ-2
                • Amq.java ← JMS-соединение и очереди
                • Messenger.java ← Парсер ответов командного канала
                • Answer.java ← DTO ответа (code + data[])
                • NotificationMessage.java ← Парсер кадров канала данных
                • Additional.java ← Фабрика протокольных кадров
                • Utils.java ← Интерпретатор CNN-байта
                • MissionActualization.java ← Парсер HTTP-списка миссий