O objetivo deste post, é apresentar uma forma de entregar dados em tempo real através da web com uma técnica utilizada pelos grandes players como Google e Twitter: O Streaming de Dados.

O streaming de dados é muito útil quando você precisa entregar grandes quantidades de dados onde provavelmente seria utilizada uma técnica de paginação no consumo de apis, ou mesmo quando é necessário entregar conteúdo em tempo real a partir de eventos.

No .NET Framework, era possível fazer isso com o auxílio da classe PushStreamContent que retornava os dados em HttpResponseMessage. Entretanto no .NET Core deixamos de utilizar o HttpResponseMessage para utilizar a interface IActionResult em nossos retornos de API, sendo assim, pretendo mostrar uma forma de criar uma api de streaming em .NET Core utilizando este novo modelo.

Criando o projeto

Primeiro vamos criar o projeto, para isso basta no Visual Studio clicar em File>New>Project e criar um ASP.NET Core Web Application

Após clicar em OK, irá perguntar sobre o template do projeto, selecione empty e clique em OK.

Apenas um detalhe, é que o Suporte a Docker não é obrigatório, você pode seguir este tutorial sem docker normalmente.

Feito isso, o Visual Studio criará a seguinte estrutura:

Até aqui tudo bem! Vamos codar! \o/

Codando \o/

Dentro do projeto, crie uma pasta Controllers e dentro desta pasta crie uma controller chamada Cliente.

Vamos criar uma pasta chamada Results e dentro dela criaremos uma classe com o nome de PushStreamResult, que será responsável por obter o stream, setar o content type e retornar para um callback onde vamos identificar quem receberá os conteúdos. Sua estrutura deve estar assim:

Beleza! Agora vamos colocar código na classe PushStreamResult, sua classe deve ficar mais ou menos assim:

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Net.Http.Headers;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace DotNetCoreStreaming.Results
{
public class PushStreamResult : IActionResult
{
private readonly Action<Stream, CancellationToken> _onStreamAvailable;
private readonly string _contentType;
private readonly CancellationToken _requestAborted;
public PushStreamResult(Action<Stream, CancellationToken> onStreamAvailable, string contentType, CancellationToken requestAborted)
{
_onStreamAvailable = onStreamAvailable;
_contentType = contentType;
_requestAborted = requestAborted;
}
public Task ExecuteResultAsync(ActionContext context)
{
var stream = context.HttpContext.Response.Body;
context.HttpContext.Response.GetTypedHeaders().ContentType = new MediaTypeHeaderValue(_contentType);
_onStreamAvailable(stream, _requestAborted);
return Task.CompletedTask;
}
}
}

view raw
PushStreamResult.cs
hosted with ❤ by GitHub

 

Observe que ela herda de IActionResult para podermos retornar em nossa rota da controller e no método ExecuteResultAsync nós apenas pegamos o body e o cancellationToken para passar ao callback _onStreamAvailable e setamos o ContentType que foi passado no construtor da classe.

Agora precisamos modificar nossa classe de startup, para que ela reconheça a estrutura do MVC:

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCoreStreaming
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
else
{
app.UseHsts();
}
app.UseHttpsRedirection();
app.UseMvc();
}
}
}

view raw
Startup.cs
hosted with ❤ by GitHub

 

Antes de começar a mexer na controller de cliente para simular os eventos de update e insert, eu criei um enum de eventos e uma classe de cliente bem simples conforme abaixo:

Enum de eventos:

namespace DotNetCoreStreaming.Enums
{
public enum EventoEnum
{
Insert = 1,
Update = 2
}
}

view raw
EventoEnum.cs
hosted with ❤ by GitHub

 

Modelo de cliente:

namespace DotNetCoreStreaming.Models
{
public class Cliente
{
public long Id { get; set; }
public string Nome { get; set; }
}
}

view raw
Cliente.cs
hosted with ❤ by GitHub

 

Feito isso, vamos à controller de cliente:

Na controller de cliente, precisamos criar uma ConcurrentBag estática que será onde vamos armazenar os clients do nosso streaming, ou seja, quem irá consumir os dados:

private static ConcurrentBag<StreamWriter> _clients;
static ClienteController()
{
_clients = new ConcurrentBag<StreamWriter>();
}

view raw
ClienteController.cs
hosted with ❤ by GitHub

 

Perceba que já inicializamos no construtor da controller para que possamos ir adicionando os clients conforme forem chegando 😉

