O objetivo deste post, é apresentar uma forma de entregar dados em tempo real através da web com uma técnica utilizada pelos grandes players como Google e Twitter: O Streaming de Dados.
O streaming de dados é muito útil quando você precisa entregar grandes quantidades de dados onde provavelmente seria utilizada uma técnica de paginação no consumo de apis, ou mesmo quando é necessário entregar conteúdo em tempo real a partir de eventos.
No .NET Framework, era possível fazer isso com o auxílio da classe PushStreamContent que retornava os dados em HttpResponseMessage. Entretanto no .NET Core deixamos de utilizar o HttpResponseMessage para utilizar a interface IActionResult em nossos retornos de API, sendo assim, pretendo mostrar uma forma de criar uma api de streaming em .NET Core utilizando este novo modelo.
Criando o projeto
Primeiro vamos criar o projeto, para isso basta no Visual Studio clicar em File>New>Project e criar um ASP.NET Core Web Application

Após clicar em OK, irá perguntar sobre o template do projeto, selecione empty e clique em OK.
Apenas um detalhe, é que o Suporte a Docker não é obrigatório, você pode seguir este tutorial sem docker normalmente.

Feito isso, o Visual Studio criará a seguinte estrutura:

Até aqui tudo bem! Vamos codar! \o/
Codando \o/
Dentro do projeto, crie uma pasta Controllers e dentro desta pasta crie uma controller chamada Cliente.

Vamos criar uma pasta chamada Results e dentro dela criaremos uma classe com o nome de PushStreamResult, que será responsável por obter o stream, setar o content type e retornar para um callback onde vamos identificar quem receberá os conteúdos. Sua estrutura deve estar assim:

Beleza! Agora vamos colocar código na classe PushStreamResult, sua classe deve ficar mais ou menos assim:
using Microsoft.AspNetCore.Http; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Net.Http.Headers; | |
using System; | |
using System.IO; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace DotNetCoreStreaming.Results | |
{ | |
public class PushStreamResult : IActionResult | |
{ | |
private readonly Action<Stream, CancellationToken> _onStreamAvailable; | |
private readonly string _contentType; | |
private readonly CancellationToken _requestAborted; | |
public PushStreamResult(Action<Stream, CancellationToken> onStreamAvailable, string contentType, CancellationToken requestAborted) | |
{ | |
_onStreamAvailable = onStreamAvailable; | |
_contentType = contentType; | |
_requestAborted = requestAborted; | |
} | |
public Task ExecuteResultAsync(ActionContext context) | |
{ | |
var stream = context.HttpContext.Response.Body; | |
context.HttpContext.Response.GetTypedHeaders().ContentType = new MediaTypeHeaderValue(_contentType); | |
_onStreamAvailable(stream, _requestAborted); | |
return Task.CompletedTask; | |
} | |
} | |
} |
Observe que ela herda de IActionResult para podermos retornar em nossa rota da controller e no método ExecuteResultAsync nós apenas pegamos o body e o cancellationToken para passar ao callback _onStreamAvailable e setamos o ContentType que foi passado no construtor da classe.
Agora precisamos modificar nossa classe de startup, para que ela reconheça a estrutura do MVC:
using Microsoft.AspNetCore.Builder; | |
using Microsoft.AspNetCore.Hosting; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Extensions.DependencyInjection; | |
namespace DotNetCoreStreaming | |
{ | |
public class Startup | |
{ | |
public void ConfigureServices(IServiceCollection services) | |
{ | |
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); | |
} | |
public void Configure(IApplicationBuilder app, IHostingEnvironment env) | |
{ | |
if (env.IsDevelopment()) | |
{ | |
app.UseDeveloperExceptionPage(); | |
} | |
else | |
{ | |
app.UseHsts(); | |
} | |
app.UseHttpsRedirection(); | |
app.UseMvc(); | |
} | |
} | |
} |
Antes de começar a mexer na controller de cliente para simular os eventos de update e insert, eu criei um enum de eventos e uma classe de cliente bem simples conforme abaixo:

