[Fact] Total

1. equpiment_metrics #

[도메인 개요] #

항목설명
목적설비 상태(온도, 진동, 압력 등)의 실시간 수집 및 이상 탐지
발생 주기초당 1~5건(설비당), 수천 대 설비 스케일 대응
주요 활용처알람 시스템, 정비 예측, 품질 관리 시스템과 연계

Message Schema(Kafka / Avro) #

{
  "timestamp": "2025-05-24T13:00:00Z",
  "equipment_id": "LINE_A_PRESS_001",
  "equipment_type": "hydraulic_press",
  "location": "line_a",
  "schema_version": "v1.2",
  "metrics": [
    {
      "sensor_id": "TEMP_SENSOR_001",
      "sensor_type": "temperature",
      "value": 68.2,
      "unit": "celsius",
      "quality": "good"
    },
    {
      "sensor_id": "VIB_SENSOR_001",
      "sensor_type": "vibration",
      "value": 0.8,
      "unit": "mm/s",
      "quality": "good"
    }
  ],
  "status": {
    "current": "running",
    "previous": "idle",
    "changed_at": "2025-05-24T12:58:30Z"
  },
  "operator_id": "OP001",
  "batch_id": "BATCH_20250524_001",
  "ingestion_timestamp": "2025-05-24T13:00:02Z"
}

이상 탐지 처리 흐름 (Spark) #

  • Join 대상: dim_thresholds (설비+센서+시즌+제품+교대 기준 임계값)
  • 처리 방식: Watermark 15분 + 상태 전이 비교 + 동적 임계값
.withColumn("anomaly", $"value" < $"min_threshold" || $"value" > $"max_threshold")

Iceberg 저장 전략 #

항목내용
테이블명iot.fact_equipment_metrics
파티셔닝days(timestamp), bucket(16, equipment_id)
테이블 유형Append-only, MERGE INTO 지원 준비
적재 주기실시간 스트리밍 적재 (Trigger = ProcessingTime 10s)
스토리지GCS
보관 기간365일 (Time Travel + Metadata Clean 주기 분리)

Kafka Topic 구성 #

iot.equipment_metrics:
  partitions: 20
  replication-factor: 3
  compression.type: lz4
  retention.ms: 604800000  # 7일

iot.equipment_metrics.dlq:
  partitions: 5
  retention.ms: 2592000000 # 30일

DLQ 설계 #

유형조건 예시처리 방식
schema_errorrequired field 누락 / 타입 오류DLQ 전송 + Slack 알림
business_rule_violation센서 품질 “bad”, 측정값 ±99% 이상DLQ 전송 + Grafana 경고
duplicate동일 equipment_id + sensor_type + timestamp 중복Kafka Header로 중복검출
late_arrivalingestion_timestamp - timestamp > 10min로그 + DLQ 기록

주요 모니터링 지표 #

spark_streaming_input_rate: 초당 수신 메시지 수
spark_streaming_processing_time: 배치 처리 시간
kafka_lag_max: 최대 consumer lag
iceberg_write_latency_p99: 99퍼센트ile 적재 지연
equipment_anomaly_rate: 이상 측정 비율
late_arrival_count: 지연 도착 메시지 건수

SLA 기준 #

항목기준
Kafka ingest -> Spark 처리 지연$<= 5s$
Spark 처리 -> Iceberg 적재 지연$<= 10s$
이상 감지 후 알림 전파 시간$3s$
최대 처리량초당 5,000건
DLQ 비율$<= 0.1%$ 권장

연계 도메인 및 데이터 흐름 #

flowchart LR
    KAFKA(Kafka topic<br/>iot.equipment_metrics) --> SPARK(Spark Streaming)
    SPARK --> ICEBERG(Iceberg Table<br/>iot.fact_equipment_metrics)
    SPARK --> ALERT(Kafka topic<br/>iot.alarm_log)
    ALERT --> OPS[운영 시스템 알림]
    ICEBERG --> TRINO[Trino for analytics]

관련 Dimension Table #

테이블명주요 컬럼
dim_equipmentequipment_id, type, install_date, location
dim_operatoroperator_id, name, shift
dim_thresholdsequipment_id, sensor_type, min, max, 조건들
dim_locationlocation, zone, site

2. qc_result #

도메인 개요 #

항목설명
목적생산 공정 중 검사 장비에서 수집된 품질 검사 결과를 이벤트로 기록하고, 이상 항목을 식별하여 품질/정비 시스템에 전달함
발생 주기제품 1개당 1건. 초당 수~수십 건 이상 발생 가능 (라인 병렬 기준)
주요 연계 도메인equipment_metrics, alarm_log, dim_batch, dim_inspection_item, dim_qc_threshold

