
5. Channels

Channels are a key data structure of Nextflow that allows the implementation of reactive-functional oriented computational workflows based on the Dataflow programming paradigm.

They are used to logically connect tasks to each other or to implement functional style data transformations.

[Figure: task α emits files x, y and z into a channel, which connects it to task β]

5.1 Channel types

Nextflow distinguishes two different kinds of channels: queue channels and value channels.

5.1.1 Queue channel

A queue channel is an asynchronous unidirectional FIFO queue that connects two processes or operators.

  • asynchronous means that operations are non-blocking.
  • unidirectional means that data flows from a producer to a consumer.
  • FIFO means that the data is guaranteed to be delivered in the same order as it is produced. First In, First Out.

A queue channel is implicitly created by process output definitions or using channel factories such as Channel.of or Channel.fromPath.
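
For example, a process output definition implicitly creates a queue channel. A minimal sketch (the GREET process here is illustrative, not part of the training material):

process GREET {
    output:
    stdout

    script:
    """
    echo Hello
    """
}

workflow {
    greetings_ch = GREET() // the process output is an implicit queue channel
    greetings_ch.view()
}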

Try the following snippets:

ch = Channel.of(1, 2, 3)
println(ch) // use the built-in println function to print the ch channel object
ch.view() // apply the view channel operator to print each item emitted by the channel

Exercise

Try to execute this snippet. You can do that by creating a new .nf file or by editing an already existing .nf file.

ch = Channel.of(1, 2, 3)
ch.view()

5.1.2 Value channels

A value channel (a.k.a. singleton channel) is by definition bound to a single value and can be read an unlimited number of times without consuming its content. A value channel is created using the value channel factory or by operators returning a single value, such as first, last, collect, count, min, max, reduce, and sum.
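
For instance, applying an operator that returns a single value to a queue channel yields a value channel. A small sketch:

Channel
    .of(1, 2, 3)
    .count() // count returns a value channel holding a single item
    .view() // prints 3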

To better understand the difference between value and queue channels, save the snippet below as example.nf.

example.nf
ch1 = Channel.of(1, 2, 3)
ch2 = Channel.of(1)

process SUM {
    input:
    val x
    val y

    output:
    stdout

    script:
    """
    echo \$(($x+$y))
    """
}

workflow {
    SUM(ch1, ch2).view()
}

When you run the script, it prints only 2, as you can see below:

2

A process will only instantiate a task when there are elements to be consumed from all the channels provided as input to it. Because ch1 and ch2 are queue channels, and the single element of ch2 has been consumed, no new process instances will be launched, even if there are other elements to be consumed in ch1.

To use the single element in ch2 multiple times, we can either use Channel.value as mentioned above, or use a channel operator that returns a single element such as first() below:

ch1 = Channel.of(1, 2, 3)
ch2 = Channel.of(1)

process SUM {
    input:
    val x
    val y

    output:
    stdout

    script:
    """
    echo \$(($x+$y))
    """
}

workflow {
    SUM(ch1, ch2.first()).view()
}
Output
4

3

2
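
Equivalently, ch2 can be declared as a value channel from the start, so that SUM(ch1, ch2).view() works without first() (only the channel declarations are shown; the SUM process is unchanged):

ch1 = Channel.of(1, 2, 3)
ch2 = Channel.value(1)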

In addition, in many situations Nextflow will implicitly convert variables to value channels when they are used in a process invocation. For example, when you invoke a process with a workflow parameter (params.example) that has a string value, it is automatically cast into a value channel.
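
A minimal sketch of this implicit cast (params.greeting and the SAYHELLO process are hypothetical names, used only for illustration):

params.greeting = 'Hello'

process SAYHELLO {
    input:
    val greeting

    output:
    stdout

    script:
    """
    echo $greeting
    """
}

workflow {
    SAYHELLO(params.greeting).view() // the string is implicitly cast to a value channel
}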

5.2 Channel factories

Channel factories are Nextflow methods for creating channels, each with its own expected inputs and behavior.

5.2.1 value()

The value channel factory is used to create a value channel. An optional non-null argument can be specified to bind the channel to a specific value. For example:

ch1 = Channel.value() // creates an empty value channel
ch2 = Channel.value('Hello there') // creates a value channel and binds a string to it
ch3 = Channel.value([1, 2, 3, 4, 5]) // creates a value channel and binds a list object to it, emitted as a sole emission

5.2.2 of()

The factory Channel.of allows the creation of a queue channel with the values specified as arguments.

ch = Channel.of(1, 3, 5, 7)
ch.view { "value: $it" }

The first line in this example creates a variable ch which holds a channel object. This channel emits the values specified as a parameter in the of channel factory. Thus the second line will print the following:

value: 1
value: 3
value: 5
value: 7

The Channel.of channel factory works in a similar manner to Channel.from (which is now deprecated), fixing some inconsistent behaviors of the latter and providing better handling when specifying a range of values. For example, the following works with a range from 1 to 23:

Channel
    .of(1..23, 'X', 'Y')
    .view()

5.2.3 fromList()

The Channel.fromList channel factory creates a channel emitting the elements provided by a list object specified as an argument:

list = ['hello', 'world']

Channel
    .fromList(list)
    .view()
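
Note the contrast with Channel.of, which would emit the whole list as a single item rather than one item per element:

Channel
    .of(list) // emits ['hello', 'world'] as a single item
    .view()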

5.2.4 fromPath()

The fromPath channel factory creates a queue channel emitting one or more files matching the specified glob pattern.

Channel.fromPath('./data/meta/*.csv')

This example creates a channel and emits as many items as there are files with a csv extension in the ./data/meta folder. Each element is a file object implementing the Path interface.

Tip

Two asterisks (**) work like * but cross directory boundaries. This syntax is generally used for matching complete paths. Curly brackets specify a collection of sub-patterns.

| Name | Description |
| ---- | ----------- |
| glob | When true, interprets characters *, ?, [] and {} as glob wildcards, otherwise handles them as normal characters (default: true) |
| type | Type of path returned, either file, dir or any (default: file) |
| hidden | When true, includes hidden files in the resulting paths (default: false) |
| maxDepth | Maximum number of directory levels to visit (default: no limit) |
| followLinks | When true, symbolic links are followed during directory tree traversal, otherwise they are managed as files (default: true) |
| relative | When true, returned paths are relative to the top-most common directory (default: false) |
| checkIfExists | When true, throws an exception when the specified path does not exist in the file system (default: false) |

Learn more about the glob pattern syntax in the Nextflow documentation.
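
These options can be combined as named parameters. A small sketch reusing the data/meta folder from above:

Channel
    .fromPath('./data/meta/**.csv', checkIfExists: true, maxDepth: 2)
    .view()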

Exercise

Use the Channel.fromPath channel factory to create a channel emitting all files with the suffix .fq in the data/ggal/ directory and any subdirectory, in addition to hidden files. Then print the file names.

Solution
Channel
    .fromPath('./data/ggal/**.fq', hidden: true)
    .view()

5.2.5 fromFilePairs()

The fromFilePairs channel factory creates a channel emitting the file pairs matching a glob pattern provided by the user. The matching files are emitted as tuples, in which the first element is the grouping key of the matching pair and the second element is the list of files (sorted in lexicographical order).

Channel
    .fromFilePairs('./data/ggal/*_{1,2}.fq')
    .view()

It will produce an output similar to the following:

[liver, [/workspace/gitpod/nf-training/data/ggal/liver_1.fq, /workspace/gitpod/nf-training/data/ggal/liver_2.fq]]
[gut, [/workspace/gitpod/nf-training/data/ggal/gut_1.fq, /workspace/gitpod/nf-training/data/ggal/gut_2.fq]]
[lung, [/workspace/gitpod/nf-training/data/ggal/lung_1.fq, /workspace/gitpod/nf-training/data/ggal/lung_2.fq]]

Warning

The glob pattern must contain at least a star wildcard character (*).

| Name | Description |
| ---- | ----------- |
| type | Type of paths returned, either file, dir or any (default: file) |
| hidden | When true, includes hidden files in the resulting paths (default: false) |
| maxDepth | Maximum number of directory levels to visit (default: no limit) |
| followLinks | When true, symbolic links are followed during directory tree traversal, otherwise they are managed as files (default: true) |
| size | Defines the number of files each emitted item is expected to hold (default: 2). Set to -1 for any |
| flat | When true, the matching files are produced as sole elements in the emitted tuples (default: false) |
| checkIfExists | When true, throws an exception when the specified path does not exist in the file system (default: false) |

Exercise

Use the fromFilePairs channel factory to create a channel emitting all the pairs of fastq reads in the data/ggal/ directory and print them. Then use the flat: true option and compare the output with the previous execution.

Solution

Use the following, with or without flat: true:

Channel
    .fromFilePairs('./data/ggal/*_{1,2}.fq', flat: true)
    .view()

Then check the square brackets around the file names to see the difference made by flat.
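
With flat: true, the grouping key and the two files are emitted as sole elements of each tuple, giving output similar to the following (paths as in the earlier example):

[liver, /workspace/gitpod/nf-training/data/ggal/liver_1.fq, /workspace/gitpod/nf-training/data/ggal/liver_2.fq]
[gut, /workspace/gitpod/nf-training/data/ggal/gut_1.fq, /workspace/gitpod/nf-training/data/ggal/gut_2.fq]
[lung, /workspace/gitpod/nf-training/data/ggal/lung_1.fq, /workspace/gitpod/nf-training/data/ggal/lung_2.fq]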

5.2.6 fromSRA()

The Channel.fromSRA channel factory makes it possible to query the NCBI SRA archive and returns a channel emitting the FASTQ files matching the specified selection criteria.

The query can be project ID(s) or accession number(s) supported by the NCBI ESearch API.

Info

This function now requires an API key you can only get by logging into your NCBI account.

Instructions for NCBI login and key acquisition
  1. Go to: https://www.ncbi.nlm.nih.gov/
  2. Click the top right "Log in" button to sign into NCBI. Follow their instructions.
  3. Once logged into your account, click the button at the top right (usually your ID).
  4. Go to Account settings
  5. Scroll down to the API Key Management section.
  6. Click on "Create an API Key".
  7. The page will refresh and the key will be displayed where the button was. Copy your key.

For example, the following snippet will print the contents of an NCBI project ID:

params.ncbi_api_key = '<Your API key here>'

Channel
    .fromSRA(['SRP073307'], apiKey: params.ncbi_api_key)
    .view()

Replace <Your API key here> with your API key.

This should print:

[SRR3383346, [/vol1/fastq/SRR338/006/SRR3383346/SRR3383346_1.fastq.gz, /vol1/fastq/SRR338/006/SRR3383346/SRR3383346_2.fastq.gz]]
[SRR3383347, [/vol1/fastq/SRR338/007/SRR3383347/SRR3383347_1.fastq.gz, /vol1/fastq/SRR338/007/SRR3383347/SRR3383347_2.fastq.gz]]
[SRR3383344, [/vol1/fastq/SRR338/004/SRR3383344/SRR3383344_1.fastq.gz, /vol1/fastq/SRR338/004/SRR3383344/SRR3383344_2.fastq.gz]]
[SRR3383345, [/vol1/fastq/SRR338/005/SRR3383345/SRR3383345_1.fastq.gz, /vol1/fastq/SRR338/005/SRR3383345/SRR3383345_2.fastq.gz]]
// (remaining omitted)

Multiple accession IDs can be specified using a list object:

ids = ['ERR908507', 'ERR908506', 'ERR908505']
Channel
    .fromSRA(ids, apiKey: params.ncbi_api_key)
    .view()

This should print:

[ERR908507, [/vol1/fastq/ERR908/ERR908507/ERR908507_1.fastq.gz, /vol1/fastq/ERR908/ERR908507/ERR908507_2.fastq.gz]]
[ERR908506, [/vol1/fastq/ERR908/ERR908506/ERR908506_1.fastq.gz, /vol1/fastq/ERR908/ERR908506/ERR908506_2.fastq.gz]]
[ERR908505, [/vol1/fastq/ERR908/ERR908505/ERR908505_1.fastq.gz, /vol1/fastq/ERR908/ERR908505/ERR908505_2.fastq.gz]]

Info

Read pairs are implicitly managed and are returned as a list of files.

It’s straightforward to use this channel as an input using the usual Nextflow syntax. The code below creates a channel containing two samples from a public SRA study and runs FASTQC on the resulting files. See:

params.ncbi_api_key = '<Your API key here>'

params.accession = ['ERR908507', 'ERR908506']

process FASTQC {
    input:
    tuple val(sample_id), path(reads_file)

    output:
    path("fastqc_${sample_id}_logs")

    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads_file}
    """
}

workflow {
    reads = Channel.fromSRA(params.accession, apiKey: params.ncbi_api_key)
    FASTQC(reads)
}

If you want to run the workflow above and do not have fastqc installed on your machine, don't forget what you learned in the previous section: run this workflow with -with-docker biocontainers/fastqc:v0.11.5, for example.

5.2.7 Text files

The splitText operator allows you to split multi-line strings or text file items emitted by a source channel into chunks containing n lines, which will be emitted by the resulting channel. See:

Channel
    .fromPath('data/meta/random.txt') // make a channel from the path data/meta/random.txt
    .splitText() // split each item into chunks of one line (the default)
    .view() // view the contents of the channel

You can define the number of lines in each chunk by using the parameter by, as shown in the following example:

Channel
    .fromPath('data/meta/random.txt')
    .splitText(by: 2)
    .subscribe {
        print it;
        print "--- end of the chunk ---\n"
    }

Info

The subscribe operator permits execution of user defined functions each time a new value is emitted by the source channel.
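
subscribe also accepts named event handlers, for example:

Channel
    .of(1, 2, 3)
    .subscribe onNext: { println it }, onComplete: { println 'Done' }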

An optional closure can be specified in order to transform the text chunks produced by the operator. The following example shows how to split text files into chunks of 10 lines and transform them into capital letters:

Channel
    .fromPath('data/meta/random.txt')
    .splitText(by: 10) { it.toUpperCase() }
    .view()

You can also emit a running count alongside each line:

count = 0

Channel
    .fromPath('data/meta/random.txt')
    .splitText()
    .view { "${count++}: ${it.toUpperCase().trim()}" }

Finally, you can also use the operator on plain files (outside of the channel context):

def f = file('data/meta/random.txt')
def lines = f.splitText()
def count = 0
for (String row : lines) {
    log.info "${count++} ${row.toUpperCase()}"
}

5.2.8 Comma-separated values (.csv)

The splitCsv operator allows you to parse CSV-formatted text items emitted by a channel.

It then splits them into records, or groups them into lists of records with a specified length (see the by example below).

In the simplest case, just apply the splitCsv operator to a channel emitting CSV-formatted text files or text entries. For example, to view only the first and fourth columns:

Channel
    .fromPath("data/meta/patients_1.csv")
    .splitCsv()
    // row is a list object
    .view { row -> "${row[0]}, ${row[3]}" }
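
splitCsv also accepts a by option, analogous to splitText, to group rows into chunks. A small sketch (each emitted item is then a list of rows):

Channel
    .fromPath("data/meta/patients_1.csv")
    .splitCsv(by: 2)
    .view() // each item is a list of (up to) two rows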

When the CSV begins with a header line defining the column names, you can specify the parameter header: true which allows you to reference each value by its column name, as shown in the following example:

Channel
    .fromPath("data/meta/patients_1.csv")
    .splitCsv(header: true)
    // row is a map object
    .view { row -> "${row.patient_id}, ${row.num_samples}" }

Alternatively, you can provide custom header names by specifying a list of strings in the header parameter as shown below:

Channel
    .fromPath("data/meta/patients_1.csv")
    .splitCsv(header: ['col1', 'col2', 'col3', 'col4', 'col5'])
    // row is a map object
    .view { row -> "${row.col1}, ${row.col4}" }

You can also process multiple CSV files at the same time:

Channel
    .fromPath("data/meta/patients_*.csv") // <-- just use a pattern
    .splitCsv(header: true)
    .view { row -> "${row.patient_id}\t${row.num_samples}" }

Tip

Notice that you can change the output format simply by using a different delimiter in the view closure (a tab, \t, in this example).

Finally, you can also operate on CSV files outside the channel context:

def f = file('data/meta/patients_1.csv')
def lines = f.splitCsv()
for (List row : lines) {
    log.info "${row[0]} -- ${row[2]}"
}

Exercise

Try inputting fastq reads into the RNA-Seq workflow from earlier using .splitCsv.

Solution

Add a CSV text file containing the following, as an example input with the name "fastq.csv":

gut,/workspace/gitpod/nf-training/data/ggal/gut_1.fq,/workspace/gitpod/nf-training/data/ggal/gut_2.fq

Then replace the input channel for the reads in script7.nf by changing the following lines:

Channel
    .fromFilePairs(params.reads, checkIfExists: true)
    .set { read_pairs_ch }

To a splitCsv channel factory input:

Channel
    .fromPath("fastq.csv")
    .splitCsv()
    .view { row -> "${row[0]}, ${row[1]}, ${row[2]}" }
    .set { read_pairs_ch }

Finally, change the cardinality of the processes that use the input data. For example, for the quantification process, change it from:

process QUANTIFICATION {
    tag "$sample_id"

    input:
    path salmon_index
    tuple val(sample_id), path(reads)

    output:
    path sample_id, emit: quant_ch

    script:
    """
    salmon quant --threads $task.cpus --libType=U -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
    """
}

To:

process QUANTIFICATION {
    tag "$sample_id"

    input:
    path salmon_index
    tuple val(sample_id), path(reads1), path(reads2)

    output:
    path sample_id, emit: quant_ch

    script:
    """
    salmon quant --threads $task.cpus --libType=U -i $salmon_index -1 ${reads1} -2 ${reads2} -o $sample_id
    """
}

Repeat the above for the fastqc step.

process FASTQC {
    tag "FASTQC on $sample_id"

    input:
    tuple val(sample_id), path(reads1), path(reads2)

    output:
    path "fastqc_${sample_id}_logs"

    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads1} ${reads2}
    """
}

Now the workflow should run from a CSV file.

5.2.9 Tab separated values (.tsv)

Parsing TSV files works in a similar way, simply add the sep: '\t' option in the splitCsv context:

Channel
    .fromPath("data/meta/regions.tsv", checkIfExists: true)
    // use `sep` option to parse TAB separated files
    .splitCsv(sep: '\t')
    .view()

Exercise

Try using the tab separation technique on the file data/meta/regions.tsv, but print just the first column, and remove the header.

Solution
Channel
    .fromPath("data/meta/regions.tsv", checkIfExists: true)
    // use `sep` option to parse TAB separated files
    .splitCsv(sep: '\t', header: true)
    // row is a list object
    .view { row -> "${row.patient_id}" }

5.3 More complex file formats

5.3.1 JSON

We can also easily parse the JSON file format using the splitJson channel operator.

The splitJson operator supports JSON arrays:

Channel
    .of('["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]')
    .splitJson()
    .view { "Item: ${it}" }
Item: Sunday
Item: Monday
Item: Tuesday
Item: Wednesday
Item: Thursday
Item: Friday
Item: Saturday

JSON objects:

Channel
    .of('{"player": {"name": "Bob", "height": 180, "champion": false}}')
    .splitJson()
    .view { "Item: ${it}" }
Item: [key:player, value:[name:Bob, height:180, champion:false]]

And even a JSON array of JSON objects!

Channel
    .of('[{"name": "Bob", "height": 180, "champion": false}, \
        {"name": "Alice", "height": 170, "champion": false}]')
    .splitJson()
    .view { "Item: ${it}" }
Item: [name:Bob, height:180, champion:false]
Item: [name:Alice, height:170, champion:false]

Files containing JSON content can also be parsed:

Channel
    .fromPath('file.json')
    .splitJson()
    .view { "Item: ${it}" }

Given a file.json containing:

[
  { "name": "Bob", "height": 180, "champion": false },
  { "name": "Alice", "height": 170, "champion": false }
]

the output is:

Item: [name:Bob, height:180, champion:false]
Item: [name:Alice, height:170, champion:false]

5.3.2 YAML

YAML files can be parsed in a similar way, using the SnakeYAML library bundled with Nextflow:

import org.yaml.snakeyaml.Yaml

def f = file('data/meta/regions.yml')
def records = new Yaml().load(f)


for (def entry : records) {
    log.info "$entry.patient_id -- $entry.feature"
}
Given the following data/meta/regions.yml file:

- patient_id: ATX-TBL-001-GB-01-105
  region_id: R1
  feature: pass_vafqc_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R1
  feature: pass_stripy_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R1
  feature: pass_manual_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R1
  feature: other_region_selection_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R1
  feature: ace_information_gained
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R1
  feature: concordance_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R2
  feature: pass_vafqc_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R2
  feature: pass_stripy_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R2
  feature: pass_manual_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R2
  feature: other_region_selection_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R2
  feature: ace_information_gained
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R2
  feature: concordance_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R3
  feature: pass_vafqc_flag
  pass_flag: "TRUE"
- patient_id: ATX-TBL-001-GB-01-105
  region_id: R3
  feature: pass_stripy_flag
  pass_flag: "FALSE"
the script prints:

ATX-TBL-001-GB-01-105 -- pass_vafqc_flag
ATX-TBL-001-GB-01-105 -- pass_stripy_flag
ATX-TBL-001-GB-01-105 -- pass_manual_flag
ATX-TBL-001-GB-01-105 -- other_region_selection_flag
ATX-TBL-001-GB-01-105 -- ace_information_gained
ATX-TBL-001-GB-01-105 -- concordance_flag
ATX-TBL-001-GB-01-105 -- pass_vafqc_flag
ATX-TBL-001-GB-01-105 -- pass_stripy_flag
ATX-TBL-001-GB-01-105 -- pass_manual_flag
ATX-TBL-001-GB-01-105 -- other_region_selection_flag
ATX-TBL-001-GB-01-105 -- ace_information_gained
ATX-TBL-001-GB-01-105 -- concordance_flag
ATX-TBL-001-GB-01-105 -- pass_vafqc_flag
ATX-TBL-001-GB-01-105 -- pass_stripy_flag

5.3.3 Storage of parsers into modules

The best way to store parser scripts is to keep them in a Nextflow module file.

Let's say we don't have a JSON channel operator and we want to create a function instead. The parsers.nf file should contain the parseJsonFile function. See the content below:

include { parseJsonFile } from './modules/parsers.nf'

process FOO {
    input:
    tuple val(patient_id), val(feature)

    output:
    stdout

    script:
    """
    echo $patient_id has $feature as feature
    """
}

workflow {
    Channel
        .fromPath('data/meta/regions*.json')
        | flatMap { parseJsonFile(it) }
        | map { record -> [record.patient_id, record.feature] }
        | unique
        | FOO
        | view
}
modules/parsers.nf
import groovy.json.JsonSlurper

def parseJsonFile(json_file) {
    def f = file(json_file)
    def records = new JsonSlurper().parse(f)
    return records
}
The output will be similar to:

ATX-TBL-001-GB-01-105 has pass_stripy_flag as feature

ATX-TBL-001-GB-01-105 has ace_information_gained as feature

ATX-TBL-001-GB-01-105 has concordance_flag as feature

ATX-TBL-001-GB-01-105 has pass_vafqc_flag as feature

ATX-TBL-001-GB-01-105 has pass_manual_flag as feature

ATX-TBL-001-GB-01-105 has other_region_selection_flag as feature

Nextflow will use this as a custom function within the workflow scope.

Tip

You will learn more about module files later in the Modularization section of this tutorial.