UgCS video streamer
streamer.h
1 #ifndef INCLUDE_MUXER_H_
2 #define INCLUDE_MUXER_H_
3 
4 #include <log.h>
5 #include <net.h>
6 #include <ffmpeg.h>
7 #include <live_input_session.h>
8 #include <telemetry.h>
9 #include <output_streamer.h>
10 #include <transcoder.h>
11 #include <video_streaming_protocol.h>
12 #include <urtp/handler.h>
13 
14 
15 /** Top-level component for producing a MISP-compliant video stream.
 *
 * NOTE(review): this listing skips embedded line numbers 21, 23, 32, 40, 46, 48, 50, 52, 54
 * and 60, so the declarations that belong to several of the comments below (Params fields,
 * most Status enumerators, the EventsListener class header) are not visible here -- restore
 * them from the original header before relying on this text.
 */
16 class Streamer {
17 public:
18  /** Should match MispStreamer.Params in managed code. */
19  struct Params {
20  /** Vehicle ID (typically the serial number). */
22  /** User-defined tail number, may be empty. */
24  /** Target URI for streaming. Examples:
25  * * `udp://1.2.3.4:3338` - Plain UDP streaming, suitable for external MISP-compliant players
26  * such as GIS software or just a regular media player (metadata will not be available
27  * in that case).
28  * * `urtp+connect://1.2.3.4://3341` - URTP streaming with outgoing connection. URTP is a UgCS
29  * proprietary protocol supported by its Command Center components. It ensures better
30  * quality on unreliable links.
 * NOTE(review): the `://` before the port in the URTP example looks like a typo for `:`
 * (compare the UDP example) -- confirm against the URI parser before changing it.
31  */
33  /** Zero value disables transcoding, -1 for auto (based on input bitrate and scale),
34  * otherwise the value in bits/s.
35  */
36  int64_t transcodeBitrate = 0;
37  /** Frame size scale factor for transcoding output, 1.0 to preserve the input size. */
38  float transcodeScale = 1.0;
39  /** Frame rate limit for transcoding output, fps. Zero to preserve all input frames. */
41  };
42 
43  /** Should match MispStreamer.Status in managed code. */
44  enum class Status {
45  /** Stream is being initialized (format is probed, codec parameters are determined etc.). */
47  /** Stream is being sent normally. */
49  /** Connection has been closed unexpectedly. */
51  /** Streaming protocol version mismatch. */
53  /** Other streaming failure (e.g. network interface lost). */
55  /** Streamer was stopped normally. */
56  FINISHED
57  };
58 
59  /** Handler methods are called in a dedicated thread. */
61  public:
62  virtual
63  ~EventsListener()
64  {}
65 
66  /** Called on streamer status change.
 * @param status New streamer status.
 */
67  virtual void
68  OnStatus(Status status) = 0;
69 
70  /** Called when a queue overflows or when it is being consumed again.
71  * @param status True when overflowed, false when back in the normal state.
72  */
73  virtual void
74  OnQueueOverflow(bool status) = 0;
75  };
76 
77  /**
78  * @param params Streaming parameters.
79  * @param eventsListener Optional listener for several event types.
80  */
81  Streamer(const Params &params, std::unique_ptr<EventsListener> &&eventsListener = nullptr);
82 
83  /** Start the component. It cannot be used until started. */
84  void
85  Start();
86 
87  /** Stop the component, releasing all used resources. The component cannot be used after that. */
88  void
89  Stop();
90 
91  /** Set a new tail number.
 * @param tailNumber New tail number value.
 */
