From 838ca44203b6a9573bc8e9d5b528a151c553ce25 Mon Sep 17 00:00:00 2001 From: aemrob Date: Tue, 3 Jan 2017 14:00:03 +0100 Subject: [PATCH 1/2] remove classcast exception on topology.message.timeout.secs --- src/main/java/org/apache/storm/jms/spout/JmsSpout.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/src/main/java/org/apache/storm/jms/spout/JmsSpout.java index abdad74..a8ca97a 100644 --- a/src/main/java/org/apache/storm/jms/spout/JmsSpout.java +++ b/src/main/java/org/apache/storm/jms/spout/JmsSpout.java @@ -156,10 +156,10 @@ public void open(Map conf, TopologyContext context, if(this.tupleProducer == null){ throw new IllegalStateException("JMS Tuple Producer has not been set."); } - Integer topologyTimeout = (Integer)conf.get("topology.message.timeout.secs"); + Long topologyTimeout = (Long)conf.get("topology.message.timeout.secs"); // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change) - topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout; - if( (topologyTimeout.intValue() * 1000 )> this.recoveryPeriod){ + topologyTimeout = topologyTimeout == null ? new Long(30L) : topologyTimeout; + if( (topologyTimeout.intValue() * 1000) > this.recoveryPeriod){ LOG.warn("*** WARNING *** : " + "Recovery period ("+ this.recoveryPeriod + " ms.) is less then the configured " + "'topology.message.timeout.secs' of " + topologyTimeout + From 2498e764b8efda604f18b7e90a3516879b02db3f Mon Sep 17 00:00:00 2001 From: Aemro Amare Date: Tue, 3 Jan 2017 14:09:56 +0100 Subject: [PATCH 2/2] added test with Long topology.message.timeout.secs --- src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java index e80f70a..400fd3b 100644 --- a/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java +++ b/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java @@ -51,7 +51,11 @@ public void testFailure() throws JMSException, Exception{ spout.setJmsTupleProducer(new MockTupleProducer()); spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); spout.setRecoveryPeriod(10); // Rapid recovery for testing. - spout.open(new HashMap(), null, collector); + + HashMap conf = new HashMap(); + conf.put("topology.message.timeout.secs", new Long(10L)); + spout.open(conf, null, collector); + Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination()); Thread.sleep(100); spout.nextTuple(); // Pretend to be storm.