51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'app/jobs/ishapi/email_message_intake_job.rb', line 51
# Ingests one raw email stored in S3 and records it as an
# ::Office::EmailMessage attached to an ::Office::EmailConversation.
#
# Steps, in order:
# 1. Load the stub and refuse anything that is not in STATE_PENDING.
# 2. Download the raw message from the bucket named by
#    S3_CREDENTIALS[:bucket_ses] and parse it with Mail.
# 3. Find (by message_id) or build the EmailMessage and copy headers and
#    body parts onto it.
# 4. Resolve the conversation: through the In-Reply-To message when that
#    header is present, otherwise by subject.
# 5. Mark the conversation unread, merge tag ids, attach the sender as a Lead.
# 6. Apply every active EmailFilter whose from_regex matches the sender.
# 7. Save, mark the stub processed, and enqueue the forwarder notification
#    when the conversation's wp_term_ids include the inbox tag.
#
# @param id [Object] identifier handed to ::Office::EmailMessageStub.find
# @raise [RuntimeError] if the stub's state is not STATE_PENDING
def perform(id)
  stub = ::Office::EmailMessageStub.find(id)
  puts "Performing EmailMessageIntakeJob for object_key #{stub.object_key}"

  # Guard: a stub must only be ingested once.
  if stub.state != ::Office::EmailMessageStub::STATE_PENDING
    raise "This stub has already been processed: #{stub.id.to_s}."
  end

  client = Aws::S3::Client.new({
    region: ::S3_CREDENTIALS[:region],
    access_key_id: ::S3_CREDENTIALS[:access_key_id],
    secret_access_key: ::S3_CREDENTIALS[:secret_access_key],
  })
  _mail = client.get_object(bucket: ::S3_CREDENTIALS[:bucket_ses], key: stub.object_key).body.read
  the_mail = Mail.new(_mail)

  # Message-ID is mandatory here (a missing header raises); In-Reply-To is optional.
  message_id = the_mail.header['message-id'].decoded
  in_reply_to_id = the_mail.header['in-reply-to']&.to_s
  email_inbox_tag_id = WpTag.email_inbox_tag.id

  # Re-use an existing record for this Message-ID so the same email is not stored twice.
  @message = ::Office::EmailMessage.where(message_id: message_id).first
  @message ||= ::Office::EmailMessage.new
  @message.assign_attributes({
    raw: _mail,
    message_id: message_id,
    in_reply_to_id: in_reply_to_id,
    object_key: stub.object_key,
    subject: the_mail.subject,
    date: the_mail.date,
    from: the_mail.from[0],
    froms: the_mail.from,
    # `to` is nil when the message has no To header (e.g. Bcc-only mail).
    to: the_mail.to&.first,
    tos: the_mail.to,
    ccs: the_mail.cc,
    bccs: the_mail.bcc,
  })

  the_mail.parts.each do |part|
    churn_subpart(@message, part)
  end
  # A message without MIME parts keeps its whole decoded body in part_txt.
  @message.part_txt = the_mail.body.decoded if the_mail.parts.empty?

  if in_reply_to_id
    in_reply_to_msg = ::Office::EmailMessage.where({ message_id: in_reply_to_id }).first
    unless in_reply_to_msg
      # The parent message is unknown: create a placeholder for it inside a
      # conversation looked up (or created) by this message's subject.
      conv = ::Office::EmailConversation.find_or_create_by({
        subject: the_mail.subject,
      })
      in_reply_to_msg = ::Office::EmailMessage.find_or_create_by({
        message_id: in_reply_to_id,
        email_conversation_id: conv.id,
      })
    end
    conv = in_reply_to_msg.email_conversation
  else
    conv = ::Office::EmailConversation.find_or_create_by({
      subject: the_mail.subject,
    })
  end

  @message.email_conversation_id = conv.id
  conv.update_attributes({
    state: Conv::STATE_UNREAD,
    latest_at: the_mail.date,
    wp_term_ids: ([email_inbox_tag_id] + conv.wp_term_ids + stub.wp_term_ids).uniq,
  })

  # The part of the sender address after '@' keys the Leadset.
  domain = @message.from.split('@')[1]
  leadset = Leadset.find_or_create_by(company_url: domain)
  lead = Lead.find_or_create_by(email: @message.from, m3_leadset_id: leadset.id)
  conv.lead_ids = conv.lead_ids.push(lead.id).uniq

  Office::EmailFilter.active.each do |filter|
    @message.apply_filter(filter) if @message.from.match(filter.from_regex)
  end

  # NOTE(review): a failed save is only logged; the stub is still marked
  # processed and the notification may still be enqueued — confirm this is intended.
  if @message.save
    puts! @message.message_id, 'Saved this message'
  else
    puts! @message.errors.full_messages.join(', '), 'Cannot save email_message'
  end
  conv.save
  stub.update_attributes({ state: ::Office::EmailMessageStub::STATE_PROCESSED })

  if conv.wp_term_ids.include?(email_inbox_tag_id)
    ::Ishapi::ApplicationMailer.forwarder_notify(@message.id.to_s).deliver_later
  end
end
|