메시지 스키마(Kafka/Avro) #

{
  "timestamp": "2025-05-24T14:10:00Z",
  "equipment_id": "LINE_B_VISION_001",
  "inspection_type": "vision",
  "product_code": "P12345678",
  "batch_id": "BATCH_20250524_002",
  "operator_id": "OP001",
  "location": "line_b",
  "inspection_items": [
    {
      "item_id": "WIDTH",
      "value": 15.3,
      "unit": "mm"
    },
    {
      "item_id": "SCRATCH",
      "value": 2,
      "unit": "count"
    }
  ],
  "schema_version": "v2.0",
  "ingestion_timestamp": "2025-05-24T14:10:04Z"
}

이상 탐지 처리 흐름(Spark) #

항목내용
기준 테이블dim_qc_threshold
Join Keyitem_id, product_code, inspection_type
이상 조건value < min_threshold OR value > max_threshold
알람 발행 대상이상 발생 시 Kafka: iot.alarm_log에 발행
DLQ 조건누락 필드, 기준 없음, 단위 불일치, 타입 오류 등
.withColumn("defect", $"value" < $"min_threshold" || $"value" > $"max_threshold")

Iceberg 저장 전략 #

항목내용
테이블명iot.fact_qc_result
파티셔닝days(timestamp), bucket(16, equipment_id)
테이블 구조검사 항목별 explode(inspection_items) 후 1 row 1 항목 적재
MERGE 전략Spark upsert 처리 가능: batch_id + item_id + timestamp 기준 dedup
보관 정책365일 + Time Travel / Compaction 주기적 적용

Kafka Topic 구성 #

iot.qc_result:
  partitions: 12
  replication-factor: 3
  compression.type: lz4
  retention.ms: 1209600000  # 14일

iot.qc_result.dlq:
  partitions: 4
  retention.ms: 2592000000  # 30일

DLQ 설계 #

유형조건 예시처리 방식
스키마 오류item_id 누락, unit 없음 등schema_error DLQ
기준 없음item_id + product_code → 기준 없음threshold_missing DLQ
단위 불일치측정단위와 기준단위 불일치 (예: mm vs cm)unit_mismatch DLQ
late arrival (지연도착)ingestion_timestamp - timestamp > 10분late_arrival DLQ

주요 모니터링 지표 #

qc_defect_rate: 불량률 (%)
qc_threshold_miss_rate: 기준값 누락률
qc_unit_mismatch_count: 단위 불일치 건수
qc_operator_repeat_defect: 동일 작업자 반복 불량

SLA 기준 #

항목기준
Kafka ingest → Spark 지연≤ 5초
Spark 처리 → Iceberg 적재≤ 10초
이상 감지 후 알람 전파 시간≤ 3초
최대 처리량초당 3,000건 이상
기준 없는 항목 비율≤ 0.5%

연계 흐름 #

flowchart LR
    KAFKA(Kafka: iot.qc_result) --> SPARK(Spark QC Processor)
    SPARK --> ICEBERG(Iceberg: fact_qc_result)
    SPARK --> ALERT(Kafka: iot.alarm_log)
    ICEBERG --> SUP(Superset 시각화)
    ICEBERG --> TRINO(Ad-hoc Query via Trino)

관련 Dimension Table #

테이블명주요 컬럼
dim_batchbatch_id, product_code, 생산 시작/종료 시간 등
dim_inspection_itemitem_id, item_name, unit, 검사 방식
dim_qc_thresholditem_id, product_code, inspection_type, min, max, unit
dim_equipmentequipment_id, equipment_type, location
dim_operatoroperator_id, name, 자격 등급, 근무조 등

3. energy_usage #

도메인 개요 #

항목설명
목적설비별 에너지 소비량(전기, 스팀, 가스 등)의 수집, 비용 예측, 이상 감지 및 탄소 배출 분석
발생 주기1~10분 주기 보고, 설비별 또는 라인 단위 (설비 병렬화 시 TPS 수천 이상 가능)
주요 연계dim_equipment, dim_energy_type, dim_energy_threshold, maintenance_log

메시지 스키마(Kafka/Avro) #

{
  "timestamp": "2025-05-24T14:15:00Z",
  "equipment_id": "LINE_C_PRESS_003",
  "energy_type": "electricity",
  "usage": 12.7,
  "unit": "kWh",
  "location": "line_c",
  "batch_id": "BATCH_20250524_002",
  "schema_version": "v1.1",
  "ingestion_timestamp": "2025-05-24T14:15:03Z"
}

