Saturday 31 August 2019

Optional Items in Nextflow Set Channels

The problem

I recently needed to add support for having three optional output files to a Nextflow tuple-based (set) channel. In my case, this was because I had a process that outputs the variant files for a sample separately but simulataneously and where I want each variant type to be handled separately downstream. That is, it needed to outut the SNVs, indels and delins variants for a sample separately and simulataneously as three separate files, but not all variant types would necessarily be present. For example:

process tricksyProcess {
    input:
    set(
        val(sampleName),
        file(variantsFile)
    ) from tricksyInputChannel
    val variantTypes

    output:
    set(
        val(sampleName),
        file("snvs.vcf"),
        file("indels.vcf"),
        file("delins.vcf")
    ) into tricksyOutputChannel

    shell:
    '''
    my-tricksy-process \
        --variants "!{variantsFile}" \
        --sample-name "!{sampleName}" \
        --snv-output "snvs.vcf" \
        --indel-output "indels.vcf" \
        --delins-output "delins.vcf";
    '''
}

Fortunately, Nextflow has an optional output directive. Unfortunately, it operates at the entire-set level, rather than individual items within the set.

There are a few options for handling this:

  1. Combine output files into single file, and filter/split into component variant types where needed. Since in my case I have very large lists of variants, this would greatly slow down the pipeline and still require a fair amount of extra boilerplate code downstream.
  2. Use flow control if/else blocks to define different processes in different scenarios. With three different boolean options, I’d need 8 different definitions of my process!
  3. Always output all variant types and discard/ignore unwanted files downstream. In my case, my-tricksy-process would fail if a required variant type doesn’t appear in the input file, so I’d need to add dummy data to allow it to work, which risks contaminating my results. Also, it incurs significant additional compute time.
  4. Create dummy output files for unwanted file types. This is by far the easiest option. You would still need control downstream to prevent processing of unwanted variant types, but it’s simple. It isn’t very defensive, however, and my application is for a pipeline for use in a clinical setting. I therefore want to use Nextflow’s error and file checking strengths to ensure all and only expected file types are present. I also don’t want dummy files to get mixed up with legitimate output files when archiving the results, which could lead to confusion or inappropriate use of the files.
  5. Place each output file in a separate channel and join them together. This requires having a unique key to join items together, but requires no unnecessary compute, involves minimal boilerplate and outputs only the files actually needed.

Chosen solution

As you can probably guess, I opted for option 5.

Define your process with each independent output in a separate channel. You need to include the unique identifier in each channel. Each channel will be marked optional if the output file is not expected.

    output:
    set(
        val(sampleName),
        file("snvs.vcf"),
    ) optional (!variantTypes.contains("snv")) into tricksyOutputSnvChannel

Next, inside the process’ shell block, we need to dynamically build the list of command arguments

    shell:
    '''
    file_args=();
    if [[ "!{variantTypes.contains("snv")}" == "true" ]]; then
        file_args+=(--snv-output snvs.vcf);
    fi
    if [[ "!{variantTypes.contains("indel")}" == "true" ]]; then
        file_args+=(--indel-output indels.vcf);
    fi
    if [[ "!{variantTypes.contains("delins")}" == "true" ]]; then
        file_args+=(--delins-output delins.vcf);
    fi

    my-tricksy-process \
        --variants "!{variantsFile}" \
        --sample-name "!{sampleName}" \
        ${file_args[@]:-}
    '''

After the process definition, build a combined output channel by joining separate channels:

tricksyOutCh =
    tricksyOutSnvCh
    .join(tricksyOutIndelCh, remainder: true, by: 0)
    .join(tricksyOutDelinsCh, remainder: true, by: 0)
    .map { entry ->
        def item = entry[0]
        assert (entry[1] == null) == (! variantTypes.contains("snv")): \
            "snv status for ${item} should be correct"
        assert (entry[2] == null) == (! variantTypes.contains("indel")): \
            "indel status for ${item} should be correct"
        assert (entry[3] == null) == (! variantTypes.contains("delins")): \
            "delins status for ${item} should be correct"

        entry
    }

After doing the join, we use a map closure to assert that the value of the output files is null if and only if the output file is not expected. This defends agains our previous process output files that it’s not supposed to, or being misused in some way.

Finally, we can use the new output channel in any desired downstream process. While it would be best to use the variantTypes object to decide which input items to use, we can also infer when an input file exists or is null as Nextflow will generate a filename in the pattern input.XXX for a null input object:

    input:
    set(
        val(sampleName),
        file(snvFile),
        file(indelFile),
        file(delinsFile)
    ) from tricksyOutCh

    shell:
    '''
    use_files=();
    for in_file in "!{snvFile}" "!{indelFile}" "!{delinsFile}"; do
        if [[ ! "${in_file}" =~ ^input\\.[0-9]*$ ]]; then
            use_files+=("${in_file}");
        fi
    done

    ...
    '''

Working example

Try the above out for yourself with the code below!

Copy this into a file called bin/my-tricksy-process:

#!/bin/bash

# A simple demo script that just generates specific output files.

