From e6544a59be59e5268fefb6a9ad3854b07ea89f5b Mon Sep 17 00:00:00 2001 From: Jorge Aguilera Date: Tue, 3 May 2022 15:15:26 +0200 Subject: [PATCH 1/2] feature: create a hello channel extension point closes #3 Signed-off-by: Jorge Aguilera --- plugins/nf-hello/build.gradle | 4 +- .../main/nextflow/hello/HelloExtension.groovy | 78 ++++++++ .../main/nextflow/hello/HelloPlugin.groovy | 1 + .../src/resources/META-INF/extensions.idx | 1 + .../hello/ChannelExtensionHelloTest.groovy | 53 +++++ .../test/nextflow/hello/HelloDslTest.groovy | 47 +++++ .../test/nextflow/hello/MockHelpers.groovy | 182 ++++++++++++++++++ .../src/test/nextflow/hello/TestHelper.groovy | 45 +++++ 8 files changed, 409 insertions(+), 2 deletions(-) create mode 100644 plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy create mode 100644 plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy create mode 100644 plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy create mode 100644 plugins/nf-hello/src/test/nextflow/hello/MockHelpers.groovy create mode 100644 plugins/nf-hello/src/test/nextflow/hello/TestHelper.groovy diff --git a/plugins/nf-hello/build.gradle b/plugins/nf-hello/build.gradle index 3d9be79..c193145 100644 --- a/plugins/nf-hello/build.gradle +++ b/plugins/nf-hello/build.gradle @@ -53,7 +53,7 @@ sourceSets { dependencies { // This dependency is exported to consumers, that is to say found on their compile classpath. - compileOnly 'io.nextflow:nextflow:21.04.0' + compileOnly 'io.nextflow:nextflow:22.04.0' compileOnly 'org.slf4j:slf4j-api:1.7.10' compileOnly 'org.pf4j:pf4j:3.4.1' // add here plugins depepencies @@ -61,7 +61,7 @@ dependencies { // test configuration testImplementation "org.codehaus.groovy:groovy:3.0.8" testImplementation "org.codehaus.groovy:groovy-nio:3.0.8" - testImplementation 'io.nextflow:nextflow:21.04.0' + testImplementation 'io.nextflow:nextflow:22.04.0' testImplementation ("org.codehaus.groovy:groovy-test:3.0.8") { exclude group: 'org.codehaus.groovy' } testImplementation ("cglib:cglib-nodep:3.3.0") testImplementation ("org.objenesis:objenesis:3.1") diff --git a/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy b/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy new file mode 100644 index 0000000..5be2985 --- /dev/null +++ b/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy @@ -0,0 +1,78 @@ +package nextflow.hello + +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowReadChannel +import groovyx.gpars.dataflow.DataflowWriteChannel +import groovyx.gpars.dataflow.expression.DataflowExpression +import nextflow.Channel +import nextflow.Global +import nextflow.Session +import nextflow.extension.ChannelExtensionPoint +import nextflow.extension.CH +import nextflow.NF +import nextflow.extension.DataflowHelper +import nextflow.plugin.Scoped + +import java.util.concurrent.CompletableFuture + +/** + * @author : jorge + * + */ +@Slf4j +@Scoped('hello') +class HelloExtension extends ChannelExtensionPoint{ + + private Session session + + @Override + protected void init(Session session) { + this.session = session + } + + DataflowWriteChannel sayHello() { + createHelloChannel() + } + + static String goodbyeMessage + + DataflowWriteChannel goodbye(DataflowReadChannel source) { + final target = CH.createBy(source) + final next = { + goodbyeMessage = "$it" + target.bind(it) + } + final done = { + target.bind(Channel.STOP) + } + DataflowHelper.subscribeImpl(source, [onNext: next, onComplete: done]) + target + } + + protected DataflowWriteChannel createHelloChannel(){ + final channel = CH.create() + if( NF.isDsl2() ){ + session.addIgniter { -> + sayHelloImpl(channel) + } + }else{ + sayHelloImpl(channel) + } + channel + } + + protected sayHelloImpl(final DataflowWriteChannel channel){ + def future = CompletableFuture.runAsync({ + channel.bind("Hi") + channel.bind(Channel.STOP) + }) + future.exceptionally(this.&handlerException) + } + + static private void handlerException(Throwable e) { + final error = e.cause ?: e + log.error(error.message, error) + final session = Global.session as Session + session?.abort(error) + } +} diff --git a/plugins/nf-hello/src/main/nextflow/hello/HelloPlugin.groovy b/plugins/nf-hello/src/main/nextflow/hello/HelloPlugin.groovy index 759d8a0..e096ceb 100644 --- a/plugins/nf-hello/src/main/nextflow/hello/HelloPlugin.groovy +++ b/plugins/nf-hello/src/main/nextflow/hello/HelloPlugin.groovy @@ -18,6 +18,7 @@ package nextflow.hello import groovy.transform.CompileStatic import nextflow.plugin.BasePlugin +import nextflow.plugin.Scoped import org.pf4j.PluginWrapper /** diff --git a/plugins/nf-hello/src/resources/META-INF/extensions.idx b/plugins/nf-hello/src/resources/META-INF/extensions.idx index fb53930..29d93e9 100644 --- a/plugins/nf-hello/src/resources/META-INF/extensions.idx +++ b/plugins/nf-hello/src/resources/META-INF/extensions.idx @@ -1 +1,2 @@ nextflow.hello.HelloFactory +nextflow.hello.HelloExtension \ No newline at end of file diff --git a/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy b/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy new file mode 100644 index 0000000..70492dd --- /dev/null +++ b/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy @@ -0,0 +1,53 @@ +package nextflow.hello + +import groovyx.gpars.dataflow.DataflowQueue +import nextflow.Channel +import nextflow.Session +import nextflow.extension.ChannelExtensionDelegate +import spock.lang.Specification + + +/** + * @author : jorge + * + */ +class ChannelExtensionHelloTest extends Specification{ + + def "should create a channel from hello"(){ + + given: + def session = Mock(Session) + + and: + def helloExtension = new HelloExtension(); helloExtension.init(session) + + when: + def result = helloExtension.sayHello() + + then: + result.val == 'Hi' + result.val == Channel.STOP + } + + def "should receive a hi from hello"(){ + + given: + def session = Mock(Session) + + and: + def helloExtension = new HelloExtension(); helloExtension.init(session) + + and: + def ch = new DataflowQueue() + ch.bind('Goodbye folks') + ch.bind( Channel.STOP ) + + when: + def result = helloExtension.goodbye(ch) + + then: + result.val == 'Goodbye folks' + result.val == Channel.STOP + helloExtension.goodbyeMessage == 'Goodbye folks' + } +} diff --git a/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy b/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy new file mode 100644 index 0000000..eb78a11 --- /dev/null +++ b/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy @@ -0,0 +1,47 @@ +package nextflow.hello + +import nextflow.Channel +import nextflow.extension.ChannelExtensionDelegate +import nextflow.plugin.Plugins +import spock.lang.Specification + + +/** + * @author : jorge + * + */ +class HelloDslTest extends Specification{ + + def setup () { + ChannelExtensionDelegate.reloadExtensionPoints() + } + + def 'should perform a hi and create a channel' () { + when: + def SCRIPT = ''' + channel.hello.sayHello() + ''' + and: + def result = new MockScriptRunner([:]).setScript(SCRIPT).execute() + then: + result.val == 'Hi' + result.val == Channel.STOP + } + + def 'should store a goodbye' () { + when: + def SCRIPT = ''' + channel + .of('Bye bye folks') + .goodbye() + ''' + and: + def result = new MockScriptRunner([:]).setScript(SCRIPT).execute() + then: + result.val == 'Bye bye folks' + result.val == Channel.STOP + + and: + HelloExtension.goodbyeMessage == 'Bye bye folks' + } +} diff --git a/plugins/nf-hello/src/test/nextflow/hello/MockHelpers.groovy b/plugins/nf-hello/src/test/nextflow/hello/MockHelpers.groovy new file mode 100644 index 0000000..ac81cb3 --- /dev/null +++ b/plugins/nf-hello/src/test/nextflow/hello/MockHelpers.groovy @@ -0,0 +1,182 @@ +package nextflow.hello + +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowBroadcast +import nextflow.Session +import nextflow.executor.Executor +import nextflow.executor.ExecutorFactory +import nextflow.processor.TaskHandler +import nextflow.processor.TaskMonitor +import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus +import nextflow.script.BaseScript +import nextflow.script.ChannelOut +import nextflow.script.ScriptRunner +import nextflow.script.ScriptType + +import java.nio.file.Paths + +class MockScriptRunner extends ScriptRunner { + + MockScriptRunner() { + super(new MockSession()) + } + + MockScriptRunner(Map config) { + super(new MockSession(config)) + } + + MockScriptRunner setScript(String str) { + def script = TestHelper.createInMemTempFile('main.nf', str) + setScript(script) + return this + } + + MockScriptRunner invoke() { + execute() + return this + } + + BaseScript getScript() { getScriptObj() } + + @Override + def normalizeOutput(output) { + if( output instanceof ChannelOut ) { + def list = new ArrayList(output.size()) + for( int i=0; i(output.size()) + for( def item : output ) { + ((List)result).add(read0(item)) + } + return result + } + + else { + return read0(output) + } + } + + + private read0( obj ) { + if( obj instanceof DataflowBroadcast ) + return obj.createReadChannel() + return obj + } + +} + +class MockSession extends Session { + + @Override + Session start() { + this.executorFactory = new MockExecutorFactory() + return super.start() + } + + MockSession() { + super() + } + + MockSession(Map config) { + super(config) + } +} + +class MockExecutorFactory extends ExecutorFactory { + @Override + protected Class getExecutorClass(String executorName) { + return MockExecutor + } + + @Override + protected boolean isTypeSupported(ScriptType type, Object executor) { + true + } +} + +/** + * + * @author Paolo Di Tommaso + */ +class MockExecutor extends Executor { + + @Override + void signal() { } + + protected TaskMonitor createTaskMonitor() { + new MockMonitor() + } + + @Override + TaskHandler createTaskHandler(TaskRun task) { + return new MockTaskHandler(task) + } +} + +class MockMonitor implements TaskMonitor { + + void schedule(TaskHandler handler) { + handler.submit() + } + + /** + * Remove the {@code TaskHandler} instance from the queue of tasks to be processed + * + * @param handler A not null {@code TaskHandler} instance + */ + boolean evict(TaskHandler handler) { } + + /** + * Start the monitoring activity for the queued tasks + * @return The instance itself, useful to chain methods invocation + */ + TaskMonitor start() { } + + /** + * Notify when a task terminates + */ + void signal() { } +} + +@Slf4j +class MockTaskHandler extends TaskHandler { + + protected MockTaskHandler(TaskRun task) { + super(task) + } + + @Override + void submit() { + log.info ">> launching mock task: ${task}" + if( task.type == ScriptType.SCRIPTLET ) { + task.workDir = Paths.get('.').complete() + task.stdout = task.script + task.exitStatus = 0 + } + else { + task.code.call() + } + status = TaskStatus.COMPLETED + task.processor.finalizeTask(task) + } + + @Override + boolean checkIfRunning() { + return false + } + + @Override + boolean checkIfCompleted() { + true + } + + @Override + void kill() { } + +} diff --git a/plugins/nf-hello/src/test/nextflow/hello/TestHelper.groovy b/plugins/nf-hello/src/test/nextflow/hello/TestHelper.groovy new file mode 100644 index 0000000..d63444b --- /dev/null +++ b/plugins/nf-hello/src/test/nextflow/hello/TestHelper.groovy @@ -0,0 +1,45 @@ +/* + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.hello + +import com.google.common.jimfs.Configuration +import com.google.common.jimfs.Jimfs +import groovy.transform.Memoized + +import java.nio.file.Files +import java.nio.file.Path +import java.util.zip.GZIPInputStream + +/** + * + * @author Paolo Di Tommaso + */ +class TestHelper { + + static private fs = Jimfs.newFileSystem(Configuration.unix()); + + static Path createInMemTempFile(String name='temp.file', String content=null) { + Path tmp = fs.getPath("/tmp"); + tmp.mkdir() + def result = Files.createTempDirectory(tmp, 'test').resolve(name) + if( content ) + result.text = content + return result + } + +} From 11ab862b087c5d80bfcb3cfaee2933ade33ed256 Mon Sep 17 00:00:00 2001 From: Jorge Aguilera Date: Tue, 3 May 2022 17:05:23 +0200 Subject: [PATCH 2/2] feature: create a hello channel extension point closes #3 Signed-off-by: Jorge Aguilera --- README.md | 15 +++++ .../main/nextflow/hello/HelloExtension.groovy | 57 ++++++++++++++++--- .../src/resources/META-INF/MANIFEST.MF | 4 +- .../hello/ChannelExtensionHelloTest.groovy | 8 +-- .../test/nextflow/hello/HelloDslTest.groovy | 8 ++- 5 files changed, 75 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 778bedc..534ba8e 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,9 @@ This project shows how to implement a simple Nextflow plugin named `nf-hello` that intercepts workflow execution events to print a message when the execution starts and on workflow completion. +Also, this plugin enriches the `channel` with a `producer` a `consumer` methods (`sayHello` and `goodbye`) +allowing to include them into the script + ## Plugin assets - `settings.gradle` @@ -38,6 +41,18 @@ workflow execution events to print a message when the execution starts and on wo The plugin unit tests. +## ExtensionPointS + +ExtensionPoint is the basic interface who use nextflow-core to integrate plugins into it. +It's only a basic interface and serves as starting point for more specialized extensions. + +Among others, nextflow-core integrate following sub ExtensionPointS: + +- `TraceObserverFactory` to provide a list of TraceObserverS +- `ChannelExtensionPoint` to enrich the channel with custom methods + +In this plugin you can find examples for both of them + ## Compile & run unit tests Run the following command in the project root directory (ie. where the file `settings.gradle` is located): diff --git a/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy b/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy index 5be2985..c2fef9c 100644 --- a/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy +++ b/plugins/nf-hello/src/main/nextflow/hello/HelloExtension.groovy @@ -23,23 +23,57 @@ import java.util.concurrent.CompletableFuture @Scoped('hello') class HelloExtension extends ChannelExtensionPoint{ + /* + * A session hold information about current execution of the script + */ private Session session + /* + * nf-core initializes the plugin once loaded and session is ready + * @param session + */ @Override protected void init(Session session) { this.session = session } - DataflowWriteChannel sayHello() { - createHelloChannel() + /* + * reverse is a `producer` method and will be available to the script because: + * + * - it's public + * - it returns a DataflowWriteChannel + * + * nf-core will inspect the extension class and allow the script to call all these kind of methods + * + * the method can require arguments but it's not mandatory, it depends of the business logic of the method + * + * business logic can write into the channel once ready and values will be consumed from it + */ + DataflowWriteChannel reverse(String message) { + createReverseChannel(message) } static String goodbyeMessage + /* + * goodbye is a `consumer` method as it receives values from a channel to perform some logic. + * + * Consumer methods are introspected by nextflow-core and include into the DSL if the method: + * + * - it's public + * - it returns a DataflowWriteChannel + * - it has only one arguments of DataflowReadChannel class + * + * a consumer method needs to proporcionate 2 closures: + * - a closure to consume items (one by one) + * - a finalizer closure + * + * in this case `goodbye` will consume a message and will store it as an upper case + */ DataflowWriteChannel goodbye(DataflowReadChannel source) { final target = CH.createBy(source) final next = { - goodbyeMessage = "$it" + goodbyeMessage = "$it".toString().toUpperCase() target.bind(it) } final done = { @@ -49,26 +83,33 @@ class HelloExtension extends ChannelExtensionPoint{ target } - protected DataflowWriteChannel createHelloChannel(){ + protected DataflowWriteChannel createReverseChannel(final String message){ final channel = CH.create() if( NF.isDsl2() ){ session.addIgniter { -> - sayHelloImpl(channel) + businessLogicHere(channel, message) } }else{ - sayHelloImpl(channel) + businessLogicHere(channel, message) } channel } - protected sayHelloImpl(final DataflowWriteChannel channel){ + /* + * businessLogicHere will send, across the channel, the message reversed + * and after will send an STOP signal to let know the channel it has been finished + */ + protected static businessLogicHere(final DataflowWriteChannel channel, final String message){ def future = CompletableFuture.runAsync({ - channel.bind("Hi") + channel.bind(message.reverse()) channel.bind(Channel.STOP) }) future.exceptionally(this.&handlerException) } + /* + * an util class to trace exceptions + */ static private void handlerException(Throwable e) { final error = e.cause ?: e log.error(error.message, error) diff --git a/plugins/nf-hello/src/resources/META-INF/MANIFEST.MF b/plugins/nf-hello/src/resources/META-INF/MANIFEST.MF index b7e5398..3798455 100644 --- a/plugins/nf-hello/src/resources/META-INF/MANIFEST.MF +++ b/plugins/nf-hello/src/resources/META-INF/MANIFEST.MF @@ -1,6 +1,6 @@ Manifest-Version: 1.0 Plugin-Id: nf-hello -Plugin-Version: 0.1.0 +Plugin-Version: 0.2.0 Plugin-Class: nextflow.hello.HelloPlugin Plugin-Provider: nextflow -Plugin-Requires: >=21.04.0 +Plugin-Requires: >=22.04.0 diff --git a/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy b/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy index 70492dd..2806ba7 100644 --- a/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy +++ b/plugins/nf-hello/src/test/nextflow/hello/ChannelExtensionHelloTest.groovy @@ -22,14 +22,14 @@ class ChannelExtensionHelloTest extends Specification{ def helloExtension = new HelloExtension(); helloExtension.init(session) when: - def result = helloExtension.sayHello() + def result = helloExtension.reverse("Hi") then: - result.val == 'Hi' + result.val == 'iH' result.val == Channel.STOP } - def "should receive a hi from hello"(){ + def "should consume a message from script"(){ given: def session = Mock(Session) @@ -48,6 +48,6 @@ class ChannelExtensionHelloTest extends Specification{ then: result.val == 'Goodbye folks' result.val == Channel.STOP - helloExtension.goodbyeMessage == 'Goodbye folks' + helloExtension.goodbyeMessage == 'Goodbye folks'.toUpperCase() } } diff --git a/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy b/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy index eb78a11..573487f 100644 --- a/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy +++ b/plugins/nf-hello/src/test/nextflow/hello/HelloDslTest.groovy @@ -4,12 +4,14 @@ import nextflow.Channel import nextflow.extension.ChannelExtensionDelegate import nextflow.plugin.Plugins import spock.lang.Specification +import spock.lang.Timeout /** * @author : jorge * */ +@Timeout(10) class HelloDslTest extends Specification{ def setup () { @@ -19,12 +21,12 @@ class HelloDslTest extends Specification{ def 'should perform a hi and create a channel' () { when: def SCRIPT = ''' - channel.hello.sayHello() + channel.hello.reverse('hi!') ''' and: def result = new MockScriptRunner([:]).setScript(SCRIPT).execute() then: - result.val == 'Hi' + result.val == '!ih' result.val == Channel.STOP } @@ -42,6 +44,6 @@ class HelloDslTest extends Specification{ result.val == Channel.STOP and: - HelloExtension.goodbyeMessage == 'Bye bye folks' + HelloExtension.goodbyeMessage == 'Bye bye folks'.toUpperCase() } }