이상 탐지 처리 흐름(Spark) #

항목내용
기준 테이블dim_energy_threshold
Join 기준equipment_id + energy_type
이상 조건usage > max_threshold 또는 unit ≠ expected_unit
알람 트리거연속 3회 이상 초과 or 특정 시간대 피크 초과
DLQ 사유 분류schema_error, unit_mismatch, threshold_missing, late_arrival
.withColumn("anomaly", $"usage" > $"max_threshold")
.withColumn("unit_mismatch", $"unit" =!= $"expected_unit")

Iceberg 저장 전략 #

항목내용
테이블명iot.fact_energy_usage
파티셔닝days(timestamp), bucket(16, equipment_id)
테이블 유형Append-only + MERGE 지원 구조
적재 지연 허용Watermark 10분 적용 예정
보관 정책365일 + compaction / metadata cleanup 주기 운영

Kafka Topic 구성 #

iot.energy_usage:
  partitions: 12
  replication-factor: 3
  compression.type: zstd
  retention.ms: 1209600000  # 14일

iot.energy_usage.dlq:
  partitions: 4
  retention.ms: 2592000000  # 30일

주요 모니터링 지표 #

iot_energy_usage_total: 전체 소비량
iot_energy_cost_prediction: 예상 vs 실제 요금
iot_energy_peak_over_count: 기준 초과 횟수
iot_energy_unit_mismatch: 단위 불일치 건수
iot_energy_carbon_emission: 탄소 배출량 예측치

SLA 기준 #

항목기준
Kafka ingest → Spark 처리 지연≤ 5초
Spark 처리 → Iceberg 적재 지연≤ 10초
DLQ 비율≤ 0.05%
처리량 유지초당 5,000건 이상 처리 보장
알람 전파 지연≤ 3초

연계 흐름 #

flowchart LR
    KAFKA(Kafka: iot.energy_usage) --> SPARK(Spark Aggregator)
    SPARK --> ICEBERG(Iceberg: fact_energy_usage)
    SPARK --> ALERT(Kafka: iot.alarm_log)
    ICEBERG --> TRINO(Trino: cost/carbon 분석)
    ICEBERG --> SUP(Superset: 전력 추이 대시보드)

관련 Dimension Table #

dim_energy_type

필드명설명
energy_type전력, 스팀, 가스 등 (enum)
standard_unit기준 단위 (예: kWh, m³)
conversion_rate환산 비율 (예: Wh → kWh = 0.001)
carbon_factor탄소 환산계수 (gCO2 per 단위)
cost_per_unit요금 정보

dim_energy_threshold

필드명설명
equipment_id설비 ID
energy_type전력/가스/스팀 등
max_threshold초과 기준
expected_unit기준 단위
valid_from, valid_to적용 기간 (스냅샷용)

4. robot_status #

도메인 개요 #

항목설명
목적공장 내 로봇의 상태 전이, 에러, 배터리 수준, 위치 흐름을 수집하고 장애 감지 및 운영 최적화에 활용
발생 주기초당 1~3건, 수천 대 병렬로 운영될 수 있음
주요 연계dim_robot, dim_location, dim_robot_error_code, alarm_log, robot_heartbeat

메시지 스키마(Kafka/Avro) #

{
  "timestamp": "2025-05-24T14:20:00Z",
  "robot_id": "ROBOT_005",
  "model": "agv_type_c",
  "status": {
    "current": "error",
    "previous": "moving",
    "changed_at": "2025-05-24T14:19:40Z"
  },
  "location": "zone_4",
  "battery_level": 23.5,
  "error_code": "E405",
  "task_id": "TSK_20250524_018",
  "schema_version": "v2.0",
  "ingestion_timestamp": "2025-05-24T14:20:03Z"
}

이상 탐지 처리 흐름(Spark) #

항목내용
상태 enum 고정“idle”, “moving”, “charging”, “error”
이상 조건status.current = “error” OR battery_level < 20 OR error_code IS NOT NULL
오프라인 탐지heartbeat_timeout_minutes = 10 기준, 미보고 시 DLQ or 알람
DLQ 분리schema_error, invalid_enum, unit_violation, missing_error_code, offline_timeout
알람 전파이상 발생 시 iot.alarm_log 토픽으로 발행

Iceberg 저장 전략 #

항목내용
테이블명iot.fact_robot_status
파티셔닝days(timestamp), bucket(16, robot_id)
구조 정리status.* flatten, task_id nullable
중복 제거 키robot_id + timestamp 기준 중복 제거 가능

Kafka Topic 구성 #