92  void
93  SetTailNumber(const std::string &tailNumber);
94 
95  /** Feed the next chunk of the raw H.264 video stream. There are no requirements on chunk
96  * alignment or size relative to H.264 protocol data units.
97  */
98  void
99  FeedData(const net::SingleBuffer::Ptr &pkt);
100 
101  /** Set the most recent telemetry values to send in the next metadata packet. Fields may be
102  * updated selectively; only non-empty values are taken into account in each invocation.
103  */
104  void
105  FeedTelemetry(const Telemetry &telemetry);
106 
107  /** Get various streaming statistics. The statistics do not have any fixed structure (the actual
108  * content is subject to change at any time) and are intended for display/logging only. No
109  * logic should depend on them.
110  */
111  StructuredData
112  GetStatistics() const;
113 
114  /** @return Peer protocol version if known, zero if not known. The call is legal only for a URTP
115  * target.
116  */
117  int
118  GetUrtpPeerVersion() const;
119 
120 private:
121  constexpr static auto TELEMETRY_SEND_INTERVAL = std::chrono::milliseconds(500);
122  constexpr static int MIN_SCALED_FRAME_SIZE = 32;
123 
124  Logger _log{Log::GetLogger("Streamer")};
125  Params params;
126  const std::unique_ptr<EventsListener> eventsListener;
127  misp::StreamingTarget target;
128  asio::io_context ioCtx;
129  std::thread thread;
130  misp::LiveInputSession inputSession;
131  std::unique_ptr<misp::OutputStreamer> outputStreamer;
132  Optional<misp::Transcoder> transcoder;
133  ev::ConnectionHandle inputStateConn, inputQueueStatusConn, outputQueueStatusConn, urtpStateConn;
134  /** Values pending for the next transmission. */
135  Telemetry pendingTelemetry;
136  asio::steady_timer telemetryTimer;
137  ProcessingContext statusEventsCtx;
138  std::thread statusEventsThread;
139  /** Tail number value to send in KLV. */
140  std::string klvTailNumber;
141  Status curStatus = Status::INITIALIZING;
142  bool inputQueueOverflow = false, outputQueueOveflow = false; /* NOTE(review): "Oveflow" is a typo for "Overflow"; rename together with all uses. */
143 
144  void
145  Run();
146 
147  /** @return true on success, false on failure (status set). */
148  bool
149  CreateOutput();
150 
151  void
152  HandleInputStateChange(misp::InputSession::State state);
153 
154  void
155  SetTelemetryTimer();
156 
157  void
158  OnTelemetryTimer(const asio::error_code &error);
159 
160  void
161  SendTelemetry();
162 
163  void
164  StatusEventsLoop();
165 
166  /** Should be called in muxer context only. */
167  void
168  SetStatus(Status status);
169 
170  void
171  OnQueueOverflowStatus(bool isInput, bool status);
172 
173  void
174  OnUrtpState(misp::urtp::UrtpHandler::State state, misp::urtp::UrtpHandler::ErrorCode errorCode,
175  const std::string &errorMsg);
176 
177  bool
178  CanAcceptData() const
179  {
180  return curStatus < Status::CONNECTION_FAILURE; // relies on Status enumerator declaration order
181  }
182 };
183 
184 #endif /* INCLUDE_MUXER_H_ */
Streamer::GetStatistics
StructuredData GetStatistics() const
Streamer::Params
Definition: streamer.h:19
Streamer::FeedData
void FeedData(const net::SingleBuffer::Ptr &pkt)
Streamer::Params::transcodeBitrate
int64_t transcodeBitrate
Definition: streamer.h:36
Streamer::Status::OTHER_FAILURE
@ OTHER_FAILURE
Streamer::SetTailNumber
void SetTailNumber(const std::string &tailNumber)
Streamer::GetUrtpPeerVersion
int GetUrtpPeerVersion() const
Streamer::Status::STREAMING_PROTOCOL_BAD_VERSION
@ STREAMING_PROTOCOL_BAD_VERSION
Streamer
Definition: streamer.h:16
Streamer::EventsListener::OnQueueOverflow
virtual void OnQueueOverflow(bool status)=0
Streamer::Params::tailNumber
Optional< std::string > tailNumber
Definition: streamer.h:23
Streamer::Status::INITIALIZING
@ INITIALIZING
Streamer::Params::targetUri
std::string targetUri
Definition: streamer.h:32
Streamer::Status::FINISHED
@ FINISHED
Streamer::FeedTelemetry
void FeedTelemetry(const Telemetry &telemetry)
Telemetry
Definition: telemetry.h:9
Streamer::Status::OPERATIONAL
@ OPERATIONAL
Streamer::Streamer
Streamer(const Params &params, std::unique_ptr< EventsListener > &&eventsListener=nullptr)
Streamer::Stop
void Stop()
Streamer::EventsListener
Definition: streamer.h:60
Streamer::Status
Status
Definition: streamer.h:44
Streamer::Params::transcodeScale
float transcodeScale
Definition: streamer.h:38
Streamer::Status::CONNECTION_FAILURE
@ CONNECTION_FAILURE
Streamer::Params::vehicleId
std::string vehicleId
Definition: streamer.h:21
Streamer::Start
void Start()
Streamer::Params::transcodeFramerateLimit
float transcodeFramerateLimit
Definition: streamer.h:40
Streamer::EventsListener::OnStatus
virtual void OnStatus(Status status)=0