Running SubmissionPublisher with the Reactive-Streams TCK

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Running SubmissionPublisher with the Reactive-Streams TCK

Dávid Karnok
Hello.

I've run the SubmissionPublisher against the Reactive-Streams TCK via the following code:

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.Test;

import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

/**
 * Runs the Reactive-Streams TCK publisher verification against
 * {@link SubmissionPublisher} by adapting its {@link Flow.Publisher} API to
 * {@code org.reactivestreams.Publisher}.
 */
@Test
public class SubmissionPublisherTckTest extends PublisherVerification<Integer> {

    /** Creates the verification with a {@code TestEnvironment} constructed with the value 200. */
    public SubmissionPublisherTckTest() {
        super(new TestEnvironment(200));
    }

    /**
     * Creates a publisher that emits the integers {@code 0 .. elements - 1} and then completes.
     *
     * <p>Emission is driven by a task on the common {@link ForkJoinPool}: it spins
     * (yielding) until the publisher has at least one subscriber, submits every
     * item, and finally closes the publisher.
     *
     * @param elements number of items to emit before completing
     * @return the {@link SubmissionPublisher} wrapped as a Reactive-Streams publisher
     */
    @Override
    public Publisher<Integer> createPublisher(long elements) {
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
        ForkJoinPool.commonPool().submit(() -> {
            // Items submitted before anyone subscribes would be lost, so wait for a subscriber.
            // NOTE(review): this loop never ends if no subscriber ever arrives.
            while (!sp.hasSubscribers()) {
                Thread.yield();
            }
            for (int i = 0; i < elements; i++) {
                sp.submit(i);
            }
            sp.close();
        });
        return toRs(sp);
    }

    /**
     * Returns {@code null}, i.e. no failed publisher is provided.
     * NOTE(review): the TCK is understood to skip the failed-publisher tests in
     * this case - confirm against the TCK documentation.
     */
    @Override
    public Publisher<Integer> createFailedPublisher() {
        return null;
    }

    /** Upper bound on the number of elements the verification may ask this publisher to emit. */
    @Override
    public long maxElementsFromPublisher() {
        return 100;
    }

    /**
     * Adapts a {@link Flow.Publisher} to an {@code org.reactivestreams.Publisher}.
     *
     * <p>A {@code null} subscriber is passed straight through to the wrapped
     * publisher so that the publisher itself performs the null check.
     * Previously the adapter always subscribed a fresh wrapper, so the wrapped
     * publisher never saw the {@code null} and could not reject it synchronously.
     *
     * @param fp  the Flow publisher to wrap
     * @param <T> the item type
     * @return a Reactive-Streams view of {@code fp}
     */
    <T> org.reactivestreams.Publisher<T> toRs(Flow.Publisher<T> fp) {
        return rs -> {
            if (rs == null) {
                // Let the underlying publisher reject the null subscriber itself.
                fp.subscribe(null);
                return;
            }
            fp.subscribe(new Flow.Subscriber<T>() {

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Bridge the Flow subscription to its Reactive-Streams counterpart.
                    rs.onSubscribe(new org.reactivestreams.Subscription() {

                        @Override
                        public void request(long l) {
                            subscription.request(l);
                        }

                        @Override
                        public void cancel() {
                            subscription.cancel();
                        }
                    });
                }

                @Override
                public void onNext(T item) {
                    rs.onNext(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    rs.onError(throwable);
                }

                @Override
                public void onComplete() {
                    rs.onComplete();
                }
            });
        };
    }

}

It requires the "org.reactivestreams:reactive-streams-tck:1.0.0" dependency.

There were 3 failures:

  • SubmissionPublisherTckTest.required_spec109_subscribeThrowNPEOnNullSubscriber
java.lang.AssertionError: Publisher did not throw a NullPointerException when given a null Subscribe in subscribe

at org.testng.Assert.fail(Assert.java:89)
at org.reactivestreams.tck.TestEnvironment.flop(TestEnvironment.java:117)
at org.reactivestreams.tck.PublisherVerification$10.run(PublisherVerification.java:459)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1092)
at org.reactivestreams.tck.PublisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(PublisherVerification.java:454)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:547)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
at org.testng.TestRunner.privateRun(TestRunner.java:749)
at org.testng.TestRunner.run(TestRunner.java:600)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
at org.testng.SuiteRunner.run(SuiteRunner.java:223)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
at org.testng.TestNG.run(TestNG.java:900)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127)

  • SubmissionPublisherTckTest.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException

java.lang.AssertionError: Got expected exception [class java.lang.IllegalArgumentException] but missing message part [3.9], was: negative subscription request 
Expected :true
Actual   :false
 <Click to see difference>


