From 7b37aa2f148d734c3b1ee2db8bacd585712d9b95 Mon Sep 17 00:00:00 2001 From: Simon Howe Date: Wed, 6 May 2020 14:03:16 +0200 Subject: [PATCH 1/2] Remove segment from users-sync - Marketo should stay as we're still using that --- users-sync/attrsync/segment.go | 154 ----------------------------- users-sync/attrsync/sync.go | 47 ++++----- users-sync/attrsync/sync_test.go | 160 ------------------------------- users-sync/cmd/users-sync.go | 10 +- 4 files changed, 26 insertions(+), 345 deletions(-) delete mode 100644 users-sync/attrsync/segment.go delete mode 100644 users-sync/attrsync/sync_test.go diff --git a/users-sync/attrsync/segment.go b/users-sync/attrsync/segment.go deleted file mode 100644 index c23e23e11..000000000 --- a/users-sync/attrsync/segment.go +++ /dev/null @@ -1,154 +0,0 @@ -package attrsync - -import ( - "io/ioutil" - - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/segmentio/analytics-go" - - "github.com/weaveworks/common/logging" - "github.com/weaveworks/service/users" -) - -var segmentMessagesTotalCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "segment_client", - Name: "messages_total", - Help: "Number of messages processed", - }, - []string{"status"}, -) - -// NewSegmentClient returns an instrumented segment client -func NewSegmentClient(writeKeyFilename string, logger logging.Interface) (analytics.Client, error) { - var client analytics.Client - callback := &sendCountingCallback{ - counter: segmentMessagesTotalCounter, - } - - if writeKeyFilename == "" { - client = &alwaysErrorSegmentClient{ - Callback: callback, - Logger: logger, - } - } else { - keyBytes, err := ioutil.ReadFile(writeKeyFilename) - if err != nil { - return nil, errors.Wrap(err, "Failed to read segment write key") - } - - client, err = analytics.NewWithConfig( - string(keyBytes), - analytics.Config{ - Callback: callback, - Logger: &segmentLogAdapter{logger}, - }, - ) - if err != nil { - return nil, err - } - } - - return &enqueueCountingClient{ - client: client, - counter: segmentMessagesTotalCounter, - }, nil -} - -// sendCountingCallback increments a success or failure metric for each message processed -type sendCountingCallback struct { - counter *prometheus.CounterVec -} - -var _ analytics.Callback = &sendCountingCallback{} - -func (cb *sendCountingCallback) Success(analytics.Message) { - cb.counter.WithLabelValues("success").Inc() -} - -func (cb *sendCountingCallback) Failure(analytics.Message, error) { - cb.counter.WithLabelValues("failure").Inc() -} - -func segmentTrait(user *users.User, attrs map[string]int) analytics.Traits { - trait := analytics.NewTraits().SetEmail(user.Email).SetCreatedAt(user.CreatedAt) - - // Since old users won't have this data, send it optionally - if user.Name != "" { - trait.SetName(user.Name) - } - if user.FirstName != "" { - trait.SetFirstName(user.FirstName) - } - if user.LastName != "" { - trait.SetLastName(user.LastName) - } - if user.Company != "" { - trait.Set("company", map[string]string{"name": user.Company}) - } - - for name, val := range attrs { - trait.Set(name, val) - } - - return trait -} - -// enqueueCountingClient increments a metric for each message enqueued -type enqueueCountingClient struct { - client analytics.Client - counter *prometheus.CounterVec -} - -var _ analytics.Client = &enqueueCountingClient{} - -func (c *enqueueCountingClient) Enqueue(msg analytics.Message) error { - err := c.client.Enqueue(msg) - if err == nil { - c.counter.WithLabelValues("enqueued").Inc() - } else { - c.counter.WithLabelValues("enqueue_error").Inc() - } - return err -} - -func (c *enqueueCountingClient) Close() error { - return c.client.Close() -} - -// alwaysErrorSegmentClient is a segment client which accepts all messages, -// but reports a delivery failure for every message -type alwaysErrorSegmentClient struct { - Callback analytics.Callback - Logger logging.Interface -} - -var _ analytics.Client = &alwaysErrorSegmentClient{} - -var errNotImplemented = errors.New("Not implemented") - -func (c *alwaysErrorSegmentClient) Enqueue(msg analytics.Message) error { - c.Logger.WithField("message", msg).Warnf("alwaysErrorSegmentClient pretending to enqueue message") - c.Callback.Failure(msg, errNotImplemented) - return nil -} - -func (c *alwaysErrorSegmentClient) Close() error { - return nil -} - -// segmentLogAdapter provide a compatible interface to our logging interface -type segmentLogAdapter struct { - logger logging.Interface -} - -var _ analytics.Logger = &segmentLogAdapter{} - -func (s *segmentLogAdapter) Logf(format string, args ...interface{}) { - s.logger.Infof(format, args...) -} - -func (s *segmentLogAdapter) Errorf(format string, args ...interface{}) { - s.logger.Errorf(format, args...) -} diff --git a/users-sync/attrsync/sync.go b/users-sync/attrsync/sync.go index d1a255901..4f5c3cf70 100644 --- a/users-sync/attrsync/sync.go +++ b/users-sync/attrsync/sync.go @@ -33,7 +33,6 @@ const ( func init() { attrsComputeDurationCollector.Register() - prometheus.MustRegister(segmentMessagesTotalCounter) } // AttributeSyncer sends metadata about users to external services @@ -45,17 +44,15 @@ type AttributeSyncer struct { quit chan struct{} db db.DB billingClient billing_grpc.BillingClient - segmentClient analytics.Client marketoClient marketing.MarketoClient } // New creates a attributeSyncer service -func New(log logging.Interface, db db.DB, billingClient billing_grpc.BillingClient, segmentClient analytics.Client, marketoClient marketing.MarketoClient) *AttributeSyncer { +func New(log logging.Interface, db db.DB, billingClient billing_grpc.BillingClient, marketoClient marketing.MarketoClient) *AttributeSyncer { return &AttributeSyncer{ log: log, db: db, billingClient: billingClient, - segmentClient: segmentClient, marketoClient: marketoClient, recentUsersTicker: time.NewTicker(recentUsersPeriod), staleUsersTicker: time.NewTicker(staleUsersPeriod), @@ -166,6 +163,30 @@ func (c *AttributeSyncer) syncUsers(ctx context.Context, userFilter filter.User) return nil } +func segmentTrait(user *users.User, attrs map[string]int) analytics.Traits { + trait := analytics.NewTraits().SetEmail(user.Email).SetCreatedAt(user.CreatedAt) + + // Since old users won't have this data, send it optionally + if user.Name != "" { + trait.SetName(user.Name) + } + if user.FirstName != "" { + trait.SetFirstName(user.FirstName) + } + if user.LastName != "" { + trait.SetLastName(user.LastName) + } + if user.Company != "" { + trait.Set("company", map[string]string{"name": user.Company}) + } + + for name, val := range attrs { + trait.Set(name, val) + } + + return trait +} + func (c *AttributeSyncer) postUsers(ctx context.Context, users []*users.User) { traits := map[string]analytics.Traits{} var prospects []marketing.Prospect @@ -195,24 +216,6 @@ func (c *AttributeSyncer) postUsers(ctx context.Context, users []*users.User) { } } - // Segment - for email, traits := range traits { - err := c.segmentClient.Enqueue(analytics.Identify{ - UserId: email, - Traits: traits, - }) - if err != nil { - c.log.WithFields(logging.Fields{ - "err": err, - "email": email, - "traits": traits, - }).Errorln("Error enqueuing segment message") - } - - // Sleep for a while to avoid overloading other services - time.Sleep(10 * time.Millisecond) - } - return nil }) } diff --git a/users-sync/attrsync/sync_test.go b/users-sync/attrsync/sync_test.go deleted file mode 100644 index 84d75d3ac..000000000 --- a/users-sync/attrsync/sync_test.go +++ /dev/null @@ -1,160 +0,0 @@ -package attrsync - -import ( - "context" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/segmentio/analytics-go" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - - "github.com/weaveworks/common/logging" - billing_grpc "github.com/weaveworks/service/common/billing/grpc" - "github.com/weaveworks/service/users" - users_db "github.com/weaveworks/service/users/db" - "github.com/weaveworks/service/users/db/dbtest" - "github.com/weaveworks/service/users/marketing" -) - -type MockSegment struct { - Messages []analytics.Message -} - -func (m *MockSegment) Enqueue(msg analytics.Message) error { - m.Messages = append(m.Messages, msg) - return nil -} - -func (m *MockSegment) Close() error { - return nil -} - -type testFixtures struct { - billingClient *billing_grpc.MockBillingClient - ctrl *gomock.Controller - ctx context.Context - db users_db.DB - mockSegment *MockSegment -} - -func setup(t *testing.T) (testFixtures, *AttributeSyncer) { - ctx := context.Background() - db := dbtest.Setup(t) - - logger := logging.Logrus(logrus.StandardLogger()) - - ctrl := gomock.NewController(t) - billingClient := billing_grpc.NewMockBillingClient(ctrl) - mockSegment := MockSegment{} - noopMarketo := marketing.NoopMarketoClient{} - - attrSync := New(logger, db, billingClient, &mockSegment, &noopMarketo) - - return testFixtures{billingClient, ctrl, ctx, db, &mockSegment}, attrSync -} - -func (tf *testFixtures) cleanup(t *testing.T) { - dbtest.Cleanup(t, tf.db) - tf.ctrl.Finish() -} - -func Test_AttrSyncNoOrgs(t *testing.T) { - tf, attrSync := setup(t) - defer tf.cleanup(t) - - user := dbtest.GetUser(t, tf.db) - user = dbtest.AddUserInfoToUser(t, tf.db, user) - - attrSync.postUsers(tf.ctx, []*users.User{user}) - - require.Len(t, tf.mockSegment.Messages, 1) - ident, ok := tf.mockSegment.Messages[0].(analytics.Identify) - require.True(t, ok) - - require.Equal(t, analytics.Identify{ - UserId: user.Email, - Traits: analytics.Traits{ - "name": user.Name, - "firstName": user.FirstName, - "lastName": user.LastName, - "email": user.Email, - "createdAt": user.CreatedAt, - "company": map[string]string{ - "name": user.Company, - }, - - "instances_ever_connected_flux_total": 0, - "instances_ever_connected_net_total": 0, - "instances_ever_connected_prom_total": 0, - "instances_ever_connected_scope_total": 0, - "instances_ever_connected_total": 0, - "instances_status_active_total": 0, - "instances_status_payment_due_total": 0, - "instances_status_payment_error_total": 0, - "instances_status_subscription_inactive_total": 0, - "instances_status_trial_active_total": 0, - "instances_status_trial_expired_total": 0, - "instances_status_unknown_total": 0, - "instances_total": 0, - }, - }, ident) -} - -func Test_AttrSyncWithOrg(t *testing.T) { - tf, attrSync := setup(t) - defer tf.cleanup(t) - - user, org := dbtest.GetOrg(t, tf.db) - user = dbtest.AddUserInfoToUser(t, tf.db, user) - - now := time.Now() - tf.db.SetOrganizationFirstSeenConnectedAt( - tf.ctx, org.ExternalID, &now) - tf.db.SetOrganizationFirstSeenPromConnectedAt( - tf.ctx, org.ExternalID, &now) - - tf.billingClient.EXPECT().GetInstanceBillingStatus( - gomock.Any(), &billing_grpc.InstanceBillingStatusRequest{ - InternalID: org.ID, - }). - Times(1). - Return(&billing_grpc.InstanceBillingStatusResponse{ - BillingStatus: billing_grpc.ACTIVE, - }, nil) - - attrSync.postUsers(tf.ctx, []*users.User{user}) - - require.Len(t, tf.mockSegment.Messages, 1) - ident, ok := tf.mockSegment.Messages[0].(analytics.Identify) - require.True(t, ok) - - require.Equal(t, analytics.Identify{ - UserId: user.Email, - Traits: analytics.Traits{ - "name": user.Name, - "firstName": user.FirstName, - "lastName": user.LastName, - "email": user.Email, - "createdAt": user.CreatedAt, - "company": map[string]string{ - "name": user.Company, - }, - - "instances_ever_connected_flux_total": 0, - "instances_ever_connected_net_total": 0, - "instances_ever_connected_prom_total": 1, - "instances_ever_connected_scope_total": 0, - "instances_ever_connected_total": 1, - "instances_status_active_total": 1, - "instances_status_payment_due_total": 0, - "instances_status_payment_error_total": 0, - "instances_status_subscription_inactive_total": 0, - "instances_status_trial_active_total": 0, - "instances_status_trial_expired_total": 0, - "instances_status_unknown_total": 0, - "instances_total": 1, - }, - }, ident) -} diff --git a/users-sync/cmd/users-sync.go b/users-sync/cmd/users-sync.go index 87ff7c1c9..1069c364e 100644 --- a/users-sync/cmd/users-sync.go +++ b/users-sync/cmd/users-sync.go @@ -35,8 +35,6 @@ func main() { defer traceCloser.Close() var ( - segementWriteKeyFile = flag.String("segment-write-key-file", "", "File containing segment write key") - dbCfg dbconfig.Config billingCfg billing_grpc.Config marketoCfg marketing.MarketoConfig @@ -82,12 +80,6 @@ func main() { } defer billingClient.Close() - segmentClient, err := attrsync.NewSegmentClient(*segementWriteKeyFile, logger) - if err != nil { - logrus.Fatalf("Failed creating a segment client: %v", err) - } - defer segmentClient.Close() - marketoClient, err := attrsync.NewMarketoClient(marketoCfg) if err != nil { logrus.Fatalf("Failed creating a marketo client: %v", err) @@ -96,7 +88,7 @@ func main() { weeklyReporter := weeklyreporter.New(logger, usersClient) orgCleaner := cleaner.New(cleanupURLs, logger, db) attributeSyncer := attrsync.New( - logger, db, billingClient, segmentClient, marketoClient) + logger, db, billingClient, marketoClient) logger.Debugln("Debug logging enabled") logger.Infof("users-sync listening on ports %d (HTTP) and %d (gRPC)", serverConfig.HTTPListenPort, serverConfig.GRPCListenPort) From d0133cb71360c746bf7b791bdf22b45f595287cb Mon Sep 17 00:00:00 2001 From: Simon Howe Date: Wed, 6 May 2020 14:26:45 +0200 Subject: [PATCH 2/2] Restore segment-write-key-file flag to ease deployment --- users-sync/cmd/users-sync.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/users-sync/cmd/users-sync.go b/users-sync/cmd/users-sync.go index 1069c364e..99689e892 100644 --- a/users-sync/cmd/users-sync.go +++ b/users-sync/cmd/users-sync.go @@ -35,6 +35,8 @@ func main() { defer traceCloser.Close() var ( + segementWriteKeyFile = flag.String("segment-write-key-file", "", "DEPRECATED, will be ignored: File containing segment write key") + dbCfg dbconfig.Config billingCfg billing_grpc.Config marketoCfg marketing.MarketoConfig @@ -59,6 +61,10 @@ func main() { flag.Parse() + if segementWriteKeyFile != nil { + logrus.Warn("Support for -segment-write-key-file has been removed") + } + if err := logging.Setup(serverConfig.LogLevel.String()); err != nil { logrus.Fatalf("Error configuring logging: %v", err) return