Piping Basics: Elegant Data Transformation in Elixir
The pipe operator |> is one of Elixir’s most distinctive features. It transforms nested, hard-to-read function calls into elegant, readable pipelines that flow naturally from left to right.
The Problem with Nested Function Calls
Consider this typical data transformation:
# ❌ Hard to read - inside-out thinking
result =
  String.upcase(
    String.trim(
      String.replace(
        String.downcase(" Hello WORLD "),
        "world",
        "elixir"
      )
    )
  )
# "HELLO ELIXIR"
Reading this requires mental stack management. You have to:
- Start from the innermost function
- Work your way outward
- Keep track of intermediate results mentally
The Pipe Operator Solution
The pipe operator |> takes the result of the expression on its left and passes it as the first argument to the function on its right:
# ✅ Readable pipeline - natural flow
result =
  " Hello WORLD "
  |> String.downcase()
  |> String.replace("world", "elixir")
  |> String.trim()
  |> String.upcase()
# "HELLO ELIXIR"
This reads like a recipe: take the input, then do step 1, then step 2, etc.
Basic Piping Rules
Rule 1: First Argument Insertion
The pipe operator always passes the left side as the first argument to the right side:
# These are equivalent:
"hello" |> String.upcase()
String.upcase("hello")
# These are equivalent:
[1, 2, 3] |> Enum.map(&(&1 * 2))
Enum.map([1, 2, 3], &(&1 * 2))
# These are equivalent:
%{a: 1} |> Map.put(:b, 2)
Map.put(%{a: 1}, :b, 2)
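Because |> is implemented as a macro, this first-argument insertion happens at compile time. You can watch the rewrite yourself in iex using the standard Macro helpers (a quick sketch; quote/2, Macro.expand_once/2, and Macro.to_string/1 are all part of the standard library):

```elixir
# Expand the pipe macro and print the code it compiles to
quote(do: "hello" |> String.upcase() |> String.reverse())
|> Macro.expand_once(__ENV__)
|> Macro.to_string()
|> IO.puts()
# prints the pipeline rewritten as a nested call
```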
Rule 2: Function Call Syntax
When piping, call the function with explicit parentheses:
# ✅ Preferred
"hello" |> String.upcase()
# ⚠️ This still calls the function, but the compiler warns that
# parentheses are expected when piping into a function call
"hello" |> String.upcase
Rule 3: Single Expression Per Line
Format pipelines with one expression per line for maximum readability:
# ✅ Readable
user_data
|> validate_input()
|> normalize_fields()
|> save_to_database()
|> send_confirmation_email()
# ❌ Hard to read
user_data |> validate_input() |> normalize_fields() |> save_to_database()
Common Piping Patterns
Data Processing Pipelines
defmodule DataProcessor do
  def process_user_input(raw_input) do
    raw_input
    |> String.trim()                    # Remove surrounding whitespace
    |> String.downcase()                # Normalize case
    |> String.split(",")                # Split on commas
    |> Enum.map(&String.trim/1)         # Trim each item
    |> Enum.reject(&(&1 == ""))         # Remove empty strings
    |> Enum.uniq()                      # Remove duplicates
  end

  def analyze_numbers(number_strings) do
    number_strings
    |> Enum.map(&String.to_integer/1)   # Convert to integers
    |> Enum.filter(&(&1 > 0))           # Keep positive numbers
    |> Enum.sort()                      # Sort ascending
    |> calculate_statistics()           # Custom function
  end

  # Guard against the empty list: Enum.min/1 and Enum.max/1 raise on [],
  # and the average would divide by zero
  defp calculate_statistics([]), do: %{count: 0, sum: 0, average: nil, min: nil, max: nil}

  defp calculate_statistics(numbers) do
    sum = Enum.sum(numbers)

    %{
      count: length(numbers),
      sum: sum,
      average: sum / length(numbers),
      min: Enum.min(numbers),
      max: Enum.max(numbers)
    }
  end
end
Map/List Transformations
defmodule Transformations do
  def process_users(users) do
    users
    |> Enum.filter(&(&1.active))          # Active users only
    |> Enum.map(&normalize_user/1)        # Normalize data
    |> Enum.group_by(&(&1.role))          # Group by role
    |> Enum.map(fn {role, users} ->       # Transform groups
      {role, Enum.count(users)}
    end)
    |> Map.new()                          # Convert to map
  end

  def user_summary(users) do
    users
    |> Enum.map(&extract_key_info/1)      # Extract relevant fields
    |> Enum.sort_by(&(&1.name))           # Sort by name
    |> Enum.take(10)                      # Top 10 only
    |> Enum.map(&format_user_display/1)   # Format for display
  end

  defp normalize_user(user) do
    %{
      user
      | email: String.downcase(user.email),
        name: String.trim(user.name)
    }
  end

  defp extract_key_info(user) do
    %{
      id: user.id,
      name: user.name,
      email: user.email,
      last_login: user.last_login
    }
  end

  defp format_user_display(user) do
    "#{user.name} (#{user.email})"
  end
end
Advanced Piping Techniques
Using Anonymous Functions in Pipes
Sometimes you need to call functions that don’t take the piped value as their first argument:
defmodule AdvancedPiping do
  # When the piped value isn't the first argument,
  # then/2 (Elixir 1.12+) wraps the call in a one-argument function
  def process_config(config_path) do
    config_path
    |> File.read!()                             # Read file
    |> Jason.decode!()                          # Parse JSON
    |> then(&Map.merge(default_config(), &1))   # Merge with defaults
    |> validate_config()                        # Custom validation
  end

  # Before then/2 existed, the idiom was to call an anonymous function directly
  def legacy_process_config(config_path) do
    config_path
    |> File.read!()
    |> Jason.decode!()
    |> (&Map.merge(default_config(), &1)).()    # Anonymous function call
    |> validate_config()
  end

  defp default_config do
    %{timeout: 5000, retries: 3}
  end

  defp validate_config(config) do
    # Validation logic here
    config
  end
end
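Outside a module, then/2 is easy to try on its own — it simply calls the given one-argument function with the piped value:

```elixir
# then/2 passes the piped value as the function's only argument (Elixir 1.12+)
result =
  2
  |> then(&(&1 * 3))
  |> then(&Integer.to_string/1)

IO.puts(result)
# prints 6
```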
Conditional Piping
Handle conditional transformations in pipelines:
defmodule ConditionalPiping do
  def process_user_data(user_data, opts \\ []) do
    user_data
    |> validate_required_fields()
    |> normalize_email()
    |> (&if(opts[:uppercase_name], do: uppercase_name(&1), else: &1)).()
    |> (&if(opts[:add_timestamp], do: add_timestamp(&1), else: &1)).()
    |> save_user()
  end

  # Better approach: extract conditional logic into named helpers
  def process_user_data_v2(user_data, opts \\ []) do
    user_data
    |> validate_required_fields()
    |> normalize_email()
    |> maybe_uppercase_name(opts[:uppercase_name])
    |> maybe_add_timestamp(opts[:add_timestamp])
    |> save_user()
  end

  defp maybe_uppercase_name(user, true), do: uppercase_name(user)
  defp maybe_uppercase_name(user, _), do: user

  defp maybe_add_timestamp(user, true), do: add_timestamp(user)
  defp maybe_add_timestamp(user, _), do: user

  # Implementation stubs...
  defp validate_required_fields(user), do: user
  defp normalize_email(user), do: user
  defp uppercase_name(user), do: user
  defp add_timestamp(user), do: user
  defp save_user(user), do: user
end
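The maybe_* pattern works for any data type and stands on its own. A minimal self-contained sketch (module and function names here are hypothetical):

```elixir
defmodule MaybeExample do
  # Apply the transformation only when the flag is true
  def maybe_upcase(string, true), do: String.upcase(string)
  def maybe_upcase(string, _), do: string
end

"hello" |> MaybeExample.maybe_upcase(true)   # => "HELLO"
"hello" |> MaybeExample.maybe_upcase(false)  # => "hello"
```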
Error Handling in Pipelines
Handle errors gracefully without breaking the pipeline:
defmodule ErrorHandling do
  # Using with for error-prone pipelines; a failing step's
  # {:error, reason} falls through unchanged, so no else clause is needed
  def process_file(filename) do
    with {:ok, content} <- File.read(filename),
         {:ok, json} <- Jason.decode(content),
         {:ok, validated} <- validate_structure(json),
         {:ok, processed} <- process_data(validated) do
      {:ok, processed}
    end
  end

  # Branching mid-pipeline with case
  def process_user_registration(params) do
    params
    |> validate_email()
    |> case do
      {:ok, params} ->
        params
        |> hash_password()
        |> save_user()
        |> send_welcome_email()

      {:error, _} = error ->
        error
    end
  end

  # Pipeline with result tuples
  def safe_pipeline(input) do
    {:ok, input}
    |> safe_step(&validate_input/1)
    |> safe_step(&process_data/1)
    |> safe_step(&format_output/1)
  end

  defp safe_step({:ok, value}, func), do: func.(value)
  defp safe_step({:error, _} = error, _func), do: error

  # Placeholder implementations
  defp validate_structure(json), do: {:ok, json}
  defp process_data(data), do: {:ok, data}
  defp validate_email(params), do: {:ok, params}

  # These run inside the happy-path branch above, so they take and
  # return plain values rather than {:ok, _} tuples
  defp hash_password(params), do: params
  defp save_user(params), do: params
  defp send_welcome_email(user), do: user

  defp validate_input(input), do: {:ok, input}
  defp format_output(output), do: {:ok, output}
end
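The safe_step idea is easy to try standalone. Here is a self-contained sketch (module name hypothetical) with a parsing step that can actually fail:

```elixir
defmodule SafePipe do
  # Run the function on success, short-circuit on error
  def step({:ok, value}, fun), do: fun.(value)
  def step({:error, _} = error, _fun), do: error
end

parse = fn string ->
  case Integer.parse(string) do
    {number, ""} -> {:ok, number}
    _ -> {:error, :not_an_integer}
  end
end

{:ok, "42"}
|> SafePipe.step(parse)
|> SafePipe.step(fn n -> {:ok, n * 2} end)
# => {:ok, 84}

{:ok, "oops"}
|> SafePipe.step(parse)
|> SafePipe.step(fn n -> {:ok, n * 2} end)
# => {:error, :not_an_integer} — the error skips the remaining steps
```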
Performance Considerations
Efficient Pipeline Design
defmodule PerformantPipelines do
  # ✅ Filter early to reduce work
  def process_large_dataset(items) do
    items
    |> Stream.filter(&active?/1)            # Filter first (lazy)
    |> Stream.map(&expensive_transform/1)   # Then transform (lazy)
    |> Stream.take(100)                     # Limit early (lazy)
    |> Enum.to_list()                       # Materialize at the end
  end

  # ❌ Transform everything, then filter
  def inefficient_processing(items) do
    items
    |> Enum.map(&expensive_transform/1)     # Transforms inactive items too
    |> Enum.filter(&active?/1)              # Filters after expensive work
    |> Enum.take(100)
  end

  # Use Stream for large files
  def process_file_lines(filename) do
    filename
    |> File.stream!()                       # Lazy file reading
    |> Stream.map(&String.trim/1)           # Lazy transformation
    |> Stream.filter(&(&1 != ""))           # Lazy filtering
    |> Stream.with_index()                  # Add line numbers
    |> Enum.take(1000)                      # Process first 1000 lines
  end

  defp active?(item), do: item.status == :active
  defp expensive_transform(item), do: item  # Placeholder
end
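To see the laziness pay off, note that a stream over a large range only computes the elements actually demanded by the final Enum call:

```elixir
# Only enough elements to satisfy Enum.take/2 are ever computed
result =
  1..1_000_000
  |> Stream.map(&(&1 * 2))
  |> Stream.filter(&(rem(&1, 3) == 0))
  |> Enum.take(5)

IO.inspect(result)
# => [6, 12, 18, 24, 30]
```

The equivalent Enum version would map and filter all one million elements before taking five.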
Real-World Example: API Response Processing
defmodule ApiResponseProcessor do
  def process_api_response(raw_response) do
    raw_response
    |> validate_response_format()
    |> extract_data_field()
    |> normalize_user_records()
    |> filter_active_users()
    |> enrich_with_preferences()
    |> format_for_client()
    |> add_metadata()
  end

  defp validate_response_format(%{"success" => true, "data" => data}), do: {:ok, data}
  defp validate_response_format(_), do: {:error, :invalid_format}

  defp extract_data_field({:ok, data}), do: data
  defp extract_data_field({:error, _} = error), do: error

  defp normalize_user_records(users) when is_list(users) do
    Enum.map(users, &normalize_single_user/1)
  end

  defp normalize_user_records(error), do: error

  defp normalize_single_user(user) do
    %{
      id: user["id"],
      name: String.trim(user["name"] || ""),
      email: String.downcase(user["email"] || ""),
      # Safe only because statuses come from a small, known set;
      # never call String.to_atom/1 on unbounded user input
      # (atoms are not garbage-collected)
      status: String.to_atom(user["status"] || "inactive"),
      created_at: parse_timestamp(user["created_at"])
    }
  end

  defp filter_active_users(users) when is_list(users) do
    Enum.filter(users, &(&1.status == :active))
  end

  defp filter_active_users(error), do: error

  defp enrich_with_preferences(users) when is_list(users) do
    Enum.map(users, &add_user_preferences/1)
  end

  defp enrich_with_preferences(error), do: error

  defp add_user_preferences(user) do
    preferences = fetch_user_preferences(user.id)
    Map.put(user, :preferences, preferences)
  end

  defp format_for_client(users) when is_list(users) do
    %{
      users: users,
      count: length(users)
    }
  end

  defp format_for_client(error), do: error

  defp add_metadata(result) when is_map(result) do
    Map.merge(result, %{
      processed_at: DateTime.utc_now(),
      version: "1.0"
    })
  end

  defp add_metadata(error), do: error

  # Placeholder implementations
  defp parse_timestamp(nil), do: nil
  defp parse_timestamp(timestamp), do: timestamp
  defp fetch_user_preferences(_user_id), do: %{}
end
Common Pitfalls and Solutions
Pitfall 1: Omitting Parentheses
# ⚠️ Still calls the function, but triggers a compiler warning
# about missing parentheses when piping
"hello" |> String.upcase
# ✅ Explicit and warning-free
"hello" |> String.upcase()
Pitfall 2: Wrong Argument Position
# ❌ String.replace expects (string, pattern, replacement)
"hello world" |> String.replace("world") |> "elixir" # Compile error - can't pipe into a string literal
# ✅ Correct argument order
"hello world" |> String.replace("world", "elixir")
Pitfall 3: Overusing Pipes
# ❌ Unnecessary pipe for single operation
input |> validate()
# ✅ Simple function call is clearer
validate(input)
# ❌ Too many trivial steps
data |> Map.get(:key) |> List.first()
# ✅ Combine simple operations
List.first(Map.get(data, :key))
Best Practices
- Use pipes for multi-step transformations (3+ steps)
- Keep pipeline steps simple - each step should do one thing
- Extract complex logic into named functions
- Use meaningful intermediate variable names when debugging
- Consider Stream for large datasets
- Handle errors explicitly in pipelines
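When a pipeline misbehaves, IO.inspect/2 is the quickest debugging tool: it prints its argument and returns it unchanged, so it can be dropped between any two steps without altering the result (dbg/1, Elixir 1.14+, prints every step automatically):

```elixir
result =
  " Hello WORLD "
  |> String.downcase()
  |> IO.inspect(label: "after downcase")  # prints and passes the value through
  |> String.replace("world", "elixir")
  |> IO.inspect(label: "after replace")   # same here
  |> String.trim()
  |> String.upcase()

IO.puts(result)
# the final result is still "HELLO ELIXIR"
```

Remove the IO.inspect lines once the bug is found; they are pure side effects.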
The pipe operator transforms nested, inside-out code into declarative, readable pipelines. Master it, and your Elixir code will be cleaner, more maintainable, and more expressive.
This concludes our journey through Elixir fundamentals. You now have the tools to write clean, functional, and efficient Elixir code using pattern matching, data structures, recursion, and piping.