at org.testng.Assert.fail(Assert.java:89)
at org.testng.Assert.failNotEquals(Assert.java:480)
at org.testng.Assert.assertTrue(Assert.java:37)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:466)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:459)
at org.reactivestreams.tck.PublisherVerification$22.run(PublisherVerification.java:876)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1092)
at org.reactivestreams.tck.PublisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(PublisherVerification.java:870)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:547)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
at org.testng.TestRunner.privateRun(TestRunner.java:749)
at org.testng.TestRunner.run(TestRunner.java:600)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
at org.testng.SuiteRunner.run(SuiteRunner.java:223)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
at org.testng.TestNG.run(TestNG.java:900)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127)

  • SubmissionPublisherTckTest.required_spec309_requestZeroMustSignalIllegalArgumentException 

java.lang.AssertionError: Expected onError(java.lang.IllegalArgumentException) within 200 ms

at org.testng.Assert.fail(Assert.java:78)
at org.reactivestreams.tck.TestEnvironment.flopAndFail(TestEnvironment.java:180)
at org.reactivestreams.tck.TestEnvironment$Receptacle.expectError(TestEnvironment.java:930)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectError(TestEnvironment.java:484)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectError(TestEnvironment.java:476)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:464)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:459)
at org.reactivestreams.tck.PublisherVerification$21.run(PublisherVerification.java:862)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1092)
at org.reactivestreams.tck.PublisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException(PublisherVerification.java:858)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:547)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
at org.testng.TestRunner.privateRun(TestRunner.java:749)
at org.testng.TestRunner.run(TestRunner.java:600)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
at org.testng.SuiteRunner.run(SuiteRunner.java:223)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
at org.testng.TestNG.run(TestNG.java:900)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127)
Caused by: java.lang.AssertionError: Expected onError(java.lang.IllegalArgumentException) within 200 ms
at org.testng.Assert.fail(Assert.java:89)
at org.reactivestreams.tck.TestEnvironment.flopAndFail(TestEnvironment.java:177)
... 32 more



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Running SubmissionPublisher with the Reactive-Streams TCK

Dávid Karnok
Correction: the SubmissionPublisherTckTest.required_spec109_subscribeThrowNPEOnNullSubscriber failure was my mistake; the toRs() converter function was not forwarding a null subscriber to the underlying publisher. The fix:

return rs -> {
if (rs == null) {
fp.subscribe(null);
return;
}
fp.subscribe(new Flow.Subscriber<T>() {



2017-03-01 10:50 GMT+01:00 Dávid Karnok <[hidden email]>:
Hello.

I've run the SubmissionPublisher against the Reactive-Streams TCK via the following code:

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.Test;

import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

/**
 * Runs the Reactive-Streams TCK publisher verification against
 * {@link SubmissionPublisher} by adapting its {@link Flow.Publisher} API to
 * {@code org.reactivestreams.Publisher}.
 */
@Test
public class SubmissionPublisherTckTest extends PublisherVerification<Integer> {

    /** Creates the verification with a {@code TestEnvironment} constructed with the value 200. */
    public SubmissionPublisherTckTest() {
        super(new TestEnvironment(200));
    }

    /**
     * Creates a publisher that emits the integers {@code 0 .. elements - 1} and then completes.
     *
     * <p>Emission is driven by a task on the common {@link ForkJoinPool}: it spins
     * (yielding) until the publisher has at least one subscriber, submits every
     * item, and finally closes the publisher.
     *
     * @param elements number of items to emit before completing
     * @return the {@link SubmissionPublisher} wrapped as a Reactive-Streams publisher
     */
    @Override
    public Publisher<Integer> createPublisher(long elements) {
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
        ForkJoinPool.commonPool().submit(() -> {
            // Items submitted before anyone subscribes would be lost, so wait for a subscriber.
            // NOTE(review): this loop never ends if no subscriber ever arrives.
            while (!sp.hasSubscribers()) {
                Thread.yield();
            }
            for (int i = 0; i < elements; i++) {
                sp.submit(i);
            }
            sp.close();
        });
        return toRs(sp);
    }

    /**
     * Returns {@code null}, i.e. no failed publisher is provided.
     * NOTE(review): the TCK is understood to skip the failed-publisher tests in
     * this case - confirm against the TCK documentation.
     */
    @Override
    public Publisher<Integer> createFailedPublisher() {
        return null;
    }

    /** Upper bound on the number of elements the verification may ask this publisher to emit. */
    @Override
    public long maxElementsFromPublisher() {
        return 100;
    }

    /**
     * Adapts a {@link Flow.Publisher} to an {@code org.reactivestreams.Publisher}.
     *
     * <p>A {@code null} subscriber is passed straight through to the wrapped
     * publisher so that the publisher itself performs the null check.
     * Previously the adapter always subscribed a fresh wrapper, so the wrapped
     * publisher never saw the {@code null} and could not reject it synchronously.
     *
     * @param fp  the Flow publisher to wrap
     * @param <T> the item type
     * @return a Reactive-Streams view of {@code fp}
     */
    <T> org.reactivestreams.Publisher<T> toRs(Flow.Publisher<T> fp) {
        return rs -> {
            if (rs == null) {
                // Let the underlying publisher reject the null subscriber itself.
                fp.subscribe(null);
                return;
            }
            fp.subscribe(new Flow.Subscriber<T>() {

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Bridge the Flow subscription to its Reactive-Streams counterpart.
                    rs.onSubscribe(new org.reactivestreams.Subscription() {

                        @Override
                        public void request(long l) {
                            subscription.request(l);
                        }

                        @Override
                        public void cancel() {
                            subscription.cancel();
                        }
                    });
                }

                @Override
                public void onNext(T item) {
                    rs.onNext(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    rs.onError(throwable);
                }

                @Override
                public void onComplete() {
                    rs.onComplete();
                }
            });
        };
    }

}

It requires the "org.reactivestreams:reactive-streams-tck:1.0.0" dependency.

There were 3 failures:

  • SubmissionPublisherTckTest.required_spec109_subscribeThrowNPEOnNullSubscriber
java.lang.AssertionError: Publisher did not throw a NullPointerException when given a null Subscribe in subscribe

at org.testng.Assert.fail(Assert.java:89)
at org.reactivestreams.tck.TestEnvironment.flop(TestEnvironment.java:117)
at org.reactivestreams.tck.PublisherVerification$10.run(PublisherVerification.java:459)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1092)
at org.reactivestreams.tck.PublisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(PublisherVerification.java:454)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:547)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
at org.testng.TestRunner.privateRun(TestRunner.java:749)
at org.testng.TestRunner.run(TestRunner.java:600)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
at org.testng.SuiteRunner.run(SuiteRunner.java:223)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
at org.testng.TestNG.run(TestNG.java:900)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127)

  • SubmissionPublisherTckTest.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException

