Piping Basics: Elegant Data Transformation in Elixir

The pipe operator |> is one of Elixir’s most distinctive features. It transforms nested, hard-to-read function calls into elegant, readable pipelines that flow naturally from left to right.

The Problem with Nested Function Calls

Consider this typical data transformation:

# ❌ Hard to read - inside-out thinking
result = String.upcase(
  String.trim(
    String.replace(
      String.downcase(" Hello WORLD "),
      "world",
      "elixir"
    )
  )
)
# "HELLO ELIXIR"

Reading this requires mental stack management. You have to:

  1. Start from the innermost function
  2. Work your way outward
  3. Keep track of intermediate results mentally

The Pipe Operator Solution

The pipe operator |> takes the result of the expression on its left and passes it as the first argument to the function on its right:

# ✅ Readable pipeline - natural flow
result = " Hello WORLD "
|> String.downcase()
|> String.replace("world", "elixir")  
|> String.trim()
|> String.upcase()
# "HELLO ELIXIR"

This reads like a recipe: take the input, then do step 1, then step 2, etc.

Basic Piping Rules

Rule 1: First Argument Insertion

The pipe operator always passes the left side as the first argument to the right side:

# These are equivalent:
"hello" |> String.upcase()
String.upcase("hello")

# These are equivalent:
[1, 2, 3] |> Enum.map(&(&1 * 2))
Enum.map([1, 2, 3], &(&1 * 2))

# These are equivalent:
%{a: 1} |> Map.put(:b, 2)
Map.put(%{a: 1}, :b, 2)

Rule 2: Function Call Syntax

Prefer explicit parentheses when piping:

# ✅ Explicit and unambiguous
"hello" |> String.upcase()

# ⚠️ This also calls the function (it does NOT create a function
# reference), but the omitted parentheses are discouraged style and
# become ambiguous once the function takes additional arguments
"hello" |> String.upcase

# An actual function reference requires the capture syntax:
upcase_ref = &String.upcase/1

Rule 3: Single Expression Per Line

Format pipelines with one expression per line for maximum readability:

# ✅ Readable
user_data
|> validate_input()
|> normalize_fields()
|> save_to_database()
|> send_confirmation_email()

# ❌ Hard to read
user_data |> validate_input() |> normalize_fields() |> save_to_database()

Common Piping Patterns

Data Processing Pipelines

defmodule DataProcessor do
  def process_user_input(raw_input) do
    raw_input
    |> String.trim()                    # Remove whitespace
    |> String.downcase()                # Normalize case
    |> String.split(",")                # Split on commas
    |> Enum.map(&String.trim/1)         # Trim each item
    |> Enum.reject(&(&1 == ""))         # Remove empty strings
    |> Enum.uniq()                      # Remove duplicates
  end
  
  def analyze_numbers(number_strings) do
    number_strings
    |> Enum.map(&String.to_integer/1)   # Convert to integers
    |> Enum.filter(&(&1 > 0))           # Keep positive numbers
    |> Enum.sort()                      # Sort ascending
    |> calculate_statistics()           # Custom function
  end
  
  # An empty list would crash average/min/max, so handle it first
  defp calculate_statistics([]) do
    %{count: 0, sum: 0, average: nil, min: nil, max: nil}
  end

  defp calculate_statistics(numbers) do
    %{
      count: length(numbers),
      sum: Enum.sum(numbers),
      average: Enum.sum(numbers) / length(numbers),
      min: Enum.min(numbers),
      max: Enum.max(numbers)
    }
  end
end
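
For instance, running the same steps as process_user_input/1 on a messy sample input (the input string here is illustrative):

```elixir
# Inlined version of the process_user_input/1 pipeline
result =
  " Apple, banana , ,APPLE "
  |> String.trim()                # "Apple, banana , ,APPLE"
  |> String.downcase()            # "apple, banana , ,apple"
  |> String.split(",")            # ["apple", " banana ", " ", "apple"]
  |> Enum.map(&String.trim/1)     # ["apple", "banana", "", "apple"]
  |> Enum.reject(&(&1 == ""))     # ["apple", "banana", "apple"]
  |> Enum.uniq()
# ["apple", "banana"]
```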

Map/List Transformations

defmodule Transformations do
  def process_users(users) do
    users
    |> Enum.filter(&(&1.active))                    # Active users only
    |> Enum.map(&normalize_user/1)                  # Normalize data
    |> Enum.group_by(&(&1.role))                    # Group by role
    |> Enum.map(fn {role, users} ->                 # Transform groups
         {role, Enum.count(users)}
       end)
    |> Map.new()                                    # Convert to map
  end
  
  def user_summary(users) do
    users
    |> Enum.map(&extract_key_info/1)               # Extract relevant fields
    |> Enum.sort_by(&(&1.name))                   # Sort by name
    |> Enum.take(10)                              # Top 10 only
    |> Enum.map(&format_user_display/1)           # Format for display
  end
  
  defp normalize_user(user) do
    %{
      user |
      email: String.downcase(user.email),
      name: String.trim(user.name)
    }
  end
  
  defp extract_key_info(user) do
    %{
      id: user.id,
      name: user.name,
      email: user.email,
      last_login: user.last_login
    }
  end
  
  defp format_user_display(user) do
    "#{user.name} (#{user.email})"
  end
end

Advanced Piping Techniques

Using Anonymous Functions in Pipes

Sometimes you need to call functions that don’t take the piped value as their first argument:

defmodule AdvancedPiping do
  # When the piped value isn't the first argument
  def process_config(config_path) do
    config_path
    |> File.read!()                              # Read file
    |> Jason.decode!()                           # Parse JSON
    |> then(&Map.merge(default_config(), &1))    # Merge with defaults
    |> validate_config()                         # Custom validation
  end
  
  # Using then/2 for complex transformations (Elixir 1.12+)
  # For older versions, use anonymous functions
  def legacy_process_config(config_path) do
    config_path
    |> File.read!()
    |> Jason.decode!()
    |> (&Map.merge(default_config(), &1)).()     # Anonymous function call
    |> validate_config()
  end
  
  defp default_config do
    %{timeout: 5000, retries: 3}
  end
  
  defp validate_config(config) do
    # Validation logic here
    config
  end
end
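
A minimal illustration of then/2 on its own: it pipes the value into an arbitrary one-argument function, so the value can land in any argument position.

```elixir
# Pipe 2 into the SECOND argument position of div/2
result = 2 |> then(&div(10, &1))
# 5
```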

Conditional Piping

Handle conditional transformations in pipelines:

defmodule ConditionalPiping do
  def process_user_data(user_data, opts \\ []) do
    user_data
    |> validate_required_fields()
    |> normalize_email()
    |> then(fn user -> if opts[:uppercase_name], do: uppercase_name(user), else: user end)
    |> then(fn user -> if opts[:add_timestamp], do: add_timestamp(user), else: user end)
    |> save_user()
  end
  
  # Better approach: extract conditional logic
  def process_user_data_v2(user_data, opts \\ []) do
    user_data
    |> validate_required_fields()
    |> normalize_email()
    |> maybe_uppercase_name(opts[:uppercase_name])
    |> maybe_add_timestamp(opts[:add_timestamp])
    |> save_user()
  end
  
  defp maybe_uppercase_name(user, true), do: uppercase_name(user)
  defp maybe_uppercase_name(user, _), do: user
  
  defp maybe_add_timestamp(user, true), do: add_timestamp(user)
  defp maybe_add_timestamp(user, _), do: user
  
  # Implementation functions...
  defp validate_required_fields(user), do: user
  defp normalize_email(user), do: user
  defp uppercase_name(user), do: user
  defp add_timestamp(user), do: user
  defp save_user(user), do: user
end

Error Handling in Pipelines

Handle errors gracefully without breaking the pipeline:

defmodule ErrorHandling do
  # Using with for error-prone pipelines
  def process_file(filename) do
    with {:ok, content} <- File.read(filename),
         {:ok, json} <- Jason.decode(content),
         {:ok, validated} <- validate_structure(json),
         {:ok, processed} <- process_data(validated) do
      {:ok, processed}
    else
      {:error, reason} -> {:error, reason}
    end
  end
  
  # Using custom error handling functions
  def process_user_registration(params) do
    params
    |> validate_email()
    |> case do
         {:ok, params} -> 
           params
           |> hash_password()
           |> save_user()
           |> send_welcome_email()
         {:error, _} = error -> error
       end
  end
  
  # Pipeline with result tuples
  def safe_pipeline(input) do
    {:ok, input}
    |> safe_step(&validate_input/1)
    |> safe_step(&process_data/1)  
    |> safe_step(&format_output/1)
  end
  
  defp safe_step({:ok, value}, func), do: func.(value)
  defp safe_step({:error, _} = error, _func), do: error
  
  # Placeholder implementations
  defp validate_structure(json), do: {:ok, json}
  defp process_data(data), do: {:ok, data}
  defp validate_email(params), do: {:ok, params}
  defp hash_password(params), do: {:ok, params}
  defp save_user(params), do: {:ok, params}
  defp send_welcome_email(user), do: {:ok, user}
  defp validate_input(input), do: {:ok, input}
  defp format_output(output), do: {:ok, output}
end
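
The safe_step/2 helper behaves like a tiny railway: once a step returns an error, every later step is skipped. A standalone sketch, using anonymous functions and made-up steps for illustration:

```elixir
# Same logic as safe_step/2 above, as an anonymous function
safe_step = fn
  {:ok, value}, func -> func.(value)
  {:error, _} = error, _func -> error
end

double = fn x -> {:ok, x * 2} end
fail = fn _x -> {:error, :boom} end

# Happy path: every step runs
{:ok, 4} = {:ok, 1} |> safe_step.(double) |> safe_step.(double)

# Failure path: the step after `fail` never runs
{:error, :boom} = {:ok, 1} |> safe_step.(double) |> safe_step.(fail) |> safe_step.(double)
```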

Performance Considerations

Efficient Pipeline Design

defmodule PerformantPipelines do
  # ✅ Filter early to reduce work
  def process_large_dataset(items) do
    items
    |> Stream.filter(&active?/1)      # Filter first (lazy)
    |> Stream.map(&expensive_transform/1) # Then transform (lazy)
    |> Stream.take(100)               # Limit early (lazy)
    |> Enum.to_list()                # Materialize at the end
  end
  
  # ❌ Transform everything then filter
  def inefficient_processing(items) do
    items
    |> Enum.map(&expensive_transform/1)   # Transforms inactive items too
    |> Enum.filter(&active?/1)            # Filters after expensive work
    |> Enum.take(100)
  end
  
  # Use Stream for large datasets
  def process_file_lines(filename) do
    filename
    |> File.stream!()                     # Lazy file reading
    |> Stream.map(&String.trim/1)         # Lazy transformation
    |> Stream.filter(&(&1 != ""))         # Lazy filtering
    |> Stream.with_index()                # Add line numbers
    |> Enum.take(1000)                    # Process first 1000 lines
  end
  
  defp active?(item), do: item.status == :active
  defp expensive_transform(item), do: item  # Placeholder
end
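
The laziness is observable. In this sketch each transformed element sends a message to the current process, so the mailbox records how much work was actually done; with Stream, only the elements demanded by Enum.take/2 are ever transformed, not the whole million-element range:

```elixir
result =
  1..1_000_000
  |> Stream.map(fn x ->
    send(self(), {:worked, x})  # Record that this element was processed
    x * 2
  end)
  |> Enum.take(3)
# [2, 4, 6] - and the mailbox holds only a handful of :worked messages,
# not a million
```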

Real-World Example: API Response Processing

defmodule ApiResponseProcessor do
  def process_api_response(raw_response) do
    raw_response
    |> validate_response_format()
    |> extract_data_field()
    |> normalize_user_records()
    |> filter_active_users()
    |> enrich_with_preferences()
    |> format_for_client()
    |> add_metadata()
  end
  
  defp validate_response_format(%{"success" => true, "data" => data}), do: {:ok, data}
  defp validate_response_format(_), do: {:error, :invalid_format}
  
  defp extract_data_field({:ok, data}), do: data
  defp extract_data_field({:error, _} = error), do: error
  
  defp normalize_user_records(users) when is_list(users) do
    users
    |> Enum.map(&normalize_single_user/1)
  end
  defp normalize_user_records(error), do: error
  
  defp normalize_single_user(user) do
    %{
      id: user["id"],
      name: String.trim(user["name"] || ""),
      email: String.downcase(user["email"] || ""),
      # Caution: String.to_atom/1 on untrusted input can exhaust the
      # atom table; prefer a whitelist of known statuses in production
      status: String.to_atom(user["status"] || "inactive"),
      created_at: parse_timestamp(user["created_at"])
    }
  end
  
  defp filter_active_users(users) when is_list(users) do
    Enum.filter(users, &(&1.status == :active))
  end
  defp filter_active_users(error), do: error
  
  defp enrich_with_preferences(users) when is_list(users) do
    users
    |> Enum.map(&add_user_preferences/1)
  end
  defp enrich_with_preferences(error), do: error
  
  defp add_user_preferences(user) do
    preferences = fetch_user_preferences(user.id)
    Map.put(user, :preferences, preferences)
  end
  
  defp format_for_client(users) when is_list(users) do
    %{
      users: users,
      count: length(users)
    }
  end
  defp format_for_client(error), do: error
  
  defp add_metadata(result) when is_map(result) do
    Map.merge(result, %{
      processed_at: DateTime.utc_now(),
      version: "1.0"
    })
  end
  defp add_metadata(error), do: error
  
  # Placeholder implementations
  defp parse_timestamp(nil), do: nil
  defp parse_timestamp(timestamp), do: timestamp
  defp fetch_user_preferences(_user_id), do: %{}
end

Common Pitfalls and Solutions

Pitfall 1: Omitting Parentheses

# ⚠️ Works, but discouraged style
"hello" |> String.upcase

# ✅ Explicit call
"hello" |> String.upcase()

Both lines call the function. If you actually want a function reference rather than a call, use the capture syntax: &String.upcase/1.

Pitfall 2: Wrong Argument Position

# ❌ String.replace expects (string, pattern, replacement);
# piping into the bare string "elixir" fails to compile
"hello world" |> String.replace("world") |> "elixir"

# ✅ Correct argument order
"hello world" |> String.replace("world", "elixir")

Pitfall 3: Overusing Pipes

# ❌ Unnecessary pipe for single operation
input |> validate()

# ✅ Simple function call is clearer
validate(input)

# ❌ Too many trivial steps
data |> Map.get(:key) |> List.first()

# ✅ Combine simple operations
List.first(Map.get(data, :key))

Best Practices

  1. Use pipes for multi-step transformations (3+ steps)
  2. Keep pipeline steps simple - each step should do one thing
  3. Extract complex logic into named functions
  4. Use meaningful intermediate variable names when debugging
  5. Consider Stream for large datasets
  6. Handle errors explicitly in pipelines
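
On point 4, an alternative to breaking a pipeline into named variables is IO.inspect/2: it prints its argument and returns it unchanged, so it can be dropped between any two steps without altering the result.

```elixir
result =
  " Hello WORLD "
  |> String.downcase()
  |> IO.inspect(label: "after downcase")  # Prints, then passes the value through
  |> String.trim()
  |> String.upcase()
# Prints: after downcase: " hello world "
# result is "HELLO WORLD"
```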

The pipe operator transforms imperative, nested code into declarative, readable pipelines. Master it, and your Elixir code will be cleaner, more maintainable, and more expressive.


This concludes our journey through Elixir fundamentals. You now have the tools to write clean, functional, and efficient Elixir code using pattern matching, data structures, recursion, and piping.