Elixir WebSockets via Phoenix Framework: Real-time Communication

Phoenix 1.6 and Elixir 1.12 have revolutionized real-time web applications. With Phoenix Channels, building WebSocket-powered applications becomes as natural as writing regular HTTP endpoints.

Why Phoenix Channels Dominate

Traditional WebSocket implementations are fragile, complex, and don’t scale. Phoenix Channels solve this with:

  • Fault-tolerant connections that survive process crashes
  • Built-in authentication and authorization
  • Automatic connection recovery
  • Distributed PubSub across multiple servers
  • Presence tracking for user status

All this runs on the BEAM VM, handling millions of concurrent connections effortlessly.

Phoenix Channels Architecture

Client (Browser/Mobile)
    ↓ WebSocket/Long Polling
Phoenix Endpoint
    ↓ Routes to
Channel Module
    ↓ Communicates via
PubSub System
    ↓ Across multiple
Phoenix Nodes

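Each socket connection runs in its own process, and every channel a client joins over that connection gets a separate, isolated GenServer-based process. When a channel process crashes, only that channel is affected—the rest continue normally.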

Basic Channel Setup

1. Define the Socket

# lib/my_app_web/channels/user_socket.ex
defmodule MyAppWeb.UserSocket do
  @moduledoc """
  Socket entry point.

  Maps topic patterns to channel modules and authenticates every incoming
  connection from its `"token"` connect parameter.
  """

  use Phoenix.Socket

  # Topic pattern -> channel module
  channel "room:*", MyAppWeb.RoomChannel
  channel "user:*", MyAppWeb.UserChannel
  channel "game:*", MyAppWeb.GameChannel

  # A connection is accepted only when it carries a "token" param that
  # `MyApp.Auth.verify_token/1` (JWT or similar) resolves to `{:ok, user}`;
  # the user is then kept in the socket assigns as `:current_user`.
  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    case MyApp.Auth.verify_token(token) do
      {:error, _reason} -> :error
      {:ok, user} -> {:ok, assign(socket, :current_user, user)}
    end
  end

  # No token supplied: refuse the connection.
  def connect(_params, _socket, _connect_info), do: :error

  # Per-user socket identifier, built from the authenticated user's id.
  @impl true
  def id(socket) do
    user_id = socket.assigns.current_user.id
    "user_socket:#{user_id}"
  end
end

2. Configure the Endpoint

# lib/my_app_web/endpoint.ex
defmodule MyAppWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_app

  # Mount the user socket at "/socket": WebSocket transport enabled,
  # long-polling transport disabled.
  socket("/socket", MyAppWeb.UserSocket, websocket: true, longpoll: false)

  # ... rest of endpoint config
end

3. Create a Channel

# lib/my_app_web/channels/room_channel.ex
defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Chat-room channel for `"room:*"` topics.

  Joining `"room:lobby"` is open to every connected user; any other room is
  looked up through `Repo` and gated by `can_access_room?/2`. Once joined, the
  channel handles `"new_message"` and `"typing"` events from the client and
  announces joins and departures to the other subscribers.

  Every payload sent to clients is a plain map (see `message_payload/2`), so
  it can be serialized without relying on encoders for Ecto structs.
  """

  use MyAppWeb, :channel

  # get_recent_messages/1 pipes through these Ecto.Query macros.
  import Ecto.Query, only: [where: 2, order_by: 2, limit: 2, preload: 2]

  alias MyApp.{Repo, Room, Message}

  # Join authorization.
  #
  # The lobby needs no lookup, but it still records `:room_id` because
  # `handle_in("new_message", ...)` reads `socket.assigns.room_id`.
  @impl true
  def join("room:lobby", _payload, socket) do
    {:ok, assign(socket, :room_id, "lobby")}
  end

  def join("room:" <> room_id, _payload, socket) do
    case Repo.get(Room, room_id) do
      %Room{} = room ->
        # Check if user can access this room
        if can_access_room?(socket.assigns.current_user, room) do
          # Post-join work runs in handle_info/2, after the join reply is sent.
          send(self(), :after_join)
          {:ok, assign(socket, :room_id, room_id)}
        else
          {:error, %{reason: "unauthorized"}}
        end

      nil ->
        {:error, %{reason: "room not found"}}
    end
  end

  # Handle incoming messages.
  #
  # "new_message": persists the message, broadcasts it to every subscriber of
  # the topic (sender included) and replies with the new id. On a validation
  # failure the reply carries the changeset errors as a plain map.
  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    user = socket.assigns.current_user
    room_id = socket.assigns.room_id

    case create_message(user, room_id, body) do
      {:ok, message} ->
        broadcast(socket, "new_message", message_payload(message, user))
        {:reply, {:ok, %{id: message.id}}, socket}

      {:error, changeset} ->
        # A raw %Ecto.Changeset{} is not JSON-encodable; send its errors only.
        {:reply, {:error, %{errors: changeset_errors(changeset)}}, socket}
    end
  end

  # "typing": relays the typing flag to everyone except the sender.
  def handle_in("typing", %{"typing" => typing}, socket) do
    broadcast_from(socket, "user_typing", %{
      user_id: socket.assigns.current_user.id,
      typing: typing
    })

    {:noreply, socket}
  end

  # Handle user leaving: tell the other subscribers who left.
  @impl true
  def terminate(_reason, socket) do
    broadcast_from(socket, "user_left", %{
      user_id: socket.assigns.current_user.id
    })

    :ok
  end

  # Handle post-join setup: push the recent history to the joining client
  # only, then announce the newcomer to everyone else.
  @impl true
  def handle_info(:after_join, socket) do
    messages =
      socket.assigns.room_id
      |> get_recent_messages()
      |> Enum.map(&message_payload(&1, &1.user))

    push(socket, "recent_messages", %{messages: messages})

    broadcast_from(socket, "user_joined", %{
      user: %{
        id: socket.assigns.current_user.id,
        name: socket.assigns.current_user.name
      }
    })

    {:noreply, socket}
  end

  defp can_access_room?(_user, _room) do
    # Room access logic
    true
  end

  defp create_message(user, room_id, body) do
    %Message{user_id: user.id, room_id: room_id}
    |> Message.changeset(%{body: body})
    |> Repo.insert()
  end

  # Newest 50 messages of the room, returned oldest-first, with the author
  # preloaded so message_payload/2 can read the author's name.
  defp get_recent_messages(room_id) do
    Message
    |> where(room_id: ^room_id)
    |> order_by(desc: :inserted_at)
    |> limit(50)
    |> preload(:user)
    |> Repo.all()
    |> Enum.reverse()
  end

  # Wire format shared by the "new_message" broadcast and the
  # "recent_messages" push, so the client can render both the same way.
  defp message_payload(message, user) do
    %{
      id: message.id,
      body: message.body,
      user: %{
        id: user.id,
        name: user.name
      },
      timestamp: message.inserted_at
    }
  end

  # Turns changeset errors into %{field => [message, ...]} with the
  # %{placeholders} in each message filled in from the error options.
  defp changeset_errors(changeset) do
    Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
      Regex.replace(~r"%{(\w+)}", msg, fn _whole, key ->
        opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
      end)
    end)
  end
end

Client-Side JavaScript

// assets/js/socket.js
import {Socket} from "phoenix"

/**
 * Browser-side chat client built on a Phoenix socket.
 *
 * Opens the socket with the stored auth token, wires the message input,
 * send button and connection-status element, and (after joinRoom) renders
 * the events received on the room channel.
 */
class ChatClient {
  constructor() {
    // No room joined yet; sendMessage/sendTypingNotification check this.
    this.channel = null

    this.socket = new Socket("/socket", {
      params: {token: this.getAuthToken()}
    })

    this.socket.connect()
    this.setupEventListeners()
  }

  /**
   * Joins the `room:<roomId>` topic and registers the channel event handlers.
   * A previously joined room is left first so its handlers stop firing.
   */
  joinRoom(roomId) {
    if (this.channel) {
      this.channel.leave()
    }

    this.channel = this.socket.channel(`room:${roomId}`, {})

    // Handle incoming messages
    this.channel.on("new_message", message => {
      this.displayMessage(message)
    })

    this.channel.on("user_joined", data => {
      this.showNotification(`${data.user.name} joined`)
    })

    this.channel.on("user_left", () => {
      this.showNotification(`User left`)
    })

    this.channel.on("user_typing", data => {
      this.showTypingIndicator(data.user_id, data.typing)
    })

    this.channel.on("recent_messages", data => {
      data.messages.forEach(message => {
        this.displayMessage(message, false) // Don't animate old messages
      })
    })

    // Join and handle responses
    this.channel.join()
      .receive("ok", resp => {
        console.log("Joined successfully", resp)
      })
      .receive("error", resp => {
        console.log("Unable to join", resp)
      })
  }

  /** Pushes a non-blank message to the joined room; no-op before joinRoom. */
  sendMessage(body) {
    if (!this.channel || !body.trim()) return

    this.channel.push("new_message", {body: body})
      .receive("ok", resp => {
        console.log("Message sent", resp.id)
        this.clearInput()
      })
      .receive("error", resp => {
        console.log("Failed to send message", resp.errors)
      })
  }

  /** Tells the room whether this user is typing; no-op before joinRoom. */
  sendTypingNotification(typing) {
    if (!this.channel) return

    this.channel.push("typing", {typing: typing})
  }

  /** Wires the DOM inputs and the socket lifecycle callbacks. */
  setupEventListeners() {
    const messageInput = document.getElementById('message-input')
    const sendButton = document.getElementById('send-button')
    let typingTimer

    // Send message on enter or button click
    messageInput.addEventListener('keypress', (e) => {
      if (e.key === 'Enter') {
        this.sendMessage(messageInput.value)
      }
    })

    sendButton.addEventListener('click', () => {
      this.sendMessage(messageInput.value)
    })

    // Typing indicators: report "typing" on every input event and
    // "stopped" once a full second passes without another one.
    messageInput.addEventListener('input', () => {
      this.sendTypingNotification(true)

      clearTimeout(typingTimer)
      typingTimer = setTimeout(() => {
        this.sendTypingNotification(false)
      }, 1000)
    })

    // Connection status
    this.socket.onOpen(() => {
      this.showConnectionStatus('Connected')
    })

    this.socket.onError(() => {
      this.showConnectionStatus('Connection error')
    })

    this.socket.onClose(() => {
      this.showConnectionStatus('Disconnected')
    })
  }

  /**
   * Appends one message to #messages and scrolls to the bottom.
   * Every server-supplied value is HTML-escaped before it reaches innerHTML,
   * the user name included, since it is user-controlled text.
   */
  displayMessage(message, animate = true) {
    const messagesContainer = document.getElementById('messages')
    const messageElement = document.createElement('div')
    messageElement.className = `message ${animate ? 'animate-in' : ''}`

    messageElement.innerHTML = `
      <div class="message-header">
        <span class="user-name">${this.escapeHtml(message.user.name)}</span>
        <span class="timestamp">${this.escapeHtml(this.formatTime(message.timestamp))}</span>
      </div>
      <div class="message-body">${this.escapeHtml(message.body)}</div>
    `

    messagesContainer.appendChild(messageElement)
    messagesContainer.scrollTop = messagesContainer.scrollHeight
  }

  /** Adds or removes the per-user "typing" element under #typing-indicators. */
  showTypingIndicator(userId, typing) {
    const indicator = document.getElementById(`typing-${userId}`)

    if (typing && !indicator) {
      const typingElement = document.createElement('div')
      typingElement.id = `typing-${userId}`
      typingElement.className = 'typing-indicator'
      typingElement.textContent = 'Someone is typing...'

      document.getElementById('typing-indicators').appendChild(typingElement)
    } else if (!typing && indicator) {
      indicator.remove()
    }
  }

  /** Token stored under 'auth_token' in localStorage, or '' when absent. */
  getAuthToken() {
    return localStorage.getItem('auth_token') || ''
  }

  formatTime(timestamp) {
    return new Date(timestamp).toLocaleTimeString()
  }

  /** Escapes text for HTML by round-tripping it through a detached element. */
  escapeHtml(text) {
    const div = document.createElement('div')
    div.textContent = text
    return div.innerHTML
  }

  clearInput() {
    document.getElementById('message-input').value = ''
  }

  showNotification(message) {
    // Show toast notification
    console.log('Notification:', message)
  }

  /** Shows the status text and derives a CSS class such as "status connection-error". */
  showConnectionStatus(status) {
    const statusElement = document.getElementById('connection-status')
    if (statusElement) {
      statusElement.textContent = status
      // Replace every whitespace run, not just the first space.
      statusElement.className = `status ${status.toLowerCase().replace(/\s+/g, '-')}`
    }
  }
}

// Create the chat client once and expose it on window for other scripts
const chatClient = new ChatClient()
window.chatClient = chatClient

Advanced Patterns

1. Private User Channels

defmodule MyAppWeb.UserChannel do
  @moduledoc """
  Private per-user channel on `"user:<id>"` topics.

  A socket may only join the topic whose id equals the id of its own
  `:current_user`.
  """

  use MyAppWeb, :channel

  # The topic id arrives as a string, so the user's id is stringified
  # before the two are compared.
  @impl true
  def join("user:" <> user_id, _payload, socket) do
    case to_string(socket.assigns.current_user.id) do
      ^user_id -> {:ok, socket}
      _other -> {:error, %{reason: "unauthorized"}}
    end
  end

  # Marks the given notification ids as read for the connected user and
  # acknowledges with a bare :ok reply.
  @impl true
  def handle_in("mark_notifications_read", %{"ids" => ids}, socket) do
    MyApp.Notifications.mark_as_read(socket.assigns.current_user, ids)

    {:reply, :ok, socket}
  end
end

2. Presence Tracking

defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Room channel variant that tracks who is present in each room.

  After a join, the user is tracked under their id with `name`, `avatar` and
  `joined_at` metadata, and the current presence list is pushed to the
  joining client. Clients may later change their `status` via the
  `"update_status"` event.
  """

  use MyAppWeb, :channel
  alias MyAppWeb.Presence

  @impl true
  def join("room:" <> room_id, _payload, socket) do
    # Tracking happens in handle_info/2, once the join itself has completed.
    send(self(), :after_join)
    {:ok, assign(socket, :room_id, room_id)}
  end

  @impl true
  def handle_info(:after_join, socket) do
    user = socket.assigns.current_user

    # Track user presence
    {:ok, _} =
      Presence.track(socket, user.id, %{
        name: user.name,
        avatar: user.avatar_url,
        joined_at: inspect(System.system_time(:second))
      })

    # Send current presence list
    push(socket, "presence_state", Presence.list(socket))

    {:noreply, socket}
  end

  # Handle presence updates.
  #
  # The function form of Presence.update/3 receives the current metadata, so
  # only :status changes. Passing a fresh map would replace the whole meta and
  # drop the name/avatar/joined_at set in handle_info(:after_join, ...).
  @impl true
  def handle_in("update_status", %{"status" => status}, socket) do
    user = socket.assigns.current_user

    {:ok, _} = Presence.update(socket, user.id, &Map.put(&1, :status, status))

    {:noreply, socket}
  end
end

# Setup presence in application.ex
#
# Starts the application's supervision tree with MyAppWeb.Presence among
# its children.
# NOTE(review): the Phoenix.Presence docs ask for Presence to start after the
# PubSub child and before the endpoint — confirm its position among the
# children elided below.
def start(_type, _args) do
  children = [
    MyAppWeb.Presence,
    # ... other children
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

Presence module:

defmodule MyAppWeb.Presence do
  @moduledoc """
  Presence tracker for the application, backed by the `MyApp.PubSub` server.
  """

  use Phoenix.Presence, otp_app: :my_app, pubsub_server: MyApp.PubSub
end

Client-side presence:

import {Presence} from "phoenix"

// Local copy of the presence map, replaced on every server update
let presences = {}

// Full snapshot from the server: sync it in, then redraw the list
function onPresenceState(state) {
  presences = Presence.syncState(presences, state)
  updateUserList(presences)
}

// Incremental joins/leaves: sync them in, then redraw the list
function onPresenceDiff(diff) {
  presences = Presence.syncDiff(presences, diff)
  updateUserList(presences)
}

channel.on("presence_state", onPresenceState)
channel.on("presence_diff", onPresenceDiff)

/**
 * Re-renders #user-list from the presence map: one .user-item per user,
 * showing the avatar, name and status of that user's first metadata entry.
 *
 * Elements are built with DOM APIs rather than an HTML string because the
 * name, avatar URL and status are user-supplied and must never be parsed
 * as markup.
 */
function updateUserList(presences) {
  const userList = document.getElementById('user-list')
  userList.innerHTML = ''

  Presence.list(presences, (id, {metas: [first]}) => {
    // Users that never sent a status are shown as "online".
    const status = first.status || 'online'

    const userElement = document.createElement('div')
    userElement.className = 'user-item'

    const avatarElement = document.createElement('img')
    avatarElement.className = 'avatar'
    avatarElement.src = first.avatar
    avatarElement.alt = first.name

    const nameElement = document.createElement('span')
    nameElement.className = 'name'
    nameElement.textContent = first.name

    const statusElement = document.createElement('span')
    statusElement.className = `status ${status}`
    statusElement.textContent = status

    userElement.appendChild(avatarElement)
    userElement.appendChild(nameElement)
    userElement.appendChild(statusElement)
    userList.appendChild(userElement)
  })
}

3. Rate Limiting and Security

defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Room channel variant that rate-limits and validates `"new_message"` events.

  Each message is counted against a per-user limit, rejected when the payload
  is malformed or longer than 1000 bytes, then sanitized and checked by
  `MyApp.ContentFilter` before it is created and broadcast.
  """

  use MyAppWeb, :channel

  @rate_limit_window 60_000 # 1 minute
  @rate_limit_count 30      # 30 messages per minute

  @impl true
  def handle_in("new_message", payload, socket) do
    user_id = socket.assigns.current_user.id

    case check_rate_limit(user_id) do
      :ok ->
        handle_new_message(payload, socket)

      {:error, :rate_limited} ->
        {:reply, {:error, %{reason: "rate_limited"}}, socket}
    end
  end

  # Bumps the user's counter in MyApp.Cache and compares the returned count
  # with the allowed maximum.
  # NOTE(review): assumes MyApp.Cache.increment/3 returns the new count and
  # treats its third argument as the window/TTL in milliseconds — confirm.
  defp check_rate_limit(user_id) do
    key = "rate_limit:user:#{user_id}"

    case MyApp.Cache.increment(key, 1, @rate_limit_window) do
      count when count <= @rate_limit_count -> :ok
      _count -> {:error, :rate_limited}
    end
  end

  # Oversized body: rejected before any filtering work is done.
  defp handle_new_message(%{"body" => body}, socket)
       when is_binary(body) and byte_size(body) > 1000 do
    {:reply, {:error, %{reason: "message_too_long"}}, socket}
  end

  defp handle_new_message(%{"body" => body}, socket) when is_binary(body) do
    sanitized_body = MyApp.ContentFilter.sanitize(body)

    # Check for spam/inappropriate content
    case MyApp.ContentFilter.validate(sanitized_body) do
      :ok ->
        # create_and_broadcast_message/2 is not part of this snippet.
        create_and_broadcast_message(sanitized_body, socket)

      {:error, reason} ->
        {:reply, {:error, %{reason: reason}}, socket}
    end
  end

  # Client payloads are untrusted: a missing or non-string "body" gets an
  # error reply instead of crashing the channel with a FunctionClauseError.
  defp handle_new_message(_payload, socket) do
    {:reply, {:error, %{reason: "invalid_payload"}}, socket}
  end
end

4. Cross-Node Broadcasting

defmodule MyApp.GameUpdates do
  @moduledoc """
  Publishes and subscribes to game state updates on `"game:<id>"` topics of
  the `MyApp.PubSub` server.
  """

  @pubsub MyApp.PubSub

  # Sends {:game_update, state} to every subscriber of the game's topic.
  def broadcast_game_state(game_id, state) do
    # This works across all connected Phoenix nodes
    Phoenix.PubSub.broadcast(@pubsub, topic(game_id), {:game_update, state})
  end

  # Subscribes the calling process to the game's topic.
  def subscribe_to_game(game_id), do: Phoenix.PubSub.subscribe(@pubsub, topic(game_id))

  defp topic(game_id), do: "game:#{game_id}"
end

# In your channel
#
# Forwards a {:game_update, ...} message received by the channel process to
# this channel's client as a "game_state" event.
def handle_info({:game_update, game_state}, socket) do
  push(socket, "game_state", game_state)

  {:noreply, socket}
end

Performance and Monitoring

Connection Metrics

defmodule MyAppWeb.ChannelMetrics do
  @moduledoc """
  Emits telemetry events for channel joins and channel messages.
  """

  # Emits [:phoenix, :channel, :joined] with a count of 1, tagged with the
  # socket's channel and the id of the assigned user (nil when no
  # :current_user is assigned).
  def track_connection(socket) do
    :telemetry.execute([:phoenix, :channel, :joined], %{count: 1}, %{
      channel: socket.channel,
      user_id: current_user_id(socket)
    })
  end

  # Emits [:phoenix, :channel, :message] with a count of 1, tagged with the
  # socket's channel and the event name.
  def track_message(socket, event) do
    :telemetry.execute([:phoenix, :channel, :message], %{count: 1}, %{
      channel: socket.channel,
      event: event
    })
  end

  # Elixir has no `&.` safe-navigation operator, so the optional user is
  # unpacked with a pattern match instead.
  defp current_user_id(socket) do
    case socket.assigns[:current_user] do
      %{id: id} -> id
      _missing -> nil
    end
  end
end

# In your channel
#
# Records a join metric before the channel's own join handling runs. Only
# the metrics call is shown; the authorization and return value are elided.
def join(topic, _payload, socket) do
  # Emits the [:phoenix, :channel, :joined] telemetry event for this socket.
  MyAppWeb.ChannelMetrics.track_connection(socket)
  # ... rest of join logic
end

Memory Management

# Monitor channel process memory
#
# Handles the periodic :check_memory tick: reads this process's memory use
# and message-queue length, logs a warning above 10 MB, and schedules the
# next tick. The first tick has to be sent from elsewhere (not shown here).
def handle_info(:check_memory, socket) do
  process_info = Process.info(self(), [:memory, :message_queue_len])

  if process_info[:memory] > 10_000_000 do # 10MB
    # Logger.warning/2 replaces the deprecated Logger.warn/2. process_info is
    # a keyword list, so it is attached as log metadata.
    Logger.warning("Channel process using excessive memory", process_info)
  end

  Process.send_after(self(), :check_memory, 30_000) # Check every 30s
  {:noreply, socket}
end

Testing Channels

defmodule MyAppWeb.RoomChannelTest do
  use MyAppWeb.ChannelCase
  alias MyAppWeb.{RoomChannel, UserSocket}

  # Connects through UserSocket with a token for a freshly inserted user.
  # UserSocket.connect/3 already stores the user under :current_user, so the
  # socket needs no further assign here (assign/3 is not among the helpers
  # imported from Phoenix.ChannelTest either).
  setup do
    user = insert(:user)
    {:ok, socket} = connect(UserSocket, %{"token" => generate_token(user)})
    {:ok, socket: socket, user: user}
  end

  # Relies on RoomChannel.join/3 assigning :room_id for the lobby topic.
  test "joining room:lobby", %{socket: socket} do
    {:ok, _, socket} = subscribe_and_join(socket, RoomChannel, "room:lobby")
    assert socket.assigns.room_id == "lobby"
  end

  test "broadcasts new messages", %{socket: socket} do
    {:ok, _, socket} = subscribe_and_join(socket, RoomChannel, "room:lobby")

    ref = push(socket, "new_message", %{"body" => "Hello world"})
    assert_reply ref, :ok, %{id: message_id}
    assert_broadcast "new_message", %{id: ^message_id, body: "Hello world"}
  end

  test "rejects empty messages", %{socket: socket} do
    {:ok, _, socket} = subscribe_and_join(socket, RoomChannel, "room:lobby")

    ref = push(socket, "new_message", %{"body" => ""})
    assert_reply ref, :error, %{errors: _errors}
  end
end

Production Considerations

1. Connection Limits

# config/prod.exs
# HTTP listener on port 4000, capped at 16_384 connections with 10 acceptors.
config :my_app, MyAppWeb.Endpoint,
  http: [port: 4000, transport_options: [max_connections: 16_384, num_acceptors: 10]]

2. Heartbeat Configuration

# lib/my_app_web/channels/user_socket.ex
#
# Accepts every connection (the params are ignored, so no token is checked)
# and stores 30_000 under :heartbeat_interval in the socket assigns.
# NOTE(review): nothing in this snippet reads :heartbeat_interval. Phoenix
# appears to take the heartbeat cadence from the JS client's
# `heartbeatIntervalMs` option and the server-side idle timeout from the
# endpoint's `socket ..., websocket: [timeout: ...]` option — confirm that
# this assign has any effect before relying on it.
def connect(_params, socket, _connect_info) do
  # Custom heartbeat interval
  socket = assign(socket, :heartbeat_interval, 30_000) # 30 seconds
  {:ok, socket}
end

3. Graceful Shutdowns

# Runs when the channel process shuts down: untracks the user's presence
# and logs why the channel ended.
def terminate(reason, socket) do
  user_id = socket.assigns.current_user.id

  # Clean up resources. The presence module is MyAppWeb.Presence; no
  # MyApp.Presence module is defined.
  MyAppWeb.Presence.untrack(socket, user_id)

  # Log disconnection reason. Logger metadata must be a keyword list, and
  # the reason may be any term, so it is rendered with inspect/1.
  Logger.info("Channel terminated",
    reason: inspect(reason),
    user_id: user_id,
    channel: socket.channel
  )

  :ok
end

Phoenix Channels provide a robust foundation for real-time applications. With proper authentication, rate limiting, and monitoring, they can handle massive scale while maintaining excellent developer experience.


The combination of Phoenix Channels and Elixir’s actor model creates unmatched reliability and performance for real-time web applications.