Skip to content

RxJS Observable 完全指南

Observable 是 RxJS 的核心概念,它代表一个异步数据流,可以随时间推送多个值。让我深入解释:

1. 核心概念

typescript
// 基本定义
import { Observable } from 'rxjs';

// 创建 Observable
const observable = new Observable(subscriber => {
  subscriber.next(1);     // 发出值
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();  // 完成流
});

// 订阅 Observable
observable.subscribe({
  next: value => console.log('值:', value),
  error: err => console.error('错误:', err),
  complete: () => console.log('完成')
});

// 输出:
// 值: 1
// 值: 2
// 值: 3
// 完成

2. Observable 与其他异步模式的比较

特性ObservablePromise回调函数
多值
可取消
延迟执行
操作符
错误处理
typescript
// Promise: 只能发出一个值
const promise = Promise.resolve(1);

// Observable: 可以发出多个值
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  // 可以在未来任何时间点继续发出值
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

3. 创建 Observable 的方式

typescript
// 从值创建
import { of, from, fromEvent, interval } from 'rxjs';

// 1. of - 从固定值创建
of(1, 2, 3).subscribe(console.log);  // 1, 2, 3

// 2. from - 从数组或Promise创建
from([4, 5, 6]).subscribe(console.log);  // 4, 5, 6
from(Promise.resolve(7)).subscribe(console.log);  // 7

// 3. fromEvent - 从事件创建
fromEvent(document, 'click').subscribe(event => console.log('点击:', event));

// 4. interval - 定时器
interval(1000).subscribe(count => console.log(`${count}秒过去了`));

4. Observable 操作符示例

typescript
import { of, from } from 'rxjs';
import { map, filter, tap, switchMap, catchError } from 'rxjs/operators';

// 原始数据流
const numbers$ = of(1, 2, 3, 4, 5);

// 转换流
numbers$.pipe(
  tap(n => console.log('原始值:', n)),  // 调试
  filter(n => n % 2 === 0),            // 过滤
  map(n => n * 10),                    // 转换
  tap(n => console.log('结果值:', n))   // 调试
).subscribe();

// 输出:
// 原始值: 1
// 原始值: 2
// 结果值: 20
// 原始值: 3
// 原始值: 4
// 结果值: 40
// 原始值: 5

5. 高级流操作

typescript
import { of, throwError, timer } from 'rxjs';
import { mergeMap, switchMap, catchError, retry, timeout } from 'rxjs/operators';

// HTTP 请求示例
function getUser(id: number) {
  return of({ id, name: `User ${id}` }).pipe(
    // 模拟延迟
    switchMap(user => timer(1000).pipe(
      map(() => user)
    ))
  );
}

// 错误处理
of(1, 2, 3).pipe(
  mergeMap(id => getUser(id).pipe(
    timeout(2000),                                // 添加超时
    catchError(err => of({ id, error: true })),  // 处理错误
    retry(3)                                      // 重试机制
  ))
).subscribe(console.log);

6. NestJS 中的应用

typescript
// 控制器中的使用
@Controller('users')
export class UsersController {
  constructor(private usersService: UsersService) {}

  @Get()
  getUsers(): Observable<User[]> {
    return this.usersService.findAll();
  }
}

// 服务中的实现
@Injectable()
export class UsersService {
  findAll(): Observable<User[]> {
    return of([
      { id: 1, name: 'John' },
      { id: 2, name: 'Jane' }
    ]).pipe(
      delay(100),  // 模拟异步
      tap(users => console.log(`Found ${users.length} users`))
    );
  }
}

7. 常见实际应用场景

typescript
// 1. 自动完成搜索
fromEvent(searchInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  debounceTime(300),  // 等待用户停止输入
  distinctUntilChanged(),  // 只在值变化时触发
  switchMap(term => searchService.search(term))
).subscribe(results => {
  // 更新UI显示结果
});

// 2. 轮询API
timer(0, 10000).pipe(
  switchMap(() => http.get('/api/status')),
  retry()
).subscribe(status => {
  // 更新状态显示
});

// 3. 并发请求
forkJoin([
  http.get('/api/users'),
  http.get('/api/products'),
  http.get('/api/orders')
]).subscribe(([users, products, orders]) => {
  // 所有请求完成后处理数据
});

8. 内存管理和取消订阅

typescript
// 1. 手动取消
const subscription = interval(1000).subscribe(console.log);
// 稍后取消
setTimeout(() => {
  subscription.unsubscribe();
  console.log('已取消订阅');
}, 5000);

// 2. 自动完成
of(1, 2, 3).subscribe({
  next: console.log,
  complete: () => console.log('自动完成,无需手动取消')
});

// 3. 在Angular中使用takeUntil模式
@Component({...})
export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)  // 在组件销毁时自动取消
    ).subscribe(console.log);
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

9. 高级模式

typescript
// 1. 缓存结果
function getCachedData(): Observable<Data> {
  return this.data$.pipe(
    shareReplay(1)  // 缓存并共享最后一个结果
  );
}

// 2. 重试带回退策略
http.get('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((attempts, error) => attempts + 1, 0),
    takeWhile(attempts => attempts < 5),
    delay(1000)  // 每次重试间隔1秒
  ))
).subscribe();

// 3. 组合多个来源
const clicks$ = fromEvent(document, 'click');
const timer$ = interval(1000);

// 将点击事件转换为带计时器的流
clicks$.pipe(
  exhaustMap(() => timer$.pipe(
    take(5)  // 每次点击后发出5个计时值
  ))
).subscribe(console.log);

Observable 是一个强大的概念,掌握它可以优雅地处理各种异步和事件驱动的场景,无论是前端用户交互、HTTP请求还是后端的数据流处理。

本站访客数 人次 本站总访问量