diff --git a/src/backend/core/connector/ms-teams.go b/src/backend/core/connector/ms-teams.go index 03cf6d59..8cdbb997 100644 --- a/src/backend/core/connector/ms-teams.go +++ b/src/backend/core/connector/ms-teams.go @@ -31,7 +31,7 @@ const ( msTeamsFolderContent = "https://graph.microsoft.com/v1.0/groups/%s/drive/items/%s/children" msTeamsChats = "https://graph.microsoft.com/v1.0/chats?$top=50" - msTeamsChatMessagesURL = "https://graph.microsoft.com/v1.0/chats/%s/messages" + msTeamsChatMessagesURL = "https://graph.microsoft.com/v1.0/chats/%s/messages?$top=50" msTeamsParamTeamID = "team_id" @@ -98,7 +98,7 @@ type ( } MSTeamsResult struct { PrevLoadTime string - Messages []byte + Messages []string } ) @@ -159,7 +159,13 @@ func (c *MSTeams) Execute(ctx context.Context, param map[string]string) chan *Re func (c *MSTeams) execute(ctx context.Context, param map[string]string) error { if c.param.AnalyzeChats { - if err := c.loadChats(ctx, ""); err != nil { + msDrive := microsoftcore.NewMSDrive(c.param.Files, + c.model, + c.sessionID, c.client, + "", "", + c.getFile, + ) + if err := c.loadChats(ctx, msDrive, ""); err != nil { zap.S().Errorf("error loading chats : %s ", err.Error()) //return fmt.Errorf("load chats : %s", err.Error()) } @@ -242,7 +248,7 @@ func (c *MSTeams) loadChannels(ctx context.Context, teamID string) error { Bucket: model.BucketName(c.model.User.EmbeddingModel.TenantID), URL: "", AppendContent: true, - Body: replies.Messages, + Body: []byte(strings.Join(replies.Messages, "\n")), }, UpToData: false, } @@ -331,7 +337,7 @@ func (c *MSTeams) getReplies(ctx context.Context, teamID, channelID string, msg } } - result.Messages = []byte(strings.Join(messages, "\n")) + result.Messages = messages state.LastCreatedDateTime = lastTime return &result, nil } @@ -430,7 +436,7 @@ func (c *MSTeams) buildMDMessage(msg *microsoftcore.MessageBody) string { return fmt.Sprintf(messageTemplate, userName, message) } -func (c *MSTeams) loadChats(ctx context.Context, nextLink string) error { +func (c *MSTeams) loadChats(ctx context.Context, msDrive *microsoftcore.MSDrive, nextLink string) error { var response microsoftcore.MSTeamsChatResponse url := nextLink if url == "" { @@ -439,15 +445,22 @@ func (c *MSTeams) loadChats(ctx context.Context, nextLink string) error { if err := c.requestAndParse(ctx, url, &response); err != nil { return nil } - for _, chat := range response.Value { sourceID := fmt.Sprintf("chat:%s", chat.Id) - result, err := c.loadChatMessages(ctx, chat.Id) + state, ok := c.state.Chats[chat.Id] + if !ok { + state = &MSTeamMessageState{ + LastCreatedDateTime: time.Time{}, + } + c.state.Chats[chat.Id] = state + } + + result, err := c.loadChatMessages(ctx, msDrive, state, chat.Id, fmt.Sprintf(msTeamsChatMessagesURL, chat.Id)) if err != nil { zap.S().Errorf("error loading chat messages: %s", err.Error()) continue } - if len(result.Messages) == 0 { + if len(result) == 0 { continue } doc := &model.Document{ @@ -469,47 +482,45 @@ func (c *MSTeams) loadChats(ctx context.Context, nextLink string) error { Name: fileName, SourceID: sourceID, DocumentID: doc.ID.IntPart(), - MimeType: "plain/text", + MimeType: "text/markdown", FileType: proto.FileType_MD, Signature: "", Content: &Content{ Bucket: model.BucketName(c.model.User.EmbeddingModel.TenantID), URL: "", AppendContent: true, - Body: result.Messages, + Body: []byte(strings.Join(result, "\n")), }, UpToData: false, } } if response.NexLink != "" { - zap.S().Debugf("load next chats...") - return c.loadChats(ctx, response.NexLink) + return c.loadChats(ctx, msDrive, response.NexLink) } return nil } -func (c *MSTeams) loadChatMessages(ctx context.Context, chatID string) (*MSTeamsResult, error) { +func (c *MSTeams) loadChatMessages(ctx context.Context, + msDrive *microsoftcore.MSDrive, + state *MSTeamMessageState, + chatID, url string) ([]string, error) { var response microsoftcore.MessageResponse - if err := c.requestAndParse(ctx, fmt.Sprintf(msTeamsChatMessagesURL, chatID), &response); err != nil { + if err := c.requestAndParse(ctx, url, &response); err != nil { return nil, err } - state, ok := c.state.Chats[chatID] - if !ok { - state = &MSTeamMessageState{ - LastCreatedDateTime: time.Time{}, - } - c.state.Chats[chatID] = state - } - lastTime := state.LastCreatedDateTime + lastTime := state.LastCreatedDateTime.UTC() var messages []string for _, msg := range response.Value { - // do not scan message if it was scanned before or if it system message - if msg.MessageType != messageTypeMessage || - state.LastCreatedDateTime.UTC().After(msg.CreatedDateTime.UTC()) || - state.LastCreatedDateTime.UTC().Equal(msg.CreatedDateTime.UTC()) { + // do not scan system messages + if msg.MessageType != messageTypeMessage { continue } + if state.LastCreatedDateTime.UTC().After(msg.CreatedDateTime.UTC()) || + state.LastCreatedDateTime.UTC().Equal(msg.CreatedDateTime.UTC()) { + // messages in desc order. not needed to process messages that were loaded before. + return messages, nil + } // renew newest message time if lastTime.UTC().Before(msg.CreatedDateTime.UTC()) { @@ -519,27 +530,26 @@ func (c *MSTeams) loadChatMessages(ctx context.Context, chatID string) (*MSTeams messages = append(messages, message) } for _, attachment := range msg.Attachments { - if err := c.loadAttachment(ctx, attachment); err != nil { + if err := c.loadAttachment(ctx, msDrive, attachment); err != nil { zap.S().Errorf("error loading attachment: %v", err) } } } - state.LastCreatedDateTime = lastTime - return &MSTeamsResult{ - PrevLoadTime: state.LastCreatedDateTime.Format("2006-01-02-15-04-05"), - Messages: []byte(strings.Join(messages, "\n")), - }, nil + if response.OdataNextLink != "" { + if nested, err := c.loadChatMessages(ctx, msDrive, state, chatID, response.OdataNextLink); err == nil { + messages = append(messages, nested...) + } else { + zap.S().Errorf("error loading nested chat messages: %v", err) + } + + } + state.LastCreatedDateTime = lastTime + return messages, nil } -func (c *MSTeams) loadAttachment(ctx context.Context, attachment *microsoftcore.Attachment) error { +func (c *MSTeams) loadAttachment(ctx context.Context, msDrive *microsoftcore.MSDrive, attachment *microsoftcore.Attachment) error { - msDrive := microsoftcore.NewMSDrive(c.param.Files, - c.model, - c.sessionID, c.client, - "", "", - c.getFile, - ) if attachment.ContentType != attachmentContentTypReference { // do not scrap replies return nil