S2Conn
sorm2-connector — промежуточный сервис (gateway), соединяющий физическое оборудование СОРМ-2 с остальными компонентами системы через Apache ActiveMQ. Он транслирует команды от бэкенда в бинарный протокол СОРМ-2 и возвращает перехваченный трафик обратно в очередь сообщений.
Место в общей архитектуре
Section titled “Место в общей архитектуре”graph LR
subgraph HW["Оборудование СОРМ-2"]
SORM["СОРМ-2\n10.10.77.3"]
CMD_PORT["Командный канал\n:16118"]
DATA_PORT["Канал данных\n:16117"]
SORM --- CMD_PORT
SORM --- DATA_PORT
end
subgraph CONN["sorm2-connector"]
direction TB
OUT["comOutputThread\n(команды → СОРМ)"]
IN["comInputThread\n(ответы ← СОРМ)"]
DATA["dataThread\n(трафик ← СОРМ)"]
MISS["missionStatusThread\n(опрос миссий, опц.)"]
end
subgraph MQ["Apache ActiveMQ"]
QC["channel-command\n(вход)"]
QA["channel-answer\n(выход)"]
QD["channel-data\n(выход)"]
QM["channel-missions\n(выход)"]
end
subgraph BACK["sorm2-backend"]
BE["Spring Boot\nбэкенд"]
end
subgraph REASM["sorm-2-reasm"]
RE["Сервис переборки\nпакетов"]
end
BE -->|команды| QC
QC --> OUT
OUT -->|бинарный протокол| CMD_PORT
CMD_PORT -->|ответы| IN
IN -->|ответы| QA
QA --> BE
DATA_PORT -->|перехваченный трафик| DATA
DATA -->|payload| QD
QD --> RE
QD --> BE
MISS -->|список миссий| QM
QM --> BE
Инициализация и подключение
Section titled “Инициализация и подключение”-
Загрузка конфигурации —
ConfigBuilderчитаетconfig/config.properties -
TCP-подключение к СОРМ-2 —
Networkустанавливает два сокета::16118— командный канал (двунаправленный):16117— канал данных (только чтение)
-
Подключение к ActiveMQ —
Amqсоздаёт JMS-соединение и 4 очереди -
Запуск 4 потоков —
Appзапускает параллельные обработчики -
Аутентификация —
comOutputThreadотправляет auth-сообщение, ждёт ответ кода0x81 -
Сброс состояния (если
sorm.state.reset=true) — отправляет reset0x07, ждёт0x87
sequenceDiagram
participant R as Runner
participant CB as ConfigBuilder
participant N as Network
participant AMQ as Amq
participant APP as App
participant SORM as СОРМ-2 :16118
R->>CB: build() → Configurer
R->>N: new Network(host, 16118, 16117)
N->>SORM: TCP connect
R->>AMQ: new Amq(activemq.address)
AMQ-->>R: JMS connection + 4 queues
R->>APP: new App(configurer, network, amq)
APP->>APP: Запуск 4 потоков
APP->>SORM: Auth [0x01, iDent=0x0001, len=15, "ABC"]
SORM-->>APP: [0x81, iDent, len, payload]
Note over APP: authLock.notify() — аутентификация пройдена
alt sorm.state.reset = true
APP->>SORM: Reset [0x07, iDent=0x0002]
SORM-->>APP: [0x87, ...]
Note over APP: resetLock.notify() — сброс выполнен
APP->>SORM: Auth (повторно)
SORM-->>APP: [0x81, ...]
end
Note over APP: Система готова к работе
Бинарный протокол СОРМ-2
Section titled “Бинарный протокол СОРМ-2”Структура сообщения
Section titled “Структура сообщения”┌──────────┬──────────────┬─────────────────┬──────────────────────────┐│ Code │ iDent │ Length │ Payload ││ (1 байт) │ (2 байта) │ (4 байта) │ (Length байт) │└──────────┴──────────────┴─────────────────┴──────────────────────────┘Коды сообщений
Section titled “Коды сообщений”| Код | Hex | Направление | Описание |
|---|---|---|---|
| 1 | 0x01 | → СОРМ | Аутентификация |
| 7 | 0x07 | → СОРМ | Сброс состояния (reset) |
| 125 | 0x7D | ← СОРМ | Keepalive / проверка доступности |
| 129 | 0x81 | ← СОРМ | Ответ переменной длины (auth OK) |
| 130 | 0x82 | ← СОРМ | Ответ переменной длины |
| 131 | 0x83 | ← СОРМ | Ответ переменной длины |
| 132 | 0x84 | ← СОРМ | Фиксированный 8-байтный ответ |
| 133 | 0x85 | ← СОРМ | Фиксированный 8-байтный ответ |
| 135 | 0x87 | ← СОРМ | Подтверждение сброса |
| 140 | 0x8C | ← СОРМ | Ответ переменной длины |
| 255 | 0xFF | → СОРМ | Подтверждение получения данных (ACK) |
Структура кадра данных (канал :16117)
Section titled “Структура кадра данных (канал :16117)”┌──────┬──────┬──────┬──────────────┬──────────────────────────────┐│ Code │ FRp │ FRs │ Length │ Payload ││ 1 б │ 1 б │ 1 б │ 4 байта │ содержит CNN, NodeID, данные │└──────┴──────┴──────┴──────────────┴──────────────────────────────┘FRp (Frame Rate packet) и FRs (Frame Rate stream) — параметры управления потоком.
CNN-байт (классификация блоков)
Section titled “CNN-байт (классификация блоков)”Байт CNN определяет тип блока в потоке данных:
graph TD
CNN["CNN байт"] --> B0{"Биты 7-5"}
B0 -->|"000 (0x00)"| BEG["Begin stream block\n(начало потока)"]
B0 -->|"001 (0x20)"| BEG
B0 -->|"010 (0x40)"| AUX["Auxiliary block\n(вспомогательный)"]
B0 -->|"011 (0x60)"| AUX
B0 -->|"100 (0x80)"| DAT["Data block\n(данные)"]
B0 -->|"101 (0xA0)"| DAT
B0 -->|"110 (0xC0)"| DAT
B0 -->|"111 (0xE0)"| DAT
CNN --> B4{"Бит 4 (0x10)"}
B4 -->|"= 1"| END["Флаг End of Stream\n(конец потока)"]
Потоки данных
Section titled “Потоки данных”Поток 1 — Команды (Backend → СОРМ)
Section titled “Поток 1 — Команды (Backend → СОРМ)”sequenceDiagram
participant BE as sorm2-backend
participant MQ as ActiveMQ\nchannel-command
participant OT as comOutputThread
participant SORM as СОРМ-2 :16118
BE->>MQ: BytesMessage [payload команды]
loop Бесконечный цикл
MQ-->>OT: receive() → BytesMessage
OT->>OT: iDent++
OT->>OT: Обернуть в протокольный кадр:\n[код, iDent, length, payload]
OT->>SORM: Запись в tcpComSock
end
Поток 2 — Ответы (СОРМ → Backend)
Section titled “Поток 2 — Ответы (СОРМ → Backend)”sequenceDiagram
participant SORM as СОРМ-2 :16118
participant IT as comInputThread
participant MSG as Messenger
participant MQ as ActiveMQ\nchannel-answer
participant BE as sorm2-backend
loop Мониторинг сокета
SORM->>IT: байты ответа
IT->>MSG: readAnswer(inputStream)
MSG->>MSG: Читать code (1 байт)
alt code = 129/130/131/140
MSG->>MSG: Читать iDent (2 байта)\n+ length (4 байта)\n+ payload (length байт)
else code = 132/133
MSG->>MSG: Читать фиксированные 8 байт
end
MSG-->>IT: Answer{code, data[]}
alt code = 0x81 (auth OK)
IT->>IT: authLock.notify()
else code = 0x87 (reset OK)
IT->>IT: resetLock.notify()
else прочие коды
IT->>MQ: send(BytesMessage с полным кадром)
MQ->>BE: BytesMessage
end
end
Поток 3 — Перехваченный трафик (СОРМ → Очередь)
Section titled “Поток 3 — Перехваченный трафик (СОРМ → Очередь)”sequenceDiagram
participant SORM as СОРМ-2 :16117
participant DT as dataThread
participant NM as NotificationMessage
participant MQ as ActiveMQ\nchannel-data
participant REASM as sorm-2-reasm
loop Непрерывное чтение
SORM->>DT: кадр данных
DT->>NM: readNotify(inputStream)
NM->>NM: Читать code (1 байт)
NM->>NM: Читать FRp, FRs (по 1 байту)
NM->>NM: Читать length (4 байта)
NM->>NM: Читать payload (length байт)
alt code = 125 (keepalive, 0x7D)
DT->>SORM: ACK: [0xFF, FRp+1, FRs]
else payload существует
NM->>NM: Извлечь CNN байт
NM->>NM: Извлечь Node ID (5 байт, смещение 12)
NM->>NM: Определить тип блока (begin/data/aux/end)
DT->>MQ: send(BytesMessage с payload)
Note over DT,MQ: count++
alt count % 90 == 0
DT->>SORM: Групповой ACK [0xFF, FRp+1, FRs]
end
end
end
MQ->>REASM: BytesMessage (перехваченный трафик)
Поток 4 — Статус миссий (опционально)
Section titled “Поток 4 — Статус миссий (опционально)”sequenceDiagram
participant MT as missionStatusThread
participant HTTP as HTTP :9090/rules
participant MA as MissionActualization
participant MQ as ActiveMQ\nchannel-missions
participant BE as sorm2-backend
loop Каждые 60 секунд
MT->>HTTP: GET /rules
HTTP-->>MT: текстовый ответ построчно
MT->>MA: parse(lines)
loop Каждая строка
MA->>MA: Regex: "^Rule\D*(\d*).*"
MA->>MA: Извлечь UCI (числовой ID)
MA->>MA: Создать MissionInfo{id, createdAt}
end
MA-->>MT: MissionInfo[]
MT->>MT: Сериализовать в JSON
MT->>MQ: send(BytesMessage с JSON)
MQ->>BE: BytesMessage → List<MissionInfo>
end
Потоковая модель (Threading)
Section titled “Потоковая модель (Threading)”graph TB
APP["App.run()"]
subgraph T1["comOutputThread"]
direction TB
O1["Ждать authLock\n(аутентификация)"]
O2["Читать channel-command"]
O3["Обернуть в кадр"]
O4["Запись в :16118"]
O1 --> O2 --> O3 --> O4 --> O2
end
subgraph T2["comInputThread"]
direction TB
I1["Читать :16118"]
I2["Messenger.readAnswer()"]
I3{"code?"}
I4["authLock.notify()"]
I5["resetLock.notify()"]
I6["Запись в channel-answer"]
I1 --> I2 --> I3
I3 -->|0x81| I4
I3 -->|0x87| I5
I3 -->|прочие| I6
I4 --> I1
I5 --> I1
I6 --> I1
end
subgraph T3["dataThread"]
direction TB
D1["Читать :16117"]
D2["NotificationMessage.readNotify()"]
D3{"code?"}
D4["ACK → :16117"]
D5["Запись в channel-data"]
D6{"count % 90 == 0?"}
D7["Групповой ACK"]
D1 --> D2 --> D3
D3 -->|0x7D keepalive| D4 --> D1
D3 -->|данные| D5 --> D6
D6 -->|да| D7 --> D1
D6 -->|нет| D1
end
subgraph T4["missionStatusThread (опц.)"]
direction TB
M1["HTTP GET /rules"]
M2["Парсинг regex"]
M3["Запись в channel-missions"]
M4["Пауза 60 сек"]
M1 --> M2 --> M3 --> M4 --> M1
end
APP --> T1
APP --> T2
APP --> T3
APP --> T4
Очереди ActiveMQ
Section titled “Очереди ActiveMQ”| Очередь | Направление | Delivery | Описание |
|---|---|---|---|
channel-command | → Connector (вход) | PERSISTENT | Команды от бэкенда к СОРМ-2 |
channel-answer | ← Connector (выход) | PERSISTENT | Ответы СОРМ-2 на команды |
channel-data | ← Connector (выход) | NON_PERSISTENT | Перехваченный трафик (высокая частота) |
channel-missions | ← Connector (выход) | PERSISTENT | Обновления списка активных миссий |
Ключевые классы
Section titled “Ключевые классы”classDiagram
class Runner {
+main(args)
-retryLoop() : 5 сек задержка при ошибке
}
class App {
-Configurer configurer
-Network network
-Amq amq
-int iDent
-Object authLock
-Object resetLock
+run()
-comOutputThread()
-comInputThread()
-dataThread()
-missionStatusThread()
}
class Configurer {
+String hostSorm
+int portComChannel
+int portDataChannel
+String activeMqAddress
+boolean resetState
+boolean missionStatusState
+String missionStatusUrl
}
class Network {
-Socket tcpComSock
-Socket tcpDataSock
+connect()
+getComIn() InputStream
+getComOut() OutputStream
+getDataIn() InputStream
+close()
}
class Amq {
-Connection connection
-Session session
+Queue commandQueue
+Queue answerQueue
+Queue dataQueue
+Queue missionsQueue
+MessageConsumer commandConsumer
+MessageProducer answerProducer
+MessageProducer dataProducer
+MessageProducer missionsProducer
+createConnection()
}
class Messenger {
+readAnswer(InputStream) Answer
}
class Answer {
+int code
+byte[] data
}
class NotificationMessage {
-int code
-byte FRp
-byte FRs
-int length
-byte[] payload
-boolean existPayload
-int cnn
-byte[] nodeId
+readNotify(InputStream)
}
class Additional {
+createAuthMessage(iDent) byte[]
+createResetMessage(iDent) byte[]
+makeAskData(FRp, FRs) byte[]
}
class Utils {
+getBlockType(cnn) String
+isEndOfStream(cnn) boolean
}
class MissionActualization {
+parse(lines) List~MissionInfo~
}
class MissionInfo {
+Short id
+Date createdAt
}
Runner --> App
App --> Configurer
App --> Network
App --> Amq
App --> Messenger
App --> NotificationMessage
App --> Additional
App --> MissionActualization
Messenger --> Answer
MissionActualization --> MissionInfo
Устойчивость и обработка ошибок
Section titled “Устойчивость и обработка ошибок”graph TD
START["Runner.main()"]
BUILD["app.build()\n(подключение к СОРМ + ActiveMQ)"]
RUN["app.run()\n(запуск потоков)"]
ERR{"Ошибка?\n(IOException/JMSException)"}
CLOSE["Закрыть Network + Amq"]
WAIT["Ждать 5 секунд"]
RETRY["Повторить"]
START --> BUILD
BUILD --> ERR
ERR -->|нет| RUN
ERR -->|да| CLOSE
CLOSE --> WAIT
WAIT --> RETRY
RETRY --> BUILD
subgraph "Внутри dataThread"
D_READ["readNotify()"]
D_ERR{"Исключение?"}
D_EOF{"EOF (-1)?"}
D_CONT["Продолжить\n(existPayload=false)"]
D_SLEEP["Пауза 1 сек"]
D_READ --> D_ERR
D_ERR -->|да| D_CONT
D_READ --> D_EOF
D_EOF -->|да| D_SLEEP
D_SLEEP --> D_READ
end
Конфигурация
Section titled “Конфигурация”Файл: config/config.properties
| Параметр | Значение по умолчанию | Описание |
|---|---|---|
sorm.host | 10.10.77.3 | IP-адрес оборудования СОРМ-2 |
sorm.port.command | 16118 | Порт командного канала TCP |
sorm.port.data | 16117 | Порт канала данных TCP |
sorm.state.reset | false | Отправить reset при старте |
activemq.address | tcp://10.10.77.11:61616?wireFormat.maxInactivityDuration=0 | URL брокера ActiveMQ |
mission-status.state | false | Включить опрос статуса миссий |
mission-status.url | http://localhost:9090/rules | HTTP-эндпоинт списка миссий |
Логирование
Section titled “Логирование”Файл конфигурации: src/main/resources/log4j2.xml
| Параметр | Значение |
|---|---|
| Файл лога | data/logs/S2Conn.log |
| Ротация | По размеру 8 МБ |
| Архивирование | .gz сжатие |
Уровень (пакет com.codeyz) | DEBUG |
| Уровень (root) | DEBUG |
| Вывод | Файл + консоль |
| Формат | %d{yyyy-MM-dd HH:mm:ss} %p %m%n |
Технологический стек
Section titled “Технологический стек”| Компонент | Технология |
|---|---|
| Язык | Java 11 |
| Сборка | Maven 4.0.0 |
| Артефакт | Fat JAR (maven-assembly-plugin) |
| Брокер сообщений | Apache ActiveMQ 5.7.0 (activemq-core) |
| Сериализация JSON | Jackson Databind 2.11.0 |
| Логирование | Log4j2 2.17.2 |
| Тесты | JUnit 5.4.2, Mockito 4.0.0 |
| Контейнер | Docker (OpenJDK 11) |
Структура файлов
Section titled “Структура файлов”Directorysorm2-connector
- pom.xml
- Dockerfile
Directoryconfig
- config.properties ← Конфигурация (хост СОРМ, ActiveMQ)
Directorysrc
Directorymain
Directoryjava
Directorycom
Directorycodeyz
- Runner.java ← Точка входа, retry-петля
- App.java ← Оркестратор 4 потоков
- Configurer.java ← Хранитель конфигурации
- ConfigBuilder.java ← Загрузчик config.properties
Directoryapp
- Network.java ← TCP-сокеты к СОРМ-2
- Amq.java ← JMS-соединение и очереди
- Messenger.java ← Парсер ответов командного канала
- Answer.java ← DTO ответа (code + data[])
- NotificationMessage.java ← Парсер кадров канала данных
- Additional.java ← Фабрика протокольных кадров
- Utils.java ← Интерпретатор CNN-байта
- MissionActualization.java ← Парсер HTTP-списка миссий