Introdução ao RxDart: métodos de extensão

Tempo de leitura: 5 minutes

O RxDart estende os recursos de Dart Streams e StreamControllers adicionando funcionalidade da especificação de extensões reativas sobre ele.

 

1. Visão geral do RxDart

RxDart é uma biblioteca com muitas classes e funções para a classe Stream. O RxDart adiciona funcionalidade ao Dart Streams de três maneiras:

  • Stream Classes — crie xxStreams com recursos específicos, como combinar ou mesclar muitos Streams.
  • Métodos de extensão — transforma um Stream de origem em um novo Stream com recursos diferentes, como limitação ou buffer de eventos.
  • Assuntos — StreamControllers com poderes adicionais

Na parte anterior, apresentei algumas classes de fluxo. Agora, também apresentarei algumas funções de extensão notáveis, para que você possa entender aproximadamente o que são e seus propósitos. Também abordarei funções que não foram abordadas na parte anterior.

2. Métodos de Extensão

No momento em que escrevo isso, a versão mais recente do RxDart é 0.27.7, então usarei esta versão para a demonstração.

2.1. debounceTime

Link rxmarbles: https://rxmarbles.com/#debounceTime

 

Da documentação:

Transforma um Stream para que só emita itens da sequência de origem sempre que o intervalo de tempo definido por duration passar, sem que a sequência de origem emita outro item. Este intervalo de tempo começa após o último evento debounce ter sido emitido.

Resumindo, esta função nos permite passar uma Duração, neste caso 10 segundos. Se o Source Stream emitir um evento e não emitir outro após 10s, o Output Stream emitirá esse evento.

Esta função é útil no recurso de pesquisa ao vivo (pesquisa em tempo real) porque o aplicativo pode impedir a API de chamada de spam. Vou demonstrar a pesquisa ao vivo aqui:

void main() {
  // Se o demoStream emitir um evento e não emitir outro após 500ms,
  // o fluxo de saída emitirá esse evento
  demoStream().debounceTime(Duration(milliseconds: 500)).listen((event) { 
    // chama a pesquisa da API
    print('Search with keyword: $event');
  });
}

// emite eventos quando o usuário digita algo em um TextField
Stream<String> demoStream() async* {
  yield 'L'; // ignored
  yield 'Le'; // ignored
  yield 'Lea'; // ignored
  yield 'Lear'; // ignored
  yield 'Learn'; // ignored
  await Future.delayed(Duration(seconds: 1)); // emit 'Learn'
  yield 'Learn R'; // ignored
  yield 'Learn Rx'; // ignored
  yield 'Learn RxD'; // ignored
  yield 'Learn RxDa'; // ignored
  yield 'Learn RxDar'; // ignored
  yield 'Learn RxDart'; // emit 'Learn RxDart'
}

Saída:

Search with keyword: Learn
Search with keyword: Learn RxDart

2.2. throttleTime

Link rxmarbles: https://rxmarbles.com/#throttleTime

Stream<T> throttleTime(Duration duration, {bool trailing = false, bool leading = true})

Emite um valor do Stream de origem, ignora os valores de origem subsequentes por um período e repete esse processo.

Se a liderança for verdadeira, o primeiro item em cada janela será emitido. O diagrama acima descreve este caso. Essa função é útil no recurso pós-reação (clique em curtir ou botões de coração) porque o aplicativo pode impedir o spam de cliques.

void main() {
  var currentTime = DateTime.now();
  
  demoStream()
      .throttleTime(Duration(seconds: 5), leading: true, trailing: false)
      .listen((event) => println(event, currentTime));
}

Stream<String> demoStream() async* {
  yield 'A'; // emit 'A'
  await Future.delayed(Duration(seconds: 1));
  yield 'B'; // ignored
  await Future.delayed(Duration(seconds: 1));
  yield 'C'; // ignored
  await Future.delayed(Duration(seconds: 5));
  yield 'D'; // emit 'D'
  await Future.delayed(Duration(seconds: 2));
  yield 'E'; // ignored
}

void println(Object value, DateTime currentTime) {
  print('Emit $value after ${DateTime.now().difference(currentTime).inSeconds} seconds');
}

Saída:

Emit A after 0 seconds
Emit D after 7 seconds

Se à direita for verdadeiro, o último item será emitido em seu lugar.

void main() {
  var currentTime = DateTime.now();

  demoStream()
      .throttleTime(Duration(seconds: 5), leading: false, trailing: true)
      .listen((event) => println(event, currentTime));
}