java.lang.AssertionError: Got expected exception [class java.lang.IllegalArgumentException] but missing message part [3.9], was: negative subscription request 
Expected :true
Actual   :false
 <Click to see difference>


at org.testng.Assert.fail(Assert.java:89)
at org.testng.Assert.failNotEquals(Assert.java:480)
at org.testng.Assert.assertTrue(Assert.java:37)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:466)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:459)
at org.reactivestreams.tck.PublisherVerification$22.run(PublisherVerification.java:876)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1092)
at org.reactivestreams.tck.PublisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(PublisherVerification.java:870)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:547)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
at org.testng.TestRunner.privateRun(TestRunner.java:749)
at org.testng.TestRunner.run(TestRunner.java:600)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
at org.testng.SuiteRunner.run(SuiteRunner.java:223)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
at org.testng.TestNG.run(TestNG.java:900)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127)

  • SubmissionPublisherTckTest.required_spec309_requestZeroMustSignalIllegalArgumentException 

java.lang.AssertionError: Expected onError(java.lang.IllegalArgumentException) within 200 ms

at org.testng.Assert.fail(Assert.java:78)
at org.reactivestreams.tck.TestEnvironment.flopAndFail(TestEnvironment.java:180)
at org.reactivestreams.tck.TestEnvironment$Receptacle.expectError(TestEnvironment.java:930)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectError(TestEnvironment.java:484)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectError(TestEnvironment.java:476)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:464)
at org.reactivestreams.tck.TestEnvironment$ManualSubscriber.expectErrorWithMessage(TestEnvironment.java:459)
at org.reactivestreams.tck.PublisherVerification$21.run(PublisherVerification.java:862)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1092)
at org.reactivestreams.tck.PublisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException(PublisherVerification.java:858)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:547)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
at org.testng.TestRunner.privateRun(TestRunner.java:749)
at org.testng.TestRunner.run(TestRunner.java:600)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
at org.testng.SuiteRunner.run(SuiteRunner.java:223)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
at org.testng.TestNG.run(TestNG.java:900)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127)
Caused by: java.lang.AssertionError: Expected onError(java.lang.IllegalArgumentException) within 200 ms
at org.testng.Assert.fail(Assert.java:89)
at org.reactivestreams.tck.TestEnvironment.flopAndFail(TestEnvironment.java:177)
... 32 more



--
Best regards,
David Karnok



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest