Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 80 additions & 82 deletions common/event/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,91 +97,89 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
return
}

go func() {
var (
err error
wrappedEvent *pb.Event
key []byte = nil
)

switch e := e.(type) {
case *pb.Ev_MetaEvent_CoreStart:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
}
case *pb.Ev_MetaEvent_MesosHeartbeat:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
}
case *pb.Ev_MetaEvent_FrameworkEvent:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
}
case *pb.Ev_TaskEvent:
key = []byte(e.Taskid)
if len(key) == 0 {
key = nil
}
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_TaskEvent{TaskEvent: e},
}
case *pb.Ev_RoleEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RoleEvent{RoleEvent: e},
}
case *pb.Ev_EnvironmentEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
}
case *pb.Ev_CallEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CallEvent{CallEvent: e},
}
case *pb.Ev_IntegratedServiceEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
}
case *pb.Ev_RunEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RunEvent{RunEvent: e},
}
var (
err error
wrappedEvent *pb.Event
key []byte = nil
)

switch e := e.(type) {
case *pb.Ev_MetaEvent_CoreStart:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
}

if wrappedEvent == nil {
err = fmt.Errorf("unsupported event type")
} else {
err = w.doWriteEvent(key, wrappedEvent)
case *pb.Ev_MetaEvent_MesosHeartbeat:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
}

if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Error(err.Error())
case *pb.Ev_MetaEvent_FrameworkEvent:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
}
case *pb.Ev_TaskEvent:
key = []byte(e.Taskid)
if len(key) == 0 {
key = nil
}
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_TaskEvent{TaskEvent: e},
}
case *pb.Ev_RoleEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RoleEvent{RoleEvent: e},
}
case *pb.Ev_EnvironmentEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
}
case *pb.Ev_CallEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CallEvent{CallEvent: e},
}
case *pb.Ev_IntegratedServiceEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
}
}()
case *pb.Ev_RunEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RunEvent{RunEvent: e},
}
}

if wrappedEvent == nil {
err = fmt.Errorf("unsupported event type")
} else {
err = w.doWriteEvent(key, wrappedEvent)
}

if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Error(err.Error())
}
}

func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
Expand Down