Elixir WebSockets via Phoenix Framework: Real-time Communication
Phoenix 1.6 and Elixir 1.12 have revolutionized real-time web applications. With Phoenix Channels, building WebSocket-powered applications becomes as natural as writing regular HTTP endpoints.
Why Phoenix Channels Dominate
Traditional WebSocket implementations are fragile, complex, and don’t scale. Phoenix Channels solve this with:
- Fault-tolerant connections that survive process crashes
- Built-in authentication and authorization
- Automatic connection recovery
- Distributed PubSub across multiple servers
- Presence tracking for user status
All this runs on the BEAM VM, handling millions of concurrent connections effortlessly.
Phoenix Channels Architecture
Client (Browser/Mobile)
↓ WebSocket/Long Polling
Phoenix Endpoint
↓ Routes to
Channel Module
↓ Communicates via
PubSub System
↓ Across multiple
Phoenix Nodes
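Each socket connection runs in its own process, and every channel a client joins runs in its own isolated GenServer-based process. When a channel process crashes, only that channel is affected—other channels and connections continue normally.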
Basic Channel Setup
1. Define the Socket
# lib/my_app_web/channels/user_socket.ex
defmodule MyAppWeb.UserSocket do
  # Socket entry point: maps topic patterns to channel modules and
  # authenticates each incoming connection from its "token" connect param.
  use Phoenix.Socket

  # Topic pattern => channel module that serves it.
  channel "room:*", MyAppWeb.RoomChannel
  channel "user:*", MyAppWeb.UserChannel
  channel "game:*", MyAppWeb.GameChannel

  # Accepts the connection only when a "token" param is present and verifies;
  # the verified user is stored under the :current_user assign.
  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    token
    |> verify_user_token()
    |> case do
      {:ok, user} -> {:ok, assign(socket, :current_user, user)}
      {:error, _reason} -> :error
    end
  end

  # No "token" param supplied: refuse the connection.
  def connect(_params, _socket, _connect_info), do: :error

  # Socket ID derived from the authenticated user's id, so a user's sockets
  # can be addressed together (e.g. for disconnection).
  @impl true
  def id(socket) do
    user_id = socket.assigns.current_user.id
    "user_socket:#{user_id}"
  end

  # Delegates token verification (JWT or similar) to MyApp.Auth.
  defp verify_user_token(token), do: MyApp.Auth.verify_token(token)
end
2. Configure the Endpoint
# lib/my_app_web/endpoint.ex
# Application endpoint; mounts the MyAppWeb.UserSocket transport at "/socket".
defmodule MyAppWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :my_app
# WebSocket configuration
# Clients reach MyAppWeb.UserSocket at "/socket"; the WebSocket transport is
# enabled and the long-polling transport is disabled.
socket "/socket", MyAppWeb.UserSocket,
websocket: true,
longpoll: false
# ... rest of endpoint config
end
3. Create a Channel
# lib/my_app_web/channels/room_channel.ex
defmodule MyAppWeb.RoomChannel do
  # Chat-room channel: authorizes joins on "room:*" topics, persists and
  # broadcasts messages, relays typing indicators, and announces joins/leaves.
  use MyAppWeb, :channel

  # where/order_by/limit/preload are Ecto.Query macros used by
  # get_recent_messages/1; they must be imported to compile.
  import Ecto.Query, only: [where: 2, order_by: 2, limit: 2, preload: 2]

  alias MyApp.{Repo, Room, Message}

  # Join authorization.
  #
  # The lobby is open to every authenticated socket. :room_id is assigned here
  # as well, because handle_in("new_message", ...) reads it for every room.
  # NOTE(review): assumes Message.room_id can hold the string "lobby" — confirm
  # against the Message schema.
  @impl true
  def join("room:lobby", _payload, socket) do
    {:ok, assign(socket, :room_id, "lobby")}
  end

  # Any other room must exist and pass can_access_room?/2. On success the
  # post-join work is deferred to handle_info(:after_join, ...) so that the
  # join reply is sent first.
  def join("room:" <> room_id, _payload, socket) do
    case Repo.get(Room, room_id) do
      %Room{} = room ->
        if can_access_room?(socket.assigns.current_user, room) do
          send(self(), :after_join)
          {:ok, assign(socket, :room_id, room_id)}
        else
          {:error, %{reason: "unauthorized"}}
        end

      nil ->
        {:error, %{reason: "room not found"}}
    end
  end

  # Handle incoming messages.
  #
  # "new_message": persists the message, broadcasts it to all room subscribers
  # and replies with the new id; on validation failure replies with a plain
  # map of error messages (a raw changeset cannot be serialized for the client).
  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    user = socket.assigns.current_user
    room_id = socket.assigns.room_id

    case create_message(user, room_id, body) do
      {:ok, message} ->
        broadcast(socket, "new_message", message_payload(message, user))
        {:reply, {:ok, %{id: message.id}}, socket}

      {:error, changeset} ->
        {:reply, {:error, %{errors: changeset_errors(changeset)}}, socket}
    end
  end

  # "typing": relays the typing flag to everyone in the room except the sender.
  def handle_in("typing", %{"typing" => typing}, socket) do
    broadcast_from(socket, "user_typing", %{
      user_id: socket.assigns.current_user.id,
      typing: typing
    })

    {:noreply, socket}
  end

  # Handle post-join setup: push the recent history to the joining client only,
  # then announce the user to the other subscribers.
  @impl true
  def handle_info(:after_join, socket) do
    user = socket.assigns.current_user

    # Messages are converted to plain maps with the same shape as the
    # "new_message" broadcast, so the client can render both identically.
    messages =
      socket.assigns.room_id
      |> get_recent_messages()
      |> Enum.map(&message_payload(&1, &1.user))

    push(socket, "recent_messages", %{messages: messages})

    broadcast_from(socket, "user_joined", %{
      user: %{id: user.id, name: user.name}
    })

    {:noreply, socket}
  end

  # Handle user leaving: tell the other subscribers that this user is gone.
  @impl true
  def terminate(_reason, socket) do
    broadcast_from(socket, "user_left", %{
      user_id: socket.assigns.current_user.id
    })

    :ok
  end

  # Room access logic placeholder: currently grants access to everyone.
  defp can_access_room?(_user, _room) do
    true
  end

  # Builds and inserts a Message owned by `user` in `room_id`; returns the
  # result of Repo.insert/1.
  defp create_message(user, room_id, body) do
    %Message{user_id: user.id, room_id: room_id}
    |> Message.changeset(%{body: body})
    |> Repo.insert()
  end

  # Returns up to the 50 newest messages of the room, oldest first, with their
  # :user association preloaded.
  defp get_recent_messages(room_id) do
    Message
    |> where(room_id: ^room_id)
    |> order_by(desc: :inserted_at)
    |> limit(50)
    |> preload(:user)
    |> Repo.all()
    |> Enum.reverse()
  end

  # Client-facing representation of a message and its author.
  defp message_payload(message, user) do
    %{
      id: message.id,
      body: message.body,
      user: %{id: user.id, name: user.name},
      timestamp: message.inserted_at
    }
  end

  # Turns changeset errors into a map of field => [message], interpolating
  # the %{key} placeholders in each message.
  defp changeset_errors(changeset) do
    Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
      Enum.reduce(opts, msg, fn {key, value}, acc ->
        String.replace(acc, "%{#{key}}", to_string(value))
      end)
    end)
  end
end
Client-Side JavaScript
// assets/js/socket.js
import {Socket} from "phoenix"
/**
 * Browser chat client: opens the Phoenix socket, joins one room channel at a
 * time, renders incoming events into the page and sends messages and typing
 * notifications.
 */
class ChatClient {
  constructor() {
    // No room joined yet; sendMessage/sendTypingNotification are no-ops until joinRoom().
    this.channel = null
    this.socket = new Socket("/socket", {
      params: {token: this.getAuthToken()}
    })
    this.socket.connect()
    this.setupEventListeners()
  }

  /** Joins `room:<roomId>`, leaving any previously joined room first. */
  joinRoom(roomId) {
    // Leave the old channel so its handlers stop rendering into the page.
    if (this.channel) {
      this.channel.leave()
    }
    this.channel = this.socket.channel(`room:${roomId}`, {})

    // Handle incoming messages
    this.channel.on("new_message", message => {
      this.displayMessage(message)
    })
    this.channel.on("user_joined", data => {
      this.showNotification(`${data.user.name} joined`)
    })
    this.channel.on("user_left", () => {
      this.showNotification(`User left`)
    })
    this.channel.on("user_typing", data => {
      this.showTypingIndicator(data.user_id, data.typing)
    })
    this.channel.on("recent_messages", data => {
      data.messages.forEach(message => {
        this.displayMessage(message, false) // Don't animate old messages
      })
    })

    // Join and handle responses
    this.channel.join()
      .receive("ok", resp => {
        console.log("Joined successfully", resp)
      })
      .receive("error", resp => {
        console.log("Unable to join", resp)
      })
  }

  /** Sends a non-blank message to the joined room; clears the input on success. */
  sendMessage(body) {
    if (!this.channel) return
    if (typeof body !== 'string' || !body.trim()) return
    this.channel.push("new_message", {body: body})
      .receive("ok", resp => {
        console.log("Message sent", resp.id)
        this.clearInput()
      })
      .receive("error", resp => {
        console.log("Failed to send message", resp.errors)
      })
  }

  /** Tells the room whether this user is currently typing. */
  sendTypingNotification(typing) {
    // The input listener can fire before any room has been joined.
    if (!this.channel) return
    this.channel.push("typing", {typing: typing})
  }

  /** Wires socket status callbacks and, when present, the message input and send button. */
  setupEventListeners() {
    // Connection status. Registered first so a missing DOM element below
    // cannot prevent status reporting.
    this.socket.onOpen(() => {
      this.showConnectionStatus('Connected')
    })
    this.socket.onError(() => {
      this.showConnectionStatus('Connection error')
    })
    this.socket.onClose(() => {
      this.showConnectionStatus('Disconnected')
    })

    const messageInput = document.getElementById('message-input')
    const sendButton = document.getElementById('send-button')
    if (!messageInput) return

    let typingTimer

    // Send message on enter or button click
    messageInput.addEventListener('keypress', (e) => {
      if (e.key === 'Enter') {
        this.sendMessage(messageInput.value)
      }
    })
    if (sendButton) {
      sendButton.addEventListener('click', () => {
        this.sendMessage(messageInput.value)
      })
    }

    // Typing indicators: report "typing" on each input event and "stopped"
    // after one second without further input.
    messageInput.addEventListener('input', () => {
      this.sendTypingNotification(true)
      clearTimeout(typingTimer)
      typingTimer = setTimeout(() => {
        this.sendTypingNotification(false)
      }, 1000)
    })
  }

  /** Appends a message to #messages and scrolls to the bottom. */
  displayMessage(message, animate = true) {
    const messagesContainer = document.getElementById('messages')
    const messageElement = document.createElement('div')
    messageElement.className = `message ${animate ? 'animate-in' : ''}`
    // Every user-controlled field is escaped before it goes into innerHTML;
    // the user name is attacker-controlled just like the body.
    messageElement.innerHTML = `
      <div class="message-header">
        <span class="user-name">${this.escapeHtml(message.user.name)}</span>
        <span class="timestamp">${this.escapeHtml(this.formatTime(message.timestamp))}</span>
      </div>
      <div class="message-body">${this.escapeHtml(message.body)}</div>
    `
    messagesContainer.appendChild(messageElement)
    messagesContainer.scrollTop = messagesContainer.scrollHeight
  }

  /** Adds or removes the per-user typing indicator element. */
  showTypingIndicator(userId, typing) {
    const indicator = document.getElementById(`typing-${userId}`)
    if (typing && !indicator) {
      const typingElement = document.createElement('div')
      typingElement.id = `typing-${userId}`
      typingElement.className = 'typing-indicator'
      typingElement.textContent = 'Someone is typing...'
      document.getElementById('typing-indicators').appendChild(typingElement)
    } else if (!typing && indicator) {
      indicator.remove()
    }
  }

  /** Returns the stored auth token, or '' when none is stored. */
  getAuthToken() {
    return localStorage.getItem('auth_token') || ''
  }

  /** Formats a timestamp as a locale time string. */
  formatTime(timestamp) {
    return new Date(timestamp).toLocaleTimeString()
  }

  /** Escapes text for use inside element content by round-tripping through a detached node. */
  escapeHtml(text) {
    const div = document.createElement('div')
    div.textContent = text
    return div.innerHTML
  }

  /** Empties the message input. */
  clearInput() {
    document.getElementById('message-input').value = ''
  }

  showNotification(message) {
    // Show toast notification
    console.log('Notification:', message)
  }

  /** Shows the status text and a matching CSS class, e.g. "status connection-error". */
  showConnectionStatus(status) {
    const statusElement = document.getElementById('connection-status')
    if (statusElement) {
      statusElement.textContent = status
      // Replace every whitespace run, not just the first space.
      statusElement.className = `status ${status.toLowerCase().replace(/\s+/g, '-')}`
    }
  }
}
// Initialize chat
// Creates the client when this script loads and exposes it as
// window.chatClient; the constructor opens the socket connection and attaches
// the DOM and socket listeners.
window.chatClient = new ChatClient()
Advanced Patterns
1. Private User Channels
defmodule MyAppWeb.UserChannel do
  # Private per-user channel: a socket may only join the "user:<id>" topic
  # whose id matches its own authenticated user.
  use MyAppWeb, :channel

  # Compares the id in the topic with the stringified current user id.
  def join("user:" <> user_id, _payload, socket) do
    case to_string(socket.assigns.current_user.id) do
      ^user_id -> {:ok, socket}
      _other_id -> {:error, %{reason: "unauthorized"}}
    end
  end

  # Marks the given notification ids as read for the current user and
  # acknowledges with a bare :ok reply.
  def handle_in("mark_notifications_read", %{"ids" => ids}, socket) do
    MyApp.Notifications.mark_as_read(socket.assigns.current_user, ids)
    {:reply, :ok, socket}
  end
end
2. Presence Tracking
defmodule MyAppWeb.RoomChannel do
  # Room channel with presence: tracks each joined user and lets users update
  # their own status.
  use MyAppWeb, :channel

  alias MyAppWeb.Presence

  # Accepts the join and defers presence tracking to
  # handle_info(:after_join, ...), which runs once the join has completed.
  def join("room:" <> room_id, _payload, socket) do
    send(self(), :after_join)
    {:ok, assign(socket, :room_id, room_id)}
  end

  # Registers the user in the presence tracker under their id, then pushes the
  # current presence list to the joining client.
  def handle_info(:after_join, socket) do
    user = socket.assigns.current_user

    # Track user presence
    {:ok, _ref} =
      Presence.track(socket, user.id, %{
        name: user.name,
        avatar: user.avatar_url,
        joined_at: inspect(System.system_time(:second))
      })

    # Send current presence list
    push(socket, "presence_state", Presence.list(socket))
    {:noreply, socket}
  end

  # Handle presence updates.
  #
  # The function form of Presence.update/3 is used so the new status is merged
  # into the existing meta. Passing a bare map would replace the meta and drop
  # name/avatar/joined_at, which the client renders.
  def handle_in("update_status", %{"status" => status}, socket) do
    user = socket.assigns.current_user

    {:ok, _ref} = Presence.update(socket, user.id, &Map.put(&1, :status, status))

    {:noreply, socket}
  end
end
# Setup presence in application.ex
# Application start callback: boots the supervision tree, including the
# presence tracker, under MyApp.Supervisor with a :one_for_one strategy.
def start(_type, _args) do
  supervisor_opts = [strategy: :one_for_one, name: MyApp.Supervisor]

  [
    MyAppWeb.Presence
    # ... other children
  ]
  |> Supervisor.start_link(supervisor_opts)
end
Presence module:
# Presence tracker for the application, built on Phoenix.Presence and
# configured to use the MyApp.PubSub server.
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
end
Client-side presence:
import {Presence} from "phoenix"
// Track presence changes
// `presences` holds the latest presence map. Each handler below replaces it
// with the value returned by the phoenix Presence helper and re-renders the
// user list. `channel` is expected to be a joined channel defined elsewhere.
let presences = {}
// Full snapshot, pushed by the server as "presence_state" after the join.
channel.on("presence_state", state => {
presences = Presence.syncState(presences, state)
updateUserList(presences)
})
// Incremental change: merged into the current map via Presence.syncDiff.
channel.on("presence_diff", diff => {
presences = Presence.syncDiff(presences, diff)
updateUserList(presences)
})
/**
 * Re-renders #user-list from the presence map, one entry per user, using the
 * user's first presence meta.
 *
 * Elements are built with DOM APIs rather than an HTML string: name, avatar
 * and status are user-supplied presence metadata and must never be parsed as
 * markup.
 */
function updateUserList(presences) {
  const userList = document.getElementById('user-list')
  userList.innerHTML = ''

  Presence.list(presences, (id, {metas: [first]}) => {
    const status = first.status || 'online'

    const userElement = document.createElement('div')
    userElement.className = 'user-item'

    const avatarElement = document.createElement('img')
    avatarElement.className = 'avatar'
    avatarElement.alt = first.name
    // Skip the src when no avatar is set, to avoid requesting "undefined".
    if (first.avatar) {
      avatarElement.src = first.avatar
    }

    const nameElement = document.createElement('span')
    nameElement.className = 'name'
    nameElement.textContent = first.name

    const statusElement = document.createElement('span')
    statusElement.className = `status ${status}`
    statusElement.textContent = status

    userElement.append(avatarElement, nameElement, statusElement)
    userList.appendChild(userElement)
  })
}
3. Rate Limiting and Security
defmodule MyAppWeb.RoomChannel do
  # Room channel with per-user rate limiting and content checks applied to
  # "new_message" events.
  use MyAppWeb, :channel

  @rate_limit_window 60_000 # 1 minute
  @rate_limit_count 30 # 30 messages per minute
  @max_body_bytes 1000

  # Every "new_message" event counts against the sender's rate limit before
  # the payload is examined.
  def handle_in("new_message", payload, socket) do
    user_id = socket.assigns.current_user.id

    case check_rate_limit(user_id) do
      :ok ->
        handle_new_message(payload, socket)

      {:error, :rate_limited} ->
        {:reply, {:error, %{reason: "rate_limited"}}, socket}
    end
  end

  # Increments the per-user counter and allows the message while the count is
  # within @rate_limit_count.
  # NOTE(review): assumes MyApp.Cache.increment/3 returns the new integer count
  # and expires the key after @rate_limit_window ms — confirm against the cache.
  defp check_rate_limit(user_id) do
    key = "rate_limit:user:#{user_id}"

    case MyApp.Cache.increment(key, 1, @rate_limit_window) do
      count when count <= @rate_limit_count -> :ok
      _count -> {:error, :rate_limited}
    end
  end

  # Rejects bodies larger than @max_body_bytes (measured in bytes).
  defp handle_new_message(%{"body" => body}, socket)
       when is_binary(body) and byte_size(body) > @max_body_bytes do
    {:reply, {:error, %{reason: "message_too_long"}}, socket}
  end

  # Sanitizes and validates the body, then persists and broadcasts it.
  # create_and_broadcast_message/2 is not part of this snippet and must be
  # defined in the channel.
  defp handle_new_message(%{"body" => body}, socket) when is_binary(body) do
    sanitized_body = MyApp.ContentFilter.sanitize(body)

    # Check for spam/inappropriate content
    case MyApp.ContentFilter.validate(sanitized_body) do
      :ok ->
        create_and_broadcast_message(sanitized_body, socket)

      {:error, reason} ->
        {:reply, {:error, %{reason: reason}}, socket}
    end
  end

  # A missing or non-string "body" is client error: reply with an error
  # instead of crashing the channel process with a FunctionClauseError.
  defp handle_new_message(_payload, socket) do
    {:reply, {:error, %{reason: "invalid_payload"}}, socket}
  end
end
4. Cross-Node Broadcasting
defmodule MyApp.GameUpdates do
  # Publishes and subscribes to per-game updates over MyApp.PubSub.
  alias Phoenix.PubSub

  @pubsub MyApp.PubSub

  # Sends {:game_update, state} to every subscriber of the game's topic.
  # This works across all connected Phoenix nodes.
  def broadcast_game_state(game_id, state) do
    PubSub.broadcast(@pubsub, game_topic(game_id), {:game_update, state})
  end

  # Subscribes the calling process to the game's topic.
  def subscribe_to_game(game_id), do: PubSub.subscribe(@pubsub, game_topic(game_id))

  # Topic name shared by broadcaster and subscribers.
  defp game_topic(game_id), do: "game:#{game_id}"
end
# In your channel
# Relays a {:game_update, state} message received by the channel process to
# this channel's client as a "game_state" event.
def handle_info({:game_update, game_state}, socket) do
  push(socket, "game_state", game_state)
  {:noreply, socket}
end
Performance and Monitoring
Connection Metrics
defmodule MyAppWeb.ChannelMetrics do
  # Emits telemetry events for channel joins and incoming channel messages.

  # Emits a join event tagged with the channel module and, when the socket
  # has an authenticated user, that user's id (nil otherwise).
  def track_connection(socket) do
    :telemetry.execute([:phoenix, :channel, :joined], %{count: 1}, %{
      channel: socket.channel,
      user_id: current_user_id(socket)
    })
  end

  # Emits a message event tagged with the channel module and event name.
  def track_message(socket, event) do
    :telemetry.execute([:phoenix, :channel, :message], %{count: 1}, %{
      channel: socket.channel,
      event: event
    })
  end

  # Elixir has no safe-navigation operator, so the optional user is read with
  # pattern matching instead.
  defp current_user_id(%{assigns: %{current_user: %{id: id}}}), do: id
  defp current_user_id(_socket), do: nil
end
# In your channel
# Join callback instrumented with a connection metric: records the join via
# MyAppWeb.ChannelMetrics.track_connection/1 before the normal join logic.
# NOTE(review): as written the body ends with the metrics call, so the join
# reply must come from the elided logic below; `topic` is otherwise unused here.
def join(topic, _payload, socket) do
MyAppWeb.ChannelMetrics.track_connection(socket)
# ... rest of join logic
end
Memory Management
# Monitor channel process memory
# Periodic self-check of this channel process: logs a warning when its memory
# exceeds 10 MB, then re-arms itself to run again in 30 seconds.
# Something else must send the first :check_memory message (e.g. after join).
def handle_info(:check_memory, socket) do
  process_info = Process.info(self(), [:memory, :message_queue_len])

  if Keyword.fetch!(process_info, :memory) > 10_000_000 do # 10MB
    # Logger.warn is deprecated in favour of Logger.warning; the process info
    # keyword list is attached as log metadata.
    Logger.warning("Channel process using excessive memory", process_info)
  end

  Process.send_after(self(), :check_memory, 30_000) # Check every 30s
  {:noreply, socket}
end
Testing Channels
defmodule MyAppWeb.RoomChannelTest do
  # Channel tests for RoomChannel, run against the lobby topic.
  use MyAppWeb.ChannelCase

  alias MyAppWeb.{RoomChannel, UserSocket}

  @lobby_topic "room:lobby"

  # Connects a socket authenticated as a freshly inserted user.
  setup do
    user = insert(:user)
    {:ok, connected} = connect(UserSocket, %{"token" => generate_token(user)})

    {:ok, socket: assign(connected, :current_user, user), user: user}
  end

  test "joining room:lobby", %{socket: socket} do
    {:ok, _reply, joined} = subscribe_and_join(socket, RoomChannel, @lobby_topic)

    assert joined.assigns.room_id == "lobby"
  end

  test "broadcasts new messages", %{socket: socket} do
    {:ok, _reply, joined} = subscribe_and_join(socket, RoomChannel, @lobby_topic)

    ref = push(joined, "new_message", %{"body" => "Hello world"})

    # The reply carries the new message id, which the broadcast must repeat.
    assert_reply ref, :ok, %{id: message_id}
    assert_broadcast "new_message", %{id: ^message_id, body: "Hello world"}
  end

  test "rejects empty messages", %{socket: socket} do
    {:ok, _reply, joined} = subscribe_and_join(socket, RoomChannel, @lobby_topic)

    ref = push(joined, "new_message", %{"body" => ""})

    assert_reply ref, :error, %{errors: _errors}
  end
end
Production Considerations
1. Connection Limits
# config/prod.exs
# Production HTTP listener settings for the endpoint: port 4000 with explicit
# transport limits.
config :my_app, MyAppWeb.Endpoint,
http: [
port: 4000,
transport_options: [
# NOTE(review): presumably the cap on simultaneously open connections and the
# number of acceptor processes of the underlying HTTP server — confirm against
# the server adapter in use.
max_connections: 16_384,
num_acceptors: 10
]
]
2. Heartbeat Configuration
# lib/my_app_web/channels/user_socket.ex
# Accepts every connection (the params are ignored) and stores 30_000 under
# the :heartbeat_interval assign.
# NOTE(review): nothing visible here reads :heartbeat_interval. Presumably the
# real heartbeat cadence is set on the client and the server-side idle timeout
# in the endpoint's socket options — confirm before relying on this assign.
# NOTE(review): unlike the token-checking connect/3, this clause performs no
# authentication — confirm that is intended.
def connect(_params, socket, _connect_info) do
# Custom heartbeat interval
socket = assign(socket, :heartbeat_interval, 30_000) # 30 seconds
{:ok, socket}
end
3. Graceful Shutdowns
# Channel shutdown hook: removes the user's presence entry and logs why the
# channel stopped. Requires `require Logger` in the enclosing module.
def terminate(reason, socket) do
  user_id = socket.assigns.current_user.id

  # Clean up resources. The presence module in this app is MyAppWeb.Presence.
  MyAppWeb.Presence.untrack(socket, user_id)

  # Log disconnection reason. Logger metadata must be a keyword list, and the
  # reason can be an arbitrary term, so it is rendered with inspect/1.
  Logger.info("Channel terminated",
    reason: inspect(reason),
    user_id: user_id,
    channel: socket.channel
  )

  :ok
end
Phoenix Channels provide a robust foundation for real-time applications. With proper authentication, rate limiting, and monitoring, they can handle massive scale while maintaining excellent developer experience.
The combination of Phoenix Channels and Elixir’s actor model creates unmatched reliability and performance for real-time web applications.