|
18 | 18 | import com.google.cloud.bigquery.BigQueryException; |
19 | 19 | import com.google.cloud.storage.Blob; |
20 | 20 | import com.google.cloud.storage.StorageException; |
| 21 | +import com.google.pubsub.v1.Encoding; |
21 | 22 | import io.cdap.e2e.pages.actions.CdfConnectionActions; |
22 | 23 | import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions; |
23 | 24 | import io.cdap.e2e.utils.BigQueryClient; |
@@ -62,6 +63,9 @@ public class TestSetupHooks { |
62 | 63 | public static String bqSourceTable2 = StringUtils.EMPTY; |
63 | 64 | public static String bqSourceView = StringUtils.EMPTY; |
64 | 65 | public static String pubSubTargetTopic = StringUtils.EMPTY; |
| 66 | + public static String pubSubSourceTopic = StringUtils.EMPTY; |
| 67 | + public static String pubSubSourceSubscription = StringUtils.EMPTY; |
| 68 | + public static String pubSubSchemaId = StringUtils.EMPTY; |
65 | 69 | public static String spannerInstance = StringUtils.EMPTY; |
66 | 70 | public static String spannerDatabase = StringUtils.EMPTY; |
67 | 71 | public static String spannerSourceTable = StringUtils.EMPTY; |
@@ -481,12 +485,102 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throw |
481 | 485 | return bucketName; |
482 | 486 | } |
483 | 487 |
|
| 488 | + @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") |
| 489 | + public static void createSourcePubSubTopic() throws IOException { |
| 490 | + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); |
| 491 | + PubSubClient.createTopic(pubSubSourceTopic); |
| 492 | + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); |
| 493 | + } |
| 494 | + |
| 495 | + @Before(order = 1, value = "@PUBSUB_SCHEMA_TEST") |
| 496 | + public static void createSourcePubSubSchema() throws IOException { |
| 497 | + pubSubSchemaId = "cdf-e2e-test-" + UUID.randomUUID(); |
| 498 | + PubSubClient.createAvroSchema(pubSubSchemaId, PluginPropertyUtils.pluginProp("avrofile")); |
| 499 | + BeforeActions.scenario.write("Source Schema " + pubSubSchemaId); |
| 500 | + } |
| 501 | + |
| 502 | + @Before(order = 2, value = "@PUBSUB_SCHEMA_TOPIC_TEST") |
| 503 | + public static void createSourcePubSubSchemaTopic() throws IOException, InterruptedException { |
| 504 | + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); |
| 505 | + PubSubClient.createTopicWithSchema(pubSubSourceTopic, pubSubSchemaId, Encoding.BINARY); |
| 506 | + BeforeActions.scenario.write("Schema Topic " + pubSubSourceTopic); |
| 507 | + } |
| 508 | + |
| 509 | + @Before(order = 3, value = "@PUBSUB_SUBSCRIPTION_TEST") |
| 510 | + public static void createSubscriptionPubSubTopic() throws IOException { |
| 511 | + pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); |
| 512 | + PubSubClient.createSubscription(pubSubSourceSubscription, pubSubSourceTopic); |
| 513 | + BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); |
| 514 | + } |
| 515 | + |
| 516 | + @After(order = 1, value = "@PUBSUB_SOURCE_TEST") |
| 517 | + public static void deleteSourcePubSubTopic() { |
| 518 | + try { |
| 519 | + PubSubClient.deleteTopic(pubSubSourceTopic); |
| 520 | + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); |
| 521 | + pubSubSourceTopic = StringUtils.EMPTY; |
| 522 | + } catch (Exception e) { |
| 523 | + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { |
| 524 | + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); |
| 525 | + } else { |
| 526 | + Assert.fail(e.getMessage()); |
| 527 | + } |
| 528 | + } |
| 529 | + } |
| 530 | + |
| 531 | + @After(order = 2, value = "@PUBSUB_SCHEMA_TOPIC_TEST") |
| 532 | + public static void deleteSourcePubSubSchemaTopic() { |
| 533 | + try { |
| 534 | + PubSubClient.deleteTopic(pubSubSourceTopic); |
| 535 | + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); |
| 536 | + pubSubSourceTopic = StringUtils.EMPTY; |
| 537 | + } catch (Exception e) { |
| 538 | + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { |
| 539 | + BeforeActions.scenario.write("Schema " + pubSubSchemaId + " does not exist."); |
| 540 | + } else { |
| 541 | + Assert.fail(e.getMessage()); |
| 542 | + } |
| 543 | + } |
| 544 | + } |
| 545 | + |
484 | 546 | @Before(order = 1, value = "@PUBSUB_SINK_TEST") |
485 | 547 | public static void createTargetPubSubTopic() { |
486 | 548 | pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); |
487 | 549 | BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); |
488 | 550 | } |
489 | 551 |
|
| 552 | + @After(order = 1, value = "@PUBSUB_SCHEMA_TEST") |
| 553 | + public static void deletePubSubSchema() throws IOException { |
| 554 | + PubSubClient.deleteSchema(PluginPropertyUtils.pluginProp("projectId"), pubSubSchemaId); |
| 555 | + BeforeActions.scenario.write("Deleted PubSub schema " + pubSubSchemaId); |
| 556 | + } |
| 557 | + |
| 558 | + @After(order = 2, value = "@PUBSUB_SUBSCRIPTION_TEST") |
| 559 | + public static void deletePubSubSubscription() throws IOException { |
| 560 | + PubSubClient.deleteSubscription(PluginPropertyUtils.pluginProp("projectId"), pubSubSourceSubscription); |
| 561 | + BeforeActions.scenario.write("Deleted PubSub subscription " + pubSubSourceSubscription); |
| 562 | + } |
| 563 | + |
| 564 | + public static void publishMessageJsonFormat() throws IOException, InterruptedException { |
| 565 | + String jsonMessage = PluginPropertyUtils.pluginProp("message"); |
| 566 | + String jsonMessage2 = PluginPropertyUtils.pluginProp("message2"); |
| 567 | + List<String> jsonMessagesList = Arrays.asList(jsonMessage, jsonMessage2); |
| 568 | + PubSubClient.publishMessagesWithPubsub(PluginPropertyUtils.pluginProp("projectId"), |
| 569 | + pubSubSourceTopic, jsonMessagesList); |
| 570 | + } |
| 571 | + |
| 572 | + public static void publishMessageAvroFormat() throws IOException, InterruptedException, ExecutionException { |
| 573 | + PubSubClient.publishAvroRecords(PluginPropertyUtils.pluginProp("projectId"), pubSubSourceTopic); |
| 574 | + } |
| 575 | + |
| 576 | + public static void publishMessage() throws IOException, InterruptedException { |
| 577 | + String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage"); |
| 578 | + String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage"); |
| 579 | + List<String> dataMessagesList = Arrays.asList(dataMessage1, dataMessage2); |
| 580 | + PubSubClient.publishMessagesWithPubsub(PluginPropertyUtils.pluginProp |
| 581 | + ("projectId"), pubSubSourceTopic, dataMessagesList); |
| 582 | + } |
| 583 | + |
490 | 584 | @After(order = 1, value = "@PUBSUB_SINK_TEST") |
491 | 585 | public static void deleteTargetPubSubTopic() { |
492 | 586 | try { |
|
0 commit comments