iot.robot_status:
  partitions: 12
  replication-factor: 3
  compression.type: zstd
  retention.ms: 604800000  # 7일

iot.robot_status.dlq:
  partitions: 4
  retention.ms: 2592000000  # 30일

주요 모니터링 지표 #

robot_state_error_count
robot_battery_low_alerts
robot_state_transition_rate
robot_offline_timeout_count
robot_avg_battery_drain_per_hour
robot_status_enum_violation

SLA 기준 #

항목기준
상태 보고 지연 허용≤ 3초
Spark 처리 후 Iceberg 적재 지연≤ 10초
상태 오류 감지 후 알람 지연≤ 3초
DLQ 비율≤ 0.1%

연계 흐름 #

flowchart LR
    KAFKA(Kafka: iot.robot_status) --> SPARK(Spark State Monitor)
    SPARK --> ICEBERG(Iceberg: fact_robot_status)
    SPARK --> ALERT(Kafka: iot.alarm_log)
    ICEBERG --> TRINO(Trino: 분석)
    ICEBERG --> SUP(Superset: 상태 전이 시각화)

관련 Dimension Table #

dim_robot

필드명설명
robot_id고유 로봇 ID
modelAGV / 팔로봇 / 드론 등
install_date설치일
heartbeat_cycle정상 보고 주기 (초 단위)
team관리 부서

dim_location

필드명설명
location공장 내 위치 (zone_1 ~ zone_n)
area_type작업 구역 / 충전소 / 대기 공간 등

dim_robot_error_code

필드명설명
error_code“E405” 등 코드
description“서보 모터 과열” 등 설명
severity“critical”, “warn” 등

5. environmental_readings #

도메인 개요 #

항목설명
목적생산/작업 구역의 환경 데이터(온도, 습도, CO₂ 등)를 수집하고 기준 초과 감지 및 생산 품질 상관 분석에 활용
발생 주기1~10분 간격, Zone 단위 / 작업장 단위
주요 연계dim_location, dim_env_sensor, dim_env_threshold, alarm_log

메시지 스키마(Kafka/Avro) #

{
  "timestamp": "2025-05-24T14:30:00Z",
  "location": "line_d_zone_1",
  "readings": [
    {
      "sensor_id": "ENV_TEMP_01",
      "sensor_type": "temperature",
      "value": 29.8,
      "unit": "C"
    },
    {
      "sensor_id": "ENV_CO2_01",
      "sensor_type": "co2",
      "value": 1100,
      "unit": "ppm"
    }
  ],
  "schema_version": "v1.0",
  "ingestion_timestamp": "2025-05-24T14:30:04Z"
}

이상 탐지 처리 흐름(Spark) #

항목내용
기준 테이블dim_env_threshold
Join Keysensor_type + location
이상 조건value > max_threshold OR value < min_threshold
단위 불일치unit ≠ expected_unit (예: °C vs ℉)
연속 이상 감지CO₂ 3회 이상 초과 시 알람 전파
DLQ 유형 분리schema_error, unit_mismatch, threshold_missing, late_arrival
.withColumn("is_anomaly", $"value" > $"max_threshold" || $"value" < $"min_threshold")
.withColumn("unit_mismatch", $"unit" =!= $"expected_unit")

Iceberg 저장 전략 #

항목내용
테이블명iot.fact_environmental_readings
파티셔닝days(timestamp), location
테이블 구조explode(readings) → sensor 1건 = row 1건
MERGE 전략sensor_id + timestamp 기준 중복 제거 가능

Kafka Topic 구성 #

iot.environmental_readings:
  partitions: 8
  replication-factor: 3
  compression.type: zstd
  retention.ms: 1209600000  # 14일

iot.environmental_readings.dlq:
  partitions: 3
  retention.ms: 2592000000  # 30일

주요 모니터링 지표 #

env_anomaly_count_by_location
env_co2_over_1000_ppm_count
env_temperature_range_breach
env_unit_mismatch_count
env_missing_threshold_ratio

SLA 기준 #

항목기준
ingest → Spark 처리 지연≤ 5초
Spark → Iceberg 적재 지연≤ 10초
이상 감지 후 알람 전파 지연≤ 3초
DLQ 비율≤ 0.2%

연계 흐름 #

flowchart LR
    KAFKA(Kafka: iot.environmental_readings) --> SPARK(Spark Env Monitor)
    SPARK --> ICEBERG(Iceberg: fact_environmental_readings)
    SPARK --> ALERT(Kafka: iot.alarm_log)
    ICEBERG --> TRINO(Trino: 실내공기 품질 분석)
    ICEBERG --> SUP(Superset: 환경지표 시각화)