args=("$@");
nargs="${#args[@]}";
idx=0;
for (( ; idx < nargs; idx ++ )); do
    this_arg="${args[$idx]}";
    next_arg="${args[((idx + 1))]:-}";

    echo "$this_arg $next_arg"
    case "${this_arg}" in
        --variants)
            (( idx ++ ));
            variants_file="$next_arg";
            ;;

        --sample-name)
            (( idx ++ ));
            sample_name="$next_arg";
            ;;

        --snv-output)
            (( idx ++ ));
            snv_file="$next_arg";
            ;;

        --indel-output)
            (( idx ++ ));
            indel_file="$next_arg";
            ;;

        --delins-output)
            (( idx ++ ));
            delins_file="$next_arg";
            ;;

    esac
done

if [[ ! -z "${snv_file}" ]]; then
    echo "$sample_name" > "$snv_file";
fi

if [[ ! -z "${indel_file}" ]]; then
    echo "$sample_name" > "$indel_file";
fi

if [[ ! -z "${delins_file}" ]]; then
    echo "$sample_name" > "$delins_file";
fi

Then in bin/workflow.nf save this:

#!/usr/bin/env nextflow

// The list of variant types to use. This would proably be taken from input
// parameters. Feel free to try this script out with different combinations of
// these values.
variantTypes = ["snv", "indel", "delins"]

// Our dummy input channel.
tricksyInCh =
    Channel.from(["SampleA", "SampleB", "SampleC"])
    .map { sample -> [sample, new File("${sample}.vcf")] }

// Some shortcuts to the boolean flags for convenience
useSnv = variantTypes.contains("snv")
useIndel = variantTypes.contains("indel")
useDelins = variantTypes.contains("delins")

process tricksyProc {
    storeDir 'out'

    input:
    set(
        val(sampleName),
        file(variantsFile)
    ) from tricksyInCh
    val useSnv
    val useIndel
    val useDelins

    output:
    set(
        val(sampleName),
        file("${sampleName}_snvs.vcf"),
    ) optional (!useSnv) into tricksyOutSnvCh

    set(
        val(sampleName),
        file("${sampleName}_indels.vcf"),
    ) optional (!useIndel) into tricksyOutIndelCh

    set(
        val(sampleName),
        file("${sampleName}_delins.vcf"),
    ) optional (!useDelins) into tricksyOutDelinsCh


    shell:
    '''
    file_args=();
    if [[ "!{useSnv}" == "true" ]]; then
        file_args+=(--snv-output !{sampleName}_snvs.vcf);
    fi
    if [[ "!{useIndel}" == "true" ]]; then
        file_args+=(--indel-output !{sampleName}_indels.vcf);
    fi
    if [[ "!{useDelins}" == "true" ]]; then
        file_args+=(--delins-output !{sampleName}_delins.vcf);
    fi

    my-tricksy-process \
        --variants "!{variantsFile}" \
        --sample-name "!{sampleName}" \
        ${file_args[@]:-}
    '''
}

tricksyOutCh =
    tricksyOutSnvCh
    .join(tricksyOutIndelCh, remainder: true, by: 0)
    .join(tricksyOutDelinsCh, remainder: true, by: 0)
    .map { entry ->
        def item = entry[0]
        assert (entry[1] == null) == (! useSnv): \
            "snv status for ${item} should be correct"
        assert (entry[2] == null) == (! useIndel): \
            "indel status for ${item} should be correct"
        assert (entry[3] == null) == (! useDelins): \
            "delins status for ${item} should be correct"

        entry
    }

process downstreamProc {
    storeDir 'out'

    input:
    set(
        val(sampleName),
        file(snvFile),
        file(indelFile),
        file(delinsFile)
    ) from tricksyOutCh

    output:
    file("${sampleName}_files.txt")

    shell:
    '''
    use_files=();
    for in_file in "!{snvFile}" "!{indelFile}" "!{delinsFile}"; do
        if [[ ! "${in_file}" =~ ^input\\.[0-9]*$ ]]; then
            use_files+=("${in_file}");
        fi
    done

    echo "Using files: ${use_files[@]}" > "!{sampleName}_files.txt";
    '''
}

Then run it like so:

export PATH="$(realpath bin):$PATH"

chmod +x bin/my-tricksy-process bin/workflow.nf

rm -rf .nextflow* work out

bin/workflow.nf
# N E X T F L O W  ~  version 19.07.0
# Launching `bin/example.nf` [ridiculous_stallman] - revision: 96c7560721
# executor >  local (6)
# [cb/ed62cf] process > tricksyProc (3)    [100%] 3 of 3 ✔
# [a0/34825e] process > downstreamProc (3) [100%] 3 of 3 ✔

cat out/*_files.txt
# Using files: SampleA_snvs.vcf SampleA_indels.vcf SampleA_delins.vcf
# Using files: SampleB_snvs.vcf SampleB_indels.vcf SampleB_delins.vcf
# Using files: SampleC_snvs.vcf SampleC_indels.vcf SampleC_delins.vcf

Conclusion

Nextflow is powerful but currently lacks an out-of-the-box feature for controlling which items within a Set Channel should be present. However, with just a little bit of extra code and no wasted compute time, it’s possible to guarantee exactly the correct output files are present and nothing more.