Server’s room
Let’s create The Room! ;)
We are still missing probably the most important part - the heart of our application - the implementation of the room.
The room should dispatch messages sent from RTC Engine to appropriate peer channels - and at the same time, it should direct all the messages sent to it via peer channel to the RTC Engine.
Let’s start by creating the lib/videoroom/room.ex file with a declaration of the Videoroom.Room module:
#FILE: lib/videoroom/room.ex
defmodule Videoroom.Room do
  @moduledoc false

  use GenServer

  alias Membrane.RTC.Engine
  alias Membrane.RTC.Engine.Message
  alias Membrane.RTC.Engine.Endpoint.WebRTC

  require Membrane.Logger

  # we will put something here ;)
end
We will be using OTP’s GenServer to describe the behavior of this module.
Let’s start by adding wrappers for GenServer’s start and start_link functions:
#FILE: lib/videoroom/room.ex
def start(init_arg, opts) do
  GenServer.start(__MODULE__, init_arg, opts)
end

def start_link(opts) do
  GenServer.start_link(__MODULE__, [], opts)
end
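To see how these wrappers behave when called, here is a minimal sketch. DemoRoom is a hypothetical stand-in for Videoroom.Room with a trivial init/1, used only so that the snippet runs without Membrane installed:

```elixir
# DemoRoom is a hypothetical module for illustration, not part of the tutorial code.
defmodule DemoRoom do
  use GenServer

  # Unlinked start: a crash of the room does not take the caller down.
  def start(init_arg, opts), do: GenServer.start(__MODULE__, init_arg, opts)

  # Linked start: the usual choice when the process runs under a supervisor.
  def start_link(opts), do: GenServer.start_link(__MODULE__, [], opts)

  @impl true
  def init(room_id), do: {:ok, %{room_id: room_id}}
end

{:ok, pid} = DemoRoom.start("room-1", [])

# :sys.get_state/1 is a debugging helper that returns the GenServer's state.
IO.inspect(:sys.get_state(pid))
```

Note that start_link/1, written as above, always passes [] as the init argument, so a room started that way would receive [] in place of a room id.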
Then we provide the implementation of the init/1 callback:
#FILE: lib/videoroom/room.ex
@impl true
def init(room_id) do
  Membrane.Logger.info("Spawning room process: #{inspect(self())}")

  rtc_engine_options = [
    id: room_id
  ]

  network_options = [
    stun_servers: [
      %{server_addr: "stun.l.google.com", server_port: 19_302}
    ],
    turn_servers: [],
    dtls_pkey: Application.get_env(:membrane_videoroom_demo, :dtls_pkey),
    dtls_cert: Application.get_env(:membrane_videoroom_demo, :dtls_cert)
  ]

  {:ok, pid} = Membrane.RTC.Engine.start(rtc_engine_options, [])
  Engine.register(pid, self())
  {:ok, %{rtc_engine: pid, peer_channels: %{}, network_options: network_options}}
end
For the description of rtc_engine_options, please refer to Membrane’s documentation.
We are starting the Membrane.RTC.Engine process (we will refer to this process using pid), which will serve as an RTC server.
Then we send a message to this process saying that we want to register ourselves (so that the RTC engine will be aware that we are the process responsible for dispatching the messages sent from the RTC engine to the clients).
The last thing we do is return the initial state of the GenServer - in our state we are holding :rtc_engine, which is the PID of the engine process, :network_options, which we will need later to configure the WebRTC endpoints, and :peer_channels - a map of the following form: (peer_id -> peer_channel_pid). For now, this map is empty.
What’s next? We need to handle the callbacks to properly react to the incoming events. Once again - please take a look at the plugin documentation to find out what types of messages RTC sends and what types of messages RTC expects to receive.
We won’t implement handling all of these messages - only the ones which are crucial to set up the connection between peers, start the process of media streaming and take proper actions when participants disconnect. After finishing this tutorial you can try to implement handling of other messages (for instance those connected with voice activation detection - :vad_notification).
Let’s start with handling messages sent to us by RTC.
#FILE: lib/videoroom/room.ex
@impl true
def handle_info(%Message.MediaEvent{to: :broadcast, data: data}, state) do
  for {_peer_id, pid} <- state.peer_channels, do: send(pid, {:media_event, data})
  {:noreply, state}
end
Here comes the first one - once we receive %Message.MediaEvent{} from the RTC engine with the :broadcast specifier, we send this event to all peers’ channels which are currently saved in the state.peer_channels map in the state of our GenServer. We need to “reformat” the event description so that the message sent to the peer channel matches the interface defined by us previously, in VideoroomWeb.PeerChannel. If you are new to GenServers you might wonder what we are returning from this function - in fact, we are returning the state updated while handling this message. In our case, the state stays the same, so we do not change anything. :noreply means that we do not send a response to the sender (who, in our case, is the RTC engine process). The updated state will then be passed to the next callback while handling the next message - and may be updated again while handling that message. And so on and so on :)
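The broadcast pattern and the {:noreply, state} return value can be tried out in isolation. The sketch below is an illustration only: BroadcastDemo and the {:broadcast, data} message are hypothetical stand-ins for the room and for %Message.MediaEvent{}, and the calling process plays the role of two peer channels:

```elixir
# BroadcastDemo and the {:broadcast, data} message are hypothetical.
defmodule BroadcastDemo do
  use GenServer

  @impl true
  def init(peer_channels), do: {:ok, %{peer_channels: peer_channels}}

  @impl true
  def handle_info({:broadcast, data}, state) do
    for {_peer_id, pid} <- state.peer_channels, do: send(pid, {:media_event, data})
    # No reply is sent; the second element becomes the state for the next message.
    {:noreply, state}
  end
end

# The current process is registered under two peer ids, so it gets two copies.
{:ok, room} = GenServer.start(BroadcastDemo, %{"peer_a" => self(), "peer_b" => self()})
send(room, {:broadcast, "hello"})

for _ <- 1..2 do
  receive do
    {:media_event, data} -> IO.puts("got #{data}")
  after
    1_000 -> IO.puts("timeout")
  end
end
```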
Here comes the next clause of handle_info/2:
#FILE: lib/videoroom/room.ex
@impl true
def handle_info(%Message.MediaEvent{to: to, data: data}, state) do
  if state.peer_channels[to] != nil do
    send(state.peer_channels[to], {:media_event, data})
  end

  {:noreply, state}
end
The idea here is very similar to the one in the code snippet described previously - we want to direct the messages sent by RTC Engine’s server to the RTC Engine’s client.
The only difference is that the event is about to be sent to a particular user - that is why, instead of the :broadcast atom, the to field of the struct is bound to a variable to - the peer’s unique id. Since we know precisely to whom we should send the message, there is nothing else to do than to find the peer channel’s process id associated with the given peer id (we are holding the (peer_id -> peer_channel_pid) mapping in the state of the GenServer!) and to send the message there. If no channel is registered under that id, the message is simply dropped. Once again the state does not need to change.
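Both clauses match the same struct, so their order matters: Elixir tries function clauses from top to bottom, and the generic clause would also match a :broadcast event if it came first. A minimal sketch of that dispatch, assuming plain maps in place of %Message.MediaEvent{} and a hypothetical RoutingDemo module:

```elixir
# RoutingDemo is hypothetical; plain maps stand in for %Message.MediaEvent{}.
defmodule RoutingDemo do
  # The specific :broadcast clause has to be defined before the generic one.
  def route(%{to: :broadcast, data: data}, peer_channels) do
    for {_peer_id, pid} <- peer_channels, do: send(pid, {:media_event, data})
    :broadcast
  end

  def route(%{to: to, data: data}, peer_channels) do
    case Map.fetch(peer_channels, to) do
      {:ok, pid} ->
        send(pid, {:media_event, data})
        :sent

      :error ->
        # No channel registered under this id: drop the event.
        :unknown_peer
    end
  end
end

channels = %{"peer_a" => self()}
IO.inspect(RoutingDemo.route(%{to: "peer_a", data: "offer"}, channels))
IO.inspect(RoutingDemo.route(%{to: "peer_x", data: "offer"}, channels))
IO.inspect(RoutingDemo.route(%{to: :broadcast, data: "offer"}, channels))
```

Returning an atom from each branch is only there to make the chosen path visible; the room's callbacks return {:noreply, state} instead.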
There we go with another message sent by RTC engine:
#FILE: lib/videoroom/room.ex
@impl true
def handle_info(%Message.NewPeer{rtc_engine: rtc_engine, peer: peer}, state) do
  Membrane.Logger.info("New peer: #{inspect(peer)}. Accepting.")
  # get the node on which the peer channel of this peer is running
  peer_channel_pid = Map.get(state.peer_channels, peer.id)
  peer_node = node(peer_channel_pid)

  handshake_opts =
    if state.network_options[:dtls_pkey] &&
         state.network_options[:dtls_cert] do
      [
        client_mode: false,
        dtls_srtp: true,
        pkey: state.network_options[:dtls_pkey],
        cert: state.network_options[:dtls_cert]
      ]
    else
      [
        client_mode: false,
        dtls_srtp: true
      ]
    end

  endpoint = %WebRTC{
    ice_name: peer.id,
    extensions: %{},
    owner: self(),
    stun_servers: state.network_options[:stun_servers] || [],
    turn_servers: state.network_options[:turn_servers] || [],
    handshake_opts: handshake_opts,
    log_metadata: [peer_id: peer.id]
  }

  Engine.accept_peer(rtc_engine, peer.id)

  :ok =
    Engine.add_endpoint(rtc_engine, endpoint,
      peer_id: peer.id,
      node: peer_node
    )

  {:noreply, state}
end
That one might seem a little bit tricky. What is the deal here? Be aware that our room’s process is the only one holding the mapping between the peer’s id and the peer channel’s PID. Once a new peer joins, the RTC Engine is not aware of this peer channel’s PID. That is why it asks our room process for some information about the new peer. The room process looks up the peer channel’s PID and passes to the RTC Engine the identifier of the node on which the peer channel’s process is located (notice that thanks to the BEAM virtual machine our application can be distributed - the server can be spread over many nodes working in the same cluster).
Next comes a bunch of option definitions that will be used while defining a WebRTC endpoint. Then we create an endpoint corresponding to the peer who is trying to join. If you are interested in the options available in the WebRTC endpoint, you can read about them in its documentation, but in most cases all you would ever want to do with them is simply copy-paste ;) Finally, we accept the peer and add its endpoint to the RTC Engine.
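The two pieces of plain Elixir in this callback, the conditional handshake options and the node lookup, can be explored without Membrane. The sketch below assumes a hypothetical HandshakeDemo module and placeholder strings instead of a real key and certificate:

```elixir
# HandshakeDemo is hypothetical; "key" and "cert" are placeholder values.
defmodule HandshakeDemo do
  # Mirrors the branching in the callback: the key and certificate are added
  # only when both are configured.
  def handshake_opts(network_options) do
    base = [client_mode: false, dtls_srtp: true]
    pkey = network_options[:dtls_pkey]
    cert = network_options[:dtls_cert]

    if pkey && cert, do: base ++ [pkey: pkey, cert: cert], else: base
  end
end

IO.inspect(HandshakeDemo.handshake_opts(dtls_pkey: "key", dtls_cert: "cert"))
IO.inspect(HandshakeDemo.handshake_opts([]))

# node/1 returns the name of the node a process lives on; on a node started
# without distribution this is :nonode@nohost.
IO.inspect(node(self()))
```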
Here comes the next callback!
Once we receive a %Message.PeerLeft{} message from the RTC Engine we simply ignore that fact (we could of course remove the peer_id from the (peer_id->peer_channel_pid) mapping…but do we need to?):
#FILE: lib/videoroom/room.ex
@impl true
def handle_info(%Message.PeerLeft{peer: peer}, state) do
  Membrane.Logger.info("Peer #{inspect(peer.id)} left RTC Engine")
  {:noreply, state}
end
In case the RTC Engine wants to communicate with the client during the signaling process, we know how to react - we are simply passing the message to the appropriate PeerChannel.
How about messages coming from the client, via the PeerChannel? We need to pass them to the RTC Engine!
#FILE: lib/videoroom/room.ex
@impl true
def handle_info({:media_event, _from, _event} = msg, state) do
  Engine.receive_media_event(state.rtc_engine, msg)
  {:noreply, state}
end
Again - no magic tricks there. We are receiving a :media_event - we are sending it to our RTC engine process.
And here comes the callback for an :add_peer_channel message:
#FILE: lib/videoroom/room.ex
@impl true
def handle_info({:add_peer_channel, peer_channel_pid, peer_id}, state) do
  state = put_in(state, [:peer_channels, peer_id], peer_channel_pid)
  Process.monitor(peer_channel_pid)
  {:noreply, state}
end
It is a great example of what updating the state looks like. We are putting a new entry into our (peer_id->peer_channel_pid) mapping - and we are returning the state updated this way. Meanwhile, we also start monitoring the process with id peer_channel_pid - to receive a :DOWN message when the peer channel process goes down.
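To watch put_in/3 and Process.monitor/1 at work outside of a GenServer, here is a small sketch. The spawned process is a hypothetical stand-in for a peer channel, and the calling process plays the room:

```elixir
state = %{peer_channels: %{}}

# A short-lived process stands in for a peer channel (hypothetical).
channel =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# put_in/3 returns a new map; the original state is left untouched.
state = put_in(state, [:peer_channels, "peer_a"], channel)
ref = Process.monitor(channel)

# Let the "peer channel" finish; the monitor then delivers a :DOWN message.
send(channel, :stop)

receive do
  {:DOWN, ^ref, :process, ^channel, reason} -> IO.inspect(reason)
after
  1_000 -> IO.puts("no :DOWN message received")
end
```

A monitor is one-directional: the room is notified when the channel exits, but it does not exit itself, which is the behavior we want here.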
We are almost done! We are monitoring all the peer channel processes. Once one of them dies, we receive a :DOWN message. Let’s handle this event!
#FILE: lib/videoroom/room.ex
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
  {peer_id, _peer_channel_id} =
    state.peer_channels
    |> Enum.find(fn {_peer_id, peer_channel_pid} -> peer_channel_pid == pid end)

  Engine.remove_peer(state.rtc_engine, peer_id)
  {_elem, state} = pop_in(state, [:peer_channels, peer_id])
  {:noreply, state}
end
First, we find the id of the peer whose channel has died. Then we send a message to the RTC engine telling it to remove the peer with the given peer_id.
The last thing we do is update the state - we remove the mapping (peer_id->peer_channel_pid) from our :peer_channels map.
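The lookup-and-remove step can be sketched on its own. Note that the callback above pattern-matches directly on the result of Enum.find/2, so it would crash if a :DOWN message arrived for a PID that is not in the map. The variant below, with a hypothetical CleanupDemo module, is one possible way to handle that case and not the tutorial's implementation:

```elixir
# CleanupDemo is hypothetical; it only illustrates Enum.find/2 and pop_in/2.
defmodule CleanupDemo do
  # Returns {peer_id, new_state}, or {nil, state} when no peer is registered
  # under the given pid.
  def drop_channel(state, pid) do
    case Enum.find(state.peer_channels, fn {_peer_id, channel_pid} -> channel_pid == pid end) do
      {peer_id, _channel_pid} ->
        {_removed, state} = pop_in(state, [:peer_channels, peer_id])
        {peer_id, state}

      nil ->
        {nil, state}
    end
  end
end

state = %{peer_channels: %{"peer_a" => self()}}
{peer_id, state} = CleanupDemo.drop_channel(state, self())
IO.inspect({peer_id, state})
```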
After all of this hard work, our server is finally ready. But we still need a client application.