관련 Dimension Table #

dim_env_sensor

필드명설명
sensor_id고유 센서 ID
sensor_type온도 / 습도 / CO₂ 등
unit측정 단위
accuracy신뢰도 ±값
install_date설치일

dim_env_threshold

필드명설명
sensor_type센서 타입 (co2, temperature 등)
location적용 위치 (zone 단위)
min_threshold최소 허용값
max_threshold최대 허용값
expected_unit기준 단위 (예: C, ppm)
valid_from/to적용 기간 (시즌별 기준 스냅샷 대응 가능)

dim_location

필드명설명
locationzone_1, zone_2 …
site공장명 / 물류센터명
area_type작업장 / 휴게실 / 충전소 등

6. device_status #

[Fact Domain] device_status #

도메인 개요 #

항목설명
목적센서, 로봇, 설비 등 IoT 기기의 상태 보고(ping, 지연, 오류) 및 오프라인/이상 상태 감지
보고 주기기기별 heartbeat 주기 (30~180초)
연계 도메인dim_device, dim_device_threshold, dim_device_error_code, alarm_log

메시지 스키마(Kafka/Avro) #

{
  "timestamp": "2025-05-24T14:35:00Z",
  "device_id": "SENSOR_ENV_001",
  "device_type": "env_sensor",
  "location": "zone_7",
  "ip_address_hash": "0x7e239a...",  // ✅ 원본 IP는 저장 안함
  "status": {
    "ping_success": true,
    "latency_ms": 43,
    "last_success_at": "2025-05-24T14:34:30Z"
  },
  "error_code": null,
  "schema_version": "v2.0",
  "ingestion_timestamp": "2025-05-24T14:35:02Z"
}

이상 탐지 처리 흐름(Spark) #

항목내용
기준 테이블dim_device_threshold
Join Keydevice_id, device_type
이상 조건ping_success = false OR latency_ms > max_latency
timeout 판단timestamp - last_success_at > timeout_threshold
timeout 계산 기준timeout_threshold = heartbeat_cycle * 2.5
error_code 정규화dim_device_error_code 조인으로 설명 및 심각도 분석
DLQ 조건unit_mismatch, schema_error, timeout, error_code missing 등

Iceberg 저장 전략 #

항목내용
테이블명iot.fact_device_status
파티셔닝days(timestamp), bucket(16, device_id)
중복 제거 기준device_id + timestamp
TTL 정책최근 90일 (단기 장애 탐지용)

Kafka Topic 구성 #

iot.device_status:
  partitions: 8
  replication-factor: 3
  compression.type: zstd
  retention.ms: 604800000  # 7일

iot.device_status.dlq:
  partitions: 3
  retention.ms: 2592000000  # 30일

DLQ 설계 #

유형조건 예시처리 방식
ping 실패 (ping_success=false)즉시 DLQ or 알람ping_failure DLQ
latency 초과latency_ms > 300 등latency_violation DLQ
timeout 상태timestamp - last_success_at > 300초 등offline_timeout DLQ
error_code 누락status = false인데 코드 없음missing_error_code DLQ
invalid enum/typeschema 오류, 잘못된 device_type 등
schema_error DLQ

주요 모니터링 지표 #

device_status_success_ratio
device_offline_timeout_count
device_latency_p95
device_ping_failure_count
device_error_code_distribution

SLA 기준 #

항목기준
상태 보고 수집 지연≤ 3초
Spark → Iceberg 적재 지연≤ 10초
DLQ 비율≤ 0.1%
오프라인 탐지 지연 허용≤ 60초
알람 전파 지연≤ 2초

연계 흐름 #

flowchart LR
    KAFKA(Kafka: iot.device_status) --> SPARK(Spark Device Monitor)
    SPARK --> ICEBERG(Iceberg: fact_device_status)
    SPARK --> ALERT(Kafka: iot.alarm_log)
    ICEBERG --> TRINO(장치별 연결 안정성 분석)
    ICEBERG --> SUP(Superset: 장애 발생 위치 시각화)

관련 Dimension Table #

dim_device

필드명설명
device_id고유 ID
device_type센서, 로봇, 게이트웨이 등
location위치
install_date설치일
heartbeat_cycle보고 주기 (초)

dim_device_threshold

필드명설명
device_type센서/로봇 구분
max_latency지연 임계값 (ms)
timeout_multiplier기본값 2.5 → timeout = heartbeat × multiplier

dim_device_error_code

필드명설명
error_code코드
description오류 설명
severitywarning/critical