Feito isso, vamos definir os métodos de insert e update da controller, veja que está bem simples para que possamos focar na funcionalidade do streaming:

[HttpPost]
public IActionResult Post(Cliente cliente)
{
//Fazer o Insert
EnviarEvento(cliente, EventoEnum.Insert);
return Ok();
}
[HttpPut]
public IActionResult Put(Cliente cliente)
{
//Fazer o Update
EnviarEvento(cliente, EventoEnum.Update);
return Ok();
}
private static async Task EnviarEvento(object dados, EventoEnum evento)
{
foreach (var client in _clients)
{
string jsonEvento = string.Format("{0}\n", JsonConvert.SerializeObject(new { dados, evento }));
await client.WriteAsync(jsonEvento);
await client.FlushAsync();
}
}

view raw
ClienteController.cs
hosted with ❤ by GitHub

 

Aqui também já adicionamos o método que irá escrever o json no streaming, ele recebe o cliente na variável dados e o enum de evento, para que possamos passar tudo isso à requisição http do streaming.

Por último, vamos adicionar a rota para se inscrever no streaming e o callback do PushStreamResult que adicionará os clients em nossa ConcurrentBag e irá tratar caso algum dos clients feche a sessão.

[HttpGet]
[Route("Streaming")]
public IActionResult Stream()
{
return new PushStreamResult(OnStreamAvailable, "text/event-stream", HttpContext.RequestAborted);
}
private void OnStreamAvailable(Stream stream, CancellationToken requestAborted)
{
var wait = requestAborted.WaitHandle;
var client = new StreamWriter(stream);
_clients.Add(client);
wait.WaitOne();
StreamWriter ignore;
_clients.TryTake(out ignore);
}

view raw
ClienteController.cs
hosted with ❤ by GitHub

 

Aqui criamos uma action para recebermos os dados do streaming utilizando a classe PushStreamResult e adicionamos o método OnStreamAvailable que será responsável por tratar os clients que quiserem receber os dados do streaming.

A controller de cliente deve ficar assim:

using DotNetCoreStreaming.Enums;
using DotNetCoreStreaming.Models;
using DotNetCoreStreaming.Results;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace DotNetCoreStreaming.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ClienteController : ControllerBase
{
private static ConcurrentBag<StreamWriter> _clients;
static ClienteController()
{
_clients = new ConcurrentBag<StreamWriter>();
}
[HttpPost]
public IActionResult Post(Cliente cliente)
{
//Fazer o Insert
EnviarEvento(cliente, EventoEnum.Insert);
return Ok();
}
[HttpPut]
public IActionResult Put(Cliente cliente)
{
//Fazer o Update
EnviarEvento(cliente, EventoEnum.Update);
return Ok();
}
private static async Task EnviarEvento(object dados, EventoEnum evento)
{
foreach (var client in _clients)
{
string jsonEvento = string.Format("{0}\n", JsonConvert.SerializeObject(new { dados, evento }));
await client.WriteAsync(jsonEvento);
await client.FlushAsync();
}
}
[HttpGet]
[Route("Streaming")]
public IActionResult Stream()
{
return new PushStreamResult(OnStreamAvailable, "text/event-stream", HttpContext.RequestAborted);
}
private void OnStreamAvailable(Stream stream, CancellationToken requestAborted)
{
var wait = requestAborted.WaitHandle;
var client = new StreamWriter(stream);
_clients.Add(client);
wait.WaitOne();
StreamWriter ignore;
_clients.TryTake(out ignore);
}
}
}

view raw
ClienteController.cs
hosted with ❤ by GitHub

 

Funciona??

Para testar, criei no postmam um json de cliente, para ver se o streaming está funcionando basta acessar pelo browser a rota https://localhost:44385/api/cliente/streaming (troque a porta pela sua).

Verifique que quando você colocar essa url no browser, ele ficará carregando e não fará nada além disso:

Ainda com o browser aberto, experimente fazer uma requisição para nossas API’s de Insert ou Update:

Veja o que aparece no browser:

Repare que os dados são escritos em tempo real no browser e que podemos utilizar essa arquitetura de diversas formas, até mesmo para aplicativos mobile realtime, mas isso é assunto para outro post…

O projeto está no meu github em https://github.com/sergioprates/DotNetCoreStreaming

​(Cross-post de https://medium.com/@sergioprates/criando-uma-api-streaming-com-net-core-b2eeaab0dfac)