.. _high-level-graphs:

High Level Graphs
=================

Dask graphs produced by collections like Arrays, Bags, and DataFrames have
high-level structure that can be useful for visualization and high-level
optimization.  The task graphs produced by these collections encode this
structure explicitly as ``HighLevelGraph`` objects.  This document describes
how to work with these in more detail.


Motivation and Example
----------------------

In full generality, Dask schedulers expect arbitrary task graphs where each
node is a single Python function call and each edge is a dependency between
two function calls.  These are usually stored in flat dictionaries.  Here is
some simple Dask DataFrame code and the task graph that it might generate:

.. code-block:: python

    import dask.dataframe as dd

    df = dd.read_csv('myfile.*.csv')
    df = df + 100
    df = df[df.name == 'Alice']

.. code-block:: python

   {
    ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
    ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
    ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
    ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
    ('add', 0): (operator.add, ('read-csv', 0), 100),
    ('add', 1): (operator.add, ('read-csv', 1), 100),
    ('add', 2): (operator.add, ('read-csv', 2), 100),
    ('add', 3): (operator.add, ('read-csv', 3), 100),
    ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
    ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
    ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
    ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
   }

The task graph is a dictionary that stores every Pandas-level function call
necessary to compute the final result.  We can see that there is some structure
to this dictionary if we separate out the tasks that were associated to each
high-level Dask DataFrame operation:

.. code-block:: python

   {
    # From the dask.dataframe.read_csv call
    ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
    ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
    ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
    ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

    # From the df + 100 call
    ('add', 0): (operator.add, ('read-csv', 0), 100),
    ('add', 1): (operator.add, ('read-csv', 1), 100),
    ('add', 2): (operator.add, ('read-csv', 2), 100),
    ('add', 3): (operator.add, ('read-csv', 3), 100),

    # From the df[df.name == 'Alice'] call
    ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
    ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
    ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
    ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
   }

By understanding this high-level structure we are able to understand our task
graphs more easily (this is more important for larger datasets when there are
thousands of tasks per layer) and how to perform high-level optimizations.  For
example, in the case above we may want to automatically rewrite our code to
filter our datasets before adding 100:

.. code-block:: python

    # Before
    df = dd.read_csv('myfile.*.csv')
    df = df + 100
    df = df[df.name == 'Alice']

    # After
    df = dd.read_csv('myfile.*.csv')
    df = df[df.name == 'Alice']
    df = df + 100

Dask's high level graphs help us to explicitly encode this structure by storing
our task graphs in layers with dependencies between layers:

.. code-block:: python

   >>> import dask.dataframe as dd

   >>> df = dd.read_csv('myfile.*.csv')
   >>> df = df + 100
   >>> df = df[df.name == 'Alice']

   >>> graph = df.__dask_graph__()
   >>> graph.layers
   {
    'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

    'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
            ('add', 1): (operator.add, ('read-csv', 1), 100),
            ('add', 2): (operator.add, ('read-csv', 2), 100),
            ('add', 3): (operator.add, ('read-csv', 3), 100)}

    'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
               ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
               ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
               ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
   }

   >>> graph.dependencies
   {
    'read-csv': set(),
    'add': {'read-csv'},
    'filter': {'add'}
   }

While the DataFrame points to the output layers on which it depends directly:

.. code-block:: python

   >>> df.__dask_layers__()
   {'filter'}


HighLevelGraphs
---------------

The :obj:`HighLevelGraph` object is a ``Mapping`` object composed of other
sub-``Mappings``, along with a high-level dependency mapping between them:

.. code-block:: python

   class HighLevelGraph(Mapping):
       layers: Dict[str, Mapping]
       dependencies: Dict[str, Set[str]]

You can construct a HighLevelGraph explicitly by providing both to the
constructor:

.. code-block:: python

   layers = {
      'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                   ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                   ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                   ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

      'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
              ('add', 1): (operator.add, ('read-csv', 1), 100),
              ('add', 2): (operator.add, ('read-csv', 2), 100),
              ('add', 3): (operator.add, ('read-csv', 3), 100)},

      'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
                 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
                 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
                 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
   }

   dependencies = {'read-csv': set(),
                   'add': {'read-csv'},
                   'filter': {'add'}}

   graph = HighLevelGraph(layers, dependencies)

This object satisfies the ``Mapping`` interface, and so operates as a normal
Python dictionary that is the semantic merger of the underlying layers:

.. code-block:: python

   >>> len(graph)
   12
   >>> graph[('read-csv', 0)]
   ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),


API
---

.. currentmodule:: dask.highlevelgraph

.. autoclass:: HighLevelGraph
   :members:
   :inherited-members:
   :exclude-members: visualize

.. TODO: Fix graphviz dependency in docs build and remove ``visualize`` from
   exclude-members in the above directive
