R3 integration

R3 integration is available since v2.1.0 and requires the R3 package to be installed.


Purpose

This guide explains how to use Reactive SO with R3, Cysharp's Reactive Extensions library for .NET and Unity. The integration enables reactive patterns such as throttling, buffering, and combining multiple event streams.


Prerequisites

Install the R3 package via NuGetForUnity or OpenUPM.

Once R3 is installed, extension methods become automatically available. No additional setup is required.


EventChannel extensions

AsObservable

Convert any EventChannel to an R3 Observable.

Typed EventChannel

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class DamageHandler : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onDamageReceived;

    private void Start()
    {
        // Convert event channel to Observable
        onDamageReceived.AsObservable()
            .Subscribe(damage => Debug.Log($"Received {damage} damage"))
            .AddTo(this);
    }
}

VoidEventChannel

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class GameOverHandler : MonoBehaviour
{
    [SerializeField] private VoidEventChannelSO onPlayerDeath;

    private void Start()
    {
        // VoidEventChannel emits Unit
        onPlayerDeath.AsObservable()
            .Subscribe(_ => ShowGameOverScreen())
            .AddTo(this);
    }

    private void ShowGameOverScreen() { /* ... */ }
}
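
Because a VoidEventChannel emits Unit rather than a payload, the value itself carries no information, but the stream can still be shaped with operators. The sketch below keeps a running count of events with R3's Scan operator. DeathCounter and UpdateDeathLabel are hypothetical names used only for illustration; they are not part of Reactive SO.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

// Hypothetical component, for illustration only
public class DeathCounter : MonoBehaviour
{
    [SerializeField] private VoidEventChannelSO onPlayerDeath;

    private void Start()
    {
        onPlayerDeath.AsObservable()
            // Ignore the Unit payload and accumulate a running count, starting from 0
            .Scan(0, (count, _) => count + 1)
            .Subscribe(count => UpdateDeathLabel(count))
            .AddTo(this);
    }

    // Hypothetical UI hook
    private void UpdateDeathLabel(int deaths) { /* ... */ }
}
```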

ReactiveEntitySet extensions

ReactiveEntitySet provides four observation methods for entity lifecycle events.

| Method | Emits | Description |
| --- | --- | --- |
| ObserveAdd() | int (entity ID) | Emits when an entity is registered |
| ObserveRemove() | int (entity ID) | Emits when an entity is unregistered |
| ObserveDataChanged() | int (entity ID) | Emits when an entity’s data is modified |
| ObserveSetChanged() | Unit | Emits on any change (add, remove, or data change) |

Basic usage

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class EnemyTracker : MonoBehaviour
{
    [SerializeField] private EnemySet enemySet;

    private void Start()
    {
        // Track enemy additions
        enemySet.ObserveAdd()
            .Subscribe(id => Debug.Log($"Enemy {id} spawned"))
            .AddTo(this);

        // Track enemy removals
        enemySet.ObserveRemove()
            .Subscribe(id => Debug.Log($"Enemy {id} destroyed"))
            .AddTo(this);

        // Track data changes
        enemySet.ObserveDataChanged()
            .Subscribe(id => Debug.Log($"Enemy {id} data updated"))
            .AddTo(this);
    }
}
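
The fourth method, ObserveSetChanged(), is useful when the reaction is the same for every kind of change, since one subscription then replaces three. A minimal sketch follows. EnemySummaryUI and RefreshSummary are hypothetical names, and EnemySet is assumed to be the same user-defined set type as in the example above.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

// Hypothetical component, for illustration only
public class EnemySummaryUI : MonoBehaviour
{
    [SerializeField] private EnemySet enemySet;

    private void Start()
    {
        // A single subscription covers add, remove, and data change.
        // The emitted Unit carries no entity ID, so the handler re-reads what it needs.
        enemySet.ObserveSetChanged()
            .Subscribe(_ => RefreshSummary())
            .AddTo(this);
    }

    // Hypothetical UI hook
    private void RefreshSummary() { /* ... */ }
}
```

If the handler needs to know which entity changed, the ID-emitting methods remain the better fit.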

Components exposing EventChannels

AsObservable() on EventChannel is the foundation of R3 integration. Any component that exposes an EventChannel via a public getter can be used with R3, whether built-in or user-defined.

Built-in components

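| Component | Property | Type |
| --- | --- | --- |
| RuntimeSetSO | OnItemsChanged | VoidEventChannelSO |
| RuntimeSetSO | OnCountChanged | IntEventChannelSO |
| VariableSO | OnValueChanged | EventChannelSO |
| ReactiveEntitySetSO | OnItemAdded | IntEventChannelSO |
| ReactiveEntitySetSO | OnItemRemoved | IntEventChannelSO |
| ReactiveEntitySetSO | OnDataChanged | IntEventChannelSO |
| ReactiveEntitySetSO | OnSetChanged | VoidEventChannelSO |

// RuntimeSetSO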
runtimeSet.OnItemsChanged.AsObservable()
    .Subscribe(_ => RefreshUI())
    .AddTo(this);

// R3 has no Throttle operator; ThrottleLast emits the latest value once per window
runtimeSet.OnCountChanged.AsObservable()
    .ThrottleLast(TimeSpan.FromMilliseconds(100))
    .Subscribe(count => UpdateCountLabel(count))
    .AddTo(this);

// VariableSO
healthVariable.OnValueChanged.AsObservable()
    .Where(hp => hp < 20)
    .Subscribe(_ => ShowLowHealthWarning())
    .AddTo(this);

// ReactiveEntitySetSO (via EventChannel)
entitySet.OnItemAdded.AsObservable()
    .Subscribe(id => Debug.Log($"Added: {id}"))
    .AddTo(this);

Custom components

User-defined components that expose EventChannels work the same way.

public class GameManager : MonoBehaviour
{
    [SerializeField] private VoidEventChannelSO onGameStarted;
    [SerializeField] private IntEventChannelSO onScoreChanged;

    // Expose EventChannels via public getters
    public VoidEventChannelSO OnGameStarted => onGameStarted;
    public IntEventChannelSO OnScoreChanged => onScoreChanged;
}

// Usage
gameManager.OnGameStarted.AsObservable()
    .Subscribe(_ => InitializeGame())
    .AddTo(this);

gameManager.OnScoreChanged.AsObservable()
    .Where(score => score >= 1000)
    .Subscribe(_ => UnlockAchievement())
    .AddTo(this);

ReactiveEntitySetSO dedicated extensions

ReactiveEntitySetSO provides dedicated extension methods for convenience. Internally, they are equivalent to using AsObservable() on the EventChannel.

// Dedicated extension (shorter syntax)
entitySet.ObserveAdd()
    .Subscribe(id => Debug.Log($"Added: {id}"))
    .AddTo(this);

// Via EventChannel (equivalent behavior)
entitySet.OnItemAdded.AsObservable()
    .Subscribe(id => Debug.Log($"Added: {id}"))
    .AddTo(this);

Important: Unassigned EventChannels

If the EventChannel is not assigned in the Inspector, the getter returns null. Calling AsObservable() on null throws a NullReferenceException, so check for null before subscribing.

// Safe usage with null check
if (runtimeSet.OnItemsChanged != null)
{
    runtimeSet.OnItemsChanged.AsObservable()
        .Subscribe(_ => RefreshUI())
        .AddTo(this);
}
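
When the same null check recurs in many places, it could be wrapped in a small helper. The following is a sketch only: AsObservableOrEmpty is a hypothetical extension that is not part of Reactive SO, and it assumes that AsObservable() on an IntEventChannelSO returns Observable<int>.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;

// Hypothetical helper, not part of Reactive SO
public static class EventChannelNullSafeExtensions
{
    // Assumption: AsObservable() on IntEventChannelSO returns Observable<int>
    public static Observable<int> AsObservableOrEmpty(this IntEventChannelSO channel)
    {
        // Unity's overloaded != also treats unassigned references as null.
        // Observable.Empty<int>() completes immediately and never emits.
        return channel != null
            ? channel.AsObservable()
            : Observable.Empty<int>();
    }
}

// Usage inside a MonoBehaviour:
// runtimeSet.OnCountChanged.AsObservableOrEmpty()
//     .Subscribe(count => UpdateCountLabel(count))
//     .AddTo(this);
```

The trade-off is that an unassigned channel now fails silently, so logging a warning in the null branch may be worth considering.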

Practical patterns

Throttling high-frequency events

Prevent UI updates from firing too frequently.

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using System;
using UnityEngine;

public class HealthBar : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onHealthChanged;

    private void Start()
    {
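        // Update UI at most once per 100ms, using the latest value in each window.
        // R3 has no Throttle operator: Rx's Throttle is Debounce in R3, and Rx's Sample is ThrottleLast.
        onHealthChanged.AsObservable()
            .ThrottleLast(TimeSpan.FromMilliseconds(100))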
            .Subscribe(health => UpdateHealthBar(health))
            .AddTo(this);
    }

    private void UpdateHealthBar(int health) { /* ... */ }
}
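
R3 offers several rate-limiting operators with different semantics, and picking the wrong one is a common source of confusion. The sketch below contrasts two of them: Debounce waits for a quiet period before emitting, while ThrottleFirst emits immediately and then ignores further values for a while. ScoreFeedback, SaveScore, and PlayScoreSound are hypothetical names.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using System;
using UnityEngine;

// Hypothetical component, for illustration only
public class ScoreFeedback : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onScoreChanged;

    private void Start()
    {
        // Debounce: emit the latest value only after 500ms without new events.
        // Suited to expensive work that should run once a burst has settled.
        onScoreChanged.AsObservable()
            .Debounce(TimeSpan.FromMilliseconds(500))
            .Subscribe(score => SaveScore(score))
            .AddTo(this);

        // ThrottleFirst: emit the first value, then ignore the rest for 1 second.
        // Suited to feedback that should respond instantly but not spam.
        onScoreChanged.AsObservable()
            .ThrottleFirst(TimeSpan.FromSeconds(1))
            .Subscribe(_ => PlayScoreSound())
            .AddTo(this);
    }

    // Hypothetical handlers
    private void SaveScore(int score) { /* ... */ }
    private void PlayScoreSound() { /* ... */ }
}
```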

Combining multiple event streams

React to multiple events with a single subscription.

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class EnemyListUI : MonoBehaviour
{
    [SerializeField] private EnemySet enemySet;

    private void Start()
    {
        // Refresh list on any add or remove
        Observable.Merge(
            enemySet.ObserveAdd(),
            enemySet.ObserveRemove()
        )
        .Subscribe(_ => RefreshEnemyList())
        .AddTo(this);
    }

    private void RefreshEnemyList() { /* ... */ }
}
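
Merge interleaves streams of the same type. When the latest values of two streams must be combined into one result, CombineLatest is the usual choice. A minimal sketch, assuming two separate IntEventChannelSO assets for current and maximum health; HealthRatioBar, onMaxHealthChanged, and SetFill are hypothetical names.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

// Hypothetical component, for illustration only
public class HealthRatioBar : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onHealthChanged;
    [SerializeField] private IntEventChannelSO onMaxHealthChanged;

    private void Start()
    {
        onHealthChanged.AsObservable()
            // Pair each new value with the most recent value from the other stream
            .CombineLatest(
                onMaxHealthChanged.AsObservable(),
                (health, max) => max > 0 ? (float)health / max : 0f)
            .Subscribe(ratio => SetFill(ratio))
            .AddTo(this);
    }

    // Hypothetical UI hook
    private void SetFill(float ratio) { /* ... */ }
}
```

Note that CombineLatest emits nothing until each source has emitted at least once, so both channels need to be raised before the bar first updates.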

Filtering events

Process only events that meet certain criteria.

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class CriticalDamageEffect : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onDamageReceived;

    private void Start()
    {
        // Only react to critical hits (damage > 50)
        onDamageReceived.AsObservable()
            .Where(damage => damage > 50)
            .Subscribe(_ => ShowCriticalHitEffect()) // damage value is not needed here
            .AddTo(this);
    }

    private void ShowCriticalHitEffect() { /* ... */ }
}

Transforming event data

Transform event data before processing.

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class DamageLogger : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onDamageReceived;

    private void Start()
    {
        onDamageReceived.AsObservable()
            .Select(damage => $"Damage: {damage}")
            .Subscribe(message => Debug.Log(message))
            .AddTo(this);
    }
}
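
Transformation and filtering operators compose. One common combination is to map each value to a state with Select and then use DistinctUntilChanged so that downstream code runs only when that state flips. The sketch below is illustrative; LowHealthWarning, lowHealthThreshold, and SetWarningVisible are hypothetical names.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

// Hypothetical component, for illustration only
public class LowHealthWarning : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onHealthChanged;
    [SerializeField] private int lowHealthThreshold = 20;

    private void Start()
    {
        onHealthChanged.AsObservable()
            // Map each health value to a "is low" flag
            .Select(health => health < lowHealthThreshold)
            // Drop consecutive duplicates so only transitions pass through
            .DistinctUntilChanged()
            .Subscribe(isLow => SetWarningVisible(isLow))
            .AddTo(this);
    }

    // Hypothetical UI hook
    private void SetWarningVisible(bool visible) { /* ... */ }
}
```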

Buffering events

Collect the events that arrive within a time window and process them as one batch.

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using System;
using System.Linq;
using UnityEngine;

public class ComboCounter : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onDamageDealt;

    private void Start()
    {
        // Sum damage dealt in 2-second windows.
        // R3 names Rx's Buffer operator Chunk; it emits each window as an int[].
        onDamageDealt.AsObservable()
            .Chunk(TimeSpan.FromSeconds(2))
            .Where(damages => damages.Length > 0)
            .Subscribe(damages =>
            {
                int totalDamage = damages.Sum();
                if (totalDamage > 100)
                {
                    ShowComboEffect(damages.Length, totalDamage);
                }
            })
            .AddTo(this);
    }

    private void ShowComboEffect(int hitCount, int totalDamage) { /* ... */ }
}
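
Batching does not have to be time-based. In R3 the buffering operator is named Chunk, and it also has a count-based overload that emits an array once a fixed number of events has arrived. A minimal sketch, with HitStreak and ShowStreak as hypothetical names:

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using System.Linq;
using UnityEngine;

// Hypothetical component, for illustration only
public class HitStreak : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onDamageDealt;

    private void Start()
    {
        // Emit an int[] of exactly 3 damage values after every third hit,
        // regardless of how much time passes between hits
        onDamageDealt.AsObservable()
            .Chunk(3)
            .Subscribe(hits => ShowStreak(hits.Sum()))
            .AddTo(this);
    }

    // Hypothetical UI hook
    private void ShowStreak(int streakDamage) { /* ... */ }
}
```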

Best practices

Always dispose subscriptions

Use AddTo to automatically dispose subscriptions when the MonoBehaviour is destroyed.

// Good: Subscription is disposed when GameObject is destroyed
onDamageReceived.AsObservable()
    .Subscribe(damage => HandleDamage(damage))
    .AddTo(this);

// Bad: Manual disposal required, easy to forget
var subscription = onDamageReceived.AsObservable()
    .Subscribe(damage => HandleDamage(damage));
// Must call subscription.Dispose() manually
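
Manual disposal is still appropriate when a subscription should follow the enabled state of a component rather than its lifetime. The sketch below shows one way to do that with OnEnable and OnDisable; DamageListener and HandleDamage are hypothetical names.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using System;
using UnityEngine;

// Hypothetical component, for illustration only
public class DamageListener : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onDamageReceived;

    private IDisposable subscription;

    private void OnEnable()
    {
        // Subscribe each time the component is enabled
        subscription = onDamageReceived.AsObservable()
            .Subscribe(damage => HandleDamage(damage));
    }

    private void OnDisable()
    {
        // Dispose on disable; Unity also calls OnDisable before OnDestroy
        subscription?.Dispose();
        subscription = null;
    }

    // Hypothetical handler
    private void HandleDamage(int damage) { /* ... */ }
}
```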

Use CompositeDisposable for multiple subscriptions

When a component holds many subscriptions, collect them in a CompositeDisposable and dispose them together in OnDestroy.

using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

public class GameUI : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onHealthChanged;
    [SerializeField] private IntEventChannelSO onScoreChanged;
    [SerializeField] private VoidEventChannelSO onGameOver;

    private CompositeDisposable disposables = new();

    private void Start()
    {
        onHealthChanged.AsObservable()
            .Subscribe(UpdateHealthUI)
            .AddTo(disposables);

        onScoreChanged.AsObservable()
            .Subscribe(UpdateScoreUI)
            .AddTo(disposables);

        onGameOver.AsObservable()
            .Subscribe(_ => ShowGameOverScreen())
            .AddTo(disposables);
    }

    private void OnDestroy()
    {
        disposables.Dispose();
    }

    private void UpdateHealthUI(int health) { /* ... */ }
    private void UpdateScoreUI(int score) { /* ... */ }
    private void ShowGameOverScreen() { /* ... */ }
}

Avoid subscribing in hot paths

Subscriptions have overhead. Subscribe once during initialization, not every frame.

// Good: Subscribe once in Start
private void Start()
{
    onDamageReceived.AsObservable()
        .Subscribe(HandleDamage)
        .AddTo(this);
}

// Bad: Creating new subscriptions frequently
private void Update()
{
    // Don't do this!
    onDamageReceived.AsObservable()
        .Take(1)
        .Subscribe(HandleDamage);
}
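
If per-frame logic needs the most recent event value, one option is to subscribe once and cache the value in a field, then read that field in Update. A minimal sketch under that assumption; LowHealthVignette, latestHealth, and PulseVignette are hypothetical names.

```csharp
using Tang3cko.ReactiveSO;
using Tang3cko.ReactiveSO.R3;
using R3;
using UnityEngine;

// Hypothetical component, for illustration only
public class LowHealthVignette : MonoBehaviour
{
    [SerializeField] private IntEventChannelSO onHealthChanged;

    // Starts high so the effect stays off until the first event arrives
    private int latestHealth = int.MaxValue;

    private void Start()
    {
        // Subscribe once and cache the latest value
        onHealthChanged.AsObservable()
            .Subscribe(health => { latestHealth = health; })
            .AddTo(this);
    }

    private void Update()
    {
        // Read the cached value; no subscription is created per frame
        if (latestHealth < 20)
        {
            PulseVignette();
        }
    }

    // Hypothetical visual effect
    private void PulseVignette() { /* ... */ }
}
```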

When to use R3 integration

Use R3 when

  • You need time-based operations (Debounce, ThrottleLast, Delay, Chunk)
  • You want to combine multiple event streams (Merge, CombineLatest)
  • You need complex filtering or transformation logic
  • Your team is already familiar with reactive programming

Use standard events when

  • You only need simple one-to-one event handling
  • You do not need complex processing
  • You prefer to keep dependencies minimal