Stream<String> demoStream() async* {
  yield 'A';
  await Future.delayed(Duration(seconds: 1));
  yield 'B';
  await Future.delayed(Duration(seconds: 1));
  yield 'C';
  await Future.delayed(Duration(seconds: 5));
  yield 'D';
  await Future.delayed(Duration(seconds: 2));
  yield 'E';
}

void println(Object value, DateTime currentTime) {
  print('Emit $value after ${DateTime.now().difference(currentTime).inSeconds} seconds');
}

Saída:

Emit C after 5 seconds
Emit E after 12 seconds

 

2.3. onErrorResumeNext

Essa função é passada em um Stream que chamaremos de recoveryStream. Quando o Source Streams encontra um erro, em vez de emitir o evento de erro, ele emite elementos dentro do recoveryStream. Note que no exemplo abaixo, onError não imprime nada.

void main() {
  errorStream()
      .onErrorResumeNext(Stream.fromIterable([
        '1 from recoveryStream',
        '2 from recoveryStream',
        '3 from recoveryStream',
      ]))
      .listen(print, onError: (e) => print('Error: $e'));
}

Stream<String> errorStream() async* {
  yield 'a';
  yield 'b';
  throw FormatException('wrong format');
  yield 'c';
}

Saída:

a
b
1 from recoveryStream
2 from recoveryStream
3 from recoveryStream

 

2.4. intervalo

Cria um Stream que emite cada item no Stream após uma determinada duração.

Stream.fromIterable([1, 2, 3])
  .interval(Duration(seconds: 1))
  .listen((i) => print('$i sec'); // prints 1 sec, 2 sec, 3 sec

2.5. concatWith

Rxmarbles link: https://rxmarbles.com/#concat

Da documentação:

Concatena todas as sequências de fluxo especificadas, desde que a sequência de fluxo anterior tenha sido encerrada com sucesso.

Ele faz isso assinando cada fluxo um por um, emitindo todos os itens e concluindo antes de assinar o próximo fluxo.

Se os fluxos fornecidos estiverem vazios, a sequência resultante será concluída imediatamente sem emitir nenhum item.

Veja o diagrama acima como exemplo, depois de emitir todos os eventos no Stream 1, o ConcatStream espera x segundos (no exemplo) antes de emitir o primeiro evento no Stream 2 em vez de fazê-lo imediatamente. É por isso que a soma de t1 (quanto tempo o Stream 1 para) + t2 (quanto tempo o Stream 2 para) = t3 (quanto tempo ConcatStream para).

void main() {
  var currentTime = DateTime.now();

  var concatStream = firstStream().concatWith([secondStream()]);
  concatStream.listen((event) => println(event, currentTime));
}

// stream 1 takes 13 seconds to emit all events
Stream<int> firstStream() async* {
  await Future.delayed(Duration(seconds: 1));
  yield 20;
  await Future.delayed(Duration(seconds: 1));
  yield 40;
  await Future.delayed(Duration(seconds: 2));
  yield 60;
  await Future.delayed(Duration(seconds: 6));
  yield 80;
  await Future.delayed(Duration(seconds: 3));
  yield 100;
}

// stream 2 takes 16 seconds to emit all events
Stream<int> secondStream() async* {
  await Future.delayed(Duration(seconds: 7));
  yield 1;
  await Future.delayed(Duration(seconds: 9));
  yield 1;
}

void println(Object value, DateTime currentTime) {
  print('Emit $value after ${DateTime.now().difference(currentTime).inSeconds} seconds');
}

Saída. Leva 13 segundos para emitir todos os eventos no Stream 1 e outros 16s para o Stream 2. No total, são 29s.

Emit 20 after 1 seconds
Emit 40 after 2 seconds
Emit 60 after 4 seconds
Emit 80 after 10 seconds
Emit 100 after 13 seconds
Emit 1 after 20 seconds
Emit 1 after 29 seconds

 

2.6. distinctUnique

Link rxmarbles: https://rxmarbles.com/#distinct

A função distintaUnique do RxDart é diferente da classe Stream:

O distinto do Dart é usado para pular elementos iguais ao elemento anterior. Observe que ele só se compara ao elemento anterior.

O distintoUnique do RxDart é usado para emitir elementos exclusivos no Source Stream.

OK, aqui está meu código para ilustrar os distintos e os distintosUnique.

void main() {
  // distinct
  Stream.fromIterable([1, 2, 2, 1, 3]).distinct()
      .listen(print); // print: 1, 2, 1, 3

  // distinctUnique
  Stream.fromIterable([1, 2, 2, 1, 3]).distinctUnique()
      .listen(print); // print: 1, 2, 3
}

Conclusão

Então, ilustrei algumas funções de extensão poderosas do RxDart. Aprenda mais sobre eles aqui. Na próxima parte desta série, vou me concentrar em Assuntos.