Enum de eventos:
namespace DotNetCoreStreaming.Enums | |
{ | |
public enum EventoEnum | |
{ | |
Insert = 1, | |
Update = 2 | |
} | |
} |
Modelo de cliente:
namespace DotNetCoreStreaming.Models | |
{ | |
public class Cliente | |
{ | |
public long Id { get; set; } | |
public string Nome { get; set; } | |
} | |
} |
Feito isso, vamos à controller de cliente:
Na controller de cliente, precisamos criar uma ConcurrentBag estática que será onde vamos armazenar os clients do nosso streaming, ou seja, quem irá consumir os dados:
private static ConcurrentBag<StreamWriter> _clients; | |
static ClienteController() | |
{ | |
_clients = new ConcurrentBag<StreamWriter>(); | |
} |
Perceba que já inicializamos no construtor da controller para que possamos ir adicionando os clients conforme forem chegando 😉
Feito isso, vamos definir os métodos de insert e update da controller, veja que está bem simples para que possamos focar na funcionalidade do streaming:
[HttpPost] | |
public IActionResult Post(Cliente cliente) | |
{ | |
//Fazer o Insert | |
EnviarEvento(cliente, EventoEnum.Insert); | |
return Ok(); | |
} | |
[HttpPut] | |
public IActionResult Put(Cliente cliente) | |
{ | |
//Fazer o Update | |
EnviarEvento(cliente, EventoEnum.Update); | |
return Ok(); | |
} | |
private static async Task EnviarEvento(object dados, EventoEnum evento) | |
{ | |
foreach (var client in _clients) | |
{ | |
string jsonEvento = string.Format("{0}\n", JsonConvert.SerializeObject(new { dados, evento })); | |
await client.WriteAsync(jsonEvento); | |
await client.FlushAsync(); | |
} | |
} |
Aqui também já adicionamos o método que irá escrever o json no streaming, ele recebe o cliente na variável dados e o enum de evento, para que possamos passar tudo isso à requisição http do streaming.
Por último, vamos adicionar a rota para se inscrever no streaming e o callback do PushStreamResult que adicionará os clients em nossa ConcurrentBag e irá tratar caso algum dos clients feche a sessão.
[HttpGet] | |
[Route("Streaming")] | |
public IActionResult Stream() | |
{ | |
return new PushStreamResult(OnStreamAvailable, "text/event-stream", HttpContext.RequestAborted); | |
} | |
private void OnStreamAvailable(Stream stream, CancellationToken requestAborted) | |
{ | |
var wait = requestAborted.WaitHandle; | |
var client = new StreamWriter(stream); | |
_clients.Add(client); | |
wait.WaitOne(); | |
StreamWriter ignore; | |
_clients.TryTake(out ignore); | |
} |
Aqui criamos uma action para recebermos os dados do streaming utilizando a classe PushStreamResult e adicionamos o método OnStreamAvailable que será responsável por tratar os clients que quiserem receber os dados do streaming.
A controller de cliente deve ficar assim:
using DotNetCoreStreaming.Enums; | |
using DotNetCoreStreaming.Models; | |
using DotNetCoreStreaming.Results; | |
using Microsoft.AspNetCore.Http; | |
using Microsoft.AspNetCore.Mvc; | |
using Newtonsoft.Json; | |
using System.Collections.Concurrent; | |
using System.IO; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace DotNetCoreStreaming.Controllers | |
{ | |
[Route("api/[controller]")] | |
[ApiController] | |
public class ClienteController : ControllerBase | |
{ | |
private static ConcurrentBag<StreamWriter> _clients; | |
static ClienteController() | |
{ | |
_clients = new ConcurrentBag<StreamWriter>(); | |
} | |
[HttpPost] | |
public IActionResult Post(Cliente cliente) | |
{ | |
//Fazer o Insert | |
EnviarEvento(cliente, EventoEnum.Insert); | |
return Ok(); | |
} | |
[HttpPut] | |
public IActionResult Put(Cliente cliente) | |
{ | |
//Fazer o Update | |
EnviarEvento(cliente, EventoEnum.Update); | |
return Ok(); | |
} | |
private static async Task EnviarEvento(object dados, EventoEnum evento) | |
{ | |
foreach (var client in _clients) | |
{ | |
string jsonEvento = string.Format("{0}\n", JsonConvert.SerializeObject(new { dados, evento })); | |
await client.WriteAsync(jsonEvento); | |
await client.FlushAsync(); | |
} | |
} | |
[HttpGet] | |
[Route("Streaming")] | |
public IActionResult Stream() | |
{ | |
return new PushStreamResult(OnStreamAvailable, "text/event-stream", HttpContext.RequestAborted); | |
} | |
private void OnStreamAvailable(Stream stream, CancellationToken requestAborted) | |
{ | |
var wait = requestAborted.WaitHandle; | |
var client = new StreamWriter(stream); | |
_clients.Add(client); | |
wait.WaitOne(); | |
StreamWriter ignore; | |
_clients.TryTake(out ignore); | |
} | |
} | |
} |
Funciona??
Para testar, criei no postmam um json de cliente, para ver se o streaming está funcionando basta acessar pelo browser a rota https://localhost:44385/api/cliente/streaming (troque a porta pela sua).
Verifique que quando você colocar essa url no browser, ele ficará carregando e não fará nada além disso:

Ainda com o browser aberto, experimente fazer uma requisição para nossas API’s de Insert ou Update:

Veja o que aparece no browser:

Repare que os dados são escritos em tempo real no browser e que podemos utilizar essa arquitetura de diversas formas, até mesmo para aplicativos mobile realtime, mas isso é assunto para outro post…
O projeto está no meu github em https://github.com/sergioprates/DotNetCoreStreaming
(Cross-post de https://medium.com/@sergioprates/criando-uma-api-streaming-com-net-core-b2eeaab0dfac)