
Stream processor/aggregator/manager



Quick Start

Prep your environment

$ echo "APP_NAME=appName" >> .env
$ echo "DATABASE_URL=postgres://user:password@hostname:port/database" >> .env
$ echo "REDIS_URL=redis://hostname:port/" >> .env
$ echo "ADMIN_USER=foo" >> .env
$ echo "ADMIN_PASS=pa$$word" >> .env
$ echo "NODE_ENV=local" >> .env
$ echo "CALLBACK_URL=http://localhost:3000" >> .env
$ echo "SECRET=secretkey" >> .env

Ensure your database and user is created

createdb dbname
createuser dbuser

Start it up

foreman start

Meanwhile, in your code

  const Cher = require('cher');
  const cherTwitter = require('cher-twitter');
  const cherInstagram = require('cher-instagram');
  const tools = require('cher-tools');
  // filter dirty words
  const dirtyFilter = tools.pipeline.profanityFilter;
  // save data to the database
  const save =;
  // config helper
  const config = tools.utils.config;  

  // Cher streams get configured here.
  const sourceStreams = [
      name: 'twitter',
      module: cherTwitter
      name: 'instagram',
      module: cherInstagram

    // When data is received it is immediately sent through this pipeline.
    // Place your filters, maps, etc here in the order in which you want them
    // executed
    pipeline: [ dirtyFilter, save ],
    // The sources for our streams
    sourceStreams: sourceStreams    
  .spread((streams, cher) => {
      const twitterSource = cher.getStream('twitter');
        .spread((dataChannel, twitterSettings) => {
          dataChannel.on('data', (data) => {
            console.log(data) // a tweet!

      // sources are also event emitters
      twitterSource.on('start', (dataChannel, twitterSettings) => {
        // ...


Cher is a stream processing platform. The best way to process a stream of data is through a data pipeline. Cher provides several pipeline steps out of the box for you to utilize including a save() which simply saves data to the database. In order to create a step in the pipeline just create a function which returns a through stream. The function which returns the through stream will receive one parameter deps which is an object that provides several components that you can utilize in your pipeline step.



An instance of the cher model manager which gives you access to your underlyning data model. You can access it like so

const DataModel = models.get('DataModel');


An instance of a key/value store that cher has access to which you can use to store arbitrary data.

ctx.set('foo', {bar: 'beep'})
  .then((foo) {
    console.log(foo); // {bar: 'beep'}
  // don't forget to catch errors!


The configured settings for the current sourceStream. If the data item coming through the pipeline is from twitter, this will be the twitter settings. If it is from S3, it will be the S3 settings.

Putting it all together

So, what does a custom pipeline step look like? Simple

const es = require('event-stream');
const tools = require('cher-tools');

module.exports = (deps) => {
  return es.through((data) {
    // parse the raw stream data into a JS object
    let item = utils.parse(data);
    // transform the data somehow
    // ...
    // reserialize the data for more efficient processing
    const serialized = utils.serialize(item);
    // when you're done, emit the serialized data so that it can go to the next step
    // in the pipeline. You can also NOT emit data based on some condition. This effectively
    // turns the stream from a map to a filter
    this.emit('data', serialize);

Related Repositories

cher db

cher twitter

cher instagram

cher facebook

cher tools

cher source interface


npm i cher


  • MIT
  • ^4.2.1
  • ISL
  • released 7/6/2016

